Refactor music analysis functionality to add to MpsContext

This commit is contained in:
NGnius (Graham) 2022-02-04 21:48:28 -05:00
parent 6411d40b96
commit 9705dc22bc
7 changed files with 598 additions and 251 deletions

45
Cargo.lock generated
View file

@ -2016,9 +2016,12 @@ checksum = "a7e5f38aa07e792f4eebb0faa93cee088ec82c48222dd332897aae1569d9a4b7"
dependencies = [
"lazy_static 1.4.0",
"symphonia-bundle-flac 0.4.0",
"symphonia-bundle-mp3 0.4.0",
"symphonia-codec-aac 0.4.0",
"symphonia-codec-pcm 0.4.0",
"symphonia-codec-vorbis 0.4.0",
"symphonia-core 0.4.0",
"symphonia-format-isomp4 0.4.0",
"symphonia-format-ogg 0.4.0",
"symphonia-format-wav 0.4.1",
"symphonia-metadata 0.4.0",
@ -2032,13 +2035,13 @@ checksum = "eb30457ee7a904dae1e4ace25156dcabaf71e425db318e7885267f09cd8fb648"
dependencies = [
"lazy_static 1.4.0",
"symphonia-bundle-flac 0.5.0",
"symphonia-bundle-mp3",
"symphonia-codec-aac",
"symphonia-bundle-mp3 0.5.0",
"symphonia-codec-aac 0.5.0",
"symphonia-codec-alac",
"symphonia-codec-pcm 0.5.0",
"symphonia-codec-vorbis 0.5.0",
"symphonia-core 0.5.0",
"symphonia-format-isomp4",
"symphonia-format-isomp4 0.5.0",
"symphonia-format-mkv",
"symphonia-format-ogg 0.5.0",
"symphonia-format-wav 0.5.0",
@ -2069,6 +2072,19 @@ dependencies = [
"symphonia-utils-xiph 0.5.0",
]
[[package]]
name = "symphonia-bundle-mp3"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec4d97c4a61ece4651751dddb393ebecb7579169d9e758ae808fe507a5250790"
dependencies = [
"bitflags",
"lazy_static 1.4.0",
"log",
"symphonia-core 0.4.0",
"symphonia-metadata 0.4.0",
]
[[package]]
name = "symphonia-bundle-mp3"
version = "0.5.0"
@ -2082,6 +2098,17 @@ dependencies = [
"symphonia-metadata 0.5.0",
]
[[package]]
name = "symphonia-codec-aac"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd3d7ab37eb9b7df16ddedd7adb7cc382afe708ff078e525a14dc9b05e57558f"
dependencies = [
"lazy_static 1.4.0",
"log",
"symphonia-core 0.4.0",
]
[[package]]
name = "symphonia-codec-aac"
version = "0.5.0"
@ -2171,6 +2198,18 @@ dependencies = [
"log",
]
[[package]]
name = "symphonia-format-isomp4"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feee3a7711e7ec1b7540756f3868bbb3cbb0d1195569b9bc26471a24a02f57b5"
dependencies = [
"encoding_rs",
"log",
"symphonia-core 0.4.0",
"symphonia-metadata 0.4.0",
]
[[package]]
name = "symphonia-format-isomp4"
version = "0.5.0"

View file

@ -2,6 +2,10 @@ use super::processing::database::{MpsDatabaseQuerier, MpsSQLiteExecutor};
use super::processing::general::{
MpsFilesystemExecutor, MpsFilesystemQuerier, MpsOpStorage, MpsVariableStorer,
};
#[cfg(feature = "advanced")]
use super::processing::advanced::{
MpsMusicAnalyzer, MpsDefaultAnalyzer
};
use std::fmt::{Debug, Display, Error, Formatter};
#[derive(Debug)]
@ -9,6 +13,8 @@ pub struct MpsContext {
pub database: Box<dyn MpsDatabaseQuerier>,
pub variables: Box<dyn MpsVariableStorer>,
pub filesystem: Box<dyn MpsFilesystemQuerier>,
#[cfg(feature = "advanced")]
pub analysis: Box<dyn MpsMusicAnalyzer>,
}
impl Default for MpsContext {
@ -17,13 +23,15 @@ impl Default for MpsContext {
database: Box::new(MpsSQLiteExecutor::default()),
variables: Box::new(MpsOpStorage::default()),
filesystem: Box::new(MpsFilesystemExecutor::default()),
#[cfg(feature = "advanced")]
analysis: Box::new(MpsDefaultAnalyzer::default()),
}
}
}
impl Display for MpsContext {
fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
write!(f, "MpsContext")?;
write!(f, "MpsContext{{...}}")?;
Ok(())
}
}

View file

@ -1,16 +1,12 @@
use std::collections::VecDeque;
#[cfg(feature = "bliss-audio")]
use std::fmt::{Debug, Display, Error, Formatter};
#[cfg(feature = "bliss-audio")]
use std::sync::mpsc::{channel, Receiver, Sender};
#[cfg(feature = "bliss-audio")]
use bliss_audio::Song;
use crate::lang::utility::{assert_name, check_name};
use crate::lang::SyntaxError;
#[cfg(feature = "bliss-audio")]
use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, MpsTypePrimitive, RuntimeMsg};
use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, RuntimeMsg};
use crate::lang::{MpsLanguageDictionary, MpsSortStatementFactory, MpsSorterFactory};
use crate::tokens::MpsToken;
#[cfg(feature = "bliss-audio")]
@ -20,111 +16,9 @@ use crate::MpsItem;
#[derive(Debug)]
pub struct BlissNextSorter {
up_to: usize,
rx: Option<Receiver<Option<Result<MpsItem, bliss_audio::BlissError>>>>,
algorithm_done: bool,
}
#[cfg(feature = "bliss-audio")]
impl BlissNextSorter {
fn get_maybe(&mut self) -> Option<Result<MpsItem, RuntimeMsg>> {
if self.algorithm_done {
None
} else if let Ok(Some(item)) = self.rx.as_ref().unwrap().recv() {
Some(item.map_err(|e| bliss_err(e)))
} else {
self.algorithm_done = true;
None
}
}
fn algorithm(
mut items: VecDeque<MpsItem>,
results: Sender<Option<Result<MpsItem, bliss_audio::BlissError>>>,
) {
let mut song_cache: Option<(Song, String)> = None;
let items_len = items.len();
for i in 0..items_len {
let item = items.pop_front().unwrap();
if let Some(MpsTypePrimitive::String(path)) = item.field("filename") {
if let Err(_) = results.send(Some(Ok(item.clone()))) {
break;
}
if i + 2 < items_len {
let target_song = if let Some((_, ref cached_filename)) = song_cache {
if cached_filename == path {
Ok(song_cache.take().unwrap().0)
} else {
Song::new(path)
}
} else {
Song::new(path)
};
let target_song = match target_song {
Ok(x) => x,
Err(e) => {
results.send(Some(Err(e))).unwrap_or(());
break;
}
};
match Self::find_best(&items, target_song) {
Err(e) => {
results.send(Some(Err(e))).unwrap_or(());
break;
}
Ok((next_song, index)) => {
if let Some(next_song) = next_song {
if index != 0 {
items.swap(0, index);
}
song_cache = Some((next_song, path.to_owned()));
} else {
break;
}
}
}
}
}
}
results.send(None).unwrap_or(());
}
fn find_best(
items: &VecDeque<MpsItem>,
target: Song,
) -> Result<(Option<Song>, usize), bliss_audio::BlissError> {
let mut best = None;
let mut best_index = 0;
let mut best_distance = f32::MAX;
let (tx, rx) = channel();
let mut threads_spawned = 0;
for i in 0..items.len() {
if let Some(MpsTypePrimitive::String(path)) = items[i].field("filename") {
let result_chann = tx.clone();
let target_clone = target.clone();
let path_clone = path.to_owned();
std::thread::spawn(move || match Song::new(path_clone) {
Err(e) => result_chann.send(Err(e)).unwrap_or(()),
Ok(song) => result_chann
.send(Ok((i, target_clone.distance(&song), song)))
.unwrap_or(()),
});
threads_spawned += 1;
}
}
for _ in 0..threads_spawned {
if let Ok(result) = rx.recv() {
let (index, distance, song) = result?;
if distance < best_distance {
best = Some(song);
best_index = index;
best_distance = distance;
}
} else {
break;
}
}
Ok((best, best_index))
}
init_done: bool,
item_buf: VecDeque<MpsItem>,
}
#[cfg(feature = "bliss-audio")]
@ -132,8 +26,9 @@ impl std::clone::Clone for BlissNextSorter {
fn clone(&self) -> Self {
Self {
up_to: self.up_to,
rx: None,
algorithm_done: self.algorithm_done,
init_done: self.init_done,
item_buf: self.item_buf.clone(),
}
}
}
@ -143,8 +38,9 @@ impl Default for BlissNextSorter {
fn default() -> Self {
Self {
up_to: usize::MAX,
rx: None,
algorithm_done: false,
init_done: false,
item_buf: VecDeque::new()
}
}
}
@ -154,43 +50,90 @@ impl MpsSorter for BlissNextSorter {
fn sort(
&mut self,
iterator: &mut dyn MpsOp,
item_buf: &mut VecDeque<MpsIteratorItem>,
items_out: &mut VecDeque<MpsIteratorItem>,
) -> Result<(), RuntimeMsg> {
if self.rx.is_none() {
if !self.init_done {
// first run
let mut items = VecDeque::new();
for item in iterator {
self.init_done = true;
while let Some(item) = iterator.next() {
match item {
Ok(item) => items.push_back(item),
Err(e) => item_buf.push_back(Err(e)),
Ok(item) => self.item_buf.push_back(item),
Err(e) => items_out.push_back(Err(e)),
}
if items.len() + item_buf.len() >= self.up_to {
if self.item_buf.len() + items_out.len() >= self.up_to {
break;
}
}
// start algorithm
let (tx, rx) = channel();
std::thread::spawn(move || Self::algorithm(items, tx));
self.rx = Some(rx);
if !self.item_buf.is_empty() {
let first = &self.item_buf[0];
let mut ctx = iterator.escape();
for i in 1..self.item_buf.len() {
let item = &self.item_buf[i];
match ctx.analysis.prepare_distance(first, item) {
Err(e) => {
iterator.enter(ctx);
return Err(e);
},
Ok(_) => {},
}
}
iterator.enter(ctx);
items_out.push_back(Ok(first.to_owned()));
}
} else {
if self.item_buf.len() > 2 {
let last = self.item_buf.pop_front().unwrap();
let mut best_index = 0;
let mut best_distance = f64::MAX;
let mut ctx = iterator.escape();
for i in 0..self.item_buf.len() {
let current_item = &self.item_buf[i];
match ctx.analysis.get_distance(&last, current_item) {
Err(e) => {
iterator.enter(ctx);
return Err(e);
},
Ok(distance) => {
if distance < best_distance {
best_index = i;
best_distance = distance;
}
},
}
}
if best_index != 0 {
self.item_buf.swap(0, best_index);
}
items_out.push_back(Ok(self.item_buf[0].clone()));
let next = &self.item_buf[0];
for i in 1..self.item_buf.len() {
let item = &self.item_buf[i];
match ctx.analysis.prepare_distance(next, item) {
Err(e) => {
iterator.enter(ctx);
return Err(e);
},
Ok(_) => {},
}
}
iterator.enter(ctx);
} else if self.item_buf.len() == 2 {
self.item_buf.pop_front();
items_out.push_back(Ok(self.item_buf.pop_front().unwrap()));
// note item_buf is emptied here, so this will not proceed to len() == 1 case on next call
} else if !self.item_buf.is_empty() {
// edge case where item_buf only ever had 1 item
items_out.push_back(Ok(self.item_buf.pop_front().unwrap()));
}
if let Some(item) = self.get_maybe() {
item_buf.push_back(Ok(item?));
}
Ok(())
}
fn reset(&mut self) {
self.algorithm_done = false;
self.rx = None;
self.init_done = false;
}
}
#[cfg(feature = "bliss-audio")]
#[inline]
fn bliss_err<D: Display>(error: D) -> RuntimeMsg {
RuntimeMsg(format!("Bliss error: {}", error))
}
#[cfg(not(feature = "bliss-audio"))]
pub type BlissNextSorter = crate::lang::vocabulary::sorters::EmptySorter;

View file

@ -1,21 +1,18 @@
use std::collections::VecDeque;
#[cfg(feature = "bliss-audio")]
use std::fmt::{Debug, Display, Error, Formatter};
#[cfg(feature = "bliss-audio")]
use std::sync::mpsc::{channel, Receiver};
#[cfg(feature = "bliss-audio")]
use std::collections::HashMap;
#[cfg(feature = "bliss-audio")]
use bliss_audio::Song;
use crate::lang::utility::{assert_name, check_name};
use crate::lang::SyntaxError;
#[cfg(feature = "bliss-audio")]
use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, MpsTypePrimitive, RuntimeMsg};
use crate::lang::{MpsIteratorItem, MpsOp, MpsSorter, RuntimeMsg};
use crate::lang::{MpsLanguageDictionary, MpsSortStatementFactory, MpsSorterFactory};
use crate::tokens::MpsToken;
#[cfg(feature = "bliss-audio")]
use crate::MpsItem;
#[cfg(feature = "bliss-audio")]
const DEFAULT_ORDER: std::cmp::Ordering = std::cmp::Ordering::Greater;
@ -24,48 +21,7 @@ const DEFAULT_ORDER: std::cmp::Ordering = std::cmp::Ordering::Greater;
#[derive(Debug)]
pub struct BlissSorter {
up_to: usize,
float_map: HashMap<String, f32>,
first_song: Option<String>,
rx: Option<Receiver<Result<(String, f32), bliss_audio::BlissError>>>,
errors: Vec<bliss_audio::BlissError>,
}
#[cfg(feature = "bliss-audio")]
impl BlissSorter {
fn get_or_wait(&mut self, path: &str) -> Option<f32> {
if let Some(distance) = self.float_map.get(path) {
Some(*distance)
} else {
// wait on threads until matching result is found
for result in self.rx.as_ref().unwrap() {
match result {
Ok((key, distance)) => {
if path == key {
self.float_map.insert(key, distance);
return Some(distance);
} else {
self.float_map.insert(key, distance);
}
}
Err(e) => {
self.errors.push(e);
return None;
}
}
}
None
}
}
#[inline]
fn compare_songs(
song1: Song,
path_2: String,
) -> Result<(String, f32), bliss_audio::BlissError> {
let song2 = Song::new(&path_2)?;
let distance = song1.distance(&song2);
Ok((path_2, distance))
}
first_song: Option<MpsItem>,
}
#[cfg(feature = "bliss-audio")]
@ -73,10 +29,7 @@ impl std::clone::Clone for BlissSorter {
fn clone(&self) -> Self {
Self {
up_to: self.up_to,
float_map: self.float_map.clone(),
first_song: self.first_song.clone(),
rx: None,
errors: Vec::new(),
}
}
}
@ -86,10 +39,7 @@ impl Default for BlissSorter {
fn default() -> Self {
Self {
up_to: usize::MAX,
float_map: HashMap::new(),
first_song: None,
rx: None,
errors: Vec::new(),
}
}
}
@ -103,7 +53,7 @@ impl MpsSorter for BlissSorter {
) -> Result<(), RuntimeMsg> {
let buf_len_old = item_buf.len(); // save buffer length before modifying buffer
if item_buf.len() < self.up_to {
for item in iterator {
while let Some(item) = iterator.next() {
item_buf.push_back(item);
if item_buf.len() >= self.up_to {
break;
@ -114,81 +64,65 @@ impl MpsSorter for BlissSorter {
// when buf_len_old == item_buf.len(), iterator was already complete
// no need to sort in that case, since buffer was sorted in last call to sort or buffer never had any items to sort
if self.first_song.is_none() {
let (tx_chann, rx_chann) = channel();
let mut item_paths = Vec::with_capacity(item_buf.len() - 1);
for item in item_buf.iter() {
if let Ok(item) = item {
// build comparison table
if let Some(MpsTypePrimitive::String(path)) = item.field("filename") {
if self.first_song.is_none() {
// find first valid song (ie first item with field "filename")
self.first_song = Some(path.to_owned());
//self.first_song = Some(Song::new(path).map_err(|e| bliss_err(e, op))?);
self.float_map.insert(path.to_owned(), 0.0); // distance to itself should be 0
} else {
item_paths.push(path.to_owned());
self.first_song = Some(item.clone());
break;
}
}
}
}
if let Some(first_song_path) = &self.first_song {
// spawn threads for processing song distances
let path1_clone = first_song_path.to_owned();
std::thread::spawn(move || match Song::new(path1_clone) {
Err(e) => tx_chann.send(Err(e)).unwrap_or(()),
Ok(song1) => {
for path2 in item_paths {
let result_chann = tx_chann.clone();
let song1_clone = song1.clone();
std::thread::spawn(move || {
result_chann
.send(Self::compare_songs(song1_clone, path2))
.unwrap_or(());
});
if let Some(first) = &self.first_song {
let mut ctx = iterator.escape();
for i in 0..item_buf.len() {
if let Ok(item) = &item_buf[i] {
if item == first {continue;}
match ctx.analysis.prepare_distance(first, item) {
Err(e) => {
iterator.enter(ctx);
return Err(e);
},
Ok(_) => {},
}
}
});
}
self.rx = Some(rx_chann);
// unordered list returned on first call to this function
// note that only the first item will be used by sorter,
// since the second time this function is called the remaining items are sorted properly
iterator.enter(ctx);
}
} else if self.first_song.is_some() {
// Sort songs on second call to this function
self.first_song = None;
let first = self.first_song.take().unwrap();
let mut cache = HashMap::<MpsItem, f64>::new();
cache.insert(first.clone(), 0.0);
let mut ctx = iterator.escape();
for i in 0..item_buf.len() {
if let Ok(item) = &item_buf[i] {
if item == &first {continue;}
match ctx.analysis.get_distance(&first, item) {
Err(e) => {
iterator.enter(ctx);
return Err(e);
},
Ok(distance) => {
cache.insert(item.clone(), distance);
},
}
}
}
iterator.enter(ctx);
item_buf.make_contiguous().sort_by(|a, b| {
if let Ok(a) = a {
if let Some(MpsTypePrimitive::String(a_path)) = a.field("filename") {
if let Ok(b) = b {
if let Some(MpsTypePrimitive::String(b_path)) = b.field("filename") {
if let Some(float_a) = self.get_or_wait(a_path) {
if let Some(float_b) = self.get_or_wait(b_path) {
return float_a
.partial_cmp(&float_b)
.unwrap_or(DEFAULT_ORDER);
}
}
}
}
let float_a = cache.get(&a).unwrap();
let float_b = cache.get(&b).unwrap();
return float_a.partial_cmp(float_b).unwrap_or(DEFAULT_ORDER);
}
}
DEFAULT_ORDER
});
}
if self.errors.is_empty() {
Ok(())
} else {
Err(bliss_err(self.errors.pop().unwrap()))
}
}
}
#[cfg(feature = "bliss-audio")]
#[inline]
fn bliss_err<D: Display>(error: D) -> RuntimeMsg {
RuntimeMsg(format!("Bliss error: {}", error))
}
#[cfg(not(feature = "bliss-audio"))]
pub type BlissSorter = crate::lang::vocabulary::sorters::EmptySorter;

View file

@ -1,4 +1,6 @@
mod filesystem;
#[cfg(feature = "advanced")]
mod music_analysis;
mod sql;
mod variables;
@ -12,3 +14,8 @@ pub mod general {
pub use super::filesystem::{FileIter, MpsFilesystemExecutor, MpsFilesystemQuerier};
pub use super::variables::{MpsOpStorage, MpsType, MpsVariableStorer};
}
#[cfg(feature = "advanced")]
pub mod advanced {
pub use super::music_analysis::{MpsMusicAnalyzer, MpsDefaultAnalyzer};
}

View file

@ -0,0 +1,416 @@
use core::fmt::Debug;
#[cfg(feature = "bliss-audio")]
use std::collections::{HashMap, HashSet};
#[cfg(feature = "bliss-audio")]
use std::sync::mpsc::{channel, Sender, Receiver};
#[cfg(feature = "bliss-audio")]
use bliss_audio::{Song, BlissError};
#[cfg(feature = "bliss-audio")]
use crate::lang::MpsTypePrimitive;
use crate::lang::RuntimeMsg;
use crate::MpsItem;
const PATH_FIELD: &str = "filename";
pub trait MpsMusicAnalyzer: Debug {
fn prepare_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<(), RuntimeMsg>;
fn prepare_item(&mut self, item: &MpsItem) -> Result<(), RuntimeMsg>;
fn get_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<f64, RuntimeMsg>;
}
#[cfg(feature = "bliss-audio")]
#[derive(Debug)]
pub struct MpsDefaultAnalyzer {
requests: Sender<RequestType>,
responses: Receiver<ResponseType>,
}
#[cfg(feature = "bliss-audio")]
impl std::default::Default for MpsDefaultAnalyzer {
fn default() -> Self {
let (req_tx, req_rx) = channel();
let (resp_tx, resp_rx) = channel();
std::thread::spawn(move || {
let mut cache_thread = CacheThread::new(resp_tx);
cache_thread.run_loop(req_rx);
});
Self {
requests: req_tx,
responses: resp_rx,
}
}
}
#[cfg(feature = "bliss-audio")]
impl MpsDefaultAnalyzer {
fn request_distance(&mut self, from: &MpsItem, to: &MpsItem, ack: bool) -> Result<(), RuntimeMsg> {
let path_from = Self::get_path(from)?;
let path_to = Self::get_path(to)?;
self.requests.send(
RequestType::Distance {
path1: path_from.to_owned(),
path2: path_to.to_owned(),
ack: ack,
}
).map_err(|e| RuntimeMsg(format!("Channel send err {}", e)))
}
fn get_path(item: &MpsItem) -> Result<&str, RuntimeMsg> {
if let Some(path) = item.field(PATH_FIELD) {
if let MpsTypePrimitive::String(path) = path {
Ok(path)
} else {
Err(RuntimeMsg(format!("Field {} on item is not String, it's {}", PATH_FIELD, path)))
}
} else {
Err(RuntimeMsg(format!("Missing field {} on item", PATH_FIELD)))
}
}
fn request_song(&mut self, item: &MpsItem, ack: bool) -> Result<(), RuntimeMsg> {
let path = Self::get_path(item)?;
self.requests.send(
RequestType::Song {
path: path.to_owned(),
ack: ack,
}
).map_err(|e| RuntimeMsg(format!("Channel send error: {}", e)))
}
}
#[cfg(feature = "bliss-audio")]
impl MpsMusicAnalyzer for MpsDefaultAnalyzer {
fn prepare_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<(), RuntimeMsg> {
self.request_distance(from, to, false)
}
fn prepare_item(&mut self, item: &MpsItem) -> Result<(), RuntimeMsg> {
self.request_song(item, false)
}
fn get_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<f64, RuntimeMsg> {
self.request_distance(from, to, true)?;
let path_from = Self::get_path(from)?;
let path_to = Self::get_path(to)?;
for response in self.responses.iter() {
if let ResponseType::Distance{path1, path2, distance} = response {
if path1 == path_from && path2 == path_to {
return match distance {
Ok(d) => Ok(d as f64),
Err(e) => Err(RuntimeMsg(format!("Bliss error: {}", e)))
};
}
}
}
Err(RuntimeMsg("Channel closed without response: internal error".to_owned()))
}
}
#[cfg(not(feature = "bliss-audio"))]
#[derive(Default, Debug)]
pub struct MpsDefaultAnalyzer {}
#[cfg(not(feature = "bliss-audio"))]
impl MpsMusicAnalyzer for MpsDefaultAnalyzer {
fn prepare_distance(&mut self, from: &MpsItem, to: &MpsItem) -> Result<(), RuntimeMsg> {
Ok(())
}
fn prepare_item(&mut self, item: &MpsItem) -> Result<(), RuntimeMsg> {
Ok(())
}
fn get_distance(&mut self, item: &MpsItem) -> Result<f64, RuntimeMsg> {
Ok(f64::MAX)
}
}
#[cfg(feature = "bliss-audio")]
enum RequestType {
Distance {
path1: String,
path2: String,
ack: bool,
},
Song {
path: String,
ack: bool,
},
//End {}
}
#[cfg(feature = "bliss-audio")]
enum ResponseType {
Distance {
path1: String,
path2: String,
distance: Result<f32, BlissError>,
},
Song {
path: String,
song: Result<Song, BlissError>,
},
}
#[cfg(feature = "bliss-audio")]
struct CacheThread {
distance_cache: HashMap<(String, String), Result<f32, BlissError>>,
distance_in_progress: HashSet<(String, String)>,
song_cache: HashMap<String, Result<Song, BlissError>>,
song_in_progress: HashSet<String>,
//requests: Receiver<RequestType>,
responses: Sender<ResponseType>,
}
#[cfg(feature = "bliss-audio")]
impl CacheThread {
fn new(responses: Sender<ResponseType>) -> Self {
Self {
distance_cache: HashMap::new(),
distance_in_progress: HashSet::new(),
song_cache: HashMap::new(),
song_in_progress: HashSet::new(),
//requests: requests,
responses: responses,
}
}
fn non_blocking_read_some(&mut self, results: &Receiver<ResponseType>) {
for result in results.try_iter() {
match result {
ResponseType::Distance {path1, path2, distance} => {
self.insert_distance(path1, path2, distance);
},
ResponseType::Song {path, song} => {
self.insert_song(path, song);
}
}
}
}
fn insert_song(&mut self, path: String, song_result: Result<Song, BlissError>) {
self.song_in_progress.remove(&path);
self.song_cache.insert(path, song_result);
}
fn insert_distance(&mut self, path1: String, path2: String, distance_result: Result<f32, BlissError>) {
let key = (path1, path2);
self.distance_in_progress.remove(&key);
self.distance_cache.insert(key, distance_result);
}
fn get_song_option(&mut self, path: &str, auto_add: bool, results: &Receiver<ResponseType>) -> Option<Song> {
// wait for song if already in progress
if self.song_in_progress.contains(path) {
for result in results.iter() {
match result {
ResponseType::Distance {path1, path2, distance} => {
self.insert_distance(path1, path2, distance);
},
ResponseType::Song {path: path2, song} => {
if path2 == path {
self.insert_song(path2, song.clone());
let result = song.ok();
if result.is_none() && auto_add {
self.song_in_progress.insert(path.to_owned());
}
return result;
} else {
self.insert_song(path2, song);
}
}
}
}
} else if self.song_cache.contains_key(path) {
let result = self.song_cache.get(path).and_then(|r| r.clone().ok().to_owned());
if result.is_none() && auto_add {
self.song_in_progress.insert(path.to_owned());
}
return result;
}
if auto_add {
self.song_in_progress.insert(path.to_owned());
}
return None;
}
fn handle_distance_req(&mut self,
path1: String,
path2: String,
ack: bool,
worker_tx: &Sender<ResponseType>,
worker_results: &Receiver<ResponseType>,
) -> bool {
let key = (path1.clone(), path2.clone());
if let Some(result) = self.distance_cache.get(&key) {
if ack {
let result = result.to_owned();
if let Err(_) = self.responses.send(
ResponseType::Distance {
path1: path1,
path2: path2,
distance: result,
}
) { return true; }
}
} else {
if path1 == path2 {
// trivial case
// also prevents deadlock in self.get_song_option()
// due to waiting on song that isn't being processed yet
// (first call adds it to song_in_progress set, second call just waits)
if ack {
if let Err(_) = self.responses.send(ResponseType::Distance {
path1: path1,
path2: path2,
distance: Ok(0.0),
}) { return true; }
}
} else if !self.distance_in_progress.contains(&key) {
let results = worker_tx.clone();
let song1_clone = self.get_song_option(&path1, true, worker_results);
let song2_clone = self.get_song_option(&path2, true, worker_results);
std::thread::spawn(move || {
let distance_result = worker_distance(
&results,
(&path1, song1_clone),
(&path2, song2_clone),
);
results.send(
ResponseType::Distance {
path1: path1,
path2: path2,
distance: distance_result,
}
).unwrap_or(());
});
}
if ack {
'inner1: for result in worker_results.iter() {
match result {
ResponseType::Distance {path1: path1_2, path2: path2_2, distance} => {
self.insert_distance(path1_2.clone(), path2_2.clone(), distance.clone());
if path1_2 == key.0 && path2_2 == key.1 {
if let Err(_) = self.responses.send(ResponseType::Distance {
path1: path1_2,
path2: path2_2,
distance: distance,
}) { return true; }
break 'inner1;
}
},
ResponseType::Song {path, song} => {
self.insert_song(path, song);
}
}
}
}
}
false
}
fn handle_song_req(&mut self,
path: String,
ack: bool,
worker_tx: &Sender<ResponseType>,
worker_results: &Receiver<ResponseType>,
) -> bool {
if let Some(song) = self.song_cache.get(&path) {
if ack {
let song = song.to_owned();
if let Err(_) = self.responses.send(
ResponseType::Song {
path: path,
song: song,
}
) { return true; }
}
} else {
if !self.song_in_progress.contains(&path) {
let path_clone = path.clone();
let results = worker_tx.clone();
std::thread::spawn(move || {
let song_result = Song::new(&path_clone);
results.send(
ResponseType::Song {
path: path_clone,
song: song_result,
}
).unwrap_or(());
});
}
if ack {
'inner2: for result in worker_results.iter() {
match result {
ResponseType::Distance {path1, path2, distance} => {
self.insert_distance(path1, path2, distance);
},
ResponseType::Song {path: path2, song} => {
self.insert_song(path2.clone(), song.clone());
if path2 == path {
if let Err(_) = self.responses.send(ResponseType::Song {
path: path,
song: song,
}) { return false; }
break 'inner2;
}
}
}
}
}
}
false
}
fn run_loop(&mut self, requests: Receiver<RequestType>,) {
let (worker_tx, worker_results): (Sender<ResponseType>, Receiver<ResponseType>) = channel();
'outer: for request in requests.iter() {
self.non_blocking_read_some(&worker_results);
match request {
//RequestType::End{} => break,
RequestType::Distance{path1, path2, ack} => {
if self.handle_distance_req(path1, path2, ack, &worker_tx, &worker_results) {
break 'outer;
}
},
RequestType::Song{path, ack} => {
if self.handle_song_req(path, ack, &worker_tx, &worker_results) {
break 'outer;
}
},
}
}
}
}
#[cfg(feature = "bliss-audio")]
fn worker_distance(results: &Sender<ResponseType>, song1: (&str, Option<Song>), song2: (&str, Option<Song>)) -> Result<f32, BlissError> {
let path1 = song1.0;
let song1 = if let Some(song) = song1.1 {
song
} else {
let new_song1 = Song::new(path1);
results.send(ResponseType::Song {
path: path1.to_string(),
song: new_song1.clone(),
}).unwrap_or(());
new_song1?
};
let path2 = song2.0;
let song2 = if let Some(song) = song2.1 {
song
} else {
let new_song2 = Song::new(path2);
results.send(ResponseType::Song {
path: path2.to_string(),
song: new_song2.clone(),
}).unwrap_or(());
new_song2?
};
Ok(song1.distance(&song2))
}

View file

@ -6,7 +6,7 @@ license = "LGPL-2.1-only OR GPL-3.0-only"
readme = "README.md"
[dependencies]
rodio = { version = "^0.15", features = ["symphonia"]}
rodio = { version = "^0.15", features = ["symphonia-all"]}
m3u8-rs = { version = "^3.0.0" }
# local