From 9705dc22bccf8a72605896b17f5a1a595e951eec Mon Sep 17 00:00:00 2001 From: "NGnius (Graham)" Date: Fri, 4 Feb 2022 21:48:28 -0500 Subject: [PATCH] Refactor music analysis functionality to add to MpsContext --- Cargo.lock | 45 +- mps-interpreter/src/context.rs | 10 +- .../vocabulary/sorters/bliss_next_sorter.rs | 209 ++++----- .../lang/vocabulary/sorters/bliss_sorter.rs | 160 ++----- mps-interpreter/src/processing/mod.rs | 7 + .../src/processing/music_analysis.rs | 416 ++++++++++++++++++ mps-player/Cargo.toml | 2 +- 7 files changed, 598 insertions(+), 251 deletions(-) create mode 100644 mps-interpreter/src/processing/music_analysis.rs diff --git a/Cargo.lock b/Cargo.lock index 73a2f1a..aa883e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2016,9 +2016,12 @@ checksum = "a7e5f38aa07e792f4eebb0faa93cee088ec82c48222dd332897aae1569d9a4b7" dependencies = [ "lazy_static 1.4.0", "symphonia-bundle-flac 0.4.0", + "symphonia-bundle-mp3 0.4.0", + "symphonia-codec-aac 0.4.0", "symphonia-codec-pcm 0.4.0", "symphonia-codec-vorbis 0.4.0", "symphonia-core 0.4.0", + "symphonia-format-isomp4 0.4.0", "symphonia-format-ogg 0.4.0", "symphonia-format-wav 0.4.1", "symphonia-metadata 0.4.0", @@ -2032,13 +2035,13 @@ checksum = "eb30457ee7a904dae1e4ace25156dcabaf71e425db318e7885267f09cd8fb648" dependencies = [ "lazy_static 1.4.0", "symphonia-bundle-flac 0.5.0", - "symphonia-bundle-mp3", - "symphonia-codec-aac", + "symphonia-bundle-mp3 0.5.0", + "symphonia-codec-aac 0.5.0", "symphonia-codec-alac", "symphonia-codec-pcm 0.5.0", "symphonia-codec-vorbis 0.5.0", "symphonia-core 0.5.0", - "symphonia-format-isomp4", + "symphonia-format-isomp4 0.5.0", "symphonia-format-mkv", "symphonia-format-ogg 0.5.0", "symphonia-format-wav 0.5.0", @@ -2069,6 +2072,19 @@ dependencies = [ "symphonia-utils-xiph 0.5.0", ] +[[package]] +name = "symphonia-bundle-mp3" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec4d97c4a61ece4651751dddb393ebecb7579169d9e758ae808fe507a5250790" +dependencies = [ + "bitflags", + "lazy_static 1.4.0", + "log", + "symphonia-core 0.4.0", + "symphonia-metadata 0.4.0", +] + [[package]] name = "symphonia-bundle-mp3" version = "0.5.0" @@ -2082,6 +2098,17 @@ dependencies = [ "symphonia-metadata 0.5.0", ] +[[package]] +name = "symphonia-codec-aac" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd3d7ab37eb9b7df16ddedd7adb7cc382afe708ff078e525a14dc9b05e57558f" +dependencies = [ + "lazy_static 1.4.0", + "log", + "symphonia-core 0.4.0", +] + [[package]] name = "symphonia-codec-aac" version = "0.5.0" @@ -2171,6 +2198,18 @@ dependencies = [ "log", ] +[[package]] +name = "symphonia-format-isomp4" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feee3a7711e7ec1b7540756f3868bbb3cbb0d1195569b9bc26471a24a02f57b5" +dependencies = [ + "encoding_rs", + "log", + "symphonia-core 0.4.0", + "symphonia-metadata 0.4.0", +] + [[package]] name = "symphonia-format-isomp4" version = "0.5.0" diff --git a/mps-interpreter/src/context.rs b/mps-interpreter/src/context.rs index 1a5d340..21dd255 100644 --- a/mps-interpreter/src/context.rs +++ b/mps-interpreter/src/context.rs @@ -2,6 +2,10 @@ use super::processing::database::{MpsDatabaseQuerier, MpsSQLiteExecutor}; use super::processing::general::{ MpsFilesystemExecutor, MpsFilesystemQuerier, MpsOpStorage, MpsVariableStorer, }; +#[cfg(feature = "advanced")] +use super::processing::advanced::{ + MpsMusicAnalyzer, MpsDefaultAnalyzer +}; use std::fmt::{Debug, Display, Error, Formatter}; #[derive(Debug)] @@ -9,6 +13,8 @@ pub struct MpsContext { pub database: Box, pub variables: Box, pub filesystem: Box, + #[cfg(feature = "advanced")] + pub analysis: Box, } impl Default for MpsContext { @@ -17,13 +23,15 @@ impl Default for MpsContext { database: Box::new(MpsSQLiteExecutor::default()), variables: Box::new(MpsOpStorage::default()), filesystem: Box::new(MpsFilesystemExecutor::default()), + #[cfg(feature = "advanced")] + analysis: Box::new(MpsDefaultAnalyzer::default()), } } } impl Display for MpsContext { fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { - write!(f, "MpsContext")?; + write!(f, "MpsContext{{...}}")?; Ok(()) } } diff --git a/mps-interpreter/src/lang/vocabulary/sorters/bliss_next_sorter.rs b/mps-interpreter/src/lang/vocabulary/sorters/bliss_next_sorter.rs index 4d41432..8aa833b 100644 --- a/mps-interpreter/src/lang/vocabulary/sorters/bliss_next_sorter.rs +++ b/mps-interpreter/src/lang/vocabulary/sorters/bliss_next_sorter.rs @@ -1,16 +1,12 @@ use std::collections::VecDeque; #[cfg(feature = "bliss-audio")] use std::fmt::{Debug, Display, Error, Formatter}; -#[cfg(feature = "bliss-audio")] -use std::sync::mpsc::{channel, Receiver, Sender}; -#[cfg(feature = "bliss-audio")] -use bliss_audio::Song; use crate::lang::utility::{assert_name, check_name}; use crate::lang::SyntaxError; #[cfg(feature = "bliss-audio")] -use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, MpsTypePrimitive, RuntimeMsg}; +use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, RuntimeMsg}; use crate::lang::{MpsLanguageDictionary, MpsSortStatementFactory, MpsSorterFactory}; use crate::tokens::MpsToken; #[cfg(feature = "bliss-audio")] @@ -20,111 +16,9 @@ use crate::MpsItem; #[derive(Debug)] pub struct BlissNextSorter { up_to: usize, - rx: Option>>>, algorithm_done: bool, -} - -#[cfg(feature = "bliss-audio")] -impl BlissNextSorter { - fn get_maybe(&mut self) -> Option> { - if self.algorithm_done { - None - } else if let Ok(Some(item)) = self.rx.as_ref().unwrap().recv() { - Some(item.map_err(|e| bliss_err(e))) - } else { - self.algorithm_done = true; - None - } - } - - fn algorithm( - mut items: VecDeque, - results: Sender>>, - ) { - let mut song_cache: Option<(Song, String)> = None; - let items_len = items.len(); - for i in 0..items_len { - let item = items.pop_front().unwrap(); - if let Some(MpsTypePrimitive::String(path)) = item.field("filename") { - if let Err(_) = results.send(Some(Ok(item.clone()))) { - break; - } - if i + 2 < items_len { - let target_song = if let Some((_, ref cached_filename)) = song_cache { - if cached_filename == path { - Ok(song_cache.take().unwrap().0) - } else { - Song::new(path) - } - } else { - Song::new(path) - }; - let target_song = match target_song { - Ok(x) => x, - Err(e) => { - results.send(Some(Err(e))).unwrap_or(()); - break; - } - }; - match Self::find_best(&items, target_song) { - Err(e) => { - results.send(Some(Err(e))).unwrap_or(()); - break; - } - Ok((next_song, index)) => { - if let Some(next_song) = next_song { - if index != 0 { - items.swap(0, index); - } - song_cache = Some((next_song, path.to_owned())); - } else { - break; - } - } - } - } - } - } - results.send(None).unwrap_or(()); - } - - fn find_best( - items: &VecDeque, - target: Song, - ) -> Result<(Option, usize), bliss_audio::BlissError> { - let mut best = None; - let mut best_index = 0; - let mut best_distance = f32::MAX; - let (tx, rx) = channel(); - let mut threads_spawned = 0; - for i in 0..items.len() { - if let Some(MpsTypePrimitive::String(path)) = items[i].field("filename") { - let result_chann = tx.clone(); - let target_clone = target.clone(); - let path_clone = path.to_owned(); - std::thread::spawn(move || match Song::new(path_clone) { - Err(e) => result_chann.send(Err(e)).unwrap_or(()), - Ok(song) => result_chann - .send(Ok((i, target_clone.distance(&song), song))) - .unwrap_or(()), - }); - threads_spawned += 1; - } - } - for _ in 0..threads_spawned { - if let Ok(result) = rx.recv() { - let (index, distance, song) = result?; - if distance < best_distance { - best = Some(song); - best_index = index; - best_distance = distance; - } - } else { - break; - } - } - Ok((best, best_index)) - } + init_done: bool, + item_buf: VecDeque, } #[cfg(feature = "bliss-audio")] @@ -132,8 +26,9 @@ impl std::clone::Clone for BlissNextSorter { fn clone(&self) -> Self { Self { up_to: self.up_to, - rx: None, algorithm_done: self.algorithm_done, + init_done: self.init_done, + item_buf: self.item_buf.clone(), } } } @@ -143,8 +38,9 @@ impl Default for BlissNextSorter { fn default() -> Self { Self { up_to: usize::MAX, - rx: None, algorithm_done: false, + init_done: false, + item_buf: VecDeque::new() } } } @@ -154,43 +50,90 @@ impl MpsSorter for BlissNextSorter { fn sort( &mut self, iterator: &mut dyn MpsOp, - item_buf: &mut VecDeque, + items_out: &mut VecDeque, ) -> Result<(), RuntimeMsg> { - if self.rx.is_none() { + if !self.init_done { // first run - let mut items = VecDeque::new(); - for item in iterator { + self.init_done = true; + while let Some(item) = iterator.next() { match item { - Ok(item) => items.push_back(item), - Err(e) => item_buf.push_back(Err(e)), + Ok(item) => self.item_buf.push_back(item), + Err(e) => items_out.push_back(Err(e)), } - if items.len() + item_buf.len() >= self.up_to { + if self.item_buf.len() + items_out.len() >= self.up_to { break; } } - // start algorithm - let (tx, rx) = channel(); - std::thread::spawn(move || Self::algorithm(items, tx)); - self.rx = Some(rx); - } - if let Some(item) = self.get_maybe() { - item_buf.push_back(Ok(item?)); + if !self.item_buf.is_empty() { + let first = &self.item_buf[0]; + let mut ctx = iterator.escape(); + for i in 1..self.item_buf.len() { + let item = &self.item_buf[i]; + match ctx.analysis.prepare_distance(first, item) { + Err(e) => { + iterator.enter(ctx); + return Err(e); + }, + Ok(_) => {}, + } + } + iterator.enter(ctx); + items_out.push_back(Ok(first.to_owned())); + } + } else { + if self.item_buf.len() > 2 { + let last = self.item_buf.pop_front().unwrap(); + let mut best_index = 0; + let mut best_distance = f64::MAX; + let mut ctx = iterator.escape(); + for i in 0..self.item_buf.len() { + let current_item = &self.item_buf[i]; + match ctx.analysis.get_distance(&last, current_item) { + Err(e) => { + iterator.enter(ctx); + return Err(e); + }, + Ok(distance) => { + if distance < best_distance { + best_index = i; + best_distance = distance; + } + }, + } + } + if best_index != 0 { + self.item_buf.swap(0, best_index); + } + items_out.push_back(Ok(self.item_buf[0].clone())); + let next = &self.item_buf[0]; + for i in 1..self.item_buf.len() { + let item = &self.item_buf[i]; + match ctx.analysis.prepare_distance(next, item) { + Err(e) => { + iterator.enter(ctx); + return Err(e); + }, + Ok(_) => {}, + } + } + iterator.enter(ctx); + } else if self.item_buf.len() == 2 { + self.item_buf.pop_front(); + items_out.push_back(Ok(self.item_buf.pop_front().unwrap())); + // note item_buf is emptied here, so this will not proceed to len() == 1 case on next call + } else if !self.item_buf.is_empty() { + // edge case where item_buf only ever had 1 item + items_out.push_back(Ok(self.item_buf.pop_front().unwrap())); + } } Ok(()) } fn reset(&mut self) { - self.algorithm_done = false; - self.rx = None; + self.init_done = false; } } -#[cfg(feature = "bliss-audio")] -#[inline] -fn bliss_err(error: D) -> RuntimeMsg { - RuntimeMsg(format!("Bliss error: {}", error)) -} - #[cfg(not(feature = "bliss-audio"))] pub type BlissNextSorter = crate::lang::vocabulary::sorters::EmptySorter; diff --git a/mps-interpreter/src/lang/vocabulary/sorters/bliss_sorter.rs b/mps-interpreter/src/lang/vocabulary/sorters/bliss_sorter.rs index 9282102..ba462b6 100644 --- a/mps-interpreter/src/lang/vocabulary/sorters/bliss_sorter.rs +++ b/mps-interpreter/src/lang/vocabulary/sorters/bliss_sorter.rs @@ -1,21 +1,18 @@ use std::collections::VecDeque; #[cfg(feature = "bliss-audio")] use std::fmt::{Debug, Display, Error, Formatter}; -#[cfg(feature = "bliss-audio")] -use std::sync::mpsc::{channel, Receiver}; #[cfg(feature = "bliss-audio")] use std::collections::HashMap; -#[cfg(feature = "bliss-audio")] -use bliss_audio::Song; - use crate::lang::utility::{assert_name, check_name}; use crate::lang::SyntaxError; #[cfg(feature = "bliss-audio")] -use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, MpsTypePrimitive, RuntimeMsg}; +use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, RuntimeMsg}; use crate::lang::{MpsLanguageDictionary, MpsSortStatementFactory, MpsSorterFactory}; use crate::tokens::MpsToken; +#[cfg(feature = "bliss-audio")] +use crate::MpsItem; #[cfg(feature = "bliss-audio")] const DEFAULT_ORDER: std::cmp::Ordering = std::cmp::Ordering::Greater; @@ -24,48 +21,7 @@ const DEFAULT_ORDER: std::cmp::Ordering = std::cmp::Ordering::Greater; #[derive(Debug)] pub struct BlissSorter { up_to: usize, - float_map: HashMap, - first_song: Option, - rx: Option>>, - errors: Vec, -} - -#[cfg(feature = "bliss-audio")] -impl BlissSorter { - fn get_or_wait(&mut self, path: &str) -> Option { - if let Some(distance) = self.float_map.get(path) { - Some(*distance) - } else { - // wait on threads until matching result is found - for result in self.rx.as_ref().unwrap() { - match result { - Ok((key, distance)) => { - if path == key { - self.float_map.insert(key, distance); - return Some(distance); - } else { - self.float_map.insert(key, distance); - } - } - Err(e) => { - self.errors.push(e); - return None; - } - } - } - None - } - } - - #[inline] - fn compare_songs( - song1: Song, - path_2: String, - ) -> Result<(String, f32), bliss_audio::BlissError> { - let song2 = Song::new(&path_2)?; - let distance = song1.distance(&song2); - Ok((path_2, distance)) - } + first_song: Option, } #[cfg(feature = "bliss-audio")] @@ -73,10 +29,7 @@ impl std::clone::Clone for BlissSorter { fn clone(&self) -> Self { Self { up_to: self.up_to, - float_map: self.float_map.clone(), first_song: self.first_song.clone(), - rx: None, - errors: Vec::new(), } } } @@ -86,10 +39,7 @@ impl Default for BlissSorter { fn default() -> Self { Self { up_to: usize::MAX, - float_map: HashMap::new(), first_song: None, - rx: None, - errors: Vec::new(), } } } @@ -103,7 +53,7 @@ impl MpsSorter for BlissSorter { ) -> Result<(), RuntimeMsg> { let buf_len_old = item_buf.len(); // save buffer length before modifying buffer if item_buf.len() < self.up_to { - for item in iterator { + while let Some(item) = iterator.next() { item_buf.push_back(item); if item_buf.len() >= self.up_to { break; @@ -114,82 +64,66 @@ impl MpsSorter for BlissSorter { // when buf_len_old == item_buf.len(), iterator was already complete // no need to sort in that case, since buffer was sorted in last call to sort or buffer never had any items to sort if self.first_song.is_none() { - let (tx_chann, rx_chann) = channel(); - let mut item_paths = Vec::with_capacity(item_buf.len() - 1); for item in item_buf.iter() { if let Ok(item) = item { - // build comparison table - if let Some(MpsTypePrimitive::String(path)) = item.field("filename") { - if self.first_song.is_none() { - // find first valid song (ie first item with field "filename") - self.first_song = Some(path.to_owned()); - //self.first_song = Some(Song::new(path).map_err(|e| bliss_err(e, op))?); - self.float_map.insert(path.to_owned(), 0.0); // distance to itself should be 0 - } else { - item_paths.push(path.to_owned()); - } + self.first_song = Some(item.clone()); + break; + } + } + } + if let Some(first) = &self.first_song { + let mut ctx = iterator.escape(); + for i in 0..item_buf.len() { + if let Ok(item) = &item_buf[i] { + if item == first {continue;} + match ctx.analysis.prepare_distance(first, item) { + Err(e) => { + iterator.enter(ctx); + return Err(e); + }, + Ok(_) => {}, } } } - if let Some(first_song_path) = &self.first_song { - // spawn threads for processing song distances - let path1_clone = first_song_path.to_owned(); - std::thread::spawn(move || match Song::new(path1_clone) { - Err(e) => tx_chann.send(Err(e)).unwrap_or(()), - Ok(song1) => { - for path2 in item_paths { - let result_chann = tx_chann.clone(); - let song1_clone = song1.clone(); - std::thread::spawn(move || { - result_chann - .send(Self::compare_songs(song1_clone, path2)) - .unwrap_or(()); - }); - } - } - }); - } - self.rx = Some(rx_chann); - // unordered list returned on first call to this function - // note that only the first item will be used by sorter, - // since the second time this function is called the remaining items are sorted properly + iterator.enter(ctx); } + } else if self.first_song.is_some() { // Sort songs on second call to this function - self.first_song = None; + let first = self.first_song.take().unwrap(); + let mut cache = HashMap::::new(); + cache.insert(first.clone(), 0.0); + let mut ctx = iterator.escape(); + for i in 0..item_buf.len() { + if let Ok(item) = &item_buf[i] { + if item == &first {continue;} + match ctx.analysis.get_distance(&first, item) { + Err(e) => { + iterator.enter(ctx); + return Err(e); + }, + Ok(distance) => { + cache.insert(item.clone(), distance); + }, + } + } + } + iterator.enter(ctx); item_buf.make_contiguous().sort_by(|a, b| { if let Ok(a) = a { - if let Some(MpsTypePrimitive::String(a_path)) = a.field("filename") { - if let Ok(b) = b { - if let Some(MpsTypePrimitive::String(b_path)) = b.field("filename") { - if let Some(float_a) = self.get_or_wait(a_path) { - if let Some(float_b) = self.get_or_wait(b_path) { - return float_a - .partial_cmp(&float_b) - .unwrap_or(DEFAULT_ORDER); - } - } - } - } + if let Ok(b) = b { + let float_a = cache.get(&a).unwrap(); + let float_b = cache.get(&b).unwrap(); + return float_a.partial_cmp(float_b).unwrap_or(DEFAULT_ORDER); } } DEFAULT_ORDER }); } - if self.errors.is_empty() { - Ok(()) - } else { - Err(bliss_err(self.errors.pop().unwrap())) - } + Ok(()) } } -#[cfg(feature = "bliss-audio")] -#[inline] -fn bliss_err(error: D) -> RuntimeMsg { - RuntimeMsg(format!("Bliss error: {}", error)) -} - #[cfg(not(feature = "bliss-audio"))] pub type BlissSorter = crate::lang::vocabulary::sorters::EmptySorter; diff --git a/mps-interpreter/src/processing/mod.rs b/mps-interpreter/src/processing/mod.rs index 996dfb7..a7c4c41 100644 --- a/mps-interpreter/src/processing/mod.rs +++ b/mps-interpreter/src/processing/mod.rs @@ -1,4 +1,6 @@ mod filesystem; +#[cfg(feature = "advanced")] +mod music_analysis; mod sql; mod variables; @@ -12,3 +14,8 @@ pub mod general { pub use super::filesystem::{FileIter, MpsFilesystemExecutor, MpsFilesystemQuerier}; pub use super::variables::{MpsOpStorage, MpsType, MpsVariableStorer}; } + +#[cfg(feature = "advanced")] +pub mod advanced { + pub use super::music_analysis::{MpsMusicAnalyzer, MpsDefaultAnalyzer}; +} diff --git a/mps-interpreter/src/processing/music_analysis.rs b/mps-interpreter/src/processing/music_analysis.rs new file mode 100644 index 0000000..f4263fd --- /dev/null +++ b/mps-interpreter/src/processing/music_analysis.rs @@ -0,0 +1,416 @@ +use core::fmt::Debug; +#[cfg(feature = "bliss-audio")] +use std::collections::{HashMap, HashSet}; +#[cfg(feature = "bliss-audio")] +use std::sync::mpsc::{channel, Sender, Receiver}; + +#[cfg(feature = "bliss-audio")] +use bliss_audio::{Song, BlissError}; +#[cfg(feature = "bliss-audio")] +use crate::lang::MpsTypePrimitive; + +use crate::lang::RuntimeMsg; +use crate::MpsItem; + +const PATH_FIELD: &str = "filename"; + +pub trait MpsMusicAnalyzer: Debug { + fn prepare_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<(), RuntimeMsg>; + + fn prepare_item(&mut self, item: &MpsItem) -> Result<(), RuntimeMsg>; + + fn get_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result; +} + +#[cfg(feature = "bliss-audio")] +#[derive(Debug)] +pub struct MpsDefaultAnalyzer { + requests: Sender, + responses: Receiver, +} + +#[cfg(feature = "bliss-audio")] +impl std::default::Default for MpsDefaultAnalyzer { + fn default() -> Self { + let (req_tx, req_rx) = channel(); + let (resp_tx, resp_rx) = channel(); + std::thread::spawn(move || { + let mut cache_thread = CacheThread::new(resp_tx); + cache_thread.run_loop(req_rx); + }); + Self { + requests: req_tx, + responses: resp_rx, + } + } +} + +#[cfg(feature = "bliss-audio")] +impl MpsDefaultAnalyzer { + fn request_distance(&mut self, from: &MpsItem, to: &MpsItem, ack: bool) -> Result<(), RuntimeMsg> { + let path_from = Self::get_path(from)?; + let path_to = Self::get_path(to)?; + self.requests.send( + RequestType::Distance { + path1: path_from.to_owned(), + path2: path_to.to_owned(), + ack: ack, + } + ).map_err(|e| RuntimeMsg(format!("Channel send err {}", e))) + } + + fn get_path(item: &MpsItem) -> Result<&str, RuntimeMsg> { + if let Some(path) = item.field(PATH_FIELD) { + if let MpsTypePrimitive::String(path) = path { + Ok(path) + } else { + Err(RuntimeMsg(format!("Field {} on item is not String, it's {}", PATH_FIELD, path))) + } + } else { + Err(RuntimeMsg(format!("Missing field {} on item", PATH_FIELD))) + } + } + + fn request_song(&mut self, item: &MpsItem, ack: bool) -> Result<(), RuntimeMsg> { + let path = Self::get_path(item)?; + self.requests.send( + RequestType::Song { + path: path.to_owned(), + ack: ack, + } + ).map_err(|e| RuntimeMsg(format!("Channel send error: {}", e))) + } +} + +#[cfg(feature = "bliss-audio")] +impl MpsMusicAnalyzer for MpsDefaultAnalyzer { + fn prepare_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<(), RuntimeMsg> { + self.request_distance(from, to, false) + } + + fn prepare_item(&mut self, item: &MpsItem) -> Result<(), RuntimeMsg> { + self.request_song(item, false) + } + + fn get_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result { + self.request_distance(from, to, true)?; + let path_from = Self::get_path(from)?; + let path_to = Self::get_path(to)?; + for response in self.responses.iter() { + if let ResponseType::Distance{path1, path2, distance} = response { + if path1 == path_from && path2 == path_to { + return match distance { + Ok(d) => Ok(d as f64), + Err(e) => Err(RuntimeMsg(format!("Bliss error: {}", e))) + }; + } + } + } + Err(RuntimeMsg("Channel closed without response: internal error".to_owned())) + } +} + +#[cfg(not(feature = "bliss-audio"))] +#[derive(Default, Debug)] +pub struct MpsDefaultAnalyzer {} + +#[cfg(not(feature = "bliss-audio"))] +impl MpsMusicAnalyzer for MpsDefaultAnalyzer { + fn prepare_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<(), RuntimeMsg> { + Ok(()) + } + + fn prepare_item(&mut self, item: &MpsItem) -> Result<(), RuntimeMsg> { + Ok(()) + } + + fn get_distance(&mut self, item: &MpsItem) -> Result { + Ok(f64::MAX) + } +} + +#[cfg(feature = "bliss-audio")] +enum RequestType { + Distance { + path1: String, + path2: String, + ack: bool, + }, + Song { + path: String, + ack: bool, + }, + //End {} +} + +#[cfg(feature = "bliss-audio")] +enum ResponseType { + Distance { + path1: String, + path2: String, + distance: Result, + }, + Song { + path: String, + song: Result, + }, +} + +#[cfg(feature = "bliss-audio")] +struct CacheThread { + distance_cache: HashMap<(String, String), Result>, + distance_in_progress: HashSet<(String, String)>, + song_cache: HashMap>, + song_in_progress: HashSet, + //requests: Receiver, + responses: Sender, +} + +#[cfg(feature = "bliss-audio")] +impl CacheThread { + fn new(responses: Sender) -> Self { + Self { + distance_cache: HashMap::new(), + distance_in_progress: HashSet::new(), + song_cache: HashMap::new(), + song_in_progress: HashSet::new(), + //requests: requests, + responses: responses, + } + } + + fn non_blocking_read_some(&mut self, results: &Receiver) { + for result in results.try_iter() { + match result { + ResponseType::Distance {path1, path2, distance} => { + self.insert_distance(path1, path2, distance); + }, + ResponseType::Song {path, song} => { + self.insert_song(path, song); + } + } + } + } + + fn insert_song(&mut self, path: String, song_result: Result) { + self.song_in_progress.remove(&path); + self.song_cache.insert(path, song_result); + } + + fn insert_distance(&mut self, path1: String, path2: String, distance_result: Result) { + let key = (path1, path2); + self.distance_in_progress.remove(&key); + self.distance_cache.insert(key, distance_result); + } + + fn get_song_option(&mut self, path: &str, auto_add: bool, results: &Receiver) -> Option { + // wait for song if already in progress + if self.song_in_progress.contains(path) { + for result in results.iter() { + match result { + ResponseType::Distance {path1, path2, distance} => { + self.insert_distance(path1, path2, distance); + }, + ResponseType::Song {path: path2, song} => { + if path2 == path { + self.insert_song(path2, song.clone()); + let result = song.ok(); + if result.is_none() && auto_add { + self.song_in_progress.insert(path.to_owned()); + } + return result; + } else { + self.insert_song(path2, song); + } + } + } + } + } else if self.song_cache.contains_key(path) { + let result = self.song_cache.get(path).and_then(|r| r.clone().ok().to_owned()); + if result.is_none() && auto_add { + self.song_in_progress.insert(path.to_owned()); + } + return result; + } + if auto_add { + self.song_in_progress.insert(path.to_owned()); + } + return None; + } + + fn handle_distance_req(&mut self, + path1: String, + path2: String, + ack: bool, + worker_tx: &Sender, + worker_results: &Receiver, + ) -> bool { + let key = (path1.clone(), path2.clone()); + if let Some(result) = self.distance_cache.get(&key) { + if ack { + let result = result.to_owned(); + if let Err(_) = self.responses.send( + ResponseType::Distance { + path1: path1, + path2: path2, + distance: result, + } + ) { return true; } + } + } else { + if path1 == path2 { + // trivial case + // also prevents deadlock in self.get_song_option() + // due to waiting on song that isn't being processed yet + // (first call adds it to song_in_progress set, second call just waits) + if ack { + if let Err(_) = self.responses.send(ResponseType::Distance { + path1: path1, + path2: path2, + distance: Ok(0.0), + }) { return true; } + } + } else if !self.distance_in_progress.contains(&key) { + let results = worker_tx.clone(); + let song1_clone = self.get_song_option(&path1, true, worker_results); + let song2_clone = self.get_song_option(&path2, true, worker_results); + std::thread::spawn(move || { + let distance_result = worker_distance( + &results, + (&path1, song1_clone), + (&path2, song2_clone), + ); + results.send( + ResponseType::Distance { + path1: path1, + path2: path2, + distance: distance_result, + } + ).unwrap_or(()); + }); + } + if ack { + 'inner1: for result in worker_results.iter() { + match result { + ResponseType::Distance {path1: path1_2, path2: path2_2, distance} => { + self.insert_distance(path1_2.clone(), path2_2.clone(), distance.clone()); + if path1_2 == key.0 && path2_2 == key.1 { + if let Err(_) = self.responses.send(ResponseType::Distance { + path1: path1_2, + path2: path2_2, + distance: distance, + }) { return true; } + break 'inner1; + } + + }, + ResponseType::Song {path, song} => { + self.insert_song(path, song); + } + } + } + } + } + false + } + + fn handle_song_req(&mut self, + path: String, + ack: bool, + worker_tx: &Sender, + worker_results: &Receiver, + ) -> bool { + if let Some(song) = self.song_cache.get(&path) { + if ack { + let song = song.to_owned(); + if let Err(_) = self.responses.send( + ResponseType::Song { + path: path, + song: song, + } + ) { return true; } + } + } else { + if !self.song_in_progress.contains(&path) { + let path_clone = path.clone(); + let results = worker_tx.clone(); + std::thread::spawn(move || { + let song_result = Song::new(&path_clone); + results.send( + ResponseType::Song { + path: path_clone, + song: song_result, + } + ).unwrap_or(()); + }); + } + if ack { + 'inner2: for result in worker_results.iter() { + match result { + ResponseType::Distance {path1, path2, distance} => { + self.insert_distance(path1, path2, distance); + }, + ResponseType::Song {path: path2, song} => { + self.insert_song(path2.clone(), song.clone()); + if path2 == path { + if let Err(_) = self.responses.send(ResponseType::Song { + path: path, + song: song, + }) { return false; } + break 'inner2; + } + } + } + } + } + } + false + } + + fn run_loop(&mut self, requests: Receiver,) { + let (worker_tx, worker_results): (Sender, Receiver) = channel(); + 'outer: for request in requests.iter() { + self.non_blocking_read_some(&worker_results); + match request { + //RequestType::End{} => break, + RequestType::Distance{path1, path2, ack} => { + if self.handle_distance_req(path1, path2, ack, &worker_tx, &worker_results) { + break 'outer; + } + }, + RequestType::Song{path, ack} => { + if self.handle_song_req(path, ack, &worker_tx, &worker_results) { + break 'outer; + } + }, + } + } + } +} + +#[cfg(feature = "bliss-audio")] +fn worker_distance(results: &Sender, song1: (&str, Option), song2: (&str, Option)) -> Result { + let path1 = song1.0; + let song1 = if let Some(song) = song1.1 { + song + } else { + let new_song1 = Song::new(path1); + results.send(ResponseType::Song { + path: path1.to_string(), + song: new_song1.clone(), + }).unwrap_or(()); + new_song1? + }; + let path2 = song2.0; + let song2 = if let Some(song) = song2.1 { + song + } else { + let new_song2 = Song::new(path2); + results.send(ResponseType::Song { + path: path2.to_string(), + song: new_song2.clone(), + }).unwrap_or(()); + new_song2? + }; + Ok(song1.distance(&song2)) +} diff --git a/mps-player/Cargo.toml b/mps-player/Cargo.toml index 0238ef4..41e28f4 100644 --- a/mps-player/Cargo.toml +++ b/mps-player/Cargo.toml @@ -6,7 +6,7 @@ license = "LGPL-2.1-only OR GPL-3.0-only" readme = "README.md" [dependencies] -rodio = { version = "^0.15", features = ["symphonia"]} +rodio = { version = "^0.15", features = ["symphonia-all"]} m3u8-rs = { version = "^3.0.0" } # local