Scalability improvements for kaylon

This commit is contained in:
NGnius (Graham) 2022-09-12 20:45:42 -04:00
parent ad38087932
commit 7e2aa1ccb7
8 changed files with 211 additions and 94 deletions

29
Cargo.lock generated
View file

@ -38,6 +38,28 @@ dependencies = [
"zeroize",
]
[[package]]
name = "async-recursion"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -1026,10 +1048,13 @@ dependencies = [
[[package]]
name = "usdpl-back"
version = "0.6.0"
version = "0.7.0"
dependencies = [
"async-recursion",
"async-trait",
"bytes",
"hex",
"log",
"obfstr",
"tokio",
"usdpl-core",
@ -1047,7 +1072,7 @@ dependencies = [
[[package]]
name = "usdpl-front"
version = "0.6.2"
version = "0.7.0"
dependencies = [
"console_error_panic_hook",
"hex",

View file

@ -1,6 +1,6 @@
[package]
name = "usdpl-back"
version = "0.6.0"
version = "0.7.0"
edition = "2021"
license = "GPL-3.0-only"
repository = "https://github.com/NGnius/usdpl-rs"
@ -17,11 +17,17 @@ encrypt = ["usdpl-core/encrypt", "obfstr", "hex"]
[dependencies]
usdpl-core = { version = "0.6.0", path = "../usdpl-core"}
log = "0.4"
# HTTP web framework
warp = { version = "0.3" }
bytes = { version = "1.1" }
tokio = { version = "1.19", features = ["rt", "rt-multi-thread"], optional = true }
# this is why people don't like async
async-trait = "0.1.57"
async-recursion = "1.0.0"
# encryption helpers
obfstr = { version = "0.3", optional = true }
hex = { version = "0.4", optional = true }

View file

@ -1,13 +1,87 @@
use std::sync::{Arc, Mutex};
use usdpl_core::serdes::Primitive;
/// A function which can be called from the front-end (remotely)
pub trait Callable: Send + Sync {
/// A mutable function which can be called from the front-end (remotely)
pub trait MutCallable: Send + Sync {
/// Invoke the function
fn call(&mut self, params: Vec<Primitive>) -> Vec<Primitive>;
}
impl<F: (FnMut(Vec<Primitive>) -> Vec<Primitive>) + Send + Sync> Callable for F {
impl<F: (FnMut(Vec<Primitive>) -> Vec<Primitive>) + Send + Sync> MutCallable for F {
fn call(&mut self, params: Vec<Primitive>) -> Vec<Primitive> {
(self)(params)
}
}
/// A function which can be called from the front-end (remotely)
pub trait Callable: Send + Sync {
/// Invoke the function
fn call(&self, params: Vec<Primitive>) -> Vec<Primitive>;
}
impl<F: (Fn(Vec<Primitive>) -> Vec<Primitive>) + Send + Sync> Callable for F {
fn call(&self, params: Vec<Primitive>) -> Vec<Primitive> {
(self)(params)
}
}
/// An async function which can be called from the front-end (remotely)
#[async_trait::async_trait]
pub trait AsyncCallable: Send + Sync {
/// Invoke the function
async fn call(&self, params: Vec<Primitive>) -> Vec<Primitive>;
}
#[async_trait::async_trait]
impl<F: (Fn(Vec<Primitive>) -> A) + Send + Sync, A: core::future::Future<Output=Vec<Primitive>> + Send> AsyncCallable for F {
async fn call(&self, params: Vec<Primitive>) -> Vec<Primitive> {
(self)(params).await
}
}
pub enum WrappedCallable {
Blocking(Arc<Mutex<Box<dyn MutCallable>>>),
Ref(Arc<Box<dyn Callable>>),
Async(Arc<Box<dyn AsyncCallable>>),
}
impl WrappedCallable {
pub fn new_ref<T: Callable + 'static>(callable: T) -> Self {
Self::Ref(Arc::new(Box::new(callable)))
}
pub fn new_locking<T: MutCallable + 'static>(callable: T) -> Self {
Self::Blocking(Arc::new(Mutex::new(Box::new(callable))))
}
pub fn new_async<T: AsyncCallable + 'static>(callable: T) -> Self {
Self::Async(Arc::new(Box::new(callable)))
}
}
impl Clone for WrappedCallable {
fn clone(&self) -> Self {
match self {
Self::Blocking(x) => Self::Blocking(x.clone()),
Self::Ref(x) => Self::Ref(x.clone()),
Self::Async(x) => Self::Async(x.clone()),
}
}
}
#[async_trait::async_trait]
impl AsyncCallable for WrappedCallable {
async fn call(&self, params: Vec<Primitive>) -> Vec<Primitive> {
match self {
Self::Blocking(mut_callable) => {
mut_callable
.lock()
.expect("Failed to acquire mut_callable lock")
.call(params)
},
Self::Ref(callable) => callable.call(params),
Self::Async(async_callable) => async_callable.call(params).await,
}
}
}

View file

@ -1,15 +1,13 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use warp::Filter;
use usdpl_core::serdes::{Dumpable, Loadable};
use usdpl_core::{socket, RemoteCallResponse};
use super::Callable;
use super::{Callable, MutCallable, AsyncCallable, WrappedCallable};
type WrappedCallable = Arc<Mutex<Box<dyn Callable>>>; // thread-safe, cloneable Callable
//type WrappedCallable = Arc<Mutex<Box<dyn Callable>>>; // thread-safe, cloneable Callable
#[cfg(feature = "encrypt")]
const NONCE: [u8; socket::NONCE_SIZE] = [0u8; socket::NONCE_SIZE];
@ -34,25 +32,36 @@ impl Instance {
}
}
/// Register a function which can be invoked by the front-end, builder style
/// Register a thread-safe function which can be invoked by the front-end
pub fn register<S: std::convert::Into<String>, F: Callable + 'static>(
mut self,
name: S,
f: F,
) -> Self {
self.calls
.insert(name.into(), Arc::new(Mutex::new(Box::new(f))));
.insert(name.into(), WrappedCallable::new_ref(f));
self
}
/// Register a function which can be invoked by the front-end, object style
pub fn register_mut<S: std::convert::Into<String>, F: Callable + 'static>(
&mut self,
/// Register a thread-unsafe function which can be invoked by the front-end
pub fn register_blocking<S: std::convert::Into<String>, F: MutCallable + 'static>(
mut self,
name: S,
f: F,
) -> &mut Self {
) -> Self {
self.calls
.insert(name.into(), Arc::new(Mutex::new(Box::new(f))));
.insert(name.into(), WrappedCallable::new_locking(f));
self
}
/// Register a thread-unsafe function which can be invoked by the front-end
pub fn register_async<S: std::convert::Into<String>, F: AsyncCallable + 'static>(
mut self,
name: S,
f: F,
) -> Self {
self.calls
.insert(name.into(), WrappedCallable::new_async(f));
self
}
@ -72,18 +81,17 @@ impl Instance {
self.serve_internal().await
}
fn handle_call(
#[async_recursion::async_recursion]
async fn handle_call(
packet: socket::Packet,
handlers: &HashMap<String, WrappedCallable>,
) -> socket::Packet {
match packet {
socket::Packet::Call(call) => {
log::info!("Got USDPL call {} (`{}`, params: {})", call.id, call.function, call.parameters.len());
//let handlers = CALLS.lock().expect("Failed to acquire CALLS lock");
if let Some(target) = handlers.get(&call.function) {
let result = target
.lock()
.expect("Failed to acquire CALLS.function lock")
.call(call.parameters);
let result = target.call(call.parameters).await;
socket::Packet::CallResponse(RemoteCallResponse {
id: call.id,
response: result,
@ -95,7 +103,7 @@ impl Instance {
socket::Packet::Many(packets) => {
let mut result = Vec::with_capacity(packets.len());
for packet in packets {
result.push(Self::handle_call(packet, handlers));
result.push(Self::handle_call(packet, handlers).await);
}
socket::Packet::Many(result)
}
@ -103,87 +111,88 @@ impl Instance {
}
}
#[cfg(not(feature = "encrypt"))]
async fn process_body((data, handlers): (bytes::Bytes, HashMap<String, WrappedCallable>)) -> impl warp::Reply {
let (packet, _) = match socket::Packet::load_base64(&data) {
Ok(x) => x,
Err(e) => {
return warp::reply::with_status(
warp::http::Response::builder()
.body(format!("Failed to load packet: {}", e)),
warp::http::StatusCode::from_u16(400).unwrap(),
)
}
};
//let mut buffer = [0u8; socket::PACKET_BUFFER_SIZE];
let mut buffer = String::with_capacity(socket::PACKET_BUFFER_SIZE);
let response = Self::handle_call(packet, &handlers).await;
let _len = match response.dump_base64(&mut buffer) {
Ok(x) => x,
Err(e) => {
return warp::reply::with_status(
warp::http::Response::builder()
.body(format!("Failed to dump response packet: {}", e)),
warp::http::StatusCode::from_u16(500).unwrap(),
)
}
};
warp::reply::with_status(
warp::http::Response::builder().body(buffer),
warp::http::StatusCode::from_u16(200).unwrap(),
)
}
#[cfg(feature = "encrypt")]
async fn process_body((data, handlers, key): (bytes::Bytes, HashMap<String, WrappedCallable>, Vec<u8>)) -> impl warp::Reply {
let (packet, _) = match socket::Packet::load_encrypted(&data, &key, &NONCE) {
Ok(x) => x,
Err(_) => {
return warp::reply::with_status(
warp::http::Response::builder()
.body("Failed to load packet".to_string()),
warp::http::StatusCode::from_u16(400).unwrap(),
)
}
};
let mut buffer = Vec::with_capacity(socket::PACKET_BUFFER_SIZE);
//buffer.extend(&[0u8; socket::PACKET_BUFFER_SIZE]);
let response = Self::handle_call(packet, &handlers).await;
let len = match response.dump_encrypted(&mut buffer, &key, &NONCE) {
Ok(x) => x,
Err(_) => {
return warp::reply::with_status(
warp::http::Response::builder()
.body("Failed to dump response packet".to_string()),
warp::http::StatusCode::from_u16(500).unwrap(),
)
}
};
buffer.truncate(len);
let string: String = String::from_utf8(buffer).unwrap().into();
warp::reply::with_status(
warp::http::Response::builder().body(string),
warp::http::StatusCode::from_u16(200).unwrap(),
)
}
/// Receive and execute callbacks forever
async fn serve_internal(&self) -> Result<(), ()> {
let handlers = self.calls.clone();
//self.calls = HashMap::new();
#[cfg(not(feature = "encrypt"))]
let calls = warp::post()
.and(warp::path!("usdpl" / "call"))
.and(warp::body::content_length_limit(
(socket::PACKET_BUFFER_SIZE * 2) as _,
))
.and(warp::body::bytes())
.map(move |data: bytes::Bytes| {
let (packet, _) = match socket::Packet::load_base64(&data) {
Ok(x) => x,
Err(e) => {
return warp::reply::with_status(
warp::http::Response::builder()
.body(format!("Failed to load packet: {}", e)),
warp::http::StatusCode::from_u16(400).unwrap(),
)
}
};
//let mut buffer = [0u8; socket::PACKET_BUFFER_SIZE];
let mut buffer = String::with_capacity(socket::PACKET_BUFFER_SIZE);
let response = Self::handle_call(packet, &handlers);
let _len = match response.dump_base64(&mut buffer) {
Ok(x) => x,
Err(e) => {
return warp::reply::with_status(
warp::http::Response::builder()
.body(format!("Failed to dump response packet: {}", e)),
warp::http::StatusCode::from_u16(500).unwrap(),
)
}
};
warp::reply::with_status(
warp::http::Response::builder().body(buffer),
warp::http::StatusCode::from_u16(200).unwrap(),
)
})
.map(|reply| warp::reply::with_header(reply, "Access-Control-Allow-Origin", "*"));
let input_mapper = move |data: bytes::Bytes| { (data, handlers.clone()) };
#[cfg(feature = "encrypt")]
let key = self.encryption_key.clone();
#[cfg(feature = "encrypt")]
let input_mapper = move |data: bytes::Bytes| { (data, handlers.clone(), key.clone()) };
//self.calls = HashMap::new();
let calls = warp::post()
.and(warp::path!("usdpl" / "call"))
.and(warp::body::content_length_limit(
(socket::PACKET_BUFFER_SIZE * 2) as _,
))
.and(warp::body::bytes())
.map(move |data: bytes::Bytes| {
let (packet, _) = match socket::Packet::load_encrypted(&data, &key, &NONCE) {
Ok(x) => x,
Err(_) => {
return warp::reply::with_status(
warp::http::Response::builder()
.body("Failed to load packet".to_string()),
warp::http::StatusCode::from_u16(400).unwrap(),
)
}
};
let mut buffer = Vec::with_capacity(socket::PACKET_BUFFER_SIZE);
//buffer.extend(&[0u8; socket::PACKET_BUFFER_SIZE]);
let response = Self::handle_call(packet, &handlers);
let len = match response.dump_encrypted(&mut buffer, &key, &NONCE) {
Ok(x) => x,
Err(_) => {
return warp::reply::with_status(
warp::http::Response::builder()
.body("Failed to dump response packet".to_string()),
warp::http::StatusCode::from_u16(500).unwrap(),
)
}
};
buffer.truncate(len);
let string: String = String::from_utf8(buffer).unwrap().into();
warp::reply::with_status(
warp::http::Response::builder().body(string),
warp::http::StatusCode::from_u16(200).unwrap(),
)
})
.map(input_mapper)
.then(Self::process_body)
.map(|reply| warp::reply::with_header(reply, "Access-Control-Allow-Origin", "*"));
#[cfg(debug_assertions)]
warp::serve(calls).run(([0, 0, 0, 0], self.port)).await;

View file

@ -17,7 +17,8 @@ mod callable;
//mod errors;
mod instance;
pub use callable::Callable;
pub use callable::{Callable, MutCallable, AsyncCallable};
pub(crate) use callable::WrappedCallable;
pub use instance::Instance;
//pub use errors::{ServerError, ServerResult};

View file

@ -1,6 +1,6 @@
[package]
name = "usdpl-front"
version = "0.6.2"
version = "0.7.0"
authors = ["NGnius (Graham) <ngniusness@gmail.com>"]
edition = "2021"
license = "GPL-3.0-only"

View file

@ -17,6 +17,7 @@ use usdpl_core::socket;
const NONCE: [u8; socket::NONCE_SIZE]= [0u8; socket::NONCE_SIZE];
pub async fn send_js(
id: u64,
packet: socket::Packet,
port: u16,
#[cfg(feature = "encrypt")]
@ -26,7 +27,7 @@ pub async fn send_js(
opts.method("POST");
opts.mode(RequestMode::Cors);
let url = format!("http://{}:{}/usdpl/call", socket::HOST_STR, port);
let url = format!("http://usdpl{}.{}:{}/usdpl/call", id, socket::HOST_STR, port);
#[allow(unused_variables)]
let (buffer, len) = dump_to_buffer(packet, #[cfg(feature = "encrypt")] key.as_slice())?;

View file

@ -124,6 +124,7 @@ pub async fn call_backend(name: String, parameters: Vec<JsValue>) -> JsValue {
#[cfg(feature = "debug")]
imports::console_log(&format!("USDPL: Got port {}", port));
let results = connection::send_js(
next_id,
Packet::Call(RemoteCall {
id: next_id,
function: name.clone(),