diff --git a/prefix_writer.go b/prefix_writer.go new file mode 100644 index 0000000..2be05ad --- /dev/null +++ b/prefix_writer.go @@ -0,0 +1,52 @@ +package gobridge + +import ( + "bytes" + "io" + "sync" +) + +// prefixWriter 对每一行输出添加固定前缀,线程安全。 +type prefixWriter struct { + mu sync.Mutex + w io.Writer + prefix []byte + buf []byte +} + +func newPrefixWriter(w io.Writer, prefix string) *prefixWriter { + return &prefixWriter{w: w, prefix: []byte(prefix)} +} + +func (p *prefixWriter) Write(b []byte) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + n := len(b) + p.buf = append(p.buf, b...) + + for { + idx := bytes.IndexByte(p.buf, '\n') + if idx < 0 { + break + } + line := append(p.prefix, p.buf[:idx+1]...) + if _, err := p.w.Write(line); err != nil { + return n, err + } + p.buf = p.buf[idx+1:] + } + return n, nil +} + +// flush 将缓冲区中尚未以换行结尾的内容输出(进程退出时调用)。 +func (p *prefixWriter) flush() { + p.mu.Lock() + defer p.mu.Unlock() + if len(p.buf) > 0 { + line := append(p.prefix, p.buf...) + line = append(line, '\n') + p.w.Write(line) //nolint + p.buf = nil + } +} diff --git a/worker.go b/worker.go index 4e1a5cd..5ed5477 100644 --- a/worker.go +++ b/worker.go @@ -60,18 +60,20 @@ func (w *worker) start() error { fmt.Sprintf("GOBRIDGE_WORKER_ID=%d", w.id), fmt.Sprintf("GOBRIDGE_WORKER_COUNT=%d", w.cfg.workers), "GOBRIDGE_DEATH_FD=3", // ExtraFiles[0] → fd 3 + "PYTHONUNBUFFERED=1", // Python stdout/stderr 不缓冲,print() 立即输出 ) cmd.ExtraFiles = []*os.File{pr} // fd 3 in child - if w.cfg.stdout != nil { - cmd.Stdout = w.cfg.stdout - } else { - cmd.Stdout = os.Stdout + prefix := fmt.Sprintf("[python:%d] ", w.id) + stdout := w.cfg.stdout + if stdout == nil { + stdout = os.Stdout } - if w.cfg.stderr != nil { - cmd.Stderr = w.cfg.stderr - } else { - cmd.Stderr = os.Stderr + stderr := w.cfg.stderr + if stderr == nil { + stderr = os.Stderr } + cmd.Stdout = newPrefixWriter(stdout, prefix) + cmd.Stderr = newPrefixWriter(stderr, prefix) if err := cmd.Start(); err != nil { pr.Close()