From b390effd8e204f6b88f7576232f4b192579e6bb5 Mon Sep 17 00:00:00 2001 From: what Date: Tue, 14 Apr 2026 13:06:50 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20Python=E2=86=92Go?= =?UTF-8?q?=20=E5=85=A8=E5=8F=8C=E5=B7=A5=E5=9B=9E=E8=B0=83=E6=94=AF?= =?UTF-8?q?=E6=8C=81=EF=BC=88call=5Fgo=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 WithHandlers 选项,通过反射将 Go 结构体方法暴露给 Python - 新增 callback/callback_result 消息类型,支持 Python 在处理中回调 Go - client 侧新增 readResult,内联处理 callback,复用同一连接避免死锁 - Python 侧新增 call_go[T]() 泛型调用,支持 dataclass 自动构造 - 注入 GOBRIDGE_WORKER_ID/WORKER_COUNT 环境变量,支持多 worker 初始化分工 - 新增示例演示 Go→Python→Go→Python 四层全双工链路 - Python 包版本升至 0.1.1 --- client.go | 86 ++++++++++++++----- example/main.go | 81 ++++++++++++++++++ example/worker.py | 75 +++++++++++++++-- pool.go | 161 ++++++++++++++++++++++++++++++++---- protocol.go | 14 ++-- python/gobridge/__init__.py | 119 +++++++++++++++++++++++++- python/pyproject.toml | 2 +- worker.go | 6 +- 8 files changed, 490 insertions(+), 54 deletions(-) diff --git a/client.go b/client.go index a6b850d..808ddb4 100644 --- a/client.go +++ b/client.go @@ -53,14 +53,15 @@ func Invoke[R any](ctx context.Context, pool Pool, method string, args ...any) ( // - ctx 取消时先发送 cancel 消息(Python 侧收到后注入 InterruptedError) // - 再关闭连接,解除阻塞中的读写操作 // +// write 是调用方提供的互斥写函数,保证与其他写操作不并发。 // 返回 stop 函数,必须在 conn 归还连接池前调用,可安全多次调用。 -func watchCtx(ctx context.Context, conn net.Conn, id uint64) (stop func()) { +func watchCtx(ctx context.Context, conn net.Conn, id uint64, write func(Message)) (stop func()) { done := make(chan struct{}) var once sync.Once go func() { select { case <-ctx.Done(): - writeMsg(conn, Message{ID: id, Type: TypeCancel}) //nolint + write(Message{ID: id, Type: TypeCancel}) conn.Close() case <-done: } @@ -89,6 +90,30 @@ func contextErr(ctx context.Context, err error) error { return err } +// readResult 读取下一条非 callback 消息,期间内联处理所有 Python→Go 回调。 +// 保证 py→go→py→go→... 全链路复用同一条连接,不产生额外线程。 +// write 是调用方提供的互斥写函数,与 watchCtx 共享同一把锁,避免并发写。 +func readResult(ctx context.Context, conn net.Conn, pool Pool, write func(Message)) (Message, error) { + for { + msg, err := readMsg(conn) + if err != nil { + return Message{}, err + } + if msg.Type != TypeCallback { + return msg, nil + } + result, errStr := pool.callbackDispatch(ctx, msg) + var resp Message + if errStr != "" { + resp = Message{ID: msg.ID, Type: TypeError, Error: errStr} + } else { + data, _ := json.Marshal(result) + resp = Message{ID: msg.ID, Type: TypeCallbackResult, Data: data} + } + write(resp) + } +} + func invokeRegular[R any](ctx context.Context, pool Pool, method string, args ...any) (R, error) { var zero R @@ -102,21 +127,22 @@ func invokeRegular[R any](ctx context.Context, pool Pool, method string, args .. return zero, err } + var mu sync.Mutex + write := func(msg Message) { mu.Lock(); writeMsg(conn, msg); mu.Unlock() } //nolint + id := pool.nextReqID() - stop := watchCtx(ctx, conn, id) + stop := watchCtx(ctx, conn, id, write) defer stop() - if err := writeMsg(conn, Message{ - ID: id, - Type: TypeCall, - Method: method, - Args: argsJSON, - }); err != nil { + mu.Lock() + err = writeMsg(conn, Message{ID: id, Type: TypeCall, Method: method, Args: argsJSON}) + mu.Unlock() + if err != nil { w.release(conn, false) return zero, contextErr(ctx, fmt.Errorf("write call: %w", err)) } - resp, err := readMsg(conn) + resp, err := readResult(ctx, conn, pool, write) if err != nil { w.release(conn, false) return zero, contextErr(ctx, fmt.Errorf("read response: %w", err)) @@ -163,14 +189,16 @@ func invokeStreamOut[R any](ctx context.Context, pool Pool, method string, rt re ch := reflect.MakeChan(rt, 64) go func() { - stop := watchCtx(ctx, conn, id) + var mu sync.Mutex + write := func(msg Message) { mu.Lock(); writeMsg(conn, msg); mu.Unlock() } //nolint + stop := watchCtx(ctx, conn, id, write) defer func() { stop() ch.Close() w.release(conn, ctx.Err() == nil) }() for { - msg, err := readMsg(conn) + msg, err := readResult(ctx, conn, pool, write) if err != nil || msg.Type == TypeEnd || msg.Type == TypeError { return } @@ -204,11 +232,19 @@ func invokeStreamIn[R any](ctx context.Context, pool Pool, method string, stream return zero, err } + var mu sync.Mutex + writeErr := func(msg Message) error { + mu.Lock() + defer mu.Unlock() + return writeMsg(conn, msg) + } + write := func(msg Message) { writeErr(msg) } //nolint + id := pool.nextReqID() - stop := watchCtx(ctx, conn, id) + stop := watchCtx(ctx, conn, id, write) defer stop() - if err := writeMsg(conn, Message{ + if err := writeErr(Message{ ID: id, Type: TypeCall, Method: method, @@ -234,18 +270,18 @@ func invokeStreamIn[R any](ctx context.Context, pool Pool, method string, stream w.release(conn, false) return zero, fmt.Errorf("marshal chunk: %w", err) } - if err := writeMsg(conn, Message{ID: id, Type: TypeChunk, Data: chunkData}); err != nil { + if err := writeErr(Message{ID: id, Type: TypeChunk, Data: chunkData}); err != nil { w.release(conn, false) return zero, contextErr(ctx, fmt.Errorf("write chunk: %w", err)) } } - if err := writeMsg(conn, Message{ID: id, Type: TypeEnd}); err != nil { + if err := writeErr(Message{ID: id, Type: TypeEnd}); err != nil { w.release(conn, false) return zero, contextErr(ctx, fmt.Errorf("write end: %w", err)) } - resp, err := readMsg(conn) + resp, err := readResult(ctx, conn, pool, write) if err != nil { w.release(conn, false) return zero, contextErr(ctx, fmt.Errorf("read response: %w", err)) @@ -297,6 +333,9 @@ func invokeStreamBoth[R any](ctx context.Context, pool Pool, method string, stre outCh := reflect.MakeChan(rt, 64) + var mu sync.Mutex + write := func(msg Message) { mu.Lock(); writeMsg(conn, msg); mu.Unlock() } //nolint + // 写入 goroutine:输入 channel → Python chunks go func() { for { @@ -308,23 +347,26 @@ func invokeStreamBoth[R any](ctx context.Context, pool Pool, method string, stre if err != nil { break } - if err := writeMsg(conn, Message{ID: id, Type: TypeChunk, Data: data}); err != nil { + mu.Lock() + err = writeMsg(conn, Message{ID: id, Type: TypeChunk, Data: data}) + mu.Unlock() + if err != nil { break } } - writeMsg(conn, Message{ID: id, Type: TypeEnd}) //nolint + write(Message{ID: id, Type: TypeEnd}) }() - // 读取 goroutine:Python chunks → 输出 channel + // 读取 goroutine:Python chunks → 输出 channel,内联处理 callback go func() { - stop := watchCtx(ctx, conn, id) + stop := watchCtx(ctx, conn, id, write) defer func() { stop() outCh.Close() w.release(conn, ctx.Err() == nil) }() for { - msg, err := readMsg(conn) + msg, err := readResult(ctx, conn, pool, write) if err != nil || msg.Type == TypeEnd || msg.Type == TypeError { return } diff --git a/example/main.go b/example/main.go index 782a9c3..ea13d1a 100644 --- a/example/main.go +++ b/example/main.go @@ -32,6 +32,12 @@ func main() { ctx := context.Background() + demoPool(ctx, pool) + demoServer(ctx, script) +} + +func demoPool(ctx context.Context, pool gobridge.Pool) { + // ── 普通调用 ────────────────────────────────────────────────────────── sum, err := gobridge.Invoke[int](ctx, pool, "add", 3, 4) if err != nil { @@ -139,3 +145,78 @@ func main() { fmt.Printf(" %+v\n", u) } } + +// goService 实现 Handler 接口,公开方法自动暴露给 Python 通过 call_go() 调用 +type goService struct { + pool gobridge.Pool // 用于 EnrichName 内部再调 Python +} + +func (s *goService) Multiply(ctx context.Context, a, b int) (int, error) { + return a * b, nil +} + +func (s *goService) Log(msg string) { + fmt.Println("[Go Log]", msg) +} + +// EnrichName 内部通过 Invoke 调用 Python 的 to_upper,演示 Go→Python→Go→Python 四层链路 +func (s *goService) EnrichName(ctx context.Context, name string) (string, error) { + upper, err := gobridge.Invoke[string](ctx, s.pool, "to_upper", name) + if err != nil { + return "", err + } + return "Hello, " + upper + "!", nil +} + +func (s *goService) MakeUser(ctx context.Context, uid int) (User, error) { + return User{ID: uid, Name: fmt.Sprintf("user_%d", uid), Score: float64(uid) * 1.5}, nil +} + +func demoServer(ctx context.Context, script string) { + fmt.Println("\n── Server 全双工示例 ─────────────────────────────────────────────") + + svc := &goService{} + serv, err := gobridge.NewPool(script, + gobridge.WithWorkers(1), + gobridge.WithHandlers(svc), + ) + if err != nil { + log.Fatal(err) + } + defer serv.Close() + svc.pool = serv // 注入 pool 供 EnrichName 内部调用 + + // ── 示例1:Python 调用 Go Multiply ─────────────────────────────────────── + result, err := gobridge.Invoke[int](ctx, serv, "compute_with_go_mul", 6, 7) + if err != nil { + log.Fatal(err) + } + fmt.Println("compute_with_go_mul(6, 7) =", result) // 42 + + // ── 示例2:流式输出 + Go Log 回调 ──────────────────────────────────────── + ch, err := gobridge.Invoke[chan int](ctx, serv, "squared_with_log", 4) + if err != nil { + log.Fatal(err) + } + fmt.Print("squared_with_log(4) =") + for v := range ch { + fmt.Print(" ", v) + } + fmt.Println() // 1 4 9 16 + + // ── 示例3:Go→Python→Go→Python 四层全双工链路 ─────────────────────────── + // full_chain("world") → call_go[str]("EnrichName","world") → Invoke to_upper("world") → "WORLD" + // ← "Hello, WORLD!" ← "Hello, WORLD!" + greeting, err := gobridge.Invoke[string](ctx, serv, "full_chain", "world") + if err != nil { + log.Fatal(err) + } + fmt.Println("full_chain(world) =", greeting) // Hello, WORLD! + + // ── 示例4:call_go[User] 将 Go 返回的 dict 自动构造为 dataclass ────────── + enriched, err := gobridge.Invoke[User](ctx, serv, "get_user_via_go", 12) + if err != nil { + log.Fatal(err) + } + fmt.Printf("get_user_via_go(12) = %+v\n", enriched) // {ID:12 Name:user_12 Score:18 Level:gold} +} diff --git a/example/worker.py b/example/worker.py index 5626c90..6cb7197 100644 --- a/example/worker.py +++ b/example/worker.py @@ -2,9 +2,24 @@ import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "python")) -from gobridge import expose, run +import dataclasses +import threading from typing import Iterator +from gobridge import expose, call_go, run, worker_id, worker_count + +# ── worker_id / worker_count ────────────────────────────────────────────────── +# 只有 worker 0 才执行一次性初始化(如监听端口、建立长连接等), +# 其余 worker 跳过,避免端口冲突 / 重复连接。 +print(f"[worker {worker_id}/{worker_count}] started", flush=True) +if worker_id == 0: + def _init_shared_resource(): + # 示例:此处可启动 WebSocket 客户端、监听 TCP 端口等 + print(f"[worker {worker_id}] shared resource initialized", flush=True) + threading.Thread(target=_init_shared_resource, daemon=True).start() + + +# ── 基础类型 ───────────────────────────────────────────────────────────────── @expose def add(a: int, b: int) -> int: @@ -26,12 +41,20 @@ def sum_stream(numbers: Iterator[int]) -> int: @expose def double_stream(numbers: Iterator[int]) -> Iterator[int]: - """双向流:输入每个数,yield 其平方,对应 Go 侧 Invoke[chan int](c, ctx, "double_stream", inputChan)""" + """双向流:输入每个数,yield 其平方""" for n in numbers: yield n * n -# ── struct(dict)类型 ────────────────────────────────────────────────────── +# ── struct(dataclass / dict)类型 ─────────────────────────────────────────── + +@dataclasses.dataclass +class User: + id: int + name: str + score: float + level: str = "" + @expose def get_user(uid: int) -> dict: @@ -56,8 +79,6 @@ def enrich_users(users: list) -> list: return result -# ── struct/slice 流式组合 ──────────────────────────────────────────────────── - @expose def gen_users(count: int) -> Iterator[dict]: """流式输出 struct:yield 多个 User,对应 Go 侧 Invoke[chan User]""" @@ -72,4 +93,48 @@ def process_users(users: Iterator[dict]) -> Iterator[dict]: yield {"id": u["id"], "name": u["name"].upper(), "score": u["score"] * 2} +# ── Server 全双工示例 ──────────────────────────────────────────────────────── + +@expose +def compute_with_go_mul(a: int, b: int) -> int: + """示例1:call_go[int] 指定返回类型""" + return call_go[int]("Multiply", a, b) + + +@expose +def squared_with_log(n: int) -> Iterator[int]: + """示例2:流式输出,每次 yield 前 call_go("Log") 回调 Go""" + for i in range(1, n + 1): + call_go("Log", f"yielding {i}² = {i * i}") + yield i * i + + +@expose +def to_upper(s: str) -> str: + """辅助方法:被 Go 的 EnrichName handler 内部调用""" + return s.upper() + + +@expose +def full_chain(name: str) -> str: + """示例3:Go→Python→Go→Python 四层链路 + + full_chain("world") + → call_go[str]("EnrichName", "world") # Python 调 Go + → Invoke[string](ctx, serv, "to_upper", "world") # Go 再调 Python + ← "WORLD" + ← "Hello, WORLD!" + ← "Hello, WORLD!" + """ + return call_go[str]("EnrichName", name) + + +@expose +def get_user_via_go(uid: int) -> dict: + """示例4:call_go[User] 自动将 Go 返回的 dict 构造为 dataclass 实例""" + user = call_go[User]("MakeUser", uid) # Go 返回 {"id":..,"name":..,"score":..} + user.level = "gold" if user.score >= 10 else "silver" + return dataclasses.asdict(user) + + run() diff --git a/pool.go b/pool.go index eb451c1..a91c96c 100644 --- a/pool.go +++ b/pool.go @@ -2,9 +2,12 @@ package gobridge import ( "context" + "encoding/json" "fmt" "io" "net" + "reflect" + "sync" "sync/atomic" ) @@ -19,6 +22,7 @@ type poolConfig struct { socketDir string stdout io.Writer stderr io.Writer + handler any } // Option 是 NewPool 的函数选项 @@ -41,7 +45,6 @@ func WithPythonExe(exe string) Option { } // WithScriptArgs 设置脚本路径之后的附加参数 -// uv 模式示例:WithScriptArgs("run") → 执行 uv run