Compare commits

..

1 Commits

Author SHA1 Message Date
0ed045d73e add: implement basic server 2025-03-30 00:15:28 -07:00
14 changed files with 57 additions and 546 deletions

4
Cargo.lock generated
View File

@@ -654,9 +654,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.176"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "libredox"

View File

@@ -1,12 +0,0 @@
# Build Stage
FROM rust:latest AS builder
WORKDIR /app
COPY . .
RUN cargo build --release
# Runtime Stage
FROM debian:stable-slim
WORKDIR /app
COPY --from=builder /app/target/release/handler .
COPY --from=builder /app/target/release/skubelb .
CMD ["./handler"]

View File

@@ -1,16 +0,0 @@
MAJOR_VERSION = 0
MINOR_VERSION = 0
PATCH_VERSION = 1
TAG = $(MAJOR_VERSION).$(MINOR_VERSION).$(PATCH_VERSION)
build:
docker build . -t skubelb-handler:$(TAG)
docker tag skubelb-handler:$(TAG) us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:$(TAG)
kube:
cat kubernetes.yaml.tmpl | sed 's/TAG/$(TAG)/' > kubernetes.yaml
deploy: build kube
docker push us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:$(TAG)
kubectl apply -f kubernetes.yaml

169
README.md
View File

@@ -6,173 +6,14 @@ open a port on all nodes. When the request lands on any node,
it is forwarded to the correct pod via the network mesh kubernetes
is using. In theory, there is a one-hop penalty.
But let's be honest. You're running with a single LB, probably on a
GCE free tier N1 VM. That extra hop doesn't matter.
But let's be honest. You're running with a single LB, probably a GCE free
tier N1 VM. That extra hop doesn't matter.
## Config
Configure nginx to do what you want, test it. Use any Node IP for your testing.
This will become the 'template_dir' in the argument to the LB.
This tool accepts an argument (rewrite_string) which will be replaced using these rules:
Move that directory (i.e., `/etc/nginx`) to somewhere new,
(i.e. `/etc/nginx-template/`).
1. For each line that contains rewrite string:
2. Copy the line once per node IP, replacing the string with host IPs
Make a workspace directory for this tool; it will write configs to this folder
before updating the symlink you created above. It needs to be persistent so on
server reboot the service starts ok (i.e., `mkdir /var/skubelb/`).
Create a symlink in the workspace which will point to the 'active' configuration;
this will be updated by the tool (i.e., `ln -s /etc/nginx-template /var/skubelb/nginx`).
Make a symlink from the old config directory to that symlink (i.e.,
`ln -s /var/skubelb/nginx /etc/nginx`). Two layers of symlinks are used so we can
have a non-root user run the tool when we set up the service.
Make sure the user running the tool has read access to the template folder, read-write
access to the workspace folder and config symlink.
Run the server with a command like:
```sh
skubelb --needle some_node_ip \
  --workspace-dir /var/skubelb \
  --config-symlink /var/skubelb/nginx \
  --template-dir /etc/nginx-template \
  --listen 0.0.0.0:8888
```
Replacing `some_node_ip` with the node IP you used during the initial setup.
Next, configure the Kubernetes nodes to POST `http://loadbalancer:8888/register/${NODE_IP}` when
they start, and DELETE `http://loadbalancer:8888/register/${NODE_IP}` when they shut down. The easiest
way to do this is with a daemonset; see the example below, or in daemon_set.yaml.
#### Running as a system service
Setup a user to run the service; make that user
with `useradd -M skubelb`. Prevent logins with `usermod -L skubelb`.
Make a workspace dir, `mkdir /var/skubelb/`, and give access to the
daemon user, `chown skubelb:skubelb /var/skubelb/`.
Add the systemd config to `/etc/systemd/system/skubelb.service`:
```toml
[Unit]
Description=Simple Kubernetes Load Balancer
After=network.target
StartLimitIntervalSec=0
[Service]
Type=simple
Restart=always
RestartSec=1
User=skubelb
ExecStart=/usr/local/bin/skubelb --needle some_node_ip \
--workspace-dir /var/skubelb \
--config-symlink /var/skubelb/nginx \
--template-dir /etc/nginx-template \
--listen 0.0.0.0:8888 \
--reload-cmd '/usr/bin/sudo systemctl reload nginx'
[Install]
WantedBy=multi-user.target
```
Make sure you update `--needle some_node_ip` with something
like `--needle 123.44.55.123` — the IP of the node you tested with.
### Sample Kubernetes configuration
Deploy this [daemon set](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
to your cluster, replacing `lb_address` with the address of your load balancer.
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: skubelb
namespace: skubelb
labels:
k8s-app: skubelb
spec:
selector:
matchLabels:
name: skubelb
template:
metadata:
labels:
name: skubelb
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: skubelb
image: alpine/curl:latest
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
command: ['sh', '-c', 'echo "Wait for heat death of universe" && sleep 999999d']
lifecycle:
preStop:
exec:
command:
- sh
- "-c"
- "curl -X DELETE 10.128.0.2:8888/register/${NODE_IP}"
postStart:
exec:
command:
- sh
- "-c"
- "curl -X POST 10.128.0.2:8888/register/${NODE_IP}"
resources:
limits:
memory: 200Mi
requests:
cpu: 10m
memory: 100Mi
terminationGracePeriodSeconds: 30
```
NOTE: you will need to make an entry in the firewall to allow this request through. It is very important that the firewall entry has a source filter; it should only be allowed from the Kubernetes cluster. Nginx will forward traffic to any host that registers, and this could easily become a MitM vulnerability.
## Other tips
### Use 'upstream' in nginx
Do this:
```
upstream hosts {
server 10.182.0.36:30004;
server 10.182.0.39:30004;
}
server {
server_name git.tipsy.codes tipsy.codes;
location / {
proxy_pass http://hosts;
}
}
```
Rather than just writing out the IP in the proxy_pass.
### visudo to only allow the nginx reload command
Use `sudo visudo` to update the sudoers file and add this line:
```
skubelb ALL=(root) NOPASSWD: /usr/bin/systemctl reload nginx
```
This will prevent the user from running commands other than reload.

View File

@@ -1,54 +0,0 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: skubelb
namespace: skubelb
labels:
k8s-app: skubelb
spec:
selector:
matchLabels:
name: skubelb
template:
metadata:
labels:
name: skubelb
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: skubelb
image: alpine/curl:latest
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
command: ['sh', '-c', 'echo "Wait for heat death of universe" && sleep 999999d']
lifecycle:
preStop:
exec:
command:
- sh
- "-c"
- "curl -X DELETE 10.128.0.2:8888/register/${NODE_IP}"
postStart:
exec:
command:
- sh
- "-c"
- "curl -X POST 10.128.0.2:8888/register/${NODE_IP}"
resources:
limits:
memory: 200Mi
requests:
cpu: 10m
memory: 100Mi
terminationGracePeriodSeconds: 30

View File

@@ -1,41 +0,0 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: skubelb
namespace: skubelb
labels:
k8s-app: skubelb
spec:
selector:
matchLabels:
name: skubelb
template:
metadata:
labels:
name: skubelb
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: skubelb
image: us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:0.0.1
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
command: ["sh", "-c", "./handler -s 10.128.0.2:8888 -l ${NODE_IP}"]
resources:
limits:
memory: 200Mi
requests:
cpu: 10m
memory: 100Mi
terminationGracePeriodSeconds: 30

View File

@@ -1,41 +0,0 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: skubelb
namespace: skubelb
labels:
k8s-app: skubelb
spec:
selector:
matchLabels:
name: skubelb
template:
metadata:
labels:
name: skubelb
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: skubelb
image: us-west4-docker.pkg.dev/nixernetes/images/skubelb-handler:TAG
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
command: ["sh", "-c", "./handler -s 10.128.0.2:8888 -l ${NODE_IP}"]
resources:
limits:
memory: 200Mi
requests:
cpu: 10m
memory: 100Mi
terminationGracePeriodSeconds: 30

View File

@@ -1,26 +1,21 @@
use std::process::Command;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use clap::Parser;
use skubelb::Rewriter;
use skubelb::Server;
use anyhow::{Result, anyhow};
use env_logger::Env;
use log::{info, warn};
use rouille::{Request, Response, router};
use anyhow::Result;
use rouille::{router, Request, Response};
/// Implements a HTTP server which allows clients to 'register'
/// themselves. Their IP address will be used to replace a
/// needle in a set of config files. This is intended to be
/// used as a low-cost way of enabling Kubernetes ingress
/// using nginx running on a machine that has a public port.
///
///
/// The needle is expected to be a dummy IP address; something
/// fairly unique. The goal is to replace nginx files, where
/// we often repeat lines if we want nginx to load balance between
@@ -33,7 +28,7 @@ struct Args {
/// and instead N lines (one per replacement) is added to
/// the output.
#[arg(short, long)]
needle: String,
rewrite_string: String,
/// The folder which contains the templates that
/// will be be searched for the needle.
@@ -41,7 +36,7 @@ struct Args {
template_dir: String,
/// The symlink that should be updated each time the config changes.
///
///
/// Symlinks are used because file updates are not atomic.
#[arg(short, long)]
config_symlink: String,
@@ -50,13 +45,8 @@ struct Args {
#[arg(short, long)]
workspace_dir: String,
/// Address to listen for http requests on.
#[arg(short, long, default_value_t = String::from("0.0.0.0:8080"))]
listen: String,
/// Command to reload nginx.
#[arg(short, long, default_value_t = String::from("sudo nginx -s reload"))]
reload_cmd: String,
}
fn main() {
@@ -64,92 +54,31 @@ fn main() {
env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();
let args = Args::parse();
let rewriter = Rewriter::new(args.needle);
let server_impl = Arc::new(Mutex::new(Server::new(
rewriter,
args.workspace_dir,
args.template_dir,
args.config_symlink,
)));
let reload_command = args.reload_cmd.leak();
let reload_command: Vec<&str> = reload_command.split_ascii_whitespace().collect();
// Start cleanup thread
{
let server_impl = server_impl.clone();
thread::spawn(move || {
loop {
sleep(Duration::from_secs(30));
match cleanup_worker(&server_impl) {
Ok(_) => (),
Err(e) => {
warn!("Error cleaning up handlers {}", e);
}
}
}
});
}
let rewriter = Rewriter::new(args.rewrite_string);
let server_impl = Mutex::new(Server::new(rewriter, args.workspace_dir, args.template_dir, args.config_symlink));
rouille::start_server(args.listen, move |request| {
info!("Processing request: {:?}", request);
match handle(request, &server_impl) {
Ok((resp, reload)) => {
if reload && reload_command.len() > 0 {
let output = Command::new(reload_command[0])
.args(&reload_command[1..])
.output();
match output {
Ok(o) => {
info!("Ran {:?}; exit code: {}", reload_command, o.status);
info!(
"Ran {:?}; stdout: {}",
reload_command,
String::from_utf8_lossy(&o.stdout)
);
info!(
"Ran {:?}; stderr: {}",
reload_command,
String::from_utf8_lossy(&o.stderr)
);
}
Err(e) => {
warn!("Failed to run {:?}: {:?}", reload_command, e);
}
};
}
resp
}
Ok(resp) => resp,
Err(e) => {
warn!("{:?}", e);
Response {
status_code: 500,
..Response::empty_400()
}
Response{status_code: 500, ..Response::empty_400()}
}
}
});
}
fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<(Response, bool)> {
fn handle(request: &Request, server_impl: &Mutex<Server>) -> Result<Response> {
router!(request,
(POST) (/register/{ip: String}) => {
let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
server_impl.register(request, &ip)?;
let must_reload = server_impl.cleanup()?;
Ok((Response{status_code: 200, ..Response::empty_204()}, must_reload))
(POST) (/register) => {
server_impl.lock().unwrap().register(request)?;
Ok(Response{status_code: 200, ..Response::empty_204()})
},
(DELETE) (/register/{ip: String}) => {
let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
server_impl.unregister(request, &ip)?;
let must_reload = server_impl.cleanup()?;
Ok((Response{status_code: 200, ..Response::empty_204()}, must_reload))
(DELETE) (/register) => {
server_impl.lock().unwrap().unregister(request)?;
Ok(Response{status_code: 200, ..Response::empty_204()})
},
_ => Ok((Response::empty_404(), false)),
_ => Ok(Response::empty_404()),
)
}
/// Single pass of the background maintenance loop: locks the shared
/// server state and runs its `cleanup()` routine.
///
/// NOTE(review): assumes `Server::cleanup` evicts stale registrations
/// and regenerates the config when anything was dropped — confirm
/// against the `Server` implementation.
fn cleanup_worker(server_impl: &Mutex<Server>) -> Result<()> {
// Map a poisoned-lock error into anyhow so `?` works here; a poisoned
// mutex means another thread panicked while holding the guard.
let mut server_impl = server_impl.lock().map_err(|_| anyhow!("failed to acquire lock"))?;
server_impl.cleanup()?;
Ok(())
}
}

View File

@@ -1,7 +1,6 @@
use anyhow::Result;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::path::Path;
use std::time::Instant;
use std::{
fs::{self, File},
io::{BufReader, prelude::*},
@@ -10,8 +9,6 @@ use std::{
pub struct Rewriter {
source: String,
replacements: HashSet<String>,
// When each replacement should be cleaned up
replacement_cleanup: HashMap<String, Instant>,
}
impl Rewriter {
@@ -19,18 +16,15 @@ impl Rewriter {
Self {
source,
replacements: HashSet::new(),
replacement_cleanup: HashMap::default(),
}
}
pub fn add_replacement(&mut self, replacement: String, cleanup: Instant) -> bool {
self.replacement_cleanup.insert(replacement.clone(), cleanup);
self.replacements.insert(replacement)
pub fn add_replacement(&mut self, replacement: String) {
self.replacements.insert(replacement);
}
pub fn remove_replacement(&mut self, replacement: &str) -> bool {
self.replacement_cleanup.remove(replacement);
self.replacements.remove(replacement)
pub fn remove_replacement(&mut self, replacement: &str) {
self.replacements.remove(replacement);
}
pub fn rewrite_folder(&self, src: &str, dst: &str) -> Result<()> {
@@ -64,75 +58,32 @@ impl Rewriter {
// Open 2 files; one to read and translate, and one to write.
let source_file = File::open(&src_path)?;
let mut dest_file = File::create(&dst_path)?;
let mut buff = Vec::with_capacity(2048);
let mut reader = BufReader::new(source_file);
let reader = BufReader::new(source_file);
while let Ok(count) = reader.read_until(b'\n', &mut buff) {
if count == 0 {
break;
}
for line in reader.lines() {
let line = line?;
// If the line is not subject to replacement, copy it and
// carry on.
let line = &buff[0..count];
let m = contains(&self.source.as_bytes(), line);
if m.is_none() {
dest_file.write(&line)?;
buff.clear();
if !line.contains(&self.source) {
writeln!(dest_file, "{}", line)?;
continue;
}
let m = m.unwrap();
let start = &line[0..m.0];
let end = &line[m.1..];
// Else, repeat the line multiple times, replacing the string
// in question
for replacement in &replacements {
dest_file.write(start)?;
dest_file.write(replacement.as_bytes())?;
dest_file.write(end)?;
let new_line = line.replace(&self.source, &replacement);
writeln!(dest_file, "{}", new_line)?;
}
buff.clear();
}
}
}
Ok(())
}
/// Removes every replacement whose scheduled expiry time has passed.
///
/// Returns `true` when at least one entry was evicted, signalling the
/// caller that any previously generated output is now stale.
pub fn cleanup(&mut self) -> bool {
let now = Instant::now();
// Collect expired names first, then remove: the map cannot be
// mutated while it is being iterated.
let mut to_remove = vec![];
for (name, when) in self.replacement_cleanup.iter() {
// `when` is the deadline recorded at registration time; strictly
// earlier than `now` means the entry has expired.
if when < &now {
to_remove.push(name.clone());
}
}
let will_cleanup = to_remove.len() > 0;
for name in to_remove {
// remove_replacement drops the entry from both the replacement
// set and the cleanup schedule.
self.remove_replacement(&name);
}
will_cleanup
}
}
/// Finds the first occurrence of `needle` in `haystack`.
///
/// Returns the half-open byte range `(start, end)` of the first match,
/// or `None` when `needle` does not occur. An empty needle matches at
/// the start of any haystack, yielding `Some((0, 0))` — this preserves
/// the behavior of the original hand-rolled scan.
fn contains(needle: &[u8], haystack: &[u8]) -> Option<(usize, usize)> {
    // `slice::windows(0)` panics, so the empty needle is handled up front.
    if needle.is_empty() {
        return Some((0, 0));
    }
    // Idiomatic replacement for the original nested index loop: the
    // `windows` adaptor yields every needle-length subslice, and
    // `position` finds the first equal one. Same worst-case complexity,
    // but bounds-check free and far easier to audit.
    haystack
        .windows(needle.len())
        .position(|window| window == needle)
        .map(|start| (start, start + needle.len()))
}
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use include_directory::{Dir, include_directory};
use tempdir::TempDir;
@@ -146,11 +97,10 @@ mod tests {
let src = testdata.path().join("testsrc");
let dst = TempDir::new("").unwrap();
let now = Instant::now();
let mut rewriter = Rewriter::new("to_be_replaced".into());
rewriter.add_replacement("abc".into(), now.checked_add(Duration::new(60*60*24, 0)).unwrap());
rewriter.add_replacement("def".into(), now);
rewriter.add_replacement("zyx".into(), now);
rewriter.add_replacement("abc".into());
rewriter.add_replacement("def".into());
rewriter.add_replacement("zyx".into());
rewriter.remove_replacement("zyx");
rewriter
.rewrite_folder(
@@ -163,20 +113,5 @@ mod tests {
assert!(
dir_diff::is_different(testdata.path().join("testdst"), dst.path()).unwrap() == false
);
// Trigger the cleanup, which should GC abc
let dst = TempDir::new("").unwrap();
rewriter.cleanup();
rewriter
.rewrite_folder(
src.as_os_str().to_str().unwrap(),
dst.path().as_os_str().to_str().unwrap(),
)
.unwrap();
// Validate that everything matches
assert!(
dir_diff::is_different(testdata.path().join("testdst_after_gc"), dst.path()).unwrap() == false
);
}
}

View File

@@ -1,13 +1,9 @@
use std::{
fs,
path::Path,
time::{Duration, Instant},
};
use std::{fs, path::Path};
use anyhow::{Context, Result, anyhow};
use chrono::Utc;
use log::info;
use rouille::Request;
use anyhow::{Context, Result};
use crate::Rewriter;
@@ -22,12 +18,7 @@ pub struct Server {
}
impl Server {
pub fn new(
rewriter: Rewriter,
workspace_dir: String,
template_dir: String,
config_dir: String,
) -> Self {
pub fn new(rewriter: Rewriter, workspace_dir: String, template_dir: String, config_dir: String) -> Self {
Self {
rewriter,
workspace_dir,
@@ -36,30 +27,19 @@ impl Server {
}
}
pub fn cleanup(&mut self) -> Result<bool> {
let cleaned_up = self.rewriter.cleanup();
if cleaned_up {
self.generate_config()?;
}
Ok(cleaned_up)
}
pub fn register(&mut self, _request: &Request, ip: &str) -> Result<()> {
pub fn register(&mut self, request: &Request) -> Result<()> {
let ip = request.remote_addr().ip().to_string();
info!("Registering {} as a handler", ip);
let cleanup_time = Instant::now()
.checked_add(Duration::from_secs(60))
.ok_or(anyhow!("failed to convert time"))?;
if self.rewriter.add_replacement(ip.to_string(), cleanup_time) {
self.generate_config()?;
}
self.rewriter.add_replacement(ip);
self.generate_config()?;
Ok(())
}
pub fn unregister(&mut self, _request: &Request, ip: &str) -> Result<()> {
pub fn unregister(&mut self, request: &Request) -> Result<()> {
let ip = request.remote_addr().ip().to_string();
info!("Deregistering {} as a handler", ip);
if self.rewriter.remove_replacement(ip) {
self.generate_config()?;
}
self.rewriter.remove_replacement(&ip);
self.generate_config()?;
Ok(())
}
@@ -71,13 +51,10 @@ impl Server {
let path = Path::new(&self.workspace_dir).join(&now.format("%Y/%m/%d/%s").to_string());
let path = path.as_os_str().to_str().unwrap();
fs::create_dir_all(path).with_context(|| "creating directory")?;
self.rewriter
.rewrite_folder(&self.template_dir, path)
.with_context(|| "generating configs")?;
self.rewriter.rewrite_folder(&self.template_dir, path).with_context(|| "generating configs")?;
// Finally, symlink it to the output folder; only support Linux for now
let symlink = Path::new(&self.workspace_dir).join("symlink.tmp");
std::os::unix::fs::symlink(path, &symlink).with_context(|| "creating symlink")?;
fs::rename(symlink, &self.config_dir).with_context(|| "renaming symlink")?;
fs::remove_file(&self.config_dir).with_context(|| "removing old symlink")?;
std::os::unix::fs::symlink(path, &self.config_dir).with_context(|| "updating symlink")?;
Ok(())
}
}
}

View File

@@ -1,4 +0,0 @@
This is a line
This is abc line
This is another line

View File

@@ -1,3 +0,0 @@
This is a abc line.
In a nested directory.

View File

@@ -1,4 +1,4 @@
This is a line
This is to_be_replaced line
This is another line
This is another line

View File

@@ -1,3 +1,3 @@
This is a to_be_replaced line.
In a nested directory.
In a nested directory.