Misc improvements/fixes for USDPL

This commit is contained in:
NGnius (Graham) 2023-08-01 11:40:25 -04:00
parent edfe8c24d1
commit a8044bff66
7 changed files with 81 additions and 54 deletions

4
Cargo.lock generated
View file

@ -417,7 +417,7 @@ checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]] [[package]]
name = "nrpc" name = "nrpc"
version = "0.8.0" version = "0.10.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
@ -427,7 +427,7 @@ dependencies = [
[[package]] [[package]]
name = "nrpc-build" name = "nrpc-build"
version = "0.8.0" version = "0.10.0"
dependencies = [ dependencies = [
"nrpc", "nrpc",
"prettyplease 0.2.9", "prettyplease 0.2.9",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "nrpc-build" name = "nrpc-build"
version = "0.8.0" version = "0.10.0"
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
repository = "https://github.com/NGnius/nRPC" repository = "https://github.com/NGnius/nRPC"
@ -21,4 +21,4 @@ quote = "1.0"
syn = "2.0" syn = "2.0"
proc-macro2 = "1.0" proc-macro2 = "1.0"
nrpc = { version = "0.8", path = "../nrpc" } nrpc = { version = "0.10", path = "../nrpc" }

View file

@ -47,9 +47,15 @@ impl ProtobufServiceGenerator {
} }
} }
fn stream_type(item_type: &syn::Ident) -> proc_macro2::TokenStream { fn stream_server_type(item_type: &syn::Ident) -> proc_macro2::TokenStream {
quote::quote!{ quote::quote!{
::nrpc::ServiceStream<'a, #item_type> ::nrpc::ServiceServerStream<'a, #item_type>
}
}
fn stream_client_type(item_type: &syn::Ident) -> proc_macro2::TokenStream {
quote::quote!{
::nrpc::ServiceClientStream<'a, #item_type>
} }
} }
@ -93,10 +99,10 @@ fn trait_methods_server(descriptors: &Vec<prost_build::Method>) -> proc_macro2::
(false, true) => { (false, true) => {
// client streaming; 1 -> many // client streaming; 1 -> many
//let stream_out_ty = stream_type_static_lifetime(&output_ty); //let stream_out_ty = stream_type_static_lifetime(&output_ty);
let stream_out_ty = stream_type(&output_ty); let stream_out_ty = stream_server_type(&output_ty);
gen_methods.push( gen_methods.push(
quote! { quote! {
async fn #fn_name<'a>(&mut self, input: #input_ty) -> Result<#stream_out_ty, Box<dyn std::error::Error + Send>>; async fn #fn_name<'a: 'b>(&mut self, input: #input_ty) -> Result<#stream_out_ty, Box<dyn std::error::Error + Send>>;
} }
); );
@ -124,10 +130,10 @@ fn trait_methods_server(descriptors: &Vec<prost_build::Method>) -> proc_macro2::
} }
(true, false) => { (true, false) => {
// server streaming; many -> 1 // server streaming; many -> 1
let stream_in_ty = stream_type(&input_ty); let stream_in_ty = stream_server_type(&input_ty);
gen_methods.push( gen_methods.push(
quote! { quote! {
async fn #fn_name<'a>(&mut self, input: #stream_in_ty) -> Result<#output_ty, Box<dyn std::error::Error + Send>>; async fn #fn_name<'a: 'b>(&mut self, input: #stream_in_ty) -> Result<#output_ty, Box<dyn std::error::Error + Send>>;
} }
); );
@ -145,11 +151,11 @@ fn trait_methods_server(descriptors: &Vec<prost_build::Method>) -> proc_macro2::
} }
(true, true) => { (true, true) => {
// all streaming; many -> many // all streaming; many -> many
let stream_in_ty = stream_type(&input_ty); let stream_in_ty = stream_server_type(&input_ty);
let stream_out_ty = stream_type(&output_ty); let stream_out_ty = stream_server_type(&output_ty);
gen_methods.push( gen_methods.push(
quote! { quote! {
async fn #fn_name<'a>(&mut self, input: #stream_in_ty) -> Result<#stream_out_ty, Box<dyn std::error::Error + Send>>; async fn #fn_name<'a: 'b>(&mut self, input: #stream_in_ty) -> Result<#stream_out_ty, Box<dyn std::error::Error + Send>>;
} }
); );
@ -186,11 +192,11 @@ fn trait_methods_server(descriptors: &Vec<prost_build::Method>) -> proc_macro2::
} }
}*/ }*/
async fn call<'a>( async fn call<'a: 'b>(
&mut self, &mut self,
method: &str, method: &str,
mut stream_in: ::nrpc::ServiceStream<'a, ::nrpc::_helpers::bytes::Bytes>, mut stream_in: ::nrpc::ServiceServerStream<'a, ::nrpc::_helpers::bytes::Bytes>,
) -> Result<::nrpc::ServiceStream<'a, ::nrpc::_helpers::bytes::Bytes>, ::nrpc::ServiceError> { ) -> Result<::nrpc::ServiceServerStream<'a, ::nrpc::_helpers::bytes::Bytes>, ::nrpc::ServiceError> {
match method { match method {
#(#gen_method_match_arms)* #(#gen_method_match_arms)*
_ => Err(::nrpc::ServiceError::MethodNotFound) _ => Err(::nrpc::ServiceError::MethodNotFound)
@ -232,10 +238,10 @@ fn struct_methods_client(
} }
(false, true) => { (false, true) => {
// client streaming; 1 -> many // client streaming; 1 -> many
let stream_out_ty = stream_type(&output_ty); let stream_out_ty = stream_client_type(&output_ty);
gen_methods.push( gen_methods.push(
quote! { quote! {
pub async fn #fn_name<'a>(&self, input: #input_ty) -> Result<#stream_out_ty, ::nrpc::ServiceError> { pub async fn #fn_name<'a: 'b>(&self, input: #input_ty) -> Result<#stream_out_ty, ::nrpc::ServiceError> {
let mut in_buf = ::nrpc::_helpers::bytes::BytesMut::new(); let mut in_buf = ::nrpc::_helpers::bytes::BytesMut::new();
input.encode(&mut in_buf)?; input.encode(&mut in_buf)?;
let in_stream = ::nrpc::OnceStream::once(Ok(in_buf.freeze())); let in_stream = ::nrpc::OnceStream::once(Ok(in_buf.freeze()));
@ -252,10 +258,10 @@ fn struct_methods_client(
} }
(true, false) => { (true, false) => {
// server streaming; many -> 1 // server streaming; many -> 1
let stream_in_ty = stream_type(&input_ty); let stream_in_ty = stream_client_type(&input_ty);
gen_methods.push( gen_methods.push(
quote! { quote! {
pub async fn #fn_name<'a>(&self, input: #stream_in_ty) -> Result<#output_ty, ::nrpc::ServiceError> { pub async fn #fn_name<'a: 'b>(&self, input: #stream_in_ty) -> Result<#output_ty, ::nrpc::ServiceError> {
let in_stream = input.map(|item_result| { let in_stream = input.map(|item_result| {
let mut in_buf = ::nrpc::_helpers::bytes::BytesMut::new(); let mut in_buf = ::nrpc::_helpers::bytes::BytesMut::new();
item_result.and_then(|item| item.encode(&mut in_buf) item_result.and_then(|item| item.encode(&mut in_buf)
@ -276,11 +282,11 @@ fn struct_methods_client(
} }
(true, true) => { (true, true) => {
// all streaming; many -> many // all streaming; many -> many
let stream_in_ty = stream_type(&input_ty); let stream_in_ty = stream_client_type(&input_ty);
let stream_out_ty = stream_type(&output_ty); let stream_out_ty = stream_client_type(&output_ty);
gen_methods.push( gen_methods.push(
quote! { quote! {
pub async fn #fn_name<'a>(&self, input: #stream_in_ty) -> Result<#stream_out_ty, ::nrpc::ServiceError> { pub async fn #fn_name<'a: 'b>(&self, input: #stream_in_ty) -> Result<#stream_out_ty, ::nrpc::ServiceError> {
let in_stream = input.map(|item_result| { let in_stream = input.map(|item_result| {
let mut in_buf = ::nrpc::_helpers::bytes::BytesMut::new(); let mut in_buf = ::nrpc::_helpers::bytes::BytesMut::new();
item_result.and_then(|item| item.encode(&mut in_buf) item_result.and_then(|item| item.encode(&mut in_buf)
@ -342,33 +348,35 @@ impl ServiceGenerator for ProtobufServiceGenerator {
use ::nrpc::_helpers::futures::StreamExt; use ::nrpc::_helpers::futures::StreamExt;
#[async_trait] #[async_trait]
pub trait #service_trait_name: Send { pub trait #service_trait_name<'b>: Send {
#service_trait_methods #service_trait_methods
} }
pub struct #service_struct_name<T: #service_trait_name> { pub struct #service_struct_name<'b, T: #service_trait_name<'b>> {
inner: T, inner: T,
_idc: std::marker::PhantomData<&'b ()>,
} }
impl <T: #service_trait_name> #service_struct_name<T> { impl <'b, T: #service_trait_name<'b>> #service_struct_name<'b, T> {
pub fn new(inner: T) -> Self { pub fn new(inner: T) -> Self {
Self { Self {
inner, inner,
_idc: Default::default(),
} }
} }
} }
#[async_trait] #[async_trait]
impl<T: #service_trait_name> ::nrpc::ServerService for #service_struct_name<T> { impl<'b, T: #service_trait_name<'b>> ::nrpc::ServerService<'b> for #service_struct_name<'b, T> {
fn descriptor(&self) -> &'static str { fn descriptor(&self) -> &'static str {
#descriptor_str #descriptor_str
} }
async fn call<'a>( async fn call<'a: 'b>(
&mut self, &mut self,
method: &str, method: &str,
input: ::nrpc::ServiceStream<'a, ::nrpc::_helpers::bytes::Bytes>, input: ::nrpc::ServiceServerStream<'a, ::nrpc::_helpers::bytes::Bytes>,
) -> Result<::nrpc::ServiceStream<'a, ::nrpc::_helpers::bytes::Bytes>, ::nrpc::ServiceError> { ) -> Result<::nrpc::ServiceServerStream<'a, ::nrpc::_helpers::bytes::Bytes>, ::nrpc::ServiceError> {
self.inner.call(method, input).await self.inner.call(method, input).await
} }
} }
@ -400,20 +408,22 @@ impl ServiceGenerator for ProtobufServiceGenerator {
use ::nrpc::_helpers::futures::StreamExt; use ::nrpc::_helpers::futures::StreamExt;
//#[derive(core::any::Any)] //#[derive(core::any::Any)]
pub struct #service_struct_name<T: ::nrpc::ClientHandler> { pub struct #service_struct_name<'b, T: ::nrpc::ClientHandler<'b>> {
inner: T, inner: T,
_idc: std::marker::PhantomData<&'b ()>,
} }
impl <T: ::nrpc::ClientHandler> ::nrpc::ClientService for #service_struct_name<T> { impl <'b, T: ::nrpc::ClientHandler<'b>> ::nrpc::ClientService for #service_struct_name<'b, T> {
fn descriptor(&self) -> &'static str { fn descriptor(&self) -> &'static str {
#descriptor_str #descriptor_str
} }
} }
impl <T: ::nrpc::ClientHandler> #service_struct_name<T> { impl <'b, T: ::nrpc::ClientHandler<'b>> #service_struct_name<'b, T> {
pub fn new(inner: T) -> Self { pub fn new(inner: T) -> Self {
Self { Self {
inner, inner,
_idc: Default::default(),
} }
} }

View file

@ -131,7 +131,7 @@ async fn main() {
struct GreeterService; struct GreeterService;
#[async_trait::async_trait] #[async_trait::async_trait]
impl helloworld::IGreeter for GreeterService { impl helloworld::IGreeter<'_> for GreeterService {
async fn say_hello( async fn say_hello(
&mut self, &mut self,
input: helloworld::HelloRequest, input: helloworld::HelloRequest,
@ -147,7 +147,7 @@ impl helloworld::IGreeter for GreeterService {
&mut self, &mut self,
input: helloworld::HelloRequest, input: helloworld::HelloRequest,
) -> Result< ) -> Result<
::nrpc::ServiceStream<'a, helloworld::HelloReply>, ::nrpc::ServiceServerStream<'a, helloworld::HelloReply>,
Box<dyn std::error::Error + Send>, Box<dyn std::error::Error + Send>,
> { > {
let result = helloworld::HelloReply { let result = helloworld::HelloReply {
@ -159,7 +159,7 @@ impl helloworld::IGreeter for GreeterService {
async fn say_hello_many_to_one<'a>( async fn say_hello_many_to_one<'a>(
&mut self, &mut self,
mut input: ::nrpc::ServiceStream<'a, helloworld::HelloRequest>, mut input: ::nrpc::ServiceServerStream<'a, helloworld::HelloRequest>,
) -> Result<helloworld::HelloReply, Box<dyn Error + Send>>{ ) -> Result<helloworld::HelloReply, Box<dyn Error + Send>>{
let mut message = "Hello ".to_string(); let mut message = "Hello ".to_string();
while let Some(item_result) = input.next().await { while let Some(item_result) = input.next().await {
@ -173,9 +173,9 @@ impl helloworld::IGreeter for GreeterService {
async fn say_hello_many_to_many<'a>( async fn say_hello_many_to_many<'a>(
&mut self, &mut self,
input: ::nrpc::ServiceStream<'a, helloworld::HelloRequest>, input: ::nrpc::ServiceServerStream<'a, helloworld::HelloRequest>,
) -> Result< ) -> Result<
::nrpc::ServiceStream<'a, helloworld::HelloReply>, ::nrpc::ServiceServerStream<'a, helloworld::HelloReply>,
Box<dyn std::error::Error + Send>, Box<dyn std::error::Error + Send>,
>{ >{
Ok(Box::new(input.map(|item_result| item_result.map(|input| { Ok(Box::new(input.map(|item_result| item_result.map(|input| {
@ -191,7 +191,7 @@ impl helloworld::IGreeter for GreeterService {
struct ClientHandler; struct ClientHandler;
#[async_trait::async_trait] #[async_trait::async_trait]
impl nrpc::ClientHandler for ClientHandler { impl nrpc::ClientHandler<'_> for ClientHandler {
/*async fn call( /*async fn call(
&mut self, &mut self,
package: &str, package: &str,
@ -216,8 +216,8 @@ impl nrpc::ClientHandler for ClientHandler {
package: &str, package: &str,
service: &str, service: &str,
method: &str, method: &str,
input: ::nrpc::ServiceStream<'a, ::nrpc::_helpers::bytes::Bytes>, input: ::nrpc::ServiceClientStream<'a, ::nrpc::_helpers::bytes::Bytes>,
) -> Result<::nrpc::ServiceStream<'a, ::nrpc::_helpers::bytes::Bytes>, ServiceError> { ) -> Result<::nrpc::ServiceClientStream<'a, ::nrpc::_helpers::bytes::Bytes>, ServiceError> {
println!( println!(
"call {}.{}/{} with data stream", "call {}.{}/{} with data stream",
package, service, method package, service, method

View file

@ -1,6 +1,6 @@
[package] [package]
name = "nrpc" name = "nrpc"
version = "0.8.0" version = "0.10.0"
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
repository = "https://github.com/NGnius/nRPC" repository = "https://github.com/NGnius/nRPC"
@ -12,3 +12,8 @@ prost = "0.11"
bytes = "1" bytes = "1"
async-trait = "0.1" async-trait = "0.1"
futures = "0.3" futures = "0.3"
[features]
default = ["client-send", "server-send"]
client-send = []
server-send = []

View file

@ -1,7 +1,7 @@
mod service; mod service;
mod stream_utils; mod stream_utils;
pub use service::{ClientHandler, ClientService, ServerService, ServiceError, ServiceStream}; pub use service::{ClientHandler, ClientService, ServerService, ServiceError, ServiceClientStream, ServiceServerStream};
pub use stream_utils::{EmptyStream, OnceStream, VecStream}; pub use stream_utils::{EmptyStream, OnceStream, VecStream};

View file

@ -1,28 +1,40 @@
use futures::Stream; use futures::Stream;
use core::marker::Unpin; use core::marker::Unpin;
pub type ServiceStream<'a, T> = Box<dyn Stream<Item=Result<T, ServiceError>> + Unpin + Send + 'a>; #[cfg(feature = "client-send")]
pub type ServiceClientStream<'a, T> = Box<dyn Stream<Item=Result<T, ServiceError>> + Unpin + Send + 'a>;
#[async_trait::async_trait] #[cfg(not(feature = "client-send"))]
pub trait ServerService { pub type ServiceClientStream<'a, T> = Box<dyn Stream<Item=Result<T, ServiceError>> + Unpin + 'a>;
#[cfg(feature = "server-send")]
pub type ServiceServerStream<'a, T> = Box<dyn Stream<Item=Result<T, ServiceError>> + Unpin + Send + 'a>;
#[cfg(not(feature = "server-send"))]
pub type ServiceServerStream<'a, T> = Box<dyn Stream<Item=Result<T, ServiceError>> + Unpin + 'a>;
#[cfg_attr(feature = "server-send", async_trait::async_trait)]
#[cfg_attr(not(feature = "server-send"), async_trait::async_trait(?Send))]
pub trait ServerService<'b> {
fn descriptor(&self) -> &'static str; fn descriptor(&self) -> &'static str;
async fn call<'a>( async fn call<'a: 'b>(
&mut self, &mut self,
method: &str, method: &str,
input: ServiceStream<'a, bytes::Bytes>, input: ServiceServerStream<'a, bytes::Bytes>,
) -> Result<ServiceStream<'a, bytes::Bytes>, ServiceError>; ) -> Result<ServiceServerStream<'a, bytes::Bytes>, ServiceError>;
} }
#[async_trait::async_trait] #[cfg_attr(feature = "client-send", async_trait::async_trait)]
pub trait ClientHandler { #[cfg_attr(not(feature = "client-send"), async_trait::async_trait(?Send))]
async fn call<'a>( pub trait ClientHandler<'b> {
async fn call<'a: 'b>(
&self, &self,
package: &str, package: &str,
service: &str, service: &str,
method: &str, method: &str,
input: ServiceStream<'a, bytes::Bytes>, input: ServiceClientStream<'a, bytes::Bytes>,
) -> Result<ServiceStream<'a, bytes::Bytes>, ServiceError>; ) -> Result<ServiceClientStream<'a, bytes::Bytes>, ServiceError>;
} }
pub trait ClientService { pub trait ClientService {