package gobridge

import (
	"context"
	"fmt"
	"net"
	"os"
	"os/exec"
	"sync"
	"sync/atomic"
	"time"
)

// worker owns one Python child process, the Unix socket that child is
// expected to create, and a channel of pre-dialed connections to that socket.
type worker struct {
	cfg      *poolConfig   // pool configuration this worker reads its settings from
	id       int           // worker index; used in the socket file name, the log prefix and GOBRIDGE_WORKER_ID
	sockPath string        // Unix socket path for the current process; assigned by start()
	conns    chan net.Conn // pre-dialed connections; capacity is cfg.maxConnsPerWorker
	stopped  atomic.Bool   // read by monitor() to decide whether to restart; set outside the visible code
	stopCh   chan struct{} // start() aborts its socket wait when this becomes readable
	stopOnce sync.Once     // NOTE(review): presumably guards closing stopCh; its use is not visible here

	mu        sync.Mutex // guards cmd (start() also swaps deathPipe while holding it)
	cmd       *exec.Cmd  // the most recently started child process
	deathPipe *os.File   // write end of the death pipe; closing it tells the Python child to exit
}

// newWorker builds a worker, starts its Python process synchronously and, once
// that succeeds, launches the monitor goroutine that handles restarts.
// If the initial start fails, the error from start() is returned unchanged and
// no goroutine is started.
func newWorker(cfg *poolConfig, id int) (*worker, error) {
	w := &worker{
		cfg:    cfg,
		id:     id,
		conns:  make(chan net.Conn, cfg.maxConnsPerWorker),
		stopCh: make(chan struct{}),
	}
	if err := w.start(); err != nil {
		return nil, err
	}
	go w.monitor()
	return w, nil
}

// start launches the Python child process and pre-dials the connection pool.
// The new command is stored in w.cmd (under w.mu) so that monitor() can wait
// on it; the function itself returns only an error.
//
// It fails if the death pipe cannot be created, the process cannot be started,
// stopCh fires while waiting for the socket, the socket file does not appear
// within 10 seconds, or any of the pre-dials fails. On every failure after the
// process has been started, the child is killed and reaped before returning.
func (w *worker) start() error {
	// The socket name embeds the Go PID and the worker id; a stale file left at
	// the same path is removed first (best effort, error ignored).
	sockPath := fmt.Sprintf("%s/gobridge-%d-%d.sock", w.cfg.socketDir, os.Getpid(), w.id)
	os.Remove(sockPath)
	w.sockPath = sockPath

	// Create the death pipe: Go holds the write end, Python blocks on the read end.
	// When the Go process dies (for any reason) the write end is closed
	// automatically; the Python side is expected to see EOF and exit.
	pr, pw, err := os.Pipe()
	if err != nil {
		return fmt.Errorf("create death pipe: %w", err)
	}

	cmd := exec.Command(w.cfg.pythonExe, w.cfg.scriptArgs...)
	cmd.Dir = w.cfg.workDir
	// Environment order: inherited environment, then user-configured entries,
	// then the GOBRIDGE_* / PYTHONUNBUFFERED entries appended last.
	cmd.Env = append(os.Environ(), w.cfg.env...)
	cmd.Env = append(cmd.Env,
		"GOBRIDGE_SOCKET_PATH="+sockPath,
		fmt.Sprintf("GOBRIDGE_WORKER_ID=%d", w.id),
		fmt.Sprintf("GOBRIDGE_WORKER_COUNT=%d", w.cfg.workers),
		"GOBRIDGE_DEATH_FD=3", // ExtraFiles[0] becomes fd 3 in the child
		"PYTHONUNBUFFERED=1", // unbuffered Python stdout/stderr so print() output appears immediately
	)
	cmd.ExtraFiles = []*os.File{pr} // fd 3 in child

	// Child output goes to the configured writers (falling back to the parent's
	// stdout/stderr), wrapped so each worker's output carries a "[python:N] " prefix.
	prefix := fmt.Sprintf("[python:%d] ", w.id)
	stdout := w.cfg.stdout
	if stdout == nil {
		stdout = os.Stdout
	}
	stderr := w.cfg.stderr
	if stderr == nil {
		stderr = os.Stderr
	}
	cmd.Stdout = newPrefixWriter(stdout, prefix)
	cmd.Stderr = newPrefixWriter(stderr, prefix)

	if err := cmd.Start(); err != nil {
		// Nothing was started, so both pipe ends are ours to close.
		pr.Close()
		pw.Close()
		return fmt.Errorf("start python worker: %w", err)
	}
	pr.Close() // the parent does not need the read end; after this only the child holds it

	w.mu.Lock()
	w.cmd = cmd
	if w.deathPipe != nil {
		w.deathPipe.Close() // close the previous cycle's write end (restart case)
	}
	w.deathPipe = pw
	w.mu.Unlock()

	// Wait for the socket file to appear (at most 10 seconds), polling every 50 ms.
	deadline := time.Now().Add(10 * time.Second)
	for time.Now().Before(deadline) {
		if _, err := os.Stat(sockPath); err == nil {
			break
		}
		select {
		case <-w.stopCh:
			// Stop requested during startup: kill the child and reap it.
			cmd.Process.Kill()
			cmd.Wait()
			return fmt.Errorf("stopped while waiting for socket")
		case <-time.After(50 * time.Millisecond):
		}
	}
	// Re-check after the loop: it ends both on success and on deadline expiry.
	if _, err := os.Stat(sockPath); err != nil {
		cmd.Process.Kill()
		cmd.Wait()
		return fmt.Errorf("worker socket did not appear: %s", sockPath)
	}

	// Pre-dial the connections.
	for i := 0; i < w.cfg.maxConnsPerWorker; i++ {
		conn, err := net.DialTimeout("unix", sockPath, 5*time.Second)
		if err != nil {
			cmd.Process.Kill()
			cmd.Wait()
			// Close whatever is currently queued in the pool before reporting failure.
			for len(w.conns) > 0 {
				(<-w.conns).Close()
			}
			return fmt.Errorf("connect to worker: %w", err)
		}
		w.conns <- conn
	}
	return nil
}

// monitor watches the Python process and restarts it automatically when it
// exits (exponential backoff, capped at 30s according to the original comment;
// the code that computes the delay is damaged, see the note below).
func (w *worker) monitor() {
	for {
		// Wait for the current process to exit.
		w.mu.Lock()
		cmd := w.cmd
		w.mu.Unlock()
		cmd.Wait()

		// An exit after a stop request is expected; do not restart.
		if w.stopped.Load() {
			return
		}
		os.Remove(w.sockPath)

		// Drain connections that pointed at the dead process.
		for len(w.conns) > 0 {
			(<-w.conns).Close()
		}

		// Restart with exponential backoff.
		for attempt := 0; !w.stopped.Load(); attempt++ {
			if err := w.start(); err == nil {
				break
			}
			// NOTE(review): the source is damaged from this point. The text between
			// "min(1<" and " 0 {" appears to have been lost (it looks as if an
			// angle-bracket span was stripped during extraction). The loss takes
			// the rest of this backoff loop, the end of monitor(), and the header of
			// at least one later function with it. The surviving tokens are kept
			// verbatim below: they drain w.conns and remove the socket file.
			// Restore this region from version control rather than reconstructing
			// it by hand; the "context" import is presumably used in the lost text.
			delay := time.Duration(min(1< 0 { (<-w.conns).Close() }
os.Remove(w.sockPath) }