From 327ab6e7539d30f20273a47fad155945a8577fe3 Mon Sep 17 00:00:00 2001 From: "NGnius (Graham)" Date: Sun, 27 Mar 2022 13:30:47 -0400 Subject: [PATCH] Rename bliss-rs & limit bliss resource usage --- Cargo.lock | 42 +++---- Cargo.toml | 2 +- bliss-rs | 2 +- mps-interpreter/Cargo.toml | 5 +- mps-interpreter/benches/file_parse.rs | 9 +- .../vocabulary/filters/field_match_filter.rs | 1 - .../vocabulary/sorters/bliss_next_sorter.rs | 18 +-- .../lang/vocabulary/sorters/bliss_sorter.rs | 22 ++-- mps-interpreter/src/processing/filesystem.rs | 1 + .../src/processing/music_analysis.rs | 113 +++++++++++++++--- 10 files changed, 147 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9024f46..117eeb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,7 +123,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] -name = "bliss-audio" +name = "bliss-audio-aubio-rs" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe01698d293ee91e334339d6436f17eac30d94bebaa668c5799c8206384dfeb1" +dependencies = [ + "bliss-audio-aubio-sys", +] + +[[package]] +name = "bliss-audio-aubio-sys" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef9fab7b922bdd057bb06fa2a2fa79d2a93bec3dd576320511cb3dfe21e78a9" +dependencies = [ + "cc", + "fftw-sys", +] + +[[package]] +name = "bliss-audio-symphonia" version = "0.4.6" dependencies = [ "anyhow", @@ -151,25 +170,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "bliss-audio-aubio-rs" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe01698d293ee91e334339d6436f17eac30d94bebaa668c5799c8206384dfeb1" -dependencies = [ - "bliss-audio-aubio-sys", -] - -[[package]] -name = "bliss-audio-aubio-sys" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ef9fab7b922bdd057bb06fa2a2fa79d2a93bec3dd576320511cb3dfe21e78a9" -dependencies = [ - "cc", - "fftw-sys", -] - [[package]] name = "block-buffer" version = "0.7.3" @@ -1203,7 +1203,7 @@ dependencies = [ name = "mps-interpreter" version = "0.7.0" dependencies = [ - "bliss-audio", + "bliss-audio-symphonia", "criterion", "dirs", "rand", diff --git a/Cargo.toml b/Cargo.toml index a7a8d4c..6693d34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ codegen-units = 4 [profile.bench] lto = false -[profile.dev.package.bliss-audio] +[profile.dev.package.bliss-audio-symphonia] debug-assertions = false overflow-checks = false debug = true diff --git a/bliss-rs b/bliss-rs index 8517c49..5d66e10 160000 --- a/bliss-rs +++ b/bliss-rs @@ -1 +1 @@ -Subproject commit 8517c49caf3a636798411da3c0a3d80b2cf268d5 +Subproject commit 5d66e104235479e38f7bfef6d27980a183ff2142 diff --git a/mps-interpreter/Cargo.toml b/mps-interpreter/Cargo.toml index c0301d2..7f7fd48 100644 --- a/mps-interpreter/Cargo.toml +++ b/mps-interpreter/Cargo.toml @@ -4,6 +4,7 @@ version = "0.7.0" edition = "2021" license = "LGPL-2.1-only OR GPL-3.0-only" readme = "README.md" +rust-version = "1.59" [dependencies] rusqlite = { version = "0.26.3", features = ["bundled"] } @@ -14,7 +15,7 @@ dirs = { version = "4.0.0" } regex = { version = "1" } rand = { version = "0.8" } shellexpand = { version = "2.1", optional = true } -bliss-audio = { version = "0.4", optional = true, path = "../bliss-rs" } +bliss-audio-symphonia = { version = "0.4", optional = true, path = "../bliss-rs" } [dev-dependencies] criterion = "0.3" @@ -27,4 +28,4 @@ harness = false default = [ "music_library", "ergonomics", "advanced" ] music_library = [ "symphonia" ] # song metadata parsing and database auto-population ergonomics = ["shellexpand"] # niceties like ~ in pathes -advanced = ["bliss-audio"] # advanced language features like bliss playlist generation +advanced = ["bliss-audio-symphonia"] # advanced language features like bliss playlist generation diff --git a/mps-interpreter/benches/file_parse.rs b/mps-interpreter/benches/file_parse.rs index b98a038..484cba7 100644 --- a/mps-interpreter/benches/file_parse.rs +++ b/mps-interpreter/benches/file_parse.rs @@ -1,10 +1,11 @@ -use mps_interpreter::{MpsFaye, MpsRunner}; +use mps_interpreter::MpsFaye; +//use mps_interpreter::MpsRunner; use std::fs::File; use std::io::{BufReader, Read, Seek}; use criterion::{criterion_group, criterion_main, Criterion}; -fn interpretor_benchmark(c: &mut Criterion) { +/*fn interpretor_benchmark(c: &mut Criterion) { let f = File::open("benches/lots_of_empty.mps").unwrap(); let mut reader = BufReader::with_capacity(1024 * 1024 /* 1 MiB */, f); // read everything into buffer before starting @@ -25,7 +26,7 @@ fn interpretor_benchmark(c: &mut Criterion) { } }) }); -} +}*/ fn faye_benchmark(c: &mut Criterion) { let f = File::open("benches/lots_of_empty.mps").unwrap(); @@ -50,5 +51,5 @@ fn faye_benchmark(c: &mut Criterion) { }); } -criterion_group!(parse_benches, interpretor_benchmark, faye_benchmark); +criterion_group!(parse_benches, /*interpretor_benchmark,*/ faye_benchmark); criterion_main!(parse_benches); diff --git a/mps-interpreter/src/lang/vocabulary/filters/field_match_filter.rs b/mps-interpreter/src/lang/vocabulary/filters/field_match_filter.rs index 7064c14..cc6c9d7 100644 --- a/mps-interpreter/src/lang/vocabulary/filters/field_match_filter.rs +++ b/mps-interpreter/src/lang/vocabulary/filters/field_match_filter.rs @@ -212,7 +212,6 @@ fn regex_flags(tokens: &mut VecDeque) -> Result { #[inline] fn build_regex(pattern: &str, flags: u8) -> Result { - println!("Compiling"); RegexBuilder::new(pattern) .case_insensitive((flags & (1 << 0)) != 0) .multi_line((flags & (1 << 1)) != 0) diff --git a/mps-interpreter/src/lang/vocabulary/sorters/bliss_next_sorter.rs b/mps-interpreter/src/lang/vocabulary/sorters/bliss_next_sorter.rs index 5d046f6..046c76c 100644 --- a/mps-interpreter/src/lang/vocabulary/sorters/bliss_next_sorter.rs +++ b/mps-interpreter/src/lang/vocabulary/sorters/bliss_next_sorter.rs @@ -1,17 +1,17 @@ use std::collections::VecDeque; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] use std::fmt::{Debug, Display, Error, Formatter}; use crate::lang::utility::{assert_name, check_name}; use crate::lang::SyntaxError; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, RuntimeMsg}; use crate::lang::{MpsLanguageDictionary, MpsSortStatementFactory, MpsSorterFactory}; use crate::tokens::MpsToken; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] use crate::MpsItem; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] #[derive(Debug)] pub struct BlissNextSorter { up_to: usize, @@ -20,7 +20,7 @@ pub struct BlissNextSorter { item_buf: VecDeque, } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] impl std::clone::Clone for BlissNextSorter { fn clone(&self) -> Self { Self { @@ -32,7 +32,7 @@ impl std::clone::Clone for BlissNextSorter { } } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] impl Default for BlissNextSorter { fn default() -> Self { Self { @@ -44,7 +44,7 @@ impl Default for BlissNextSorter { } } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] impl MpsSorter for BlissNextSorter { fn sort( &mut self, @@ -133,10 +133,10 @@ impl MpsSorter for BlissNextSorter { } } -#[cfg(not(feature = "bliss-audio"))] +#[cfg(not(feature = "advanced"))] pub type BlissNextSorter = crate::lang::vocabulary::sorters::EmptySorter; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] impl Display for BlissNextSorter { fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { write!(f, "advanced bliss_next") diff --git a/mps-interpreter/src/lang/vocabulary/sorters/bliss_sorter.rs b/mps-interpreter/src/lang/vocabulary/sorters/bliss_sorter.rs index 13307f4..153c768 100644 --- a/mps-interpreter/src/lang/vocabulary/sorters/bliss_sorter.rs +++ b/mps-interpreter/src/lang/vocabulary/sorters/bliss_sorter.rs @@ -1,30 +1,30 @@ use std::collections::VecDeque; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] use std::fmt::{Debug, Display, Error, Formatter}; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] use std::collections::HashMap; use crate::lang::utility::{assert_name, check_name}; use crate::lang::SyntaxError; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, RuntimeMsg}; use crate::lang::{MpsLanguageDictionary, MpsSortStatementFactory, MpsSorterFactory}; use crate::tokens::MpsToken; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] use crate::MpsItem; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] const DEFAULT_ORDER: std::cmp::Ordering = std::cmp::Ordering::Greater; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] #[derive(Debug)] pub struct BlissSorter { up_to: usize, first_song: Option, } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] impl std::clone::Clone for BlissSorter { fn clone(&self) -> Self { Self { @@ -34,7 +34,7 @@ impl std::clone::Clone for BlissSorter { } } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] impl Default for BlissSorter { fn default() -> Self { Self { @@ -44,7 +44,7 @@ impl Default for BlissSorter { } } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] impl MpsSorter for BlissSorter { fn sort( &mut self, @@ -127,10 +127,10 @@ impl MpsSorter for BlissSorter { } } -#[cfg(not(feature = "bliss-audio"))] +#[cfg(not(feature = "advanced"))] pub type BlissSorter = crate::lang::vocabulary::sorters::EmptySorter; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "advanced")] impl Display for BlissSorter { fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { write!(f, "advanced bliss_first") diff --git a/mps-interpreter/src/processing/filesystem.rs b/mps-interpreter/src/processing/filesystem.rs index 2036981..8e0826a 100644 --- a/mps-interpreter/src/processing/filesystem.rs +++ b/mps-interpreter/src/processing/filesystem.rs @@ -202,6 +202,7 @@ impl FileIter { #[cfg(not(feature = "music_library"))] fn populate_item_impl( &self, + _path: &Path, path_str: &str, captures: Option, capture_names: regex::CaptureNames, diff --git a/mps-interpreter/src/processing/music_analysis.rs b/mps-interpreter/src/processing/music_analysis.rs index 5e0ce9a..2ba048a 100644 --- a/mps-interpreter/src/processing/music_analysis.rs +++ b/mps-interpreter/src/processing/music_analysis.rs @@ -1,13 +1,22 @@ use core::fmt::Debug; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] use std::collections::{HashMap, HashSet}; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] use std::sync::mpsc::{channel, Receiver, Sender}; -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] use crate::lang::MpsTypePrimitive; -#[cfg(feature = "bliss-audio")] -use bliss_audio::{BlissError, Song}; +#[cfg(feature = "bliss-audio-symphonia")] +use bliss_audio_symphonia::{BlissError, Song}; + +// assumed processor threads +const DEFAULT_PARALLELISM: usize = 2; + +// maximum length of song cache (song objects take up a lot of memory) +const MAX_SONG_CACHE_SIZE: usize = 1000; + +// maximum length of distance cache (takes up significantly less memory than songs) +const MAX_DISTANCE_CACHE_SIZE: usize = MAX_SONG_CACHE_SIZE * 10; use crate::lang::RuntimeMsg; use crate::MpsItem; @@ -24,14 +33,14 @@ pub trait MpsMusicAnalyzer: Debug { fn clear_cache(&mut self) -> Result<(), RuntimeMsg>; } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] #[derive(Debug)] pub struct MpsDefaultAnalyzer { requests: Sender, responses: Receiver, } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] impl std::default::Default for MpsDefaultAnalyzer { fn default() -> Self { let (req_tx, req_rx) = channel(); @@ -47,7 +56,7 @@ impl std::default::Default for MpsDefaultAnalyzer { } } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] impl MpsDefaultAnalyzer { fn request_distance( &mut self, @@ -92,7 +101,7 @@ impl MpsDefaultAnalyzer { } } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] impl MpsMusicAnalyzer for MpsDefaultAnalyzer { fn prepare_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<(), RuntimeMsg> { self.request_distance(from, to, false) @@ -133,11 +142,11 @@ impl MpsMusicAnalyzer for MpsDefaultAnalyzer { } } -#[cfg(not(feature = "bliss-audio"))] +#[cfg(not(feature = "bliss-audio-symphonia"))] #[derive(Default, Debug)] pub struct MpsDefaultAnalyzer {} -#[cfg(not(feature = "bliss-audio"))] +#[cfg(not(feature = "bliss-audio-symphonia"))] impl MpsMusicAnalyzer for MpsDefaultAnalyzer { fn prepare_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<(), RuntimeMsg> { Ok(()) @@ -150,9 +159,13 @@ impl MpsMusicAnalyzer for MpsDefaultAnalyzer { fn get_distance(&mut self, item: &MpsItem) -> Result { Ok(f64::MAX) } + + fn clear_cache(&mut self) -> Result<(), RuntimeMsg> { + Ok(()) + } } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] enum RequestType { Distance { path1: String, @@ -167,7 +180,7 @@ enum RequestType { //End {} } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] enum ResponseType { Distance { path1: String, @@ -180,7 +193,7 @@ enum ResponseType { }, } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] struct CacheThread { distance_cache: HashMap<(String, String), Result>, distance_in_progress: HashSet<(String, String)>, @@ -190,7 +203,7 @@ struct CacheThread { responses: Sender, } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] impl CacheThread { fn new(responses: Sender) -> Self { Self { @@ -222,6 +235,10 @@ impl CacheThread { fn insert_song(&mut self, path: String, song_result: Result) { self.song_in_progress.remove(&path); + if self.song_cache.len() > MAX_SONG_CACHE_SIZE { + // avoid using too much memory -- songs are big memory objects + self.song_cache.clear(); + } self.song_cache.insert(path, song_result); } @@ -233,6 +250,10 @@ impl CacheThread { ) { let key = (path1, path2); self.distance_in_progress.remove(&key); + if self.distance_cache.len() > MAX_DISTANCE_CACHE_SIZE { + // avoid using too much memory + self.song_cache.clear(); + } self.distance_cache.insert(key, distance_result); } @@ -319,6 +340,34 @@ impl CacheThread { } } } else if !self.distance_in_progress.contains(&key) { + // distance worker uses 3 threads (it's own thread + 1 extra per song) for 2 songs + let available_parallelism = + (std::thread::available_parallelism().ok().map(|x| x.get()).unwrap_or(DEFAULT_PARALLELISM) * 2) / 3; + let available_parallelism = if available_parallelism != 0 { + available_parallelism - 1 + } else { + 0 + }; + // wait for processing to complete if too many tasks already running + if self.song_in_progress.len() > available_parallelism { + 'inner4: for result in worker_results.iter() { + match result { + ResponseType::Distance { + path1, + path2, + distance, + } => { + self.insert_distance(path1, path2, distance); + } + ResponseType::Song { path: path2, song } => { + self.insert_song(path2.clone(), song.clone()); + if self.song_in_progress.len() <= available_parallelism { + break 'inner4; + } + } + } + } + } let results = worker_tx.clone(); let song1_clone = self.get_song_option(&path1, true, worker_results); let song2_clone = self.get_song_option(&path2, true, worker_results); @@ -387,6 +436,34 @@ impl CacheThread { } } else { if !self.song_in_progress.contains(&path) { + // every song is roughly 2 threads -- Song::new(...) spawns a thread + let available_parallelism = + std::thread::available_parallelism().ok().map(|x| x.get()).unwrap_or(DEFAULT_PARALLELISM) / 2; + let available_parallelism = if available_parallelism != 0 { + available_parallelism - 1 + } else { + 0 + }; + // wait for processing to complete if too many tasks already running + if self.song_in_progress.len() > available_parallelism { + 'inner2: for result in worker_results.iter() { + match result { + ResponseType::Distance { + path1, + path2, + distance, + } => { + self.insert_distance(path1, path2, distance); + } + ResponseType::Song { path: path2, song } => { + self.insert_song(path2.clone(), song.clone()); + if self.song_in_progress.len() <= available_parallelism { + break 'inner2; + } + } + } + } + } let path_clone = path.clone(); let results = worker_tx.clone(); std::thread::spawn(move || { @@ -400,7 +477,7 @@ impl CacheThread { }); } if ack { - 'inner2: for result in worker_results.iter() { + 'inner3: for result in worker_results.iter() { match result { ResponseType::Distance { path1, @@ -418,7 +495,7 @@ impl CacheThread { }) { return false; } - break 'inner2; + break 'inner3; } } } @@ -453,7 +530,7 @@ impl CacheThread { } } -#[cfg(feature = "bliss-audio")] +#[cfg(feature = "bliss-audio-symphonia")] fn worker_distance( results: &Sender, song1: (&str, Option),