// Package video implements logic to stream video from a config. package video import ( "bufio" "context" "errors" "io" "os/exec" "sync" "github.com/chathaway-codes/home-sensors/v2/internal/pipespy" "github.com/chathaway-codes/home-sensors/v2/internal/watcher/config" "github.com/google/uuid" "github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3/pkg/media/ivfreader" "github.com/rs/zerolog/log" ) var Default = &Mod{} type Video struct { mu sync.Mutex h264Cmd, ivfCmd *exec.Cmd ivfListeners map[string]chan<- []byte ivfCodecReady chan struct{} ivfCodec string cancelFunc func() ctx context.Context } func New(cfg *config.Config) (*Video, error) { ctx, cancelFunc := context.WithCancel(context.Background()) // Setup commands h264cmd := exec.CommandContext(ctx, cfg.H264Cmd.Binary, cfg.H264Cmd.Arguments...) ivfCmd := exec.CommandContext(ctx, cfg.IVFCmd.Binary, cfg.IVFCmd.Arguments...) return &Video{ h264Cmd: h264cmd, ivfCmd: ivfCmd, ivfListeners: make(map[string]chan<- []byte), ivfCodecReady: make(chan struct{}), cancelFunc: cancelFunc, ctx: ctx, }, nil } // Run launches the commands and begins creating data. It will block until Done is called. func (v *Video) Run() { pipe := pipespy.New() pipe.Add(pipespy.NewCmd(v.h264Cmd)) ivfSnoop := pipe.Add(pipespy.NewCmd(v.ivfCmd)).Snoop() log.Info().Str("cmd", v.h264Cmd.String()).Msg("h264 command") log.Info().Str("cmd", v.ivfCmd.String()).Msg("ivf command") // Log stderr if it appears go logToStdErr("h264", &v.h264Cmd.Stderr) go logToStdErr("ivf", &v.ivfCmd.Stderr) cleanUp := pipe.Start() defer func() { errs := cleanUp() for _, err := range errs { log.Err(err).Send() } }() go func(r io.Reader) { ivf, header, err := ivfreader.NewWith(r) if err != nil { log.Error().Err(err).Msg("failed to create ivfreader") return } // Determine video codec var trackCodec string switch header.FourCC { case "AV01": trackCodec = webrtc.MimeTypeAV1 case "VP90": trackCodec = webrtc.MimeTypeVP9 case "VP80": trackCodec = webrtc.MimeTypeVP8 default: log.Error().Err(err).Str("fourcc", header.FourCC).Msg("unable to handle FourCC") } v.mu.Lock() v.ivfCodec = trackCodec close(v.ivfCodecReady) v.mu.Unlock() log.Info().Msgf("starting to stream IVF with codec %q", trackCodec) for { select { case <-v.ctx.Done(): // Exit cleanly return default: // do nothing } frame, _, ivfErr := ivf.ParseNextFrame() if errors.Is(ivfErr, io.EOF) { log.Debug().Msg("all video frames parsed and sent") return } if ivfErr != nil { log.Error().Err(err).Msg("failed to parse frame") } v.mu.Lock() for _, lis := range v.ivfListeners { lis <- frame } v.mu.Unlock() } }(ivfSnoop) log.Info().Msg("video streams started") <-v.ctx.Done() } // Join will connect to a running stream; note, it will block // until the ivf codec is decided. Make sure to call Run first. func (v *Video) Join() (<-chan []byte, string, func()) { <-v.ivfCodecReady v.mu.Lock() defer v.mu.Unlock() myID := uuid.New().String() ch := make(chan []byte) v.ivfListeners[myID] = ch return ch, v.ivfCodec, func() { v.mu.Lock() defer v.mu.Unlock() // Close the channel close(v.ivfListeners[myID]) // Consume any pending frames for _ := range v.ivfListeners[myID] { // do nothing } delete(v.ivfListeners, myID) } } func logToStdErr(name string, w *io.Writer) { pipeReader, pipeWriter := io.Pipe() *w = pipeWriter lineReader := bufio.NewScanner(pipeReader) for lineReader.Scan() { log.Info().Str("video", name).Str("stderr", lineReader.Text()).Send() } } // Done stops the processing. func (v *Video) Done() { v.cancelFunc() } type Mod struct{} func (m *Mod) Get() (*Video, error) { cfg, err := config.Default.Get() if err != nil { return nil, err } return New(cfg) }