gobridge

在 Go 与 Python 之间建立双向通信桥接,将 Go 的 channel 与 Python 的 yield 原生对接,支持普通调用、单向流、双向流。

底层通过 Unix Domain Socket (UDS) 通信Go 侧维护 Worker 进程池Python 侧以多线程方式并发处理请求。

特性

  • 零配置序列化Go struct/slice ↔ Python dict/list 通过 JSON 自动互转
  • 原生流语义Go chan T 对应 Python Iterator[T],无需额外 API
  • 进程池Go 自动启动并管理多个 Python 子进程,崩溃后自动重启
  • ctx 取消Go context 取消时自动中断 Python 计算,无需函数内检查
  • 四种调用模式:普通、流式输出、流式输入、双向流,同一个 Invoke 函数自动推断

安装

go get git.fsdpf.net/go/gobridge

Python 端直接复制 python/gobridge/ 目录到项目中,无需安装依赖(仅用标准库)。

快速开始

Python 端worker.py

from gobridge import gobridge, run
from typing import Iterator

@gobridge
def add(a: int, b: int) -> int:
    return a + b

run()

Go 端:

pool, _ := gobridge.NewPool("worker.py")
defer pool.Close()

ctx := context.Background()
sum, _ := gobridge.Invoke[int](ctx, pool, "add", 3, 4)
fmt.Println(sum) // 7

四种调用模式

1. 普通调用

// Go
sum, err := gobridge.Invoke[int](ctx, pool, "add", 3, 4)

user, err := gobridge.Invoke[User](ctx, pool, "get_user", 42)

result, err := gobridge.Invoke[[]User](ctx, pool, "enrich_users", users)
# Python
@gobridge
def add(a: int, b: int) -> int:
    return a + b

@gobridge
def get_user(uid: int) -> dict:
    return {"id": uid, "name": f"user_{uid}", "score": uid * 1.5}

@gobridge
def enrich_users(users: list) -> list:
    for u in users:
        u["level"] = "gold" if u["score"] >= 10 else "silver"
    return users

2. 流式输出Python yield → Go channel

返回类型为 chan T 时自动进入流式输出模式Python 函数使用 yieldGo 侧通过 range 消费。

// Go
ch, err := gobridge.Invoke[chan int](ctx, pool, "range_gen", 1, 6)
for v := range ch {
    fmt.Println(v) // 1 2 3 4 5
}

userCh, err := gobridge.Invoke[chan User](ctx, pool, "gen_users", 3)
for u := range userCh {
    fmt.Println(u)
}
# Python
@gobridge
def range_gen(start: int, stop: int) -> Iterator[int]:
    for i in range(start, stop):
        yield i

@gobridge
def gen_users(count: int) -> Iterator[dict]:
    for i in range(1, count + 1):
        yield {"id": i, "name": f"user_{i}", "score": float(i * 3)}

3. 流式输入Go channel → Python Iterator

参数中含 chan T 且返回非 chan 时自动进入流式输入模式。

// Go
inputCh := make(chan int, 10)
go func() {
    for i := 1; i <= 5; i++ {
        inputCh <- i
    }
    close(inputCh)
}()
total, err := gobridge.Invoke[int](ctx, pool, "sum_stream", inputCh)
fmt.Println(total) // 15
# Python
@gobridge
def sum_stream(numbers: Iterator[int]) -> int:
    return sum(numbers)

4. 双向流Go channel 输入 + Go channel 输出)

参数含 chan T 且返回类型也为 chan R 时自动进入双向流模式。

// Go
inCh := make(chan User, 5)
go func() {
    for _, u := range users {
        inCh <- u
    }
    close(inCh)
}()
outCh, err := gobridge.Invoke[chan User](ctx, pool, "process_users", inCh)
for u := range outCh {
    fmt.Println(u)
}
# Python
@gobridge
def process_users(users: Iterator[dict]) -> Iterator[dict]:
    for u in users:
        yield {"id": u["id"], "name": u["name"].upper(), "score": u["score"] * 2}

ctx 取消

Go 的 context 取消会自动中断 Python 侧的执行,无需在 Python 函数中做任何检查:

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

// 超时或手动 cancel() 后Python 计算立即中断,返回 context.DeadlineExceeded
result, err := gobridge.Invoke[int](ctx, pool, "slow_compute", 1000000)
@gobridge
def slow_compute(n: int) -> int:
    total = 0
    for i in range(n):
        total += i   # ctx 取消时此处自动抛出 InterruptedError无需手动检查
    return total

实现机制:

  1. Go ctx 取消 → 发送 cancel 消息给 Python
  2. Python 单 reader 线程收到 cancel → 向执行线程注入 InterruptedErrorPyThreadState_SetAsyncExc
  3. Python 函数在下一条字节码指令处中断
  4. Go 同时关闭连接,解除阻塞的读写操作

限制:长时间不释放 GIL 的 C 扩展(如大规模 numpy 矩阵运算)无法被中断,需等其释放 GIL 后才触发。

进程自动重启

Python worker 进程崩溃时自动重启,调用方无感知:

Python 进程崩溃
  → monitor goroutine 检测到退出
  → 排空失效连接
  → 指数退避重启100ms → 200ms → ... → 30s
  → 新进程就绪后恢复连接池

配置

NewPool 使用函数选项模式,第一个参数为脚本路径:

// 最简调用
pool, err := gobridge.NewPool("worker.py")

// 完整配置
pool, err := gobridge.NewPool("worker.py",
    gobridge.WithWorkers(4),                         // Python 进程数量,默认 2
    gobridge.WithMaxConns(8),                        // 每进程最大连接数,默认 4
    gobridge.WithPythonExe("python3"),               // 可执行文件,默认 "python3"
    gobridge.WithWorkDir("/path/to/workdir"),        // 工作目录,默认继承当前进程
    gobridge.WithEnv("PYTHONUNBUFFERED=1", "K=V"),  // 附加环境变量,与当前进程环境合并
    gobridge.WithSocketDir("/var/run/myapp"),        // socket 文件目录,默认 /tmp
    gobridge.WithStdout(os.Stdout),                 // 子进程 stdout默认 os.Stdout
    gobridge.WithStderr(os.Stderr),                 // 子进程 stderr默认 os.Stderr
)

// 静默模式:丢弃子进程输出
pool, err := gobridge.NewPool("worker.py",
    gobridge.WithStdout(io.Discard),
    gobridge.WithStderr(io.Discard),
)
Option 说明 默认值
WithWorkers(n) Python 进程数量 2
WithMaxConns(n) 每进程最大连接数 4
WithPythonExe(exe) 可执行文件 "python3"
WithScriptArgs(args...) 脚本路径之后的附加参数
WithWorkDir(dir) 子进程工作目录 继承当前进程
WithEnv(kv...) 附加环境变量 "K=V"
WithSocketDir(dir) UDS socket 文件目录 "/tmp"
WithStdout(w) 子进程标准输出 os.Stdout
WithStderr(w) 子进程标准错误 os.Stderr

使用 uv 管理 Python 环境

推荐使用 uv 管理 Python 版本和虚拟环境。

方式一:uv run(推荐,无需手动激活环境)

// 等价于执行uv run worker.py
pool, err := gobridge.NewPool("run",
    gobridge.WithPythonExe("uv"),
    gobridge.WithScriptArgs("worker.py"),
    gobridge.WithWorkDir("./worker"), // uv 项目目录(含 pyproject.toml
)

方式二:直接使用虚拟环境的 python

cd worker && uv sync
venvPython, _ := exec.LookPath("worker/.venv/bin/python")
pool, err := gobridge.NewPool("worker/worker.py",
    gobridge.WithPythonExe(venvPython),
)

方式三shell 脚本封装(适合 CI/部署)

#!/bin/sh
# run_worker.sh
cd "$(dirname "$0")"
exec uv run python worker.py
pool, err := gobridge.NewPool("./run_worker.sh",
    gobridge.WithPythonExe("/bin/sh"),
)

典型项目结构:

myproject/
├── main.go
├── go.mod
└── worker/
    ├── pyproject.toml
    ├── uv.lock
    ├── .venv/
    └── worker.py

pyproject.toml

[project]
name = "worker"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = []

[tool.uv.sources]
# 从 git 仓库安装(推荐)
gobridge = { git = "https://git.fsdpf.net/go/gobridge.git", subdirectory = "python" }

# 本地开发时改用本地路径
# gobridge = { path = "../../python", editable = true }

通信协议

整体架构

  Go 进程
  ┌─────────────────────────────────────────────────────────┐
  │  Invoke[R](ctx, pool, method, args...)                  │
  │       │                                                 │
  │  ┌────▼─────────────────────────────────────────────┐  │
  │  │  Pool                                            │  │
  │  │  workers[0]  workers[1]  ...  workers[N-1]       │  │
  │  │  (轮询选择)                                     │  │
  │  └────┬─────────────────────────────────────────────┘  │
  │       │  每 worker 维护 M 个可复用连接                  │
  └───────┼─────────────────────────────────────────────────┘
          │ Unix Domain Socket每 worker 独立 .sock 文件)
  ┌───────▼──────────────┐  ┌──────────────────────────┐
  │  Python 进程 0        │  │  Python 进程 1            │
  │  worker.py           │  │  worker.py               │
  └──────────────────────┘  └──────────────────────────┘

Python Worker 内部结构

  Python 进程
  ┌──────────────────────────────────────────────────────────────┐
  │  run()  ──  UDS server.accept() 循环                         │
  │                │                                             │
  │         每个连接 → 独立线程 _handle_conn()                    │
  │                                                              │
  │  ┌─────────────────────────────────────────────────────┐     │
  │  │  _handle_conn连接线程                            │     │
  │  │                                                     │     │
  │  │  ┌──────────────────────────────────────────────┐   │     │
  │  │  │  _ConnMux单 reader 线程)                   │   │     │
  │  │  │                                              │   │     │
  │  │  │  socket ──► 读消息                           │   │     │
  │  │  │                  │                           │   │     │
  │  │  │         ┌────────┼──────────┐                │   │     │
  │  │  │         ▼        ▼          ▼                │   │     │
  │  │  │      call_q   chunk_q    cancel              │   │     │
  │  │  └─────────┬────────┬──────────┼───────────────┘   │     │
  │  │            │        │          │                    │     │
  │  │            ▼        │          ▼                    │     │
  │  │      主循环读取      │    PyThreadState_SetAsyncExc  │     │
  │  │      _dispatch()    │    → 执行线程抛 InterruptedError│    │
  │  │            │        │                              │     │
  │  │     ┌──────▼──────┐ │                              │     │
  │  │     │ @gobridge fn│ │                              │     │
  │  │     │             │ │                              │     │
  │  │     │  普通函数    │ │                              │     │
  │  │     │  return val ──────────────► result/error     │     │
  │  │     │             │ │                              │     │
  │  │     │  生成器函数  │ │                              │     │
  │  │     │  yield val ───────────────► chunk × N        │     │
  │  │     │             │ │              + end           │     │
  │  │     │  流式输入    │ │                              │     │
  │  │     │  Iterator ◄─┘ │  ← chunk_q                  │     │
  │  │     └─────────────┘                               │     │
  │  └─────────────────────────────────────────────────────┘     │
  └──────────────────────────────────────────────────────────────┘

消息帧: [4字节大端长度][JSON载荷]

消息类型:

type 方向 含义
call Go → Python 调用请求
result Python → Go 普通返回值
chunk 双向 流数据块
end 双向 流结束标记
error 双向 错误响应
cancel Go → Python 取消请求,触发 InterruptedError

项目结构

gobridge/
├── protocol.go          # Message 结构与类型常量
├── framing.go           # 帧读写4字节长度前缀 + JSON
├── worker.go            # Python 子进程管理 + UDS 连接池 + 自动重启
├── pool.go              # 多进程池(轮询负载均衡)+ Option 函数
├── client.go            # Invoke[R] 泛型函数(四种模式自动推断)
├── example/
│   ├── main.go          # 完整调用示例
│   └── worker.py        # Python 函数示例
└── python/
    └── gobridge/
        └── __init__.py  # Python 库expose、run、_ConnMux

类型对应关系

Go 类型 Python 类型
int int
float64 float
string str
bool bool
struct dict
[]T list
chan T Iterator[T]

参考

本项目的进程池、UDS 通信、帧协议设计参考自 pyproc,在此基础上增加了 Go channel 与 Python yield 的流式对接及 ctx 取消支持。

Description
No description provided
Readme 52 KiB
Languages
Go 66.3%
Python 33.7%