// Source file: home-sensors/pkg/signaler/signaler.go
// Listing metadata: 2023-10-01 22:37:12 -07:00 · 429 lines · 11 KiB · Go

// Package signaler implements a signaler server.
package signaler
import (
"context"
"encoding/base64"
"fmt"
"log"
"strings"
"sync"
"time"
"connectrpc.com/connect"
pb "github.com/chathaway-codes/home-sensors/v2/gen"
internalpb "github.com/chathaway-codes/home-sensors/v2/gen/token"
"github.com/gofrs/uuid/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// Prometheus histograms for the sensor readings reported by cameras. Each
// histogram is labelled by home and camera and uses one bucket per whole unit
// over a fixed range.
var (
	// temperatureValues has 300 buckets with upper bounds -150 through 149.
	temperatureValues = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "temperature_values_c",
		Help:    "Temperature sensor values in celsius",
		Buckets: prometheus.LinearBuckets(-150, 1, 300),
	}, []string{
		"home",
		"camera",
	})
	// humidityValues has 100 buckets with upper bounds 0 through 99.
	humidityValues = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "humidity_values_prh",
		Help:    "Humidity sensor values in %rH",
		Buckets: prometheus.LinearBuckets(0, 1, 100),
	}, []string{
		"home",
		"camera",
	})
	// pressureValues has 600 buckets with upper bounds 900 through 1499.
	pressureValues = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "pressure_values_hpa",
		Help:    "Pressure sensor values in hPa",
		Buckets: prometheus.LinearBuckets(900, 1, 600),
	}, []string{
		"home",
		"camera",
	})
)
// camera is the server-side record of a camera that has requested an auth
// token.
type camera struct {
	// id is the identifier the camera supplied when it requested its token.
	id string
}
// session holds the state of one signaling session between a client and a
// camera.
type session struct {
	// id is the server-generated UUID identifying the session.
	id string
	// cameraID is the ID of the camera the session was requested for.
	cameraID string
	// createTime is set when the session is created and is reset to the
	// current time whenever an ICE message is created or popped for it.
	createTime time.Time
	// toCamera carries ICE messages sent by non-camera callers, to be popped
	// by the camera.
	toCamera chan *pb.IceMessage
	// toClient carries ICE messages sent by the camera, to be popped by
	// non-camera callers.
	toClient chan *pb.IceMessage
}
// Server is the in-memory signaling server. It tracks registered cameras,
// pending and active sessions, and the latest sensor samples. All of its maps
// are accessed with mu held.
type Server struct {
	mu sync.Mutex

	// camerasByHome maps home -> camera ID -> camera.
	camerasByHome map[string]map[string]*camera
	// sessionsByCamera maps camera ID -> queue of sessions waiting to be
	// picked up by that camera.
	sessionsByCamera map[string]chan *session
	// sessionsById maps session ID -> session.
	sessionsById map[string]*session
	// samplesByCamera maps camera ID -> sample type -> most recent sample of
	// that type.
	samplesByCamera map[string]map[pb.Sample_Type]*pb.Sample
}
// New returns a Server with empty state and starts its two background
// goroutines: the periodic session cleanup and the periodic export of sensor
// readings. Both loops run until the process exits; nothing here stops them.
func New() *Server {
	srv := new(Server)
	srv.camerasByHome = map[string]map[string]*camera{}
	srv.sessionsByCamera = map[string]chan *session{}
	srv.sessionsById = map[string]*session{}
	srv.samplesByCamera = map[string]map[pb.Sample_Type]*pb.Sample{}

	go srv.cleanup()
	go srv.logSensorValues()
	return srv
}
// CreateAuthToken issues a bearer token for a camera or a client.
//
// For a camera request, the camera is registered under the requested home and
// given a fresh queue of pending sessions. If the camera ID already had a
// queue, that queue is closed first, which wakes any PopSession call still
// waiting on it. For a client request, a random UUID is used as the identity.
//
// The returned token is the base64url encoding of a serialized internal
// AuthToken proto holding the identity and the home. An Internal error is
// returned if a UUID cannot be generated or the token cannot be serialized.
func (s *Server) CreateAuthToken(ctx context.Context, request *connect.Request[pb.CreateAuthTokenRequest]) (*connect.Response[pb.AuthToken], error) {
	req := request.Msg
	log.Printf("Creating auth token")
	defer log.Printf("Done creating auth token")

	home := req.GetHome()
	var id string
	switch req.Type.(type) {
	case *pb.CreateAuthTokenRequest_Camera_:
		id = req.GetCamera().GetId()

		s.mu.Lock()
		cameras, ok := s.camerasByHome[home]
		if !ok {
			cameras = make(map[string]*camera)
			s.camerasByHome[home] = cameras
		}
		cameras[id] = &camera{id: id}
		// Closing and replacing the queue happen under the same lock, so the
		// channel stored in the map is never a closed one.
		if old, ok := s.sessionsByCamera[id]; ok {
			close(old)
		}
		s.sessionsByCamera[id] = make(chan *session, 100)
		s.mu.Unlock()
	case *pb.CreateAuthTokenRequest_Client_:
		myUUID, err := uuid.NewV4()
		if err != nil {
			return nil, status.Errorf(codes.Internal, "error creating UUID: %v", err)
		}
		id = myUUID.String()
	}
	// NOTE(review): a request with neither camera nor client set reaches this
	// point with an empty id and still receives a token; confirm whether such
	// requests should be rejected instead.

	token := &internalpb.AuthToken{
		Uid:  id,
		Home: home,
	}
	raw, err := proto.Marshal(token)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to generate token: %v", err)
	}
	return connect.NewResponse(&pb.AuthToken{
		Token: base64.URLEncoding.EncodeToString(raw),
	}), nil
}
// ListCameras returns the cameras registered under the home named in the
// caller's auth token. A home with no registered cameras yields an empty
// list rather than an error.
func (s *Server) ListCameras(ctx context.Context, request *connect.Request[pb.ListCamerasRequest]) (*connect.Response[pb.ListCamerasResponse], error) {
	token, err := getAuthToken(request)
	if err != nil {
		return nil, err
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	resp := &pb.ListCamerasResponse{}
	for _, cam := range s.camerasByHome[token.Home] {
		ident := &pb.Camera_Identifier{Id: cam.id}
		resp.Cameras = append(resp.Cameras, &pb.Camera{Identifier: ident})
	}
	return connect.NewResponse(resp), nil
}
// CreateSession creates a new session and queues it for the requested camera.
//
// The server assigns a fresh UUID as the session ID (replacing any ID in the
// request), records the session so ICE messages can be exchanged through it,
// and places it on the camera's pending queue for PopSession to hand out. The
// request's session is returned with its ID filled in. The call returns as
// soon as the session is queued; it does not wait for the camera to pick it
// up.
//
// Errors:
//   - InvalidArgument if the request carries no session.
//   - Internal if a UUID cannot be generated.
//   - NotFound if the camera has no pending-session queue (it never requested
//     an auth token).
//   - ResourceExhausted if the camera's pending queue is full.
func (s *Server) CreateSession(ctx context.Context, request *connect.Request[pb.CreateSessionRequest]) (*connect.Response[pb.Session], error) {
	log.Printf("Creating session")
	defer log.Printf("Done session")

	thisSession := request.Msg.Session
	if thisSession == nil {
		return nil, status.Errorf(codes.InvalidArgument, "nil session")
	}
	myUUID, err := uuid.NewV4()
	if err != nil {
		return nil, status.Errorf(codes.Internal, "error creating UUID: %v", err)
	}
	id := myUUID.String()
	cameraID := thisSession.GetCamera().GetId()
	sess := &session{
		id:         id,
		cameraID:   cameraID,
		createTime: time.Now(),
		toCamera:   make(chan *pb.IceMessage, 100),
		toClient:   make(chan *pb.IceMessage, 100),
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// Without this check an unknown camera yields a nil channel, and sending
	// on a nil channel blocks forever.
	ch, ok := s.sessionsByCamera[cameraID]
	if !ok {
		return nil, status.Errorf(codes.NotFound, "camera %q not found", cameraID)
	}
	// The send is non-blocking and done with the lock held: CreateAuthToken
	// closes and replaces the queue under the same lock, so this can neither
	// stall the server on a full queue nor send on a closed channel.
	select {
	case ch <- sess:
	default:
		return nil, status.Errorf(codes.ResourceExhausted, "camera %q has too many pending sessions", cameraID)
	}
	// Only record sessions that were actually queued.
	s.sessionsById[id] = sess

	thisSession.Id = &pb.Session_Identifier{Id: id}
	return connect.NewResponse(thisSession), nil
}
// PopSession blocks until a session is queued for the calling camera and
// returns that session's identifier.
//
// The caller must present a token whose Uid is a camera registered under the
// token's home.
//
// Errors:
//   - NotFound if the home has no registered cameras.
//   - Unauthenticated if the caller is not a camera in that home.
//   - DataLoss if the camera's queue is closed while waiting, which happens
//     when the same camera ID requests a new auth token.
//   - The status form of the context error if the request is cancelled or
//     times out before a session arrives.
func (s *Server) PopSession(ctx context.Context, request *connect.Request[pb.PopSessionRequest]) (*connect.Response[pb.Session], error) {
	authToken, err := getAuthToken(request)
	if err != nil {
		return nil, err
	}

	// Resolve the queue under the lock, but wait on it with the lock released.
	ch, err := func() (chan *session, error) {
		s.mu.Lock()
		defer s.mu.Unlock()
		cameras, ok := s.camerasByHome[authToken.Home]
		if !ok {
			return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
		}
		if _, ok := cameras[authToken.Uid]; !ok {
			return nil, status.Errorf(codes.Unauthenticated, "you are not a camera")
		}
		return s.sessionsByCamera[authToken.Uid], nil
	}()
	if err != nil {
		return nil, err
	}

	select {
	case sess := <-ch:
		// A closed queue yields nil.
		if sess == nil {
			return nil, status.Errorf(codes.DataLoss, "someone else stole the session")
		}
		return connect.NewResponse(&pb.Session{
			Id: &pb.Session_Identifier{
				Id: sess.id,
			},
		}), nil
	case <-ctx.Done():
		// Stop waiting once the caller has gone away instead of parking this
		// handler indefinitely.
		return nil, status.FromContextError(ctx.Err()).Err()
	}
}
// CreateIceMessage queues an ICE message on a session for the other peer.
//
// A caller whose token Uid is a camera registered under the token's home has
// its message queued for the client side; any other caller's message is
// queued for the camera. The session's createTime is reset to now. The
// message is returned unchanged.
//
// If the destination queue is full, the call waits (without holding the
// server lock) until there is room or the request context is done.
//
// Errors:
//   - NotFound if the session ID is unknown or the home has no cameras.
//   - The status form of the context error if the request is cancelled or
//     times out while waiting for room in the queue.
func (s *Server) CreateIceMessage(ctx context.Context, request *connect.Request[pb.CreateIceMessageRequest]) (*connect.Response[pb.IceMessage], error) {
	authToken, err := getAuthToken(request)
	if err != nil {
		return nil, err
	}
	req := request.Msg

	// Resolve the destination under the lock; the send happens after release.
	dst, err := func() (chan *pb.IceMessage, error) {
		s.mu.Lock()
		defer s.mu.Unlock()
		sess, ok := s.sessionsById[req.GetSessionIdentifier().GetId()]
		if !ok {
			return nil, status.Errorf(codes.NotFound, "unknown session")
		}
		cameras, ok := s.camerasByHome[authToken.Home]
		if !ok {
			return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
		}
		sess.createTime = time.Now()
		if _, isCamera := cameras[authToken.Uid]; isCamera {
			return sess.toClient, nil
		}
		return sess.toCamera, nil
	}()
	if err != nil {
		return nil, err
	}

	msg := req.GetIceMessage()
	// Sending with the server mutex held would block every other RPC as soon
	// as one session's queue filled up.
	select {
	case dst <- msg:
	case <-ctx.Done():
		return nil, status.FromContextError(ctx.Err()).Err()
	}
	return connect.NewResponse(msg), nil
}
// PopIceMessage blocks until an ICE message addressed to the caller is
// available on the given session and returns it.
//
// A caller whose token Uid is a camera registered under the token's home
// receives messages queued for the camera; any other caller receives messages
// queued for the client side. The session's createTime is reset to now.
//
// Errors:
//   - NotFound if the home has no cameras or the session ID is unknown.
//   - The status form of the context error if the request is cancelled or
//     times out before a message arrives.
func (s *Server) PopIceMessage(ctx context.Context, request *connect.Request[pb.PopIceMessageRequest]) (*connect.Response[pb.IceMessage], error) {
	authToken, err := getAuthToken(request)
	if err != nil {
		return nil, err
	}
	req := request.Msg

	// The lookup runs in a closure so the deferred unlock covers every return
	// path; an early return that skipped the unlock would leave the server
	// mutex held forever.
	src, err := func() (chan *pb.IceMessage, error) {
		s.mu.Lock()
		defer s.mu.Unlock()
		cameras, ok := s.camerasByHome[authToken.Home]
		if !ok {
			return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
		}
		// An unknown ID must be rejected here; dereferencing the missing
		// entry would panic.
		sess, ok := s.sessionsById[req.GetSessionIdentifier().GetId()]
		if !ok {
			return nil, status.Errorf(codes.NotFound, "unknown session")
		}
		sess.createTime = time.Now()
		if _, isCamera := cameras[authToken.Uid]; isCamera {
			return sess.toCamera, nil
		}
		return sess.toClient, nil
	}()
	if err != nil {
		return nil, err
	}

	select {
	case msg := <-src:
		return connect.NewResponse(msg), nil
	case <-ctx.Done():
		// Stop waiting once the caller has gone away.
		return nil, status.FromContextError(ctx.Err()).Err()
	}
}
// CreateSample records a sensor reading as the calling camera's most recent
// sample of that type, replacing any earlier sample of the same type.
//
// Only the type and reading are taken from the request; the stored sample's
// camera ID is the caller's own. The stored sample is returned.
//
// Errors:
//   - NotFound if the token's home has no cameras.
//   - Unauthenticated if the caller is not a camera in that home.
//   - InvalidArgument if the request carries no sample.
func (s *Server) CreateSample(ctx context.Context, request *connect.Request[pb.CreateSampleRequest]) (*connect.Response[pb.Sample], error) {
	authToken, err := getAuthToken(request)
	if err != nil {
		return nil, err
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	cameras, ok := s.camerasByHome[authToken.Home]
	if !ok {
		return nil, status.Errorf(codes.NotFound, "home %q not found", authToken.Home)
	}
	cam, ok := cameras[authToken.Uid]
	if !ok {
		return nil, status.Errorf(codes.Unauthenticated, "you are not a camera")
	}
	// Reading fields from a missing sample would dereference a nil pointer.
	sample := request.Msg.GetSample()
	if sample == nil {
		return nil, status.Errorf(codes.InvalidArgument, "nil sample")
	}

	latest, ok := s.samplesByCamera[authToken.Uid]
	if !ok {
		latest = make(map[pb.Sample_Type]*pb.Sample)
		s.samplesByCamera[authToken.Uid] = latest
	}
	stored := &pb.Sample{
		Type:    sample.Type,
		Reading: sample.Reading,
		CameraId: &pb.Camera_Identifier{
			Id: cam.id,
		},
	}
	latest[sample.Type] = stored
	return connect.NewResponse(stored), nil
}
// ListSamples returns the most recent sample of every type for each camera
// registered under the home named in the caller's auth token. It returns
// NotFound when that home has no registered cameras. The order of the
// returned samples follows map iteration and is therefore unspecified.
func (s *Server) ListSamples(ctx context.Context, request *connect.Request[pb.ListSamplesRequest]) (*connect.Response[pb.ListSamplesResponse], error) {
	token, err := getAuthToken(request)
	if err != nil {
		return nil, err
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	cameras, found := s.camerasByHome[token.Home]
	if !found {
		return nil, status.Errorf(codes.NotFound, "home %q not found", token.Home)
	}

	var collected []*pb.Sample
	for cameraID := range cameras {
		// Cameras without samples have no entry; ranging over the resulting
		// nil map is a no-op.
		for _, latest := range s.samplesByCamera[cameraID] {
			collected = append(collected, latest)
		}
	}
	return connect.NewResponse(&pb.ListSamplesResponse{Samples: collected}), nil
}
// logSensorValues exports sensor readings to Prometheus. Every 10 seconds it
// takes the server lock and, for each registered camera, observes the most
// recent temperature, pressure and humidity samples into the matching
// histogram, labelled by home and camera. Samples of other types are skipped.
// A stored sample is observed again on every tick until it is replaced.
//
// The loop never returns; New starts it in its own goroutine.
func (s *Server) logSensorValues() {
	ticker := time.NewTicker(10 * time.Second)
	for range ticker.C {
		func() {
			s.mu.Lock()
			defer s.mu.Unlock()

			for home, cameras := range s.camerasByHome {
				for cameraID := range cameras {
					latest, found := s.samplesByCamera[cameraID]
					if !found {
						continue
					}
					for _, sample := range latest {
						var hist *prometheus.HistogramVec
						switch sample.Type {
						case pb.Sample_TEMPERATURE_C:
							hist = temperatureValues
						case pb.Sample_PRESSURE:
							hist = pressureValues
						case pb.Sample_HUMIDITY:
							hist = humidityValues
						default:
							continue
						}
						hist.WithLabelValues(home, cameraID).Observe(sample.GetReading())
					}
				}
			}
		}()
	}
}
// cleanup runs every five minutes. Under the server lock it scans all
// sessions and, per camera, picks the session with the latest createTime
// among those whose createTime is more than a minute before the tick.
//
// Nothing is removed yet: the per-camera result is computed but not acted on
// (see the TODO below), so this currently only logs its progress.
//
// The loop never returns; New starts it in its own goroutine.
func (s *Server) cleanup() {
	ticker := time.NewTicker(5 * time.Minute)
	for tick := range ticker.C {
		func() {
			log.Printf("Starting cleanup")
			s.mu.Lock()
			defer s.mu.Unlock()
			log.Printf("Cleanup locked")

			// Newest stale session seen so far, keyed by camera ID.
			newestStale := make(map[string]*session)
			for _, sess := range s.sessionsById {
				if tick.Sub(sess.createTime) <= time.Minute {
					continue // still fresh
				}
				prev, seen := newestStale[sess.cameraID]
				if !seen || prev.createTime.Before(sess.createTime) {
					newestStale[sess.cameraID] = sess
				}
			}

			log.Printf("Removing stale sessions")
			// TODO: how do we prevent sessions from accumulating if cameras don't pick up on the request?
		}()
	}
}
// getAuthToken extracts and decodes the bearer token from the request's
// Authorization header.
//
// The header must have the form "Bearer <token>", where <token> is the
// base64url encoding of a serialized internal AuthToken proto.
//
// Errors:
//   - Unauthenticated if the header is missing or lacks the "Bearer " prefix.
//   - InvalidArgument if the token is not valid base64url or does not parse
//     as an AuthToken.
//
// NOTE(review): the token is only decoded here; no signature or expiry check
// is visible in this function. Confirm that verification happens elsewhere.
func getAuthToken[T any](req *connect.Request[T]) (*internalpb.AuthToken, error) {
	const bearerPrefix = "Bearer "

	header := req.Header().Get("Authorization")
	if !strings.HasPrefix(header, bearerPrefix) {
		// Use a status code like every other failure in this package so that
		// callers see a classified error.
		return nil, status.Errorf(codes.Unauthenticated, "invalid authorization token")
	}
	raw, err := base64.URLEncoding.DecodeString(strings.TrimPrefix(header, bearerPrefix))
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "malformed authorization header (extract)")
	}
	authToken := &internalpb.AuthToken{}
	if err := proto.Unmarshal(raw, authToken); err != nil {
		return nil, status.Errorf(codes.InvalidArgument, "malformed authorization header (parse)")
	}
	return authToken, nil
}