commit a92abca252deb15b133f2e09b83c994e055d1fb0 Author: what Date: Fri Apr 10 09:24:28 2026 +0800 first commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..c19b5d3 --- /dev/null +++ b/README.md @@ -0,0 +1,418 @@ +# gobridge + +在 Go 与 Python 之间建立双向通信桥接,将 Go 的 `channel` 与 Python 的 `yield` 原生对接,支持普通调用、单向流、双向流。 + +底层通过 **Unix Domain Socket (UDS)** 通信,Go 侧维护 **Worker 进程池**,Python 侧以多线程方式并发处理请求。 + +## 特性 + +- **零配置序列化**:Go struct/slice ↔ Python dict/list 通过 JSON 自动互转 +- **原生流语义**:Go `chan T` 对应 Python `Iterator[T]`,无需额外 API +- **进程池**:Go 自动启动并管理多个 Python 子进程,崩溃后自动重启 +- **ctx 取消**:Go `context` 取消时自动中断 Python 计算,无需函数内检查 +- **四种调用模式**:普通、流式输出、流式输入、双向流,同一个 `Invoke` 函数自动推断 + +## 安装 + +```bash +go get git.fsdpf.net/go/gobridge +``` + +Python 端直接复制 `python/gobridge/` 目录到项目中,无需安装依赖(仅用标准库)。 + +## 快速开始 + +**Python 端(worker.py):** + +```python +from gobridge import gobridge, run +from typing import Iterator + +@gobridge +def add(a: int, b: int) -> int: + return a + b + +run() +``` + +**Go 端:** + +```go +pool, _ := gobridge.NewPool("worker.py") +defer pool.Close() + +ctx := context.Background() +sum, _ := gobridge.Invoke[int](ctx, pool, "add", 3, 4) +fmt.Println(sum) // 7 +``` + +## 四种调用模式 + +### 1. 普通调用 + +```go +// Go +sum, err := gobridge.Invoke[int](ctx, pool, "add", 3, 4) + +user, err := gobridge.Invoke[User](ctx, pool, "get_user", 42) + +result, err := gobridge.Invoke[[]User](ctx, pool, "enrich_users", users) +``` + +```python +# Python +@gobridge +def add(a: int, b: int) -> int: + return a + b + +@gobridge +def get_user(uid: int) -> dict: + return {"id": uid, "name": f"user_{uid}", "score": uid * 1.5} + +@gobridge +def enrich_users(users: list) -> list: + for u in users: + u["level"] = "gold" if u["score"] >= 10 else "silver" + return users +``` + +### 2. 流式输出(Python yield → Go channel) + +返回类型为 `chan T` 时自动进入流式输出模式,Python 函数使用 `yield`,Go 侧通过 `range` 消费。 + +```go +// Go +ch, err := gobridge.Invoke[chan int](ctx, pool, "range_gen", 1, 6) +for v := range ch { + fmt.Println(v) // 1 2 3 4 5 +} + +userCh, err := gobridge.Invoke[chan User](ctx, pool, "gen_users", 3) +for u := range userCh { + fmt.Println(u) +} +``` + +```python +# Python +@gobridge +def range_gen(start: int, stop: int) -> Iterator[int]: + for i in range(start, stop): + yield i + +@gobridge +def gen_users(count: int) -> Iterator[dict]: + for i in range(1, count + 1): + yield {"id": i, "name": f"user_{i}", "score": float(i * 3)} +``` + +### 3. 流式输入(Go channel → Python Iterator) + +参数中含 `chan T` 且返回非 `chan` 时自动进入流式输入模式。 + +```go +// Go +inputCh := make(chan int, 10) +go func() { + for i := 1; i <= 5; i++ { + inputCh <- i + } + close(inputCh) +}() +total, err := gobridge.Invoke[int](ctx, pool, "sum_stream", inputCh) +fmt.Println(total) // 15 +``` + +```python +# Python +@gobridge +def sum_stream(numbers: Iterator[int]) -> int: + return sum(numbers) +``` + +### 4. 双向流(Go channel 输入 + Go channel 输出) + +参数含 `chan T` 且返回类型也为 `chan R` 时自动进入双向流模式。 + +```go +// Go +inCh := make(chan User, 5) +go func() { + for _, u := range users { + inCh <- u + } + close(inCh) +}() +outCh, err := gobridge.Invoke[chan User](ctx, pool, "process_users", inCh) +for u := range outCh { + fmt.Println(u) +} +``` + +```python +# Python +@gobridge +def process_users(users: Iterator[dict]) -> Iterator[dict]: + for u in users: + yield {"id": u["id"], "name": u["name"].upper(), "score": u["score"] * 2} +``` + +## ctx 取消 + +Go 的 `context` 取消会自动中断 Python 侧的执行,无需在 Python 函数中做任何检查: + +```go +ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) +defer cancel() + +// 超时或手动 cancel() 后,Python 计算立即中断,返回 context.DeadlineExceeded +result, err := gobridge.Invoke[int](ctx, pool, "slow_compute", 1000000) +``` + +```python +@gobridge +def slow_compute(n: int) -> int: + total = 0 + for i in range(n): + total += i # ctx 取消时此处自动抛出 InterruptedError,无需手动检查 + return total +``` + +**实现机制:** +1. Go ctx 取消 → 发送 `cancel` 消息给 Python +2. Python 单 reader 线程收到 `cancel` → 向执行线程注入 `InterruptedError`(`PyThreadState_SetAsyncExc`) +3. Python 函数在下一条字节码指令处中断 +4. Go 同时关闭连接,解除阻塞的读写操作 + +> 限制:长时间不释放 GIL 的 C 扩展(如大规模 numpy 矩阵运算)无法被中断,需等其释放 GIL 后才触发。 + +## 进程自动重启 + +Python worker 进程崩溃时自动重启,调用方无感知: + +``` +Python 进程崩溃 + → monitor goroutine 检测到退出 + → 排空失效连接 + → 指数退避重启(100ms → 200ms → ... → 30s) + → 新进程就绪后恢复连接池 +``` + +## 配置 + +`NewPool` 使用函数选项模式,第一个参数为脚本路径: + +```go +// 最简调用 +pool, err := gobridge.NewPool("worker.py") + +// 完整配置 +pool, err := gobridge.NewPool("worker.py", + gobridge.WithWorkers(4), // Python 进程数量,默认 2 + gobridge.WithMaxConns(8), // 每进程最大连接数,默认 4 + gobridge.WithPythonExe("python3"), // 可执行文件,默认 "python3" + gobridge.WithWorkDir("/path/to/workdir"), // 工作目录,默认继承当前进程 + gobridge.WithEnv("PYTHONUNBUFFERED=1", "K=V"), // 附加环境变量,与当前进程环境合并 + gobridge.WithSocketDir("/var/run/myapp"), // socket 文件目录,默认 /tmp + gobridge.WithStdout(os.Stdout), // 子进程 stdout,默认 os.Stdout + gobridge.WithStderr(os.Stderr), // 子进程 stderr,默认 os.Stderr +) + +// 静默模式:丢弃子进程输出 +pool, err := gobridge.NewPool("worker.py", + gobridge.WithStdout(io.Discard), + gobridge.WithStderr(io.Discard), +) +``` + +| Option | 说明 | 默认值 | +|--------|------|--------| +| `WithWorkers(n)` | Python 进程数量 | `2` | +| `WithMaxConns(n)` | 每进程最大连接数 | `4` | +| `WithPythonExe(exe)` | 可执行文件 | `"python3"` | +| `WithScriptArgs(args...)` | 脚本路径之后的附加参数 | 无 | +| `WithWorkDir(dir)` | 子进程工作目录 | 继承当前进程 | +| `WithEnv(kv...)` | 附加环境变量 `"K=V"` | 无 | +| `WithSocketDir(dir)` | UDS socket 文件目录 | `"/tmp"` | +| `WithStdout(w)` | 子进程标准输出 | `os.Stdout` | +| `WithStderr(w)` | 子进程标准错误 | `os.Stderr` | + +## 使用 uv 管理 Python 环境 + +推荐使用 [uv](https://github.com/astral-sh/uv) 管理 Python 版本和虚拟环境。 + +**方式一:`uv run`(推荐,无需手动激活环境)** + +```go +// 等价于执行:uv run worker.py +pool, err := gobridge.NewPool("run", + gobridge.WithPythonExe("uv"), + gobridge.WithScriptArgs("worker.py"), + gobridge.WithWorkDir("./worker"), // uv 项目目录(含 pyproject.toml) +) +``` + +**方式二:直接使用虚拟环境的 python** + +```bash +cd worker && uv sync +``` + +```go +venvPython, _ := exec.LookPath("worker/.venv/bin/python") +pool, err := gobridge.NewPool("worker/worker.py", + gobridge.WithPythonExe(venvPython), +) +``` + +**方式三:shell 脚本封装(适合 CI/部署)** + +```bash +#!/bin/sh +# run_worker.sh +cd "$(dirname "$0")" +exec uv run python worker.py +``` + +```go +pool, err := gobridge.NewPool("./run_worker.sh", + gobridge.WithPythonExe("/bin/sh"), +) +``` + +**典型项目结构:** + +``` +myproject/ +├── main.go +├── go.mod +└── worker/ + ├── pyproject.toml + ├── uv.lock + ├── .venv/ + └── worker.py +``` + +`pyproject.toml`: + +```toml +[project] +name = "worker" +version = "0.1.0" +requires-python = ">=3.11" +dependencies = [] + +[tool.uv.sources] +gobridge = { path = "../../python", editable = true } +``` + +## 通信协议 + +### 整体架构 + +``` + Go 进程 + ┌─────────────────────────────────────────────────────────┐ + │ Invoke[R](ctx, pool, method, args...) │ + │ │ │ + │ ┌────▼─────────────────────────────────────────────┐ │ + │ │ Pool │ │ + │ │ workers[0] workers[1] ... workers[N-1] │ │ + │ │ (轮询选择) │ │ + │ └────┬─────────────────────────────────────────────┘ │ + │ │ 每 worker 维护 M 个可复用连接 │ + └───────┼─────────────────────────────────────────────────┘ + │ Unix Domain Socket(每 worker 独立 .sock 文件) + ┌───────▼──────────────┐ ┌──────────────────────────┐ + │ Python 进程 0 │ │ Python 进程 1 │ + │ worker.py │ │ worker.py │ + └──────────────────────┘ └──────────────────────────┘ +``` + +### Python Worker 内部结构 + +``` + Python 进程 + ┌──────────────────────────────────────────────────────────────┐ + │ run() ── UDS server.accept() 循环 │ + │ │ │ + │ 每个连接 → 独立线程 _handle_conn() │ + │ │ + │ ┌─────────────────────────────────────────────────────┐ │ + │ │ _handle_conn(连接线程) │ │ + │ │ │ │ + │ │ ┌──────────────────────────────────────────────┐ │ │ + │ │ │ _ConnMux(单 reader 线程) │ │ │ + │ │ │ │ │ │ + │ │ │ socket ──► 读消息 │ │ │ + │ │ │ │ │ │ │ + │ │ │ ┌────────┼──────────┐ │ │ │ + │ │ │ ▼ ▼ ▼ │ │ │ + │ │ │ call_q chunk_q cancel │ │ │ + │ │ └─────────┬────────┬──────────┼───────────────┘ │ │ + │ │ │ │ │ │ │ + │ │ ▼ │ ▼ │ │ + │ │ 主循环读取 │ PyThreadState_SetAsyncExc │ │ + │ │ _dispatch() │ → 执行线程抛 InterruptedError│ │ + │ │ │ │ │ │ + │ │ ┌──────▼──────┐ │ │ │ + │ │ │ @gobridge fn│ │ │ │ + │ │ │ │ │ │ │ + │ │ │ 普通函数 │ │ │ │ + │ │ │ return val ──────────────► result/error │ │ + │ │ │ │ │ │ │ + │ │ │ 生成器函数 │ │ │ │ + │ │ │ yield val ───────────────► chunk × N │ │ + │ │ │ │ │ + end │ │ + │ │ │ 流式输入 │ │ │ │ + │ │ │ Iterator ◄─┘ │ ← chunk_q │ │ + │ │ └─────────────┘ │ │ + │ └─────────────────────────────────────────────────────┘ │ + └──────────────────────────────────────────────────────────────┘ +``` + +**消息帧:** `[4字节大端长度][JSON载荷]` + +**消息类型:** + +| type | 方向 | 含义 | +|----------|--------------|------------------------------| +| `call` | Go → Python | 调用请求 | +| `result` | Python → Go | 普通返回值 | +| `chunk` | 双向 | 流数据块 | +| `end` | 双向 | 流结束标记 | +| `error` | 双向 | 错误响应 | +| `cancel` | Go → Python | 取消请求,触发 InterruptedError | + +## 项目结构 + +``` +gobridge/ +├── protocol.go # Message 结构与类型常量 +├── framing.go # 帧读写(4字节长度前缀 + JSON) +├── worker.go # Python 子进程管理 + UDS 连接池 + 自动重启 +├── pool.go # 多进程池(轮询负载均衡)+ Option 函数 +├── client.go # Invoke[R] 泛型函数(四种模式自动推断) +├── example/ +│ ├── main.go # 完整调用示例 +│ └── worker.py # Python 函数示例 +└── python/ + └── gobridge/ + └── __init__.py # Python 库(expose、run、_ConnMux) +``` + +## 类型对应关系 + +| Go 类型 | Python 类型 | +|-----------|---------------| +| `int` | `int` | +| `float64` | `float` | +| `string` | `str` | +| `bool` | `bool` | +| `struct` | `dict` | +| `[]T` | `list` | +| `chan T` | `Iterator[T]` | + +## 参考 + +本项目的进程池、UDS 通信、帧协议设计参考自 [pyproc](https://github.com/YuminosukeSato/pyproc),在此基础上增加了 Go channel 与 Python yield 的流式对接及 ctx 取消支持。 diff --git a/client.go b/client.go new file mode 100644 index 0000000..e979058 --- /dev/null +++ b/client.go @@ -0,0 +1,342 @@ +// Package gobridge 提供 Go 与 Python 之间的双向通信桥接, +// 支持普通调用、流式输出、流式输入和双向流四种模式。 +package gobridge + +import ( + "context" + "encoding/json" + "fmt" + "net" + "reflect" + "sync" +) + +// Invoke 调用 Python 暴露的函数,支持四种模式: +// +// 普通调用: Invoke[int](ctx, pool, "Add", 3, 4) +// 流式输出: Invoke[chan int](ctx, pool, "RangeGen", 1, 10) // Python yield → Go channel +// 流式输入: Invoke[int](ctx, pool, "SumStream", inputChan) // Go channel → Python Iterator +// 双向流: Invoke[chan int](ctx, pool, "Transform", inputChan) // 两端均为流 +// +// ctx 取消时会立即中断与 Python 的通信并返回 ctx.Err()。 +// 对于流式输出/双向流,ctx 取消会关闭返回的 channel。 +func Invoke[R any](ctx context.Context, pool *Pool, method string, args ...any) (R, error) { + rt := reflect.TypeFor[R]() + + // 查找 chan 类型的输入参数 + streamArgIdx := -1 + var streamCh reflect.Value + for i, arg := range args { + if arg != nil { + rv := reflect.ValueOf(arg) + if rv.Kind() == reflect.Chan { + streamArgIdx = i + streamCh = rv + break + } + } + } + + switch { + case rt.Kind() == reflect.Chan && streamArgIdx >= 0: + return invokeStreamBoth[R](ctx, pool, method, streamArgIdx, streamCh, rt, args...) + case rt.Kind() == reflect.Chan: + return invokeStreamOut[R](ctx, pool, method, rt, args...) + case streamArgIdx >= 0: + return invokeStreamIn[R](ctx, pool, method, streamArgIdx, streamCh, args...) + default: + return invokeRegular[R](ctx, pool, method, args...) + } +} + +// watchCtx 启动一个 goroutine 监听 ctx: +// - ctx 取消时先发送 cancel 消息(Python 侧收到后注入 InterruptedError) +// - 再关闭连接,解除阻塞中的读写操作 +// +// 返回 stop 函数,必须在 conn 归还连接池前调用,可安全多次调用。 +func watchCtx(ctx context.Context, conn net.Conn, id uint64) (stop func()) { + done := make(chan struct{}) + var once sync.Once + go func() { + select { + case <-ctx.Done(): + writeMsg(conn, Message{ID: id, Type: TypeCancel}) //nolint + conn.Close() + case <-done: + } + }() + return func() { once.Do(func() { close(done) }) } +} + +// chanRecv 从 ch 接收一个值,同时监听 ctx.Done()。 +// 返回 (值, channel是否open, ctx是否已取消)。 +func chanRecv(ctx context.Context, ch reflect.Value) (reflect.Value, bool, bool) { + chosen, val, ok := reflect.Select([]reflect.SelectCase{ + {Dir: reflect.SelectRecv, Chan: ch}, + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}, + }) + if chosen == 1 { + return reflect.Value{}, false, true + } + return val, ok, false +} + +// contextErr 在 io 错误时优先返回 ctx 的错误原因 +func contextErr(ctx context.Context, err error) error { + if e := ctx.Err(); e != nil { + return e + } + return err +} + +func invokeRegular[R any](ctx context.Context, pool *Pool, method string, args ...any) (R, error) { + var zero R + + argsJSON, err := json.Marshal(args) + if err != nil { + return zero, fmt.Errorf("marshal args: %w", err) + } + + conn, w, err := pool.acquire(ctx) + if err != nil { + return zero, err + } + + id := pool.reqID.Add(1) + stop := watchCtx(ctx, conn, id) + defer stop() + + if err := writeMsg(conn, Message{ + ID: id, + Type: TypeCall, + Method: method, + Args: argsJSON, + }); err != nil { + w.release(conn, false) + return zero, contextErr(ctx, fmt.Errorf("write call: %w", err)) + } + + resp, err := readMsg(conn) + if err != nil { + w.release(conn, false) + return zero, contextErr(ctx, fmt.Errorf("read response: %w", err)) + } + + stop() + w.release(conn, true) + + if resp.Type == TypeError { + return zero, fmt.Errorf("remote error: %s", resp.Error) + } + + var result R + if err := json.Unmarshal(resp.Data, &result); err != nil { + return zero, fmt.Errorf("unmarshal result: %w", err) + } + return result, nil +} + +func invokeStreamOut[R any](ctx context.Context, pool *Pool, method string, rt reflect.Type, args ...any) (R, error) { + var zero R + + argsJSON, err := json.Marshal(args) + if err != nil { + return zero, fmt.Errorf("marshal args: %w", err) + } + + conn, w, err := pool.acquire(ctx) + if err != nil { + return zero, err + } + + id := pool.reqID.Add(1) + if err := writeMsg(conn, Message{ + ID: id, + Type: TypeCall, + Method: method, + Args: argsJSON, + }); err != nil { + w.release(conn, false) + return zero, contextErr(ctx, fmt.Errorf("write call: %w", err)) + } + + ch := reflect.MakeChan(rt, 64) + + go func() { + stop := watchCtx(ctx, conn, id) + defer func() { + stop() + ch.Close() + w.release(conn, ctx.Err() == nil) + }() + for { + msg, err := readMsg(conn) + if err != nil || msg.Type == TypeEnd || msg.Type == TypeError { + return + } + if msg.Type == TypeChunk { + val := reflect.New(rt.Elem()) + if err := json.Unmarshal(msg.Data, val.Interface()); err != nil { + return + } + ch.Send(val.Elem()) + } + } + }() + + return ch.Interface().(R), nil +} + +func invokeStreamIn[R any](ctx context.Context, pool *Pool, method string, streamArgIdx int, streamCh reflect.Value, args ...any) (R, error) { + var zero R + + jsonArgs := make([]any, len(args)) + copy(jsonArgs, args) + jsonArgs[streamArgIdx] = nil + + argsJSON, err := json.Marshal(jsonArgs) + if err != nil { + return zero, fmt.Errorf("marshal args: %w", err) + } + + conn, w, err := pool.acquire(ctx) + if err != nil { + return zero, err + } + + id := pool.reqID.Add(1) + stop := watchCtx(ctx, conn, id) + defer stop() + + if err := writeMsg(conn, Message{ + ID: id, + Type: TypeCall, + Method: method, + Args: argsJSON, + StreamInput: true, + StreamArgIdx: streamArgIdx, + }); err != nil { + w.release(conn, false) + return zero, contextErr(ctx, fmt.Errorf("write call: %w", err)) + } + + for { + val, ok, cancelled := chanRecv(ctx, streamCh) + if cancelled { + w.release(conn, false) + return zero, ctx.Err() + } + if !ok { + break + } + chunkData, err := json.Marshal(val.Interface()) + if err != nil { + w.release(conn, false) + return zero, fmt.Errorf("marshal chunk: %w", err) + } + if err := writeMsg(conn, Message{ID: id, Type: TypeChunk, Data: chunkData}); err != nil { + w.release(conn, false) + return zero, contextErr(ctx, fmt.Errorf("write chunk: %w", err)) + } + } + + if err := writeMsg(conn, Message{ID: id, Type: TypeEnd}); err != nil { + w.release(conn, false) + return zero, contextErr(ctx, fmt.Errorf("write end: %w", err)) + } + + resp, err := readMsg(conn) + if err != nil { + w.release(conn, false) + return zero, contextErr(ctx, fmt.Errorf("read response: %w", err)) + } + + stop() + w.release(conn, true) + + if resp.Type == TypeError { + return zero, fmt.Errorf("remote error: %s", resp.Error) + } + + var result R + if err := json.Unmarshal(resp.Data, &result); err != nil { + return zero, fmt.Errorf("unmarshal result: %w", err) + } + return result, nil +} + +func invokeStreamBoth[R any](ctx context.Context, pool *Pool, method string, streamArgIdx int, streamCh reflect.Value, rt reflect.Type, args ...any) (R, error) { + var zero R + + jsonArgs := make([]any, len(args)) + copy(jsonArgs, args) + jsonArgs[streamArgIdx] = nil + + argsJSON, err := json.Marshal(jsonArgs) + if err != nil { + return zero, fmt.Errorf("marshal args: %w", err) + } + + conn, w, err := pool.acquire(ctx) + if err != nil { + return zero, err + } + + id := pool.reqID.Add(1) + if err := writeMsg(conn, Message{ + ID: id, + Type: TypeCall, + Method: method, + Args: argsJSON, + StreamInput: true, + StreamArgIdx: streamArgIdx, + }); err != nil { + w.release(conn, false) + return zero, contextErr(ctx, fmt.Errorf("write call: %w", err)) + } + + outCh := reflect.MakeChan(rt, 64) + + // 写入 goroutine:输入 channel → Python chunks + go func() { + for { + val, ok, cancelled := chanRecv(ctx, streamCh) + if cancelled || !ok { + break + } + data, err := json.Marshal(val.Interface()) + if err != nil { + break + } + if err := writeMsg(conn, Message{ID: id, Type: TypeChunk, Data: data}); err != nil { + break + } + } + writeMsg(conn, Message{ID: id, Type: TypeEnd}) //nolint + }() + + // 读取 goroutine:Python chunks → 输出 channel + go func() { + stop := watchCtx(ctx, conn, id) + defer func() { + stop() + outCh.Close() + w.release(conn, ctx.Err() == nil) + }() + for { + msg, err := readMsg(conn) + if err != nil || msg.Type == TypeEnd || msg.Type == TypeError { + return + } + if msg.Type == TypeChunk { + val := reflect.New(rt.Elem()) + if err := json.Unmarshal(msg.Data, val.Interface()); err != nil { + return + } + outCh.Send(val.Elem()) + } + } + }() + + return outCh.Interface().(R), nil +} diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..782a9c3 --- /dev/null +++ b/example/main.go @@ -0,0 +1,141 @@ +package main + +import ( + "context" + "fmt" + "log" + "path/filepath" + "runtime" + + "git.fsdpf.net/go/gobridge" +) + +type User struct { + ID int `json:"id"` + Name string `json:"name"` + Score float64 `json:"score"` + Level string `json:"level,omitempty"` +} + +func main() { + _, file, _, _ := runtime.Caller(0) + script := filepath.Join(filepath.Dir(file), "worker.py") + + pool, err := gobridge.NewPool(script, + gobridge.WithWorkers(2), + gobridge.WithMaxConns(4), + ) + if err != nil { + log.Fatal(err) + } + defer pool.Close() + + ctx := context.Background() + + // ── 普通调用 ────────────────────────────────────────────────────────── + sum, err := gobridge.Invoke[int](ctx, pool, "add", 3, 4) + if err != nil { + log.Fatal(err) + } + fmt.Println("add(3, 4) =", sum) // 7 + + // ── 流式输出:Python yield → Go channel ────────────────────────────── + ch, err := gobridge.Invoke[chan int](ctx, pool, "range_gen", 1, 6) + if err != nil { + log.Fatal(err) + } + fmt.Print("range_gen(1, 6) =") + for v := range ch { + fmt.Print(" ", v) + } + fmt.Println() // 1 2 3 4 5 + + // ── 流式输入:Go channel → Python Iterator ─────────────────────────── + inputCh := make(chan int, 10) + go func() { + for i := 1; i <= 5; i++ { + inputCh <- i + } + close(inputCh) + }() + total, err := gobridge.Invoke[int](ctx, pool, "sum_stream", inputCh) + if err != nil { + log.Fatal(err) + } + fmt.Println("sum_stream(1..5) =", total) // 15 + + // ── 双向流:Go channel 输入 + Go channel 输出 ──────────────────────── + inputCh2 := make(chan int, 10) + go func() { + for i := 1; i <= 5; i++ { + inputCh2 <- i + } + close(inputCh2) + }() + outCh, err := gobridge.Invoke[chan int](ctx, pool, "double_stream", inputCh2) + if err != nil { + log.Fatal(err) + } + fmt.Print("double_stream(1..5) =") + for v := range outCh { + fmt.Print(" ", v) + } + fmt.Println() // 1 4 9 16 25 + + // ── struct 普通调用 ─────────────────────────────────────────────────── + user, err := gobridge.Invoke[User](ctx, pool, "get_user", 42) + if err != nil { + log.Fatal(err) + } + fmt.Printf("get_user(42) = %+v\n", user) + + // ── slice 输入,返回标量 ─────────────────────────────────────────────── + users := []User{ + {ID: 1, Name: "alice", Score: 5.0}, + {ID: 2, Name: "bob", Score: 8.0}, + {ID: 3, Name: "carol", Score: 12.0}, + } + scoreSum, err := gobridge.Invoke[float64](ctx, pool, "total_score", users) + if err != nil { + log.Fatal(err) + } + fmt.Printf("total_score([alice,bob,carol]) = %.1f\n", scoreSum) + + // ── slice 输入输出 ───────────────────────────────────────────────────── + enriched, err := gobridge.Invoke[[]User](ctx, pool, "enrich_users", users) + if err != nil { + log.Fatal(err) + } + fmt.Println("enrich_users:") + for _, u := range enriched { + fmt.Printf(" %+v\n", u) + } + + // ── 流式输出 struct:Python yield User → Go chan User ───────────────── + userCh, err := gobridge.Invoke[chan User](ctx, pool, "gen_users", 3) + if err != nil { + log.Fatal(err) + } + fmt.Print("gen_users(3) =") + for u := range userCh { + fmt.Printf(" {%d %s %.0f}", u.ID, u.Name, u.Score) + } + fmt.Println() + + // ── 双向流 struct:Go chan User 输入 → Python 处理 → Go chan User 输出 ─ + inCh := make(chan User, 5) + go func() { + for _, u := range users { + inCh <- u + } + close(inCh) + }() + procCh, err := gobridge.Invoke[chan User](ctx, pool, "process_users", inCh) + if err != nil { + log.Fatal(err) + } + fmt.Println("process_users:") + for u := range procCh { + fmt.Printf(" %+v\n", u) + } +} diff --git a/example/worker.py b/example/worker.py new file mode 100644 index 0000000..90baefa --- /dev/null +++ b/example/worker.py @@ -0,0 +1,75 @@ +import sys +import os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "python")) + +from gobridge import gobridge, run +from typing import Iterator + + +@gobridge +def add(a: int, b: int) -> int: + return a + b + + +@gobridge +def range_gen(start: int, stop: int) -> Iterator[int]: + """流式输出:对应 Go 侧 Invoke[chan int]""" + for i in range(start, stop): + yield i + + +@gobridge +def sum_stream(numbers: Iterator[int]) -> int: + """流式输入:对应 Go 侧传入 chan int 参数""" + return sum(numbers) + + +@gobridge +def double_stream(numbers: Iterator[int]) -> Iterator[int]: + """双向流:输入每个数,yield 其平方,对应 Go 侧 Invoke[chan int](c, ctx, "double_stream", inputChan)""" + for n in numbers: + yield n * n + + +# ── struct(dict)类型 ────────────────────────────────────────────────────── + +@gobridge +def get_user(uid: int) -> dict: + """普通调用:返回一个 struct(Go 对应 User)""" + return {"id": uid, "name": f"user_{uid}", "score": uid * 1.5} + + +@gobridge +def total_score(users: list) -> float: + """slice 输入:接收 []User,返回总分""" + return sum(u["score"] for u in users) + + +@gobridge +def enrich_users(users: list) -> list: + """slice 输入输出:为每个 user 追加 level 字段""" + result = [] + for u in users: + u = dict(u) + u["level"] = "gold" if u["score"] >= 10 else "silver" + result.append(u) + return result + + +# ── struct/slice 流式组合 ──────────────────────────────────────────────────── + +@gobridge +def gen_users(count: int) -> Iterator[dict]: + """流式输出 struct:yield 多个 User,对应 Go 侧 Invoke[chan User]""" + for i in range(1, count + 1): + yield {"id": i, "name": f"user_{i}", "score": float(i * 3)} + + +@gobridge +def process_users(users: Iterator[dict]) -> Iterator[dict]: + """双向流 struct:输入流式 User,yield 处理后的 User""" + for u in users: + yield {"id": u["id"], "name": u["name"].upper(), "score": u["score"] * 2} + + +run() diff --git a/framing.go b/framing.go new file mode 100644 index 0000000..f88fa76 --- /dev/null +++ b/framing.go @@ -0,0 +1,39 @@ +package gobridge + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io" + "net" +) + +// writeMsg 将消息以 [4字节长度][JSON载荷] 格式写入连接 +func writeMsg(conn net.Conn, msg Message) error { + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("marshal message: %w", err) + } + header := make([]byte, 4) + binary.BigEndian.PutUint32(header, uint32(len(data))) + _, err = conn.Write(append(header, data...)) + return err +} + +// readMsg 从连接中读取一条消息 +func readMsg(conn net.Conn) (Message, error) { + header := make([]byte, 4) + if _, err := io.ReadFull(conn, header); err != nil { + return Message{}, fmt.Errorf("read header: %w", err) + } + length := binary.BigEndian.Uint32(header) + body := make([]byte, length) + if _, err := io.ReadFull(conn, body); err != nil { + return Message{}, fmt.Errorf("read body: %w", err) + } + var msg Message + if err := json.Unmarshal(body, &msg); err != nil { + return Message{}, fmt.Errorf("unmarshal message: %w", err) + } + return msg, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d9b8656 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.fsdpf.net/go/gobridge + +go 1.23.10 diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..25c3850 --- /dev/null +++ b/pool.go @@ -0,0 +1,133 @@ +package gobridge + +import ( + "context" + "fmt" + "io" + "net" + "sync/atomic" +) + +// poolConfig 是进程池内部配置,通过 Option 函数填充 +type poolConfig struct { + workers int + maxConnsPerWorker int + pythonExe string + scriptArgs []string + workDir string + env []string + socketDir string + stdout io.Writer + stderr io.Writer +} + +// Option 是 NewPool 的函数选项 +type Option func(*poolConfig) + +// WithWorkers 设置 Python 进程数量(默认 2) +func WithWorkers(n int) Option { + return func(c *poolConfig) { c.workers = n } +} + +// WithMaxConns 设置每个进程的最大连接数(默认 4) +func WithMaxConns(n int) Option { + return func(c *poolConfig) { c.maxConnsPerWorker = n } +} + +// WithPythonExe 设置 Python 可执行文件(默认 "python3") +// uv 模式:WithPythonExe("uv"), WithScriptArgs("run") +func WithPythonExe(exe string) Option { + return func(c *poolConfig) { c.pythonExe = exe } +} + +// WithScriptArgs 设置脚本路径之后的附加参数 +// uv 模式示例:WithScriptArgs("run") → 执行 uv run