diff --git a/affinity.go b/affinity.go new file mode 100644 index 0000000..91b4558 --- /dev/null +++ b/affinity.go @@ -0,0 +1,26 @@ +package gobridge + +import ( + "context" + "hash/fnv" +) + +type affinityKey struct{} // 用户提供的字符串 key,acquire 时 hash 到 worker 下标 + +// WithAffinity 将亲和键写入 ctx,相同 key 始终路由到同一 worker 进程。 +// +// ctx = gobridge.WithAffinity(ctx, "user-42") +// gobridge.Invoke(ctx, pool, "method", ...) // 同 key 始终走同一进程 +func WithAffinity(ctx context.Context, key string) context.Context { + return context.WithValue(ctx, affinityKey{}, key) +} + +// workerIndexFor 根据 ctx 计算应使用的 worker 下标,无亲和键时返回 -1(交由轮询)。 +func workerIndexFor(ctx context.Context, n int) int { + if key, ok := ctx.Value(affinityKey{}).(string); ok && key != "" { + h := fnv.New32a() + h.Write([]byte(key)) + return int(h.Sum32()) % n + } + return -1 +} diff --git a/example/main.go b/example/main.go index ea13d1a..facdf10 100644 --- a/example/main.go +++ b/example/main.go @@ -34,6 +34,7 @@ func main() { demoPool(ctx, pool) demoServer(ctx, script) + demoSession(ctx, script) } func demoPool(ctx context.Context, pool gobridge.Pool) { @@ -172,6 +173,86 @@ func (s *goService) MakeUser(ctx context.Context, uid int) (User, error) { return User{ID: uid, Name: fmt.Sprintf("user_%d", uid), Score: float64(uid) * 1.5}, nil } +func demoSession(ctx context.Context, script string) { + fmt.Println("\n── Session 亲和示例(workers=2)─────────────────────────────────") + + pool, err := gobridge.NewPool(script, gobridge.WithWorkers(2)) + if err != nil { + log.Fatal(err) + } + defer pool.Close() + + // ── NewSession:三个 session,C 与 A 落在同一 worker,验证互不干扰 ─────── + sessA := gobridge.NewSession(pool) // worker 1 + sessB := gobridge.NewSession(pool) // worker 0 + sessC := gobridge.NewSession(pool) // worker 1(与 A 同进程,不同 session_id) + + msgA, err := gobridge.Invoke[string](ctx, sessA, "session_init", "A", 100) + if err != nil { + log.Fatal(err) + } + fmt.Println("A init:", msgA) + + msgB, err := gobridge.Invoke[string](ctx, sessB, "session_init", "B", 200) + if err != nil { + log.Fatal(err) + } + fmt.Println("B init:", msgB) + + msgC, err := gobridge.Invoke[string](ctx, sessC, "session_init", "C", 300) + if err != nil { + log.Fatal(err) + } + fmt.Println("C init:", msgC) // 应与 A 在同一 worker + + // 对 A/B/C 各自做 step,验证三者状态完全独立 + v, _ := gobridge.Invoke[int](ctx, sessA, "session_step", "A", 10) + fmt.Println("A step(+10) =", v) // 110 + v, _ = gobridge.Invoke[int](ctx, sessA, "session_step", "A", 5) + fmt.Println("A step(+5) =", v) // 115 + + v, _ = gobridge.Invoke[int](ctx, sessB, "session_step", "B", 50) + fmt.Println("B step(+50) =", v) // 250 + + v, _ = gobridge.Invoke[int](ctx, sessC, "session_step", "C", 99) + fmt.Println("C step(+99) =", v) // 399(与 A 同 worker 但不受影响) + + rA, _ := gobridge.Invoke[map[string]any](ctx, sessA, "session_result", "A") + rB, _ := gobridge.Invoke[map[string]any](ctx, sessB, "session_result", "B") + rC, _ := gobridge.Invoke[map[string]any](ctx, sessC, "session_result", "C") + fmt.Printf("A result = %v\n", rA) // map[steps:[10 5] value:115] + fmt.Printf("B result = %v\n", rB) // map[steps:[50] value:250] + fmt.Printf("C result = %v\n", rC) // map[steps:[99] value:399] + + // ── 全局变量测试:sessA 和 sessC 同 worker,共享进程级 counter ────────── + fmt.Println() + r, _ := gobridge.Invoke[string](ctx, sessA, "global_increment", 10) + fmt.Println("sessA +10:", r) // worker 1 counter = 10 + r, _ = gobridge.Invoke[string](ctx, sessC, "global_increment", 5) + fmt.Println("sessC +5 :", r) // worker 1 counter = 15(与 sessA 共享) + r, _ = gobridge.Invoke[string](ctx, sessB, "global_increment", 99) + fmt.Println("sessB +99:", r) // worker 0 counter = 99(独立进程,从 0 开始) + r, _ = gobridge.Invoke[string](ctx, sessA, "global_get") + fmt.Println("sessA get:", r) // worker 1 counter = 15(不受 sessB 影响) + + // ── WithAffinity:相同 key 跨调用始终路由同一 worker ──────────────────── + fmt.Println() + for i := range 4 { + affinityCtx := gobridge.WithAffinity(ctx, "sticky-key") + msg, _ := gobridge.Invoke[string](affinityCtx, pool, "session_init", + fmt.Sprintf("aff-%d", i), i) + fmt.Printf("WithAffinity(sticky-key) #%d → %s\n", i, msg) + } + + // ── 对照组:不带亲和,轮询分配给两个 worker ───────────────────────────── + fmt.Println() + for i := range 4 { + msg, _ := gobridge.Invoke[string](ctx, pool, "session_init", + fmt.Sprintf("rr-%d", i), i) + fmt.Printf("round-robin #%d → %s\n", i, msg) + } +} + func demoServer(ctx context.Context, script string) { fmt.Println("\n── Server 全双工示例 ─────────────────────────────────────────────") diff --git a/example/worker.py b/example/worker.py index 7e3fb4d..3cb981a 100644 --- a/example/worker.py +++ b/example/worker.py @@ -143,6 +143,45 @@ def get_user_via_go(uid: int) -> dict: return dataclasses.asdict(user) +# ── Session 亲和示例 ────────────────────────────────────────────────────────── +# _sessions 保存每个 session 的状态,key 由调用方提供 +_sessions: dict = {} + +# _global_counter 是进程级全局变量,同一 worker 的所有 session 共享 +_global_counter: int = 0 + + +@expose +def global_increment(delta: int) -> str: + global _global_counter + _global_counter += delta + return f"[worker {worker_id}] counter = {_global_counter}" + + +@expose +def global_get() -> str: + return f"[worker {worker_id}] counter = {_global_counter}" + + +@expose +def session_init(session_id: str, value: int) -> str: + _sessions[session_id] = {"value": value, "steps": []} + return f"[worker {worker_id}] session {session_id} init with {value}" + + +@expose +def session_step(session_id: str, delta: int) -> int: + s = _sessions[session_id] + s["value"] += delta + s["steps"].append(delta) + return s["value"] + + +@expose +def session_result(session_id: str) -> dict: + return _sessions.pop(session_id) + + if __name__ == "__main__": run() print("worker_id", worker_id) diff --git a/pool.go b/pool.go index a91c96c..aa84121 100644 --- a/pool.go +++ b/pool.go @@ -210,7 +210,18 @@ func (p *pool) bindHandler(name string, fn reflect.Value) { } func (p *pool) acquire(ctx context.Context) (net.Conn, *worker, error) { - idx := p.idx.Add(1) % uint64(len(p.workers)) + n := uint64(len(p.workers)) + if n == 0 { + return nil, nil, fmt.Errorf("gobridge: pool has no workers") + } + + var idx uint64 + if i := workerIndexFor(ctx, int(n)); i >= 0 { + idx = uint64(i) % n // 防御:ctx 可能来自不同 pool + } else { + idx = p.idx.Add(1) % n + } + w := p.workers[idx] conn, err := w.acquire(ctx) return conn, w, err diff --git a/session.go b/session.go new file mode 100644 index 0000000..62c3f1d --- /dev/null +++ b/session.go @@ -0,0 +1,51 @@ +package gobridge + +import ( + "context" + "net" +) + +// singleWorkerPool 是 Pool 的包装,所有请求固定路由到同一个 worker 进程。 +type singleWorkerPool struct { + pool *pool + workerIdx int +} + +// NewSession 返回一个固定到某个 worker 进程的 Pool 视图。 +// 通过该 Pool 发起的所有 Invoke 调用始终路由到同一 Python 进程, +// 适用于多次调用之间需要共享 Python 侧状态的场景。 +// +// session := gobridge.NewSession(pool) +// gobridge.Invoke(ctx, session, "init", arg) +// gobridge.Invoke(ctx, session, "next_step") // 与上一行走同一进程 +func NewSession(p Pool) Pool { + inner, ok := p.(*pool) + if !ok { + // 已经是 session 或其他实现,直接返回 + return p + } + n := uint64(len(inner.workers)) + if n == 0 { + return p + } + idx := inner.idx.Add(1) % n + return &singleWorkerPool{pool: inner, workerIdx: int(idx)} +} + +func (s *singleWorkerPool) acquire(ctx context.Context) (net.Conn, *worker, error) { + idx := s.workerIdx % len(s.pool.workers) + w := s.pool.workers[idx] + conn, err := w.acquire(ctx) + return conn, w, err +} + +// Close 不关闭底层 pool,session 不拥有 pool 的生命周期。 +func (s *singleWorkerPool) Close() {} + +func (s *singleWorkerPool) nextReqID() uint64 { + return s.pool.nextReqID() +} + +func (s *singleWorkerPool) callbackDispatch(ctx context.Context, msg Message) (any, string) { + return s.pool.callbackDispatch(ctx, msg) +}