From 7e2aa1ccb707dff5775f46de8948dd0ab59444f8 Mon Sep 17 00:00:00 2001 From: "NGnius (Graham)" Date: Mon, 12 Sep 2022 20:45:42 -0400 Subject: [PATCH] Scalability improvements for kaylon --- Cargo.lock | 29 +++++- usdpl-back/Cargo.toml | 8 +- usdpl-back/src/callable.rs | 80 ++++++++++++++- usdpl-back/src/instance.rs | 179 ++++++++++++++++++---------------- usdpl-back/src/lib.rs | 3 +- usdpl-front/Cargo.toml | 2 +- usdpl-front/src/connection.rs | 3 +- usdpl-front/src/lib.rs | 1 + 8 files changed, 211 insertions(+), 94 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9847ad..00979ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,28 @@ dependencies = [ "zeroize", ] +[[package]] +name = "async-recursion" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async-trait" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -1026,10 +1048,13 @@ dependencies = [ [[package]] name = "usdpl-back" -version = "0.6.0" +version = "0.7.0" dependencies = [ + "async-recursion", + "async-trait", "bytes", "hex", + "log", "obfstr", "tokio", "usdpl-core", @@ -1047,7 +1072,7 @@ dependencies = [ [[package]] name = "usdpl-front" -version = "0.6.2" +version = "0.7.0" dependencies = [ "console_error_panic_hook", "hex", diff --git a/usdpl-back/Cargo.toml b/usdpl-back/Cargo.toml index 7c98345..a03494e 100644 --- a/usdpl-back/Cargo.toml +++ b/usdpl-back/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "usdpl-back" -version = "0.6.0" +version = "0.7.0" edition = "2021" license = "GPL-3.0-only" repository = "https://github.com/NGnius/usdpl-rs" @@ -17,11 +17,17 @@ encrypt = ["usdpl-core/encrypt", "obfstr", "hex"] [dependencies] usdpl-core = { version = "0.6.0", path = "../usdpl-core"} +log = "0.4" + # HTTP web framework warp = { version = "0.3" } bytes = { version = "1.1" } tokio = { version = "1.19", features = ["rt", "rt-multi-thread"], optional = true } +# this is why people don't like async +async-trait = "0.1.57" +async-recursion = "1.0.0" + # encryption helpers obfstr = { version = "0.3", optional = true } hex = { version = "0.4", optional = true } diff --git a/usdpl-back/src/callable.rs b/usdpl-back/src/callable.rs index a3fdd73..2a954c2 100644 --- a/usdpl-back/src/callable.rs +++ b/usdpl-back/src/callable.rs @@ -1,13 +1,87 @@ +use std::sync::{Arc, Mutex}; + use usdpl_core::serdes::Primitive; -/// A function which can be called from the front-end (remotely) -pub trait Callable: Send + Sync { +/// A mutable function which can be called from the front-end (remotely) +pub trait MutCallable: Send + Sync { /// Invoke the function fn call(&mut self, params: Vec) -> Vec; } -impl) -> Vec) + Send + Sync> Callable for F { +impl) -> Vec) + Send + Sync> MutCallable for F { fn call(&mut self, params: Vec) -> Vec { (self)(params) } } + +/// A function which can be called from the front-end (remotely) +pub trait Callable: Send + Sync { + /// Invoke the function + fn call(&self, params: Vec) -> Vec; +} + +impl) -> Vec) + Send + Sync> Callable for F { + fn call(&self, params: Vec) -> Vec { + (self)(params) + } +} + +/// An async function which can be called from the front-end (remotely) +#[async_trait::async_trait] +pub trait AsyncCallable: Send + Sync { + /// Invoke the function + async fn call(&self, params: Vec) -> Vec; +} + +#[async_trait::async_trait] +impl) -> A) + Send + Sync, A: core::future::Future> + Send> AsyncCallable for F { + async fn call(&self, params: Vec) -> Vec { + (self)(params).await + } +} + +pub enum WrappedCallable { + Blocking(Arc>>), + Ref(Arc>), + Async(Arc>), +} + +impl WrappedCallable { + pub fn new_ref(callable: T) -> Self { + Self::Ref(Arc::new(Box::new(callable))) + } + + pub fn new_locking(callable: T) -> Self { + Self::Blocking(Arc::new(Mutex::new(Box::new(callable)))) + } + + pub fn new_async(callable: T) -> Self { + Self::Async(Arc::new(Box::new(callable))) + } +} + +impl Clone for WrappedCallable { + fn clone(&self) -> Self { + match self { + Self::Blocking(x) => Self::Blocking(x.clone()), + Self::Ref(x) => Self::Ref(x.clone()), + Self::Async(x) => Self::Async(x.clone()), + } + } +} + +#[async_trait::async_trait] +impl AsyncCallable for WrappedCallable { + async fn call(&self, params: Vec) -> Vec { + match self { + Self::Blocking(mut_callable) => { + mut_callable + .lock() + .expect("Failed to acquire mut_callable lock") + .call(params) + }, + Self::Ref(callable) => callable.call(params), + Self::Async(async_callable) => async_callable.call(params).await, + } + } +} diff --git a/usdpl-back/src/instance.rs b/usdpl-back/src/instance.rs index f7d673f..40f6773 100644 --- a/usdpl-back/src/instance.rs +++ b/usdpl-back/src/instance.rs @@ -1,15 +1,13 @@ use std::collections::HashMap; -use std::sync::Arc; -use std::sync::Mutex; use warp::Filter; use usdpl_core::serdes::{Dumpable, Loadable}; use usdpl_core::{socket, RemoteCallResponse}; -use super::Callable; +use super::{Callable, MutCallable, AsyncCallable, WrappedCallable}; -type WrappedCallable = Arc>>; // thread-safe, cloneable Callable +//type WrappedCallable = Arc>>; // thread-safe, cloneable Callable #[cfg(feature = "encrypt")] const NONCE: [u8; socket::NONCE_SIZE] = [0u8; socket::NONCE_SIZE]; @@ -34,25 +32,36 @@ impl Instance { } } - /// Register a function which can be invoked by the front-end, builder style + /// Register a thread-safe function which can be invoked by the front-end pub fn register, F: Callable + 'static>( mut self, name: S, f: F, ) -> Self { self.calls - .insert(name.into(), Arc::new(Mutex::new(Box::new(f)))); + .insert(name.into(), WrappedCallable::new_ref(f)); self } - /// Register a function which can be invoked by the front-end, object style - pub fn register_mut, F: Callable + 'static>( - &mut self, + /// Register a thread-unsafe function which can be invoked by the front-end + pub fn register_blocking, F: MutCallable + 'static>( + mut self, name: S, f: F, - ) -> &mut Self { + ) -> Self { self.calls - .insert(name.into(), Arc::new(Mutex::new(Box::new(f)))); + .insert(name.into(), WrappedCallable::new_locking(f)); + self + } + + /// Register a thread-unsafe function which can be invoked by the front-end + pub fn register_async, F: AsyncCallable + 'static>( + mut self, + name: S, + f: F, + ) -> Self { + self.calls + .insert(name.into(), WrappedCallable::new_async(f)); self } @@ -72,18 +81,17 @@ impl Instance { self.serve_internal().await } - fn handle_call( + #[async_recursion::async_recursion] + async fn handle_call( packet: socket::Packet, handlers: &HashMap, ) -> socket::Packet { match packet { socket::Packet::Call(call) => { + log::info!("Got USDPL call {} (`{}`, params: {})", call.id, call.function, call.parameters.len()); //let handlers = CALLS.lock().expect("Failed to acquire CALLS lock"); if let Some(target) = handlers.get(&call.function) { - let result = target - .lock() - .expect("Failed to acquire CALLS.function lock") - .call(call.parameters); + let result = target.call(call.parameters).await; socket::Packet::CallResponse(RemoteCallResponse { id: call.id, response: result, @@ -95,7 +103,7 @@ impl Instance { socket::Packet::Many(packets) => { let mut result = Vec::with_capacity(packets.len()); for packet in packets { - result.push(Self::handle_call(packet, handlers)); + result.push(Self::handle_call(packet, handlers).await); } socket::Packet::Many(result) } @@ -103,87 +111,88 @@ impl Instance { } } + #[cfg(not(feature = "encrypt"))] + async fn process_body((data, handlers): (bytes::Bytes, HashMap)) -> impl warp::Reply { + let (packet, _) = match socket::Packet::load_base64(&data) { + Ok(x) => x, + Err(e) => { + return warp::reply::with_status( + warp::http::Response::builder() + .body(format!("Failed to load packet: {}", e)), + warp::http::StatusCode::from_u16(400).unwrap(), + ) + } + }; + //let mut buffer = [0u8; socket::PACKET_BUFFER_SIZE]; + let mut buffer = String::with_capacity(socket::PACKET_BUFFER_SIZE); + let response = Self::handle_call(packet, &handlers).await; + let _len = match response.dump_base64(&mut buffer) { + Ok(x) => x, + Err(e) => { + return warp::reply::with_status( + warp::http::Response::builder() + .body(format!("Failed to dump response packet: {}", e)), + warp::http::StatusCode::from_u16(500).unwrap(), + ) + } + }; + warp::reply::with_status( + warp::http::Response::builder().body(buffer), + warp::http::StatusCode::from_u16(200).unwrap(), + ) + } + + #[cfg(feature = "encrypt")] + async fn process_body((data, handlers, key): (bytes::Bytes, HashMap, Vec)) -> impl warp::Reply { + let (packet, _) = match socket::Packet::load_encrypted(&data, &key, &NONCE) { + Ok(x) => x, + Err(_) => { + return warp::reply::with_status( + warp::http::Response::builder() + .body("Failed to load packet".to_string()), + warp::http::StatusCode::from_u16(400).unwrap(), + ) + } + }; + let mut buffer = Vec::with_capacity(socket::PACKET_BUFFER_SIZE); + //buffer.extend(&[0u8; socket::PACKET_BUFFER_SIZE]); + let response = Self::handle_call(packet, &handlers).await; + let len = match response.dump_encrypted(&mut buffer, &key, &NONCE) { + Ok(x) => x, + Err(_) => { + return warp::reply::with_status( + warp::http::Response::builder() + .body("Failed to dump response packet".to_string()), + warp::http::StatusCode::from_u16(500).unwrap(), + ) + } + }; + buffer.truncate(len); + let string: String = String::from_utf8(buffer).unwrap().into(); + warp::reply::with_status( + warp::http::Response::builder().body(string), + warp::http::StatusCode::from_u16(200).unwrap(), + ) + } + /// Receive and execute callbacks forever async fn serve_internal(&self) -> Result<(), ()> { let handlers = self.calls.clone(); - //self.calls = HashMap::new(); #[cfg(not(feature = "encrypt"))] - let calls = warp::post() - .and(warp::path!("usdpl" / "call")) - .and(warp::body::content_length_limit( - (socket::PACKET_BUFFER_SIZE * 2) as _, - )) - .and(warp::body::bytes()) - .map(move |data: bytes::Bytes| { - let (packet, _) = match socket::Packet::load_base64(&data) { - Ok(x) => x, - Err(e) => { - return warp::reply::with_status( - warp::http::Response::builder() - .body(format!("Failed to load packet: {}", e)), - warp::http::StatusCode::from_u16(400).unwrap(), - ) - } - }; - //let mut buffer = [0u8; socket::PACKET_BUFFER_SIZE]; - let mut buffer = String::with_capacity(socket::PACKET_BUFFER_SIZE); - let response = Self::handle_call(packet, &handlers); - let _len = match response.dump_base64(&mut buffer) { - Ok(x) => x, - Err(e) => { - return warp::reply::with_status( - warp::http::Response::builder() - .body(format!("Failed to dump response packet: {}", e)), - warp::http::StatusCode::from_u16(500).unwrap(), - ) - } - }; - warp::reply::with_status( - warp::http::Response::builder().body(buffer), - warp::http::StatusCode::from_u16(200).unwrap(), - ) - }) - .map(|reply| warp::reply::with_header(reply, "Access-Control-Allow-Origin", "*")); + let input_mapper = move |data: bytes::Bytes| { (data, handlers.clone()) }; #[cfg(feature = "encrypt")] let key = self.encryption_key.clone(); #[cfg(feature = "encrypt")] + let input_mapper = move |data: bytes::Bytes| { (data, handlers.clone(), key.clone()) }; + //self.calls = HashMap::new(); let calls = warp::post() .and(warp::path!("usdpl" / "call")) .and(warp::body::content_length_limit( (socket::PACKET_BUFFER_SIZE * 2) as _, )) .and(warp::body::bytes()) - .map(move |data: bytes::Bytes| { - let (packet, _) = match socket::Packet::load_encrypted(&data, &key, &NONCE) { - Ok(x) => x, - Err(_) => { - return warp::reply::with_status( - warp::http::Response::builder() - .body("Failed to load packet".to_string()), - warp::http::StatusCode::from_u16(400).unwrap(), - ) - } - }; - let mut buffer = Vec::with_capacity(socket::PACKET_BUFFER_SIZE); - //buffer.extend(&[0u8; socket::PACKET_BUFFER_SIZE]); - let response = Self::handle_call(packet, &handlers); - let len = match response.dump_encrypted(&mut buffer, &key, &NONCE) { - Ok(x) => x, - Err(_) => { - return warp::reply::with_status( - warp::http::Response::builder() - .body("Failed to dump response packet".to_string()), - warp::http::StatusCode::from_u16(500).unwrap(), - ) - } - }; - buffer.truncate(len); - let string: String = String::from_utf8(buffer).unwrap().into(); - warp::reply::with_status( - warp::http::Response::builder().body(string), - warp::http::StatusCode::from_u16(200).unwrap(), - ) - }) + .map(input_mapper) + .then(Self::process_body) .map(|reply| warp::reply::with_header(reply, "Access-Control-Allow-Origin", "*")); #[cfg(debug_assertions)] warp::serve(calls).run(([0, 0, 0, 0], self.port)).await; diff --git a/usdpl-back/src/lib.rs b/usdpl-back/src/lib.rs index 74bc944..660a708 100644 --- a/usdpl-back/src/lib.rs +++ b/usdpl-back/src/lib.rs @@ -17,7 +17,8 @@ mod callable; //mod errors; mod instance; -pub use callable::Callable; +pub use callable::{Callable, MutCallable, AsyncCallable}; +pub(crate) use callable::WrappedCallable; pub use instance::Instance; //pub use errors::{ServerError, ServerResult}; diff --git a/usdpl-front/Cargo.toml b/usdpl-front/Cargo.toml index 64bb0e3..294a42e 100644 --- a/usdpl-front/Cargo.toml +++ b/usdpl-front/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "usdpl-front" -version = "0.6.2" +version = "0.7.0" authors = ["NGnius (Graham) "] edition = "2021" license = "GPL-3.0-only" diff --git a/usdpl-front/src/connection.rs b/usdpl-front/src/connection.rs index 2298c8b..6dc1f76 100644 --- a/usdpl-front/src/connection.rs +++ b/usdpl-front/src/connection.rs @@ -17,6 +17,7 @@ use usdpl_core::socket; const NONCE: [u8; socket::NONCE_SIZE]= [0u8; socket::NONCE_SIZE]; pub async fn send_js( + id: u64, packet: socket::Packet, port: u16, #[cfg(feature = "encrypt")] @@ -26,7 +27,7 @@ pub async fn send_js( opts.method("POST"); opts.mode(RequestMode::Cors); - let url = format!("http://{}:{}/usdpl/call", socket::HOST_STR, port); + let url = format!("http://usdpl{}.{}:{}/usdpl/call", id, socket::HOST_STR, port); #[allow(unused_variables)] let (buffer, len) = dump_to_buffer(packet, #[cfg(feature = "encrypt")] key.as_slice())?; diff --git a/usdpl-front/src/lib.rs b/usdpl-front/src/lib.rs index a98ffd8..4631fcc 100644 --- a/usdpl-front/src/lib.rs +++ b/usdpl-front/src/lib.rs @@ -124,6 +124,7 @@ pub async fn call_backend(name: String, parameters: Vec) -> JsValue { #[cfg(feature = "debug")] imports::console_log(&format!("USDPL: Got port {}", port)); let results = connection::send_js( + next_id, Packet::Call(RemoteCall { id: next_id, function: name.clone(),