Files
home-sensors/internal/video/video.go
T

178 lines
3.9 KiB
Go
Raw Normal View History

2023-10-01 20:38:38 -07:00
// Package video implements logic to stream video from a config.
package video
import (
"bufio"
"context"
"errors"
"io"
"os/exec"
"sync"
"github.com/chathaway-codes/home-sensors/v2/internal/pipespy"
"github.com/chathaway-codes/home-sensors/v2/internal/watcher/config"
"github.com/google/uuid"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media/ivfreader"
"github.com/rs/zerolog/log"
)
var Default = &Mod{}
type Video struct {
mu sync.Mutex
h264Cmd, ivfCmd *exec.Cmd
ivfListeners map[string]chan<- []byte
ivfCodecReady chan struct{}
ivfCodec string
cancelFunc func()
ctx context.Context
}
func New(cfg *config.Config) (*Video, error) {
ctx, cancelFunc := context.WithCancel(context.Background())
// Setup commands
h264cmd := exec.CommandContext(ctx, cfg.H264Cmd.Binary, cfg.H264Cmd.Arguments...)
ivfCmd := exec.CommandContext(ctx, cfg.IVFCmd.Binary, cfg.IVFCmd.Arguments...)
return &Video{
h264Cmd: h264cmd,
ivfCmd: ivfCmd,
ivfListeners: make(map[string]chan<- []byte),
ivfCodecReady: make(chan struct{}),
cancelFunc: cancelFunc,
ctx: ctx,
}, nil
}
// Run launches the commands and begins creating data. It will block until Done is called.
func (v *Video) Run() {
pipe := pipespy.New()
pipe.Add(pipespy.NewCmd(v.h264Cmd))
ivfSnoop := pipe.Add(pipespy.NewCmd(v.ivfCmd)).Snoop()
log.Info().Str("cmd", v.h264Cmd.String()).Msg("h264 command")
log.Info().Str("cmd", v.ivfCmd.String()).Msg("ivf command")
// Log stderr if it appears
go logToStdErr("h264", &v.h264Cmd.Stderr)
go logToStdErr("ivf", &v.ivfCmd.Stderr)
cleanUp := pipe.Start()
defer func() {
errs := cleanUp()
for _, err := range errs {
log.Err(err).Send()
}
}()
go func(r io.Reader) {
ivf, header, err := ivfreader.NewWith(r)
if err != nil {
log.Error().Err(err).Msg("failed to create ivfreader")
return
}
// Determine video codec
var trackCodec string
switch header.FourCC {
case "AV01":
trackCodec = webrtc.MimeTypeAV1
case "VP90":
trackCodec = webrtc.MimeTypeVP9
case "VP80":
trackCodec = webrtc.MimeTypeVP8
default:
log.Error().Err(err).Str("fourcc", header.FourCC).Msg("unable to handle FourCC")
}
v.mu.Lock()
v.ivfCodec = trackCodec
close(v.ivfCodecReady)
v.mu.Unlock()
log.Info().Msgf("starting to stream IVF with codec %q", trackCodec)
for {
select {
case <-v.ctx.Done():
// Exit cleanly
return
default:
// do nothing
}
frame, _, ivfErr := ivf.ParseNextFrame()
if errors.Is(ivfErr, io.EOF) {
log.Debug().Msg("all video frames parsed and sent")
return
}
if ivfErr != nil {
log.Error().Err(err).Msg("failed to parse frame")
}
v.mu.Lock()
for _, lis := range v.ivfListeners {
lis <- frame
}
v.mu.Unlock()
}
}(ivfSnoop)
log.Info().Msg("video streams started")
<-v.ctx.Done()
}
// Join will connect to a running stream; note, it will block
// until the ivf codec is decided. Make sure to call Run first.
func (v *Video) Join() (<-chan []byte, string, func()) {
<-v.ivfCodecReady
v.mu.Lock()
defer v.mu.Unlock()
myID := uuid.New().String()
ch := make(chan []byte)
v.ivfListeners[myID] = ch
return ch, v.ivfCodec, func() {
v.mu.Lock()
defer v.mu.Unlock()
// Close the channel
close(v.ivfListeners[myID])
// Consume any pending frames
for _ := range v.ivfListeners[myID] {
// do nothing
}
2023-10-01 20:38:38 -07:00
delete(v.ivfListeners, myID)
}
}
func logToStdErr(name string, w *io.Writer) {
pipeReader, pipeWriter := io.Pipe()
*w = pipeWriter
lineReader := bufio.NewScanner(pipeReader)
for lineReader.Scan() {
log.Info().Str("video", name).Str("stderr", lineReader.Text()).Send()
}
}
// Done stops the processing.
func (v *Video) Done() {
v.cancelFunc()
}
type Mod struct{}
func (m *Mod) Get() (*Video, error) {
cfg, err := config.Default.Get()
if err != nil {
return nil, err
}
return New(cfg)
}