From 92aa2cd6a8a8b4f32b9b7fc3de311a2428541637 Mon Sep 17 00:00:00 2001 From: Charles Date: Mon, 29 Sep 2025 22:43:48 -0700 Subject: [PATCH] add: cleanup routine to avoid stale handlers --- src/main.rs | 74 +++++++++++++++---- src/rewriter.rs | 50 +++++++++++-- src/server.rs | 31 ++++++-- src/testdata/testdst_after_gc/hello.txt | 4 + .../testdst_after_gc/recursive/world.txt | 3 + 5 files changed, 136 insertions(+), 26 deletions(-) create mode 100644 src/testdata/testdst_after_gc/hello.txt create mode 100644 src/testdata/testdst_after_gc/recursive/world.txt diff --git a/src/main.rs b/src/main.rs index c11c1fb..68830bf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,22 +1,26 @@ -use std::sync::Mutex; use std::process::Command; +use std::sync::Arc; +use std::sync::Mutex; +use std::thread; +use std::thread::sleep; +use std::time::Duration; use clap::Parser; use skubelb::Rewriter; use skubelb::Server; +use anyhow::{Result, anyhow}; use env_logger::Env; use log::{info, warn}; -use anyhow::Result; -use rouille::{router, Request, Response}; +use rouille::{Request, Response, router}; /// Implements a HTTP server which allows clients to 'register' /// themselves. Their IP address will be used to replace a /// needle in a set of config files. This is intended to be /// used as a low-cost way of enabling Kubernetes ingress /// using nginx running on a machine that has a public port. -/// +/// /// The needle is expected to be a dummy IP address; something /// fairly unique. The goal is to replace nginx files, where /// we often repeat lines if we want nginx to load balance between @@ -37,7 +41,7 @@ struct Args { template_dir: String, /// The symlink that should be updated each time the config changes. - /// + /// /// Symlinks are used because file updates are not atomic. #[arg(short, long)] config_symlink: String, @@ -61,10 +65,31 @@ fn main() { let args = Args::parse(); let rewriter = Rewriter::new(args.needle); - let server_impl = Mutex::new(Server::new(rewriter, args.workspace_dir, args.template_dir, args.config_symlink)); + let server_impl = Arc::new(Mutex::new(Server::new( + rewriter, + args.workspace_dir, + args.template_dir, + args.config_symlink, + ))); let reload_command = args.reload_cmd.leak(); let reload_command: Vec<&str> = reload_command.split_ascii_whitespace().collect(); + // Start cleanup thread + { + let server_impl = server_impl.clone(); + thread::spawn(move || { + loop { + sleep(Duration::from_secs(30)); + match cleanup_worker(&server_impl) { + Ok(_) => (), + Err(e) => { + warn!("Error cleaning up handlers {}", e); + } + } + } + }); + } + rouille::start_server(args.listen, move |request| { info!("Processing request: {:?}", request); match handle(request, &server_impl) { @@ -76,19 +101,30 @@ fn main() { match output { Ok(o) => { info!("Ran {:?}; exit code: {}", reload_command, o.status); - info!("Ran {:?}; stdout: {}", reload_command, String::from_utf8_lossy(&o.stdout)); - info!("Ran {:?}; stderr: {}", reload_command, String::from_utf8_lossy(&o.stderr)); - }, + info!( + "Ran {:?}; stdout: {}", + reload_command, + String::from_utf8_lossy(&o.stdout) + ); + info!( + "Ran {:?}; stderr: {}", + reload_command, + String::from_utf8_lossy(&o.stderr) + ); + } Err(e) => { warn!("Failed to run {:?}: {:?}", reload_command, e); } }; } resp - }, + } Err(e) => { warn!("{:?}", e); - Response{status_code: 500, ..Response::empty_400()} + Response { + status_code: 500, + ..Response::empty_400() + } } } }); @@ -97,13 +133,23 @@ fn main() { fn handle(request: &Request, server_impl: &Mutex) -> Result<(Response, bool)> { router!(request, (POST) (/register/{ip: String}) => { - server_impl.lock().unwrap().register(request, &ip)?; + let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?; + server_impl.register(request, &ip)?; + server_impl.cleanup()?; Ok((Response{status_code: 200, ..Response::empty_204()}, true)) }, (DELETE) (/register/{ip: String}) => { - server_impl.lock().unwrap().unregister(request, &ip)?; + let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?; + server_impl.unregister(request, &ip)?; + server_impl.cleanup()?; Ok((Response{status_code: 200, ..Response::empty_204()}, true)) }, _ => Ok((Response::empty_404(), false)), ) -} \ No newline at end of file +} + +fn cleanup_worker(server_impl: &Mutex) -> Result<()> { + let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?; + server_impl.cleanup()?; + Ok(()) +} diff --git a/src/rewriter.rs b/src/rewriter.rs index 9c7dc16..9317c37 100644 --- a/src/rewriter.rs +++ b/src/rewriter.rs @@ -1,6 +1,7 @@ use anyhow::Result; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::Path; +use std::time::Instant; use std::{ fs::{self, File}, io::{BufReader, prelude::*}, @@ -9,6 +10,8 @@ use std::{ pub struct Rewriter { source: String, replacements: HashSet, + // When each replacement should be cleaned up + replacement_cleanup: HashMap, } impl Rewriter { @@ -16,15 +19,18 @@ impl Rewriter { Self { source, replacements: HashSet::new(), + replacement_cleanup: HashMap::default(), } } - pub fn add_replacement(&mut self, replacement: String) { - self.replacements.insert(replacement); + pub fn add_replacement(&mut self, replacement: String, cleanup: Instant) { + self.replacements.insert(replacement.clone()); + self.replacement_cleanup.insert(replacement, cleanup); } pub fn remove_replacement(&mut self, replacement: &str) { self.replacements.remove(replacement); + self.replacement_cleanup.remove(replacement); } pub fn rewrite_folder(&self, src: &str, dst: &str) -> Result<()> { @@ -91,6 +97,20 @@ impl Rewriter { } Ok(()) } + + pub fn cleanup(&mut self) { + let now = Instant::now(); + let mut to_remove = vec![]; + for (name, when) in self.replacement_cleanup.iter() { + if when < &now { + to_remove.push(name.clone()); + } + } + + for name in to_remove { + self.remove_replacement(&name); + } + } } fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> { @@ -110,6 +130,8 @@ fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> { #[cfg(test)] mod tests { + use std::time::{Duration, Instant}; + use include_directory::{Dir, include_directory}; use tempdir::TempDir; @@ -123,10 +145,11 @@ mod tests { let src = testdata.path().join("testsrc"); let dst = TempDir::new("").unwrap(); + let now = Instant::now(); let mut rewriter = Rewriter::new("to_be_replaced".into()); - rewriter.add_replacement("abc".into()); - rewriter.add_replacement("def".into()); - rewriter.add_replacement("zyx".into()); + rewriter.add_replacement("abc".into(), now.checked_add(Duration::new(60*60*24, 0)).unwrap()); + rewriter.add_replacement("def".into(), now); + rewriter.add_replacement("zyx".into(), now); rewriter.remove_replacement("zyx"); rewriter .rewrite_folder( @@ -139,5 +162,20 @@ mod tests { assert!( dir_diff::is_different(testdata.path().join("testdst"), dst.path()).unwrap() == false ); + + // Trigger the cleanup, which should GC abc + let dst = TempDir::new("").unwrap(); + rewriter.cleanup(); + rewriter + .rewrite_folder( + src.as_os_str().to_str().unwrap(), + dst.path().as_os_str().to_str().unwrap(), + ) + .unwrap(); + + // Validate that everything matches + assert!( + dir_diff::is_different(testdata.path().join("testdst_after_gc"), dst.path()).unwrap() == false + ); } } diff --git a/src/server.rs b/src/server.rs index bf67785..d1acf23 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,9 +1,13 @@ -use std::{fs, path::Path}; +use std::{ + fs, + path::Path, + time::{Duration, Instant}, +}; +use anyhow::{anyhow, Context, Result}; use chrono::Utc; use log::info; use rouille::Request; -use anyhow::{Context, Result}; use crate::Rewriter; @@ -18,7 +22,12 @@ pub struct Server { } impl Server { - pub fn new(rewriter: Rewriter, workspace_dir: String, template_dir: String, config_dir: String) -> Self { + pub fn new( + rewriter: Rewriter, + workspace_dir: String, + template_dir: String, + config_dir: String, + ) -> Self { Self { rewriter, workspace_dir, @@ -27,9 +36,17 @@ impl Server { } } + pub fn cleanup(&mut self) -> Result<()> { + self.rewriter.cleanup(); + Ok(()) + } + pub fn register(&mut self, _request: &Request, ip: &str) -> Result<()> { info!("Registering {} as a handler", ip); - self.rewriter.add_replacement(ip.to_string()); + let cleanup_time = Instant::now() + .checked_add(Duration::from_secs(60 * 5)) + .ok_or(anyhow!("failed to convert time"))?; + self.rewriter.add_replacement(ip.to_string(), cleanup_time); self.generate_config()?; Ok(()) } @@ -49,11 +66,13 @@ impl Server { let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string()); let path = path.as_os_str().to_str().unwrap(); fs::create_dir_all(path).with_context(|| "creating directory")?; - self.rewriter.rewrite_folder(&self.template_dir, path).with_context(|| "generating configs")?; + self.rewriter + .rewrite_folder(&self.template_dir, path) + .with_context(|| "generating configs")?; // Finally, symlink it to the output folder; only support Linux for now let symlink = Path::new(&self.workspace_dir).join("symlink.tmp"); std::os::unix::fs::symlink(path, &symlink).with_context(|| "creating symlink")?; fs::rename(symlink, &self.config_dir).with_context(|| "renaming symlink")?; Ok(()) } -} \ No newline at end of file +} diff --git a/src/testdata/testdst_after_gc/hello.txt b/src/testdata/testdst_after_gc/hello.txt new file mode 100644 index 0000000..b90c337 --- /dev/null +++ b/src/testdata/testdst_after_gc/hello.txt @@ -0,0 +1,4 @@ +This is a line +This is abc line + +This is another line diff --git a/src/testdata/testdst_after_gc/recursive/world.txt b/src/testdata/testdst_after_gc/recursive/world.txt new file mode 100644 index 0000000..fbbc0e3 --- /dev/null +++ b/src/testdata/testdst_after_gc/recursive/world.txt @@ -0,0 +1,3 @@ +This is a abc line. + +In a nested directory.