166 lines
5.3 KiB
Rust
166 lines
5.3 KiB
Rust
|
|
use std::pin::Pin;
|
||
|
|
use std::future::Future;
|
||
|
|
use std::task::{Context, Poll};
|
||
|
|
use std::sync::Arc;
|
||
|
|
use tonic::{transport::Server, Request, Response, Status};
|
||
|
|
use roto_tonic::RotoCodec;
|
||
|
|
use hello::{HelloWorldService, OwnedHelloRequest, OwnedHelloResponse};
|
||
|
|
use tower::Service;
|
||
|
|
use bytes::{Bytes, Buf, BufMut};
|
||
|
|
use tonic::body::BoxBody;
|
||
|
|
use futures_util::StreamExt;
|
||
|
|
use roto_runtime::{RotoOwned, RotoMessage};
|
||
|
|
use http_body_util::BodyExt;
|
||
|
|
use http_body::Body;
|
||
|
|
|
||
|
|
pub mod hello {
|
||
|
|
include!("../../proto/hello.rs");
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Stateless handler that implements [`HelloWorldService`].
#[derive(Clone, Default)]
pub struct MyHelloWorld {}
|
||
|
|
|
||
|
|
#[tonic::async_trait]
|
||
|
|
impl HelloWorldService for MyHelloWorld {
|
||
|
|
async fn hello_world(
|
||
|
|
&self,
|
||
|
|
request: Request<OwnedHelloRequest>,
|
||
|
|
) -> Result<Response<OwnedHelloResponse>, Status> {
|
||
|
|
let req = request.into_inner();
|
||
|
|
let reader = req.reader();
|
||
|
|
let name = reader.name().unwrap_or("Unknown");
|
||
|
|
|
||
|
|
let mut buf = vec![0u8; 1024];
|
||
|
|
let slice = hello::HelloResponseBuilder::builder(&mut buf)
|
||
|
|
.message(&format!("Hello {}!", name)).unwrap()
|
||
|
|
.finish().unwrap();
|
||
|
|
|
||
|
|
let reply = OwnedHelloResponse {
|
||
|
|
data: bytes::Bytes::copy_from_slice(slice),
|
||
|
|
};
|
||
|
|
|
||
|
|
Ok(Response::new(reply))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Tonic Glue ---
|
||
|
|
|
||
|
|
#[derive(Clone)]
|
||
|
|
pub struct HelloWorldServer {
|
||
|
|
inner: Arc<MyHelloWorld>,
|
||
|
|
}
|
||
|
|
|
||
|
|
impl HelloWorldServer {
|
||
|
|
pub fn new(inner: MyHelloWorld) -> Self {
|
||
|
|
Self { inner: Arc::new(inner) }
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Declares the service name associated with [`HelloWorldServer`].
impl tonic::server::NamedService for HelloWorldServer {
    /// Service name in `<package>.<service>` form.
    const NAME: &'static str = "hello.HelloWorldService";
}
|
||
|
|
|
||
|
|
/// Response body holding at most one pending chunk of bytes.
///
/// The `Body` impl below yields the chunk as a single data frame and then
/// reports end of stream; `None` means nothing is left to send.
struct StatusBody(Option<Bytes>);
|
||
|
|
|
||
|
|
impl Body for StatusBody {
|
||
|
|
type Data = Bytes;
|
||
|
|
type Error = Status;
|
||
|
|
|
||
|
|
fn poll_frame(
|
||
|
|
mut self: Pin<&mut Self>,
|
||
|
|
cx: &mut Context<'_>,
|
||
|
|
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
|
||
|
|
if let Some(data) = self.0.take() {
|
||
|
|
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
|
||
|
|
} else {
|
||
|
|
Poll::Ready(None)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
impl Service<http::Request<BoxBody>> for HelloWorldServer {
|
||
|
|
type Response = http::Response<BoxBody>;
|
||
|
|
type Error = std::convert::Infallible;
|
||
|
|
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||
|
|
|
||
|
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||
|
|
Poll::Ready(Ok(()))
|
||
|
|
}
|
||
|
|
|
||
|
|
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
|
||
|
|
let inner = self.inner.clone();
|
||
|
|
println!("Server received request: {} {}", req.method(), req.uri());
|
||
|
|
|
||
|
|
Box::pin(async move {
|
||
|
|
let body = req.into_body();
|
||
|
|
let bytes_vec = body.collect().await.map_err(|e| {
|
||
|
|
println!("Body collect error: {}", e);
|
||
|
|
panic!("Body collect error: {}", e);
|
||
|
|
})?.to_bytes();
|
||
|
|
println!("Collected body bytes: {} bytes", bytes_vec.len());
|
||
|
|
|
||
|
|
if bytes_vec.len() < 5 {
|
||
|
|
println!("Body too short: {} bytes", bytes_vec.len());
|
||
|
|
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
|
||
|
|
return Ok(http::Response::builder()
|
||
|
|
.status(200)
|
||
|
|
.body(res_body)
|
||
|
|
.unwrap());
|
||
|
|
}
|
||
|
|
|
||
|
|
let data = &bytes_vec[5..];
|
||
|
|
println!("Decoding request from {} bytes", data.len());
|
||
|
|
let request_msg = match OwnedHelloRequest::decode(Bytes::copy_from_slice(data)) {
|
||
|
|
Ok(msg) => msg,
|
||
|
|
Err(e) => {
|
||
|
|
println!("Decode error: {}", e);
|
||
|
|
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
|
||
|
|
return Ok(http::Response::builder().status(200).body(res_body).unwrap());
|
||
|
|
}
|
||
|
|
};
|
||
|
|
|
||
|
|
println!("Request decoded successfully");
|
||
|
|
let response = match inner.hello_world(Request::new(request_msg)).await {
|
||
|
|
Ok(res) => res,
|
||
|
|
Err(e) => {
|
||
|
|
println!("Service error: {}", e);
|
||
|
|
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
|
||
|
|
return Ok(http::Response::builder().status(200).body(res_body).unwrap());
|
||
|
|
}
|
||
|
|
};
|
||
|
|
|
||
|
|
let response_msg = response.into_inner();
|
||
|
|
let response_bytes = response_msg.bytes();
|
||
|
|
println!("Service responded with {} bytes", response_bytes.len());
|
||
|
|
|
||
|
|
let mut res_buf = vec![0u8; 5 + response_bytes.len()];
|
||
|
|
res_buf[0] = 0;
|
||
|
|
let len = response_bytes.len() as u32;
|
||
|
|
res_buf[1..5].copy_from_slice(&len.to_be_bytes());
|
||
|
|
res_buf[5..].copy_from_slice(&response_bytes);
|
||
|
|
|
||
|
|
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(res_buf))));
|
||
|
|
Ok(http::Response::builder()
|
||
|
|
.status(200)
|
||
|
|
.header("content-type", "application/grpc")
|
||
|
|
.body(res_body)
|
||
|
|
.unwrap())
|
||
|
|
})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
#[tokio::main]
|
||
|
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||
|
|
let addr: std::net::SocketAddr = "[::1]:50051".parse()?;
|
||
|
|
let hello = MyHelloWorld::default();
|
||
|
|
|
||
|
|
println!("Server listening on {}", addr);
|
||
|
|
|
||
|
|
Server::builder()
|
||
|
|
.add_service(HelloWorldServer::new(hello))
|
||
|
|
.serve(addr)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(())
|
||
|
|
}
|