Compare commits


1 commit

Commit 99dc71ca40 · add: rewriter logic and tests · 2025-03-29 23:03:54 -07:00
10 changed files with 32 additions and 1388 deletions

Cargo.lock (generated, 999 changed lines): file diff suppressed because it is too large.

Cargo.toml

@@ -5,11 +5,9 @@ edition = "2024"
 [dependencies]
 anyhow = "1.0.97"
-chrono = "0.4.40"
 clap = { version = "4.5.34", features = ["derive"] }
 dir-diff = "0.3.3"
 env_logger = "0.11.7"
 include_directory = "0.1.1"
 log = "0.4.27"
-rouille = "3.6.2"
 tempdir = "0.3.7"

README.md (169 changed lines)

@@ -6,173 +6,14 @@ open a port on all nodes. When the request lands on any node,
it is forwarded to the correct pod via the network mesh Kubernetes
is using. In theory, there is a one-hop penalty.
But let's be honest: you're running with a single LB, probably on a GCE free
tier N1 VM. That extra hop doesn't matter.
## Config
Configure nginx to do what you want and test it. Use any Node IP for your testing.
+This tool accepts an argument (rewrite_string); occurrences of that string are replaced using these rules:
+1. For each line that contains the rewrite string,
+2. copy the line once per node IP, replacing the string with the host IPs.
This will become the `template_dir` argument to the LB.
Move that directory (e.g., `/etc/nginx`) to somewhere new
(e.g., `/etc/nginx-template/`).
Make a workspace directory for this tool; it will write configs to this folder
before updating the symlink described below. It needs to be persistent so that the
service starts cleanly after a server reboot (e.g., `mkdir /var/skubelb/`).
Create a symlink in the workspace which will point to the 'active' configuration;
this will be updated by the tool (e.g., `ln -s /etc/nginx-template /var/skubelb/nginx`).
Make a symlink from the old config directory to that symlink (e.g.,
`ln -s /var/skubelb/nginx /etc/nginx`). Two layers of symlinks are used so we can
have a non-root user run the tool when we set up the service.
Make sure the user running the tool has read access to the template folder, and read-write
access to the workspace folder and the config symlink.
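Putting those steps together, the setup looks something like this (using the example paths above):
```sh
# Turn the tested nginx config into the template directory.
mv /etc/nginx /etc/nginx-template

# Persistent workspace for generated configs and the 'active' symlink.
mkdir /var/skubelb

# Symlink that the tool repoints at each newly generated config.
ln -s /etc/nginx-template /var/skubelb/nginx

# Let nginx read its config through that symlink.
ln -s /var/skubelb/nginx /etc/nginx
```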
Run the server with a command like:
```sh
skubelb --needle some_node_ip \
    --workspace-dir /var/skubelb \
    --config-symlink /var/skubelb/nginx \
    --template-dir /etc/nginx-template \
    --listen 0.0.0.0:8888
```
Replace `some_node_ip` with the node IP you used during the initial setup.
Next, configure the Kubernetes nodes to POST to `http://loadbalancer:8888/register/${NODE_IP}` when
they start, and to DELETE `http://loadbalancer:8888/register/${NODE_IP}` when they shut down. The easiest
way to do this is with a DaemonSet; see the example below, or in daemon_set.yaml.
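The registration calls are plain HTTP, so you can also exercise them by hand with something like:
```sh
# Register a node as a backend (what the DaemonSet's postStart hook does).
curl -X POST http://loadbalancer:8888/register/${NODE_IP}

# Deregister it again when the node goes away.
curl -X DELETE http://loadbalancer:8888/register/${NODE_IP}
```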
### Running as a system service
Set up a user to run the service: create it
with `useradd -M skubelb` and prevent logins with `usermod -L skubelb`.
Make a workspace dir, `mkdir /var/skubelb/`, and give the
daemon user access to it, `chown skubelb:skubelb /var/skubelb/`.
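In other words, roughly:
```sh
useradd -M skubelb                  # service account, no home directory
usermod -L skubelb                  # lock the account so nobody can log in as it
mkdir /var/skubelb
chown skubelb:skubelb /var/skubelb  # workspace owned by the daemon user
```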
Add the systemd config to `/etc/systemd/system/skubelb.service`:
```ini
[Unit]
Description=Simple Kubernetes Load Balancer
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
Restart=always
RestartSec=1
User=skubelb
ExecStart=/usr/local/bin/skubelb --needle some_node_ip \
--workspace-dir /var/skubelb \
--config-symlink /var/skubelb/nginx \
--template-dir /etc/nginx-template \
--listen 0.0.0.0:8888 \
--reload-cmd '/usr/bin/sudo systemctl reload nginx'
[Install]
WantedBy=multi-user.target
```
Make sure you update `--needle some_node_ip` with something
like `--needle 123.44.55.123`, the IP of the node you tested with.
### Sample Kubernetes configuration
Deploy this [daemon set](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
to your cluster, replacing the load balancer address (`10.128.0.2:8888` in the example) with your own.
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: skubelb
namespace: skubelb
labels:
k8s-app: skubelb
spec:
selector:
matchLabels:
name: skubelb
template:
metadata:
labels:
name: skubelb
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: skubelb
image: alpine/curl:latest
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
command: ['sh', '-c', 'echo "Wait for heat death of universe" && sleep 999999d']
lifecycle:
preStop:
exec:
command:
- sh
- "-c"
- "curl -X POST 10.128.0.2:8888/register/${NODE_IP}"
postStart:
exec:
command:
- sh
- "-c"
- "curl -X POST 10.128.0.2:8888/register/${NODE_IP}"
resources:
limits:
memory: 200Mi
requests:
cpu: 10m
memory: 100Mi
terminationGracePeriodSeconds: 30
```
NOTE: you will likely need to add a firewall entry to allow this request through. It is very important that the firewall entry has a source filter: it should only allow traffic from the Kubernetes cluster. Nginx will forward traffic to any host that registers, so an open registration endpoint could easily become a MitM vulnerability.
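What that rule looks like depends on where the LB runs; as a rough sketch with plain iptables (the `10.128.0.0/20` source range is only an example, substitute your cluster's network):
```sh
# Accept register/deregister traffic on 8888 only from the cluster subnet, drop the rest.
iptables -A INPUT -p tcp --dport 8888 -s 10.128.0.0/20 -j ACCEPT
iptables -A INPUT -p tcp --dport 8888 -j DROP
```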
## Other tips
### Use 'upstream' in nginx
Do this:
```
upstream hosts {
server 10.182.0.36:30004;
server 10.182.0.39:30004;
}
server {
server_name git.tipsy.codes tipsy.codes;
location / {
proxy_pass http://hosts;
}
}
```
Rather than just writing the IP out directly in the `proxy_pass` directive.
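In the template directory that block holds the needle instead of real addresses; the tool then expands the `server` line once per registered node. For example, with `some_node_ip` as the needle, the template would contain something like:
```
upstream hosts {
    server some_node_ip:30004;
}
```
and the generated config ends up with one `server` line per registered node, as in the block above.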
### Use visudo to allow only the nginx reload command
Use `sudo visudo` to update the sudoers file and add this line:
```
skubelb ALL=(root) NOPASSWD: /usr/bin/systemctl reload nginx
```
This ensures the skubelb user can run only the nginx reload command via sudo, and nothing else.

daemon_set.yaml

@@ -1,54 +0,0 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: skubelb
namespace: skubelb
labels:
k8s-app: skubelb
spec:
selector:
matchLabels:
name: skubelb
template:
metadata:
labels:
name: skubelb
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: skubelb
image: alpine/curl:latest
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
command: ['sh', '-c', 'echo "Wait for heat death of universe" && sleep 999999d']
lifecycle:
preStop:
exec:
command:
- sh
- "-c"
- "curl -X POST 10.128.0.2:8888/register/${NODE_IP}"
postStart:
exec:
command:
- sh
- "-c"
- "curl -X POST 10.128.0.2:8888/register/${NODE_IP}"
resources:
limits:
memory: 200Mi
requests:
cpu: 10m
memory: 100Mi
terminationGracePeriodSeconds: 30

src/lib.rs

@@ -1,5 +0,0 @@
mod rewriter;
mod server;
pub use rewriter::*;
pub use server::*;

src/main.rs

@@ -1,15 +1,10 @@
-use std::sync::Mutex;
-use std::process::Command;
 use clap::Parser;
-use skubelb::Rewriter;
-use skubelb::Server;
+mod rewriter;
 use env_logger::Env;
-use log::{info, warn};
-use anyhow::Result;
-use rouille::{router, Request, Response};
+use log::info;
+use rewriter::Rewriter;
 /// Implements a HTTP server which allows clients to 'register'
 /// themselves. Their IP address will be used to replace a
@@ -29,30 +24,16 @@ struct Args {
     /// and instead N lines (one per replacement) is added to
     /// the output.
     #[arg(short, long)]
-    needle: String,
+    rewrite_string: String,
     /// The folder which contains the templates that
     /// will be searched for the needle.
     #[arg(short, long)]
-    template_dir: String,
+    source_folder: String,
-    /// The symlink that should be updated each time the config changes.
-    ///
-    /// Symlinks are used because file updates are not atomic.
+    /// Where to write the replaced lines.
     #[arg(short, long)]
-    config_symlink: String,
+    dest_folder: String,
-    /// Where to actually store the generated configs.
-    #[arg(short, long)]
-    workspace_dir: String,
-    /// Address to listen for http requests on.
-    #[arg(short, long, default_value_t = String::from("0.0.0.0:8080"))]
-    listen: String,
-    /// Command to reload nginx.
-    #[arg(short, long, default_value_t = String::from("sudo nginx -s reload"))]
-    reload_cmd: String,
 }

 fn main() {
@@ -60,50 +41,11 @@ fn main() {
     env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
     let args = Args::parse();
-    let rewriter = Rewriter::new(args.needle);
-    let server_impl = Mutex::new(Server::new(rewriter, args.workspace_dir, args.template_dir, args.config_symlink));
-    let reload_command = args.reload_cmd.leak();
-    let reload_command: Vec<&str> = reload_command.split_ascii_whitespace().collect();
-    rouille::start_server(args.listen, move |request| {
-        info!("Processing request: {:?}", request);
-        match handle(request, &server_impl) {
-            Ok((resp, reload)) => {
-                if reload && reload_command.len() > 0 {
-                    let output = Command::new(reload_command[0])
-                        .args(&reload_command[1..])
-                        .output();
-                    match output {
-                        Ok(o) => {
-                            info!("Ran {:?}; exit code: {}", reload_command, o.status);
-                            info!("Ran {:?}; stdout: {}", reload_command, String::from_utf8_lossy(&o.stdout));
-                            info!("Ran {:?}; stderr: {}", reload_command, String::from_utf8_lossy(&o.stderr));
-                        },
-                        Err(e) => {
-                            warn!("Failed to run {:?}: {:?}", reload_command, e);
-                        }
-                    };
-                }
-                resp
-            },
-            Err(e) => {
-                warn!("{:?}", e);
-                Response{status_code: 500, ..Response::empty_400()}
-            }
-        }
-    });
-}
-
-fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<(Response, bool)> {
-    router!(request,
-        (POST) (/register/{ip: String}) => {
-            server_impl.lock().unwrap().register(request, &ip)?;
-            Ok((Response{status_code: 200, ..Response::empty_204()}, true))
-        },
-        (DELETE) (/register/{ip: String}) => {
-            server_impl.lock().unwrap().unregister(request, &ip)?;
-            Ok((Response{status_code: 200, ..Response::empty_204()}, true))
-        },
-        _ => Ok((Response::empty_404(), false)),
-    )
+    let mut rewriter = Rewriter::new(args.rewrite_string);
+    rewriter.add_replacement(String::from("abc"));
+    rewriter
+        .rewrite_folder(&args.source_folder, &args.dest_folder)
+        .unwrap();
+    info!("Finished writing new config to {}", args.dest_folder);
 }

src/rewriter.rs

@@ -51,41 +51,34 @@ impl Rewriter {
                 // Create the directory, then carry on. Note that we explore the
                 // src_path after creating dst_path.
                 fs::create_dir(&dst_path)?;
+                println!("mkdir {:?}", dst_path);
                 to_visit.push(src_path.into_os_string().into_string().unwrap());
                 continue;
             }
             // Open 2 files; one to read and translate, and one to write.
             let source_file = File::open(&src_path)?;
+            println!("touch {:?}", dst_path);
             let mut dest_file = File::create(&dst_path)?;
-            let mut buff = Vec::with_capacity(2048);
-            let mut reader = BufReader::new(source_file);
-            while let Ok(count) = reader.read_until(b'\n', &mut buff) {
-                if count == 0 {
-                    break;
-                }
+            let reader = BufReader::new(source_file);
+            for line in reader.lines() {
+                let line = line?;
                 // If the line is not subject to replacement, copy it and
                 // carry on.
-                let line = &buff[0..count];
-                let m = contains(&self.source.as_bytes(), line);
-                if m.is_none() {
-                    dest_file.write(&line)?;
-                    buff.clear();
+                if !line.contains(&self.source) {
+                    println!("{}", line);
+                    writeln!(dest_file, "{}", line)?;
                     continue;
                 }
-                let m = m.unwrap();
-                let start = &line[0..m.0];
-                let end = &line[m.1..];
                 // Else, repeat the line multiple times, replacing the string
                 // in question
                 for replacement in &replacements {
-                    dest_file.write(start)?;
-                    dest_file.write(replacement.as_bytes())?;
-                    dest_file.write(end)?;
+                    let new_line = line.replace(&self.source, &replacement);
+                    println!("{}", new_line);
+                    writeln!(dest_file, "{}", new_line)?;
                 }
-                buff.clear();
             }
         }
     }
@@ -93,21 +86,6 @@ impl Rewriter {
     }
 }

-fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> {
-    let mut i = 0;
-    while i + needle.len() <= haystack.len() {
-        let mut j = 0;
-        while i+j < haystack.len() && j < needle.len() && haystack[i+j] == needle[j] {
-            j += 1;
-        }
-        if j == needle.len() {
-            return Some((i, i+j));
-        }
-        i += 1;
-    }
-    None
-}

 #[cfg(test)]
 mod tests {
     use include_directory::{Dir, include_directory};

src/server.rs

@@ -1,59 +0,0 @@
use std::{fs, path::Path};
use chrono::Utc;
use log::info;
use rouille::Request;
use anyhow::{Context, Result};
use crate::Rewriter;
pub struct Server {
rewriter: Rewriter,
// Where we write temporary files
workspace_dir: String,
// Directory to read configs from
template_dir: String,
// The symlink that is updated
config_dir: String,
}
impl Server {
pub fn new(rewriter: Rewriter, workspace_dir: String, template_dir: String, config_dir: String) -> Self {
Self {
rewriter,
workspace_dir,
template_dir,
config_dir,
}
}
pub fn register(&mut self, _request: &Request, ip: &str) -> Result<()> {
info!("Registering {} as a handler", ip);
self.rewriter.add_replacement(ip.to_string());
self.generate_config()?;
Ok(())
}
pub fn unregister(&mut self, _request: &Request, ip: &str) -> Result<()> {
info!("Deregistering {} as a handler", ip);
self.rewriter.remove_replacement(ip);
self.generate_config()?;
Ok(())
}
pub fn generate_config(&self) -> Result<()> {
// Create a new directory in our workspace
let now = Utc::now();
// Writes into 2020/01/01/<unix timestamp>
// This will fail if we have multiple requests per second
let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string());
let path = path.as_os_str().to_str().unwrap();
fs::create_dir_all(path).with_context(|| "creating directory")?;
self.rewriter.rewrite_folder(&self.template_dir, path).with_context(|| "generating configs")?;
// Finally, symlink it to the output folder; only support Linux for now
let symlink = Path::new(&self.workspace_dir).join("symlink.tmp");
std::os::unix::fs::symlink(path, &symlink).with_context(|| "creating symlink")?;
fs::rename(symlink, &self.config_dir).with_context(|| "renaming symlink")?;
Ok(())
}
}