Add streaming method generation (untested)
This commit is contained in:
parent
68b7455c9e
commit
84cae5af7d
11 changed files with 903 additions and 379 deletions
517
Cargo.lock
generated
517
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -19,7 +19,7 @@ usdpl-core = { version = "0.11", path = "../usdpl-core"}
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
||||||
# gRPC/protobuf
|
# gRPC/protobuf
|
||||||
nrpc = "0.6"
|
nrpc = { version = "0.10", path = "../../nRPC/nrpc", default-features = false, features = [ "server-send" ] }
|
||||||
async-lock = "2.7"
|
async-lock = "2.7"
|
||||||
prost = "0.11"
|
prost = "0.11"
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ edition = "2021"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
nrpc-build = { version = "0.7", path = "../../nRPC/nrpc-build" }
|
nrpc-build = { version = "0.10", path = "../../nRPC/nrpc-build" }
|
||||||
prost-build = "0.11"
|
prost-build = "0.11"
|
||||||
prost-types = "0.11"
|
prost-types = "0.11"
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,9 @@ fn generate_service_methods(
|
||||||
|
|
||||||
let mut input_params = Vec::with_capacity(input_type.field.len());
|
let mut input_params = Vec::with_capacity(input_type.field.len());
|
||||||
let mut params_to_fields = Vec::with_capacity(input_type.field.len());
|
let mut params_to_fields = Vec::with_capacity(input_type.field.len());
|
||||||
|
|
||||||
|
match (method.client_streaming, method.server_streaming) {
|
||||||
|
(false, false) => {
|
||||||
for field in &input_type.field {
|
for field in &input_type.field {
|
||||||
//let param_name = quote::format_ident!("val{}", i.to_string());
|
//let param_name = quote::format_ident!("val{}", i.to_string());
|
||||||
let type_enum = ProtobufType::from_field(field, &service.name, false);
|
let type_enum = ProtobufType::from_field(field, &service.name, false);
|
||||||
|
@ -97,6 +100,138 @@ fn generate_service_methods(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
},
|
||||||
|
(true, false) => {
|
||||||
|
// many -> 1
|
||||||
|
gen_methods.push(quote::quote! {
|
||||||
|
#[wasm_bindgen]
|
||||||
|
pub async fn #method_name(&self, generator: js_sys::Function) -> Option<#method_output> {
|
||||||
|
|
||||||
|
// function into Rust futures Stream
|
||||||
|
let stream = Box::new(::usdpl_front::wasm::JsFunctionStream::<#method_input>::from_function(generator));
|
||||||
|
|
||||||
|
match self.service.#method_name(stream).await {
|
||||||
|
Ok(x) => {
|
||||||
|
let x2: #method_output_as_in = x.into();
|
||||||
|
Some(x2.into_wasm())
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// log error
|
||||||
|
log::error!("service:{}|method:{}|error:{}", self.service.descriptor(), #method_name_str, e);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
(false, true) => {
|
||||||
|
// 1 -> many
|
||||||
|
for field in &input_type.field {
|
||||||
|
//let param_name = quote::format_ident!("val{}", i.to_string());
|
||||||
|
let type_enum = ProtobufType::from_field(field, &service.name, false);
|
||||||
|
//let rs_type_name = type_enum.to_tokens();
|
||||||
|
let js_type_name = type_enum.to_wasm_tokens();
|
||||||
|
let rs_type_name = type_enum.to_tokens();
|
||||||
|
let field_name = quote::format_ident!(
|
||||||
|
"{}",
|
||||||
|
field
|
||||||
|
.name
|
||||||
|
.as_ref()
|
||||||
|
.expect("Protobuf message field needs a name")
|
||||||
|
);
|
||||||
|
input_params.push(quote::quote! {
|
||||||
|
#field_name: #js_type_name,
|
||||||
|
});
|
||||||
|
params_to_fields.push(quote::quote! {
|
||||||
|
#field_name: #rs_type_name::from_wasm(#field_name.into()),//: #field_name,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let params_to_fields_transformer = if input_type.field.len() == 1 {
|
||||||
|
let field_name = quote::format_ident!(
|
||||||
|
"{}",
|
||||||
|
input_type.field[0]
|
||||||
|
.name
|
||||||
|
.as_ref()
|
||||||
|
.expect("Protobuf message field needs a name")
|
||||||
|
);
|
||||||
|
quote::quote! {
|
||||||
|
let val = #method_input::from_wasm(#field_name.into());
|
||||||
|
}
|
||||||
|
} else if input_type.field.is_empty() {
|
||||||
|
quote::quote! {
|
||||||
|
let val = #method_input {};
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
quote::quote! {
|
||||||
|
let val = #method_input {
|
||||||
|
#(#params_to_fields)*
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
gen_methods.push(quote::quote! {
|
||||||
|
#[wasm_bindgen]
|
||||||
|
pub async fn #method_name(&self, #(#input_params)*, callback: js_sys::Function) {
|
||||||
|
|
||||||
|
#params_to_fields_transformer
|
||||||
|
|
||||||
|
match self.service.#method_name(val.into()).await {
|
||||||
|
Ok(x) => {
|
||||||
|
while let Some(next_result) = x.next().await {
|
||||||
|
match next_result {
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("service:{}|method:{}|error:{}", self.service.descriptor(), #method_name_str, e);
|
||||||
|
},
|
||||||
|
Ok(item) => {
|
||||||
|
callback.call1(JsValue::undefined(), item.into_wasm_streamable());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// log error
|
||||||
|
log::error!("service:{}|method:{}|error:{}", self.service.descriptor(), #method_name_str, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
(true, true) => {
|
||||||
|
// many -> many
|
||||||
|
gen_methods.push(quote::quote! {
|
||||||
|
#[wasm_bindgen]
|
||||||
|
pub async fn #method_name(&self, generator: js_sys::Function, callback: js_sys::Function) -> Option<#method_output> {
|
||||||
|
|
||||||
|
// function into Rust futures Stream
|
||||||
|
let stream = Box::new(::usdpl_front::wasm::JsFunctionStream::<#method_input>::from_function(generator));
|
||||||
|
|
||||||
|
match self.service.#method_name(stream).await {
|
||||||
|
Ok(x) => {
|
||||||
|
while let Some(next_result) = x.next().await {
|
||||||
|
match next_result {
|
||||||
|
Err(e) => {
|
||||||
|
log::error!("service:{}|method:{}|error:{}", self.service.descriptor(), #method_name_str, e);
|
||||||
|
},
|
||||||
|
Ok(item) => {
|
||||||
|
callback.call1(JsValue::undefined(), item.into_wasm_streamable());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// log error
|
||||||
|
log::error!("service:{}|method:{}|error:{}", self.service.descriptor(), #method_name_str, e);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
quote::quote! {
|
quote::quote! {
|
||||||
#(#gen_methods)*
|
#(#gen_methods)*
|
||||||
|
@ -198,6 +333,7 @@ fn generate_wasm_struct_interop(
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("Protobuf message needs a name")
|
.expect("Protobuf message needs a name")
|
||||||
);
|
);
|
||||||
|
let js_map_name = quote::format_ident!("{}", "js_map");
|
||||||
let mut gen_fields = Vec::with_capacity(descriptor.field.len());
|
let mut gen_fields = Vec::with_capacity(descriptor.field.len());
|
||||||
let mut gen_into_fields = Vec::with_capacity(descriptor.field.len());
|
let mut gen_into_fields = Vec::with_capacity(descriptor.field.len());
|
||||||
let mut gen_from_fields = Vec::with_capacity(descriptor.field.len());
|
let mut gen_from_fields = Vec::with_capacity(descriptor.field.len());
|
||||||
|
@ -288,17 +424,21 @@ fn generate_wasm_struct_interop(
|
||||||
} else if descriptor.field.len() == 1 {
|
} else if descriptor.field.len() == 1 {
|
||||||
let field = &descriptor.field[0];
|
let field = &descriptor.field[0];
|
||||||
//dbg!(descriptor, field);
|
//dbg!(descriptor, field);
|
||||||
let field_name = quote::format_ident!(
|
let field_name_str = field
|
||||||
"{}",
|
|
||||||
field
|
|
||||||
.name
|
.name
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("Protobuf message field needs a name")
|
.expect("Protobuf message field needs a name");
|
||||||
|
let field_name = quote::format_ident!(
|
||||||
|
"{}",
|
||||||
|
field_name_str
|
||||||
);
|
);
|
||||||
let type_enum = ProtobufType::from_field(field, service, is_known_map(field, known_maps));
|
let type_enum = ProtobufType::from_field(field, service, is_known_map(field, known_maps));
|
||||||
let type_name = type_enum.to_tokens();
|
let type_name = type_enum.to_tokens();
|
||||||
let wasm_type_name = type_enum.to_wasm_tokens();
|
let wasm_type_name = type_enum.to_wasm_tokens();
|
||||||
|
|
||||||
|
let into_wasm_streamable = quote::quote!{self.into_wasm_streamable()};
|
||||||
|
let from_wasm_streamable = quote::quote!{#type_name::from_wasm_streamable(js)};
|
||||||
|
|
||||||
quote::quote! {
|
quote::quote! {
|
||||||
pub type #msg_name = #type_name;
|
pub type #msg_name = #type_name;
|
||||||
pub type #msg_name_wasm = #wasm_type_name;
|
pub type #msg_name_wasm = #wasm_type_name;
|
||||||
|
@ -320,22 +460,41 @@ fn generate_wasm_struct_interop(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ::usdpl_front::wasm::FromWasmStreamableType for #msg_name {
|
||||||
|
fn from_wasm_streamable(js: JsValue) -> Result<Self, ::usdpl_front::wasm::WasmStreamableConversionError> {
|
||||||
|
#from_wasm_streamable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ::usdpl_front::wasm::IntoWasmStreamableType for #msg_name {
|
||||||
|
fn into_wasm_streamable(self) -> JsValue {
|
||||||
|
#into_wasm_streamable
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#(#gen_nested_types)*
|
#(#gen_nested_types)*
|
||||||
|
|
||||||
#(#gen_enums)*
|
#(#gen_enums)*
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
let mut gen_into_wasm_streamable_fields = Vec::with_capacity(descriptor.field.len());
|
||||||
|
let mut gen_from_wasm_streamable_fields = Vec::with_capacity(descriptor.field.len());
|
||||||
|
|
||||||
for field in &descriptor.field {
|
for field in &descriptor.field {
|
||||||
let field_name = quote::format_ident!(
|
let field_name_str = field
|
||||||
"{}",
|
|
||||||
field
|
|
||||||
.name
|
.name
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.expect("Protobuf message field needs a name")
|
.expect("Protobuf message field needs a name");
|
||||||
|
let field_name = quote::format_ident!(
|
||||||
|
"{}",
|
||||||
|
field_name_str
|
||||||
);
|
);
|
||||||
let type_enum =
|
let type_enum =
|
||||||
ProtobufType::from_field(field, service, is_known_map(field, known_maps));
|
ProtobufType::from_field(field, service, is_known_map(field, known_maps));
|
||||||
let type_name = type_enum.to_tokens();
|
let type_name = type_enum.to_tokens();
|
||||||
|
|
||||||
|
let into_wasm_streamable = type_enum.to_into_wasm_streamable(field_name_str, &js_map_name);
|
||||||
|
let from_wasm_streamable = type_enum.to_from_wasm_streamable(field_name_str, &js_map_name);
|
||||||
//let wasm_type_name = type_enum.to_wasm_tokens();
|
//let wasm_type_name = type_enum.to_wasm_tokens();
|
||||||
gen_fields.push(quote::quote! {
|
gen_fields.push(quote::quote! {
|
||||||
pub #field_name: #type_name,
|
pub #field_name: #type_name,
|
||||||
|
@ -347,6 +506,9 @@ fn generate_wasm_struct_interop(
|
||||||
gen_from_fields.push(quote::quote! {
|
gen_from_fields.push(quote::quote! {
|
||||||
#field_name: <_>::from(other.#field_name),
|
#field_name: <_>::from(other.#field_name),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
gen_into_wasm_streamable_fields.push(into_wasm_streamable);
|
||||||
|
gen_from_wasm_streamable_fields.push(from_wasm_streamable);
|
||||||
}
|
}
|
||||||
|
|
||||||
let wasm_attribute_maybe =
|
let wasm_attribute_maybe =
|
||||||
|
@ -399,6 +561,23 @@ fn generate_wasm_struct_interop(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ::usdpl_front::wasm::FromWasmStreamableType for #msg_name {
|
||||||
|
fn from_wasm_streamable(js: JsValue) -> Result<Self, ::usdpl_front::wasm::WasmStreamableConversionError> {
|
||||||
|
let #js_map_name = js_sys::Map::from(js);
|
||||||
|
Ok(Self {
|
||||||
|
#(#gen_from_wasm_streamable_fields)*
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ::usdpl_front::wasm::IntoWasmStreamableType for #msg_name {
|
||||||
|
fn into_wasm_streamable(self) -> JsValue {
|
||||||
|
let #js_map_name = js_sys::Map::new();
|
||||||
|
#(#gen_into_wasm_streamable_fields)*
|
||||||
|
#js_map_name.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#(#gen_nested_types)*
|
#(#gen_nested_types)*
|
||||||
|
|
||||||
#(#gen_enums)*
|
#(#gen_enums)*
|
||||||
|
@ -551,6 +730,18 @@ impl ProtobufType {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn to_into_wasm_streamable(&self, field_name: &str, js_map_name: &syn::Ident) -> proc_macro2::TokenStream {
|
||||||
|
//let type_tokens = self.to_tokens();
|
||||||
|
//let field_ident = quote::format_ident!("{}", field_name);
|
||||||
|
quote::quote!{#js_map_name.set(#field_name.into(), self.field_ident);}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn to_from_wasm_streamable(&self, field_name: &str, js_map_name: &syn::Ident) -> proc_macro2::TokenStream {
|
||||||
|
let type_tokens = self.to_tokens();
|
||||||
|
//let field_ident = quote::format_ident!("{}", field_name);
|
||||||
|
quote::quote!{#field_name: #type_tokens::from_wasm_streamable(#js_map_name.get(#field_name.into()))?,}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_wasm_enum_interop(
|
fn generate_wasm_enum_interop(
|
||||||
|
@ -815,8 +1006,10 @@ impl IServiceGenerator for WasmServiceGenerator {
|
||||||
use usdpl_front::_helpers::wasm_bindgen_futures;
|
use usdpl_front::_helpers::wasm_bindgen_futures;
|
||||||
use usdpl_front::_helpers::js_sys;
|
use usdpl_front::_helpers::js_sys;
|
||||||
use usdpl_front::_helpers::log;
|
use usdpl_front::_helpers::log;
|
||||||
|
use usdpl_front::_helpers::futures;
|
||||||
use ::nrpc::ClientService;
|
use usdpl_front::_helpers::futures::StreamExt;
|
||||||
|
use usdpl_front::_helpers::nrpc::ClientService;
|
||||||
|
use usdpl_front::wasm::{IntoWasmStreamableType, FromWasmStreamableType};
|
||||||
|
|
||||||
use usdpl_front::wasm::*;
|
use usdpl_front::wasm::*;
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ encrypt = ["usdpl-core/encrypt", "obfstr", "hex"]
|
||||||
[dependencies]
|
[dependencies]
|
||||||
wasm-bindgen = "0.2"
|
wasm-bindgen = "0.2"
|
||||||
wasm-bindgen-futures = "0.4"
|
wasm-bindgen-futures = "0.4"
|
||||||
gloo-net = { version = "0.2", features = ["websocket"] }
|
gloo-net = { version = "0.3", features = ["websocket"] }
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
console_log = { version = "1.0", optional = true, features = ["color"] }
|
console_log = { version = "1.0", optional = true, features = ["color"] }
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ async-channel = "1.8"
|
||||||
obfstr = { version = "0.3", optional = true }
|
obfstr = { version = "0.3", optional = true }
|
||||||
hex = { version = "0.4", optional = true }
|
hex = { version = "0.4", optional = true }
|
||||||
|
|
||||||
nrpc = { version = "0.7", path = "../../nRPC/nrpc" }
|
nrpc = { version = "0.10", path = "../../nRPC/nrpc", default-features = false}
|
||||||
usdpl-core = { version = "0.11", path = "../usdpl-core" }
|
usdpl-core = { version = "0.11", path = "../usdpl-core" }
|
||||||
prost = "0.11"
|
prost = "0.11"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt, future::{select, Either}};
|
||||||
use gloo_net::websocket::{futures::WebSocket, Message};
|
use gloo_net::websocket::{futures::WebSocket, Message, State};
|
||||||
use nrpc::{ClientHandler, ServiceError, _helpers::async_trait, _helpers::bytes};
|
use nrpc::{ClientHandler, ServiceError, ServiceClientStream, _helpers::async_trait, _helpers::bytes};
|
||||||
use wasm_bindgen_futures::spawn_local;
|
use wasm_bindgen_futures::spawn_local;
|
||||||
|
|
||||||
static LAST_ID: AtomicU64 = AtomicU64::new(0);
|
static LAST_ID: AtomicU64 = AtomicU64::new(0);
|
||||||
|
@ -13,24 +13,104 @@ pub struct WebSocketHandler {
|
||||||
port: u16,
|
port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_recv_ws(url: String, input: bytes::Bytes) -> Result<Vec<u8>, String> {
|
async fn send_recv_ws<'a>(tx: async_channel::Sender<Result<bytes::Bytes, String>>, url: String, mut input: ServiceClientStream<'a, bytes::Bytes>) {
|
||||||
let mut ws = WebSocket::open(&url).map_err(|e| e.to_string())?;
|
let ws = match WebSocket::open(&url).map_err(|e| e.to_string()) {
|
||||||
ws.send(Message::Bytes(input.into()))
|
Ok(x) => x,
|
||||||
.await
|
Err(e) => {
|
||||||
.map_err(|e| e.to_string())?;
|
tx.send(Err(e.to_string())).await.unwrap_or(());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
read_next_incoming(ws).await
|
let (mut input_done, mut output_done) = (false, false);
|
||||||
}
|
let mut last_ws_state = ws.state();
|
||||||
|
let (mut ws_sink, mut ws_stream) = ws.split();
|
||||||
async fn read_next_incoming(mut ws: WebSocket) -> Result<Vec<u8>, String> {
|
let (mut left, mut right) = (input.next(), ws_stream.next());
|
||||||
if let Some(msg) = ws.next().await {
|
while let State::Open = last_ws_state {
|
||||||
match msg.map_err(|e| e.to_string())? {
|
if !input_done && !output_done {
|
||||||
Message::Bytes(b) => Ok(b),
|
match select(left, right).await {
|
||||||
Message::Text(_) => Err("Message::Text not allowed".into()),
|
Either::Left((next, outstanding)) => {
|
||||||
|
if let Some(next) = next {
|
||||||
|
match next {
|
||||||
|
Ok(next) => {
|
||||||
|
if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await {
|
||||||
|
tx.send(Err(e.to_string())).await.unwrap_or(());
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(())
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Err("No response received".into())
|
input_done = true;
|
||||||
}
|
}
|
||||||
|
right = outstanding;
|
||||||
|
left = input.next();
|
||||||
|
},
|
||||||
|
Either::Right((response, outstanding)) => {
|
||||||
|
if let Some(next) = response {
|
||||||
|
match next {
|
||||||
|
Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
|
||||||
|
Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
|
||||||
|
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
output_done = true;
|
||||||
|
}
|
||||||
|
left = outstanding;
|
||||||
|
let ws = ws_stream.reunite(ws_sink).unwrap();
|
||||||
|
last_ws_state = ws.state();
|
||||||
|
(ws_sink, ws_stream) = ws.split();
|
||||||
|
right = ws_stream.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if input_done {
|
||||||
|
if let Some(next) = right.await {
|
||||||
|
match next {
|
||||||
|
Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
|
||||||
|
Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
|
||||||
|
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
output_done = true;
|
||||||
|
}
|
||||||
|
//left = outstanding;
|
||||||
|
let ws = ws_stream.reunite(ws_sink).unwrap();
|
||||||
|
last_ws_state = ws.state();
|
||||||
|
(ws_sink, ws_stream) = ws.split();
|
||||||
|
right = ws_stream.next();
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/*spawn_local(async move {
|
||||||
|
while let State::Open = ws.state() {
|
||||||
|
if let Some(next) = input.next().await {
|
||||||
|
match next {
|
||||||
|
Ok(next) => {
|
||||||
|
if let Err(e) = ws.send(Message::Bytes(next.into())).await {
|
||||||
|
tx2.send(Err(e.to_string())).await.unwrap_or(());
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => tx2.send(Err(e.to_string())).await.unwrap_or(())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
spawn_local(async move {
|
||||||
|
while let State::Open = ws.state() {
|
||||||
|
if let Some(next) = ws.next().await {
|
||||||
|
match next {
|
||||||
|
Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
|
||||||
|
Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
|
||||||
|
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});*/
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -44,6 +124,8 @@ impl std::fmt::Display for ErrorStr {
|
||||||
|
|
||||||
impl std::error::Error for ErrorStr {}
|
impl std::error::Error for ErrorStr {}
|
||||||
|
|
||||||
|
const CHANNEL_BOUND: usize = 4;
|
||||||
|
|
||||||
impl WebSocketHandler {
|
impl WebSocketHandler {
|
||||||
/// Instantiate the web socket client for connecting on the specified port
|
/// Instantiate the web socket client for connecting on the specified port
|
||||||
pub fn new(port: u16) -> Self {
|
pub fn new(port: u16) -> Self {
|
||||||
|
@ -51,32 +133,25 @@ impl WebSocketHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait(?Send)]
|
||||||
impl ClientHandler for WebSocketHandler {
|
impl ClientHandler<'static> for WebSocketHandler {
|
||||||
async fn call(
|
async fn call<'a: 'static>(
|
||||||
&self,
|
&self,
|
||||||
package: &str,
|
package: &str,
|
||||||
service: &str,
|
service: &str,
|
||||||
method: &str,
|
method: &str,
|
||||||
input: bytes::Bytes,
|
input: ServiceClientStream<'a, bytes::Bytes>,
|
||||||
output: &mut bytes::BytesMut,
|
) -> Result<ServiceClientStream<'a, bytes::Bytes>, ServiceError> {
|
||||||
) -> Result<(), ServiceError> {
|
|
||||||
let id = LAST_ID.fetch_add(1, Ordering::SeqCst);
|
let id = LAST_ID.fetch_add(1, Ordering::SeqCst);
|
||||||
let url = format!(
|
let url = format!(
|
||||||
"ws://usdpl-ws-{}.localhost:{}/{}.{}/{}",
|
"ws://usdpl-ws-{}.localhost:{}/{}.{}/{}",
|
||||||
id, self.port, package, service, method,
|
id, self.port, package, service, method,
|
||||||
);
|
);
|
||||||
let (tx, rx) = async_channel::bounded(1);
|
let (tx, rx) = async_channel::bounded(CHANNEL_BOUND);
|
||||||
spawn_local(async move {
|
spawn_local(send_recv_ws(tx, url, input));
|
||||||
tx.send(send_recv_ws(url, input).await).await.unwrap_or(());
|
|
||||||
});
|
|
||||||
|
|
||||||
output.extend_from_slice(
|
Ok(Box::new(rx.map(|buf_result: Result<bytes::Bytes, String>| buf_result
|
||||||
&rx.recv()
|
.map(|buf| bytes::Bytes::from(buf))
|
||||||
.await
|
.map_err(|e| ServiceError::Method(Box::new(ErrorStr(e)))))))
|
||||||
.map_err(|e| ServiceError::Method(Box::new(e)))?
|
|
||||||
.map_err(|e| ServiceError::Method(Box::new(ErrorStr(e))))?,
|
|
||||||
);
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,3 +38,11 @@ pub(crate) fn js_to_primitive(val: JsValue) -> Primitive {
|
||||||
pub(crate) fn str_to_js<S: std::string::ToString>(s: S) -> JsString {
|
pub(crate) fn str_to_js<S: std::string::ToString>(s: S) -> JsString {
|
||||||
s.to_string().into()
|
s.to_string().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn js_to_str(js: JsValue) -> String {
|
||||||
|
if let Some(s) = js.as_string() {
|
||||||
|
s
|
||||||
|
} else {
|
||||||
|
format!("{:?}", js)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -23,6 +23,8 @@ pub mod _helpers {
|
||||||
pub use wasm_bindgen;
|
pub use wasm_bindgen;
|
||||||
pub use wasm_bindgen_futures;
|
pub use wasm_bindgen_futures;
|
||||||
pub use log;
|
pub use log;
|
||||||
|
pub use futures;
|
||||||
|
pub use nrpc;
|
||||||
}
|
}
|
||||||
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
|
74
usdpl-front/src/wasm/js_function_stream.rs
Normal file
74
usdpl-front/src/wasm/js_function_stream.rs
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
use core::pin::Pin;
|
||||||
|
use core::future::Future;
|
||||||
|
|
||||||
|
use futures::{Stream, task::{Poll, Context}};
|
||||||
|
use wasm_bindgen_futures::JsFuture;
|
||||||
|
use wasm_bindgen::JsValue;
|
||||||
|
use js_sys::{Function, Promise};
|
||||||
|
|
||||||
|
use nrpc::ServiceError;
|
||||||
|
use super::FromWasmStreamableType;
|
||||||
|
use crate::convert::js_to_str;
|
||||||
|
|
||||||
|
/// futures::Stream wrapper for a JS async function that generates a new T-like value every call
|
||||||
|
pub struct JsFunctionStream<T: FromWasmStreamableType + Unpin + 'static> {
|
||||||
|
function: Function,
|
||||||
|
promise: Option<JsFuture>,
|
||||||
|
_idc: std::marker::PhantomData<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <T: FromWasmStreamableType + Unpin + 'static> JsFunctionStream<T> {
|
||||||
|
/// Construct the function stream wrapper
|
||||||
|
pub fn from_function(f: Function) -> Self {
|
||||||
|
Self {
|
||||||
|
function: f,
|
||||||
|
promise: None,
|
||||||
|
_idc: std::marker::PhantomData::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <T: FromWasmStreamableType + Unpin + 'static> Stream for JsFunctionStream<T> {
|
||||||
|
type Item = Result<T, ServiceError>;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>
|
||||||
|
) -> Poll<Option<Self::Item>> {
|
||||||
|
// this is horrible, I'm sorry
|
||||||
|
let js_poll = if let Some(mut promise) = self.promise.take() {
|
||||||
|
let mut pin = Pin::new(&mut promise);
|
||||||
|
JsFuture::poll(pin.as_mut(), cx)
|
||||||
|
} else {
|
||||||
|
let function_result = match self.function.call0(&JsValue::undefined()) {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(e) => return Poll::Ready(Some(Err(ServiceError::Method(s_to_err(format!("JS function call error: {}", js_to_str(e)))))))
|
||||||
|
};
|
||||||
|
|
||||||
|
let js_promise = Promise::from(function_result);
|
||||||
|
let mut js_future = JsFuture::from(js_promise);
|
||||||
|
let mut pin = Pin::new(&mut js_future);
|
||||||
|
let poll = JsFuture::poll(pin.as_mut(), cx);
|
||||||
|
self.promise = Some(js_future);
|
||||||
|
poll
|
||||||
|
};
|
||||||
|
js_poll.map(|t| match t {
|
||||||
|
Ok(t) => {
|
||||||
|
if t.is_null() || t.is_undefined() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(T::from_wasm_streamable(t).map_err(|e| ServiceError::Method(s_to_err(format!("JS type conversion error: {}", e)))))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => Some(Err(ServiceError::Method(s_to_err(format!("JS function promise error: {}", js_to_str(e))))))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn s_to_err(s: String) -> Box<(dyn std::error::Error + Send + Sync + 'static)> {
|
||||||
|
s.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _check_service_stream<T: FromWasmStreamableType + Unpin + 'static>(js_stream: JsFunctionStream<T>) {
|
||||||
|
let _: nrpc::ServiceClientStream<'static, T> = Box::new(js_stream);
|
||||||
|
}
|
|
@ -1,7 +1,11 @@
|
||||||
//! WASM <-> Rust interop utilities
|
//! WASM <-> Rust interop utilities
|
||||||
mod arrays;
|
mod arrays;
|
||||||
|
mod js_function_stream;
|
||||||
mod maps;
|
mod maps;
|
||||||
|
mod streaming;
|
||||||
mod trivials;
|
mod trivials;
|
||||||
mod wasm_traits;
|
mod wasm_traits;
|
||||||
|
|
||||||
|
pub use js_function_stream::JsFunctionStream;
|
||||||
pub use wasm_traits::*;
|
pub use wasm_traits::*;
|
||||||
|
pub use streaming::*;
|
||||||
|
|
189
usdpl-front/src/wasm/streaming.rs
Normal file
189
usdpl-front/src/wasm/streaming.rs
Normal file
|
@ -0,0 +1,189 @@
|
||||||
|
use wasm_bindgen::JsValue;
|
||||||
|
|
||||||
|
/// Convert Rust type to WASM-compatible type involved in nRPC streaming
|
||||||
|
pub trait IntoWasmStreamableType {
|
||||||
|
/// Required method
|
||||||
|
fn into_wasm_streamable(self) -> JsValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
/// Conversion error from FromWasmStreamableType
|
||||||
|
pub enum WasmStreamableConversionError {
|
||||||
|
/// JSValue underlying type is incorrect
|
||||||
|
UnexpectedType {
|
||||||
|
/// Expected Javascript type
|
||||||
|
expected: JsType,
|
||||||
|
/// Actual Javascript type
|
||||||
|
got: JsType,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl core::fmt::Display for WasmStreamableConversionError {
|
||||||
|
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::UnexpectedType { expected, got } => write!(f, "Unexpected type {}, expected {}", expected, got),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for WasmStreamableConversionError {}
|
||||||
|
|
||||||
|
/// Approximation of all possible JS types detectable through Wasm
|
||||||
|
#[allow(missing_docs)]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum JsType {
|
||||||
|
Number,
|
||||||
|
String,
|
||||||
|
Bool,
|
||||||
|
Array,
|
||||||
|
BigInt,
|
||||||
|
Function,
|
||||||
|
Symbol,
|
||||||
|
Undefined,
|
||||||
|
Null,
|
||||||
|
Object,
|
||||||
|
Unknown,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl core::fmt::Display for JsType {
|
||||||
|
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Number => write!(f, "number"),
|
||||||
|
Self::String => write!(f, "string"),
|
||||||
|
Self::Bool => write!(f, "boolean"),
|
||||||
|
Self::Array => write!(f, "array"),
|
||||||
|
Self::BigInt => write!(f, "bigint"),
|
||||||
|
Self::Function => write!(f, "function"),
|
||||||
|
Self::Symbol => write!(f, "symbol"),
|
||||||
|
Self::Undefined => write!(f, "undefined"),
|
||||||
|
Self::Null => write!(f, "null"),
|
||||||
|
Self::Object => write!(f, "object"),
|
||||||
|
Self::Unknown => write!(f, "<unknown>"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JsType {
|
||||||
|
fn guess(js: &JsValue) -> JsType {
|
||||||
|
if js.as_f64().is_some() {
|
||||||
|
Self::Number
|
||||||
|
} else if js.as_string().is_some() {
|
||||||
|
Self::String
|
||||||
|
} else if js.as_bool().is_some() {
|
||||||
|
Self::Bool
|
||||||
|
} else if js.is_array() {
|
||||||
|
Self::Array
|
||||||
|
} else if js.is_bigint() {
|
||||||
|
Self::BigInt
|
||||||
|
} else if js.is_function() {
|
||||||
|
Self::Function
|
||||||
|
} else if js.is_symbol() {
|
||||||
|
Self::Symbol
|
||||||
|
} else if js.is_undefined() {
|
||||||
|
Self::Undefined
|
||||||
|
} else if js.is_null() {
|
||||||
|
Self::Null
|
||||||
|
} else if js.is_object() {
|
||||||
|
Self::Object
|
||||||
|
} else {
|
||||||
|
Self::Unknown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert WASM-compatible type involved in nRPC streaming to Rust-centric type
|
||||||
|
pub trait FromWasmStreamableType: Sized {
|
||||||
|
/// Required method
|
||||||
|
fn from_wasm_streamable(js: JsValue) -> Result<Self, WasmStreamableConversionError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! trivial_convert_number {
|
||||||
|
($ty: ty) => {
|
||||||
|
impl FromWasmStreamableType for $ty {
|
||||||
|
fn from_wasm_streamable(js: JsValue) -> Result<Self, WasmStreamableConversionError> {
|
||||||
|
if let Some(num) = js.as_f64() {
|
||||||
|
Ok(num as $ty)
|
||||||
|
} else {
|
||||||
|
Err(WasmStreamableConversionError::UnexpectedType {
|
||||||
|
expected: JsType::Number,
|
||||||
|
got: JsType::guess(&js),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoWasmStreamableType for $ty {
|
||||||
|
fn into_wasm_streamable(self) -> JsValue {
|
||||||
|
self.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
trivial_convert_number! { f64 }
|
||||||
|
trivial_convert_number! { f32 }
|
||||||
|
|
||||||
|
trivial_convert_number! { isize }
|
||||||
|
trivial_convert_number! { usize }
|
||||||
|
|
||||||
|
trivial_convert_number! { i8 }
|
||||||
|
trivial_convert_number! { i16 }
|
||||||
|
trivial_convert_number! { i32 }
|
||||||
|
trivial_convert_number! { i64 }
|
||||||
|
trivial_convert_number! { i128 }
|
||||||
|
|
||||||
|
trivial_convert_number! { u8 }
|
||||||
|
trivial_convert_number! { u16 }
|
||||||
|
trivial_convert_number! { u32 }
|
||||||
|
trivial_convert_number! { u64 }
|
||||||
|
trivial_convert_number! { u128 }
|
||||||
|
|
||||||
|
impl FromWasmStreamableType for String {
|
||||||
|
fn from_wasm_streamable(js: JsValue) -> Result<Self, WasmStreamableConversionError> {
|
||||||
|
if let Some(s) = js.as_string() {
|
||||||
|
Ok(s)
|
||||||
|
} else {
|
||||||
|
Err(WasmStreamableConversionError::UnexpectedType {
|
||||||
|
expected: JsType::String,
|
||||||
|
got: JsType::guess(&js),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoWasmStreamableType for String {
|
||||||
|
fn into_wasm_streamable(self) -> JsValue {
|
||||||
|
self.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromWasmStreamableType for bool {
|
||||||
|
fn from_wasm_streamable(js: JsValue) -> Result<Self, WasmStreamableConversionError> {
|
||||||
|
if let Some(b) = js.as_bool() {
|
||||||
|
Ok(b)
|
||||||
|
} else {
|
||||||
|
Err(WasmStreamableConversionError::UnexpectedType {
|
||||||
|
expected: JsType::Bool,
|
||||||
|
got: JsType::guess(&js),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoWasmStreamableType for bool {
|
||||||
|
fn into_wasm_streamable(self) -> JsValue {
|
||||||
|
self.into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromWasmStreamableType for () {
|
||||||
|
fn from_wasm_streamable(_js: JsValue) -> Result<Self, WasmStreamableConversionError> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoWasmStreamableType for () {
|
||||||
|
fn into_wasm_streamable(self) -> JsValue {
|
||||||
|
JsValue::undefined()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue