Compare commits

...

10 Commits

16 changed files with 1626 additions and 40 deletions

Cargo.lock (generated)

File diff suppressed because it is too large.

@@ -5,9 +5,11 @@ edition = "2024"
[dependencies]
anyhow = "1.0.97"
chrono = "0.4.40"
clap = { version = "4.5.34", features = ["derive"] }
dir-diff = "0.3.3"
env_logger = "0.11.7"
include_directory = "0.1.1"
log = "0.4.27"
rouille = "3.6.2"
tempdir = "0.3.7"

Dockerfile (new file)

@@ -0,0 +1,12 @@
# Build Stage
FROM rust:latest AS builder
WORKDIR /app
COPY . .
RUN cargo build --release
# Runtime Stage
FROM debian:stable-slim
WORKDIR /app
COPY --from=builder /app/target/release/handler .
COPY --from=builder /app/target/release/skubelb .
CMD ["./handler"]

Makefile (new file)

@@ -0,0 +1,16 @@
MAJOR_VERSION = 0
MINOR_VERSION = 0
PATCH_VERSION = 1
TAG = $(MAJOR_VERSION).$(MINOR_VERSION).$(PATCH_VERSION)

build:
	docker build . -t skubelb-handler:$(TAG)
	docker tag skubelb-handler:$(TAG) us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:$(TAG)

kube:
	cat kubernetes.yaml.tmpl | sed 's/TAG/$(TAG)/' > kubernetes.yaml

deploy: build kube
	docker push us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:$(TAG)
	kubectl apply -f kubernetes.yaml

README.md

@@ -6,14 +6,173 @@ open a port on all nodes. When the request lands on any node,
it is forwarded to the correct pod via the network mesh kubernetes
is using. In theory, there is a one-hop penalty.

But let's be honest. You're running with a single LB, probably on a
GCE free tier N1 VM. That extra hop doesn't matter.
## Config

Configure nginx to do what you want, test it. Use any Node IP for your testing.
This will become the 'template_dir' argument to the LB.
Move that directory (i.e., `/etc/nginx`) to somewhere new
(i.e., `/etc/nginx-template/`).

Make a workspace directory for this tool; it will write configs to this folder
before updating the config symlink. It needs to be persistent so that on
server reboot the service starts ok (i.e., `mkdir /var/skubelb/`).

Create a symlink in the workspace which will point to the 'active' configuration;
this will be updated by the tool (i.e., `ln -s /etc/nginx-template /var/skubelb/nginx`).

Make a symlink from the old config directory to that symlink (i.e.,
`ln -s /var/skubelb/nginx /etc/nginx`). Two layers of symlinks are used so we can
have a non-root user run the tool when we set up the service.

Make sure the user running the tool has read access to the template folder, and read-write
access to the workspace folder and config symlink.
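The moves and symlinks above can be sketched end to end. `PREFIX` is a stand-in for `/` so the chain can be dry-run anywhere as an unprivileged user; on a real host, run the equivalent commands against `/` as root:

```shell
# PREFIX stands in for / so this sketch can be dry-run anywhere.
PREFIX="$(mktemp -d)"
mkdir -p "$PREFIX/etc/nginx"
echo "events {}" > "$PREFIX/etc/nginx/nginx.conf"               # stand-in config

mv "$PREFIX/etc/nginx" "$PREFIX/etc/nginx-template"             # becomes --template-dir
mkdir -p "$PREFIX/var/skubelb"                                  # persistent workspace
ln -s "$PREFIX/etc/nginx-template" "$PREFIX/var/skubelb/nginx"  # 'active' config symlink
ln -s "$PREFIX/var/skubelb/nginx" "$PREFIX/etc/nginx"           # nginx reads through both links
```

Once skubelb takes over, it only ever repoints `/var/skubelb/nginx` at freshly generated config directories; `/etc/nginx` itself never has to change again.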
Run the server with a command like:

```sh
skubelb --needle some_node_ip \
    --workspace-dir /var/skubelb \
    --config-symlink /var/skubelb/nginx \
    --template-dir /etc/nginx-template \
    --listen 0.0.0.0:8888
```

Replace `some_node_ip` with the node IP you used during the initial setup.

Next, configure the Kubernetes nodes to POST to `http://loadbalancer:8888/register/${NODE_IP}` when
they start, and send a DELETE to the same URL when they shut down. The easiest
way to do this is with a daemonset; see the example below, or in daemon_set.yaml.
#### Running as a system service

Set up a user to run the service: create it
with `useradd -M skubelb`, and prevent logins with `usermod -L skubelb`.
Make a workspace dir, `mkdir /var/skubelb/`, and give access to the
daemon user, `chown skubelb:skubelb /var/skubelb/`.
Add the systemd config to `/etc/systemd/system/skubelb.service`:
```ini
[Unit]
Description=Simple Kubernetes Load Balancer
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
Restart=always
RestartSec=1
User=skubelb
ExecStart=/usr/local/bin/skubelb --needle some_node_ip \
--workspace-dir /var/skubelb \
--config-symlink /var/skubelb/nginx \
--template-dir /etc/nginx-template \
--listen 0.0.0.0:8888 \
--reload-cmd '/usr/bin/sudo systemctl reload nginx'
[Install]
WantedBy=multi-user.target
```
Make sure you update `--needle some_node_ip` with something
like `--needle 123.44.55.123`: the IP of the node you tested with.

### Sample Kubernetes configuration
Deploy this [daemon set](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
to your cluster, replacing `lb_address` with the address of your load balancer.
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: skubelb
  namespace: skubelb
  labels:
    k8s-app: skubelb
spec:
  selector:
    matchLabels:
      name: skubelb
  template:
    metadata:
      labels:
        name: skubelb
    spec:
      tolerations:
        # these tolerations are to have the daemonset runnable on control plane nodes
        # remove them if your control plane nodes should not run pods
        - key: node-role.kubernetes.io/control-plane
          operator: Exists
          effect: NoSchedule
        - key: node-role.kubernetes.io/master
          operator: Exists
          effect: NoSchedule
      containers:
        - name: skubelb
          image: alpine/curl:latest
          env:
            - name: NODE_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          command: ['sh', '-c', 'echo "Wait for heat death of universe" && sleep 999999d']
          lifecycle:
            preStop:
              exec:
                command:
                  - sh
                  - "-c"
                  - "curl -X DELETE 10.128.0.2:8888/register/${NODE_IP}"
            postStart:
              exec:
                command:
                  - sh
                  - "-c"
                  - "curl -X POST 10.128.0.2:8888/register/${NODE_IP}"
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 10m
              memory: 100Mi
      terminationGracePeriodSeconds: 30
```
NOTE: you will likely need to add a firewall rule to allow this request through. It is very important that the rule has a source filter; it should only allow traffic from the Kubernetes cluster. Nginx will forward traffic to any host that registers, and an open registration endpoint could easily become a MitM vulnerability.
## Other tips
### Use 'upstream' in nginx
Do this:
```
upstream hosts {
    server 10.182.0.36:30004;
    server 10.182.0.39:30004;
}

server {
    server_name git.tipsy.codes tipsy.codes;
    location / {
        proxy_pass http://hosts;
    }
}
```
Rather than just writing out the IP in the proxy_pass.
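This pairs naturally with the needle mechanism: the template's upstream block carries a single `server` line containing the needle, and skubelb duplicates that line once per registered node. A sketch, assuming the needle `some_node_ip` and the two nodes above registered:

```
# Template (/etc/nginx-template/...):
upstream hosts {
    server some_node_ip:30004;
}

# Generated config after 10.182.0.36 and 10.182.0.39 register:
upstream hosts {
    server 10.182.0.36:30004;
    server 10.182.0.39:30004;
}
```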
### visudo to only allow the nginx reload command

Use `sudo visudo` to update the sudoers file and add this line:

```
skubelb ALL=(root) NOPASSWD: /usr/bin/systemctl reload nginx
```

This prevents the skubelb user from running any command via sudo other than the nginx reload.

daemon_set.yaml (new file)

@@ -0,0 +1,54 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: skubelb
  namespace: skubelb
  labels:
    k8s-app: skubelb
spec:
  selector:
    matchLabels:
      name: skubelb
  template:
    metadata:
      labels:
        name: skubelb
    spec:
      tolerations:
        # these tolerations are to have the daemonset runnable on control plane nodes
        # remove them if your control plane nodes should not run pods
        - key: node-role.kubernetes.io/control-plane
          operator: Exists
          effect: NoSchedule
        - key: node-role.kubernetes.io/master
          operator: Exists
          effect: NoSchedule
      containers:
        - name: skubelb
          image: alpine/curl:latest
          env:
            - name: NODE_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          command: ['sh', '-c', 'echo "Wait for heat death of universe" && sleep 999999d']
          lifecycle:
            preStop:
              exec:
                command:
                  - sh
                  - "-c"
                  - "curl -X DELETE 10.128.0.2:8888/register/${NODE_IP}"
            postStart:
              exec:
                command:
                  - sh
                  - "-c"
                  - "curl -X POST 10.128.0.2:8888/register/${NODE_IP}"
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 10m
              memory: 100Mi
      terminationGracePeriodSeconds: 30

kubernetes.yaml (new file)

@@ -0,0 +1,41 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: skubelb
  namespace: skubelb
  labels:
    k8s-app: skubelb
spec:
  selector:
    matchLabels:
      name: skubelb
  template:
    metadata:
      labels:
        name: skubelb
    spec:
      tolerations:
        # these tolerations are to have the daemonset runnable on control plane nodes
        # remove them if your control plane nodes should not run pods
        - key: node-role.kubernetes.io/control-plane
          operator: Exists
          effect: NoSchedule
        - key: node-role.kubernetes.io/master
          operator: Exists
          effect: NoSchedule
      containers:
        - name: skubelb
          image: us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:0.0.1
          env:
            - name: NODE_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          command: ["sh", "-c", "./handler -s 10.128.0.2:8888 -l ${NODE_IP}"]
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 10m
              memory: 100Mi
      terminationGracePeriodSeconds: 30

kubernetes.yaml.tmpl (new file)

@@ -0,0 +1,41 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: skubelb
  namespace: skubelb
  labels:
    k8s-app: skubelb
spec:
  selector:
    matchLabels:
      name: skubelb
  template:
    metadata:
      labels:
        name: skubelb
    spec:
      tolerations:
        # these tolerations are to have the daemonset runnable on control plane nodes
        # remove them if your control plane nodes should not run pods
        - key: node-role.kubernetes.io/control-plane
          operator: Exists
          effect: NoSchedule
        - key: node-role.kubernetes.io/master
          operator: Exists
          effect: NoSchedule
      containers:
        - name: skubelb
          image: us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:TAG
          env:
            - name: NODE_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          command: ["sh", "-c", "./handler -s 10.128.0.2:8888 -l ${NODE_IP}"]
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 10m
              memory: 100Mi
      terminationGracePeriodSeconds: 30

src/lib.rs (new file)

@@ -0,0 +1,5 @@
mod rewriter;
mod server;
pub use rewriter::*;
pub use server::*;


@@ -1,17 +1,26 @@
use std::process::Command;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::sleep;
use std::time::Duration;

use clap::Parser;
use skubelb::Rewriter;
use skubelb::Server;

use anyhow::{Result, anyhow};
use env_logger::Env;
use log::{info, warn};
use rouille::{Request, Response, router};

/// Implements an HTTP server which allows clients to 'register'
/// themselves. Their IP address will be used to replace a
/// needle in a set of config files. This is intended to be
/// used as a low-cost way of enabling Kubernetes ingress
/// using nginx running on a machine that has a public port.
///
/// The needle is expected to be a dummy IP address; something
/// fairly unique. The goal is to replace nginx files, where
/// we often repeat lines if we want nginx to load balance between
@@ -24,16 +33,30 @@ struct Args {
    /// and instead N lines (one per replacement) is added to
    /// the output.
    #[arg(short, long)]
    needle: String,

    /// The folder which contains the templates that
    /// will be searched for the needle.
    #[arg(short, long)]
    template_dir: String,

    /// The symlink that should be updated each time the config changes.
    ///
    /// Symlinks are used because file updates are not atomic.
    #[arg(short, long)]
    config_symlink: String,

    /// Where to actually store the generated configs.
    #[arg(short, long)]
    workspace_dir: String,

    /// Address to listen for http requests on.
    #[arg(short, long, default_value_t = String::from("0.0.0.0:8080"))]
    listen: String,

    /// Command to reload nginx.
    #[arg(short, long, default_value_t = String::from("sudo nginx -s reload"))]
    reload_cmd: String,
}

fn main() {
@@ -41,11 +64,92 @@ fn main() {
    env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
    let args = Args::parse();

    let rewriter = Rewriter::new(args.needle);
    let server_impl = Arc::new(Mutex::new(Server::new(
        rewriter,
        args.workspace_dir,
        args.template_dir,
        args.config_symlink,
    )));
    let reload_command = args.reload_cmd.leak();
    let reload_command: Vec<&str> = reload_command.split_ascii_whitespace().collect();

    // Start cleanup thread
    {
        let server_impl = server_impl.clone();
        thread::spawn(move || {
            loop {
                sleep(Duration::from_secs(30));
                match cleanup_worker(&server_impl) {
                    Ok(_) => (),
                    Err(e) => {
                        warn!("Error cleaning up handlers {}", e);
                    }
                }
            }
        });
    }

    rouille::start_server(args.listen, move |request| {
        info!("Processing request: {:?}", request);
        match handle(request, &server_impl) {
            Ok((resp, reload)) => {
                if reload && reload_command.len() > 0 {
                    let output = Command::new(reload_command[0])
                        .args(&reload_command[1..])
                        .output();
                    match output {
                        Ok(o) => {
                            info!("Ran {:?}; exit code: {}", reload_command, o.status);
                            info!(
                                "Ran {:?}; stdout: {}",
                                reload_command,
                                String::from_utf8_lossy(&o.stdout)
                            );
                            info!(
                                "Ran {:?}; stderr: {}",
                                reload_command,
                                String::from_utf8_lossy(&o.stderr)
                            );
                        }
                        Err(e) => {
                            warn!("Failed to run {:?}: {:?}", reload_command, e);
                        }
                    };
                }
                resp
            }
            Err(e) => {
                warn!("{:?}", e);
                Response {
                    status_code: 500,
                    ..Response::empty_400()
                }
            }
        }
    });
}

fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<(Response, bool)> {
    router!(request,
        (POST) (/register/{ip: String}) => {
            let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
            server_impl.register(request, &ip)?;
            let must_reload = server_impl.cleanup()?;
            Ok((Response{status_code: 200, ..Response::empty_204()}, must_reload))
        },
        (DELETE) (/register/{ip: String}) => {
            let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
            server_impl.unregister(request, &ip)?;
            let must_reload = server_impl.cleanup()?;
            Ok((Response{status_code: 200, ..Response::empty_204()}, must_reload))
        },
        _ => Ok((Response::empty_404(), false)),
    )
}

fn cleanup_worker(server_impl: &Mutex<Server>) -> Result<()> {
    let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
    server_impl.cleanup()?;
    Ok(())
}
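The registration flow above reduces to TTL bookkeeping: each POST stamps an IP with a deadline, and the periodic sweep drops expired entries, reporting whether the config must be regenerated and nginx reloaded. A standalone sketch of that mechanism (illustrative names, not the crate's API):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Illustrative TTL registry: each registered IP carries a deadline, and the
// periodic sweep removes expired entries, returning true when the set
// changed (the caller would then regenerate configs and reload nginx).
struct Registry {
    deadlines: HashMap<String, Instant>,
}

impl Registry {
    fn new() -> Self {
        Self { deadlines: HashMap::new() }
    }

    // Like register(): a repeated POST simply extends the deadline.
    fn register(&mut self, ip: &str, ttl: Duration) {
        self.deadlines.insert(ip.to_string(), Instant::now() + ttl);
    }

    // Like cleanup(): drop expired IPs and report whether anything changed.
    fn sweep(&mut self) -> bool {
        let before = self.deadlines.len();
        let now = Instant::now();
        self.deadlines.retain(|_, deadline| *deadline > now);
        before != self.deadlines.len()
    }
}
```

Note that `register` in server.rs stamps a 60-second deadline, so a node must re-POST within that window to stay in the generated config.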


@@ -1,6 +1,7 @@
use anyhow::Result;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::Instant;
use std::{
    fs::{self, File},
    io::{BufReader, prelude::*},
@@ -9,6 +10,8 @@ use std::{
pub struct Rewriter {
    source: String,
    replacements: HashSet<String>,
    // When each replacement should be cleaned up
    replacement_cleanup: HashMap<String, Instant>,
}

impl Rewriter {
@@ -16,15 +19,18 @@ impl Rewriter {
        Self {
            source,
            replacements: HashSet::new(),
            replacement_cleanup: HashMap::default(),
        }
    }

    pub fn add_replacement(&mut self, replacement: String, cleanup: Instant) -> bool {
        self.replacement_cleanup.insert(replacement.clone(), cleanup);
        self.replacements.insert(replacement)
    }

    pub fn remove_replacement(&mut self, replacement: &str) -> bool {
        self.replacement_cleanup.remove(replacement);
        self.replacements.remove(replacement)
    }

    pub fn rewrite_folder(&self, src: &str, dst: &str) -> Result<()> {
@@ -58,32 +64,75 @@ impl Rewriter {
            // Open 2 files; one to read and translate, and one to write.
            let source_file = File::open(&src_path)?;
            let mut dest_file = File::create(&dst_path)?;
            let mut buff = Vec::with_capacity(2048);
            let mut reader = BufReader::new(source_file);
            while let Ok(count) = reader.read_until(b'\n', &mut buff) {
                if count == 0 {
                    break;
                }
                // If the line is not subject to replacement, copy it and
                // carry on.
                let line = &buff[0..count];
                let m = contains(&self.source.as_bytes(), line);
                if m.is_none() {
                    dest_file.write(&line)?;
                    buff.clear();
                    continue;
                }
                let m = m.unwrap();
                let start = &line[0..m.0];
                let end = &line[m.1..];
                // Else, repeat the line multiple times, replacing the string
                // in question
                for replacement in &replacements {
                    dest_file.write(start)?;
                    dest_file.write(replacement.as_bytes())?;
                    dest_file.write(end)?;
                }
                buff.clear();
            }
        }
        Ok(())
    }

    pub fn cleanup(&mut self) -> bool {
        let now = Instant::now();
        let mut to_remove = vec![];
        for (name, when) in self.replacement_cleanup.iter() {
            if when < &now {
                to_remove.push(name.clone());
            }
        }
        let will_cleanup = to_remove.len() > 0;
        for name in to_remove {
            self.remove_replacement(&name);
        }
        will_cleanup
    }
}

fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> {
    let mut i = 0;
    while i + needle.len() <= haystack.len() {
        let mut j = 0;
        while i + j < haystack.len() && j < needle.len() && haystack[i + j] == needle[j] {
            j += 1;
        }
        if j == needle.len() {
            return Some((i, i + j));
        }
        i += 1;
    }
    None
}
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use include_directory::{Dir, include_directory};
    use tempdir::TempDir;
@@ -97,10 +146,11 @@ mod tests {
        let src = testdata.path().join("testsrc");
        let dst = TempDir::new("").unwrap();

        let now = Instant::now();
        let mut rewriter = Rewriter::new("to_be_replaced".into());
        rewriter.add_replacement("abc".into(), now.checked_add(Duration::new(60 * 60 * 24, 0)).unwrap());
        rewriter.add_replacement("def".into(), now);
        rewriter.add_replacement("zyx".into(), now);
        rewriter.remove_replacement("zyx");
        rewriter
            .rewrite_folder(
@@ -113,5 +163,20 @@ mod tests {
        assert!(
            dir_diff::is_different(testdata.path().join("testdst"), dst.path()).unwrap() == false
        );

        // Trigger the cleanup, which should GC def (its deadline has passed;
        // abc has another 24 hours)
        let dst = TempDir::new("").unwrap();
        rewriter.cleanup();
        rewriter
            .rewrite_folder(
                src.as_os_str().to_str().unwrap(),
                dst.path().as_os_str().to_str().unwrap(),
            )
            .unwrap();
        // Validate that everything matches
        assert!(
            dir_diff::is_different(testdata.path().join("testdst_after_gc"), dst.path()).unwrap() == false
        );
    }
}
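The rewrite rule these tests exercise is small enough to state standalone: a line without the needle passes through untouched, while a line containing it is emitted once per replacement. A sketch (the crate itself streams bytes via `contains` rather than using `str::replace`):

```rust
// Standalone sketch of the rewrite rule: lines without the needle pass
// through untouched; a line containing the needle is emitted once per
// replacement, with the needle substituted.
fn rewrite_line(line: &str, needle: &str, replacements: &[&str]) -> Vec<String> {
    if !line.contains(needle) {
        return vec![line.to_string()];
    }
    replacements
        .iter()
        .map(|r| line.replace(needle, r))
        .collect()
}
```

With replacements `abc` and `def`, `"This is to_be_replaced line"` expands to the two concrete lines seen in the testdata.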

src/server.rs (new file)

@@ -0,0 +1,83 @@
use std::{
    fs,
    path::Path,
    time::{Duration, Instant},
};

use anyhow::{Context, Result, anyhow};
use chrono::Utc;
use log::info;
use rouille::Request;

use crate::Rewriter;

pub struct Server {
    rewriter: Rewriter,
    // Where we write temporary files
    workspace_dir: String,
    // Directory to read configs from
    template_dir: String,
    // The symlink that is updated
    config_dir: String,
}

impl Server {
    pub fn new(
        rewriter: Rewriter,
        workspace_dir: String,
        template_dir: String,
        config_dir: String,
    ) -> Self {
        Self {
            rewriter,
            workspace_dir,
            template_dir,
            config_dir,
        }
    }

    pub fn cleanup(&mut self) -> Result<bool> {
        let cleaned_up = self.rewriter.cleanup();
        if cleaned_up {
            self.generate_config()?;
        }
        Ok(cleaned_up)
    }

    pub fn register(&mut self, _request: &Request, ip: &str) -> Result<()> {
        info!("Registering {} as a handler", ip);
        let cleanup_time = Instant::now()
            .checked_add(Duration::from_secs(60))
            .ok_or(anyhow!("failed to convert time"))?;
        if self.rewriter.add_replacement(ip.to_string(), cleanup_time) {
            self.generate_config()?;
        }
        Ok(())
    }

    pub fn unregister(&mut self, _request: &Request, ip: &str) -> Result<()> {
        info!("Deregistering {} as a handler", ip);
        if self.rewriter.remove_replacement(ip) {
            self.generate_config()?;
        }
        Ok(())
    }

    pub fn generate_config(&self) -> Result<()> {
        // Create a new directory in our workspace
        let now = Utc::now();
        // Writes into 2020/01/01/<unix timestamp>
        // This will fail if we have multiple requests per second
        let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string());
        let path = path.as_os_str().to_str().unwrap();
        fs::create_dir_all(path).with_context(|| "creating directory")?;
        self.rewriter
            .rewrite_folder(&self.template_dir, path)
            .with_context(|| "generating configs")?;
        // Finally, symlink it to the output folder; only support Linux for now
        let symlink = Path::new(&self.workspace_dir).join("symlink.tmp");
        std::os::unix::fs::symlink(path, &symlink).with_context(|| "creating symlink")?;
        fs::rename(symlink, &self.config_dir).with_context(|| "renaming symlink")?;
        Ok(())
    }
}
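The last two steps of `generate_config` lean on `rename(2)` being atomic: the new symlink is created under a temporary name, then renamed over the live link, so nginx never sees a missing or half-written config path. The pattern in isolation (a Unix-only sketch; `swap_symlink` is an illustrative name, not the crate's API):

```rust
use std::fs;
use std::io;
use std::os::unix::fs::symlink;
use std::path::Path;

// Point `link` at `target` atomically: build the symlink under a temporary
// name in `scratch`, then rename() it over the live link. rename(2) replaces
// the old link in one step, so readers always see either the old or the new
// target, never neither.
fn swap_symlink(target: &Path, link: &Path, scratch: &Path) -> io::Result<()> {
    let tmp = scratch.join("symlink.tmp");
    // Remove a stale temp link left over from a previous failed swap, if any.
    let _ = fs::remove_file(&tmp);
    symlink(target, &tmp)?;
    fs::rename(&tmp, link)?;
    Ok(())
}
```

Writing the config files into a fresh timestamped directory and only then swapping the link means a crash mid-generation leaves the previous config untouched.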


@@ -0,0 +1,4 @@
This is a line
This is abc line
This is another line


@@ -0,0 +1,3 @@
This is a abc line.
In a nested directory.


@@ -1,4 +1,4 @@
This is a line
This is to_be_replaced line
This is another line


@@ -1,3 +1,3 @@
This is a to_be_replaced line.
In a nested directory.