use std::sync::atomic::{AtomicU64, Ordering}; use nrpc::{ClientHandler, ServiceError, _helpers::bytes, _helpers::async_trait}; use gloo_net::websocket::{Message, futures::WebSocket}; use wasm_bindgen_futures::spawn_local; use futures::{SinkExt, StreamExt}; static LAST_ID: AtomicU64 = AtomicU64::new(0); pub struct WebSocketHandler { // TODO port: u16, } async fn send_recv_ws(url: String, input: bytes::Bytes) -> Result, String> { let mut ws = WebSocket::open(&url).map_err(|e| e.to_string())?; ws.send(Message::Bytes(input.into())).await.map_err(|e| e.to_string())?; read_next_incoming(ws).await } async fn read_next_incoming(mut ws: WebSocket) -> Result, String> { if let Some(msg) = ws.next().await { match msg.map_err(|e| e.to_string())? { Message::Bytes(b) => Ok(b), Message::Text(_) => Err("Message::Text not allowed".into()), } } else { Err("No response received".into()) } } #[derive(Debug)] struct ErrorStr(String); impl std::fmt::Display for ErrorStr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Error message: {}", self.0) } } impl std::error::Error for ErrorStr {} impl WebSocketHandler { #[allow(dead_code)] pub fn new(port: u16) -> Self { Self { port } } } #[async_trait::async_trait] impl ClientHandler for WebSocketHandler { async fn call(&mut self, package: &str, service: &str, method: &str, input: bytes::Bytes, output: &mut bytes::BytesMut) -> Result<(), ServiceError> { let id = LAST_ID.fetch_add(1, Ordering::SeqCst); let url = format!( "ws://usdpl-ws-{}.localhost:{}/{}.{}/{}", id, self.port, package, service, method, ); let (tx, rx) = async_channel::bounded(1); spawn_local(async move { tx.send(send_recv_ws( url, input ).await).await.unwrap_or(()); }); output.extend_from_slice( &rx.recv().await .map_err(|e| ServiceError::Method(Box::new(e)))? .map_err(|e| ServiceError::Method(Box::new(ErrorStr(e))))? ); Ok(()) } }