2023-04-17 03:57:12 +01:00
|
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
|
|
|
2023-07-23 00:05:54 +01:00
|
|
|
use futures::{SinkExt, StreamExt, future::{select, Either}};
|
2023-09-01 23:04:34 +01:00
|
|
|
use gloo_net::websocket::{futures::WebSocket, Message};
|
2023-07-23 00:05:54 +01:00
|
|
|
use nrpc::{ClientHandler, ServiceError, ServiceClientStream, _helpers::async_trait, _helpers::bytes};
|
2023-06-04 19:05:33 +01:00
|
|
|
use wasm_bindgen_futures::spawn_local;
|
2023-04-17 03:57:12 +01:00
|
|
|
|
|
|
|
/// Counter that hands out a distinct id to every websocket call; the id is
/// embedded in the hostname of the URL the call connects to.
static LAST_ID: AtomicU64 = AtomicU64::new(0);
|
|
|
|
|
2023-06-04 19:05:33 +01:00
|
|
|
/// Websocket client.
///
/// In most cases this shouldn't be used directly; generated code is expected
/// to drive it instead.
#[derive(Debug, Clone)]
pub struct WebSocketHandler {
    /// Port placed in the websocket URL of every call made by this handler.
    port: u16,
}
|
|
|
|
|
2023-09-01 23:04:34 +01:00
|
|
|
#[inline]
|
|
|
|
fn ws_is_alive(ws_state: &gloo_net::websocket::State) -> bool {
|
|
|
|
match ws_state {
|
|
|
|
gloo_net::websocket::State::Connecting | gloo_net::websocket::State::Open => true,
|
|
|
|
gloo_net::websocket::State::Closing | gloo_net::websocket::State::Closed => false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn send_recv_ws<'a>(mut tx: futures_channel::mpsc::Sender<Result<bytes::Bytes, String>>, url: String, mut input: ServiceClientStream<'a, bytes::Bytes>) {
|
|
|
|
let ws = match WebSocket::open_with_protocol(&url, "usdpl-nrpc").map_err(|e| e.to_string()) {
|
2023-07-23 00:05:54 +01:00
|
|
|
Ok(x) => x,
|
|
|
|
Err(e) => {
|
2023-09-01 23:04:34 +01:00
|
|
|
log::error!("ws open error: {}", e);
|
2023-07-23 00:05:54 +01:00
|
|
|
tx.send(Err(e.to_string())).await.unwrap_or(());
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
};
|
2023-04-17 03:57:12 +01:00
|
|
|
|
2023-09-01 23:04:34 +01:00
|
|
|
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("ws opened successfully with url `{}`", url);
|
2023-09-01 23:04:34 +01:00
|
|
|
|
2023-07-23 00:05:54 +01:00
|
|
|
let (mut input_done, mut output_done) = (false, false);
|
|
|
|
let mut last_ws_state = ws.state();
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("ws with url `{}` initial state: {:?}", url, last_ws_state);
|
2023-07-23 00:05:54 +01:00
|
|
|
let (mut ws_sink, mut ws_stream) = ws.split();
|
|
|
|
let (mut left, mut right) = (input.next(), ws_stream.next());
|
2023-09-01 23:04:34 +01:00
|
|
|
while ws_is_alive(&last_ws_state) {
|
2023-07-23 00:05:54 +01:00
|
|
|
if !input_done && !output_done {
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("Input and output streams are both alive");
|
2023-07-23 00:05:54 +01:00
|
|
|
match select(left, right).await {
|
|
|
|
Either::Left((next, outstanding)) => {
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("Got message to send over websocket");
|
2023-07-23 00:05:54 +01:00
|
|
|
if let Some(next) = next {
|
|
|
|
match next {
|
|
|
|
Ok(next) => {
|
|
|
|
if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await {
|
|
|
|
tx.send(Err(e.to_string())).await.unwrap_or(());
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(())
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
input_done = true;
|
|
|
|
}
|
|
|
|
right = outstanding;
|
|
|
|
left = input.next();
|
|
|
|
},
|
|
|
|
Either::Right((response, outstanding)) => {
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("Received message from websocket");
|
2023-07-23 00:05:54 +01:00
|
|
|
if let Some(next) = response {
|
|
|
|
match next {
|
|
|
|
Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
|
|
|
|
Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
|
|
|
|
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
output_done = true;
|
|
|
|
}
|
|
|
|
left = outstanding;
|
|
|
|
let ws = ws_stream.reunite(ws_sink).unwrap();
|
|
|
|
last_ws_state = ws.state();
|
|
|
|
(ws_sink, ws_stream) = ws.split();
|
|
|
|
right = ws_stream.next();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if input_done {
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("Input stream is complete");
|
2023-07-23 00:05:54 +01:00
|
|
|
if let Some(next) = right.await {
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("Received message from websocket");
|
2023-07-23 00:05:54 +01:00
|
|
|
match next {
|
|
|
|
Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()),
|
|
|
|
Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()),
|
|
|
|
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()),
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
output_done = true;
|
|
|
|
}
|
|
|
|
//left = outstanding;
|
|
|
|
let ws = ws_stream.reunite(ws_sink).unwrap();
|
|
|
|
last_ws_state = ws.state();
|
|
|
|
(ws_sink, ws_stream) = ws.split();
|
|
|
|
right = ws_stream.next();
|
|
|
|
} else {
|
2023-09-01 23:04:34 +01:00
|
|
|
// output_done is true
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("Output stream is complete");
|
2023-09-01 23:04:34 +01:00
|
|
|
if let Some(next) = left.await {
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("Got message to send over websocket");
|
2023-09-01 23:04:34 +01:00
|
|
|
match next {
|
|
|
|
Ok(next) => {
|
|
|
|
if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await {
|
|
|
|
tx.send(Err(e.to_string())).await.unwrap_or(());
|
|
|
|
}
|
|
|
|
},
|
|
|
|
Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(())
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
input_done = true;
|
|
|
|
}
|
|
|
|
//right = outstanding;
|
|
|
|
let ws = ws_stream.reunite(ws_sink).unwrap();
|
|
|
|
last_ws_state = ws.state();
|
|
|
|
(ws_sink, ws_stream) = ws.split();
|
|
|
|
left = input.next();
|
|
|
|
right = ws_stream.next(); // this should always resolve to None (but compiler is unhappy without this)
|
2023-04-17 03:57:12 +01:00
|
|
|
}
|
|
|
|
}
|
2023-09-01 23:04:34 +01:00
|
|
|
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("ws with url `{}` has closed", url);
|
2023-04-17 03:57:12 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Minimal error type wrapping a plain message string.
#[derive(Debug)]
struct ErrorStr(String);

impl std::fmt::Display for ErrorStr {
    /// Renders the wrapped message prefixed with `Error message: `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        f.write_fmt(format_args!("Error message: {}", message))
    }
}

impl std::error::Error for ErrorStr {}
|
|
|
|
|
2023-07-23 00:05:54 +01:00
|
|
|
/// Capacity given to the bounded channel that carries results from the
/// websocket task back to the caller of `call`.
const CHANNEL_BOUND: usize = 4;
|
|
|
|
|
2023-04-17 03:57:12 +01:00
|
|
|
impl WebSocketHandler {
|
2023-06-04 19:05:33 +01:00
|
|
|
/// Instantiate the web socket client for connecting on the specified port
|
2023-04-17 03:57:12 +01:00
|
|
|
pub fn new(port: u16) -> Self {
|
|
|
|
Self { port }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-07-23 00:05:54 +01:00
|
|
|
#[async_trait::async_trait(?Send)]
|
|
|
|
impl ClientHandler<'static> for WebSocketHandler {
|
|
|
|
async fn call<'a: 'static>(
|
2023-06-29 03:21:56 +01:00
|
|
|
&self,
|
2023-06-04 19:05:33 +01:00
|
|
|
package: &str,
|
|
|
|
service: &str,
|
|
|
|
method: &str,
|
2023-07-23 00:05:54 +01:00
|
|
|
input: ServiceClientStream<'a, bytes::Bytes>,
|
|
|
|
) -> Result<ServiceClientStream<'a, bytes::Bytes>, ServiceError> {
|
2023-04-17 03:57:12 +01:00
|
|
|
let id = LAST_ID.fetch_add(1, Ordering::SeqCst);
|
|
|
|
let url = format!(
|
2023-04-24 04:03:10 +01:00
|
|
|
"ws://usdpl-ws-{}.localhost:{}/{}.{}/{}",
|
2023-06-04 19:05:33 +01:00
|
|
|
id, self.port, package, service, method,
|
2023-04-17 03:57:12 +01:00
|
|
|
);
|
2023-09-03 22:33:30 +01:00
|
|
|
log::debug!("doing send/receive on ws url `{}`", url);
|
2023-09-01 23:04:34 +01:00
|
|
|
let (tx, rx) = futures_channel::mpsc::channel(CHANNEL_BOUND);
|
2023-07-23 00:05:54 +01:00
|
|
|
spawn_local(send_recv_ws(tx, url, input));
|
2023-04-17 03:57:12 +01:00
|
|
|
|
2023-07-23 00:05:54 +01:00
|
|
|
Ok(Box::new(rx.map(|buf_result: Result<bytes::Bytes, String>| buf_result
|
|
|
|
.map(|buf| bytes::Bytes::from(buf))
|
|
|
|
.map_err(|e| ServiceError::Method(Box::new(ErrorStr(e)))))))
|
2023-04-17 03:57:12 +01:00
|
|
|
}
|
|
|
|
}
|