diff --git a/Cargo.lock b/Cargo.lock index 57d6178..b70aa29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,17 +68,6 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - [[package]] name = "async-lock" version = "2.7.0" @@ -197,15 +186,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "concurrent-queue" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -251,15 +231,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-utils" -version = "0.8.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] - [[package]] name = "ctr" version = "0.8.0" @@ -557,9 +528,9 @@ checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" [[package]] name = "gloo-net" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3000ef231a67d5bfee6b35f2c0f6f5c8d45b3381ef5bbbea603690ec4e539762" +checksum = "8ac9e8288ae2c632fa9f8657ac70bfe38a1530f345282d7ba66a1f70b72b7dc4" dependencies = [ "futures-channel", "futures-core", @@ -578,9 +549,9 @@ dependencies = [ [[package]] name = "gloo-utils" -version = "0.1.7" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "037fcb07216cb3a30f7292bd0176b050b7b9a052ba830ef7d5d65f6dc64ba58e" +checksum = "0b5555354113b18c547c1d3a98fbf7fb32a9ff4f6fa112ce823a21641a0ba3aa" dependencies = [ "js-sys", "serde", @@ -1593,10 +1564,10 @@ dependencies = [ name = "usdpl-front" version = "0.11.0" dependencies = [ - "async-channel", "console_error_panic_hook", "console_log", "futures", + "futures-channel", "gloo-net", "hex", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index 4c8660f..52e25b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,8 @@ exclude = [ "templates/decky/backend" ] +resolver = "2" + [profile.release] # Tell `rustc` to optimize for small code size. opt-level = "s" diff --git a/usdpl-back/src/websockets.rs b/usdpl-back/src/websockets.rs index 9658e2c..7be05d0 100644 --- a/usdpl-back/src/websockets.rs +++ b/usdpl-back/src/websockets.rs @@ -98,7 +98,7 @@ impl WebsocketServer { RatchetError::with_cause(ratchet_rs::ErrorKind::Protocol, e.to_string()) })?; - output_stream.for_each_concurrent(None, |result| async { + output_stream.for_each(|result| async { match result { Ok(msg) => { let mut ws_lock = websocket.lock().await; @@ -112,6 +112,11 @@ impl WebsocketServer { } }).await; + websocket.lock().await.close(ratchet_rs::CloseReason { + code: ratchet_rs::CloseCode::Normal, + description: None, + }).await?; + /*let mut buf = BytesMut::new(); loop { match websocket.read(&mut buf).await? { diff --git a/usdpl-front/Cargo.toml b/usdpl-front/Cargo.toml index 33a7877..942c526 100644 --- a/usdpl-front/Cargo.toml +++ b/usdpl-front/Cargo.toml @@ -20,8 +20,9 @@ encrypt = ["usdpl-core/encrypt", "obfstr", "hex"] [dependencies] wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" -gloo-net = { version = "0.3", features = ["websocket"] } +gloo-net = { version = "0.4", features = ["websocket"] } futures = "0.3" +futures-channel = "0.3" console_log = { version = "1.0", optional = true, features = ["color"] } # The `console_error_panic_hook` crate provides better debugging of panics by @@ -37,11 +38,10 @@ web-sys = { version = "0.3", features = [ 'RequestMode', 'Response', 'Window', + 'console', ]} js-sys = { version = "0.3" } -async-channel = "1.8" - obfstr = { version = "0.3", optional = true } hex = { version = "0.4", optional = true } diff --git a/usdpl-front/src/client_handler.rs b/usdpl-front/src/client_handler.rs index 9c3a703..4f3c799 100644 --- a/usdpl-front/src/client_handler.rs +++ b/usdpl-front/src/client_handler.rs @@ -1,7 +1,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use futures::{SinkExt, StreamExt, future::{select, Either}}; -use gloo_net::websocket::{futures::WebSocket, Message, State}; +use gloo_net::websocket::{futures::WebSocket, Message}; use nrpc::{ClientHandler, ServiceError, ServiceClientStream, _helpers::async_trait, _helpers::bytes}; use wasm_bindgen_futures::spawn_local; @@ -13,23 +13,42 @@ pub struct WebSocketHandler { port: u16, } -async fn send_recv_ws<'a>(tx: async_channel::Sender>, url: String, mut input: ServiceClientStream<'a, bytes::Bytes>) { - let ws = match WebSocket::open(&url).map_err(|e| e.to_string()) { +#[inline] +fn ws_is_alive(ws_state: &gloo_net::websocket::State) -> bool { + match ws_state { + gloo_net::websocket::State::Connecting | gloo_net::websocket::State::Open => true, + gloo_net::websocket::State::Closing | gloo_net::websocket::State::Closed => false, + } +} + +async fn send_recv_ws<'a>(mut tx: futures_channel::mpsc::Sender>, url: String, mut input: ServiceClientStream<'a, bytes::Bytes>) { + let ws = match WebSocket::open_with_protocol(&url, "usdpl-nrpc").map_err(|e| e.to_string()) { Ok(x) => x, Err(e) => { + log::error!("ws open error: {}", e); tx.send(Err(e.to_string())).await.unwrap_or(()); return; } }; + + #[cfg(feature = "debug")] + web_sys::console::log_1(&format!("ws opened successfully with url `{}`", url).into()); + let (mut input_done, mut output_done) = (false, false); let mut last_ws_state = ws.state(); + #[cfg(feature = "debug")] + web_sys::console::log_1(&format!("ws with url `{}` initial state: {:?}", url, last_ws_state).into()); let (mut ws_sink, mut ws_stream) = ws.split(); let (mut left, mut right) = (input.next(), ws_stream.next()); - while let State::Open = last_ws_state { + while ws_is_alive(&last_ws_state) { if !input_done && !output_done { + #[cfg(feature = "debug")] + web_sys::console::debug_1(&format!("Input and output streams are both alive").into()); match select(left, right).await { Either::Left((next, outstanding)) => { + #[cfg(feature = "debug")] + web_sys::console::debug_1(&format!("Got message to send over websocket").into()); if let Some(next) = next { match next { Ok(next) => { @@ -46,6 +65,8 @@ async fn send_recv_ws<'a>(tx: async_channel::Sender left = input.next(); }, Either::Right((response, outstanding)) => { + #[cfg(feature = "debug")] + web_sys::console::debug_1(&format!("Received message from websocket").into()); if let Some(next) = response { match next { Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()), @@ -63,7 +84,11 @@ async fn send_recv_ws<'a>(tx: async_channel::Sender } } } else if input_done { + #[cfg(feature = "debug")] + web_sys::console::debug_1(&format!("Input stream is complete").into()); if let Some(next) = right.await { + #[cfg(feature = "debug")] + web_sys::console::debug_1(&format!("Received message from websocket").into()); match next { Ok(Message::Bytes(b)) => tx.send(Ok(b.into())).await.unwrap_or(()), Ok(_) => tx.send(Err("Message::Text not allowed".into())).await.unwrap_or(()), @@ -78,9 +103,34 @@ async fn send_recv_ws<'a>(tx: async_channel::Sender (ws_sink, ws_stream) = ws.split(); right = ws_stream.next(); } else { - + // output_done is true + #[cfg(feature = "debug")] + web_sys::console::debug_1(&format!("Output stream is complete").into()); + if let Some(next) = left.await { + #[cfg(feature = "debug")] + web_sys::console::debug_1(&format!("Got message to send over websocket").into()); + match next { + Ok(next) => { + if let Err(e) = ws_sink.send(Message::Bytes(next.into())).await { + tx.send(Err(e.to_string())).await.unwrap_or(()); + } + }, + Err(e) => tx.send(Err(e.to_string())).await.unwrap_or(()) + } + } else { + input_done = true; + } + //right = outstanding; + let ws = ws_stream.reunite(ws_sink).unwrap(); + last_ws_state = ws.state(); + (ws_sink, ws_stream) = ws.split(); + left = input.next(); + right = ws_stream.next(); // this should always resolve to None (but compiler is unhappy without this) } } + + #[cfg(feature = "debug")] + web_sys::console::debug_1(&format!("ws with url `{}` has closed", url).into()); /*spawn_local(async move { while let State::Open = ws.state() { if let Some(next) = input.next().await { @@ -147,7 +197,9 @@ impl ClientHandler<'static> for WebSocketHandler { "ws://usdpl-ws-{}.localhost:{}/{}.{}/{}", id, self.port, package, service, method, ); - let (tx, rx) = async_channel::bounded(CHANNEL_BOUND); + #[cfg(feature = "debug")] + web_sys::console::log_1(&format!("doing send/receive on ws url `{}`", url).into()); + let (tx, rx) = futures_channel::mpsc::channel(CHANNEL_BOUND); spawn_local(send_recv_ws(tx, url, input)); Ok(Box::new(rx.map(|buf_result: Result| buf_result diff --git a/usdpl-front/src/convert.rs b/usdpl-front/src/convert.rs index f0cc660..a59e6fa 100644 --- a/usdpl-front/src/convert.rs +++ b/usdpl-front/src/convert.rs @@ -1,10 +1,10 @@ -use js_sys::JsString; -use js_sys::JSON::{parse, stringify}; +//use js_sys::JsString; +//use js_sys::JSON::{parse, stringify}; use wasm_bindgen::prelude::JsValue; -use usdpl_core::serdes::Primitive; +//use usdpl_core::serdes::Primitive; -pub(crate) fn primitive_to_js(primitive: Primitive) -> JsValue { +/*pub(crate) fn primitive_to_js(primitive: Primitive) -> JsValue { match primitive { Primitive::Empty => JsValue::null(), Primitive::String(s) => JsValue::from_str(&s), @@ -33,11 +33,11 @@ pub(crate) fn js_to_primitive(val: JsValue) -> Primitive { } else { Primitive::Empty } -} +}*/ -pub(crate) fn str_to_js(s: S) -> JsString { +/*pub(crate) fn str_to_js(s: S) -> JsString { s.to_string().into() -} +}*/ pub(crate) fn js_to_str(js: JsValue) -> String { if let Some(s) = js.as_string() { diff --git a/usdpl-front/src/imports.rs b/usdpl-front/src/imports.rs deleted file mode 100644 index 8e13b48..0000000 --- a/usdpl-front/src/imports.rs +++ /dev/null @@ -1,16 +0,0 @@ -use wasm_bindgen::prelude::*; - -#[wasm_bindgen] -extern "C" { - #[cfg(feature = "debug")] - #[wasm_bindgen(js_namespace = console, js_name = log)] - pub fn console_log(s: &str); - - #[cfg(feature = "debug")] - #[wasm_bindgen(js_namespace = console, js_name = warn)] - pub fn console_warn(s: &str); - - #[cfg(feature = "debug")] - #[wasm_bindgen(js_namespace = console, js_name = error)] - pub fn console_error(s: &str); -} diff --git a/usdpl-front/src/lib.rs b/usdpl-front/src/lib.rs index 27a75f8..aa2fd5d 100644 --- a/usdpl-front/src/lib.rs +++ b/usdpl-front/src/lib.rs @@ -7,9 +7,8 @@ mod client_handler; pub use client_handler::WebSocketHandler; -mod connection; +//mod connection; mod convert; -mod imports; pub mod wasm; /*#[allow(missing_docs)] // existence is pain otherwise @@ -27,21 +26,21 @@ pub mod _helpers { pub use nrpc; } -use std::sync::atomic::{AtomicU64, Ordering}; +//use std::sync::atomic::{AtomicU64, Ordering}; -use js_sys::Array; +//use js_sys::Array; use wasm_bindgen::prelude::*; -use usdpl_core::{socket::Packet, RemoteCall}; +//use usdpl_core::{socket::Packet, RemoteCall}; //const REMOTE_CALL_ID: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); //const REMOTE_PORT: std::sync::atomic::AtomicU16 = std::sync::atomic::AtomicU16::new(31337); -static mut CTX: UsdplContext = UsdplContext { +/*static mut CTX: UsdplContext = UsdplContext { port: 0, - id: AtomicU64::new(0), + //id: AtomicU64::new(0), #[cfg(feature = "encrypt")] key: Vec::new(), -}; +};*/ static mut CACHE: Option> = None; @@ -52,6 +51,7 @@ fn encryption_key() -> Vec { hex::decode(obfstr::obfstr!(env!("USDPL_ENCRYPTION_KEY"))).unwrap() } +/* //#[wasm_bindgen] #[derive(Debug)] struct UsdplContext { @@ -68,33 +68,39 @@ fn get_port() -> u16 { #[cfg(feature = "encrypt")] fn get_key() -> Vec { unsafe { CTX.key.clone() } -} +}*/ -fn increment_id() -> u64 { +/*fn increment_id() -> u64 { let atomic = unsafe { &CTX.id }; atomic.fetch_add(1, Ordering::SeqCst) -} +}*/ /// Initialize the front-end library #[wasm_bindgen] pub fn init_usdpl(port: u16) { + #[cfg(feature = "debug")] + web_sys::console::log_1(&format!("init_usdpl(port={})", port).into()); #[cfg(feature = "console_error_panic_hook")] console_error_panic_hook::set_once(); #[cfg(feature = "console_log")] console_log::init_with_level(log::Level::Debug).expect("USDPL: error initializing console log"); - //REMOTE_PORT.store(port, std::sync::atomic::Ordering::SeqCst); - unsafe { + + /*unsafe { CTX = UsdplContext { port: port, - id: AtomicU64::new(0), + //id: AtomicU64::new(0), #[cfg(feature = "encrypt")] key: encryption_key(), }; - } + }*/ unsafe { CACHE = Some(std::collections::HashMap::new()); } + + #[cfg(feature = "debug")] + web_sys::console::log_1(&format!("USDPL:{} init succeeded", port).into()); + log::info!("USDPL:{} init succeeded", port); } /// Get the targeted plugin framework, or "any" if unknown @@ -134,16 +140,17 @@ pub fn get_value(key: String) -> JsValue { } } +/* /// Call a function on the back-end. /// Returns null (None) if this fails for any reason. #[wasm_bindgen] pub async fn call_backend(name: String, parameters: Vec) -> JsValue { #[cfg(feature = "debug")] - imports::console_log(&format!( + web_sys::console::log_1(&format!( "call_backend({}, [params; {}])", name, parameters.len() - )); + ).into()); let next_id = increment_id(); let mut params = Vec::with_capacity(parameters.len()); for val in parameters { @@ -151,7 +158,7 @@ pub async fn call_backend(name: String, parameters: Vec) -> JsValue { } let port = get_port(); #[cfg(feature = "debug")] - imports::console_log(&format!("USDPL: Got port {}", port)); + web_sys::console::log_1(&format!("USDPL: Got port {}", port).into()); let results = connection::send_call( next_id, Packet::Call(RemoteCall { @@ -169,7 +176,7 @@ pub async fn call_backend(name: String, parameters: Vec) -> JsValue { #[allow(unused_variables)] Err(e) => { #[cfg(feature = "debug")] - imports::console_error(&format!("USDPL: Got error while calling {}: {:?}", name, e)); + web_sys::console::error_1(&format!("USDPL: Got error while calling {}: {:?}", name, e).into()); return JsValue::NULL; } }; @@ -197,7 +204,7 @@ pub async fn init_tr(locale: String) { { Ok(Packet::Translations(translations)) => { #[cfg(feature = "debug")] - imports::console_log(&format!("USDPL: Got translations for {}", locale)); + web_sys::console::log_1(&format!("USDPL: Got translations for {}", locale).into()); // convert translations into map let mut tr_map = std::collections::HashMap::with_capacity(translations.len()); for (key, val) in translations { @@ -207,17 +214,17 @@ pub async fn init_tr(locale: String) { } Ok(_) => { #[cfg(feature = "debug")] - imports::console_error(&format!("USDPL: Got wrong packet response for init_tr")); + web_sys::console::error_1(&format!("USDPL: Got wrong packet response for init_tr").into()); unsafe { TRANSLATIONS = None } } #[allow(unused_variables)] Err(e) => { #[cfg(feature = "debug")] - imports::console_error(&format!("USDPL: Got wrong error for init_tr: {:#?}", e)); + web_sys::console::error_1(&format!("USDPL: Got wrong error for init_tr: {:#?}", e).into()); unsafe { TRANSLATIONS = None } } } -} +}*/ /// Translate a phrase, equivalent to tr_n(msg_id, 0) #[wasm_bindgen]