From 39517dc5fc60619428f6b41a20521db888c230c1 Mon Sep 17 00:00:00 2001 From: what Date: Tue, 14 Apr 2026 13:49:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=80=9A=E8=BF=87=E5=8C=BF=E5=90=8D?= =?UTF-8?q?=E7=AE=A1=E9=81=93=E5=AE=9E=E7=8E=B0=E7=88=B6=E8=BF=9B=E7=A8=8B?= =?UTF-8?q?=E6=AD=BB=E4=BA=A1=E6=A3=80=E6=B5=8B=EF=BC=8C=E6=9B=BF=E4=BB=A3?= =?UTF-8?q?=E8=BD=AE=E8=AF=A2=E6=96=B9=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - worker.go: 为每个子进程创建 death pipe,Go 持有写端 - __init__.py: run() 中监听管道读端 EOF,Go 退出时立即终止子进程 - 解决 Ctrl+C / panic / SIGKILL 等场景下 Python worker 变成孤儿进程的问题 - Python 包版本升至 0.1.2 --- example/worker.py | 11 ++++++++++- python/gobridge/__init__.py | 11 +++++++++++ python/pyproject.toml | 2 +- worker.go | 30 ++++++++++++++++++++++++++---- 4 files changed, 48 insertions(+), 6 deletions(-) diff --git a/example/worker.py b/example/worker.py index 6cb7197..7e3fb4d 100644 --- a/example/worker.py +++ b/example/worker.py @@ -1,5 +1,6 @@ import sys import os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "python")) import dataclasses @@ -13,14 +14,17 @@ from gobridge import expose, call_go, run, worker_id, worker_count # 其余 worker 跳过,避免端口冲突 / 重复连接。 print(f"[worker {worker_id}/{worker_count}] started", flush=True) if worker_id == 0: + def _init_shared_resource(): # 示例:此处可启动 WebSocket 客户端、监听 TCP 端口等 print(f"[worker {worker_id}] shared resource initialized", flush=True) + threading.Thread(target=_init_shared_resource, daemon=True).start() # ── 基础类型 ───────────────────────────────────────────────────────────────── + @expose def add(a: int, b: int) -> int: return a + b @@ -48,6 +52,7 @@ def double_stream(numbers: Iterator[int]) -> Iterator[int]: # ── struct(dataclass / dict)类型 ─────────────────────────────────────────── + @dataclasses.dataclass class User: id: int @@ -95,6 +100,7 @@ def process_users(users: Iterator[dict]) -> Iterator[dict]: # ── Server 全双工示例 ──────────────────────────────────────────────────────── + @expose def compute_with_go_mul(a: int, b: int) -> int: """示例1:call_go[int] 指定返回类型""" @@ -137,4 +143,7 @@ def get_user_via_go(uid: int) -> dict: return dataclasses.asdict(user) -run() +if __name__ == "__main__": + run() + print("worker_id", worker_id) + print("worker_count", worker_count) diff --git a/python/gobridge/__init__.py b/python/gobridge/__init__.py index 796c32d..6c6d081 100644 --- a/python/gobridge/__init__.py +++ b/python/gobridge/__init__.py @@ -385,6 +385,17 @@ def run(): server.bind(sock_path) server.listen(64) + # Death pipe:Go 持有写端,本进程阻塞读端。 + # Go 进程消亡(任何原因:Ctrl+C、panic、SIGKILL)时写端自动关闭,read() 立即返回 EOF。 + death_fd = int(os.environ.get("GOBRIDGE_DEATH_FD", "0")) + if death_fd: + def _watch_death_pipe(): + with os.fdopen(death_fd, "rb") as f: + f.read(1) # 阻塞直到 Go 关闭写端(EOF) + server.close() + os._exit(0) + threading.Thread(target=_watch_death_pipe, daemon=True).start() + try: while True: try: diff --git a/python/pyproject.toml b/python/pyproject.toml index c272b4a..7f6db1b 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "gobridge" -version = "0.1.1" +version = "0.1.2" description = "Python 端库,配合 Go 侧 gobridge 使用" requires-python = ">=3.10" diff --git a/worker.go b/worker.go index df8851e..4e1a5cd 100644 --- a/worker.go +++ b/worker.go @@ -20,8 +20,9 @@ type worker struct { stopCh chan struct{} stopOnce sync.Once - mu sync.Mutex // 保护 cmd - cmd *exec.Cmd + mu sync.Mutex // 保护 cmd + cmd *exec.Cmd + deathPipe *os.File // 管道写端,关闭时通知 Python 子进程退出 } func newWorker(cfg *poolConfig, id int) (*worker, error) { @@ -44,6 +45,13 @@ func (w *worker) start() error { os.Remove(sockPath) w.sockPath = sockPath + // 创建 death pipe:Go 持有写端,Python 阻塞读端。 + // Go 进程消亡(任何原因)时写端自动关闭,Python 立即收到 EOF 并退出。 + pr, pw, err := os.Pipe() + if err != nil { + return fmt.Errorf("create death pipe: %w", err) + } + cmd := exec.Command(w.cfg.pythonExe, w.cfg.scriptArgs...) cmd.Dir = w.cfg.workDir cmd.Env = append(os.Environ(), w.cfg.env...) @@ -51,7 +59,9 @@ func (w *worker) start() error { "GOBRIDGE_SOCKET_PATH="+sockPath, fmt.Sprintf("GOBRIDGE_WORKER_ID=%d", w.id), fmt.Sprintf("GOBRIDGE_WORKER_COUNT=%d", w.cfg.workers), + "GOBRIDGE_DEATH_FD=3", // ExtraFiles[0] → fd 3 ) + cmd.ExtraFiles = []*os.File{pr} // fd 3 in child if w.cfg.stdout != nil { cmd.Stdout = w.cfg.stdout } else { @@ -64,11 +74,18 @@ func (w *worker) start() error { } if err := cmd.Start(); err != nil { + pr.Close() + pw.Close() return fmt.Errorf("start python worker: %w", err) } + pr.Close() // 父进程不需要读端,关闭后子进程独占 w.mu.Lock() w.cmd = cmd + if w.deathPipe != nil { + w.deathPipe.Close() // 关闭上一个周期的写端(重启场景) + } + w.deathPipe = pw w.mu.Unlock() // 等待 socket 文件出现(最多 10 秒) @@ -174,9 +191,14 @@ func (w *worker) stop() { cmd := w.cmd w.mu.Unlock() + w.mu.Lock() + pipe := w.deathPipe + w.deathPipe = nil + w.mu.Unlock() + if pipe != nil { + pipe.Close() // 关闭写端,Python 收到 EOF 自动退出 + } if cmd != nil && cmd.Process != nil { - cmd.Process.Signal(os.Interrupt) // 先发 SIGINT,Python 已忽略,等效于 Kill - time.Sleep(50 * time.Millisecond) cmd.Process.Kill() } for len(w.conns) > 0 {