feat: 添加 Session 亲和路由支持(NewSession / WithAffinity)

- 新增 NewSession(pool):返回固定到同一 worker 进程的 Pool 视图
- 新增 WithAffinity(ctx, key):相同 key 通过 FNV hash 稳定路由到同一 worker
- pool.acquire 支持亲和路由,无亲和时保持轮询
- example 添加 Session / WithAffinity / 全局变量共享三组演示
This commit is contained in:
2026-05-08 11:21:02 +08:00
parent 30004e44ee
commit cda662e874
5 changed files with 209 additions and 1 deletions

26
affinity.go Normal file
View File

@@ -0,0 +1,26 @@
package gobridge
import (
"context"
"hash/fnv"
)
type affinityKey struct{} // 用户提供的字符串 keyacquire 时 hash 到 worker 下标
// WithAffinity 将亲和键写入 ctx相同 key 始终路由到同一 worker 进程。
//
// ctx = gobridge.WithAffinity(ctx, "user-42")
// gobridge.Invoke(ctx, pool, "method", ...) // 同 key 始终走同一进程
func WithAffinity(ctx context.Context, key string) context.Context {
return context.WithValue(ctx, affinityKey{}, key)
}
// workerIndexFor 根据 ctx 计算应使用的 worker 下标,无亲和键时返回 -1交由轮询
func workerIndexFor(ctx context.Context, n int) int {
if key, ok := ctx.Value(affinityKey{}).(string); ok && key != "" {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32()) % n
}
return -1
}

View File

@@ -34,6 +34,7 @@ func main() {
demoPool(ctx, pool)
demoServer(ctx, script)
demoSession(ctx, script)
}
func demoPool(ctx context.Context, pool gobridge.Pool) {
@@ -172,6 +173,86 @@ func (s *goService) MakeUser(ctx context.Context, uid int) (User, error) {
return User{ID: uid, Name: fmt.Sprintf("user_%d", uid), Score: float64(uid) * 1.5}, nil
}
func demoSession(ctx context.Context, script string) {
fmt.Println("\n── Session 亲和示例workers=2─────────────────────────────────")
pool, err := gobridge.NewPool(script, gobridge.WithWorkers(2))
if err != nil {
log.Fatal(err)
}
defer pool.Close()
// ── NewSession三个 sessionC 与 A 落在同一 worker验证互不干扰 ───────
sessA := gobridge.NewSession(pool) // worker 1
sessB := gobridge.NewSession(pool) // worker 0
sessC := gobridge.NewSession(pool) // worker 1与 A 同进程,不同 session_id
msgA, err := gobridge.Invoke[string](ctx, sessA, "session_init", "A", 100)
if err != nil {
log.Fatal(err)
}
fmt.Println("A init:", msgA)
msgB, err := gobridge.Invoke[string](ctx, sessB, "session_init", "B", 200)
if err != nil {
log.Fatal(err)
}
fmt.Println("B init:", msgB)
msgC, err := gobridge.Invoke[string](ctx, sessC, "session_init", "C", 300)
if err != nil {
log.Fatal(err)
}
fmt.Println("C init:", msgC) // 应与 A 在同一 worker
// 对 A/B/C 各自做 step验证三者状态完全独立
v, _ := gobridge.Invoke[int](ctx, sessA, "session_step", "A", 10)
fmt.Println("A step(+10) =", v) // 110
v, _ = gobridge.Invoke[int](ctx, sessA, "session_step", "A", 5)
fmt.Println("A step(+5) =", v) // 115
v, _ = gobridge.Invoke[int](ctx, sessB, "session_step", "B", 50)
fmt.Println("B step(+50) =", v) // 250
v, _ = gobridge.Invoke[int](ctx, sessC, "session_step", "C", 99)
fmt.Println("C step(+99) =", v) // 399与 A 同 worker 但不受影响)
rA, _ := gobridge.Invoke[map[string]any](ctx, sessA, "session_result", "A")
rB, _ := gobridge.Invoke[map[string]any](ctx, sessB, "session_result", "B")
rC, _ := gobridge.Invoke[map[string]any](ctx, sessC, "session_result", "C")
fmt.Printf("A result = %v\n", rA) // map[steps:[10 5] value:115]
fmt.Printf("B result = %v\n", rB) // map[steps:[50] value:250]
fmt.Printf("C result = %v\n", rC) // map[steps:[99] value:399]
// ── 全局变量测试sessA 和 sessC 同 worker共享进程级 counter ──────────
fmt.Println()
r, _ := gobridge.Invoke[string](ctx, sessA, "global_increment", 10)
fmt.Println("sessA +10:", r) // worker 1 counter = 10
r, _ = gobridge.Invoke[string](ctx, sessC, "global_increment", 5)
fmt.Println("sessC +5 :", r) // worker 1 counter = 15与 sessA 共享)
r, _ = gobridge.Invoke[string](ctx, sessB, "global_increment", 99)
fmt.Println("sessB +99:", r) // worker 0 counter = 99独立进程从 0 开始)
r, _ = gobridge.Invoke[string](ctx, sessA, "global_get")
fmt.Println("sessA get:", r) // worker 1 counter = 15不受 sessB 影响)
// ── WithAffinity相同 key 跨调用始终路由同一 worker ────────────────────
fmt.Println()
for i := range 4 {
affinityCtx := gobridge.WithAffinity(ctx, "sticky-key")
msg, _ := gobridge.Invoke[string](affinityCtx, pool, "session_init",
fmt.Sprintf("aff-%d", i), i)
fmt.Printf("WithAffinity(sticky-key) #%d → %s\n", i, msg)
}
// ── 对照组:不带亲和,轮询分配给两个 worker ─────────────────────────────
fmt.Println()
for i := range 4 {
msg, _ := gobridge.Invoke[string](ctx, pool, "session_init",
fmt.Sprintf("rr-%d", i), i)
fmt.Printf("round-robin #%d → %s\n", i, msg)
}
}
func demoServer(ctx context.Context, script string) {
fmt.Println("\n── Server 全双工示例 ─────────────────────────────────────────────")

View File

@@ -143,6 +143,45 @@ def get_user_via_go(uid: int) -> dict:
return dataclasses.asdict(user)
# ── Session 亲和示例 ──────────────────────────────────────────────────────────
# _sessions 保存每个 session 的状态key 由调用方提供
_sessions: dict = {}
# _global_counter 是进程级全局变量,同一 worker 的所有 session 共享
_global_counter: int = 0
@expose
def global_increment(delta: int) -> str:
global _global_counter
_global_counter += delta
return f"[worker {worker_id}] counter = {_global_counter}"
@expose
def global_get() -> str:
return f"[worker {worker_id}] counter = {_global_counter}"
@expose
def session_init(session_id: str, value: int) -> str:
_sessions[session_id] = {"value": value, "steps": []}
return f"[worker {worker_id}] session {session_id} init with {value}"
@expose
def session_step(session_id: str, delta: int) -> int:
s = _sessions[session_id]
s["value"] += delta
s["steps"].append(delta)
return s["value"]
@expose
def session_result(session_id: str) -> dict:
return _sessions.pop(session_id)
if __name__ == "__main__":
run()
print("worker_id", worker_id)

13
pool.go
View File

@@ -210,7 +210,18 @@ func (p *pool) bindHandler(name string, fn reflect.Value) {
}
func (p *pool) acquire(ctx context.Context) (net.Conn, *worker, error) {
idx := p.idx.Add(1) % uint64(len(p.workers))
n := uint64(len(p.workers))
if n == 0 {
return nil, nil, fmt.Errorf("gobridge: pool has no workers")
}
var idx uint64
if i := workerIndexFor(ctx, int(n)); i >= 0 {
idx = uint64(i) % n // 防御ctx 可能来自不同 pool
} else {
idx = p.idx.Add(1) % n
}
w := p.workers[idx]
conn, err := w.acquire(ctx)
return conn, w, err

51
session.go Normal file
View File

@@ -0,0 +1,51 @@
package gobridge
import (
"context"
"net"
)
// singleWorkerPool 是 Pool 的包装,所有请求固定路由到同一个 worker 进程。
type singleWorkerPool struct {
pool *pool
workerIdx int
}
// NewSession 返回一个固定到某个 worker 进程的 Pool 视图。
// 通过该 Pool 发起的所有 Invoke 调用始终路由到同一 Python 进程,
// 适用于多次调用之间需要共享 Python 侧状态的场景。
//
// session := gobridge.NewSession(pool)
// gobridge.Invoke(ctx, session, "init", arg)
// gobridge.Invoke(ctx, session, "next_step") // 与上一行走同一进程
func NewSession(p Pool) Pool {
inner, ok := p.(*pool)
if !ok {
// 已经是 session 或其他实现,直接返回
return p
}
n := uint64(len(inner.workers))
if n == 0 {
return p
}
idx := inner.idx.Add(1) % n
return &singleWorkerPool{pool: inner, workerIdx: int(idx)}
}
func (s *singleWorkerPool) acquire(ctx context.Context) (net.Conn, *worker, error) {
idx := s.workerIdx % len(s.pool.workers)
w := s.pool.workers[idx]
conn, err := w.acquire(ctx)
return conn, w, err
}
// Close 不关闭底层 poolsession 不拥有 pool 的生命周期。
func (s *singleWorkerPool) Close() {}
func (s *singleWorkerPool) nextReqID() uint64 {
return s.pool.nextReqID()
}
func (s *singleWorkerPool) callbackDispatch(ctx context.Context, msg Message) (any, string) {
return s.pool.callbackDispatch(ctx, msg)
}