134 lines
3.5 KiB
Go
134 lines
3.5 KiB
Go
package gobridge
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"net"
|
||
"sync/atomic"
|
||
)
|
||
|
||
// poolConfig holds the internal settings of a process pool. NewPool fills it
// with defaults and then lets each Option overwrite individual fields.
type poolConfig struct {
	workers           int       // number of worker processes NewPool creates (default 2)
	maxConnsPerWorker int       // connection limit per worker (default 4)
	pythonExe         string    // executable to launch (default "python3")
	scriptArgs        []string  // NewPool stores the script here, followed by any extra arguments
	workDir           string    // working directory for the child process; empty unless WithWorkDir is used
	env               []string  // extra environment entries in "KEY=VALUE" form
	socketDir         string    // directory for the UDS socket files (default "/tmp")
	stdout            io.Writer // destination for child standard output; nil unless WithStdout is used
	stderr            io.Writer // destination for child standard error; nil unless WithStderr is used
}
|
||
|
||
// Option 是 NewPool 的函数选项
|
||
type Option func(*poolConfig)
|
||
|
||
// WithWorkers 设置 Python 进程数量(默认 2)
|
||
func WithWorkers(n int) Option {
|
||
return func(c *poolConfig) { c.workers = n }
|
||
}
|
||
|
||
// WithMaxConns 设置每个进程的最大连接数(默认 4)
|
||
func WithMaxConns(n int) Option {
|
||
return func(c *poolConfig) { c.maxConnsPerWorker = n }
|
||
}
|
||
|
||
// WithPythonExe 设置 Python 可执行文件(默认 "python3")
|
||
// uv 模式:WithPythonExe("uv"), WithScriptArgs("run")
|
||
func WithPythonExe(exe string) Option {
|
||
return func(c *poolConfig) { c.pythonExe = exe }
|
||
}
|
||
|
||
// WithScriptArgs 设置脚本路径之后的附加参数
|
||
// uv 模式示例:WithScriptArgs("run") → 执行 uv run <script>
|
||
func WithScriptArgs(args ...string) Option {
|
||
return func(c *poolConfig) { c.scriptArgs = args }
|
||
}
|
||
|
||
// WithWorkDir 设置子进程工作目录(默认继承当前进程)
|
||
func WithWorkDir(workDir string) Option {
|
||
return func(c *poolConfig) { c.workDir = workDir }
|
||
}
|
||
|
||
// WithEnv 设置附加环境变量,格式为 "KEY=VALUE"
|
||
// 与当前进程环境合并,同名时以此处为准
|
||
func WithEnv(env ...string) Option {
|
||
return func(c *poolConfig) { c.env = env }
|
||
}
|
||
|
||
// WithSocketDir 设置 UDS socket 文件目录(默认 /tmp)
|
||
func WithSocketDir(dir string) Option {
|
||
return func(c *poolConfig) { c.socketDir = dir }
|
||
}
|
||
|
||
// WithStdout 设置子进程标准输出目标(默认 os.Stdout,传 io.Discard 可静默)
|
||
func WithStdout(w io.Writer) Option {
|
||
return func(c *poolConfig) { c.stdout = w }
|
||
}
|
||
|
||
// WithStderr 设置子进程标准错误目标(默认 os.Stderr,传 io.Discard 可静默)
|
||
func WithStderr(w io.Writer) Option {
|
||
return func(c *poolConfig) { c.stderr = w }
|
||
}
|
||
|
||
// Pool 管理多个 Python worker 进程及其连接池
|
||
type Pool struct {
|
||
workers []*worker
|
||
idx atomic.Uint64
|
||
reqID atomic.Uint64
|
||
}
|
||
|
||
// NewPool 创建并启动进程池
|
||
//
|
||
// pool, err := gobridge.NewPool("worker.py")
|
||
// pool, err := gobridge.NewPool("worker.py", gobridge.WithWorkers(4))
|
||
// pool, err := gobridge.NewPool("run",
|
||
// gobridge.WithPythonExe("uv"),
|
||
// gobridge.WithScriptArgs("worker.py"),
|
||
// gobridge.WithWorkDir("./worker"),
|
||
// )
|
||
func NewPool(script string, opts ...Option) (*Pool, error) {
|
||
cfg := &poolConfig{
|
||
workers: 2,
|
||
maxConnsPerWorker: 4,
|
||
pythonExe: "python3",
|
||
socketDir: "/tmp",
|
||
}
|
||
for _, o := range opts {
|
||
o(cfg)
|
||
}
|
||
if script == "" {
|
||
return nil, fmt.Errorf("NewPool: script must not be empty")
|
||
}
|
||
cfg.scriptArgs = append([]string{script}, cfg.scriptArgs...)
|
||
|
||
workers := make([]*worker, cfg.workers)
|
||
for i := range workers {
|
||
w, err := newWorker(cfg, i)
|
||
if err != nil {
|
||
for j := 0; j < i; j++ {
|
||
workers[j].stop()
|
||
}
|
||
return nil, fmt.Errorf("create worker %d: %w", i, err)
|
||
}
|
||
workers[i] = w
|
||
}
|
||
return &Pool{workers: workers}, nil
|
||
}
|
||
|
||
// acquire 以轮询方式从进程池取出一个可用连接
|
||
func (p *Pool) acquire(ctx context.Context) (net.Conn, *worker, error) {
|
||
idx := p.idx.Add(1) % uint64(len(p.workers))
|
||
w := p.workers[idx]
|
||
conn, err := w.acquire(ctx)
|
||
return conn, w, err
|
||
}
|
||
|
||
// Close 关闭所有 worker 进程和连接
|
||
func (p *Pool) Close() {
|
||
for _, w := range p.workers {
|
||
w.stop()
|
||
}
|
||
}
|