Files
gobridge/session.go
what cda662e874 feat: 添加 Session 亲和路由支持(NewSession / WithAffinity)
- 新增 NewSession(pool):返回固定到同一 worker 进程的 Pool 视图
- 新增 WithAffinity(ctx, key):相同 key 通过 FNV hash 稳定路由到同一 worker
- pool.acquire 支持亲和路由,无亲和时保持轮询
- example 添加 Session / WithAffinity / 全局变量共享三组演示
2026-05-08 11:21:02 +08:00

52 lines
1.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package gobridge
import (
"context"
"net"
)
// singleWorkerPool 是 Pool 的包装,所有请求固定路由到同一个 worker 进程。
type singleWorkerPool struct {
pool *pool
workerIdx int
}
// NewSession 返回一个固定到某个 worker 进程的 Pool 视图。
// 通过该 Pool 发起的所有 Invoke 调用始终路由到同一 Python 进程,
// 适用于多次调用之间需要共享 Python 侧状态的场景。
//
// session := gobridge.NewSession(pool)
// gobridge.Invoke(ctx, session, "init", arg)
// gobridge.Invoke(ctx, session, "next_step") // 与上一行走同一进程
func NewSession(p Pool) Pool {
inner, ok := p.(*pool)
if !ok {
// 已经是 session 或其他实现,直接返回
return p
}
n := uint64(len(inner.workers))
if n == 0 {
return p
}
idx := inner.idx.Add(1) % n
return &singleWorkerPool{pool: inner, workerIdx: int(idx)}
}
func (s *singleWorkerPool) acquire(ctx context.Context) (net.Conn, *worker, error) {
idx := s.workerIdx % len(s.pool.workers)
w := s.pool.workers[idx]
conn, err := w.acquire(ctx)
return conn, w, err
}
// Close 不关闭底层 poolsession 不拥有 pool 的生命周期。
func (s *singleWorkerPool) Close() {}
func (s *singleWorkerPool) nextReqID() uint64 {
return s.pool.nextReqID()
}
func (s *singleWorkerPool) callbackDispatch(ctx context.Context, msg Message) (any, string) {
return s.pool.callbackDispatch(ctx, msg)
}