Make periodic reads protobuf streams to reuse open websockets

This commit is contained in:
NGnius (Graham) 2023-10-09 18:21:31 -04:00
parent dd6672f9ba
commit 6c8b335417
9 changed files with 112 additions and 43 deletions

2
backend-rs/Cargo.lock generated
View file

@ -247,7 +247,7 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]] [[package]]
name = "fantastic-rs" name = "fantastic-rs"
version = "0.5.0-alpha2" version = "0.5.0-alpha3"
dependencies = [ dependencies = [
"log", "log",
"nrpc", "nrpc",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "fantastic-rs" name = "fantastic-rs"
version = "0.5.0-alpha2" version = "0.5.0-alpha3"
edition = "2021" edition = "2021"
authors = ["NGnius (Graham) <ngniusness@gmail.com>"] authors = ["NGnius (Graham) <ngniusness@gmail.com>"]
description = "Backend (superuser) functionality for Fantastic" description = "Backend (superuser) functionality for Fantastic"
@ -17,7 +17,7 @@ serde_json = "1.0"
nrpc = { version = "0.10", path = "../../nRPC/nrpc" } nrpc = { version = "0.10", path = "../../nRPC/nrpc" }
prost = "0.11" prost = "0.11"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync", "rt"] }
sysfuss = { version = "0.3", features = ["derive"], path = "../../sysfs-nav" } sysfuss = { version = "0.3", features = ["derive"], path = "../../sysfs-nav" }

View file

@ -20,10 +20,10 @@ service Fan {
rpc name (Empty) returns (NameMessage); rpc name (Empty) returns (NameMessage);
// Get fan speed // Get fan speed
rpc get_fan_rpm (Empty) returns (RpmMessage); rpc get_fan_rpm (Empty) returns (stream RpmMessage);
// Get system temperature // Get system temperature
rpc get_temperature (Empty) returns (TemperatureMessage); rpc get_temperature (Empty) returns (stream TemperatureMessage);
// Set custom fan control enabled // Set custom fan control enabled
rpc set_enable (EnablementMessage) returns (EnablementMessage); rpc set_enable (EnablementMessage) returns (EnablementMessage);

View file

@ -1,10 +1,15 @@
use crate::services::fantastic::*; use crate::services::fantastic::*;
use usdpl_back::nrpc::_helpers::futures::{StreamExt, FutureExt};
use super::control::ControlRuntime; use super::control::ControlRuntime;
pub const VERSION: &'static str = env!("CARGO_PKG_VERSION"); pub const VERSION: &'static str = env!("CARGO_PKG_VERSION");
pub const NAME: &'static str = env!("CARGO_PKG_NAME"); pub const NAME: &'static str = env!("CARGO_PKG_NAME");
const FAN_READ_PERIOD: std::time::Duration = std::time::Duration::from_millis(1000);
const TEMPERATURE_READ_PERIOD: std::time::Duration = std::time::Duration::from_millis(2000);
pub struct FanService { pub struct FanService {
ctrl: ControlRuntime, ctrl: ControlRuntime,
} }
@ -18,6 +23,11 @@ impl FanService {
} }
} }
fn once_true() -> impl std::iter::Iterator<Item = bool> {
// iters over [true, false, false, ...]
std::iter::once(true).chain(std::iter::repeat(false))
}
#[usdpl_back::nrpc::_helpers::async_trait::async_trait] #[usdpl_back::nrpc::_helpers::async_trait::async_trait]
impl<'a> IFan<'a> for FanService { impl<'a> IFan<'a> for FanService {
async fn echo( async fn echo(
@ -67,29 +77,57 @@ impl<'a> IFan<'a> for FanService {
} }
) )
} }
async fn get_fan_rpm( async fn get_fan_rpm<'b: 'a>(
&mut self, &mut self,
_input: Empty, _input: Empty,
) -> Result<RpmMessage, Box<dyn std::error::Error + Send>> { ) -> Result<
if let Some(rpm) = crate::sys::read_fan(self.ctrl.hwmon()) { usdpl_back::nrpc::ServiceServerStream<'b, RpmMessage>,
Box<dyn std::error::Error + Send>,
> {
let hwmon = self.ctrl.hwmon_clone();
let stream = usdpl_back::nrpc::_helpers::futures::stream::iter(once_true()).then(move |is_first| {
let hwmon = hwmon.clone();
tokio::task::spawn_blocking(
/* tokio::time::sleep(..) is not Unpin (but this is)... *grumble grumble* */
move || if !is_first { std::thread::sleep(FAN_READ_PERIOD); })
.map(move |_| {
if let Some(rpm) = crate::sys::read_fan(&hwmon) {
log::debug!("get_fan_rpm() success: {}", rpm); log::debug!("get_fan_rpm() success: {}", rpm);
Ok(RpmMessage { rpm: rpm as u32 }) Ok(RpmMessage { rpm: rpm as u32 })
} else { } else {
Err(Box::<dyn std::error::Error + Send + Sync>::from("Failed to read fan speed")) Err(usdpl_back::nrpc::ServiceError::Method(Box::<dyn std::error::Error + Send + Sync>::from("Failed to read fan speed")))
} }
})
});
Ok(Box::new(stream))
} }
async fn get_temperature(
async fn get_temperature<'b: 'a>(
&mut self, &mut self,
_input: Empty, _input: Empty,
) -> Result<TemperatureMessage, Box<dyn std::error::Error + Send>>{ ) -> Result<
if let Some(temperature) = crate::sys::read_thermal_zone(self.ctrl.thermal_zone()) { usdpl_back::nrpc::ServiceServerStream<'b, TemperatureMessage>,
Box<dyn std::error::Error + Send>,
> {
let thermal_zone = self.ctrl.thermal_zone_clone();
let stream = usdpl_back::nrpc::_helpers::futures::stream::iter(once_true()).then(move |is_first| {
let thermal_zone = thermal_zone.clone();
tokio::task::spawn_blocking(
/* tokio::time::sleep(..) is not Unpin (but this is)... *grumble grumble* */
move || if !is_first { std::thread::sleep(TEMPERATURE_READ_PERIOD); })
.map(move |_| {
if let Some(temperature) = crate::sys::read_thermal_zone(&thermal_zone) {
let real_temp = temperature as f64 / 1000.0; let real_temp = temperature as f64 / 1000.0;
log::debug!("get_temperature() success: {}", real_temp); log::debug!("get_temperature() success: {}", real_temp);
Ok(TemperatureMessage { temperature: real_temp }) Ok(TemperatureMessage { temperature: real_temp })
} else { } else {
Err(Box::<dyn std::error::Error + Send + Sync>::from("get_temperature failed to read thermal zone 0")) Err(usdpl_back::nrpc::ServiceError::Method(Box::<dyn std::error::Error + Send + Sync>::from("get_temperature failed to read thermal zone 0")))
} }
})
});
Ok(Box::new(stream))
} }
async fn set_enable( async fn set_enable(
&mut self, &mut self,
input: EnablementMessage, input: EnablementMessage,

View file

@ -49,12 +49,20 @@ impl ControlRuntime {
&self.state &self.state
} }
pub(crate) fn hwmon(&self) -> &'_ HwMonPath { /*pub(crate) fn hwmon(&self) -> &'_ HwMonPath {
&self.hwmon &self.hwmon
}*/
pub(crate) fn hwmon_clone(&self) -> Arc<HwMonPath> {
self.hwmon.clone()
} }
pub(crate) fn thermal_zone(&self) -> &'_ BasicEntityPath { /*pub(crate) fn thermal_zone(&self) -> &'_ BasicEntityPath {
&self.thermal_zone &self.thermal_zone
}*/
pub(crate) fn thermal_zone_clone(&self) -> Arc<BasicEntityPath> {
self.thermal_zone.clone()
} }
pub fn run(&self) -> thread::JoinHandle<()> { pub fn run(&self) -> thread::JoinHandle<()> {

View file

@ -5,23 +5,34 @@ const HWMON_INDEX: u64 = 5;
pub const RECALCULATE_ATTR: HwMonAttribute = HwMonAttribute::custom("recalculate"); pub const RECALCULATE_ATTR: HwMonAttribute = HwMonAttribute::custom("recalculate");
pub const FAN1_INPUT_ATTR: HwMonAttribute = HwMonAttribute::new(HwMonAttributeType::Fan, 1, HwMonAttributeItem::Input); pub const FAN1_INPUT_ATTR: HwMonAttribute = HwMonAttribute::new(HwMonAttributeType::Fan, 1, HwMonAttributeItem::Input);
pub const FAN1_LABEL_ATTR: HwMonAttribute = HwMonAttribute::new(HwMonAttributeType::Fan, 1, HwMonAttributeItem::Label);
pub const FAN1_TARGET_ATTR: HwMonAttribute = HwMonAttribute::custom("fan1_target"); pub const FAN1_TARGET_ATTR: HwMonAttribute = HwMonAttribute::custom("fan1_target");
const HWMON_NEEDS: [HwMonAttribute; 3] = [ const HWMON_NEEDS: [HwMonAttribute; 3] = [
RECALCULATE_ATTR, //RECALCULATE_ATTR,
FAN1_INPUT_ATTR, FAN1_INPUT_ATTR,
FAN1_TARGET_ATTR, FAN1_TARGET_ATTR,
FAN1_LABEL_ATTR,
]; ];
pub fn read_fan(hwmon: &HwMonPath) -> Option<u64> { pub fn read_fan(hwmon: &HwMonPath) -> Option<u64> {
hwmon.attribute(FAN1_INPUT_ATTR).ok() match hwmon.attribute(FAN1_INPUT_ATTR){
//read_single(format!("/sys/class/hwmon/hwmon{}/fan1_input", HWMON_INDEX)).ok() Ok(x) => Some(x),
Err(e) => {
log::error!("Failed read_fan(): {}", e);
None
},
}
} }
// TODO convert to sysfuss
pub fn read_thermal_zone(entity: &BasicEntityPath) -> Option<u64> { pub fn read_thermal_zone(entity: &BasicEntityPath) -> Option<u64> {
entity.attribute("temp".to_owned()).ok() match entity.attribute("temp".to_owned()) {
//read_single(format!("/sys/class/thermal/thermal_zone{}/temp", index)).ok() Ok(x) => Some(x),
Err(e) => {
log::error!("Failed read_thermal_zone(): {}", e);
None
},
}
} }
pub fn write_fan_recalc(hwmon: &HwMonPath, enabled: bool) -> Result<(), std::io::Error> { pub fn write_fan_recalc(hwmon: &HwMonPath, enabled: bool) -> Result<(), std::io::Error> {

View file

@ -1,6 +1,6 @@
{ {
"name": "Fantastic", "name": "Fantastic",
"version": "0.5.0-alpha1", "version": "0.5.0-alpha3",
"description": "A template to quickly create decky plugins from scratch, based on TypeScript and webpack", "description": "A template to quickly create decky plugins from scratch, based on TypeScript and webpack",
"scripts": { "scripts": {
"build": "shx rm -rf dist && rollup -c", "build": "shx rm -rf dist && rollup -c",

View file

@ -94,12 +94,10 @@ export async function removeCurvePoint(index: number): Promise<{"x": number, "y"
//return (await call_backend("remove_curve_point", [index]))[0]; //return (await call_backend("remove_curve_point", [index]))[0];
} }
export async function getFanRpm(): Promise<number> { export async function getFanRpm(callback: (rpm: number) => void): Promise<void> {
return (await FAN_CLIENT!.get_fan_rpm(true))?? 1337; return (await FAN_CLIENT!.get_fan_rpm(true, callback));
//return (await call_backend("get_fan_rpm", []))[0];
} }
export async function getTemperature(): Promise<number> { export async function getTemperature(callback: (temp: number) => void): Promise<void> {
return (await FAN_CLIENT!.get_temperature(true))?? -273; return (await FAN_CLIENT!.get_temperature(true, callback));
//return (await call_backend("get_temperature", []))[0];
} }

View file

@ -27,6 +27,12 @@ var version: string = "";
var curve_backup: {x: number, y: number}[] = []; var curve_backup: {x: number, y: number}[] = [];
var tempCache: number = -1337;
var setTemperature_display = (_: number) => {};
var fanRpmCache: number = -273;
var setFanRpm_display = (_: number) => {};
const Content: VFC<{ serverAPI: ServerAPI }> = ({serverAPI}) => { const Content: VFC<{ serverAPI: ServerAPI }> = ({serverAPI}) => {
// const [result, setResult] = useState<number | undefined>(); // const [result, setResult] = useState<number | undefined>();
@ -54,8 +60,16 @@ const Content: VFC<{ serverAPI: ServerAPI }> = ({serverAPI}) => {
curve_backup = value; curve_backup = value;
} }
const [temperatureGlobal, setTemperature] = useState<number>(-273.15); const [temperatureGlobal, setTemperature] = useState<number>(tempCache);
const [fanRpmGlobal, setFanRpm] = useState<number>(-1337); const [fanRpmGlobal, setFanRpm] = useState<number>(fanRpmCache);
setTemperature_display = (x) => {
setTemperature(x);
tempCache = x;
};
setFanRpm_display = (x) => {
setFanRpm(x);
fanRpmCache = x;
};
function setEnable(enable: boolean) { function setEnable(enable: boolean) {
setEnableInternal(enable); setEnableInternal(enable);
@ -206,17 +220,17 @@ const Content: VFC<{ serverAPI: ServerAPI }> = ({serverAPI}) => {
backend.resolve(backend.getEnabled(), setEnable); backend.resolve(backend.getEnabled(), setEnable);
backend.resolve(backend.getInterpolate(), setInterpol); backend.resolve(backend.getInterpolate(), setInterpol);
backend.resolve(backend.getCurve(), setCurve); backend.resolve(backend.getCurve(), setCurve);
backend.resolve(backend.getTemperature(), setTemperature); backend.resolve(backend.getTemperature(setTemperature_display), (_: any) => {});
backend.resolve(backend.getFanRpm(), setFanRpm); backend.resolve(backend.getFanRpm(setFanRpm_display), (_: any) => {});
if (periodicHook != null) { if (periodicHook != null) {
clearInterval(periodicHook); clearInterval(periodicHook);
} }
periodicHook = setInterval(function() { /*periodicHook = setInterval(function() {
backend.resolve(backend.getTemperature(), setTemperature); backend.resolve(backend.getTemperature(), setTemperature);
backend.resolve(backend.getFanRpm(), setFanRpm); backend.resolve(backend.getFanRpm(), setFanRpm);
}, 1000); }, 1000);*/
} }
if (!usdplReady) { if (!usdplReady) {