usdpl-rs/usdpl-front/src/client_handler.rs

210 lines
8.4 KiB
Rust
Raw Normal View History

use std::sync::atomic::{AtomicU64, Ordering};
use futures::{SinkExt, StreamExt, future::{select, Either}};
use gloo_net::websocket::{futures::WebSocket, Message};
use nrpc::{ClientHandler, ServiceError, ServiceClientStream, _helpers::async_trait, _helpers::bytes};
use wasm_bindgen_futures::spawn_local;
static LAST_ID: AtomicU64 = AtomicU64::new(0);
/// Websocket client.
/// In most cases, this shouldn't be used directly, but generated code will use this.
pub struct WebSocketHandler {
port: u16,
}
#[inline]
fn ws_is_alive(ws_state: &gloo_net::websocket::State) -> bool {
match ws_state {
gloo_net::websocket::State::Connecting | gloo_net::websocket::State::Open => true,
gloo_net::websocket::State::Closing | gloo_net::websocket::State::Closed => false,
}
}
async fn send_recv_ws<'a>(mut tx: futures_channel::mpsc::Sender<Result<bytes::Bytes, String>>, url: String, mut input: ServiceClientStream<'a, bytes::Bytes>) {
let ws = match WebSocket::open_with_protocol(&url, "usdpl-nrpc").map_err(|e| e.to_string()) {
Ok(x) => x,
Err(e) => {
log::error!("ws open error: {}", e);
tx.send(Err(e.to_string())).await.unwrap_or(());
return;
}
};
#[cfg(feature = "debug")]
web_sys::console::log_1(&format!("ws opened successfully with url `{}`", url).into());
let (mut input_done, mut output_done) = (false, false);
let mut last_ws_state = ws.state();
#[cfg(feature = "debug")]
web_sys::console::log_1(&format!("ws with url `{}` initial state: {:?}", url, last_ws_state).into());
let (mut ws_sink, mut ws_stream) = ws.split();
let (mut left, mut right) = (input.next(), ws_stream.next());
while ws_is_alive(&last_ws_state) {
if !input_done && !output_done {
#[cfg(feature = "debug")]
web_sys::console::debug_1(&format!("Input and output streams are both alive").into());
match select(left, right).await {
Either::Left((next, outstanding)) => {
#[cfg(feature = "debug")]
web_sys::console::debug_1(&format!("Got message to send over websocket").into());
if let Some(next) = next {
match next {
Ok(next) => {
if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await {
tx.send(Err(e.to_string())).await.unwrap_or(());
}
},
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(())
}
} else {
input_done = true;
}
right = outstanding;
left = input.next();
},
Either::Right((response, outstanding)) => {
#[cfg(feature = "debug")]
web_sys::console::debug_1(&format!("Received message from websocket").into());
if let Some(next) = response {
match next {
Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
}
} else {
output_done = true;
}
left = outstanding;
let ws = ws_stream.reunite(ws_sink).unwrap();
last_ws_state = ws.state();
(ws_sink, ws_stream) = ws.split();
right = ws_stream.next();
}
}
} else if input_done {
#[cfg(feature = "debug")]
web_sys::console::debug_1(&format!("Input stream is complete").into());
if let Some(next) = right.await {
#[cfg(feature = "debug")]
web_sys::console::debug_1(&format!("Received message from websocket").into());
match next {
Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
}
} else {
output_done = true;
}
//left = outstanding;
let ws = ws_stream.reunite(ws_sink).unwrap();
last_ws_state = ws.state();
(ws_sink, ws_stream) = ws.split();
right = ws_stream.next();
} else {
// output_done is true
#[cfg(feature = "debug")]
web_sys::console::debug_1(&format!("Output stream is complete").into());
if let Some(next) = left.await {
#[cfg(feature = "debug")]
web_sys::console::debug_1(&format!("Got message to send over websocket").into());
match next {
Ok(next) => {
if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await {
tx.send(Err(e.to_string())).await.unwrap_or(());
}
},
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(())
}
} else {
input_done = true;
}
//right = outstanding;
let ws = ws_stream.reunite(ws_sink).unwrap();
last_ws_state = ws.state();
(ws_sink, ws_stream) = ws.split();
left = input.next();
right = ws_stream.next(); // this should always resolve to None (but compiler is unhappy without this)
}
}
#[cfg(feature = "debug")]
web_sys::console::debug_1(&format!("ws with url `{}` has closed", url).into());
/*spawn_local(async move {
while let State::Open = ws.state() {
if let Some(next) = input.next().await {
match next {
Ok(next) => {
if let Err(e) = ws.send(Message::Bytes(next.into())).await {
tx2.send(Err(e.to_string())).await.unwrap_or(());
}
},
Err(e) => tx2.send(Err(e.to_string())).await.unwrap_or(())
}
} else {
break;
}
}
});
spawn_local(async move {
while let State::Open = ws.state() {
if let Some(next) = ws.next().await {
match next {
Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
}
} else {
break;
}
}
});*/
}
#[derive(Debug)]
struct ErrorStr(String);
impl std::fmt::Display for ErrorStr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Error message: {}", self.0)
}
}
impl std::error::Error for ErrorStr {}
const CHANNEL_BOUND: usize = 4;
impl WebSocketHandler {
/// Instantiate the web socket client for connecting on the specified port
pub fn new(port: u16) -> Self {
Self { port }
}
}
#[async_trait::async_trait(?Send)]
impl ClientHandler<'static> for WebSocketHandler {
async fn call<'a: 'static>(
&self,
package: &str,
service: &str,
method: &str,
input: ServiceClientStream<'a, bytes::Bytes>,
) -> Result<ServiceClientStream<'a, bytes::Bytes>, ServiceError> {
let id = LAST_ID.fetch_add(1, Ordering::SeqCst);
let url = format!(
"ws://usdpl-ws-{}.localhost:{}/{}.{}/{}",
id, self.port, package, service, method,
);
#[cfg(feature = "debug")]
web_sys::console::log_1(&format!("doing send/receive on ws url `{}`", url).into());
let (tx, rx) = futures_channel::mpsc::channel(CHANNEL_BOUND);
spawn_local(send_recv_ws(tx, url, input));
Ok(Box::new(rx.map(|buf_result: Result<bytes::Bytes, String>| buf_result
.map(|buf| bytes::Bytes::from(buf))
.map_err(|e| ServiceError::Method(Box::new(ErrorStr(e)))))))
}
}