From 57775a33ec1df2c7ff3ab0974e426ca1a1b1d0e1 Mon Sep 17 00:00:00 2001 From: what Date: Tue, 19 May 2026 20:07:31 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20[]byte=20=E2=86=94?= =?UTF-8?q?=20bytes=20=E6=94=AF=E6=8C=81=EF=BC=88base64=20=E9=80=8F?= =?UTF-8?q?=E6=98=8E=E7=BC=96=E8=A7=A3=E7=A0=81=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Python 侧:_decode_bytes_args 根据函数注解自动解码入参,_bytes_encode 自动编码 bytes 返回值 - _cast 支持 bytes 类型(call_go[bytes] 返回值解码) - 流式输出同步支持 chan []byte(每个 chunk 独立编解码) - example/worker.py 新增 bytes_reverse / bytes_concat / bytes_chunks 示例 - example/main.go 新增对应演示用例 - README 补充类型表 []byte 行及完整使用章节 --- README.md | 54 +++++++++++++++++++++++++++++++++++++ example/main.go | 24 +++++++++++++++++ example/worker.py | 22 +++++++++++++++ python/gobridge/__init__.py | 30 +++++++++++++++++++-- 4 files changed, 128 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a4b7c51..9478752 100644 --- a/README.md +++ b/README.md @@ -269,6 +269,57 @@ gobridge.Invoke(ctx, sessB, "increment", 99) // worker 0: counter = 99(独立 | 携带方式 | 替换 `pool` 参数 | 写入 `ctx` | | 超时支持 | `context.WithTimeout(ctx, d)` 正常使用 | 同左 | +## []byte ↔ bytes 支持 + +Go 的 `[]byte` 通过 **base64** 编码在 JSON 帧中传输,框架在 Python 侧自动完成编解码,用户侧完全透明。 + +### Python 侧 + +函数参数注解为 `bytes` 时,框架自动将 Go 传入的 base64 字符串解码为 `bytes`; +返回值为 `bytes` 时,框架自动将其编码为 base64 字符串再发送给 Go。 + +```python +from gobridge import expose +from typing import Iterator + +@expose +def bytes_reverse(data: bytes) -> bytes: + return data[::-1] + +@expose +def bytes_concat(a: bytes, b: bytes) -> bytes: + return a + b + +# 流式输出 bytes:对应 Go Invoke[chan []byte] +@expose +def bytes_chunks(data: bytes, size: int) -> Iterator[bytes]: + for i in range(0, len(data), size): + yield data[i:i + size] +``` + +### Go 侧 + +Go 直接使用 `[]byte`,`encoding/json` 自动处理 base64 编解码: + +```go +// 普通 []byte 参数与返回值 +rev, err := gobridge.Invoke[[]byte](ctx, pool, "bytes_reverse", []byte("hello")) +fmt.Printf("%s\n", rev) // olleh + +cat, err := gobridge.Invoke[[]byte](ctx, pool, "bytes_concat", []byte("foo"), []byte("bar")) +fmt.Printf("%s\n", cat) // foobar + +// 流式输出 []byte +ch, err := gobridge.Invoke[chan []byte](ctx, pool, "bytes_chunks", []byte("abcdefgh"), 3) +for chunk := range ch { + fmt.Printf("%s ", chunk) // abc def gh +} +``` + +> **效率说明**:base64 编码约使数据体积增大 33%,并有少量 CPU 开销。 +> 对于小块二进制数据(< 1 MB)的 RPC 调用,这通常可以忽略不计; +> 若需传输大量原始二进制流,建议改用独立的 socket/文件通道。 + ## 注意事项 ### 在 handler 中使用 `threading.Thread` @@ -540,10 +591,13 @@ gobridge/ | `float64` | `float` | | `string` | `str` | | `bool` | `bool` | +| `[]byte` | `bytes` | | `struct` | `dict` | | `[]T` | `list` | | `chan T` | `Iterator[T]` | +> `[]byte` 经 base64 编码在 JSON 帧中传输,框架自动完成编解码,用户侧透明。`chan T` 中的 `T` 同样支持 `[]byte`,即 `chan []byte` ↔ `Iterator[bytes]`。 + ## 参考 本项目的进程池、UDS 通信、帧协议设计参考自 [pyproc](https://github.com/YuminosukeSato/pyproc),在此基础上增加了 Go channel 与 Python yield 的流式对接及 ctx 取消支持。 diff --git a/example/main.go b/example/main.go index 16e61be..d5056b6 100644 --- a/example/main.go +++ b/example/main.go @@ -145,6 +145,30 @@ func demoPool(ctx context.Context, pool gobridge.Pool) { for u := range procCh { fmt.Printf(" %+v\n", u) } + + // ── []byte 输入输出 ─────────────────────────────────────────────────────── + rev, err := gobridge.Invoke[[]byte](ctx, pool, "bytes_reverse", []byte("hello")) + if err != nil { + log.Fatal(err) + } + fmt.Printf("bytes_reverse(hello) = %s\n", rev) // olleh + + cat, err := gobridge.Invoke[[]byte](ctx, pool, "bytes_concat", []byte("foo"), []byte("bar")) + if err != nil { + log.Fatal(err) + } + fmt.Printf("bytes_concat(foo, bar) = %s\n", cat) // foobar + + // ── []byte 流式输出:Python yield bytes → Go chan []byte ────────────────── + bCh, err := gobridge.Invoke[chan []byte](ctx, pool, "bytes_chunks", []byte("abcdefgh"), 3) + if err != nil { + log.Fatal(err) + } + fmt.Print("bytes_chunks(abcdefgh, 3) =") + for chunk := range bCh { + fmt.Printf(" %s", chunk) + } + fmt.Println() // abc def gh } // goService 实现 Handler 接口,公开方法自动暴露给 Python 通过 call_go() 调用 diff --git a/example/worker.py b/example/worker.py index 3cb981a..5d3e705 100644 --- a/example/worker.py +++ b/example/worker.py @@ -98,6 +98,28 @@ def process_users(users: Iterator[dict]) -> Iterator[dict]: yield {"id": u["id"], "name": u["name"].upper(), "score": u["score"] * 2} +# ── []byte / bytes 示例 ────────────────────────────────────────────────────── + + +@expose +def bytes_reverse(data: bytes) -> bytes: + """接收 []byte,返回翻转后的 []byte""" + return data[::-1] + + +@expose +def bytes_concat(a: bytes, b: bytes) -> bytes: + """接收两个 []byte 参数,返回拼接结果""" + return a + b + + +@expose +def bytes_chunks(data: bytes, size: int): + """流式输出:将 []byte 按 size 切分,逐块 yield(对应 Go Invoke[chan []byte])""" + for i in range(0, len(data), size): + yield data[i:i + size] + + # ── Server 全双工示例 ──────────────────────────────────────────────────────── diff --git a/python/gobridge/__init__.py b/python/gobridge/__init__.py index 6c6d081..2b417c3 100644 --- a/python/gobridge/__init__.py +++ b/python/gobridge/__init__.py @@ -36,6 +36,7 @@ gobridge - Python 端库,配合 Go 侧 gobridge 使用 run() """ +import base64 import ctypes import dataclasses import inspect @@ -91,15 +92,39 @@ def _cast(t: type, value: Any) -> Any: """将 Go 返回的 JSON 值转换为指定类型。 - dataclass:用 dict 字段构造实例 + - bytes:base64 str → bytes(对应 Go []byte) - 其余类型:JSON 已经是正确的 Python 原生类型,直接返回 """ if value is None or t is type(None): return value + if t is bytes and isinstance(value, str): + return base64.b64decode(value) if dataclasses.is_dataclass(t) and not isinstance(value, t) and isinstance(value, dict): return t(**value) return value +def _bytes_encode(v: Any) -> Any: + """bytes → base64 str,使 json.dumps 可序列化(对应 Go []byte)。""" + if isinstance(v, bytes): + return base64.b64encode(v).decode() + return v + + +def _decode_bytes_args(fn, args: list) -> list: + """根据函数类型注解将 base64 str 参数解码为 bytes(对应 Go []byte 入参)。""" + try: + params = list(inspect.signature(fn).parameters.values()) + for i, param in enumerate(params): + if i >= len(args): + break + if param.annotation is bytes and isinstance(args[i], str): + args[i] = base64.b64decode(args[i]) + except Exception: + pass + return args + + class _CallGoType: """实现 call_go 和 call_go[Type] 两种调用形式。 @@ -305,16 +330,17 @@ def _dispatch(mux: _ConnMux, msg: dict): args[stream_arg_idx] = chunk_iter_instance try: + args = _decode_bytes_args(fn, args) result = fn(*args) if inspect.isgenerator(result): try: for item in result: - mux.write({"id": msg_id, "type": "chunk", "data": item}) + mux.write({"id": msg_id, "type": "chunk", "data": _bytes_encode(item)}) finally: mux.write({"id": msg_id, "type": "end"}) else: - mux.write({"id": msg_id, "type": "result", "data": result}) + mux.write({"id": msg_id, "type": "result", "data": _bytes_encode(result)}) except InterruptedError: pass # ctx 取消,连接已被 Go 侧关闭,无需回写