Add merged storage impl

This commit is contained in:
NGnius (Graham) 2022-12-17 00:03:41 -05:00
parent 428b994f71
commit 60d31ea030
10 changed files with 442 additions and 70 deletions

View file

@ -1,15 +1,11 @@
use clap::{Parser, Subcommand, Args};
//use std::io::Write as _;
use std::fmt::Write as _;
/// An alternative plugin store
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None, propagate_version = true)]
pub struct CliArgs {
/// Proxy offerings from another store
#[arg(name = "store", long)]
pub proxy_store: Option<String>,
/// Proxy main store offerings
#[arg(name = "proxy", short, long)]
pub proxy: bool,
/// Cache results for a period
#[arg(name = "cache", long)]
pub cache_duration: Option<i64>,
@ -30,6 +26,41 @@ pub enum StorageArgs {
Default,
/// Use the filesystem
Filesystem(FilesystemArgs),
/// Use an existing online store
Proxy(ProxyArgs),
/// Use no storage system
Empty,
/// Combine multiple storages together
Merge(MergeArgs)
}
impl StorageArgs {
// A cursed syntax with super simple parsing for describing storage settings
pub fn from_descriptor(chars: &mut std::str::Chars) -> Result<Self, String> {
//let mut chars = descriptor.chars();
if let Some(char0) = chars.next() {
Ok(match char0 {
'd' | '_' => Self::Default,
'f' => Self::Filesystem(FilesystemArgs::from_descriptor(chars)?),
'p' => Self::Proxy(ProxyArgs::from_descriptor(chars)?),
'e' | ' ' => Self::Empty,
'm' | '+' => Self::Merge(MergeArgs::from_descriptor(chars)?),
c => return Err(format!("Unexpected char {}, expected a descriptor prefix from {{d f p e m}}", c)),
})
} else {
Err(format!("Empty storage descriptor"))
}
}
pub fn to_descriptor(self) -> String {
match self {
Self::Default => "d".to_owned(),
Self::Filesystem(fs) => format!("f{}", fs.to_descriptor()),
Self::Proxy(px) => format!("p{}", px.to_descriptor()),
Self::Empty => "e".to_owned(),
Self::Merge(ls) => format!("m{}", ls.to_descriptor()),
}
}
}
#[derive(Args, Debug, Clone)]
@ -41,3 +72,203 @@ pub struct FilesystemArgs {
#[arg(name = "stats", long)]
pub enable_stats: bool,
}
impl FilesystemArgs {
fn from_descriptor(chars: &mut std::str::Chars) -> Result<Self, String> {
if let Some(char1) = chars.next() {
if char1 != '{' {
return Err(format!("Expected {{, got {}", char1));
}
} else {
return Err(format!("Filesystem descriptor too short"));
}
let mut root = None;
let mut domain = None;
let mut stats = false;
let mut buffer = Vec::<char>::new();
let mut for_variable: Option<String> = None;
let mut in_string = false;
for c in chars {
match c {
'}' => return
Ok(Self {
root: root.unwrap_or_else(|| "./store".into()),
domain_root: domain.unwrap_or_else(|| "http://localhost:22252".into()),
enable_stats: stats,
}),
'\'' => in_string = !in_string,
'=' => if !in_string {
let value: String = buffer.drain(..).collect();
if for_variable.is_some() {
return Err("Unexpected = in filesystem descriptor".to_owned());
} else {
for_variable = Some(value);
}
},
',' => if !in_string {
let value: String = buffer.iter().collect();
if let Some(var) = for_variable.take() {
let var_trimmed = var.trim();
match &var_trimmed as &str {
"r" | "root" => root = Some(value),
"d" | "domain" => domain = Some(value),
"s" | "stats" => stats = value == "1" || value == "y",
v => return Err(format!("Unexpected variable name {} in filesystem descriptor", v)),
}
} else {
return Err("Unexpected , in filesystem descriptor".to_owned())
}
}
c => buffer.push(c),
}
}
Err("Unexpected end of descriptor".to_owned())
}
fn to_descriptor(self) -> String {
format!("{{root='{}',domain='{}',stats:{}}}", self.root, self.domain_root, self.enable_stats as u8)
}
}
#[derive(Args, Debug, Clone)]
pub struct ProxyArgs {
/// Proxy offerings from another store
#[arg(name = "store", long, default_value_t = {"https://plugins.deckbrew.xyz".into()})]
pub proxy_store: String,
}
impl ProxyArgs {
fn from_descriptor(chars: &mut std::str::Chars) -> Result<Self, String> {
if let Some(char1) = chars.next() {
if char1 != '{' {
return Err(format!("Expected {{, got {}", char1));
}
} else {
return Err(format!("Proxy descriptor too short"));
}
let mut buffer = Vec::new();
for c in chars {
match c {
'}' => return
Ok(Self {
proxy_store: if buffer.is_empty() { "https://plugins.deckbrew.xyz".into() } else { buffer.iter().collect() }
}),
c => buffer.push(c),
}
}
Err("Unexpected end of descriptor".to_owned())
}
fn to_descriptor(self) -> String {
format!("{{{}}}", self.proxy_store)
}
}
#[derive(Args, Debug, Clone)]
pub struct MergeArgs {
/// Settings descriptor
pub settings: Vec<String>,
}
impl MergeArgs {
fn from_descriptor(chars: &mut std::str::Chars) -> Result<Self, String> {
if let Some(char1) = chars.next() {
if char1 != '[' {
return Err(format!("Expected [, got {}", char1));
}
} else {
return Err(format!("Merge descriptor too short"));
}
let mut others = Vec::new();
loop {
if let Some(char_n) = chars.next() {
if char_n != '(' {
return Err(format!("Expected (, got {}", char_n));
}
}
others.push(StorageArgs::from_descriptor(chars)?.to_descriptor());
if let Some(char_n) = chars.next() {
if char_n != ')' {
return Err(format!("Expected ), got {}", char_n));
}
}
if let Some(c) = chars.next() {
match c {
',' => {},
']' => {
if others.len() < 2 {
return Err("Merge args too short (0-1 descriptors is useless!)".to_owned());
}
return Ok(Self {
settings: others,
});
},
c => return Err(format!("Unexpected char {}, expected ] or ,", c))
}
} else {
break;
}
}
Err("Unexpected end of descriptor".to_owned())
}
fn to_descriptor(self) -> String {
let mut out = "[".to_owned();
for descriptor in self.settings {
write!(&mut out, "({})", descriptor).unwrap();
}
write!(&mut out, "]").unwrap();
out
}
pub fn generate_args(&self) -> Result<Vec<StorageArgs>, String> {
let mut results = Vec::with_capacity(self.settings.len());
for args in &self.settings {
results.push(StorageArgs::from_descriptor(&mut args.chars())?);
}
Ok(results)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn storage_descriptor() {
let descriptor = "f{root='',domain='',stats:0}";
let parsed = StorageArgs::from_descriptor(&mut descriptor.chars());
parsed.expect("StorageArgs parse error");
let descriptor = "p{}";
let parsed = StorageArgs::from_descriptor(&mut descriptor.chars());
parsed.expect("StorageArgs parse error");
let descriptor = "m[(p{}),(d)]";
let parsed = StorageArgs::from_descriptor(&mut descriptor.chars());
parsed.expect("StorageArgs parse error");
let descriptor = "d";
let parsed = StorageArgs::from_descriptor(&mut descriptor.chars());
parsed.expect("StorageArgs parse error");
}
#[test]
fn filesys_descriptor() {
let descriptor = "{root='',domain='',stats:0}";
let parsed = FilesystemArgs::from_descriptor(&mut descriptor.chars());
parsed.expect("FilesystemArgs parse error");
}
#[test]
fn proxy_descriptor() {
let descriptor = "{}";
let parsed = ProxyArgs::from_descriptor(&mut descriptor.chars());
parsed.expect("ProxyArgs parse error");
}
#[test]
fn merge_descriptor() {
let descriptor = "[(f{}),(p{}),( )]";
let parsed = MergeArgs::from_descriptor(&mut descriptor.chars());
parsed.expect("MergeArgs parse error");
}
}

View file

@ -3,8 +3,6 @@ mod consts;
mod not_decky;
mod storage;
use crate::storage::IStorageWrap;
use actix_web::{get, web, App, HttpResponse, HttpServer, Responder};
use simplelog::{LevelFilter, WriteLogger};
@ -13,6 +11,32 @@ async fn hello() -> impl Responder {
HttpResponse::Ok().body(format!("{} v{}", consts::PACKAGE_NAME, consts::PACKAGE_VERSION))
}
fn build_storage_box(storage: &cli::StorageArgs) -> Box<dyn storage::IStorage> {
match storage {
cli::StorageArgs::Default => Box::new(storage::FileStorage::new(
"./store".into(),
"http://192.168.0.128:22252".into(),
true,
)),
cli::StorageArgs::Filesystem(fs) => Box::new(storage::FileStorage::new(
fs.root.clone().into(),
fs.domain_root.clone().into(),
fs.enable_stats,
)),
cli::StorageArgs::Proxy(px) => Box::new(storage::ProxiedStorage::new(
px.proxy_store.clone(),
)),
cli::StorageArgs::Empty => Box::new(storage::EmptyStorage),
cli::StorageArgs::Merge(ls) => Box::new(storage::MergedStorage::new(
ls.generate_args()
.expect("Bad descriptor")
.drain(..)
.map(|args| build_storage_box(&args))
.collect()
))
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let args = cli::CliArgs::get();
@ -35,17 +59,12 @@ async fn main() -> std::io::Result<()> {
.allow_any_header()
.expose_any_header();
let storage_data: Box<dyn storage::IStorage> = match &args.storage {
cli::StorageArgs::Default => storage::FileStorage::new(
"./store".into(),
"http://192.168.0.128:22252".into(),
true,
).wrap(args.clone()),
cli::StorageArgs::Filesystem(fs) => storage::FileStorage::new(
fs.root.clone().into(),
fs.domain_root.clone().into(),
fs.enable_stats,
).wrap(args.clone()),
let storage_data: Box<dyn storage::IStorage> = build_storage_box(&args.storage);
let storage_data = if let Some(cache_duration) = args.cache_duration {
Box::new(storage::CachedStorage::new(cache_duration, storage_data))
} else {
storage_data
};
App::new()

View file

@ -4,6 +4,7 @@ use crate::storage::IStorage;
#[get("/plugins/{name}/{version}/{hash}.zip")]
pub async fn decky_artifact(data: web::Data<Box<dyn IStorage>>, path: web::Path<(String, String, String)>) -> actix_web::Result<impl Responder> {
let zip = data.get_artifact(&path.0, &path.1, &path.2).map_err(|e| actix_web::error::ErrorNotFound(e.to_string()))?;
let zip = web::block(move || data.get_artifact(&path.0, &path.1, &path.2)).await
.map_err(|e| actix_web::error::ErrorNotFound(e.to_string()))?;
Ok(zip)
}

View file

@ -4,6 +4,7 @@ use crate::storage::IStorage;
#[get("/plugins/{name}.png")]
pub async fn decky_image(data: web::Data<Box<dyn IStorage>>, path: web::Path<String>) -> actix_web::Result<impl Responder> {
let zip = data.get_image(&path).map_err(|e| actix_web::error::ErrorNotFound(e.to_string()))?;
let zip = web::block(move || data.get_image(&path)).await
.map_err(|e| actix_web::error::ErrorNotFound(e.to_string()))?;
Ok(zip)
}

View file

@ -6,6 +6,6 @@ use crate::storage::IStorage;
#[get("/plugins")]
pub async fn decky_plugins(data: actix_web::web::Data<Box<dyn IStorage>>) -> impl Responder {
let plugins: StorePluginList = data.plugins();
let plugins: StorePluginList = web::block(move || data.plugins()).await.unwrap();
web::Json(plugins)
}

View file

@ -47,7 +47,7 @@ impl<T: Clone> Cached<T> {
}
}
pub struct CachedStorage<S: IStorage> {
pub struct CachedStorage<S: AsRef<dyn IStorage> + Send + Sync> {
fallback: S,
plugins_cache: Cached<StorePluginList>,
statistics_cache: Cached<HashMap<String, u64>>,
@ -55,11 +55,11 @@ pub struct CachedStorage<S: IStorage> {
images_cache: Cached<HashMap<String, Bytes>>,
}
impl<S: IStorage> CachedStorage<S> {
impl<S: AsRef<dyn IStorage> + Send + Sync> CachedStorage<S> {
pub fn new(duration: i64, inner: S) -> Self {
Self {
plugins_cache: Cached::new(inner.plugins(), duration),
statistics_cache: Cached::new(inner.get_statistics(), duration),
plugins_cache: Cached::new(inner.as_ref().plugins(), duration),
statistics_cache: Cached::new(inner.as_ref().get_statistics(), duration),
artifacts_cache: Cached::new(HashMap::new(), duration),
images_cache: Cached::new(HashMap::new(), duration),
fallback: inner,
@ -67,9 +67,9 @@ impl<S: IStorage> CachedStorage<S> {
}
}
impl<S: IStorage> IStorage for CachedStorage<S> {
impl<S: AsRef<dyn IStorage> + Send + Sync> IStorage for CachedStorage<S> {
fn plugins(&self) -> StorePluginList {
self.plugins_cache.get(|| self.fallback.plugins())
self.plugins_cache.get(|| self.fallback.as_ref().plugins())
}
fn get_artifact(&self, name: &str, version: &str, hash: &str) -> Result<bytes::Bytes, std::io::Error> {
@ -77,7 +77,7 @@ impl<S: IStorage> IStorage for CachedStorage<S> {
if let Some(bytes) = cached.get(hash) {
Ok(bytes.to_owned())
} else {
let new_artifact = self.fallback.get_artifact(name, version, hash)?;
let new_artifact = self.fallback.as_ref().get_artifact(name, version, hash)?;
cached.insert(hash.to_owned(), new_artifact.clone());
self.artifacts_cache.refresh(cached);
Ok(new_artifact)
@ -89,7 +89,7 @@ impl<S: IStorage> IStorage for CachedStorage<S> {
if let Some(bytes) = cached.get(name) {
Ok(bytes.to_owned())
} else {
let new_image = self.fallback.get_image(name)?;
let new_image = self.fallback.as_ref().get_image(name)?;
cached.insert(name.to_owned(), new_image.clone());
self.images_cache.refresh(cached);
Ok(new_image)
@ -97,6 +97,6 @@ impl<S: IStorage> IStorage for CachedStorage<S> {
}
fn get_statistics(&self) -> std::collections::HashMap<String, u64> {
self.statistics_cache.get(|| self.fallback.get_statistics())
self.statistics_cache.get(|| self.fallback.as_ref().get_statistics())
}
}

View file

@ -1,4 +1,4 @@
pub trait IStorage {
pub trait IStorage: Send + Sync {
fn plugins(&self) -> decky_api::StorePluginList;
fn get_artifact(&self, _name: &str, _version: &str, _hash: &str) -> Result<bytes::Bytes, std::io::Error> {
@ -14,33 +14,6 @@ pub trait IStorage {
}
}
pub trait IStorageWrap: Sized + IStorage {
fn wrap(self, conf: crate::cli::CliArgs) -> Box<dyn IStorage>;
}
impl<X: Sized + IStorage + 'static> IStorageWrap for X {
fn wrap(self, conf: crate::cli::CliArgs) -> Box<dyn IStorage> {
let proxy = if let Some(store) = conf.proxy_store {
Some(store)
} else if conf.proxy {
Some("https://plugins.deckbrew.xyz".to_owned())
} else {
None
};
match (proxy, conf.cache_duration) {
(Some(proxy), Some(cache_dur)) => Box::new(
super::CachedStorage::new(
cache_dur,
super::ProxiedStorage::new(proxy, self),
)
),
(Some(proxy), None) => Box::new(super::ProxiedStorage::new(proxy, self)),
(None, Some(cache_dur)) => Box::new(super::CachedStorage::new(cache_dur, self)),
(None, None) => Box::new(self),
}
}
}
pub struct EmptyStorage;
impl IStorage for EmptyStorage {

149
src/storage/merge.rs Normal file
View file

@ -0,0 +1,149 @@
use std::collections::HashMap;
use std::sync::RwLock;
use decky_api::{StorePluginList, StorePlugin};
use super::IStorage;
struct StoreIndex(usize);
#[derive(Hash, Eq, PartialEq)]
struct StoreName(String);
#[derive(Hash, Eq, PartialEq)]
struct HashablePluginVersion {
plugin_name: String,
version_name: String,
hash: String,
}
pub struct MergedStorage<S: AsRef<dyn IStorage> + Send + Sync> {
stores: Vec<S>,
store_artifact_map: RwLock<HashMap<HashablePluginVersion, StoreIndex>>,
store_image_map: RwLock<HashMap<StoreName, Vec<StoreIndex>>>,
}
impl<S: AsRef<dyn IStorage> + Send + Sync> MergedStorage<S> {
pub fn new(inner: Vec<S>) -> Self {
Self {
stores: inner,
store_artifact_map: RwLock::new(HashMap::new()),
store_image_map: RwLock::new(HashMap::new()),
}
}
/*pub fn add(mut self, store: S) -> Self {
self.stores.push(store);
self
}*/
fn map_to_vec(plugins: HashMap<StoreName, StorePlugin>) -> StorePluginList {
let mut result = Vec::with_capacity(plugins.len());
for (_, val) in plugins {
result.push(val);
}
result
}
fn merge_plugins_into(dest: &mut HashMap<StoreName, StorePlugin>, source: StorePluginList) {
for mut plugin in source {
let store_name = StoreName(plugin.name.clone());
if let Some(existing_plugin) = dest.get_mut(&store_name) {
// combine versions if the plugin has the same name as an existing one
existing_plugin.versions.append(&mut plugin.versions);
} else {
// create new plugin entry if not
dest.insert(store_name, plugin);
}
}
}
fn merge_statistics_into(dest: &mut HashMap<String, u64>, source: HashMap<String, u64>) {
for (entry, val) in source {
if let Some(existing_stat) = dest.get_mut(&entry) {
// combine if already exists
*existing_stat += val;
} else {
// create new plugin entry if not
dest.insert(entry, val);
}
}
}
}
impl<S: AsRef<dyn IStorage> + Send + Sync> IStorage for MergedStorage<S> {
fn plugins(&self) -> StorePluginList {
let mut merged_plugins = HashMap::new();
log::debug!("Acquiring store map write locks");
let mut arti_lock = self.store_artifact_map.write().expect("Failed to acquire store_artifact_map write lock");
let mut img_lock = self.store_image_map.write().expect("Failed to acquire store_image_map write lock");
for (index, store) in self.stores.iter().enumerate() {
let plugins = store.as_ref().plugins();
// re-build store mappins
for plugin in &plugins {
for version in &plugin.versions {
let hashable_ver = HashablePluginVersion {
plugin_name: plugin.name.clone(),
version_name: version.name.clone(),
hash: version.hash.clone(),
};
arti_lock.insert(hashable_ver, StoreIndex(index));
}
let store_name = StoreName(plugin.name.clone());
if let Some(stores) = img_lock.get_mut(&store_name) {
stores.push(StoreIndex(index));
} else {
img_lock.insert(store_name, vec![StoreIndex(index)]);
}
}
Self::merge_plugins_into(&mut merged_plugins, plugins);
}
Self::map_to_vec(merged_plugins)
}
fn get_artifact(&self, name: &str, version: &str, hash: &str) -> Result<bytes::Bytes, std::io::Error> {
log::debug!("Acquiring store_artifact_map read lock");
let lock = self.store_artifact_map.read().expect("Failed to acquire store_artifact_map read lock");
if let Some(index) = lock.get(&HashablePluginVersion {
plugin_name: name.to_owned(),
version_name: version.to_owned(),
hash: hash.to_owned(),
}) {
if let Some(store) = self.stores.get(index.0) {
store.as_ref().get_artifact(name, version, hash)
} else {
Err(std::io::Error::new(std::io::ErrorKind::NotFound, format!("Store index {} does not exist", index.0)))
}
} else {
Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Plugin version does not exist in any store"))
}
}
fn get_image(&self, name: &str) -> Result<bytes::Bytes, std::io::Error> {
log::debug!("Acquiring store_image_map read lock");
let lock = self.store_image_map.read().expect("Failed to acquire store_image_map read lock");
if let Some(indices) = lock.get(&StoreName(name.to_owned())) {
for index in indices {
if let Some(store) = self.stores.get(index.0) {
match store.as_ref().get_image(name) {
Ok(img) => return Ok(img),
Err(e) => log::error!("Error retrieving image from store #{}: {}", index.0, e),
}
}
}
Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Stores do not exist for that plugin"))
} else {
Err(std::io::Error::new(std::io::ErrorKind::NotFound, "Plugin does not exist in any store"))
}
}
fn get_statistics(&self) -> std::collections::HashMap<String, u64> {
let mut stats = HashMap::new();
for store in &self.stores {
let new_stats = store.as_ref().get_statistics();
Self::merge_statistics_into(&mut stats, new_stats);
}
stats
}
}

View file

@ -1,9 +1,11 @@
mod cache;
mod filesystem;
mod interface;
mod merge;
mod proxy;
pub use cache::CachedStorage;
pub use filesystem::FileStorage;
pub use interface::{IStorage, EmptyStorage, IStorageWrap};
pub use interface::{IStorage, EmptyStorage};
pub use merge::MergedStorage;
pub use proxy::ProxiedStorage;

View file

@ -2,17 +2,15 @@ use decky_api::{StorePluginList, StorePluginVersion};
use super::IStorage;
pub struct ProxiedStorage<S: IStorage> {
pub struct ProxiedStorage {
store_url: String,
fallback: S,
agent: ureq::Agent,
}
impl<S: IStorage> ProxiedStorage<S> {
pub fn new(target_store: String, inner: S) -> Self {
impl ProxiedStorage {
pub fn new(target_store: String) -> Self {
Self {
store_url: target_store,
fallback: inner,
agent: ureq::Agent::new(),
}
}
@ -45,7 +43,7 @@ impl<S: IStorage> ProxiedStorage<S> {
}
}
impl<S: IStorage> IStorage for ProxiedStorage<S> {
impl IStorage for ProxiedStorage {
fn plugins(&self) -> StorePluginList {
let mut proxy = self.proxy_plugins();
for plugin in &mut proxy {
@ -55,12 +53,10 @@ impl<S: IStorage> IStorage for ProxiedStorage<S> {
}
}
}
let fallback = self.fallback.plugins();
proxy.extend_from_slice(&fallback);
proxy
}
fn get_artifact(&self, name: &str, version: &str, hash: &str) -> Result<bytes::Bytes, std::io::Error> {
/*fn get_artifact(&self, name: &str, version: &str, hash: &str) -> Result<bytes::Bytes, std::io::Error> {
self.fallback.get_artifact(name, version, hash)
}
@ -70,5 +66,5 @@ impl<S: IStorage> IStorage for ProxiedStorage<S> {
fn get_statistics(&self) -> std::collections::HashMap<String, u64> {
self.fallback.get_statistics()
}
}*/
}