From b4e77ce9a8bb655394ee87d5b9daac11092a019c Mon Sep 17 00:00:00 2001 From: what Date: Tue, 14 Apr 2026 14:38:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=B8=BA=20Python=20worker=20=E8=BE=93?= =?UTF-8?q?=E5=87=BA=E6=B7=BB=E5=8A=A0=E8=A1=8C=E5=89=8D=E7=BC=80=EF=BC=8C?= =?UTF-8?q?=E5=B9=B6=E5=90=AF=E7=94=A8=E6=97=A0=E7=BC=93=E5=86=B2=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 prefixWriter,对每行输出添加 [python:N] 前缀,便于与 Go 日志区分 - 注入 PYTHONUNBUFFERED=1,确保 print() 实时输出不被管道缓冲 --- prefix_writer.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++ worker.go | 18 +++++++++-------- 2 files changed, 62 insertions(+), 8 deletions(-) create mode 100644 prefix_writer.go diff --git a/prefix_writer.go b/prefix_writer.go new file mode 100644 index 0000000..2be05ad --- /dev/null +++ b/prefix_writer.go @@ -0,0 +1,52 @@ +package gobridge + +import ( + "bytes" + "io" + "sync" +) + +// prefixWriter 对每一行输出添加固定前缀,线程安全。 +type prefixWriter struct { + mu sync.Mutex + w io.Writer + prefix []byte + buf []byte +} + +func newPrefixWriter(w io.Writer, prefix string) *prefixWriter { + return &prefixWriter{w: w, prefix: []byte(prefix)} +} + +func (p *prefixWriter) Write(b []byte) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + n := len(b) + p.buf = append(p.buf, b...) + + for { + idx := bytes.IndexByte(p.buf, '\n') + if idx < 0 { + break + } + line := append(p.prefix, p.buf[:idx+1]...) + if _, err := p.w.Write(line); err != nil { + return n, err + } + p.buf = p.buf[idx+1:] + } + return n, nil +} + +// flush 将缓冲区中尚未以换行结尾的内容输出(进程退出时调用)。 +func (p *prefixWriter) flush() { + p.mu.Lock() + defer p.mu.Unlock() + if len(p.buf) > 0 { + line := append(p.prefix, p.buf...) + line = append(line, '\n') + p.w.Write(line) //nolint + p.buf = nil + } +} diff --git a/worker.go b/worker.go index 4e1a5cd..5ed5477 100644 --- a/worker.go +++ b/worker.go @@ -60,18 +60,20 @@ func (w *worker) start() error { fmt.Sprintf("GOBRIDGE_WORKER_ID=%d", w.id), fmt.Sprintf("GOBRIDGE_WORKER_COUNT=%d", w.cfg.workers), "GOBRIDGE_DEATH_FD=3", // ExtraFiles[0] → fd 3 + "PYTHONUNBUFFERED=1", // Python stdout/stderr 不缓冲,print() 立即输出 ) cmd.ExtraFiles = []*os.File{pr} // fd 3 in child - if w.cfg.stdout != nil { - cmd.Stdout = w.cfg.stdout - } else { - cmd.Stdout = os.Stdout + prefix := fmt.Sprintf("[python:%d] ", w.id) + stdout := w.cfg.stdout + if stdout == nil { + stdout = os.Stdout } - if w.cfg.stderr != nil { - cmd.Stderr = w.cfg.stderr - } else { - cmd.Stderr = os.Stderr + stderr := w.cfg.stderr + if stderr == nil { + stderr = os.Stderr } + cmd.Stdout = newPrefixWriter(stdout, prefix) + cmd.Stderr = newPrefixWriter(stderr, prefix) if err := cmd.Start(); err != nil { pr.Close()