refactor: 将 Pool 重构为接口,提取 pool 为具体实现
This commit is contained in:
18
client.go
18
client.go
@@ -20,7 +20,7 @@ import (
|
|||||||
//
|
//
|
||||||
// ctx 取消时会立即中断与 Python 的通信并返回 ctx.Err()。
|
// ctx 取消时会立即中断与 Python 的通信并返回 ctx.Err()。
|
||||||
// 对于流式输出/双向流,ctx 取消会关闭返回的 channel。
|
// 对于流式输出/双向流,ctx 取消会关闭返回的 channel。
|
||||||
func Invoke[R any](ctx context.Context, pool *Pool, method string, args ...any) (R, error) {
|
func Invoke[R any](ctx context.Context, pool Pool, method string, args ...any) (R, error) {
|
||||||
rt := reflect.TypeFor[R]()
|
rt := reflect.TypeFor[R]()
|
||||||
|
|
||||||
// 查找 chan 类型的输入参数
|
// 查找 chan 类型的输入参数
|
||||||
@@ -89,7 +89,7 @@ func contextErr(ctx context.Context, err error) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func invokeRegular[R any](ctx context.Context, pool *Pool, method string, args ...any) (R, error) {
|
func invokeRegular[R any](ctx context.Context, pool Pool, method string, args ...any) (R, error) {
|
||||||
var zero R
|
var zero R
|
||||||
|
|
||||||
argsJSON, err := json.Marshal(args)
|
argsJSON, err := json.Marshal(args)
|
||||||
@@ -102,7 +102,7 @@ func invokeRegular[R any](ctx context.Context, pool *Pool, method string, args .
|
|||||||
return zero, err
|
return zero, err
|
||||||
}
|
}
|
||||||
|
|
||||||
id := pool.reqID.Add(1)
|
id := pool.nextReqID()
|
||||||
stop := watchCtx(ctx, conn, id)
|
stop := watchCtx(ctx, conn, id)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
||||||
@@ -136,7 +136,7 @@ func invokeRegular[R any](ctx context.Context, pool *Pool, method string, args .
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func invokeStreamOut[R any](ctx context.Context, pool *Pool, method string, rt reflect.Type, args ...any) (R, error) {
|
func invokeStreamOut[R any](ctx context.Context, pool Pool, method string, rt reflect.Type, args ...any) (R, error) {
|
||||||
var zero R
|
var zero R
|
||||||
|
|
||||||
argsJSON, err := json.Marshal(args)
|
argsJSON, err := json.Marshal(args)
|
||||||
@@ -149,7 +149,7 @@ func invokeStreamOut[R any](ctx context.Context, pool *Pool, method string, rt r
|
|||||||
return zero, err
|
return zero, err
|
||||||
}
|
}
|
||||||
|
|
||||||
id := pool.reqID.Add(1)
|
id := pool.nextReqID()
|
||||||
if err := writeMsg(conn, Message{
|
if err := writeMsg(conn, Message{
|
||||||
ID: id,
|
ID: id,
|
||||||
Type: TypeCall,
|
Type: TypeCall,
|
||||||
@@ -187,7 +187,7 @@ func invokeStreamOut[R any](ctx context.Context, pool *Pool, method string, rt r
|
|||||||
return ch.Interface().(R), nil
|
return ch.Interface().(R), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func invokeStreamIn[R any](ctx context.Context, pool *Pool, method string, streamArgIdx int, streamCh reflect.Value, args ...any) (R, error) {
|
func invokeStreamIn[R any](ctx context.Context, pool Pool, method string, streamArgIdx int, streamCh reflect.Value, args ...any) (R, error) {
|
||||||
var zero R
|
var zero R
|
||||||
|
|
||||||
jsonArgs := make([]any, len(args))
|
jsonArgs := make([]any, len(args))
|
||||||
@@ -204,7 +204,7 @@ func invokeStreamIn[R any](ctx context.Context, pool *Pool, method string, strea
|
|||||||
return zero, err
|
return zero, err
|
||||||
}
|
}
|
||||||
|
|
||||||
id := pool.reqID.Add(1)
|
id := pool.nextReqID()
|
||||||
stop := watchCtx(ctx, conn, id)
|
stop := watchCtx(ctx, conn, id)
|
||||||
defer stop()
|
defer stop()
|
||||||
|
|
||||||
@@ -265,7 +265,7 @@ func invokeStreamIn[R any](ctx context.Context, pool *Pool, method string, strea
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func invokeStreamBoth[R any](ctx context.Context, pool *Pool, method string, streamArgIdx int, streamCh reflect.Value, rt reflect.Type, args ...any) (R, error) {
|
func invokeStreamBoth[R any](ctx context.Context, pool Pool, method string, streamArgIdx int, streamCh reflect.Value, rt reflect.Type, args ...any) (R, error) {
|
||||||
var zero R
|
var zero R
|
||||||
|
|
||||||
jsonArgs := make([]any, len(args))
|
jsonArgs := make([]any, len(args))
|
||||||
@@ -282,7 +282,7 @@ func invokeStreamBoth[R any](ctx context.Context, pool *Pool, method string, str
|
|||||||
return zero, err
|
return zero, err
|
||||||
}
|
}
|
||||||
|
|
||||||
id := pool.reqID.Add(1)
|
id := pool.nextReqID()
|
||||||
if err := writeMsg(conn, Message{
|
if err := writeMsg(conn, Message{
|
||||||
ID: id,
|
ID: id,
|
||||||
Type: TypeCall,
|
Type: TypeCall,
|
||||||
|
|||||||
24
pool.go
24
pool.go
@@ -72,8 +72,16 @@ func WithStderr(w io.Writer) Option {
|
|||||||
return func(c *poolConfig) { c.stderr = w }
|
return func(c *poolConfig) { c.stderr = w }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pool 管理多个 Python worker 进程及其连接池
|
// Pool 是 Python worker 进程池的接口
|
||||||
type Pool struct {
|
type Pool interface {
|
||||||
|
// Close 关闭所有 worker 进程和连接
|
||||||
|
Close()
|
||||||
|
acquire(ctx context.Context) (net.Conn, *worker, error)
|
||||||
|
nextReqID() uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
// pool 是 Pool 的具体实现
|
||||||
|
type pool struct {
|
||||||
workers []*worker
|
workers []*worker
|
||||||
idx atomic.Uint64
|
idx atomic.Uint64
|
||||||
reqID atomic.Uint64
|
reqID atomic.Uint64
|
||||||
@@ -88,7 +96,7 @@ type Pool struct {
|
|||||||
// gobridge.WithScriptArgs("worker.py"),
|
// gobridge.WithScriptArgs("worker.py"),
|
||||||
// gobridge.WithWorkDir("./worker"),
|
// gobridge.WithWorkDir("./worker"),
|
||||||
// )
|
// )
|
||||||
func NewPool(script string, opts ...Option) (*Pool, error) {
|
func NewPool(script string, opts ...Option) (Pool, error) {
|
||||||
cfg := &poolConfig{
|
cfg := &poolConfig{
|
||||||
workers: 2,
|
workers: 2,
|
||||||
maxConnsPerWorker: 4,
|
maxConnsPerWorker: 4,
|
||||||
@@ -114,19 +122,23 @@ func NewPool(script string, opts ...Option) (*Pool, error) {
|
|||||||
}
|
}
|
||||||
workers[i] = w
|
workers[i] = w
|
||||||
}
|
}
|
||||||
return &Pool{workers: workers}, nil
|
return &pool{workers: workers}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// acquire 以轮询方式从进程池取出一个可用连接
|
// acquire 以轮询方式从进程池取出一个可用连接
|
||||||
func (p *Pool) acquire(ctx context.Context) (net.Conn, *worker, error) {
|
func (p *pool) acquire(ctx context.Context) (net.Conn, *worker, error) {
|
||||||
idx := p.idx.Add(1) % uint64(len(p.workers))
|
idx := p.idx.Add(1) % uint64(len(p.workers))
|
||||||
w := p.workers[idx]
|
w := p.workers[idx]
|
||||||
conn, err := w.acquire(ctx)
|
conn, err := w.acquire(ctx)
|
||||||
return conn, w, err
|
return conn, w, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *pool) nextReqID() uint64 {
|
||||||
|
return p.reqID.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
// Close 关闭所有 worker 进程和连接
|
// Close 关闭所有 worker 进程和连接
|
||||||
func (p *Pool) Close() {
|
func (p *pool) Close() {
|
||||||
for _, w := range p.workers {
|
for _, w := range p.workers {
|
||||||
w.stop()
|
w.stop()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user