Files
2026-05-17 00:43:21 -07:00

77 lines
3.0 KiB
Rust

use std::sync::Arc;
use tonic::{Request, Response, Status};
use tokio::net::TcpListener;
use tonic::transport::Server;
use roto_runtime::RotoOwned;
use roto_tonic::{BufferPool, generated::interop::{InteropService, InteropServiceServer, OwnedUnaryRequest, OwnedUnaryResponse, OwnedStreamingRequest, OwnedStreamingResponse, UnaryResponseBuilder}};
use futures_util::Stream;
use std::pin::Pin;
use bytes::BufMut;
/// Stateless service implementation used by the interop test: it answers
/// unary calls with an echo-style reply and rejects streaming calls.
struct InteropHandler;
#[tonic::async_trait]
impl InteropService for InteropHandler {
async fn unary_call(&self, request: Request<OwnedUnaryRequest>) -> std::result::Result<Response<OwnedUnaryResponse>, Status> {
let msg = request.into_inner();
let message_val = msg.reader().message_or_default().unwrap_or("");
let reply = format!("Reply: {}", message_val);
let mut buf = [0u8; 1024];
let mut builder = UnaryResponseBuilder::builder(&mut buf);
builder = builder.reply(&reply).map_err(|e| Status::internal(format!("Build error: {:?}", e)))?;
let bytes = builder.finish().map_err(|e| Status::internal(format!("Finish error: {:?}", e)))?;
Ok(Response::new(OwnedUnaryResponse { data: bytes.to_vec().into() }))
}
async fn streaming_call(&self, _request: Request<OwnedStreamingRequest>) -> std::result::Result<Response<Pin<Box<dyn Stream<Item = std::result::Result<OwnedStreamingResponse, Status>> + Send>>>, Status> {
Err(Status::unimplemented("Streaming not supported"))
}
}
/// End-to-end interop check: a `roto_tonic`-generated server is driven by the
/// prost/tonic-generated client over a real loopback TCP connection.
///
/// Covers two consecutive unary calls on the same client and one streaming
/// call.
#[tokio::test]
async fn test_interop() {
    // Server setup. Port 0 lets the OS pick a free port; the address actually
    // bound is read back from the listener.
    let pool = Arc::new(BufferPool::new(1024));
    let handler = Arc::new(InteropHandler);
    let server = InteropServiceServer::new(handler, pool);
    let addr: std::net::SocketAddr = "[::1]:0"
        .parse()
        .expect("loopback address literal should parse");
    let listener = TcpListener::bind(addr)
        .await
        .expect("failed to bind the loopback listener");
    let local_addr = listener
        .local_addr()
        .expect("bound listener should report its local address");

    // The listener is already bound at this point, so the client connect
    // below targets an open socket. `server` is not used again in this test,
    // so it is moved into the task rather than cloned.
    tokio::spawn(async move {
        Server::builder()
            .add_service(server)
            .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
            .await
            .expect("interop server terminated with an error");
    });

    // Client setup (using prost/tonic)
    let mut client = interop::interop_service_client::InteropServiceClient::connect(format!(
        "http://{}",
        local_addr
    ))
    .await
    .expect("client failed to connect to the test server");

    // Test Unary 1
    let req1 = interop::UnaryRequest {
        message: "Hello 1".to_string(),
    };
    let res1 = client
        .unary_call(req1)
        .await
        .expect("first unary call failed");
    assert_eq!(res1.into_inner().reply, "Reply: Hello 1");

    // Test Unary 2: a second call on the same client.
    let req2 = interop::UnaryRequest {
        message: "Hello 2".to_string(),
    };
    let res2 = client
        .unary_call(req2)
        .await
        .expect("second unary call failed");
    assert_eq!(res2.into_inner().reply, "Reply: Hello 2");

    // Test Streaming. The handler returns `Status::unimplemented`, but this
    // test pins the behaviour currently observed at the client: the server
    // answers with a 200 OK and an empty body/status, so the call itself
    // resolves to `Ok`.
    // NOTE(review): if the server starts propagating the handler's status,
    // this assertion is expected to flip to an error check.
    let req_stream = interop::StreamingRequest {
        query: "test".to_string(),
    };
    let res_stream = client.streaming_call(req_stream).await;
    assert!(
        res_stream.is_ok(),
        "streaming call unexpectedly returned an error status"
    );
}
// Code pulled in by `tonic::include_proto!` for the `interop` package. The
// test above uses its client and message types (`InteropServiceClient`,
// `UnaryRequest`, `StreamingRequest`) to talk to the server under test.
mod interop {
tonic::include_proto!("interop");
}