MoreRSS

site iconDerobukal修改

博客名:御坂研究所,一个偏软件开发和技术实践的博客。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

Derobukal的 RSS 预览

XXL-JOB的部署、搭建与使用

2025-08-23 00:57:47

XXL-JOB是一个分布式任务调度平台,它可以很方便的实现分布式的任务调度。

部署

首先我们下载XXL-JOB的源码

git clone [email protected]:xuxueli/xxl-job.git

之后我们根据doc/db/tables_xxl_job.sql中的sql创建对应的数据库和表。

在启动XXL-JOB之前,有些配置需要修改:

  1. xxl-job-admin/src/main/resources/logback.xml中property的value需要设置为xxl-job/xxl-job-admin/data/applogs/xxl-job/xxl-job-admin.log,否则可能会因为文件夹不存在而启动不了
  2. xxl-job-admin/src/main/resources/application.properties中数据库的地址、用户名和密码,以及accessToken等等需要修改为自定义的配置值

修改完配置,就可以启动服务了

mvn clean packagecd xxl-job-admin/targetjava -jar xxl-job-admin-3.1.2-SNAPSHOT.jar

这样服务就启动成功了,之后我们可以访问http://127.0.0.1:8080/xxl-job-admin/进入控制台

服务搭建

我们基于SpringBoot来构建业务服务,首先我们添加依赖

<dependency>    <groupId>com.xuxueli</groupId>    <artifactId>xxl-job-core</artifactId>    <version>3.1.1</version></dependency>

之后创建一个XXL-JOB的配置bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class XxlJobConfig {
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
// XXL-JOB地址
xxlJobSpringExecutor.setAdminAddresses("http://127.0.0.1:8080/xxl-job-admin");
// 当前这个XXL-JOB的名称
xxlJobSpringExecutor.setAppname("test");
// 密钥,和之前的配置需要一致
xxlJobSpringExecutor.setAccessToken("1sg");
// 设置端口,不设置会自动选择
// xxlJobSpringExecutor.setPort(9999);
// 设置日志目录和保存策略
xxlJobSpringExecutor.setLogPath("./log");
xxlJobSpringExecutor.setLogRetentionDays(30);
return xxlJobSpringExecutor;
}
}

配置好了之后我们就可以新建一个任务了

1
2
3
4
5
6
7
8
9
10
@Component
public class DemoJob {

private static final Logger logger = LoggerFactory.getLogger(DemoJob.class);

@XxlJob("demoJobHandler")
public void demoJobHandler() {
logger.info("XXL-JOB 任务开始执行: {}", new Date());
}
}

添加了配置和任务之后,我们就可以启动服务了。

使用

在控制台中,我们选择执行器管理,之后新增一个执行器。其中,AppName就是上面配置的应用名,如上就是test,名称就是这个管理器的自定义名称,注册方式选择自动注册即可。

创建好执行器,我们就可以新增任务了。在任务管理中,我们新增一个执行器为刚刚新增那个执行器的任务。运行模式选择BEAN,JobHandler设置为@XxlJob注解中的值demoJobHandler。之后当任务执行的条件达到时,demoJobHandler方法就会执行了。

参考

XXL开源社区
XXL-JOB 安装及使用教程

使用NGINX的auth_request进行统一jwt鉴权

2025-07-24 23:22:56

NGINX的auth_request模块提供了一种统一的认证机制,可以在NGINX层面进行JWT鉴权,而不需要在每个后端服务中重复实现认证逻辑。

首先我们定义一下nginx的配置,它的配置如下

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
server {
listen 8965;

# 鉴权接口,仅供 Nginx 内部 auth_request 使用
location = /auth {
internal; # 该接口只能被 Nginx 内部请求,防止外部访问
# 转发到实际的认证服务
proxy_pass http://localhost:5001/api/auth/verify;
# 不转发请求体,提升效率
proxy_pass_request_body off;
# 防止后端因 Content-Length 不确定而报错
proxy_set_header Content-Length "";
# 将客户端传来的 Authorization 头(JWT Token)传给认证服务
proxy_set_header Authorization $http_authorization;
# 传入原始请求路径,供认证服务判断路径是否需要鉴权
proxy_set_header X-Original-URI $request_uri;
}

# 所有 / 路径下的请求都进行认证
location / {
# 认证请求会先调用上面的 /auth 接口
auth_request /auth;
# 如果认证失败(如返回 401),跳转到自定义处理逻辑
error_page 401 = @unauthorized;
# 从 /auth 的响应头中提取用户信息
auth_request_set $user_id $upstream_http_x_user_id;
# 把用户信息注入到请求头中,转发给后端业务服务
proxy_set_header X-User-ID $user_id;
# 转发到后端服务
proxy_pass http://localhost:5001;
}

# 自定义未授权响应(认证失败时返回)
location @unauthorized {
# 返回 401 状态码 + 文本内容
return 401 "Unauthorized";
}
}

有了这个nginx的配置之后,我们就可以实现鉴权的逻辑了,具体逻辑如下

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import jwt
from flask import Flask, request, make_response, jsonify
from jwt import InvalidTokenError

app = Flask(__name__)

SECRET_KEY = "a-string-secret-at-least-256-bits-long"

# 路径白名单,不需要鉴权的接口
PUBLIC_PATHS = [
"/api/public/hello",
"/api/login",
"/api/register"
]


@app.route("/api/auth/verify", methods=["GET"])
def verify_token():
original_uri = request.headers.get("X-Original-URI", "")
# 不需要鉴权的接口,直接返回200
if original_uri in PUBLIC_PATHS:
return "", 200

# 否则继续验证 JWT
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return "Missing or invalid Authorization header", 401

# 解析得到token
token = auth_header.split(" ", 1)[1]
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
response = make_response("", 200)
response.headers["X-User-ID"] = str(payload.get("user_id", ""))
return response
except InvalidTokenError:
return "Invalid token", 401


@app.route("/api/hello")
def hello():
return jsonify({"message": "hello from auth", "user_id": request.headers.get("X-User-ID")})


@app.route("/api/public/hello")
def ping():
return {"msg": "hello without auth"}


if __name__ == "__main__":
app.run(host="0.0.0.0", port=5001)

启动nginx和如上python服务,之后我们使用如下payload和header以及密钥生成token

payload

{    "alg": "HS256",    "typ": "JWT"}

header

{    "user_id": 656670838050885}

密钥

a-string-secret-at-least-256-bits-long

生成得到token

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2NTY2NzA4MzgwNTA4ODV9.caQ6cp-BA-OMxXu4zTUjV0OiZo1iygvdi7GPQNjNVHM

之后我们就可以使用token进行测试了,具体测试结果如下

~ AUTH_HEADER="Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2NTY2NzA4MzgwNTA4ODV9.caQ6cp-BA-OMxXu4zTUjV0OiZo1iygvdi7GPQNjNVHM"~ curl -H "$AUTH_HEADER" http://localhost:8965/api/hello{"message":"hello from auth","user_id":"656670838050885"}~ curl -H "$AUTH_HEADER" http://localhost:8965/api/public/hello{"msg":"hello without auth"}~ curl -H "$AUTH_HEADER" http://localhost:8965/api/login<!doctype html><html lang=en><title>404 Not Found</title><h1>Not Found</h1><p>The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.</p>~ curl http://localhost:8965/api/public/hello{"msg":"hello without auth"}~ curl http://localhost:8965/api/helloUnauthorized%

如上我们正确设置了Authorization之后就可以正常访问需要鉴权的接口了,但是去掉了Authorization之后需要鉴权的接口就会返回Unauthorized。此外还可以看到,不需要鉴权的接口,即使不添加鉴权配置也是可以正常访问的。

使用APISIX解析jwt并获取payload信息

2025-07-11 19:23:07

APISIX支持获取jwt的信息,并且将这个信息进行解码并转发给后端服务。

1. 启动服务

首先我们根据官方脚本来启动APISIX服务

~ curl -sL "https://run.api7.ai/apisix/quickstart" | shDestroying existing apisix-quickstart container, if any.Installing APISIX with the quickstart options.Creating bridge network apisix-quickstart-net.77e35df073894075ad77facd9d1c7d2a35b280213732c1b631052caede079bab✔ network apisix-quickstart-net createdStarting the container etcd-quickstart.d123605c8b7658b130be97e5f44e7a160aa85858db008032ecf594266225e342✔ etcd is listening on etcd-quickstart:2379Starting the container apisix-quickstart.38434806c63b3a72f53fb6ad849cb4c11781eebaff79c8db04510226593fcf46⚠ WARNING: The Admin API key is currently disabled. You should turn on admin_key_required and set a strong Admin API key in production for security.✔ APISIX is ready!

2. 配置插件

启动了APISIX之后,我们首先创建一个插件配置。在这个插件中我们定义了一个Lua方法,这个方法的目的是从请求的header中获取authorization信息,并进行解码,之后将解码的信息放到HTTP header中传给后端

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/plugin_configs/1001' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "plugins": {        "serverless-pre-function": {            "phase": "access",            "functions": [                "return function(_, ctx) local core = require(\"apisix.core\") local jwt = require(\"resty.jwt\") local auth_header = ctx.var.http_authorization if not auth_header then return end local token = auth_header:match(\"Bearer%s+(.+)\") if not token then return end local obj = jwt:load_jwt(token) if obj and obj.valid and obj.payload then if obj.payload.user_id then core.request.set_header(\"X-User-Id\", obj.payload.user_id) end if obj.payload.role then core.request.set_header(\"X-User-Role\", obj.payload.role) end end end"            ]        }    }}'

如上的fucntions属性中添加了一个Lua方法,格式化之后的Lua代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
return function(_, ctx)
local core = require("apisix.core")
local jwt = require("resty.jwt")

local auth_header = ctx.var.http_authorization
if not auth_header then
return
end

local token = auth_header:match("Bearer%s+(.+)")
if not token then
return
end

local obj = jwt:load_jwt(token)
if obj and obj.valid and obj.payload then
if obj.payload.user_id then
core.request.set_header("X-User-Id", obj.payload.user_id)
end
if obj.payload.role then
core.request.set_header("X-User-Role", obj.payload.role)
end
end
end

这段代码实现了如下几个功能:

  1. 从 Authorization: Bearer 中提取 JWT
  2. 使用 resty.jwt 解码
  3. 如果合法,提取 user_id 和 role
  4. 注入到 header(X-User-Id, X-User-Role)中供后端读取

3. 配置consumer

创建了这个插件之后,我们再新建一个consumer。在APISIX中,consumer代表了一类客户端,比如APP。我们可以针对这类客户端添加一些配置,多种不同类型的客户端(比如APP、网页、开放平台,等等)可以分别设置成不同的consumer以方便管理

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/consumers/app' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "username": "app",    "plugins": {        "jwt-auth": {            "key": "app-key",            "secret": "a-string-secret-at-least-256-bits-long",            "algorithm": "HS256"        }    }}'

如上添加了一个名为app的consumer,它的key是app-key,加密方式是HS256,密钥是a-string-secret-at-least-256-bits-long。有了解析插件和consumer之后,我们就可以创建路由了。

4. 配置路由

如下请求会创建一个ID为1的路由,使用了ID为1001插件,并且添加了jwt-auth的配置,路由的后端是https://httpbin.org,这个网站会把我们请求的信息返回给我们。

curl --location --request PUT 'http://127.0.0.1:9180/apisix/admin/routes/1' \--header 'Content-Type: application/json' \--header 'Accept: */*' \--header 'Host: 127.0.0.1:9180' \--header 'Connection: keep-alive' \--data-raw '{    "uri": "/headers",    "plugin_config_id": 1001,    "plugins": {        "jwt-auth": {}    },    "upstream": {        "type": "roundrobin",        "nodes": {            "httpbin.org:80": 1        }    }}'

5. 发起请求

在创建好了plugin_config、consumer和route之后,我们就可以测试请求了。首先我们构建如下payload

1
2
3
4
5
6
{
"key": "app-key",
"user_id": 100001,
"role": "admin",
"exp": 1900000000
}

这个payload包含了user_idrole两个业务属性,exp代表这个jwt的过期时间戳,key是APISIX用于识别匹配哪个consumer的,这里我们选择匹配app-key这个consumer。之后我们将该payload和密钥a-string-secret-at-least-256-bits-long一起在https://jwt.io/进行编码,得到编码jwt信息如下

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I

之后我们执行HTTP请求,将这个jwt放到Authorization header中

curl --location --request GET 'http://127.0.0.1:9080/headers' \--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I' \--header 'Accept: */*' \--header 'Host: httpbin.org:80' \--header 'Connection: keep-alive'

请求得到的响应如下,可以看到user_id和role属性已经成功的传给后端服务了

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"headers": {
"Accept": "*/*",
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJrZXkiOiJhcHAta2V5IiwidXNlcl9pZCI6MTAwMDAxLCJyb2xlIjoiYWRtaW4iLCJleHAiOjE5MDAwMDAwMDB9.qG7PNPz2XlatmjrhNW_xf6SmI8T9JSIx2lJVJcAox0I",
"Host": "httpbin.org",
"User-Agent": "curl/7.81.0",
"X-Amzn-Trace-Id": "Root=1-68709903-309891501348175943af3223",
"X-Consumer-Username": "app",
"X-Forwarded-Host": "httpbin.org",
"X-User-Id": "100001",
"X-User-Role": "admin"
}
}

《推荐系统实践》

2025-06-22 18:54:59

从某种意义上说,推荐系统和搜索引擎对于用户来说是两个互补的工具。搜索引擎满足了用户有明确目的时的主动查找需求,而推荐系统能够在用户没有明确目的的时候帮助他们发现感兴趣的新内容。

基于用户行为分析的推荐算法是个性化推荐系统的重要算法,学术界一般将这种类型的算法称为协同过滤(Collaborative filtering)算法。顾名思义,协同过滤就是指用户可以齐心协力,通过不断地和网站互动,使自己的推荐列表能够不断过滤掉自己不感兴趣的物品,从而越来越满足自己的需求。

用户行为分类

用户行为在个性化推荐系统中一般分两种——显性反馈行为(explicit feedback)和隐性反馈行为(implicit feedback)。显示反馈行为是用户主动做的,比如给视频点赞、给书籍打分等等;隐式反馈行为的代表就是用户浏览页面,这种行为显示出来的用户偏好不是那么明显,但是数据量更大。

常用算法

基于邻域的算法

  • 基于用户的协同过滤算法 这种算法给用户推荐和他兴趣相似的其他用户喜欢的物品。
    1. 找到和目标用户兴趣相似的用户集合(P45)。
    2. 找到这个集合中的用户喜欢的,且目标用户没有听说过的物品推荐给目标用户。
  • 基于物品的协同过滤算法 这种算法给用户推荐和他之前喜欢的物品相似的物品。
    1. 计算物品之间的相似度(P53)。
    2. 根据物品的相似度和用户的历史行为给用户生成推荐列表。

基于用户的协同过滤算法

计算两个用户的兴趣相似程度:给定用户u和用户v,N(u)表示用户u曾经有过正反馈的物品集合,N(v)表示用户v曾经有过正反馈的物品集合。可以使用Jaccard公式计算两个用户的兴趣相似程度

wuv=|N(u)N(v)||N(u)N(v)|

或者使用余弦相似公式计算相似程度

wuv=|N(u)N(v)||N(u)||N(v)|

以余弦相似公式为例,假设有用户ABCD,物品abcde,用户喜欢的物品如下

用户 物品 a 物品 b 物品 c 物品 d 物品 e
A ☑️ ☑️ ☑️
B ☑️ ☑️
C ☑️ ☑️
D ☑️ ☑️ ☑️

那么我们可以得到用户A和BCD的相似度

wAB=|{a,b,d}{a,c}||{a,b,d}||{a,c}|=16

wAC=|{a,b,d}{b,e}||{a,b,d}||{b,e}|=16

wAD=|{a,b,d}{c,d,e}||{a,b,d}||{c,d,e}|=13

具体计算过程以AD的相似度计算为例

  1. 分子为交集并且交集为 {d}|{d}| = 1,所以分子为1
  2. 分母为并集,3 x 3 = 9,开根号为3
    最终值为 1 / 3

以上逻辑可以用代码进行实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def similarity(users):
w = defaultdict(dict)
for u, v in combinations(users.keys(), 2):
r1 = len(users[u] & users[v])
r2 = math.sqrt(len(users[u]) * len(users[v]) * 1.0)
r = r1 / r2
w[u][v], w[v][u] = r, r # 保存两次,方便后面使用
return w

def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e'},
'D': {'c', 'd', 'e'}
}
for k, v in similarity(users).items():
print(f'{k}: {json.dumps(v)}')

执行后得到结果如下

A: {"B": 0.4082482904638631, "C": 0.4082482904638631, "D": 0.3333333333333333}B: {"A": 0.4082482904638631, "C": 0.0, "D": 0.4082482904638631}C: {"A": 0.4082482904638631, "B": 0.0, "D": 0.4082482904638631}D: {"A": 0.3333333333333333, "B": 0.4082482904638631, "C": 0.4082482904638631}

据此我们就可以得到各个用户之间的兴趣相似度了。有了用户兴趣的相似度之后,我们可以给用户推荐和他兴趣最相似的K个用户喜欢的物品。我们可以使用如下公式计算用户u对物品i的感兴趣程度

p(u,i)=vS(u,K)N(i)wuvrvi

其中,S(u, K)包含和用户u兴趣最接近的K个用户,N(i)是对物品i有过行为的用户集合,wuv是用户u和用户v的兴趣相似度,rvi代表用户v对物品i的兴趣,因为使用的是单一行为的隐反馈数据,所以所有的rvi=1。

具体的逻辑实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def recommend(user, users, w, k):
"""
:param user: 计算指定用户的物品推荐程度
:param users: 数据集
:param w: 前一步计算得到的用户兴趣相似度
:param k: 取k个兴趣最相似的用户
:return:
"""
rank = defaultdict(float)
# 获取指定用户和其它用户的兴趣相似度,并按照相似度从大到小排序,取前k个数据
for v, wuv in sorted(w[user].items(), key=lambda item: item[1], reverse=True)[:k]:
# 取出指定用户的数据集
for i in users[v]:
# 如果这个数据已经在当前用户的数据集里面,跳过,因为已经感兴趣的数据不需要再次推荐
if i in users[user]:
continue
rank[i] += wuv
return rank

通过这个代码我们就可以计算得到指定用户的物品推荐程度了。完整的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import json
import math
from collections import defaultdict
from itertools import combinations


def similarity(users):
w = defaultdict(dict)
for u, v in combinations(users.keys(), 2):
r1 = len(users[u] & users[v])
r2 = math.sqrt(len(users[u]) * len(users[v]) * 1.0)
r = r1 / r2
w[u][v], w[v][u] = r, r # 保存两次,方便后面使用
return w


def recommend(user, users, w, k):
"""
:param user: 计算指定用户的物品推荐程度
:param users: 数据集
:param w: 前一步计算得到的用户兴趣相似度
:param k: 取k个兴趣最相似的用户
:return:
"""
rank = defaultdict(float)
# 获取指定用户和其它用户的兴趣相似度,并按照相似度从大到小排序,取前k个数据
for v, wuv in sorted(w[user].items(), key=lambda item: item[1], reverse=True)[:k]:
# 取出指定用户的数据集
for i in users[v]:
# 如果这个数据已经在当前用户的数据集里面,跳过,因为已经感兴趣的数据不需要再次推荐
if i in users[user]:
continue
rank[i] += wuv
return rank


def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e'},
'D': {'c', 'd', 'e'}
}

w = similarity(users)
for k, v in w.items():
print(f'{k}: {json.dumps(v)}')

rank = recommend('C', users, w, 3)
for k, v in sorted(rank.items(), key=lambda item: item[1], reverse=True):
print(f'{k}: {v}')


if __name__ == '__main__':
main()

执行代码得到的结果如下

A: {"B": 0.4082482904638631, "C": 0.4082482904638631, "D": 0.3333333333333333}B: {"A": 0.4082482904638631, "C": 0.0, "D": 0.4082482904638631}C: {"A": 0.4082482904638631, "B": 0.0, "D": 0.4082482904638631}D: {"A": 0.3333333333333333, "B": 0.4082482904638631, "C": 0.4082482904638631}d: 0.8164965809277261a: 0.4082482904638631c: 0.4082482904638631

由上面的结果我们可以知道,针对用户C,最推荐的物品是物品d

根据上面的例子我们已经简单了解了基于用户的协同过滤算法,不过这种算法存在问题,主要是

  1. 随着网站的用户数目越来越大,计算用户兴趣相似度矩阵将越来越困难,其运算时间复杂度和空间复杂度的增长和用户数的增长近似于平方关系
  2. 基于用户的协同过滤很难对推荐结果作出解释

因此,在实际的使用中,更常见的是基于物品的协同过滤算法

基于物品的协同过滤算法

为了挖掘长尾信息,避免热门物品对推荐产生影响,减小二八定律的出现。可以用如下公式计算物品之间的相似度

wij=|N(i)N(j)||N(i)||N(j)|

分子是同时喜欢物品i和物品j的用户数,分母是喜欢两个物品用户数的并集。为了减小计算量,我们可以构建一个矩阵来存储某个用户喜欢的物品集合。

举个例子,比如用户A喜欢物品 {a, b, d},那我们可以构建如下矩阵

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  0  |  1  |  0  |b   |  1  |  0  |  0  |  1  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  1  |  1  |  0  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

因为a、b、d可以组成ab、ad、bd,所以将矩阵中的对应位置都填上1。这是一个用户的物品信息,对于多个用户,只需要把这些矩阵相加即可。例如有5个用户,他们的物品信息和生成的对应物品矩阵如下

用户 1: {a, c, d}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  1  |  1  |  0  |b   |  0  |  0  |  0  |  0  |  0  |c   |  1  |  0  |  0  |  1  |  0  |d   |  1  |  0  |  1  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

用户 2: {b, c, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  0  |  0  |b   |  0  |  0  |  1  |  0  |  1  |c   |  0  |  1  |  0  |  0  |  1  |d   |  0  |  0  |  0  |  0  |  0  |e   |  0  |  1  |  1  |  0  |  0  |

用户 3: {a, d, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  1  |  1  |b   |  0  |  0  |  0  |  0  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  1  |  0  |  0  |  0  |  1  |e   |  1  |  0  |  0  |  1  |  0  |

用户 4: {b, d}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  0  |  0  |  0  |  0  |b   |  0  |  0  |  0  |  1  |  0  |c   |  0  |  0  |  0  |  0  |  0  |d   |  0  |  1  |  0  |  0  |  0  |e   |  0  |  0  |  0  |  0  |  0  |

用户 5: {a, b, c, e}

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  1  |  0  |  1  |b   |  1  |  0  |  1  |  0  |  1  |c   |  1  |  1  |  0  |  0  |  1  |d   |  0  |  0  |  0  |  0  |  0  |e   |  1  |  1  |  1  |  0  |  0  |

将这5个用户的物品信息相加,得到矩阵

    |  a  |  b  |  c  |  d  |  e  |----|-----|-----|-----|-----|-----|a   |  0  |  1  |  2  |  3  |  2  |b   |  1  |  0  |  3  |  2  |  3  |c   |  2  |  3  |  0  |  2  |  3  |d   |  3  |  2  |  2  |  0  |  2  |e   |  2  |  3  |  3  |  2  |  0  |

在这个矩阵中值越高,代表物品的相关度越高。接下来我们将这个规则用代码进行实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
number = 'number'
def item_similarity(train):
# c[i][number]表示使用物品i的用户数量
# c[i][j]表示同时交互物品i和j的用户数
c = defaultdict(lambda: defaultdict(int))
for user, items in train.items():
for i in items:
# 统计每个物品被交互的总次数
c[i][number] += 1
# 统计物品i与其他物品的共现次数
for j in items:
if i == j:
continue
c[i][j] += 1
# 计算最终的相似度矩阵 w
w = defaultdict(dict)
for i, related_items in c.items():
for j, cij in related_items.items():
if j == number: continue
# 余弦相似度公式
similarity = cij / math.sqrt(c[i][number] * c[j][number])
w[i][j] = similarity
return w

如上我们先计算每个物品各自被用户喜欢的次数,再计算每个物品和其它物品同时被某个用户喜欢的次数,之后根据物品相似度公式即可计算出物品之间的相关性。为了简单起见,如上代码只使用了一个字典变量,物品自己被喜欢的次数被保存在key为number的字段中,物品和其它物品同时被喜欢的次数则保存在key为其它物品ID的字段中。

有了如上逻辑之后,我们就可以计算物品相似度了,假设有用户如下

{  'A': {'a', 'b', 'd'},  'B': {'a', 'c'},  'C': {'b', 'e', 'a'},  'D': {'c', 'd', 'e'}}

计算得到的物品相似度

b: {'a': 0.8164965809277261, 'd': 0.5, 'e': 0.5}a: {'b': 0.8164965809277261, 'd': 0.4082482904638631, 'c': 0.4082482904638631, 'e': 0.4082482904638631}d: {'b': 0.5, 'c': 0.5, 'e': 0.5, 'a': 0.4082482904638631}c: {'e': 0.5, 'd': 0.5, 'a': 0.4082482904638631}e: {'b': 0.5, 'c': 0.5, 'd': 0.5, 'a': 0.4082482904638631}

可以看到物品a和物品b的相关度是最高的。在得到物品的相关度之后,我们可以使用如下公式计算用户u对一个物品j的兴趣

puj=iN(u)S(j,K)wjirui

N(u)是用户喜欢的物品集合,S(j, K)是物品j最相似的K个物品的集合,wji是物品j和i的相似度,rui是用户对物品i的兴趣,可令rui为1。我们可以把这个逻辑使用代码进行实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def recommend(interacted_items: Union[set, dict], w, k):
"""
:param interacted_items: 指定用户交互过的物品
:param w: 物品的相似度
:param k: 取最相似的k个物品
:return:
"""
if isinstance(interacted_items, set): # 如果只有物品,没有评分,那么将评分统一设置为1
interacted_items = {k: 1 for k in interacted_items}
rank = defaultdict(float)
# 用户交互过的物品,和用户对这个物品的评分
for item, score in interacted_items.items():
# 物品的相似度信息,得到related_item和item的相似度similarity,按照相似度的值从大到小排序,取k个值
for related_item, similarity in sorted(w[item].items(), key=lambda x: x[1], reverse=True)[:k]:
# 如果这个物品已经被用户交互过了,跳过
if related_item in interacted_items:
continue
# 计算相关的物品的相似度评分
rank[related_item] += score * similarity
return rank

有了计算用户和物品相关度的代码,我们就可以把逻辑结合起来,实现向用户推荐物品了。完整的代码实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import math
from collections import defaultdict
from typing import Union

number = 'number'


def item_similarity(train):
# c[i][number]表示使用物品i的用户数量
# c[i][j]表示同时交互物品i和j的用户数
c = defaultdict(lambda: defaultdict(int))
for user, items in train.items():
for i in items:
# 统计每个物品被交互的总次数
c[i][number] += 1
# 统计物品i与其他物品的共现次数
for j in items:
if i == j:
continue
c[i][j] += 1

# 计算最终的相似度矩阵 w
w = defaultdict(dict)
for i, related_items in c.items():
for j, cij in related_items.items():
if j == number: continue
# 余弦相似度公式
similarity = cij / math.sqrt(c[i][number] * c[j][number])
w[i][j] = similarity

return w


def recommend(interacted_items: Union[set, dict], w, k):
"""
:param interacted_items: 指定用户交互过的物品
:param w: 物品的相似度
:param k: 取最相似的k个物品
:return:
"""
if isinstance(interacted_items, set): # 如果只有物品,没有评分,那么将评分统一设置为1
interacted_items = {k: 1 for k in interacted_items}
rank = defaultdict(float)
# 用户交互过的物品,和用户对这个物品的评分
for item, score in interacted_items.items():
# 物品的相似度信息,得到related_item和item的相似度similarity,按照相似度的值从大到小排序,取k个值
for related_item, similarity in sorted(w[item].items(), key=lambda x: x[1], reverse=True)[:k]:
# 如果这个物品已经被用户交互过了,跳过
if related_item in interacted_items:
continue
# 计算相关的物品的相似度评分
rank[related_item] += score * similarity
return rank


def main():
users = {
'A': {'a', 'b', 'd'},
'B': {'a', 'c'},
'C': {'b', 'e', 'a'},
'D': {'c', 'd', 'e'}
}
w = item_similarity(users)
for k, v in w.items():
print(f'{k}: {dict(sorted(v.items(), key=lambda item: item[1], reverse=True))}')

rank = recommend(users['B'], w, 3)
for k, v in sorted(rank.items(), key=lambda item: item[1], reverse=True):
print(f'{k}: {v}')


if __name__ == '__main__':
main()

以上代码的执行结果如下,可见用户B和物品d的相关度最高

b: {'a': 0.8164965809277261, 'd': 0.5, 'e': 0.5}d: {'b': 0.5, 'c': 0.5, 'e': 0.5, 'a': 0.4082482904638631}a: {'b': 0.8164965809277261, 'd': 0.4082482904638631, 'c': 0.4082482904638631, 'e': 0.4082482904638631}c: {'d': 0.5, 'e': 0.5, 'a': 0.4082482904638631}e: {'b': 0.5, 'c': 0.5, 'd': 0.5, 'a': 0.4082482904638631}d: 0.9082482904638631b: 0.8164965809277261e: 0.5

基于物品的推荐在工程中使用的比基于用户的推荐要多,因为UserCF(User Collaborative Filtering)的推荐更社会化,反映了用户所在的小型兴趣群体中物品的热门程度,而ItemCF(Item Collaborative Filtering)的推荐更加个性化,反映了用户自己的兴趣传承。

LFM(latent factor model)隐语义模型

隐语义模型核心思想是通过隐含特征(latent factor)联系用户兴趣和物品,它可以通过对数据进行分类来实现推荐。这种基于用户对数据的兴趣分类的方式,需要解决如下三个问题:

  1. 如何给物品分类
  2. 如何确定用户对哪些分类感兴趣,以及感兴趣的程度
  3. 对于一个分类,选择哪些物品推荐给用户,以及这些物品的权重如何

隐含语义分析技术(latent variable analysis)采取基于用户行为统计的自动聚类,来实现数据自动分类。

评测指标

一个推荐系统好不好,可以从用户满意度、预测准确度、覆盖率、多样性、新颖性、惊喜度、信任度、实时性、健壮性、商业目标等多个角度来进行评测

准确度

我们可以使用TopN推荐的方式来计算准确度,TopN的准确度一般通过准确率(precision)/召回率(recall)来进行度量。令R(u)是根据用户在训练集上的行为给用户作出的推荐列表,而T(u)是用户在测试集上的行为列表。那么,推荐结果的召回率定义为:

Recall=uU|R(u)T(u)|uU|T(u)|

推荐结果的准确率定义为:

Precision=uU|R(u)T(u)|uU|R(u)|

简单来说,R(u)代表系统推荐给用户u的Top-N列表(预测值),T(u)代表用户实际喜欢或点击过的项目(真实值),召回率和准确率公式的分子都是同时存在于推荐列表和用户喜欢列表的物品数。召回率的分母是用户喜欢的物品数,召回率是看系统有没有把用户喜欢的物品推荐出来。准确率的分母是系统推荐的物品总数,目的是看推荐有多少是对的。

指标 含义 关注点
Recall 你真正喜欢的内容中,被系统找出来了多少 不漏掉好东西
Precision 系统推荐的内容中,有多少真的是你喜欢的 不乱推荐垃圾

我们可以把召回率和准确率的计算通过如下代码实现

flat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def precision_recall(test_data, train_data, n, recommend_func):
"""
计算推荐系统在测试集上的准确率和召回率

:param test_data: dict,用户 -> 测试集中真实交互物品列表
:param train_data: dict,用户 -> 训练集中交互物品列表(用于生成推荐)
:param n: int,每个用户推荐的物品数量
:param recommend_func: function(user, n, train_data),返回推荐物品列表
:return: [recall, precision]
"""
hit = 0 # 交集
total_actual = 0 # 所有用户的真实物品总数
total_recommend = 0 # 所有推荐物品总数

for user, actual_items in test_data.items():
# 计算推荐物品
recommended_items = recommend_func(user, n, train_data)
# 计算交集
hit += len(set(recommended_items) & set(actual_items))
# 真实物品数
total_actual += len(actual_items)
# 推荐物品数
total_recommend += len(recommended_items)

recall = hit / total_actual if total_actual else 0
precision = hit / total_recommend if total_recommend else 0
return [recall, precision]

参考

推荐系统实践

《上瘾:让用户养成使用习惯的四大产品逻辑》

2025-06-19 01:44:52

Hooked: How to Build Habit-Forming Products

如何卖出更多的产品:产能 -> 营销/渠道 -> 产品设计

上瘾如何设计产品:触发 -> 行动 -> 多变的酬赏 -> 投入

习惯是大脑借以掌握复杂举动的途径之一。神经系统科学家指出,人脑中存在一个负责无意识行为的基底神经节,那些无意中产生的条件反射会以习惯的形式存储在基底神经节中,从而使人们腾出精力来关注其他的事物。当大脑试图走捷径而不再主动思考接下来该做些什么时,习惯就养成了。为解决当下面临的问题,大脑会在极短的时间内从行为存储库里提取出相宜的对策。
(就是基底核,有点像缓存的作用)

我们所要描述的体验更接近于“痒”,它是潜伏于我们内心的一种渴求,当这种渴求得不到满足时,不适感就会出现。那些让我们养成某种习惯的产品正好可以缓解这种不适感。比起听之任之的做法,利用技术或产品来”挠痒痒”能够更快地满足我们的渴求。一旦我们对某种技术或产品产生依赖,那它就是唯一的灵丹妙药了。

福格行为模型可以用公式来呈现,即B=MAT。B代表行为,M代表动机,A代表能力,T代表触发。要想使人们完成特定的行为,动机、能力、触发这三样缺一不可。1否则,人们将无法跨过”行动线”,也就是说,不会实施某种行为。

  • 稀缺效应:物以稀为贵
  • 环境效应:环境会影响人们的价值判断
  • 锚定效应
  • 赠券效应

多变的酬赏主要表现为三种形式:社交酬赏,猎物酬赏,自我酬赏

沉没成本:通过用户对产品的投入程度,留住用户

总体评价:很薄的一本书,有部分的观点有参考意义,但是大部分的论调都是老生常谈。大多数的观点在很多心理学的书籍里面已经讲过了,本书主要是讲怎么依赖于这些原理来进行实操,有一定的参考意义。

https://book.douban.com/subject/27030507/

Kotlin与Java对照手册

2025-04-24 22:21:27

1. 基本类型

类型 Kotlin 写法 Java 写法 简要说明
数字 Int, Long, Float, Double, Short, Byte int, long, float, double, short, byte Kotlin 数值类型映射到相应的原生/包装类型。
布尔 Boolean boolean 只能取 true/false,与数字不互通。
字符 Char char 单个 Unicode 字符,支持转义序列。
字符串 String String 不可变;支持多行文本块 """..."""
数组 Array<T>, IntArray T[] 提供原始类型专用数组如 IntArrayByteArray
无符号整型 UInt, ULong, UShort, UByte 编译时检查范围,运行时越界抛 IllegalArgumentException

2. 语法对照

功能 Java 写法 Kotlin 写法 简要说明
变量定义 int x = 10; final String name = "Tom"; var x = 10 val name = "Tom" var 可变,val 只读;类型由编译器推断。
类 + 构造 public class P { P(String n) { ... } } class P(val name: String) 主构造中声明属性,自动生成字段 & 访问器。
数据类 手动写字段/构造/equals/toString data class User(val id: Int, val n: String) data 自动生成常用方法 & 解构组件。
函数定义 public int sum(int a, int b) { return a + b; } fun sum(a: Int, b: Int) = a + b 表达式函数可省略大括号和 return
空安全 if (s != null) len = s.length(); else len = 0; val len = s?.length ?: 0 String? 可空,?.?: 插入编译期空检查。
分支匹配 switch(x) { case 1: ... } when(x) { 1 -> ...; else -> ... } when 是表达式,支持范围 & 任意对象比较。
循环 & 集合 for(int i=0;i<10;i++)``list.stream().filter() for(i in 0 until 10)``list.filter{} 0 until 生成 IntRange;集合链式调用基于扩展函数。
单例 class S { private static S i=new S(); … } object S { fun foo() {} } object 编译时生成线程安全单例,无需额外样板。

3. 独有亮点

特性 示例 简要说明
默认 & 命名参数 fun g(msg: String = "Hi", name: String = "You") g(name="Tom") 编译器生成默认方法,命名参数避免重载歧义。
扩展函数 fun String.ex() = uppercase() 编译后为静态方法,第一个参数是接收者,调用如成员方法。
解构声明 val (x, y) = Point(1, 2) data class 自动生成 componentN(),一行取多值。
密封类 sealed class R; data class Ok(val d: String): R(); object Err: R() 限定子类范围,when 可做穷尽检查。
内联函数 inline fun <T> m(b: ()->T): T { … } 在调用处展开函数体,减少高阶函数的运行时开销。
集合构造器 listOf(1, 2), mutableListOf("A"), mapOf("a" to 1) 内建集合工厂函数,语法简洁;to 表示键值对。
数组构造器 arrayOf(1, 2), intArrayOf(1, 2) 支持泛型与原始类型数组,避免装箱。
表达式返回值 val max = if (a > b) a else b val result = try { … } catch { … } ifwhentry 都是表达式,可直接赋值。
区间语法 & 步进 for (i in 1..5), for (j in 1 until 5 step 2) .. 表闭区间,until 表半开,step 控制步长。
字符串模板 "Hello, $name" "Length: ${s.length}" $变量 可直接拼接,复杂表达式用 ${}
Lambda 尾随语法 list.filter { it > 0 }.map { it * 2 } 大括号可直接跟随函数调用,链式语法自然、简洁。

4. 常用标准库函数

函数 用法示例 简要说明
let user?.let { print(it.name) } 非空时执行块,it 引用原对象。
apply User().apply { age = 18 } 在对象上执行块并返回该对象,常用于初始化。
also list.also { println("init") } 执行副作用并返回对象,常用于日志 / 调试。
run val r = run { compute(); result } 无接收者的作用域块,返回最后一行结果。
with with(cfg) { load(); validate() } 对象上下文块,this 指向接收者,返回结果。
takeIf str.takeIf { it.isNotBlank() } 条件为真返回对象,否则返回 null
sequence sequenceOf(1,2,3).map { … } 惰性集合处理,适合大规模数据管道。

5. 类型系统对比

功能 Java 写法 Kotlin 写法 简要说明
泛型 List<String> List<String> 支持协变 / 逆变(out / in)和 reified 泛型函数。
类型别名 typealias Name = String 简化复杂类型声明。
枚举类 enum Color { RED, GREEN } enum class Color { RED, GREEN } 支持在枚举中定义属性 & 方法。
内联类 @JvmInline value class USD(val amount: Int) 编译时包装或展开,零开销封装。

6. 类型检测与转换

功能 Java 写法 Kotlin 写法 简要说明
类型检查 if (obj instanceof String) if (obj is String) is 后自动智能转换,无需显式强转。
安全转换 (String) obj obj as String / obj as? String as? 安全转换失败返回 null
基本转换 Integer.parseInt(str) str.toInt(), toDouble(), toLong() 通过扩展函数提供常见类型转换。

7. 控制流程 & 异常

功能 Java 写法 Kotlin 写法 简要说明
条件 & 循环 if, switch, for, while, do-while if, when, for, while, do-while when 可做表达式,替代 switch
返回 & 跳转 return, break, continue, throw 同 Java 支持在 lambda 中局部返回,如 return@label
异常处理 try-catch-finally, checked exception try-catch-finally,无 checked exception Kotlin 不区分受检异常,简化错误处理。

8. 包与导入

功能 Java 写法 Kotlin 写法 简要说明
包声明 package com.example; package com.example 不需要分号。
导入 import java.util.List; import java.util.List 支持导入顶层函数和属性。
别名导入 import foo.Bar as Baz 解决命名冲突或简化引用。

9. 面向对象相关

功能 Java 写法 Kotlin 写法 简要说明
接口默认实现 default void f() {} 接口中可直接写方法体 接口内方法可有实现,无需关键字。
抽象类 abstract class Shape { … } abstract class Shape { … } 抽象成员不需再加 abstract 前缀。
继承 & 覆写 class A extends B { @Override … } class A : B() { override fun … } : 表示继承,override 必显式标注。
可见性修饰符 public/protected/private public/protected/private/internal internal 表示同模块内可见。
内部类 class Outer { class Inner {} } class Outer { inner class Inner {} } 默认是静态嵌套,加 inner 变为非静态内部类。

10. 协程 vs 多线程

场景 Java 写法(线程/异步) Kotlin 写法(协程) 简要说明
启动任务 new Thread(() -> work()).start(); GlobalScope.launch { work() } 协程更轻量、省资源,适合大规模并发。
异步返回值 Future<Integer> f = exec.submit(...); val result = async { compute() }.await() 内建 async/await,语义更清晰。
延迟执行 Thread.sleep(1000) delay(1000) 非阻塞挂起,不占用线程。
结构化并发 手动管理线程池和生命周期 coroutineScope { … } 协程作用域自动管理生命周期,避免泄漏。

📦 11. 集合操作对比

功能 Java 写法(Stream) Kotlin 写法(扩展函数) 简要说明
过滤 list.stream().filter(x -> x > 0).collect(...) list.filter { it > 0 } 语法简洁,链式调用更直观。
映射 list.stream().map(x -> x * 2).collect(...) list.map { it * 2 } Lambda 简洁,扩展函数无额外依赖。
分组 Collectors.groupingBy(...) list.groupBy { it.key } 直接返回 Map<K, List<V>>,更易读。
排序 list.sort(Comparator.comparing(...)) list.sortedBy { it.prop } 函数式排序,链式可读性好。
聚合 reduce, sum, collect reduce, sumOf, fold 内建多种聚合函数,常用时无需额外导入。

参考:

Kotlin内核编程
Kotlin 语言参考文档
深入理解Kotlin协程
有没有 Kotlin 讲协程比较好的书籍或博客连载
Kotlin 官方文档 中文版