diff --git a/mps-player/src/controller.rs b/mps-player/src/controller.rs index 2c9a814..5852c25 100644 --- a/mps-player/src/controller.rs +++ b/mps-player/src/controller.rs @@ -23,10 +23,11 @@ impl MpsController { ) -> Self { let (control_tx, control_rx) = channel(); let (event_tx, event_rx) = channel(); + let (playback_tx, playback_rx) = channel(); let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone()); - sys_ctrl.init(); + sys_ctrl.init(playback_rx); let handle = - MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, false); + MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, playback_tx, false); Self { control: control_tx, event: event_rx, @@ -40,10 +41,11 @@ impl MpsController { ) -> Self { let (control_tx, control_rx) = channel(); let (event_tx, event_rx) = channel(); + let (playback_tx, playback_rx) = channel(); let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone()); - sys_ctrl.init(); + sys_ctrl.init(playback_rx); let handle = - MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, true); + MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, playback_tx, true); Self { control: control_tx, event: event_rx, @@ -58,7 +60,7 @@ impl MpsController { .map_err(PlaybackError::from_err)?; let mut response = self.event.recv().map_err(PlaybackError::from_err)?; while !response.is_acknowledgement() { - Self::handle_event(response)?; + self.handle_event(response)?; response = self.event.recv().map_err(PlaybackError::from_err)?; } if let PlayerAction::Acknowledge(action) = response { @@ -77,12 +79,13 @@ impl MpsController { } } - fn handle_event(event: PlayerAction) -> Result<(), PlaybackError> { + fn handle_event(&self, event: PlayerAction) -> Result<(), PlaybackError> { match event { PlayerAction::Acknowledge(_) => Ok(()), PlayerAction::Exception(e) => Err(e), PlayerAction::End => Ok(()), PlayerAction::Empty => Ok(()), + //PlayerAction::Enqueued(item) => Ok(()), } } @@ -134,7 +137,7 @@ impl MpsController { if let PlayerAction::End = msg { break; } else { - Self::handle_event(msg)?; + self.handle_event(msg)?; } } Ok(()) @@ -142,7 +145,7 @@ impl MpsController { pub fn wait_for_empty(&self) -> Result<(), PlaybackError> { for msg in self.event.try_iter() { - Self::handle_event(msg)?; + self.handle_event(msg)?; } self.control .send(ControlAction::CheckEmpty { ack: true }) @@ -152,7 +155,7 @@ impl MpsController { if let PlayerAction::Empty = msg { break; } else { - Self::handle_event(msg)?; + self.handle_event(msg)?; } } Ok(()) @@ -163,7 +166,7 @@ impl MpsController { pub fn check(&self) -> Vec { let mut result = Vec::new(); for msg in self.event.try_iter() { - if let Err(e) = Self::handle_event(msg) { + if let Err(e) = self.handle_event(msg) { result.push(e); } } @@ -172,7 +175,7 @@ impl MpsController { /// Like check(), but it also waits for an acknowledgement to ensure it gets the latest events. pub fn check_ack(&self) -> Vec { - let mut result = Vec::new(); + let mut result = self.check(); // clear existing messages first let to_send = ControlAction::NoOp { ack: true }; if let Err(e) = self .control @@ -191,7 +194,7 @@ impl MpsController { .into(), }); } - } else if let Err(e) = Self::handle_event(msg) { + } else if let Err(e) = self.handle_event(msg) { result.push(e); } } diff --git a/mps-player/src/os_controls.rs b/mps-player/src/os_controls.rs index ce9ede6..7de77fb 100644 --- a/mps-player/src/os_controls.rs +++ b/mps-player/src/os_controls.rs @@ -1,13 +1,15 @@ #[cfg(unix)] -use std::sync::mpsc::{channel, Sender}; +use std::sync::mpsc::{channel, Sender, Receiver}; #[cfg(unix)] use std::thread::JoinHandle; #[cfg(unix)] -use mpris_player::{MprisPlayer, PlaybackStatus}; +use mpris_player::{MprisPlayer, PlaybackStatus, Metadata}; + +use mps_interpreter::MpsMusicItem; //use super::MpsController; -use super::player_wrapper::ControlAction; +use super::player_wrapper::{ControlAction, PlaybackAction}; /// OS-specific APIs for media controls. /// Currently only Linux (dbus) is supported. @@ -16,7 +18,17 @@ pub struct SystemControlWrapper { #[cfg(target_os = "linux")] dbus_handle: Option>, //std::sync::Arc, #[cfg(target_os = "linux")] - dbus_die: Option>, + dbus_ctrl: Option>, + #[cfg(target_os = "linux")] + playback_event_handler: Option>, + #[cfg(target_os = "linux")] + playback_event_handler_killer: Option>, +} + +#[cfg(target_os = "linux")] +enum DbusControl { + Die, + SetMetadata(Metadata), } #[cfg(target_os = "linux")] @@ -25,13 +37,16 @@ impl SystemControlWrapper { Self { control: control, dbus_handle: None, //MprisPlayer::new("mps".into(), "mps".into(), "null".into()) - dbus_die: None, + dbus_ctrl: None, + playback_event_handler: None, + playback_event_handler_killer: None, } } - pub fn init(&mut self) { - let (tx, die) = channel(); - self.dbus_die = Some(tx); + pub fn init(&mut self, playback: Receiver) { + let (tx, dbus_ctrl) = channel(); + let dbus_ctrl_tx_clone = tx.clone(); + self.dbus_ctrl = Some(tx); let control_clone1 = self.control.clone(); self.dbus_handle = Some(std::thread::spawn(move || { let dbus_conn = MprisPlayer::new("mps".into(), "mps".into(), "null".into()); @@ -111,20 +126,80 @@ impl SystemControlWrapper { // poll loop, using my custom mpris lib because original did it wrong loop { dbus_conn.poll(5); - if let Ok(_) = die.try_recv() { + match dbus_ctrl.try_recv() { + Err(_) => {}, + Ok(DbusControl::Die) => break, + Ok(DbusControl::SetMetadata(meta)) => { + dbus_conn.set_metadata(meta); + }, + } + } + })); + let (tx, rx) = channel(); + self.playback_event_handler_killer = Some(tx); + self.playback_event_handler = Some(std::thread::spawn(move || { + loop { + if let Ok(_) = rx.try_recv() { break; } + match playback.recv() { + Err(_) => break, + Ok(PlaybackAction::Exit) => break, + Ok(PlaybackAction::Enqueued(item)) => Self::enqueued(item, &dbus_ctrl_tx_clone), + Ok(PlaybackAction::Empty) => Self::empty(&dbus_ctrl_tx_clone), + } } })); } pub fn exit(self) { - if let Some(tx) = self.dbus_die { - tx.send(()).unwrap_or(()); + // exit dbus thread + if let Some(tx) = self.dbus_ctrl { + tx.send(DbusControl::Die).unwrap_or(()); } if let Some(handle) = self.dbus_handle { handle.join().unwrap_or(()); } + // exit playback event thread + if let Some(tx) = self.playback_event_handler_killer { + tx.send(()).unwrap_or(()); + } + if let Some(handle) = self.playback_event_handler { + handle.join().unwrap_or(()); + } + } + + fn enqueued(item: MpsMusicItem, dbus_ctrl: &Sender) { + //println!("Got enqueued item {}", &item.title); + dbus_ctrl.send(DbusControl::SetMetadata(Metadata { + length: None, + art_url: None, + album: item.album, + album_artist: None, // TODO maybe? + artist: item.artist.map(|artist| vec![artist]), + composer: None, + disc_number: None, + genre: item.genre.map(|genre| vec![genre]), + title: Some(item.title), + track_number: item.track.map(|track| track as i32), + url: Some(item.filename), + })).unwrap_or(()); + } + + fn empty(dbus_ctrl: &Sender) { + dbus_ctrl.send(DbusControl::SetMetadata(Metadata { + length: None, + art_url: None, + album: None, + album_artist: None, // TODO maybe? + artist: None, + composer: None, + disc_number: None, + genre: None, + title: None, + track_number: None, + url: None, + })).unwrap_or(()); } } @@ -134,7 +209,7 @@ impl SystemControlWrapper { Self { control: control } } - pub fn init(&mut self) {} + pub fn init(&mut self, _playback: Receiver) {} pub fn exit(self) {} } diff --git a/mps-player/src/player.rs b/mps-player/src/player.rs index b4fc4f8..95ca607 100644 --- a/mps-player/src/player.rs +++ b/mps-player/src/player.rs @@ -5,7 +5,7 @@ use rodio::{decoder::Decoder, OutputStream, OutputStreamHandle, Sink}; use m3u8_rs::{MediaPlaylist, MediaSegment}; -use mps_interpreter::{tokens::MpsTokenReader, MpsRunner}; +use mps_interpreter::{tokens::MpsTokenReader, MpsRunner, MpsMusicItem}; use super::PlaybackError; @@ -50,10 +50,12 @@ impl MpsPlayer { Ok(()) } - pub fn enqueue_all(&mut self) -> Result<(), PlaybackError> { + pub fn enqueue_all(&mut self) -> Result, PlaybackError> { + let mut enqueued = Vec::new(); for item in &mut self.runner { match item { Ok(music) => { + enqueued.push(music.clone()); let file = fs::File::open(music.filename).map_err(PlaybackError::from_err)?; let stream = io::BufReader::new(file); let source = Decoder::new(stream).map_err(PlaybackError::from_err)?; @@ -64,18 +66,19 @@ impl MpsPlayer { Err(e) => Err(PlaybackError::from_err(e)), }?; } - Ok(()) + Ok(enqueued) } - pub fn enqueue(&mut self, count: usize) -> Result<(), PlaybackError> { + pub fn enqueue(&mut self, count: usize) -> Result, PlaybackError> { let mut items_left = count; + let mut enqueued = Vec::with_capacity(count); if items_left == 0 { - return Ok(()); + return Ok(enqueued); } for item in &mut self.runner { match item { Ok(music) => { - //println!("Enqueuing {}", music.filename); + enqueued.push(music.clone()); let file = fs::File::open(music.filename).map_err(PlaybackError::from_err)?; let stream = io::BufReader::new(file); let source = Decoder::new(stream).map_err(PlaybackError::from_err)?; @@ -91,7 +94,7 @@ impl MpsPlayer { } } //println!("Enqueued {} items", count - items_left); - Ok(()) + Ok(enqueued) } pub fn resume(&self) { @@ -114,7 +117,7 @@ impl MpsPlayer { self.sink.len() } - pub fn empty(&self) -> bool { + pub fn queue_empty(&self) -> bool { self.sink.empty() } diff --git a/mps-player/src/player_wrapper.rs b/mps-player/src/player_wrapper.rs index 89b0a6a..332a09c 100644 --- a/mps-player/src/player_wrapper.rs +++ b/mps-player/src/player_wrapper.rs @@ -2,18 +2,20 @@ use std::sync::mpsc::{Receiver, Sender}; use std::{thread, thread::JoinHandle}; use mps_interpreter::tokens::MpsTokenReader; +use mps_interpreter::MpsMusicItem; use super::MpsPlayer; use super::PlaybackError; /// A wrapper around MpsPlayer so that playback can occur on a different thread. -/// This allows for message passing between the threads. +/// This allows for message passing between the player and controller. /// /// You will probably never directly interact with this, instead using MpsController to communicate. pub struct MpsPlayerServer { player: MpsPlayer, control: Receiver, event: Sender, + playback: Sender, keep_alive: bool, } @@ -22,22 +24,42 @@ impl MpsPlayerServer { player: MpsPlayer, ctrl: Receiver, event: Sender, + playback: Sender, keep_alive: bool, ) -> Self { Self { player: player, control: ctrl, event: event, + playback: playback, keep_alive: keep_alive, } } + fn enqeue_some(&mut self, count: usize) { + //println!("Enqueuing up to {} items", count); + match self.player.enqueue(count) { + Err(e) => self.event.send(PlayerAction::Exception(e)).unwrap(), + Ok(items) => for item in items { // notify of new items that have been enqueued + self.playback.send(PlaybackAction::Enqueued(item)).unwrap(); + }, + } + } + + fn on_empty(&self) { + self.event.send(PlayerAction::Empty).unwrap(); + self.playback.send(PlaybackAction::Empty).unwrap(); + } + + fn on_end(&self) { + self.event.send(PlayerAction::End).unwrap(); + self.playback.send(PlaybackAction::Exit).unwrap(); + } + fn run_loop(&mut self) { // this can panic since it's not on the main thread - // initial queue full - if let Err(e) = self.player.enqueue(1) { - self.event.send(PlayerAction::Exception(e)).unwrap(); - } + // initial queue fill + self.enqeue_some(1); let mut is_empty = self.player.queue_len() == 0; loop { let command = self.control.recv().unwrap(); @@ -54,7 +76,7 @@ impl MpsPlayerServer { self.event.send(PlayerAction::Exception(e)).unwrap(); } if !self.player.is_paused() { - self.player.enqueue(1).unwrap(); + self.enqeue_some(1); } } ControlAction::Previous { .. } => {} // TODO @@ -73,9 +95,7 @@ impl MpsPlayerServer { is_exiting = true; } ControlAction::Enqueue { amount, .. } => { - if let Err(e) = self.player.enqueue(amount) { - self.event.send(PlayerAction::Exception(e)).unwrap(); - } + self.enqeue_some(amount); } ControlAction::NoOp { .. } => {} // empty by design ControlAction::SetVolume { volume, .. } => { @@ -88,9 +108,7 @@ impl MpsPlayerServer { // keep queue full (while playing music) if self.player.queue_len() == 0 && !self.player.is_paused() && !is_exiting { - if let Err(e) = self.player.enqueue(1) { - self.event.send(PlayerAction::Exception(e)).unwrap(); - } + self.enqeue_some(1); if self.player.queue_len() == 0 { // no more music to add is_exiting = !self.keep_alive || is_exiting; @@ -105,22 +123,22 @@ impl MpsPlayerServer { if self.player.queue_len() == 0 && !is_empty { // just became empty is_empty = true; - self.event.send(PlayerAction::Empty).unwrap(); + self.on_empty(); } else if self.player.queue_len() != 0 && is_empty { // just got filled is_empty = false; } if is_empty && check_empty { - self.event.send(PlayerAction::Empty).unwrap(); + self.on_empty(); } if is_exiting { break; } } - println!("Exiting playback server"); - self.event.send(PlayerAction::End).unwrap(); + //println!("Exiting playback server"); + self.on_end(); } pub fn spawn MpsPlayer + Send + 'static>( @@ -128,12 +146,13 @@ impl MpsPlayerServer { ctrl_tx: Sender, ctrl_rx: Receiver, event: Sender, + playback: Sender, keep_alive: bool, ) -> JoinHandle<()> { thread::spawn(move || Self::unblocking_timer_loop(ctrl_tx, 50)); thread::spawn(move || { let player = factory(); - let mut server_obj = Self::new(player, ctrl_rx, event, keep_alive); + let mut server_obj = Self::new(player, ctrl_rx, event, playback, keep_alive); server_obj.run_loop(); }) } @@ -193,6 +212,13 @@ pub enum PlayerAction { Empty, } +#[derive(Clone, Debug)] +pub enum PlaybackAction { + Empty, + Enqueued(MpsMusicItem), + Exit, +} + impl PlayerAction { pub fn is_acknowledgement(&self) -> bool { match self {