// Package pipespy provides a structure which connects multiple things, forwarding data, but also // calling a helper in the middle to spy. package pipespy import ( "errors" "fmt" "io" "os/exec" "sync" "github.com/rs/zerolog/log" ) type process interface { // Sets the stdin. SetStdin(io.Reader) // Sets the stdout. SetStdout(io.Writer) // Start the process; should block until the process completes, then free Stdin and Stdout. Run() error } type CmdWrap struct { cmd *exec.Cmd } func NewCmd(cmd *exec.Cmd) *CmdWrap { return &CmdWrap{ cmd: cmd, } } func (s *CmdWrap) SetStdin(r io.Reader) { s.cmd.Stdin = r } func (s *CmdWrap) SetStdout(w io.Writer) { s.cmd.Stdout = w } func (s *CmdWrap) Run() error { if err := s.cmd.Start(); err != nil { return fmt.Errorf("failed to start command: %w", err) } return s.cmd.Wait() } type wrap struct { proc process writeCloser io.WriteCloser } func (s *wrap) SetStdin(r io.ReadCloser) { s.proc.SetStdin(r) } func (s *wrap) SetStdout(w io.WriteCloser) { s.writeCloser = w s.proc.SetStdout(w) } func (s *wrap) Run() error { defer s.writeCloser.Close() return s.proc.Run() } type Pipe struct { processOrder []*Spy } func New() *Pipe { return &Pipe{} } // Add a process to the pipeline. Use the returned spy object to snoop on it. // Snoop must be called before Start; will panic otherwise. func (p *Pipe) Add(proc process) *Spy { w := &wrap{ proc: proc, } piperReader, pipeWriter := io.Pipe() w.SetStdout(pipeWriter) spy := &Spy{ stdIn: piperReader, proc: w, } p.processOrder = append(p.processOrder, spy) if l := len(p.processOrder) - 1; l > 0 { p.processOrder[l].proc.SetStdin(p.processOrder[l-1].Snoop()) } return spy } // Starts the pipeline; there is a Go routine created to ensure readers dont stall. // Make sure to kill the processes, which will cause EOF to propegate. The returned // function can be used to wait for all to cleanly exit. func (p *Pipe) Start() func() []error { errorCh := make(chan error, len(p.processOrder)) wg := sync.WaitGroup{} wg.Add(len(p.processOrder)) for _, s := range p.processOrder { s.running = true go func(s *Spy) { defer wg.Done() buff := make([]byte, 128) var err error for n := 1; err == nil || n == 0; _, err = s.stdIn.Read(buff) { // do nothing } if !errors.Is(err, io.EOF) { log.Warn().Err(err).Msg("unexpected error from reader") } }(s) go func(s *Spy) { if err := s.proc.Run(); err != nil { errorCh <- err } // will this cause some writes to get lost? is that a problem? for _, closer := range s.closers { if err := closer(); err != nil { log.Warn().Err(err).Msg("error closing pipe") } } }(s) } return func() []error { wg.Wait() close(errorCh) var errs []error for err := range errorCh { errs = append(errs, err) } return errs } } type Spy struct { stdIn io.Reader closers []func() error proc *wrap running bool } // Snoop on the Stdout of the process being spied; reader will recieve a copy of the bytes. func (s *Spy) Snoop() io.ReadCloser { if s.running { panic("Call to snoop on a running process") } pipeReader, pipeWriter := io.Pipe() s.closers = append(s.closers, pipeWriter.Close) r := io.TeeReader(s.stdIn, pipeWriter) s.stdIn = r return pipeReader }