use std::sync::atomic::{AtomicU64, Ordering}; use futures::{SinkExt, StreamExt, future::{select, Either}}; use gloo_net::websocket::{futures::WebSocket, Message}; use nrpc::{ClientHandler, ServiceError, ServiceClientStream, /*_helpers::async_trait,*/ _helpers::bytes}; use wasm_bindgen_futures::spawn_local; static LAST_ID: AtomicU64 = AtomicU64::new(0); /// Websocket client. /// In most cases, this shouldn't be used directly, but generated code will use this. pub struct WebSocketHandler { port: u16, } #[inline] fn ws_is_alive(ws_state: &gloo_net::websocket::State) -> bool { match ws_state { gloo_net::websocket::State::Connecting | gloo_net::websocket::State::Open => true, gloo_net::websocket::State::Closing | gloo_net::websocket::State::Closed => false, } } async fn send_recv_ws<'a>(mut tx: futures_channel::mpsc::Sender>, url: String, mut input: ServiceClientStream<'a, bytes::Bytes>) { let ws = match WebSocket::open_with_protocol(&url, "usdpl-nrpc").map_err(|e| e.to_string()) { Ok(x) => x, Err(e) => { log::error!("ws open error: {}", e); tx.send(Err(e.to_string())).await.unwrap_or(()); return; } }; log::debug!("ws opened successfully with url `{}`", url); let (mut input_done, mut output_done) = (false, false); let mut last_ws_state = ws.state(); log::debug!("ws with url `{}` initial state: {:?}", url, last_ws_state); let (mut ws_sink, mut ws_stream) = ws.split(); let (mut left, mut right) = (input.next(), ws_stream.next()); while ws_is_alive(&last_ws_state) { if !input_done && !output_done { log::debug!("Input and output streams are both alive"); match select(left, right).await { Either::Left((next, outstanding)) => { log::debug!("Got message to send over websocket"); if let Some(next) = next { match next { Ok(next) => { if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await { tx.send(Err(e.to_string())).await.unwrap_or(()); } }, Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()) } } else { input_done = true; } right = outstanding; left = input.next(); }, Either::Right((response, outstanding)) => { log::debug!("Received message from websocket"); if let Some(next) = response { match next { Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()), Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()), Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()), } } else { output_done = true; } left = outstanding; let ws = ws_stream.reunite(ws_sink).unwrap(); last_ws_state = ws.state(); (ws_sink, ws_stream) = ws.split(); right = ws_stream.next(); } } } else if input_done { log::debug!("Input stream is complete"); if let Some(next) = right.await { log::debug!("Received message from websocket"); match next { Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()), Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()), Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()), } } else { output_done = true; } //left = outstanding; let ws = ws_stream.reunite(ws_sink).unwrap(); last_ws_state = ws.state(); (ws_sink, ws_stream) = ws.split(); right = ws_stream.next(); } else { // output_done is true log::debug!("Output stream is complete"); if let Some(next) = left.await { log::debug!("Got message to send over websocket"); match next { Ok(next) => { if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await { tx.send(Err(e.to_string())).await.unwrap_or(()); } }, Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()) } } else { input_done = true; } //right = outstanding; let ws = ws_stream.reunite(ws_sink).unwrap(); last_ws_state = ws.state(); (ws_sink, ws_stream) = ws.split(); left = input.next(); right = ws_stream.next(); // this should always resolve to None (but compiler is unhappy without this) } } log::debug!("ws with url `{}` has closed", url); } #[derive(Debug)] struct ErrorStr(String); impl std::fmt::Display for ErrorStr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Error message: {}", self.0) } } impl std::error::Error for ErrorStr {} const CHANNEL_BOUND: usize = 4; impl WebSocketHandler { /// Instantiate the web socket client for connecting on the specified port pub fn new(port: u16) -> Self { Self { port } } } //#[async_trait::async_trait(?Send)] impl ClientHandler<'static> for WebSocketHandler { #[allow(async_fn_in_trait)] async fn call<'a: 'static>( &self, package: &str, service: &str, method: &str, input: ServiceClientStream<'a, bytes::Bytes>, ) -> Result, ServiceError> { let id = LAST_ID.fetch_add(1, Ordering::SeqCst); let url = format!( "ws://usdpl-ws-{}.localhost:{}/{}.{}/{}", id, self.port, package, service, method, ); log::debug!("doing send/receive on ws url `{}`", url); let (tx, rx) = futures_channel::mpsc::channel(CHANNEL_BOUND); spawn_local(send_recv_ws(tx, url, input)); Ok(Box::new(rx.map(|buf_result: Result| buf_result .map(|buf| bytes::Bytes::from(buf)) .map_err(|e| ServiceError::Method(Box::new(ErrorStr(e))))))) } }