From 7a327767f3d3c8ee01840cbdd0a83f482f4f6dc2 Mon Sep 17 00:00:00 2001 From: "NGnius (Graham)" Date: Fri, 10 Dec 2021 16:53:22 -0500 Subject: [PATCH] Create barebones music player for mps interpretor --- .gitmodules | 3 + Cargo.lock | 27 ++++ mpris-player | 1 + mps-interpreter/src/tokens/token_enum.rs | 28 +++- mps-interpreter/src/tokens/tokenizer.rs | 12 +- mps-player/Cargo.toml | 4 + mps-player/src/controller.rs | 115 ++++++++++++++++ mps-player/src/errors.rs | 2 +- mps-player/src/lib.rs | 6 + mps-player/src/os_controls.rs | 131 +++++++++++++++++++ mps-player/src/player.rs | 28 ++++ mps-player/src/player_wrapper.rs | 160 +++++++++++++++++++++++ src/main.rs | 14 +- 13 files changed, 520 insertions(+), 11 deletions(-) create mode 100644 .gitmodules create mode 160000 mpris-player create mode 100644 mps-player/src/controller.rs create mode 100644 mps-player/src/os_controls.rs create mode 100644 mps-player/src/player_wrapper.rs diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..7dfe03a --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "mpris-player"] + path = mpris-player + url = https://github.com/NGnius/mpris-player diff --git a/Cargo.lock b/Cargo.lock index adf1fcd..6abfc16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,16 @@ dependencies = [ "syn", ] +[[package]] +name = "dbus" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48b5f0f36f1eebe901b0e6bee369a77ed3396334bf3f09abd46454a576f71819" +dependencies = [ + "libc", + "libdbus-sys", +] + [[package]] name = "derivative" version = "2.2.0" @@ -419,6 +429,15 @@ version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119" +[[package]] +name = "libdbus-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c185b5b7ad900923ef3a8ff594083d4d9b5aea80bb4f32b8342363138c0d456b" +dependencies = [ + "pkg-config", +] + [[package]] name = "libloading" version = "0.7.2" @@ -507,6 +526,13 @@ dependencies = [ "cc", ] +[[package]] +name = "mpris-player" +version = "0.6.1" +dependencies = [ + "dbus", +] + [[package]] name = "mps" version = "0.1.0" @@ -529,6 +555,7 @@ name = "mps-player" version = "0.1.0" dependencies = [ "m3u8-rs", + "mpris-player", "mps-interpreter", "rodio", ] diff --git a/mpris-player b/mpris-player new file mode 160000 index 0000000..2e843ec --- /dev/null +++ b/mpris-player @@ -0,0 +1 @@ +Subproject commit 2e843ec8d917a4859301de366a7141d5f19df6ec diff --git a/mps-interpreter/src/tokens/token_enum.rs b/mps-interpreter/src/tokens/token_enum.rs index 362860f..fdb6efd 100644 --- a/mps-interpreter/src/tokens/token_enum.rs +++ b/mps-interpreter/src/tokens/token_enum.rs @@ -5,7 +5,9 @@ pub enum MpsToken { Sql, OpenBracket, CloseBracket, + Comma, Literal(String), + Name(String), } impl MpsToken { @@ -14,7 +16,22 @@ impl MpsToken { "sql" => Ok(Self::Sql), "(" => Ok(Self::OpenBracket), ")" => Ok(Self::CloseBracket), - _ => Err(s), + "," => Ok(Self::Comma), + _ => { + // name validation + let mut ok = true; + for invalid_c in ["-", "+", ","] { + if s.contains(invalid_c) { + ok = false; + break; + } + } + if ok { + Ok(Self::Name(s)) + } else { + Err(s) + } + }, } } @@ -45,6 +62,13 @@ impl MpsToken { _ => false } } + + pub fn is_name(&self) -> bool { + match self { + Self::Name(_) => true, + _ => false + } + } } impl Display for MpsToken { @@ -53,7 +77,9 @@ impl Display for MpsToken { Self::Sql => write!(f, "sql"), Self::OpenBracket => write!(f, "("), Self::CloseBracket => write!(f, ")"), + Self::Comma => write!(f, ","), Self::Literal(s) => write!(f, "\"{}\"", s), + Self::Name(s) => write!(f, "{}", s), } } } diff --git a/mps-interpreter/src/tokens/tokenizer.rs b/mps-interpreter/src/tokens/tokenizer.rs index 57867cf..0bd8ae5 100644 --- a/mps-interpreter/src/tokens/tokenizer.rs +++ b/mps-interpreter/src/tokens/tokenizer.rs @@ -63,8 +63,8 @@ impl MpsTokenizer where R: std::io::Read { ); bigger_buf.clear(); }, - ReaderStateMachine::Bracket{..} => { - let out = bigger_buf.pop().unwrap(); // bracket token + ReaderStateMachine::SingleCharToken{..} => { + let out = bigger_buf.pop().unwrap(); // bracket or comma token if bigger_buf.len() != 0 { // bracket tokens can be beside other tokens, without separator let token = String::from_utf8(bigger_buf.clone()) .map_err(|e| self.error(format!("UTF-8 encoding error: {}", e)))?; @@ -171,7 +171,7 @@ enum ReaderStateMachine { InsideQuoteLiteral{ out: u8, }, - Bracket { + SingleCharToken { out: u8, }, EndLiteral{}, @@ -186,7 +186,7 @@ impl ReaderStateMachine { match self { Self::Start{} | Self::Regular{..} - | Self::Bracket{..} + | Self::SingleCharToken{..} | Self::EndLiteral{} | Self::EndToken{} | Self::EndStatement{} => @@ -197,7 +197,7 @@ impl ReaderStateMachine { ' ' => Self::EndToken{}, '\n' | '\r' | ';' => Self::EndStatement{}, '\0' => Self::EndOfFile{}, - '(' | ')' => Self::Bracket{out: input}, + '(' | ')' | ',' => Self::SingleCharToken{out: input}, _ => Self::Regular{out: input}, }, Self::Escaped{inside} => match inside { @@ -240,7 +240,7 @@ impl ReaderStateMachine { pub fn output(&self) -> Option { match self { Self::Regular{ out, ..} - | Self::Bracket{ out, ..} + | Self::SingleCharToken{ out, ..} | Self::InsideTickLiteral{ out, ..} | Self::InsideQuoteLiteral{ out, ..} => Some(*out), _ => None diff --git a/mps-player/Cargo.toml b/mps-player/Cargo.toml index aa0be5b..c360dad 100644 --- a/mps-player/Cargo.toml +++ b/mps-player/Cargo.toml @@ -9,3 +9,7 @@ m3u8-rs = { version = "^3.0.0" } # local mps-interpreter = { path = "../mps-interpreter" } + +[target.'cfg(unix)'.dependencies] +#dbus = { version = "^0.9" } +mpris-player = { version = "^0.6.1", path = "../mpris-player" } diff --git a/mps-player/src/controller.rs b/mps-player/src/controller.rs new file mode 100644 index 0000000..f08569e --- /dev/null +++ b/mps-player/src/controller.rs @@ -0,0 +1,115 @@ +use std::sync::mpsc::{Sender, Receiver, channel}; +use std::thread::JoinHandle; + +use mps_interpreter::tokens::MpsTokenReader; + +use super::MpsPlayer; +use super::PlaybackError; +use super::player_wrapper::{ControlAction, PlayerAction, MpsPlayerServer}; +use super::os_controls::SystemControlWrapper; + +pub struct MpsController { + control: Sender, + event: Receiver, + handle: JoinHandle<()>, + sys_ctrl: SystemControlWrapper +} + +impl MpsController { + pub fn create MpsPlayer + Send + 'static, T: MpsTokenReader>( + player_gen: F + ) -> Self { + let (control_tx, control_rx) = channel(); + let (event_tx, event_rx) = channel(); + let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone()); + sys_ctrl.init(); + let handle = MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx); + Self { + control: control_tx, + event: event_rx, + handle: handle, + sys_ctrl: sys_ctrl, + } + } + + fn send_confirm(&self, to_send: ControlAction) -> Result<(), PlaybackError> { + self.control.send(to_send.clone()).map_err(PlaybackError::from_err)?; + let mut response = self.event.recv().map_err(PlaybackError::from_err)?; + while !response.is_acknowledgement() { + Self::handle_event(response)?; + response = self.event.recv().map_err(PlaybackError::from_err)?; + } + if let PlayerAction::Acknowledge(action) = response { + if action == to_send { + Ok(()) + } else { + Err(PlaybackError { + msg: "Incorrect acknowledgement received for MpsController control action".into() + }) + } + } else { + Err(PlaybackError { + msg: "Invalid acknowledgement received for MpsController control action".into() + }) + } + } + + fn handle_event(event: PlayerAction) -> Result<(), PlaybackError> { + match event { + PlayerAction::Acknowledge(_) => Ok(()), + PlayerAction::Exception(e) => Err(e), + PlayerAction::End => Ok(()) + } + } + + pub fn next(&self) -> Result<(), PlaybackError> { + self.send_confirm(ControlAction::Next{ack: true}) + } + + pub fn previous(&self) -> Result<(), PlaybackError> { + self.send_confirm(ControlAction::Previous{ack: true}) + } + + pub fn play(&self) -> Result<(), PlaybackError> { + self.send_confirm(ControlAction::Play{ack: true}) + } + + pub fn pause(&self) -> Result<(), PlaybackError> { + self.send_confirm(ControlAction::Pause{ack: true}) + } + + pub fn stop(&self) -> Result<(), PlaybackError> { + self.send_confirm(ControlAction::Stop{ack: true}) + } + + pub fn enqueue(&self, count: usize) -> Result<(), PlaybackError> { + self.send_confirm(ControlAction::Enqueue{amount: count, ack: true}) + } + + pub fn ping(&self) -> Result<(), PlaybackError> { + self.send_confirm(ControlAction::NoOp{ack: true}) + } + + pub fn exit(self) -> Result<(), PlaybackError> { + self.send_confirm(ControlAction::Exit{ack: true})?; + self.sys_ctrl.exit(); + match self.handle.join() { + Ok(x) => Ok(x), + Err(_) => Err(PlaybackError { + msg: "MpsPlayerServer did not exit correctly".into() + }) + } + } + + pub fn wait_for_done(&self) -> Result<(), PlaybackError> { + loop { + let msg = self.event.recv().map_err(PlaybackError::from_err)?; + if let PlayerAction::End = msg { + break; + } else { + Self::handle_event(msg)?; + } + } + Ok(()) + } +} diff --git a/mps-player/src/errors.rs b/mps-player/src/errors.rs index 585961e..4807d54 100644 --- a/mps-player/src/errors.rs +++ b/mps-player/src/errors.rs @@ -1,6 +1,6 @@ use std::fmt::{Debug, Display, Formatter, Error}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct PlaybackError { pub(crate) msg: String } diff --git a/mps-player/src/lib.rs b/mps-player/src/lib.rs index bcafa19..75c34dd 100644 --- a/mps-player/src/lib.rs +++ b/mps-player/src/lib.rs @@ -1,8 +1,14 @@ +mod controller; mod errors; +pub(crate) mod os_controls; mod player; +pub(crate) mod player_wrapper; +//mod utility; +pub use controller::MpsController; pub use errors::PlaybackError; pub use player::MpsPlayer; +//pub use utility::{play_script}; #[cfg(test)] mod tests {} diff --git a/mps-player/src/os_controls.rs b/mps-player/src/os_controls.rs new file mode 100644 index 0000000..3022b1d --- /dev/null +++ b/mps-player/src/os_controls.rs @@ -0,0 +1,131 @@ +#[cfg(unix)] +use std::sync::mpsc::{Sender, channel}; +#[cfg(unix)] +use std::thread::JoinHandle; + +#[cfg(unix)] +use mpris_player::{MprisPlayer, PlaybackStatus}; + +//use super::MpsController; +use super::player_wrapper::ControlAction; + +pub struct SystemControlWrapper { + control: Sender, + #[cfg(target_os = "linux")] + dbus_handle: Option>,//std::sync::Arc, + #[cfg(target_os = "linux")] + dbus_die: Option>, +} + +#[cfg(target_os = "linux")] +impl SystemControlWrapper { + pub fn new(control: Sender) -> Self { + Self { + control: control, + dbus_handle: None,//MprisPlayer::new("mps".into(), "mps".into(), "null".into()) + dbus_die: None, + } + } + + pub fn init(&mut self) { + let (tx, die) = channel(); + self.dbus_die = Some(tx); + let control_clone1 = self.control.clone(); + self.dbus_handle = Some(std::thread::spawn(move || { + let dbus_conn = MprisPlayer::new("mps".into(), "mps".into(), "null".into()); + //let (msg_tx, msg_rx) = channel(); + // dbus setup + //self.dbus_conn.set_supported_mime_types(vec![]); + //self.dbus_conn.set_supported_uri_schemes(vec![]); + let mut is_playing = true; + dbus_conn.set_playback_status(PlaybackStatus::Playing); + dbus_conn.set_can_play(true); + dbus_conn.set_can_pause(true); + dbus_conn.set_can_go_next(true); + + let control_clone = control_clone1.clone(); + dbus_conn.connect_next( + move || { + //println!("Got next signal"); + control_clone.send(ControlAction::Next{ack: false}).unwrap_or(()) + } + ); + + let control_clone = control_clone1.clone(); + dbus_conn.connect_previous( + move || control_clone.send(ControlAction::Previous{ack: false}).unwrap_or(()) + ); + + let control_clone = control_clone1.clone(); + let dbus_conn_clone = dbus_conn.clone(); + dbus_conn.connect_pause( + move || { + //println!("Got pause signal"); + dbus_conn_clone.set_playback_status(PlaybackStatus::Paused); + control_clone.send(ControlAction::Pause{ack: false}).unwrap_or(()); + } + ); + + let control_clone = control_clone1.clone(); + let dbus_conn_clone = dbus_conn.clone(); + dbus_conn.connect_play( + move || { + //println!("Got play signal"); + dbus_conn_clone.set_playback_status(PlaybackStatus::Playing); + control_clone.send(ControlAction::Play{ack: false}).unwrap_or(()) + } + ); + + let control_clone = control_clone1.clone(); + let dbus_conn_clone = dbus_conn.clone(); + dbus_conn.connect_play_pause( + move || { + //println!("Got play_pause signal (was playing? {})", is_playing); + if is_playing { + dbus_conn_clone.set_playback_status(PlaybackStatus::Paused); + control_clone.send(ControlAction::Pause{ack: false}).unwrap_or(()); + } else { + dbus_conn_clone.set_playback_status(PlaybackStatus::Playing); + control_clone.send(ControlAction::Play{ack: false}).unwrap_or(()); + } + is_playing = !is_playing; + } + ); + + let control_clone = control_clone1.clone(); + dbus_conn.connect_volume( + move |v| control_clone.send(ControlAction::SetVolume{ack: false, volume: (v * (u32::MAX as f64)) as _}).unwrap_or(()) + ); + + // poll loop, using my custom mpris lib because original did it wrong + loop { + dbus_conn.poll(5); + if let Ok(_) = die.try_recv() { + break; + } + } + })); + } + + pub fn exit(self) { + if let Some(tx) = self.dbus_die { + tx.send(()).unwrap_or(()); + } + if let Some(handle) = self.dbus_handle { + handle.join().unwrap_or(()); + } + } +} + +#[cfg(not(any(target_os = "linux")))] +impl SystemControlWrapper { + pub fn new(control: Sender) -> Self { + Self { + control: control, + } + } + + pub fn init(&mut self) {} + + pub fn exit(self) {} +} diff --git a/mps-player/src/player.rs b/mps-player/src/player.rs index 78164ba..f9bf00a 100644 --- a/mps-player/src/player.rs +++ b/mps-player/src/player.rs @@ -62,6 +62,26 @@ impl MpsPlayer { Ok(()) } + pub fn enqueue(&mut self, count: usize) -> Result<(), PlaybackError> { + let mut items_left = count; + for item in &mut self.runner { + if items_left == 0 { return Ok(()); } + match item { + Ok(music) => { + let file = fs::File::open(music.filename).map_err(PlaybackError::from_err)?; + let stream = io::BufReader::new(file); + let source = Decoder::new(stream).map_err(PlaybackError::from_err)?; + self.sink.append(source); + self.sink.play(); // idk if this is necessary + Ok(()) + }, + Err(e) => Err(PlaybackError::from_err(e)) + }?; + items_left -= 1; + } + Ok(()) + } + pub fn resume(&self) { self.sink.play() } @@ -105,6 +125,14 @@ impl MpsPlayer { } playlist.write_to(w).map_err(PlaybackError::from_err) } + + pub fn is_paused(&self) -> bool { + self.sink.is_paused() + } + + pub fn set_volume(&self, volume: f32) { + self.sink.set_volume(volume); + } } #[cfg(test)] diff --git a/mps-player/src/player_wrapper.rs b/mps-player/src/player_wrapper.rs new file mode 100644 index 0000000..6f021d4 --- /dev/null +++ b/mps-player/src/player_wrapper.rs @@ -0,0 +1,160 @@ +use std::sync::mpsc::{Sender, Receiver}; +use std::{thread, thread::JoinHandle}; + +use mps_interpreter::tokens::MpsTokenReader; + +use super::MpsPlayer; +use super::PlaybackError; + +const DEFAULT_QUEUE_SIZE: usize = 1; + +pub struct MpsPlayerServer { + player: MpsPlayer, + control: Receiver, + event: Sender, +} + +impl MpsPlayerServer { + pub fn new(player: MpsPlayer, ctrl: Receiver, event: Sender) -> Self { + Self { + player: player, + control: ctrl, + event: event, + } + } + + fn run_loop(&mut self) { + // this can panic since it's not on the main thread + if let Err(e) = self.player.enqueue_all() { + self.event.send(PlayerAction::Exception(e)).unwrap(); + } + loop { + let command = self.control.recv().unwrap(); + + // keep queue full + if self.player.queue_len() < DEFAULT_QUEUE_SIZE { + if let Err(e) = self.player.enqueue(DEFAULT_QUEUE_SIZE - self.player.queue_len()) { + self.event.send(PlayerAction::Exception(e)).unwrap(); + } + if self.player.queue_len() == 0 { // no more music to add + self.event.send(PlayerAction::End).unwrap(); + } + } + + // process command + let mut is_exiting = false; + match command { + ControlAction::Next{..} => { + if self.player.queue_len() <= 1 { + self.player.stop(); + //self.player.resume(); + self.player.enqueue(1).unwrap(); + } + }, + ControlAction::Previous{..} => {}, // TODO + ControlAction::Play{..} => self.player.resume(), + ControlAction::Pause{..} => self.player.pause(), + ControlAction::PlayPause{..} => { + if self.player.is_paused() { + self.player.resume(); + } else { + self.player.pause(); + } + }, + ControlAction::Stop{..} => self.player.stop(), + ControlAction::Exit{..} => { + self.player.stop(); + is_exiting = true; + }, + ControlAction::Enqueue{amount,..} => { + if let Err(e) = self.player.enqueue(amount) { + self.event.send(PlayerAction::Exception(e)).unwrap(); + } + }, + ControlAction::NoOp{..} => {}, // empty by design + ControlAction::SetVolume{volume,..} => { + self.player.set_volume((volume as f32) / (u32::MAX as f32)); + } + } + if command.needs_ack() { + self.event.send(PlayerAction::Acknowledge(command)).unwrap(); + } + + if is_exiting { break; } + } + } + + pub fn spawn MpsPlayer + Send + 'static>( + factory: F, + ctrl_tx: Sender, + ctrl_rx: Receiver, + event: Sender + ) -> JoinHandle<()> { + thread::spawn(move || Self::unblocking_timer_loop(ctrl_tx, 50)); + thread::spawn(move || { + let player = factory(); + let mut server_obj = Self::new(player, ctrl_rx, event); + server_obj.run_loop(); + }) + } + + pub fn unblocking_timer_loop(ctrl_tx: Sender, sleep_ms: u64) { + let dur = std::time::Duration::from_millis(sleep_ms); + loop { + if let Err(_) = ctrl_tx.send(ControlAction::NoOp{ack: false}) { + break; + } + thread::sleep(dur); + } + } +} + +/// Action the controller wants the player to perform +#[allow(dead_code)] +#[derive(Clone, PartialEq, Eq)] +pub enum ControlAction { + Next{ack: bool}, + Previous{ack: bool}, + Play{ack: bool}, + Pause{ack: bool}, + PlayPause{ack: bool}, + Stop{ack: bool}, + Exit{ack: bool}, + Enqueue {amount: usize, ack: bool}, + NoOp{ack: bool}, + SetVolume{ack: bool, volume: u32} +} + +impl ControlAction { + fn needs_ack(&self) -> bool { + *match self { + Self::Next{ack} => ack, + Self::Previous{ack} => ack, + Self::Play{ack} => ack, + Self::Pause{ack} => ack, + Self::PlayPause{ack} => ack, + Self::Stop{ack} => ack, + Self::Exit{ack} => ack, + Self::Enqueue{ack,..} => ack, + Self::NoOp{ack,..} => ack, + Self::SetVolume{ack,..} => ack, + } + } +} + +/// Action the player has performed/encountered +#[derive(Clone)] +pub enum PlayerAction { + Acknowledge(ControlAction), + Exception(PlaybackError), + End, +} + +impl PlayerAction { + pub fn is_acknowledgement(&self) -> bool { + match self { + Self::Acknowledge(_) => true, + _ => false + } + } +} diff --git a/src/main.rs b/src/main.rs index 36876ef..bbc52eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,23 @@ use std::io; use mps_interpreter::MpsRunner; -use mps_player::{MpsPlayer, PlaybackError}; +use mps_player::{MpsPlayer, PlaybackError, MpsController}; #[allow(dead_code)] fn play_cursor() -> Result<(), PlaybackError> { - let cursor = io::Cursor::new("sql(`SELECT * FROM songs JOIN artists ON songs.artist = artists.artist_id WHERE artists.name like 'thundercat'`);"); + let cursor = io::Cursor::<&'static str>::new("sql(`SELECT * FROM songs JOIN artists ON songs.artist = artists.artist_id WHERE artists.name like 'thundercat'`);"); let runner = MpsRunner::with_stream(cursor); let mut player = MpsPlayer::new(runner)?; player.play_all() } fn main() { - play_cursor().unwrap(); + //play_cursor().unwrap(); + let ctrl = MpsController::create(|| { + let cursor = io::Cursor::<&'static str>::new("sql(`SELECT * FROM songs JOIN artists ON songs.artist = artists.artist_id WHERE artists.name like 'thundercat'`);"); + let runner = MpsRunner::with_stream(cursor); + MpsPlayer::new(runner).unwrap() + }); + + ctrl.wait_for_done().unwrap(); + ctrl.exit().unwrap(); }