From bddb79d0caf187572403b28c1b6a6a7c4710a7ef Mon Sep 17 00:00:00 2001 From: "NGnius (Graham)" Date: Mon, 3 Jan 2022 17:53:57 -0500 Subject: [PATCH] Implement REPL functionality --- mps-interpreter/src/interpretor.rs | 5 +- mps-interpreter/src/lang/dictionary.rs | 4 +- mps-interpreter/src/lang/function.rs | 8 +- mps-interpreter/src/lang/utility.rs | 11 +++ mps-interpreter/src/tokens/tokenizer.rs | 2 + mps-player/src/controller.rs | 74 ++++++++++++++++++- mps-player/src/errors.rs | 4 + mps-player/src/player.rs | 1 + mps-player/src/player_wrapper.rs | 35 +++++++-- src/channel_io.rs | 62 ++++++++++++++++ src/cli.rs | 3 + src/main.rs | 7 +- src/repl.rs | 97 +++++++++++++++++++++++++ 13 files changed, 295 insertions(+), 18 deletions(-) create mode 100644 src/channel_io.rs create mode 100644 src/repl.rs diff --git a/mps-interpreter/src/interpretor.rs b/mps-interpreter/src/interpretor.rs index 21e9028..507d444 100644 --- a/mps-interpreter/src/interpretor.rs +++ b/mps-interpreter/src/interpretor.rs @@ -95,9 +95,10 @@ where None => None, } } else { - if self.tokenizer.end_of_file() { + /*if self.tokenizer.end_of_file() { return None; - } + }*/ + //println!("try get next statement"); // build new statement let token_result = self .tokenizer diff --git a/mps-interpreter/src/lang/dictionary.rs b/mps-interpreter/src/lang/dictionary.rs index fc4c4a1..8b41225 100644 --- a/mps-interpreter/src/lang/dictionary.rs +++ b/mps-interpreter/src/lang/dictionary.rs @@ -29,13 +29,13 @@ impl MpsLanguageDictionary { Some(x) => Ok(x), None => Err(SyntaxError { line: 0, - token: MpsToken::Name("???".into()), + token: MpsToken::Name("{something}".into()), got: None, }), }?; Err(SyntaxError { line: 0, - token: MpsToken::Name("???".into()), + token: MpsToken::Name("{any of many}".into()), got: Some(result), }) } diff --git a/mps-interpreter/src/lang/function.rs b/mps-interpreter/src/lang/function.rs index 00d95dc..e110f47 100644 --- a/mps-interpreter/src/lang/function.rs +++ b/mps-interpreter/src/lang/function.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; //use std::fmt::{Debug, Display, Error, Formatter}; use std::marker::PhantomData; -use crate::lang::utility::{assert_token, assert_token_raw, assert_token_raw_back}; +use crate::lang::utility::{assert_token, assert_token_raw, assert_token_raw_back, assert_empty}; use crate::lang::MpsLanguageDictionary; use crate::lang::SyntaxError; use crate::lang::{BoxedMpsOpFactory, MpsOp}; @@ -67,8 +67,8 @@ impl + 'static> BoxedMpsOpFactory )?; assert_token_raw(MpsToken::OpenBracket, tokens)?; assert_token_raw_back(MpsToken::CloseBracket, tokens)?; - Ok(Box::new( - self.op_factory.build_function_params(name, tokens, dict)?, - )) + let func = self.op_factory.build_function_params(name, tokens, dict)?; + assert_empty(tokens)?; + Ok(Box::new(func)) } } diff --git a/mps-interpreter/src/lang/utility.rs b/mps-interpreter/src/lang/utility.rs index ba3078d..03a1310 100644 --- a/mps-interpreter/src/lang/utility.rs +++ b/mps-interpreter/src/lang/utility.rs @@ -169,6 +169,17 @@ pub fn assert_type(tokens: &mut VecDeque) -> Result) -> Result<(), SyntaxError> { + match tokens.pop_front() { + None => Ok(()), + Some(t) => Err(SyntaxError { + line: 0, + token: MpsToken::Name("{nothing}".into()), + got: Some(t), + }), + } +} + pub fn music_folder() -> PathBuf { dirs::home_dir() .unwrap_or_else(|| PathBuf::from("./")) diff --git a/mps-interpreter/src/tokens/tokenizer.rs b/mps-interpreter/src/tokens/tokenizer.rs index b0a8bce..14cf651 100644 --- a/mps-interpreter/src/tokens/tokenizer.rs +++ b/mps-interpreter/src/tokens/tokenizer.rs @@ -49,6 +49,7 @@ where { byte_buf[0] = 0; // clear to null char (nothing read is assumed to mean end of file) } + //println!("tokenizer read char: {}", byte_buf[0]); self.do_tracking(byte_buf[0]); self.fsm = self.fsm.next_state(byte_buf[0]); let mut bigger_buf: Vec = Vec::new(); @@ -176,6 +177,7 @@ where fn next_statement(&mut self, buf: &mut VecDeque) -> Result<(), ParseError> { // read until buffer gets some tokens, in case multiple end of line tokens are at start of stream let original_size = buf.len(); + self.read_line(buf)?; // always try once, even if at end of file while original_size == buf.len() && !self.end_of_file() { self.read_line(buf)?; } diff --git a/mps-player/src/controller.rs b/mps-player/src/controller.rs index f08569e..53730a8 100644 --- a/mps-player/src/controller.rs +++ b/mps-player/src/controller.rs @@ -17,13 +17,29 @@ pub struct MpsController { impl MpsController { pub fn create MpsPlayer + Send + 'static, T: MpsTokenReader>( - player_gen: F + player_gen: F, ) -> Self { let (control_tx, control_rx) = channel(); let (event_tx, event_rx) = channel(); let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone()); sys_ctrl.init(); - let handle = MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx); + let handle = MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, false); + Self { + control: control_tx, + event: event_rx, + handle: handle, + sys_ctrl: sys_ctrl, + } + } + + pub fn create_repl MpsPlayer + Send + 'static, T: MpsTokenReader>( + player_gen: F, + ) -> Self { + let (control_tx, control_rx) = channel(); + let (event_tx, event_rx) = channel(); + let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone()); + sys_ctrl.init(); + let handle = MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, true); Self { control: control_tx, event: event_rx, @@ -58,7 +74,8 @@ impl MpsController { match event { PlayerAction::Acknowledge(_) => Ok(()), PlayerAction::Exception(e) => Err(e), - PlayerAction::End => Ok(()) + PlayerAction::End => Ok(()), + PlayerAction::Empty => Ok(()), } } @@ -112,4 +129,55 @@ impl MpsController { } Ok(()) } + + pub fn wait_for_empty(&self) -> Result<(), PlaybackError> { + for msg in self.event.try_iter() { + Self::handle_event(msg)?; + } + self.control.send(ControlAction::CheckEmpty{ack: true}).map_err(PlaybackError::from_err)?; + loop { + let msg = self.event.recv().map_err(PlaybackError::from_err)?; + if let PlayerAction::Empty = msg { + break; + } else { + Self::handle_event(msg)?; + } + } + Ok(()) + } + + /// Check for any errors in the event queue. + /// This is non-blocking, so it only handles events sent since the last time events were handled. + pub fn check(&self) -> Vec { + let mut result = Vec::new(); + for msg in self.event.try_iter() { + if let Err(e) = Self::handle_event(msg) { + result.push(e); + } + } + result + } + + /// Like check(), but it also waits for an acknowledgement to ensure it gets the latest events. + pub fn check_ack(&self) -> Vec { + let mut result = Vec::new(); + let to_send = ControlAction::NoOp{ack: true}; + if let Err(e) = self.control.send(to_send.clone()).map_err(PlaybackError::from_err) { + result.push(e); + } + for msg in self.event.iter() { + if let PlayerAction::Acknowledge(action) = msg { + if action == to_send { + break; + } else { + result.push(PlaybackError { + msg: "Incorrect acknowledgement received for MpsController control action".into() + }); + } + } else if let Err(e) = Self::handle_event(msg) { + result.push(e); + } + } + result + } } diff --git a/mps-player/src/errors.rs b/mps-player/src/errors.rs index 4807d54..34a97e3 100644 --- a/mps-player/src/errors.rs +++ b/mps-player/src/errors.rs @@ -11,6 +11,10 @@ impl PlaybackError { msg: format!("{}", err), } } + + pub fn message(&self) -> &'_ str { + &self.msg + } } impl Display for PlaybackError { diff --git a/mps-player/src/player.rs b/mps-player/src/player.rs index dbfb26a..937f9a0 100644 --- a/mps-player/src/player.rs +++ b/mps-player/src/player.rs @@ -83,6 +83,7 @@ impl MpsPlayer { items_left -= 1; if items_left == 0 { break; } } + //println!("Enqueued {} items", count - items_left); Ok(()) } diff --git a/mps-player/src/player_wrapper.rs b/mps-player/src/player_wrapper.rs index 82f16dc..e4296a3 100644 --- a/mps-player/src/player_wrapper.rs +++ b/mps-player/src/player_wrapper.rs @@ -10,14 +10,16 @@ pub struct MpsPlayerServer { player: MpsPlayer, control: Receiver, event: Sender, + keep_alive: bool, } impl MpsPlayerServer { - pub fn new(player: MpsPlayer, ctrl: Receiver, event: Sender) -> Self { + pub fn new(player: MpsPlayer, ctrl: Receiver, event: Sender, keep_alive: bool) -> Self { Self { player: player, control: ctrl, event: event, + keep_alive: keep_alive, } } @@ -27,11 +29,14 @@ impl MpsPlayerServer { if let Err(e) = self.player.enqueue(1) { self.event.send(PlayerAction::Exception(e)).unwrap(); } + let mut is_empty = self.player.queue_len() == 0; loop { let command = self.control.recv().unwrap(); let mut is_exiting = false; + let mut check_empty = false; + // process command match command { ControlAction::Next{..} => { @@ -66,6 +71,9 @@ impl MpsPlayerServer { ControlAction::NoOp{..} => {}, // empty by design ControlAction::SetVolume{volume,..} => { self.player.set_volume((volume as f32) / (u32::MAX as f32)); + }, + ControlAction::CheckEmpty{..} => { + check_empty = true; } } @@ -75,7 +83,7 @@ impl MpsPlayerServer { self.event.send(PlayerAction::Exception(e)).unwrap(); } if self.player.queue_len() == 0 { // no more music to add - is_exiting = true; + is_exiting = !self.keep_alive || is_exiting; } } @@ -83,8 +91,21 @@ impl MpsPlayerServer { self.event.send(PlayerAction::Acknowledge(command)).unwrap(); } + // always check for empty state change + if self.player.queue_len() == 0 && !is_empty { // just became empty + is_empty = true; + self.event.send(PlayerAction::Empty).unwrap(); + } else if self.player.queue_len() != 0 && is_empty { // just got filled + is_empty = false; + } + + if is_empty && check_empty { + self.event.send(PlayerAction::Empty).unwrap(); + } + if is_exiting { break; } } + println!("Exiting playback server"); self.event.send(PlayerAction::End).unwrap(); } @@ -92,12 +113,13 @@ impl MpsPlayerServer { factory: F, ctrl_tx: Sender, ctrl_rx: Receiver, - event: Sender + event: Sender, + keep_alive: bool ) -> JoinHandle<()> { thread::spawn(move || Self::unblocking_timer_loop(ctrl_tx, 50)); thread::spawn(move || { let player = factory(); - let mut server_obj = Self::new(player, ctrl_rx, event); + let mut server_obj = Self::new(player, ctrl_rx, event, keep_alive); server_obj.run_loop(); }) } @@ -126,7 +148,8 @@ pub enum ControlAction { Exit{ack: bool}, Enqueue {amount: usize, ack: bool}, NoOp{ack: bool}, - SetVolume{ack: bool, volume: u32} + SetVolume{ack: bool, volume: u32}, + CheckEmpty{ack: bool}, } impl ControlAction { @@ -142,6 +165,7 @@ impl ControlAction { Self::Enqueue{ack,..} => ack, Self::NoOp{ack,..} => ack, Self::SetVolume{ack,..} => ack, + Self::CheckEmpty{ack} => ack, } } } @@ -152,6 +176,7 @@ pub enum PlayerAction { Acknowledge(ControlAction), Exception(PlaybackError), End, + Empty, } impl PlayerAction { diff --git a/src/channel_io.rs b/src/channel_io.rs new file mode 100644 index 0000000..130441a --- /dev/null +++ b/src/channel_io.rs @@ -0,0 +1,62 @@ +use std::sync::mpsc::{channel, Sender, Receiver}; +use std::io::{Read, Write, self}; + +pub struct ChannelWriter { + tx: Sender, +} + +impl Write for ChannelWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + let mut count = 0; + for &b in buf { + self.tx.send(b).map_err(|e| { + eprintln!("Send error: {}", e); + io::Error::new(io::ErrorKind::ConnectionRefused, e) + })?; + count += 1; + } + Ok(count) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub struct ChannelReader { + rx: Receiver, + blocking: bool, +} + +impl Read for ChannelReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let mut count = 0; + if self.blocking { + for b in self.rx.iter() { + buf[count] = b; + count += 1; + if count >= buf.len() {break;} + } + } else { + for b in self.rx.try_iter() { + buf[count] = b; + count += 1; + if count >= buf.len() {break;} + } + } + Ok(count) + } +} + +pub fn channel_io() -> (ChannelWriter, ChannelReader) { + let (sender, receiver) = channel(); + ( + ChannelWriter { + tx: sender, + }, + ChannelReader { + rx: receiver, + blocking: false, + } + ) +} diff --git a/src/cli.rs b/src/cli.rs index 5cfb084..4682332 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -9,6 +9,9 @@ pub struct CliArgs { /// Generate m3u8 playlist #[clap(short, long)] pub playlist: Option, + /// In REPL mode, wait for all music in the queue to complete + #[clap(short, long)] + pub wait: bool, } pub fn parse() -> CliArgs { diff --git a/src/main.rs b/src/main.rs index 4d00b0e..fef9898 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,9 @@ //! Future home of a MPS REPL for playing music ergonomically through a CLI. //! +mod channel_io; mod cli; +mod repl; use std::io; use std::path::PathBuf; @@ -61,8 +63,9 @@ fn main() { } } else { // start REPL - // TODO - eprintln!("Abort: Cannot start REPL because it is not implemented yet :("); + println!("Welcome to MPS interactive mode!"); + println!("End a statement with ; to execute it."); + repl::repl(args) } } diff --git a/src/repl.rs b/src/repl.rs new file mode 100644 index 0000000..0609764 --- /dev/null +++ b/src/repl.rs @@ -0,0 +1,97 @@ +//! Read, Execute, Print Loop functionality + +use std::io::{self, Write, Read}; + +use mps_interpreter::MpsRunner; +use mps_player::{MpsPlayer, MpsController}; + +use super::cli::CliArgs; +use super::channel_io::channel_io; + +pub fn repl(args: CliArgs) { + let (mut writer, reader) = channel_io(); + let player_builder = move || { + let runner = MpsRunner::with_stream(reader); + let player = MpsPlayer::new(runner).unwrap(); + player + }; + let stdin_t = io::stdin(); + let mut stdin = stdin_t.lock(); + let mut buf: Vec = Vec::new(); + let mut read_buf = [0]; + let mut current_line = 0; + // TODO: enable raw mode (char by char) reading of stdin + // TODO: generalize loop for better code reuse between playlist and playback mode + if let Some(playlist_file) = &args.playlist { + println!("Playlist mode (output: `{}`)", playlist_file); + let mut player = player_builder(); + let mut playlist_writer = io::BufWriter::new( + std::fs::File::create(playlist_file) + .expect(&format!("Abort: Cannot create writeable file `{}`", playlist_file)) + ); + prompt(&mut current_line); + loop { + read_buf[0] = 0; + while read_buf[0] == 0 { + stdin.read_exact(&mut read_buf).expect("Failed to read stdin"); + } + match read_buf[0] as char { + ';' => { + buf.push(read_buf[0]); + writer.write(buf.as_slice()).expect("Failed to write to MPS interpreter"); + match player.save_m3u8(&mut playlist_writer) { + Ok(_) => {}, + Err(e) => eprintln!("{}", e.message()), + } + buf.clear(); + playlist_writer.flush().expect("Failed to flush playlist to file"); + prompt(&mut current_line); + }, + /* + '\x27' => break, // ESC key + '\x03' => break, // Ctrl+C + */ + _ => buf.push(read_buf[0]), + } + } + } else { + println!("Playback mode (output: audio device)"); + let ctrl = MpsController::create_repl(player_builder); + prompt(&mut current_line); + loop { + read_buf[0] = 0; + while read_buf[0] == 0 { + stdin.read_exact(&mut read_buf).expect("Failed to read stdin"); + } + match read_buf[0] as char { + ';' => { + buf.push(read_buf[0]); + writer.write(buf.as_slice()).expect("Failed to write to MPS interpreter"); + //ctrl.play().expect("Failed to start playback"); + if args.wait { + match ctrl.wait_for_empty() { + Ok(_) => {}, + Err(e) => eprintln!("{}", e.message()), + } + } else { + for e in ctrl.check_ack() { + eprintln!("{}", e.message()); + } + } + buf.clear(); + prompt(&mut current_line); + }, + '\x27' => break, // ESC key + '\x03' => break, // Ctrl+C + _ => buf.push(read_buf[0]), + } + } + } +} + +#[inline(always)] +fn prompt(line: &mut usize) { + print!("{} |-> ", line); + *line += 1; + std::io::stdout().flush().expect("Failed to flush stdout"); +}