# PowerTools/main.py
#
# 498 lines
# 18 KiB
# Python

import time
import os
import json
import asyncio
import pathlib
import subprocess
# Module start-up: constants, logging configuration and import of the
# companion `server` module. Everything here runs once, at import time.

# Plugin version: logged below, returned by Plugin.get_version() and passed to
# pt_server.start().
VERSION = "0.7.0"
# Two directory levels above the current working directory.
# NOTE(review): presumably the user's home directory when launched from the
# plugin folder -- confirm against the loader's working directory.
HOME_DIR = str(pathlib.Path(os.getcwd()).parent.parent.resolve())
# JSON file holding the default (non per-game) settings.
DEFAULT_SETTINGS_LOCATION = HOME_DIR + "/.config/powertools/default_settings.json"
LOG_LOCATION = "/tmp/powertools.log"
# Existence of this directory is used to detect the Fantastic plugin; when it
# exists, Plugin.apply_settings() leaves the fan alone.
FANTASTIC_INSTALL_DIR = HOME_DIR + "/homebrew/plugins/Fantastic"
import logging
# filemode 'w' truncates the log on every start; force=True replaces any
# handlers that were already installed on the root logger.
logging.basicConfig(
filename = LOG_LOCATION,
format = '%(asctime)s %(levelname)s %(message)s',
filemode = 'w',
force = True)
# Root logger, set to the most verbose level.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logging.info(f"PowerTools v{VERSION} https://github.com/NGnius/PowerTools")
logging.debug(f"CWD: {os.getcwd()} HOME:{HOME_DIR}")
import sys
#import pathlib
# Make the directory containing this file importable so that the sibling
# `server` module can be found regardless of the working directory.
sys.path.append(str(pathlib.Path(__file__).parent.resolve()))
import server as pt_server
# Reference point for Plugin.on_ready(), which logs how long the front-end took.
startup_time = time.time()
class CPU:
    """Control one logical CPU through its sysfs files.

    Wraps the `online`, `cpufreq/scaling_governor` and
    `cpufreq/scaling_setspeed` files of CPU `number`. CPU 0 is treated as
    always online: `enable()` and `disable()` are no-ops for it.
    """

    # Selectable clock caps, lowest to highest; the last entry means
    # "no cap" and is implemented by switching back to the schedutil governor.
    # NOTE(review): values are written verbatim to scaling_setspeed,
    # presumably kHz -- confirm.
    SCALING_FREQUENCIES = [1700000, 2400000, 2800000]

    def __init__(self, number, settings=None):
        """Wrap CPU `number`, optionally applying saved `settings` first.

        `settings` is a dict with the keys "max_boost" and "online", as
        produced by `settings()`.
        """
        self.number = number

        if settings is not None:
            self.set_max_boost(settings["max_boost"])
            if settings["online"]:
                self.enable()
            else:
                self.disable()
            # TODO governor

        # Re-read the cap from the hardware when possible; an offline CPU
        # is assumed to be uncapped.
        if self.status():
            self.max_boost = self._get_max_boost()
        else:
            self.max_boost = CPU.SCALING_FREQUENCIES[-1]

    def enable(self):
        """Bring the CPU online and re-apply its remembered clock cap."""
        # CPU number 0 is special
        if self.number == 0:
            return

        write_to_sys(cpu_online_path(self.number), 1)

        # The user might have changed the maximum cpu clock while the cpu was offline
        self._set_max_boost(self.max_boost)

    def disable(self):
        """Take the CPU offline."""
        # CPU number 0 is special
        if self.number == 0:
            return

        write_to_sys(cpu_online_path(self.number), 0)

    def set_max_boost(self, frequency):
        """Remember `frequency` as the cap and apply it if the CPU is online."""
        self.max_boost = frequency
        if self.status():
            self._set_max_boost(frequency)

    def status(self) -> bool:
        """Return True when the CPU is online."""
        # cpu number 0 is always online
        if self.number == 0:
            return True

        return read_from_sys(cpu_online_path(self.number)) == "1"

    def governor(self) -> str:
        """Return the name of the active scaling governor."""
        return self._read_scaling_governor()

    def settings(self) -> dict:
        """Return the current state in the shape accepted by `__init__`."""
        return {
            "online": self.status(),
            "max_boost": self.max_boost,
            "governor": self.governor(),
        }

    def _read_scaling_governor(self) -> str:
        filepath = cpu_governor_scaling_path(self.number)
        return read_from_sys(filepath, amount=-1).strip()

    def _write_scaling_governor(self, governor: str):
        # Goes through write_to_sys like every other sysfs write so that the
        # change shows up in the debug log.
        write_to_sys(cpu_governor_scaling_path(self.number), governor)

    def _write_scaling_setspeed(self, frequency):
        write_to_sys(cpu_freq_scaling_path(self.number), frequency)

    def _set_max_boost(self, frequency):
        """Apply `frequency` to the hardware.

        The highest table entry hands control back to the schedutil
        governor; any other value pins the clock through the userspace
        governor.
        """
        if frequency == CPU.SCALING_FREQUENCIES[-1]:
            self._write_scaling_governor("schedutil")
            return

        if self._read_scaling_governor() != "userspace":
            self._write_scaling_governor("userspace")
        # Always write the frequency. Previously it was skipped whenever the
        # governor had to be switched first, so the first request for a
        # lower clock had no effect.
        self._write_scaling_setspeed(frequency)

    def _get_max_boost(self) -> int:
        """Read the pinned clock, or the highest table entry if none is set."""
        filepath = cpu_freq_scaling_path(self.number)
        freq_maybe = read_from_sys(filepath, amount=-1).strip()

        # scaling_setspeed reads "<unsupported>" unless the userspace
        # governor is active.
        if not freq_maybe or freq_maybe == "<unsupported>":
            return CPU.SCALING_FREQUENCIES[-1]

        return int(freq_maybe)
class Plugin:
    """Back-end of the PowerTools plugin: CPU, GPU, fan and battery controls.

    Internal calls have the form `self.method(self, ...)`, which only works
    when `self` is the `Plugin` class itself rather than an instance.
    NOTE(review): presumably the plugin loader invokes every method unbound
    with the class as `self` -- confirm before "fixing" those calls.
    """

    CPU_COUNT = 8
    # Manual fan targets, one per slider tick; a tick equal to
    # len(FAN_SPEEDS) means automatic control.
    FAN_SPEEDS = [0, 1000, 2000, 3000, 4000, 5000, 6000]

    auto_fan = True
    persistent = True
    modified_settings = False
    current_gameid = None
    # Default so the fan methods work even if they run before _main().
    jupiter_fan_control_was_disabled = False

    async def get_version(self) -> str:
        """Return the plugin version string."""
        return VERSION

    # CPU stuff

    # call from main_view.html with setCPUs(count, smt)
    async def set_cpus(self, count, smt=True):
        """Bring `count` CPUs online.

        With `smt` every logical CPU counts; without it only the
        even-numbered CPUs are used and all odd-numbered ones are taken
        offline.
        """
        self.modified_settings = True
        cpu_count = len(self.cpus)
        self.smt = smt
        if smt:
            # At least one: cpu0 cannot be disabled, and a negative count
            # would otherwise slice from the wrong end of the list.
            count = max(1, min(int(count), cpu_count))
            for cpu in self.cpus[:count]:
                cpu.enable()
            for cpu in self.cpus[count:]:
                cpu.disable()
        else:
            count = max(1, min(int(count), cpu_count // 2))
            # never touch cpu0, since it's special
            for cpu in self.cpus[1:cpu_count:2]:
                cpu.disable()
            for cpu in self.cpus[2:cpu_count:2]:
                if cpu.number // 2 + 1 > count:
                    cpu.disable()
                else:
                    cpu.enable()

    async def get_cpus(self) -> int:
        """Return how many CPUs are currently online."""
        return sum(1 for cpu in self.cpus if cpu.status())

    async def get_smt(self) -> bool:
        """Return the remembered SMT flag."""
        return self.smt

    async def set_boost(self, enabled: bool) -> bool:
        """Turn CPU boost on or off; always returns True."""
        self.modified_settings = True
        write_cpu_boost(enabled)
        return True

    async def get_boost(self) -> bool:
        """Return whether CPU boost is enabled."""
        return read_cpu_boost()

    async def set_max_boost(self, index):
        """Cap every CPU at CPU.SCALING_FREQUENCIES[index].

        Returns the number of CPUs updated, or 0 for an out-of-range index.
        """
        self.modified_settings = True
        if index < 0 or index >= len(CPU.SCALING_FREQUENCIES):
            return 0

        selected_freq = CPU.SCALING_FREQUENCIES[index]
        for cpu in self.cpus:
            cpu.set_max_boost(selected_freq)
        return len(self.cpus)

    async def get_max_boost(self) -> int:
        """Return the index in CPU.SCALING_FREQUENCIES of cpu0's clock cap."""
        frequencies = CPU.SCALING_FREQUENCIES
        current = self.cpus[0].max_boost
        try:
            return frequencies.index(current)
        except ValueError:
            # The value read back from the hardware is not guaranteed to be
            # a table entry; report the closest one instead of raising.
            return min(range(len(frequencies)), key=lambda i: abs(frequencies[i] - current))

    # GPU stuff

    async def set_gpu_power(self, value: int, power_number: int) -> bool:
        """Write `value` to GPU power cap `power_number`; always returns True."""
        self.modified_settings = True
        write_gpu_ppt(power_number, value)
        return True

    async def get_gpu_power(self, power_number: int) -> int:
        """Return GPU power cap `power_number`."""
        return read_gpu_ppt(power_number)

    # Fan stuff

    async def set_fan_tick(self, tick: int):
        """Select a manual fan speed, or automatic when tick >= len(FAN_SPEEDS)."""
        self.modified_settings = True
        if tick >= len(self.FAN_SPEEDS):
            # automatic mode
            self.enable_jupiter_fan_control(self)
            self.auto_fan = True
            write_to_sys("/sys/class/hwmon/hwmon5/recalculate", 0)
            write_to_sys("/sys/class/hwmon/hwmon5/fan1_target", 4099)  # 4099 is default
        else:
            # manual voltage
            self.disable_jupiter_fan_control(self)
            self.auto_fan = False
            write_to_sys("/sys/class/hwmon/hwmon5/recalculate", 1)
            write_to_sys("/sys/class/hwmon/hwmon5/fan1_target", self.FAN_SPEEDS[tick])

    async def get_fan_tick(self) -> int:
        """Return the fan slider position; len(FAN_SPEEDS) means automatic."""
        fan_target = read_fan_target()
        fan_input = read_sys_int("/sys/class/hwmon/hwmon5/fan1_input")
        fan_target_v = float(fan_target) / 1000
        fan_input_v = float(fan_input) / 1000
        if self.auto_fan:
            return len(self.FAN_SPEEDS)
        elif fan_target == 4099 or (int(round(fan_target_v)) != int(round(fan_input_v)) and fan_target not in self.FAN_SPEEDS):
            # cannot read /sys/class/hwmon/hwmon5/recalculate, so guess based on available fan info
            # NOTE: the fan takes time to ramp up, so fan_target will never approximately equal fan_input
            # when fan_target was changed recently (hence set RPM caching)
            return len(self.FAN_SPEEDS)
        else:
            # quantize upwards: the first tick whose speed is at least the target
            for i, speed in enumerate(self.FAN_SPEEDS[:-1]):
                if fan_target <= speed:
                    return i
            return len(self.FAN_SPEEDS) - 1  # any higher value is considered as highest manual setting

    async def fantastic_installed(self) -> bool:
        """Return True when the Fantastic plugin directory exists."""
        return os.path.exists(FANTASTIC_INSTALL_DIR)

    def disable_jupiter_fan_control(self):
        """Stop jupiter-fan-control.service if it is running, and remember that."""
        active = subprocess.Popen(["systemctl", "is-active", "jupiter-fan-control.service"]).wait() == 0
        if active:
            logging.info("Stopping jupiter-fan-control.service so it doesn't interfere")
            # only disable if currently active
            self.jupiter_fan_control_was_disabled = True
            stop_p = subprocess.Popen(["systemctl", "stop", "jupiter-fan-control.service"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # communicate() drains and closes both pipes; wait() followed by
            # read() can deadlock once a pipe buffer fills up.
            stdout_data, stderr_data = stop_p.communicate()
            logging.debug("systemctl stop jupiter-fan-control.service stdout:\n" + stdout_data.decode())
            logging.debug("systemctl stop jupiter-fan-control.service stderr:\n" + stderr_data.decode())

    def enable_jupiter_fan_control(self):
        """Restart jupiter-fan-control.service, but only if this plugin stopped it."""
        if self.jupiter_fan_control_was_disabled:
            logging.info("Starting jupiter-fan-control.service so it doesn't interfere")
            # only re-enable if I disabled it
            self.jupiter_fan_control_was_disabled = False
            start_p = subprocess.Popen(["systemctl", "start", "jupiter-fan-control.service"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # See disable_jupiter_fan_control() for why communicate() is used.
            stdout_data, stderr_data = start_p.communicate()
            logging.debug("systemctl start jupiter-fan-control.service stdout:\n" + stdout_data.decode())
            logging.debug("systemctl start jupiter-fan-control.service stderr:\n" + stderr_data.decode())

    # Battery stuff

    async def get_charge_now(self) -> int:
        """Return the value of the battery's charge_now file."""
        return int(read_from_sys("/sys/class/hwmon/hwmon2/device/charge_now", amount=-1).strip())

    async def get_charge_full(self) -> int:
        """Return the value of the battery's charge_full file."""
        return int(read_from_sys("/sys/class/hwmon/hwmon2/device/charge_full", amount=-1).strip())

    async def get_charge_design(self) -> int:
        """Return the value of the battery's charge_full_design file."""
        return int(read_from_sys("/sys/class/hwmon/hwmon2/device/charge_full_design", amount=-1).strip())

    # Asyncio-compatible long-running code, executed in a task when the plugin is loaded
    async def _main(self):
        """Restore saved settings, start the server, then poll once per second.

        Each iteration saves modified settings and, while persistence is
        on, swaps between per-game and default settings when the running
        game changes.
        """
        # startup: load & apply settings
        self.jupiter_fan_control_was_disabled = False
        if os.path.exists(DEFAULT_SETTINGS_LOCATION):
            settings = read_json(DEFAULT_SETTINGS_LOCATION)
            logging.debug(f"Loaded settings from {DEFAULT_SETTINGS_LOCATION}: {settings}")
        else:
            settings = None
            logging.debug(f"Settings {DEFAULT_SETTINGS_LOCATION} does not exist, skipped")
        if settings is None or settings["persistent"] == False:
            logging.debug("Ignoring settings from file")
            self.persistent = False
            self.guess_settings(self)
            self.modified_settings = True
        else:
            # apply settings
            logging.debug("Restoring settings from file")
            self.persistent = True
            self.apply_settings(self, settings)
            # self.modified_settings = False
        logging.info("Handled saved settings, back-end startup complete")
        # server setup
        await pt_server.start(VERSION)
        # work loop
        while True:
            # persistence
            if self.modified_settings and self.persistent:
                self.save_settings(self)
                self.modified_settings = False
            if self.persistent:
                # per-game profiles
                current_game = pt_server.http_server.game()
                old_gameid = self.current_gameid
                if current_game is not None and current_game.has_settings():
                    self.current_gameid = current_game.gameid
                    if old_gameid != self.current_gameid:
                        logging.info(f"Applying custom settings for {current_game.name()} {current_game.appid()}")
                        # new game; apply settings
                        settings = current_game.load_settings()
                        if settings is not None:
                            self.apply_settings(self, settings)
                else:
                    self.current_gameid = None
                    if old_gameid != self.current_gameid:
                        logging.info("Reapplying default settings; game without custom settings found")
                        # game without custom settings; apply defaults
                        settings = read_json(DEFAULT_SETTINGS_LOCATION)
                        self.apply_settings(self, settings)
                logging.debug(f"gameid update: {old_gameid} -> {self.current_gameid}")
            await asyncio.sleep(1)
        # Not reachable while the loop above has no exit; kept as in the original.
        await pt_server.shutdown()

    # called from main_view::onViewReady
    async def on_ready(self):
        """Log how long the front-end took to initialise."""
        delta = time.time() - startup_time
        logging.info(f"Front-end initialised {delta}s after startup")

    # persistence

    async def get_persistent(self) -> bool:
        """Return whether settings are persisted."""
        return self.persistent

    async def set_persistent(self, enabled: bool):
        """Switch persistence on or off and save immediately."""
        logging.debug(f"Persistence is now: {enabled}")
        self.persistent = enabled
        self.save_settings(self)

    def current_settings(self) -> dict:
        """Collect the complete settings dict in the shape that gets saved."""
        settings = dict()
        settings["cpu"] = self.current_cpu_settings(self)
        settings["gpu"] = self.current_gpu_settings(self)
        settings["fan"] = self.current_fan_settings(self)
        settings["persistent"] = self.persistent
        return settings

    def current_cpu_settings(self) -> dict:
        """Return per-thread settings plus the SMT and boost flags."""
        settings = dict()
        settings["threads"] = [cpu.settings() for cpu in self.cpus]
        settings["smt"] = self.smt
        settings["boost"] = read_cpu_boost()
        return settings

    def current_gpu_settings(self) -> dict:
        """Return GPU power caps 1 and 2 as "slowppt" and "fastppt"."""
        settings = dict()
        settings["slowppt"] = read_gpu_ppt(1)
        settings["fastppt"] = read_gpu_ppt(2)
        return settings

    def current_fan_settings(self) -> dict:
        """Return the fan target and the automatic-mode flag."""
        settings = dict()
        settings["target"] = read_fan_target()
        settings["auto"] = self.auto_fan
        return settings

    def save_settings(self):
        """Write the current settings to the per-game file, or the default one."""
        settings = self.current_settings(self)
        logging.debug(f"Saving settings to file: {settings}")
        current_game = pt_server.http_server.game()
        if current_game is not None and self.current_gameid is not None:
            save_location = current_game.settings_path()
        else:
            save_location = DEFAULT_SETTINGS_LOCATION
        write_json(save_location, settings)
        logging.info(f"Saved settings to {save_location}")

    def apply_settings(self, settings: dict):
        """Apply a settings dict (as built by current_settings) to the hardware."""
        # CPU
        self.cpus = [
            CPU(cpu_number, settings=settings["cpu"]["threads"][cpu_number])
            for cpu_number in range(Plugin.CPU_COUNT)
        ]
        self.smt = settings["cpu"]["smt"]
        write_cpu_boost(settings["cpu"]["boost"])
        # GPU
        write_gpu_ppt(1, settings["gpu"]["slowppt"])
        write_gpu_ppt(2, settings["gpu"]["fastppt"])
        # Fan: left untouched when the Fantastic plugin is installed
        if not os.path.exists(FANTASTIC_INSTALL_DIR):
            fan_auto = settings["fan"]["auto"]
            # Keep the in-memory flag in step with what was restored;
            # otherwise get_fan_tick() and the next save report stale state.
            self.auto_fan = bool(fan_auto)
            if fan_auto:
                self.enable_jupiter_fan_control(self)
            else:
                self.disable_jupiter_fan_control(self)
                write_to_sys("/sys/class/hwmon/hwmon5/recalculate", 1)
                write_to_sys("/sys/class/hwmon/hwmon5/fan1_target", settings["fan"]["target"])

    def guess_settings(self):
        """Build the CPU list from the live system and infer the SMT flag."""
        self.cpus = [CPU(cpu_number) for cpu_number in range(Plugin.CPU_COUNT)]

        # If any core has two threads, smt is True.
        # set_cpus() takes every odd-numbered CPU offline when SMT is off, so
        # an online odd-numbered CPU is the evidence of SMT. The even-numbered
        # CPUs stay online in non-SMT mode and say nothing about it.
        self.smt = any(cpu.status() for cpu in self.cpus[1::2])
        logging.info(f"SMT state is guessed to be {self.smt}")

    # per-game profiles

    async def get_current_game(self) -> str:
        """Return a display label for the running game, or the menu default."""
        current_game = pt_server.http_server.game()
        if current_game is None:
            return "Menu (default)"
        else:
            return f"{current_game.name()} ({current_game.appid()})"

    async def set_per_game_profile(self, enabled: bool):
        """Enable a profile for the running game, or delete it and fall back."""
        current_game = pt_server.http_server.game()
        if enabled and self.persistent and current_game is not None:
            self.current_gameid = current_game.gameid
            self.modified_settings = True
        else:
            if not enabled and current_game is not None and current_game.has_settings():
                # delete settings; disable settings loading
                os.remove(current_game.settings_path())
            self.current_gameid = None

    async def get_per_game_profile(self) -> bool:
        """Return True while a per-game profile is active."""
        return self.current_gameid is not None
# these are stateless (well, the state is not saved internally) functions, so there's no need for these to be called like a class method
def cpu_online_path(cpu_number: int) -> str:
    """Return the sysfs path of the `online` file of logical CPU `cpu_number`."""
    cpu_dir = f"/sys/devices/system/cpu/cpu{cpu_number}"
    return f"{cpu_dir}/online"
def cpu_freq_scaling_path(cpu_number: int) -> str:
    """Return the sysfs path of the `scaling_setspeed` file of CPU `cpu_number`."""
    cpufreq_dir = f"/sys/devices/system/cpu/cpu{cpu_number}/cpufreq"
    return f"{cpufreq_dir}/scaling_setspeed"
def cpu_governor_scaling_path(cpu_number: int) -> str:
    """Return the sysfs path of the `scaling_governor` file of CPU `cpu_number`."""
    cpufreq_dir = f"/sys/devices/system/cpu/cpu{cpu_number}/cpufreq"
    return f"{cpufreq_dir}/scaling_governor"
def gpu_power_path(power_number: int) -> str:
    """Return the hwmon4 path of the `power<power_number>_cap` file."""
    cap_file = f"power{power_number}_cap"
    return f"/sys/class/hwmon/hwmon4/{cap_file}"
def read_cpu_boost() -> bool:
    """Return True when the first character of the cpufreq `boost` file is "1"."""
    boost_flag = read_from_sys("/sys/devices/system/cpu/cpufreq/boost")
    return boost_flag == "1"
def write_cpu_boost(enable: bool):
    """Write `enable`, converted to an integer, to the cpufreq `boost` file."""
    boost_value = int(enable)
    write_to_sys("/sys/devices/system/cpu/cpufreq/boost", boost_value)
def read_gpu_ppt(power_number: int) -> int:
    """Return GPU power cap `power_number` as an integer."""
    cap_path = gpu_power_path(power_number)
    return read_sys_int(cap_path)
def write_gpu_ppt(power_number: int, value: int):
    """Write `value` to GPU power cap `power_number`."""
    cap_path = gpu_power_path(power_number)
    write_to_sys(cap_path, value)
def read_fan_target() -> int:
    """Return the integer stored in hwmon5's `fan1_target` file."""
    target_path = "/sys/class/hwmon/hwmon5/fan1_target"
    return read_sys_int(target_path)
def write_to_sys(path, value: int):
    """Overwrite the file at `path` with `str(value)` and log the write."""
    with open(path, mode="w") as sys_file:
        text = str(value)
        sys_file.write(text)
    logging.debug(f"Wrote `{value}` to {path}")
def read_from_sys(path, amount=1):
    """Read up to `amount` characters from `path`, log them and return them.

    A negative `amount` reads the whole file. The default of 1 returns
    only the first character.
    """
    with open(path, mode="r") as sys_file:
        contents = sys_file.read(amount)
    logging.debug(f"Read `{contents}` from {path}")
    return contents
def read_sys_int(path) -> int:
    """Read the whole file at `path` and parse its stripped contents as an int."""
    raw_text = read_from_sys(path, amount=-1)
    return int(raw_text.strip())
def write_json(path, data):
    """Serialise `data` as JSON into the file at `path`, replacing its contents."""
    with open(path, mode="w") as json_file:
        # json.dump takes the object first, then the file
        json.dump(data, json_file)
def read_json(path):
    """Parse the JSON file at `path` and return the decoded object."""
    with open(path, mode="r") as json_file:
        parsed = json.load(json_file)
    return parsed
# Record the OS identification in the log to help with bug reports.
# This is purely diagnostic, so an unreadable file must not stop the module
# from loading.
try:
    os_release = read_from_sys("/etc/os-release", amount=-1).strip()
except OSError as err:
    os_release = ""
    logging.warning(f"Could not read /etc/os-release: {err}")
else:
    logging.info(f"/etc/os-release\n{os_release}")