188 lines
4.4 KiB
Go
188 lines
4.4 KiB
Go
package logs
|
|
|
|
import (
	"bufio"
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)
|
|
|
|
// Logger describes which Kubernetes container to tail logs from and how.
type Logger struct {
	// Deployment is the deployment name. When Pod is empty it is used to
	// look up pods via the label selector "app=<Deployment>".
	Deployment string

	// Namespace is the namespace the pods live in.
	Namespace string

	// Pod optionally pins tailing to one specific pod. When empty, a pod is
	// looked up from Deployment.
	Pod string

	// Container optionally pins tailing to one specific container. When
	// empty, the first container declared in the pod spec is used.
	Container string

	// TailLines optionally limits output to this many lines from the end of
	// the logs. When nil, no limit is sent and tailing starts from the
	// beginning.
	TailLines *int64

	// Timeout optionally bounds how long tailing may run before giving up.
	// Zero or negative means no timeout.
	Timeout time.Duration
}
|
|
|
|
func LoggerFromEnv() *Logger {
|
|
deployment := os.Getenv("RCON_DEPLOYMENT")
|
|
namespace := os.Getenv("RCON_NAMESPACE")
|
|
pod := os.Getenv("RCON_POD")
|
|
container := os.Getenv("RCON_CONTAINER")
|
|
|
|
return &Logger{
|
|
Deployment: deployment,
|
|
Namespace: namespace,
|
|
Pod: pod,
|
|
Container: container,
|
|
}
|
|
}
|
|
|
|
// Tailer represents a running log tailer.
//
// A Tailer must not be copied after first use because it embeds a sync.Once.
type Tailer struct {
	// lines carries log lines from the tailing goroutine to the consumer.
	lines chan string
	// done is closed by Stop to tell the tailing goroutine to exit.
	done chan struct{}
	// stopOnce makes Stop idempotent; closing done twice would panic.
	stopOnce sync.Once
}

// NextLine returns the receive-only channel on which log lines are delivered.
// The channel is closed by the sending side once no more lines will arrive.
func (t *Tailer) NextLine() <-chan string {
	return t.lines
}

// Stop signals the tailer to stop by closing its done channel.
//
// Stop is safe to call multiple times and from multiple goroutines; only the
// first call has any effect. Stop does not itself close the line channel;
// that is left to the code that owns the sending side.
func (t *Tailer) Stop() {
	t.stopOnce.Do(func() {
		// A zero-value Tailer has no done channel; closing nil would panic.
		if t.done != nil {
			close(t.done)
		}
	})
}
|
|
|
|
// Start starts the log tailer
|
|
func (l *Logger) Start(ctx context.Context, client *kubernetes.Clientset) (*Tailer, func() error) {
|
|
tailer := &Tailer{
|
|
lines: make(chan string),
|
|
done: make(chan struct{}),
|
|
}
|
|
|
|
// Get pod name if not specified
|
|
var err error
|
|
podName := l.Pod
|
|
if podName == "" {
|
|
podName, err = l.getPodName(ctx, client)
|
|
if err != nil {
|
|
close(tailer.lines)
|
|
return tailer, func() error { return err }
|
|
}
|
|
}
|
|
|
|
// Get container name if not specified
|
|
containerName := l.Container
|
|
if containerName == "" {
|
|
containerName, err = l.getContainerName(ctx, client, podName)
|
|
if err != nil {
|
|
close(tailer.lines)
|
|
return tailer, func() error { return err }
|
|
}
|
|
}
|
|
|
|
// Start tailing logs
|
|
go l.tailLogs(ctx, client, podName, containerName, tailer)
|
|
|
|
return tailer, func() error {
|
|
tailer.Stop()
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (l *Logger) getPodName(ctx context.Context, client *kubernetes.Clientset) (string, error) {
|
|
// List pods with the deployment label
|
|
podList, err := client.CoreV1().Pods(l.Namespace).List(ctx, metav1.ListOptions{
|
|
LabelSelector: fmt.Sprintf("app=%s", l.Deployment),
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to list pods: %w", err)
|
|
}
|
|
|
|
if len(podList.Items) == 0 {
|
|
return "", fmt.Errorf("no pods found for deployment %s in namespace %s", l.Deployment, l.Namespace)
|
|
}
|
|
|
|
// Return the first pod that's running
|
|
for _, pod := range podList.Items {
|
|
if pod.Status.Phase == corev1.PodRunning {
|
|
return pod.Name, nil
|
|
}
|
|
}
|
|
|
|
// If no running pods, return the first pod
|
|
return podList.Items[0].Name, nil
|
|
}
|
|
|
|
func (l *Logger) getContainerName(ctx context.Context, client *kubernetes.Clientset, podName string) (string, error) {
|
|
pod, err := client.CoreV1().Pods(l.Namespace).Get(ctx, podName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to get pod %s: %w", podName, err)
|
|
}
|
|
|
|
if len(pod.Spec.Containers) == 0 {
|
|
return "", fmt.Errorf("pod %s has no containers", podName)
|
|
}
|
|
|
|
// Return the first container name
|
|
return pod.Spec.Containers[0].Name, nil
|
|
}
|
|
|
|
func (l *Logger) tailLogs(ctx context.Context, client *kubernetes.Clientset, podName, containerName string, tailer *Tailer) {
|
|
defer close(tailer.lines)
|
|
|
|
// Prepare log request
|
|
logOpts := &corev1.PodLogOptions{
|
|
Container: containerName,
|
|
Follow: true,
|
|
}
|
|
|
|
if l.TailLines != nil {
|
|
logOpts.TailLines = l.TailLines
|
|
}
|
|
|
|
// Create context with timeout if specified
|
|
if l.Timeout > 0 {
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(ctx, l.Timeout)
|
|
defer cancel()
|
|
}
|
|
|
|
// Get logs stream
|
|
req := client.CoreV1().Pods(l.Namespace).GetLogs(podName, logOpts)
|
|
logsStream, err := req.Stream(ctx)
|
|
if err != nil {
|
|
select {
|
|
case tailer.lines <- fmt.Sprintf("Error: failed to get logs stream: %v", err):
|
|
case <-tailer.done:
|
|
return
|
|
}
|
|
return
|
|
}
|
|
defer logsStream.Close()
|
|
|
|
// Read logs line by line
|
|
scanner := bufio.NewScanner(logsStream)
|
|
for scanner.Scan() {
|
|
select {
|
|
case tailer.lines <- scanner.Text():
|
|
case <-tailer.done:
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
select {
|
|
case tailer.lines <- fmt.Sprintf("Error: failed to read logs: %v", err):
|
|
case <-tailer.done:
|
|
return
|
|
}
|
|
}
|
|
}
|