77 lines
3.0 KiB
Rust
77 lines
3.0 KiB
Rust
|
|
use std::sync::Arc;
|
||
|
|
use tonic::{Request, Response, Status};
|
||
|
|
use tokio::net::TcpListener;
|
||
|
|
use tonic::transport::Server;
|
||
|
|
use roto_runtime::RotoOwned;
|
||
|
|
use roto_tonic::{BufferPool, generated::interop::{InteropService, InteropServiceServer, OwnedUnaryRequest, OwnedUnaryResponse, OwnedStreamingRequest, OwnedStreamingResponse, UnaryResponseBuilder}};
|
||
|
|
use futures_util::Stream;
|
||
|
|
use std::pin::Pin;
|
||
|
|
use bytes::BufMut;
|
||
|
|
|
||
|
|
/// Stateless handler that implements `InteropService` for the interop test in this file.
struct InteropHandler;
|
||
|
|
|
||
|
|
#[tonic::async_trait]
|
||
|
|
impl InteropService for InteropHandler {
|
||
|
|
async fn unary_call(&self, request: Request<OwnedUnaryRequest>) -> std::result::Result<Response<OwnedUnaryResponse>, Status> {
|
||
|
|
let msg = request.into_inner();
|
||
|
|
let message_val = msg.reader().message_or_default().unwrap_or("");
|
||
|
|
let reply = format!("Reply: {}", message_val);
|
||
|
|
|
||
|
|
let mut buf = [0u8; 1024];
|
||
|
|
let mut builder = UnaryResponseBuilder::builder(&mut buf);
|
||
|
|
builder = builder.reply(&reply).map_err(|e| Status::internal(format!("Build error: {:?}", e)))?;
|
||
|
|
let bytes = builder.finish().map_err(|e| Status::internal(format!("Finish error: {:?}", e)))?;
|
||
|
|
|
||
|
|
Ok(Response::new(OwnedUnaryResponse { data: bytes.to_vec().into() }))
|
||
|
|
}
|
||
|
|
|
||
|
|
async fn streaming_call(&self, _request: Request<OwnedStreamingRequest>) -> std::result::Result<Response<Pin<Box<dyn Stream<Item = std::result::Result<OwnedStreamingResponse, Status>> + Send>>>, Status> {
|
||
|
|
Err(Status::unimplemented("Streaming not supported"))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
#[tokio::test]
|
||
|
|
async fn test_interop() {
|
||
|
|
// Server setup
|
||
|
|
let pool = Arc::new(BufferPool::new(1024));
|
||
|
|
let handler = Arc::new(InteropHandler);
|
||
|
|
let server = InteropServiceServer::new(handler, pool);
|
||
|
|
|
||
|
|
let addr: std::net::SocketAddr = "[::1]:0".parse().unwrap();
|
||
|
|
let listener = TcpListener::bind(addr).await.unwrap();
|
||
|
|
let local_addr = listener.local_addr().unwrap();
|
||
|
|
|
||
|
|
let server_clone = server.clone();
|
||
|
|
|
||
|
|
tokio::spawn(async move {
|
||
|
|
Server::builder()
|
||
|
|
.add_service(server_clone)
|
||
|
|
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
|
||
|
|
.await
|
||
|
|
.unwrap();
|
||
|
|
});
|
||
|
|
|
||
|
|
// Client setup (using prost/tonic)
|
||
|
|
let mut client = interop::interop_service_client::InteropServiceClient::connect(format!("http://{}", local_addr)).await.unwrap();
|
||
|
|
|
||
|
|
// Test Unary 1
|
||
|
|
let req1 = interop::UnaryRequest { message: "Hello 1".to_string() };
|
||
|
|
let res1 = client.unary_call(req1).await.unwrap();
|
||
|
|
assert_eq!(res1.into_inner().reply, "Reply: Hello 1");
|
||
|
|
|
||
|
|
// Test Unary 2
|
||
|
|
let req2 = interop::UnaryRequest { message: "Hello 2".to_string() };
|
||
|
|
let res2 = client.unary_call(req2).await.unwrap();
|
||
|
|
assert_eq!(res2.into_inner().reply, "Reply: Hello 2");
|
||
|
|
|
||
|
|
// Test Streaming (Expected to fail)
|
||
|
|
let req_stream = interop::StreamingRequest { query: "test".to_string() };
|
||
|
|
let res_stream = client.streaming_call(req_stream).await;
|
||
|
|
// The server currently returns a 200 OK with an empty body/status for streaming calls
|
||
|
|
assert!(res_stream.is_ok());
|
||
|
|
}
|
||
|
|
|
||
|
|
mod interop {
|
||
|
|
tonic::include_proto!("interop");
|
||
|
|
}
|