Fix missing dbus metadata during playback

This commit is contained in:
NGnius (Graham) 2022-01-05 17:37:13 -05:00
parent 01ebf99c5e
commit 5fe58cda10
4 changed files with 156 additions and 49 deletions

View file

@ -23,10 +23,11 @@ impl MpsController {
) -> Self {
let (control_tx, control_rx) = channel();
let (event_tx, event_rx) = channel();
let (playback_tx, playback_rx) = channel();
let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone());
sys_ctrl.init();
sys_ctrl.init(playback_rx);
let handle =
MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, false);
MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, playback_tx, false);
Self {
control: control_tx,
event: event_rx,
@ -40,10 +41,11 @@ impl MpsController {
) -> Self {
let (control_tx, control_rx) = channel();
let (event_tx, event_rx) = channel();
let (playback_tx, playback_rx) = channel();
let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone());
sys_ctrl.init();
sys_ctrl.init(playback_rx);
let handle =
MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, true);
MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, playback_tx, true);
Self {
control: control_tx,
event: event_rx,
@ -58,7 +60,7 @@ impl MpsController {
.map_err(PlaybackError::from_err)?;
let mut response = self.event.recv().map_err(PlaybackError::from_err)?;
while !response.is_acknowledgement() {
Self::handle_event(response)?;
self.handle_event(response)?;
response = self.event.recv().map_err(PlaybackError::from_err)?;
}
if let PlayerAction::Acknowledge(action) = response {
@ -77,12 +79,13 @@ impl MpsController {
}
}
fn handle_event(event: PlayerAction) -> Result<(), PlaybackError> {
fn handle_event(&self, event: PlayerAction) -> Result<(), PlaybackError> {
match event {
PlayerAction::Acknowledge(_) => Ok(()),
PlayerAction::Exception(e) => Err(e),
PlayerAction::End => Ok(()),
PlayerAction::Empty => Ok(()),
//PlayerAction::Enqueued(item) => Ok(()),
}
}
@ -134,7 +137,7 @@ impl MpsController {
if let PlayerAction::End = msg {
break;
} else {
Self::handle_event(msg)?;
self.handle_event(msg)?;
}
}
Ok(())
@ -142,7 +145,7 @@ impl MpsController {
pub fn wait_for_empty(&self) -> Result<(), PlaybackError> {
for msg in self.event.try_iter() {
Self::handle_event(msg)?;
self.handle_event(msg)?;
}
self.control
.send(ControlAction::CheckEmpty { ack: true })
@ -152,7 +155,7 @@ impl MpsController {
if let PlayerAction::Empty = msg {
break;
} else {
Self::handle_event(msg)?;
self.handle_event(msg)?;
}
}
Ok(())
@ -163,7 +166,7 @@ impl MpsController {
pub fn check(&self) -> Vec<PlaybackError> {
let mut result = Vec::new();
for msg in self.event.try_iter() {
if let Err(e) = Self::handle_event(msg) {
if let Err(e) = self.handle_event(msg) {
result.push(e);
}
}
@ -172,7 +175,7 @@ impl MpsController {
/// Like check(), but it also waits for an acknowledgement to ensure it gets the latest events.
pub fn check_ack(&self) -> Vec<PlaybackError> {
let mut result = Vec::new();
let mut result = self.check(); // clear existing messages first
let to_send = ControlAction::NoOp { ack: true };
if let Err(e) = self
.control
@ -191,7 +194,7 @@ impl MpsController {
.into(),
});
}
} else if let Err(e) = Self::handle_event(msg) {
} else if let Err(e) = self.handle_event(msg) {
result.push(e);
}
}

View file

@ -1,13 +1,15 @@
#[cfg(unix)]
use std::sync::mpsc::{channel, Sender};
use std::sync::mpsc::{channel, Sender, Receiver};
#[cfg(unix)]
use std::thread::JoinHandle;
#[cfg(unix)]
use mpris_player::{MprisPlayer, PlaybackStatus};
use mpris_player::{MprisPlayer, PlaybackStatus, Metadata};
use mps_interpreter::MpsMusicItem;
//use super::MpsController;
use super::player_wrapper::ControlAction;
use super::player_wrapper::{ControlAction, PlaybackAction};
/// OS-specific APIs for media controls.
/// Currently only Linux (dbus) is supported.
@ -16,7 +18,17 @@ pub struct SystemControlWrapper {
#[cfg(target_os = "linux")]
dbus_handle: Option<JoinHandle<()>>, //std::sync::Arc<MprisPlayer>,
#[cfg(target_os = "linux")]
dbus_die: Option<Sender<()>>,
dbus_ctrl: Option<Sender<DbusControl>>,
#[cfg(target_os = "linux")]
playback_event_handler: Option<JoinHandle<()>>,
#[cfg(target_os = "linux")]
playback_event_handler_killer: Option<Sender<()>>,
}
#[cfg(target_os = "linux")]
enum DbusControl {
Die,
SetMetadata(Metadata),
}
#[cfg(target_os = "linux")]
@ -25,13 +37,16 @@ impl SystemControlWrapper {
Self {
control: control,
dbus_handle: None, //MprisPlayer::new("mps".into(), "mps".into(), "null".into())
dbus_die: None,
dbus_ctrl: None,
playback_event_handler: None,
playback_event_handler_killer: None,
}
}
pub fn init(&mut self) {
let (tx, die) = channel();
self.dbus_die = Some(tx);
pub fn init(&mut self, playback: Receiver<PlaybackAction>) {
let (tx, dbus_ctrl) = channel();
let dbus_ctrl_tx_clone = tx.clone();
self.dbus_ctrl = Some(tx);
let control_clone1 = self.control.clone();
self.dbus_handle = Some(std::thread::spawn(move || {
let dbus_conn = MprisPlayer::new("mps".into(), "mps".into(), "null".into());
@ -111,20 +126,80 @@ impl SystemControlWrapper {
// poll loop, using my custom mpris lib because original did it wrong
loop {
dbus_conn.poll(5);
if let Ok(_) = die.try_recv() {
match dbus_ctrl.try_recv() {
Err(_) => {},
Ok(DbusControl::Die) => break,
Ok(DbusControl::SetMetadata(meta)) => {
dbus_conn.set_metadata(meta);
},
}
}
}));
let (tx, rx) = channel();
self.playback_event_handler_killer = Some(tx);
self.playback_event_handler = Some(std::thread::spawn(move || {
loop {
if let Ok(_) = rx.try_recv() {
break;
}
match playback.recv() {
Err(_) => break,
Ok(PlaybackAction::Exit) => break,
Ok(PlaybackAction::Enqueued(item)) => Self::enqueued(item, &dbus_ctrl_tx_clone),
Ok(PlaybackAction::Empty) => Self::empty(&dbus_ctrl_tx_clone),
}
}
}));
}
pub fn exit(self) {
if let Some(tx) = self.dbus_die {
tx.send(()).unwrap_or(());
// exit dbus thread
if let Some(tx) = self.dbus_ctrl {
tx.send(DbusControl::Die).unwrap_or(());
}
if let Some(handle) = self.dbus_handle {
handle.join().unwrap_or(());
}
// exit playback event thread
if let Some(tx) = self.playback_event_handler_killer {
tx.send(()).unwrap_or(());
}
if let Some(handle) = self.playback_event_handler {
handle.join().unwrap_or(());
}
}
fn enqueued(item: MpsMusicItem, dbus_ctrl: &Sender<DbusControl>) {
//println!("Got enqueued item {}", &item.title);
dbus_ctrl.send(DbusControl::SetMetadata(Metadata {
length: None,
art_url: None,
album: item.album,
album_artist: None, // TODO maybe?
artist: item.artist.map(|artist| vec![artist]),
composer: None,
disc_number: None,
genre: item.genre.map(|genre| vec![genre]),
title: Some(item.title),
track_number: item.track.map(|track| track as i32),
url: Some(item.filename),
})).unwrap_or(());
}
fn empty(dbus_ctrl: &Sender<DbusControl>) {
dbus_ctrl.send(DbusControl::SetMetadata(Metadata {
length: None,
art_url: None,
album: None,
album_artist: None, // TODO maybe?
artist: None,
composer: None,
disc_number: None,
genre: None,
title: None,
track_number: None,
url: None,
})).unwrap_or(());
}
}
@ -134,7 +209,7 @@ impl SystemControlWrapper {
Self { control: control }
}
pub fn init(&mut self) {}
pub fn init(&mut self, _playback: Receiver<PlaybackAction>) {}
pub fn exit(self) {}
}

View file

@ -5,7 +5,7 @@ use rodio::{decoder::Decoder, OutputStream, OutputStreamHandle, Sink};
use m3u8_rs::{MediaPlaylist, MediaSegment};
use mps_interpreter::{tokens::MpsTokenReader, MpsRunner};
use mps_interpreter::{tokens::MpsTokenReader, MpsRunner, MpsMusicItem};
use super::PlaybackError;
@ -50,10 +50,12 @@ impl<T: MpsTokenReader> MpsPlayer<T> {
Ok(())
}
pub fn enqueue_all(&mut self) -> Result<(), PlaybackError> {
pub fn enqueue_all(&mut self) -> Result<Vec<MpsMusicItem>, PlaybackError> {
let mut enqueued = Vec::new();
for item in &mut self.runner {
match item {
Ok(music) => {
enqueued.push(music.clone());
let file = fs::File::open(music.filename).map_err(PlaybackError::from_err)?;
let stream = io::BufReader::new(file);
let source = Decoder::new(stream).map_err(PlaybackError::from_err)?;
@ -64,18 +66,19 @@ impl<T: MpsTokenReader> MpsPlayer<T> {
Err(e) => Err(PlaybackError::from_err(e)),
}?;
}
Ok(())
Ok(enqueued)
}
pub fn enqueue(&mut self, count: usize) -> Result<(), PlaybackError> {
pub fn enqueue(&mut self, count: usize) -> Result<Vec<MpsMusicItem>, PlaybackError> {
let mut items_left = count;
let mut enqueued = Vec::with_capacity(count);
if items_left == 0 {
return Ok(());
return Ok(enqueued);
}
for item in &mut self.runner {
match item {
Ok(music) => {
//println!("Enqueuing {}", music.filename);
enqueued.push(music.clone());
let file = fs::File::open(music.filename).map_err(PlaybackError::from_err)?;
let stream = io::BufReader::new(file);
let source = Decoder::new(stream).map_err(PlaybackError::from_err)?;
@ -91,7 +94,7 @@ impl<T: MpsTokenReader> MpsPlayer<T> {
}
}
//println!("Enqueued {} items", count - items_left);
Ok(())
Ok(enqueued)
}
pub fn resume(&self) {
@ -114,7 +117,7 @@ impl<T: MpsTokenReader> MpsPlayer<T> {
self.sink.len()
}
pub fn empty(&self) -> bool {
pub fn queue_empty(&self) -> bool {
self.sink.empty()
}

View file

@ -2,18 +2,20 @@ use std::sync::mpsc::{Receiver, Sender};
use std::{thread, thread::JoinHandle};
use mps_interpreter::tokens::MpsTokenReader;
use mps_interpreter::MpsMusicItem;
use super::MpsPlayer;
use super::PlaybackError;
/// A wrapper around MpsPlayer so that playback can occur on a different thread.
/// This allows for message passing between the threads.
/// This allows for message passing between the player and controller.
///
/// You will probably never directly interact with this, instead using MpsController to communicate.
pub struct MpsPlayerServer<T: MpsTokenReader> {
player: MpsPlayer<T>,
control: Receiver<ControlAction>,
event: Sender<PlayerAction>,
playback: Sender<PlaybackAction>,
keep_alive: bool,
}
@ -22,22 +24,42 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
player: MpsPlayer<T>,
ctrl: Receiver<ControlAction>,
event: Sender<PlayerAction>,
playback: Sender<PlaybackAction>,
keep_alive: bool,
) -> Self {
Self {
player: player,
control: ctrl,
event: event,
playback: playback,
keep_alive: keep_alive,
}
}
fn enqeue_some(&mut self, count: usize) {
//println!("Enqueuing up to {} items", count);
match self.player.enqueue(count) {
Err(e) => self.event.send(PlayerAction::Exception(e)).unwrap(),
Ok(items) => for item in items { // notify of new items that have been enqueued
self.playback.send(PlaybackAction::Enqueued(item)).unwrap();
},
}
}
fn on_empty(&self) {
self.event.send(PlayerAction::Empty).unwrap();
self.playback.send(PlaybackAction::Empty).unwrap();
}
fn on_end(&self) {
self.event.send(PlayerAction::End).unwrap();
self.playback.send(PlaybackAction::Exit).unwrap();
}
fn run_loop(&mut self) {
// this can panic since it's not on the main thread
// initial queue full
if let Err(e) = self.player.enqueue(1) {
self.event.send(PlayerAction::Exception(e)).unwrap();
}
// initial queue fill
self.enqeue_some(1);
let mut is_empty = self.player.queue_len() == 0;
loop {
let command = self.control.recv().unwrap();
@ -54,7 +76,7 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
self.event.send(PlayerAction::Exception(e)).unwrap();
}
if !self.player.is_paused() {
self.player.enqueue(1).unwrap();
self.enqeue_some(1);
}
}
ControlAction::Previous { .. } => {} // TODO
@ -73,9 +95,7 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
is_exiting = true;
}
ControlAction::Enqueue { amount, .. } => {
if let Err(e) = self.player.enqueue(amount) {
self.event.send(PlayerAction::Exception(e)).unwrap();
}
self.enqeue_some(amount);
}
ControlAction::NoOp { .. } => {} // empty by design
ControlAction::SetVolume { volume, .. } => {
@ -88,9 +108,7 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
// keep queue full (while playing music)
if self.player.queue_len() == 0 && !self.player.is_paused() && !is_exiting {
if let Err(e) = self.player.enqueue(1) {
self.event.send(PlayerAction::Exception(e)).unwrap();
}
self.enqeue_some(1);
if self.player.queue_len() == 0 {
// no more music to add
is_exiting = !self.keep_alive || is_exiting;
@ -105,22 +123,22 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
if self.player.queue_len() == 0 && !is_empty {
// just became empty
is_empty = true;
self.event.send(PlayerAction::Empty).unwrap();
self.on_empty();
} else if self.player.queue_len() != 0 && is_empty {
// just got filled
is_empty = false;
}
if is_empty && check_empty {
self.event.send(PlayerAction::Empty).unwrap();
self.on_empty();
}
if is_exiting {
break;
}
}
println!("Exiting playback server");
self.event.send(PlayerAction::End).unwrap();
//println!("Exiting playback server");
self.on_end();
}
pub fn spawn<F: FnOnce() -> MpsPlayer<T> + Send + 'static>(
@ -128,12 +146,13 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
ctrl_tx: Sender<ControlAction>,
ctrl_rx: Receiver<ControlAction>,
event: Sender<PlayerAction>,
playback: Sender<PlaybackAction>,
keep_alive: bool,
) -> JoinHandle<()> {
thread::spawn(move || Self::unblocking_timer_loop(ctrl_tx, 50));
thread::spawn(move || {
let player = factory();
let mut server_obj = Self::new(player, ctrl_rx, event, keep_alive);
let mut server_obj = Self::new(player, ctrl_rx, event, playback, keep_alive);
server_obj.run_loop();
})
}
@ -193,6 +212,13 @@ pub enum PlayerAction {
Empty,
}
#[derive(Clone, Debug)]
pub enum PlaybackAction {
Empty,
Enqueued(MpsMusicItem),
Exit,
}
impl PlayerAction {
pub fn is_acknowledgement(&self) -> bool {
match self {