Compare commits

...

3 Commits

Author SHA1 Message Date
1dfdff35c3 add: dockerfile and deploy command 2025-10-17 05:09:41 +00:00
92c8e56f24 add: utility to constantly refresh skubelb 2025-10-17 05:09:41 +00:00
c4c4067106 add: cleanup routine to avoid stale handlers 2025-10-17 05:09:41 +00:00
12 changed files with 1149 additions and 37 deletions

871
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -11,5 +11,6 @@ dir-diff = "0.3.3"
env_logger = "0.11.7" env_logger = "0.11.7"
include_directory = "0.1.1" include_directory = "0.1.1"
log = "0.4.27" log = "0.4.27"
reqwest = { version = "0.12.23", features = ["blocking"] }
rouille = "3.6.2" rouille = "3.6.2"
tempdir = "0.3.7" tempdir = "0.3.7"

11
Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
# Build Stage
FROM rust:latest AS builder
WORKDIR /app
COPY . .
RUN cargo build --release
# Runtime Stage
FROM debian:stable-slim
WORKDIR /app
COPY --from=builder /app/target/release/handler .
CMD ["./handler"]

16
Makefile Normal file
View File

@@ -0,0 +1,16 @@
MAJOR_VERSION = 0
MINOR_VERSION = 0
PATH_VERSION = 1
TAG = $(MAJOR_VERSION).$(MINOR_VERSION).$(PATH_VERSION)
build:
docker build . -t skubelb-handler:$(TAG)
docker tag skubelb-handler:$(TAG) us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:$(TAG)
kube:
cat kubernetes.yaml.tmpl | sed 's/TAG/$(TAG)/' > kubernetes.yaml
deploy: build kube
docker push us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:$(TAG)
kubectl apply -f kubernetes.yaml

41
kubernetes.yaml Normal file
View File

@@ -0,0 +1,41 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: skubelb
namespace: skubelb
labels:
k8s-app: skubelb
spec:
selector:
matchLabels:
name: skubelb
template:
metadata:
labels:
name: skubelb
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: skubelb
image: us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:0.0.1
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
command: ["sh", "-c", "./handler -s 10.128.0.2:8888 -l ${NODE_IP}"]
resources:
limits:
memory: 200Mi
requests:
cpu: 10m
memory: 100Mi
terminationGracePeriodSeconds: 30

41
kubernetes.yaml.tmpl Normal file
View File

@@ -0,0 +1,41 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: skubelb
namespace: skubelb
labels:
k8s-app: skubelb
spec:
selector:
matchLabels:
name: skubelb
template:
metadata:
labels:
name: skubelb
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: skubelb
image: us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:TAG
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
command: ["sh", "-c", "./handler -s 10.128.0.2:8888 -l ${NODE_IP}"]
resources:
limits:
memory: 200Mi
requests:
cpu: 10m
memory: 100Mi
terminationGracePeriodSeconds: 30

43
src/bin/handler.rs Normal file
View File

@@ -0,0 +1,43 @@
use std::thread::sleep;
use std::time::Duration;
use clap::Parser;
use log::{error, info};
use anyhow::Result;
use env_logger::Env;
/// Implements a client that constantly refreshes with the server.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// The skubelb server we should register with
#[arg(short, long)]
server: String,
/// The listen address that should be sent to skubelb.
#[arg(short, long)]
listen: String,
}
fn main() {
// Log info and above by default
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let args = Args::parse();
match handle(&args.server, &args.listen) {
Ok(_) => (),
Err(e) => error!("{}", e),
}
}
fn handle(remote: &str, listen: &str) -> Result<()> {
let client = reqwest::blocking::Client::new();
let url = format!("http://{}/register/{}", remote, listen);
loop {
sleep(Duration::from_secs(20));
info!("sending post to {}", url);
client.post(&url).send()?;
}
}

View File

@@ -1,15 +1,19 @@
use std::sync::Mutex;
use std::process::Command; use std::process::Command;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use clap::Parser; use clap::Parser;
use skubelb::Rewriter; use skubelb::Rewriter;
use skubelb::Server; use skubelb::Server;
use anyhow::{Result, anyhow};
use env_logger::Env; use env_logger::Env;
use log::{info, warn}; use log::{info, warn};
use anyhow::Result; use rouille::{Request, Response, router};
use rouille::{router, Request, Response};
/// Implements a HTTP server which allows clients to 'register' /// Implements a HTTP server which allows clients to 'register'
/// themselves. Their IP address will be used to replace a /// themselves. Their IP address will be used to replace a
@@ -61,10 +65,31 @@ fn main() {
let args = Args::parse(); let args = Args::parse();
let rewriter = Rewriter::new(args.needle); let rewriter = Rewriter::new(args.needle);
let server_impl = Mutex::new(Server::new(rewriter, args.workspace_dir, args.template_dir, args.config_symlink)); let server_impl = Arc::new(Mutex::new(Server::new(
rewriter,
args.workspace_dir,
args.template_dir,
args.config_symlink,
)));
let reload_command = args.reload_cmd.leak(); let reload_command = args.reload_cmd.leak();
let reload_command: Vec<&str> = reload_command.split_ascii_whitespace().collect(); let reload_command: Vec<&str> = reload_command.split_ascii_whitespace().collect();
// Start cleanup thread
{
let server_impl = server_impl.clone();
thread::spawn(move || {
loop {
sleep(Duration::from_secs(30));
match cleanup_worker(&server_impl) {
Ok(_) => (),
Err(e) => {
warn!("Error cleaning up handlers {}", e);
}
}
}
});
}
rouille::start_server(args.listen, move |request| { rouille::start_server(args.listen, move |request| {
info!("Processing request: {:?}", request); info!("Processing request: {:?}", request);
match handle(request, &server_impl) { match handle(request, &server_impl) {
@@ -76,19 +101,30 @@ fn main() {
match output { match output {
Ok(o) => { Ok(o) => {
info!("Ran {:?}; exit code: {}", reload_command, o.status); info!("Ran {:?}; exit code: {}", reload_command, o.status);
info!("Ran {:?}; stdout: {}", reload_command, String::from_utf8_lossy(&o.stdout)); info!(
info!("Ran {:?}; stderr: {}", reload_command, String::from_utf8_lossy(&o.stderr)); "Ran {:?}; stdout: {}",
}, reload_command,
String::from_utf8_lossy(&o.stdout)
);
info!(
"Ran {:?}; stderr: {}",
reload_command,
String::from_utf8_lossy(&o.stderr)
);
}
Err(e) => { Err(e) => {
warn!("Failed to run {:?}: {:?}", reload_command, e); warn!("Failed to run {:?}: {:?}", reload_command, e);
} }
}; };
} }
resp resp
}, }
Err(e) => { Err(e) => {
warn!("{:?}", e); warn!("{:?}", e);
Response{status_code: 500, ..Response::empty_400()} Response {
status_code: 500,
..Response::empty_400()
}
} }
} }
}); });
@@ -97,13 +133,23 @@ fn main() {
fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<(Response, bool)> { fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<(Response, bool)> {
router!(request, router!(request,
(POST) (/register/{ip: String}) => { (POST) (/register/{ip: String}) => {
server_impl.lock().unwrap().register(request, &ip)?; let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
server_impl.register(request, &ip)?;
server_impl.cleanup()?;
Ok((Response{status_code: 200, ..Response::empty_204()}, true)) Ok((Response{status_code: 200, ..Response::empty_204()}, true))
}, },
(DELETE) (/register/{ip: String}) => { (DELETE) (/register/{ip: String}) => {
server_impl.lock().unwrap().unregister(request, &ip)?; let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
server_impl.unregister(request, &ip)?;
server_impl.cleanup()?;
Ok((Response{status_code: 200, ..Response::empty_204()}, true)) Ok((Response{status_code: 200, ..Response::empty_204()}, true))
}, },
_ => Ok((Response::empty_404(), false)), _ => Ok((Response::empty_404(), false)),
) )
} }
fn cleanup_worker(server_impl: &Mutex<Server>) -> Result<()> {
let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
server_impl.cleanup()?;
Ok(())
}

View File

@@ -1,6 +1,7 @@
use anyhow::Result; use anyhow::Result;
use std::collections::HashSet; use std::collections::{HashMap, HashSet};
use std::path::Path; use std::path::Path;
use std::time::Instant;
use std::{ use std::{
fs::{self, File}, fs::{self, File},
io::{BufReader, prelude::*}, io::{BufReader, prelude::*},
@@ -9,6 +10,8 @@ use std::{
pub struct Rewriter { pub struct Rewriter {
source: String, source: String,
replacements: HashSet<String>, replacements: HashSet<String>,
// When each replacement should be cleaned up
replacement_cleanup: HashMap<String, Instant>,
} }
impl Rewriter { impl Rewriter {
@@ -16,15 +19,18 @@ impl Rewriter {
Self { Self {
source, source,
replacements: HashSet::new(), replacements: HashSet::new(),
replacement_cleanup: HashMap::default(),
} }
} }
pub fn add_replacement(&mut self, replacement: String) { pub fn add_replacement(&mut self, replacement: String, cleanup: Instant) {
self.replacements.insert(replacement); self.replacements.insert(replacement.clone());
self.replacement_cleanup.insert(replacement, cleanup);
} }
pub fn remove_replacement(&mut self, replacement: &str) { pub fn remove_replacement(&mut self, replacement: &str) {
self.replacements.remove(replacement); self.replacements.remove(replacement);
self.replacement_cleanup.remove(replacement);
} }
pub fn rewrite_folder(&self, src: &str, dst: &str) -> Result<()> { pub fn rewrite_folder(&self, src: &str, dst: &str) -> Result<()> {
@@ -91,6 +97,20 @@ impl Rewriter {
} }
Ok(()) Ok(())
} }
pub fn cleanup(&mut self) {
let now = Instant::now();
let mut to_remove = vec![];
for (name, when) in self.replacement_cleanup.iter() {
if when < &now {
to_remove.push(name.clone());
}
}
for name in to_remove {
self.remove_replacement(&name);
}
}
} }
fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> { fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> {
@@ -110,6 +130,8 @@ fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::time::{Duration, Instant};
use include_directory::{Dir, include_directory}; use include_directory::{Dir, include_directory};
use tempdir::TempDir; use tempdir::TempDir;
@@ -123,10 +145,11 @@ mod tests {
let src = testdata.path().join("testsrc"); let src = testdata.path().join("testsrc");
let dst = TempDir::new("").unwrap(); let dst = TempDir::new("").unwrap();
let now = Instant::now();
let mut rewriter = Rewriter::new("to_be_replaced".into()); let mut rewriter = Rewriter::new("to_be_replaced".into());
rewriter.add_replacement("abc".into()); rewriter.add_replacement("abc".into(), now.checked_add(Duration::new(60*60*24, 0)).unwrap());
rewriter.add_replacement("def".into()); rewriter.add_replacement("def".into(), now);
rewriter.add_replacement("zyx".into()); rewriter.add_replacement("zyx".into(), now);
rewriter.remove_replacement("zyx"); rewriter.remove_replacement("zyx");
rewriter rewriter
.rewrite_folder( .rewrite_folder(
@@ -139,5 +162,20 @@ mod tests {
assert!( assert!(
dir_diff::is_different(testdata.path().join("testdst"), dst.path()).unwrap() == false dir_diff::is_different(testdata.path().join("testdst"), dst.path()).unwrap() == false
); );
// Trigger the cleanup, which should GC abc
let dst = TempDir::new("").unwrap();
rewriter.cleanup();
rewriter
.rewrite_folder(
src.as_os_str().to_str().unwrap(),
dst.path().as_os_str().to_str().unwrap(),
)
.unwrap();
// Validate that everything matches
assert!(
dir_diff::is_different(testdata.path().join("testdst_after_gc"), dst.path()).unwrap() == false
);
} }
} }

View File

@@ -1,9 +1,13 @@
use std::{fs, path::Path}; use std::{
fs,
path::Path,
time::{Duration, Instant},
};
use anyhow::{anyhow, Context, Result};
use chrono::Utc; use chrono::Utc;
use log::info; use log::info;
use rouille::Request; use rouille::Request;
use anyhow::{Context, Result};
use crate::Rewriter; use crate::Rewriter;
@@ -18,7 +22,12 @@ pub struct Server {
} }
impl Server { impl Server {
pub fn new(rewriter: Rewriter, workspace_dir: String, template_dir: String, config_dir: String) -> Self { pub fn new(
rewriter: Rewriter,
workspace_dir: String,
template_dir: String,
config_dir: String,
) -> Self {
Self { Self {
rewriter, rewriter,
workspace_dir, workspace_dir,
@@ -27,9 +36,17 @@ impl Server {
} }
} }
pub fn cleanup(&mut self) -> Result<()> {
self.rewriter.cleanup();
Ok(())
}
pub fn register(&mut self, _request: &Request, ip: &str) -> Result<()> { pub fn register(&mut self, _request: &Request, ip: &str) -> Result<()> {
info!("Registering {} as a handler", ip); info!("Registering {} as a handler", ip);
self.rewriter.add_replacement(ip.to_string()); let cleanup_time = Instant::now()
.checked_add(Duration::from_secs(60 * 5))
.ok_or(anyhow!("failed to convert time"))?;
self.rewriter.add_replacement(ip.to_string(), cleanup_time);
self.generate_config()?; self.generate_config()?;
Ok(()) Ok(())
} }
@@ -49,7 +66,9 @@ impl Server {
let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string()); let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string());
let path = path.as_os_str().to_str().unwrap(); let path = path.as_os_str().to_str().unwrap();
fs::create_dir_all(path).with_context(|| "creating directory")?; fs::create_dir_all(path).with_context(|| "creating directory")?;
self.rewriter.rewrite_folder(&self.template_dir, path).with_context(|| "generating configs")?; self.rewriter
.rewrite_folder(&self.template_dir, path)
.with_context(|| "generating configs")?;
// Finally, symlink it to the output folder; only support Linux for now // Finally, symlink it to the output folder; only support Linux for now
let symlink = Path::new(&self.workspace_dir).join("symlink.tmp"); let symlink = Path::new(&self.workspace_dir).join("symlink.tmp");
std::os::unix::fs::symlink(path, &symlink).with_context(|| "creating symlink")?; std::os::unix::fs::symlink(path, &symlink).with_context(|| "creating symlink")?;

View File

@@ -0,0 +1,4 @@
This is a line
This is abc line
This is another line

View File

@@ -0,0 +1,3 @@
This is a abc line.
In a nested directory.