Implement REPL functionality

This commit is contained in:
NGnius (Graham) 2022-01-03 17:53:57 -05:00
parent 7d15f27465
commit bddb79d0ca
13 changed files with 295 additions and 18 deletions

View file

@ -95,9 +95,10 @@ where
None => None, None => None,
} }
} else { } else {
if self.tokenizer.end_of_file() { /*if self.tokenizer.end_of_file() {
return None; return None;
} }*/
//println!("try get next statement");
// build new statement // build new statement
let token_result = self let token_result = self
.tokenizer .tokenizer

View file

@ -29,13 +29,13 @@ impl MpsLanguageDictionary {
Some(x) => Ok(x), Some(x) => Ok(x),
None => Err(SyntaxError { None => Err(SyntaxError {
line: 0, line: 0,
token: MpsToken::Name("???".into()), token: MpsToken::Name("{something}".into()),
got: None, got: None,
}), }),
}?; }?;
Err(SyntaxError { Err(SyntaxError {
line: 0, line: 0,
token: MpsToken::Name("???".into()), token: MpsToken::Name("{any of many}".into()),
got: Some(result), got: Some(result),
}) })
} }

View file

@ -2,7 +2,7 @@ use std::collections::VecDeque;
//use std::fmt::{Debug, Display, Error, Formatter}; //use std::fmt::{Debug, Display, Error, Formatter};
use std::marker::PhantomData; use std::marker::PhantomData;
use crate::lang::utility::{assert_token, assert_token_raw, assert_token_raw_back}; use crate::lang::utility::{assert_token, assert_token_raw, assert_token_raw_back, assert_empty};
use crate::lang::MpsLanguageDictionary; use crate::lang::MpsLanguageDictionary;
use crate::lang::SyntaxError; use crate::lang::SyntaxError;
use crate::lang::{BoxedMpsOpFactory, MpsOp}; use crate::lang::{BoxedMpsOpFactory, MpsOp};
@ -67,8 +67,8 @@ impl<Op: MpsOp + 'static, F: MpsFunctionFactory<Op> + 'static> BoxedMpsOpFactory
)?; )?;
assert_token_raw(MpsToken::OpenBracket, tokens)?; assert_token_raw(MpsToken::OpenBracket, tokens)?;
assert_token_raw_back(MpsToken::CloseBracket, tokens)?; assert_token_raw_back(MpsToken::CloseBracket, tokens)?;
Ok(Box::new( let func = self.op_factory.build_function_params(name, tokens, dict)?;
self.op_factory.build_function_params(name, tokens, dict)?, assert_empty(tokens)?;
)) Ok(Box::new(func))
} }
} }

View file

@ -169,6 +169,17 @@ pub fn assert_type(tokens: &mut VecDeque<MpsToken>) -> Result<MpsTypePrimitive,
} }
} }
pub fn assert_empty(tokens: &mut VecDeque<MpsToken>) -> Result<(), SyntaxError> {
match tokens.pop_front() {
None => Ok(()),
Some(t) => Err(SyntaxError {
line: 0,
token: MpsToken::Name("{nothing}".into()),
got: Some(t),
}),
}
}
pub fn music_folder() -> PathBuf { pub fn music_folder() -> PathBuf {
dirs::home_dir() dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("./")) .unwrap_or_else(|| PathBuf::from("./"))

View file

@ -49,6 +49,7 @@ where
{ {
byte_buf[0] = 0; // clear to null char (nothing read is assumed to mean end of file) byte_buf[0] = 0; // clear to null char (nothing read is assumed to mean end of file)
} }
//println!("tokenizer read char: {}", byte_buf[0]);
self.do_tracking(byte_buf[0]); self.do_tracking(byte_buf[0]);
self.fsm = self.fsm.next_state(byte_buf[0]); self.fsm = self.fsm.next_state(byte_buf[0]);
let mut bigger_buf: Vec<u8> = Vec::new(); let mut bigger_buf: Vec<u8> = Vec::new();
@ -176,6 +177,7 @@ where
fn next_statement(&mut self, buf: &mut VecDeque<MpsToken>) -> Result<(), ParseError> { fn next_statement(&mut self, buf: &mut VecDeque<MpsToken>) -> Result<(), ParseError> {
// read until buffer gets some tokens, in case multiple end of line tokens are at start of stream // read until buffer gets some tokens, in case multiple end of line tokens are at start of stream
let original_size = buf.len(); let original_size = buf.len();
self.read_line(buf)?; // always try once, even if at end of file
while original_size == buf.len() && !self.end_of_file() { while original_size == buf.len() && !self.end_of_file() {
self.read_line(buf)?; self.read_line(buf)?;
} }

View file

@ -17,13 +17,29 @@ pub struct MpsController {
impl MpsController { impl MpsController {
pub fn create<F: FnOnce() -> MpsPlayer<T> + Send + 'static, T: MpsTokenReader>( pub fn create<F: FnOnce() -> MpsPlayer<T> + Send + 'static, T: MpsTokenReader>(
player_gen: F player_gen: F,
) -> Self { ) -> Self {
let (control_tx, control_rx) = channel(); let (control_tx, control_rx) = channel();
let (event_tx, event_rx) = channel(); let (event_tx, event_rx) = channel();
let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone()); let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone());
sys_ctrl.init(); sys_ctrl.init();
let handle = MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx); let handle = MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, false);
Self {
control: control_tx,
event: event_rx,
handle: handle,
sys_ctrl: sys_ctrl,
}
}
pub fn create_repl<F: FnOnce() -> MpsPlayer<T> + Send + 'static, T: MpsTokenReader>(
player_gen: F,
) -> Self {
let (control_tx, control_rx) = channel();
let (event_tx, event_rx) = channel();
let mut sys_ctrl = SystemControlWrapper::new(control_tx.clone());
sys_ctrl.init();
let handle = MpsPlayerServer::spawn(player_gen, control_tx.clone(), control_rx, event_tx, true);
Self { Self {
control: control_tx, control: control_tx,
event: event_rx, event: event_rx,
@ -58,7 +74,8 @@ impl MpsController {
match event { match event {
PlayerAction::Acknowledge(_) => Ok(()), PlayerAction::Acknowledge(_) => Ok(()),
PlayerAction::Exception(e) => Err(e), PlayerAction::Exception(e) => Err(e),
PlayerAction::End => Ok(()) PlayerAction::End => Ok(()),
PlayerAction::Empty => Ok(()),
} }
} }
@ -112,4 +129,55 @@ impl MpsController {
} }
Ok(()) Ok(())
} }
pub fn wait_for_empty(&self) -> Result<(), PlaybackError> {
for msg in self.event.try_iter() {
Self::handle_event(msg)?;
}
self.control.send(ControlAction::CheckEmpty{ack: true}).map_err(PlaybackError::from_err)?;
loop {
let msg = self.event.recv().map_err(PlaybackError::from_err)?;
if let PlayerAction::Empty = msg {
break;
} else {
Self::handle_event(msg)?;
}
}
Ok(())
}
/// Check for any errors in the event queue.
/// This is non-blocking, so it only handles events sent since the last time events were handled.
pub fn check(&self) -> Vec<PlaybackError> {
let mut result = Vec::new();
for msg in self.event.try_iter() {
if let Err(e) = Self::handle_event(msg) {
result.push(e);
}
}
result
}
/// Like check(), but it also waits for an acknowledgement to ensure it gets the latest events.
pub fn check_ack(&self) -> Vec<PlaybackError> {
let mut result = Vec::new();
let to_send = ControlAction::NoOp{ack: true};
if let Err(e) = self.control.send(to_send.clone()).map_err(PlaybackError::from_err) {
result.push(e);
}
for msg in self.event.iter() {
if let PlayerAction::Acknowledge(action) = msg {
if action == to_send {
break;
} else {
result.push(PlaybackError {
msg: "Incorrect acknowledgement received for MpsController control action".into()
});
}
} else if let Err(e) = Self::handle_event(msg) {
result.push(e);
}
}
result
}
} }

View file

@ -11,6 +11,10 @@ impl PlaybackError {
msg: format!("{}", err), msg: format!("{}", err),
} }
} }
pub fn message(&self) -> &'_ str {
&self.msg
}
} }
impl Display for PlaybackError { impl Display for PlaybackError {

View file

@ -83,6 +83,7 @@ impl<T: MpsTokenReader> MpsPlayer<T> {
items_left -= 1; items_left -= 1;
if items_left == 0 { break; } if items_left == 0 { break; }
} }
//println!("Enqueued {} items", count - items_left);
Ok(()) Ok(())
} }

View file

@ -10,14 +10,16 @@ pub struct MpsPlayerServer<T: MpsTokenReader> {
player: MpsPlayer<T>, player: MpsPlayer<T>,
control: Receiver<ControlAction>, control: Receiver<ControlAction>,
event: Sender<PlayerAction>, event: Sender<PlayerAction>,
keep_alive: bool,
} }
impl<T: MpsTokenReader> MpsPlayerServer<T> { impl<T: MpsTokenReader> MpsPlayerServer<T> {
pub fn new(player: MpsPlayer<T>, ctrl: Receiver<ControlAction>, event: Sender<PlayerAction>) -> Self { pub fn new(player: MpsPlayer<T>, ctrl: Receiver<ControlAction>, event: Sender<PlayerAction>, keep_alive: bool) -> Self {
Self { Self {
player: player, player: player,
control: ctrl, control: ctrl,
event: event, event: event,
keep_alive: keep_alive,
} }
} }
@ -27,11 +29,14 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
if let Err(e) = self.player.enqueue(1) { if let Err(e) = self.player.enqueue(1) {
self.event.send(PlayerAction::Exception(e)).unwrap(); self.event.send(PlayerAction::Exception(e)).unwrap();
} }
let mut is_empty = self.player.queue_len() == 0;
loop { loop {
let command = self.control.recv().unwrap(); let command = self.control.recv().unwrap();
let mut is_exiting = false; let mut is_exiting = false;
let mut check_empty = false;
// process command // process command
match command { match command {
ControlAction::Next{..} => { ControlAction::Next{..} => {
@ -66,6 +71,9 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
ControlAction::NoOp{..} => {}, // empty by design ControlAction::NoOp{..} => {}, // empty by design
ControlAction::SetVolume{volume,..} => { ControlAction::SetVolume{volume,..} => {
self.player.set_volume((volume as f32) / (u32::MAX as f32)); self.player.set_volume((volume as f32) / (u32::MAX as f32));
},
ControlAction::CheckEmpty{..} => {
check_empty = true;
} }
} }
@ -75,7 +83,7 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
self.event.send(PlayerAction::Exception(e)).unwrap(); self.event.send(PlayerAction::Exception(e)).unwrap();
} }
if self.player.queue_len() == 0 { // no more music to add if self.player.queue_len() == 0 { // no more music to add
is_exiting = true; is_exiting = !self.keep_alive || is_exiting;
} }
} }
@ -83,8 +91,21 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
self.event.send(PlayerAction::Acknowledge(command)).unwrap(); self.event.send(PlayerAction::Acknowledge(command)).unwrap();
} }
// always check for empty state change
if self.player.queue_len() == 0 && !is_empty { // just became empty
is_empty = true;
self.event.send(PlayerAction::Empty).unwrap();
} else if self.player.queue_len() != 0 && is_empty { // just got filled
is_empty = false;
}
if is_empty && check_empty {
self.event.send(PlayerAction::Empty).unwrap();
}
if is_exiting { break; } if is_exiting { break; }
} }
println!("Exiting playback server");
self.event.send(PlayerAction::End).unwrap(); self.event.send(PlayerAction::End).unwrap();
} }
@ -92,12 +113,13 @@ impl<T: MpsTokenReader> MpsPlayerServer<T> {
factory: F, factory: F,
ctrl_tx: Sender<ControlAction>, ctrl_tx: Sender<ControlAction>,
ctrl_rx: Receiver<ControlAction>, ctrl_rx: Receiver<ControlAction>,
event: Sender<PlayerAction> event: Sender<PlayerAction>,
keep_alive: bool
) -> JoinHandle<()> { ) -> JoinHandle<()> {
thread::spawn(move || Self::unblocking_timer_loop(ctrl_tx, 50)); thread::spawn(move || Self::unblocking_timer_loop(ctrl_tx, 50));
thread::spawn(move || { thread::spawn(move || {
let player = factory(); let player = factory();
let mut server_obj = Self::new(player, ctrl_rx, event); let mut server_obj = Self::new(player, ctrl_rx, event, keep_alive);
server_obj.run_loop(); server_obj.run_loop();
}) })
} }
@ -126,7 +148,8 @@ pub enum ControlAction {
Exit{ack: bool}, Exit{ack: bool},
Enqueue {amount: usize, ack: bool}, Enqueue {amount: usize, ack: bool},
NoOp{ack: bool}, NoOp{ack: bool},
SetVolume{ack: bool, volume: u32} SetVolume{ack: bool, volume: u32},
CheckEmpty{ack: bool},
} }
impl ControlAction { impl ControlAction {
@ -142,6 +165,7 @@ impl ControlAction {
Self::Enqueue{ack,..} => ack, Self::Enqueue{ack,..} => ack,
Self::NoOp{ack,..} => ack, Self::NoOp{ack,..} => ack,
Self::SetVolume{ack,..} => ack, Self::SetVolume{ack,..} => ack,
Self::CheckEmpty{ack} => ack,
} }
} }
} }
@ -152,6 +176,7 @@ pub enum PlayerAction {
Acknowledge(ControlAction), Acknowledge(ControlAction),
Exception(PlaybackError), Exception(PlaybackError),
End, End,
Empty,
} }
impl PlayerAction { impl PlayerAction {

62
src/channel_io.rs Normal file
View file

@ -0,0 +1,62 @@
use std::sync::mpsc::{channel, Sender, Receiver};
use std::io::{Read, Write, self};
pub struct ChannelWriter {
tx: Sender<u8>,
}
impl Write for ChannelWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut count = 0;
for &b in buf {
self.tx.send(b).map_err(|e| {
eprintln!("Send error: {}", e);
io::Error::new(io::ErrorKind::ConnectionRefused, e)
})?;
count += 1;
}
Ok(count)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub struct ChannelReader {
rx: Receiver<u8>,
blocking: bool,
}
impl Read for ChannelReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut count = 0;
if self.blocking {
for b in self.rx.iter() {
buf[count] = b;
count += 1;
if count >= buf.len() {break;}
}
} else {
for b in self.rx.try_iter() {
buf[count] = b;
count += 1;
if count >= buf.len() {break;}
}
}
Ok(count)
}
}
pub fn channel_io() -> (ChannelWriter, ChannelReader) {
let (sender, receiver) = channel();
(
ChannelWriter {
tx: sender,
},
ChannelReader {
rx: receiver,
blocking: false,
}
)
}

View file

@ -9,6 +9,9 @@ pub struct CliArgs {
/// Generate m3u8 playlist /// Generate m3u8 playlist
#[clap(short, long)] #[clap(short, long)]
pub playlist: Option<String>, pub playlist: Option<String>,
/// In REPL mode, wait for all music in the queue to complete
#[clap(short, long)]
pub wait: bool,
} }
pub fn parse() -> CliArgs { pub fn parse() -> CliArgs {

View file

@ -4,7 +4,9 @@
//! Future home of a MPS REPL for playing music ergonomically through a CLI. //! Future home of a MPS REPL for playing music ergonomically through a CLI.
//! //!
mod channel_io;
mod cli; mod cli;
mod repl;
use std::io; use std::io;
use std::path::PathBuf; use std::path::PathBuf;
@ -61,8 +63,9 @@ fn main() {
} }
} else { } else {
// start REPL // start REPL
// TODO println!("Welcome to MPS interactive mode!");
eprintln!("Abort: Cannot start REPL because it is not implemented yet :("); println!("End a statement with ; to execute it.");
repl::repl(args)
} }
} }

97
src/repl.rs Normal file
View file

@ -0,0 +1,97 @@
//! Read, Execute, Print Loop functionality
use std::io::{self, Write, Read};
use mps_interpreter::MpsRunner;
use mps_player::{MpsPlayer, MpsController};
use super::cli::CliArgs;
use super::channel_io::channel_io;
pub fn repl(args: CliArgs) {
let (mut writer, reader) = channel_io();
let player_builder = move || {
let runner = MpsRunner::with_stream(reader);
let player = MpsPlayer::new(runner).unwrap();
player
};
let stdin_t = io::stdin();
let mut stdin = stdin_t.lock();
let mut buf: Vec<u8> = Vec::new();
let mut read_buf = [0];
let mut current_line = 0;
// TODO: enable raw mode (char by char) reading of stdin
// TODO: generalize loop for better code reuse between playlist and playback mode
if let Some(playlist_file) = &args.playlist {
println!("Playlist mode (output: `{}`)", playlist_file);
let mut player = player_builder();
let mut playlist_writer = io::BufWriter::new(
std::fs::File::create(playlist_file)
.expect(&format!("Abort: Cannot create writeable file `{}`", playlist_file))
);
prompt(&mut current_line);
loop {
read_buf[0] = 0;
while read_buf[0] == 0 {
stdin.read_exact(&mut read_buf).expect("Failed to read stdin");
}
match read_buf[0] as char {
';' => {
buf.push(read_buf[0]);
writer.write(buf.as_slice()).expect("Failed to write to MPS interpreter");
match player.save_m3u8(&mut playlist_writer) {
Ok(_) => {},
Err(e) => eprintln!("{}", e.message()),
}
buf.clear();
playlist_writer.flush().expect("Failed to flush playlist to file");
prompt(&mut current_line);
},
/*
'\x27' => break, // ESC key
'\x03' => break, // Ctrl+C
*/
_ => buf.push(read_buf[0]),
}
}
} else {
println!("Playback mode (output: audio device)");
let ctrl = MpsController::create_repl(player_builder);
prompt(&mut current_line);
loop {
read_buf[0] = 0;
while read_buf[0] == 0 {
stdin.read_exact(&mut read_buf).expect("Failed to read stdin");
}
match read_buf[0] as char {
';' => {
buf.push(read_buf[0]);
writer.write(buf.as_slice()).expect("Failed to write to MPS interpreter");
//ctrl.play().expect("Failed to start playback");
if args.wait {
match ctrl.wait_for_empty() {
Ok(_) => {},
Err(e) => eprintln!("{}", e.message()),
}
} else {
for e in ctrl.check_ack() {
eprintln!("{}", e.message());
}
}
buf.clear();
prompt(&mut current_line);
},
'\x27' => break, // ESC key
'\x03' => break, // Ctrl+C
_ => buf.push(read_buf[0]),
}
}
}
}
#[inline(always)]
fn prompt(line: &mut usize) {
print!("{} |-> ", line);
*line += 1;
std::io::stdout().flush().expect("Failed to flush stdout");
}