diff --git a/clore_hosting/main.py b/clore_hosting/main.py index f391f78..671dea5 100644 --- a/clore_hosting/main.py +++ b/clore_hosting/main.py @@ -2,6 +2,7 @@ from lib import config as config_module from lib import logging as logging_lib from lib import log_streaming_task from lib import run_startup_script +from lib import hive_miner_interface from lib import docker_interface from lib import docker_deploy from lib import docker_pull @@ -97,7 +98,8 @@ class CloreClient: "log_streaming_task": utils.unix_timestamp(), "container_log_streaming_service": utils.unix_timestamp(), "specs_service": utils.unix_timestamp(), - "oc_service": utils.unix_timestamp() + "oc_service": utils.unix_timestamp(), + "background_pow_data_collection": utils.unix_timestamp() } self.max_service_inactivity = 600 # seconds @@ -132,6 +134,9 @@ class CloreClient: self.is_hive = get_specs.is_hive() self.use_hive_flightsheet = False + self.hive_miner_interface = hive_miner_interface.hive_interface() + self.next_pow_background_job_send_update = 0 + async def service(self): global container_log_broken @@ -145,10 +150,11 @@ class CloreClient: task5 = asyncio.create_task(self.container_log_streaming_service(monitoring)) task6 = asyncio.create_task(self.specs_service(monitoring)) task7 = asyncio.create_task(self.oc_service(monitoring)) + task8 = asyncio.create_task(self.background_pow_data_collection(monitoring)) monitoring_task = asyncio.create_task(self.monitoring_service(monitoring)) # Wait for both tasks to complete (they won't in this case) - await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, monitoring_task) + await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, monitoring_task) async def monitoring_service(self, monitoring): while True: @@ -522,6 +528,22 @@ class CloreClient: log.debug(f"FAIL | oc_service() | {e}") await asyncio.sleep(2) + async def background_pow_data_collection(self, monitoring): + while True: + try: + await monitoring.put("background_pow_data_collection") + if not self.dont_use_hive_binaries and self.is_hive: + miner_config = await self.hive_miner_interface.export_miner_stats(get_hashrates=False) + if (miner_config["miner_uptime"]>0 and miner_config["miner_uptime"]<60) or self.next_pow_background_job_send_update > time.time(): + self.next_pow_background_job_send_update = time.time()+(5*60) + current_statistics = await self.hive_miner_interface.export_miner_stats(get_hashrates=True) + submit_result = await WebSocketClient.send({"submit_hashrates": current_statistics}) + if not submit_result: + self.next_pow_background_job_send_update = time.time()+40 + except Exception as e: + log.debug(f"FAIL | background_pow_data_collection() | {e}") + await asyncio.sleep(6) + def expire_ws_peers(self): for ws_peer_address in list(self.ws_peers.keys()): ws_peer_info = self.ws_peers[ws_peer_address] diff --git a/lib/hive_miner_interface.py b/lib/hive_miner_interface.py new file mode 100644 index 0000000..3604b6a --- /dev/null +++ b/lib/hive_miner_interface.py @@ -0,0 +1,177 @@ + +import aiofiles +import asyncio +import json +import time +import os + +async def async_run_bash_command(command): + process = await asyncio.create_subprocess_exec( + '/bin/bash', '-c', command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + + return stdout.decode().strip(), stderr.decode().strip(), process.returncode + +async def check_and_read_file(file_path): + try: + if os.path.exists(file_path): + async with aiofiles.open(file_path, mode='r') as file: + contents = await file.read() + return contents + else: + return "fail" + except Exception as e: + return "fail" + +async def get_session_start_time(pid): + try: + async with aiofiles.open(f'/proc/{pid}/stat', 'r') as file: + stat_info = (await file.read()).split() + start_time_ticks = int(stat_info[21]) + + clock_ticks_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK']) + start_time_seconds = start_time_ticks / clock_ticks_per_sec + + boot_time = None + async with aiofiles.open('/proc/stat', 'r') as file: + async for line in file: + if line.startswith('btime'): + boot_time = int(line.split()[1]) + break + + if boot_time is None: + raise ValueError("Failed to find boot time in /proc/stat") + + start_time = (boot_time + start_time_seconds) + return start_time + + except FileNotFoundError: + return None + except Exception as e: + print(f"Error retrieving session start time: {e}") + return None + +async def get_miner_stats(miner_dir, api_timeout=15): + stdout, stderr, code = await async_run_bash_command(f'export API_TIMEOUT={api_timeout}'+''' && read -t $((API_TIMEOUT + 5)) -d "" pid khs stats < <(function run_miner_scripts { echo "$BASHPID"; cd '''+miner_dir+''' || exit; source h-manifest.conf; source h-config.sh; source h-stats.sh; printf "%q\n" "$khs"; echo "$stats"; }; run_miner_scripts) && [[ $? -ge 128 ]] && [[ ! -z "$pid" && "$pid" -ne $$ ]] && kill -9 "$pid" 2>/dev/null ; echo $stats''') + try: + if stdout and not stderr and code == 0: + stats = json.loads(stdout) + return stats + else: + return 'fail' + except Exception as e: + return 'fail' + +async def get_miner_uptime_stats(): + stdout, stderr, code = await async_run_bash_command("screen -ls") + if not stderr and code == 0: + for line in stdout.split('\n'): + if line[:1]=='\t': + if line.split('\t')[1].endswith(".miner"): + miner_screen_pid = line.split('\t')[1].split('.')[0] + if miner_screen_pid.isdigit(): + miner_start_time = await get_session_start_time(miner_screen_pid) + return miner_start_time + return None + +def extract_miner_names(rig_conf): + miner_names=[] + for line in rig_conf.split('\n'): + if '=' in line: + key, value = line.split('=', 1) + if key[:5]=="MINER" and (len(key)==5 or str(key[5:]).isdigit()): + if value.startswith('"') and value.endswith('"'): + value = value.strip('"') + miner_names.append(value) + return miner_names + +def extract_miner_config(miner_names, wallet_conf): + lines = wallet_conf.split('\n') + meta_config = {} + miners = {} + for miner_name in miner_names: + remaining_lines = [] + algos = [] + for line in lines: + if not line.startswith('#') and '=' in line: + key, value = line.split('=', 1) + if value.startswith('"') and value.endswith('"'): + value = value.strip('"') + if value.startswith("'") and value.endswith("'"): + value = value.strip("'") + if key[:len(miner_name.replace('-','_'))+1].lower() == f"{miner_name.replace('-','_')}_".lower(): + if key.split('_')[-1][:4].lower()=="algo": + algos.append(value) + elif key.lower()=="meta": + try: + meta_config=json.loads(value) + except Exception as e: + pass + else: + remaining_lines.append(line) + lines = remaining_lines + miners[miner_name]={ + "algos":algos, + "coins":[] + } + for miner_name in miner_names: + if miner_name in meta_config and type(meta_config[miner_name]) == dict: + for key in meta_config[miner_name].keys(): + if "coin" in key: + miners[miner_name]["coins"].append(meta_config[miner_name][key]) + return miners + +def sum_numbers_in_list(lst): + if all(isinstance(i, (int, float)) for i in lst): + return sum(lst) + else: + return "The list contains non-numeric elements." + +class hive_interface: + def __init__(self): + self.hive_miners_dir = "/hive/miners" + self.hive_rig_config = "/hive-config/rig.conf" + self.hive_wallet_config = "/hive-config/wallet.conf" + + async def get_miners_stats(self, miner_names): + scrape_tasks = [] + for miner_name in miner_names: + scrape_tasks.append(get_miner_stats(os.path.join(self.hive_miners_dir, miner_name))) + results = await asyncio.gather(*scrape_tasks) + return results + + async def get_configured_miners(self): + rig_conf, wallet_conf = await asyncio.gather(*[check_and_read_file(self.hive_rig_config), check_and_read_file(self.hive_wallet_config)]) + miner_names = extract_miner_names(rig_conf) + miner_config = extract_miner_config(miner_names, wallet_conf) + return miner_names, miner_config + + async def export_miner_stats(self, get_hashrates=False): + output = { + "miner_uptime": 0 + } + miner_start_ts = await get_miner_uptime_stats() + if miner_start_ts: + output["miner_uptime"] = int(time.time()-miner_start_ts) + miner_names, miner_config = await self.get_configured_miners() + output["miners"]=miner_config + if get_hashrates: + miners_stats = await self.get_miners_stats(miner_names) + for idx, miner_stats in enumerate(miners_stats): + if type(miner_stats) == dict: + for key in miner_stats.keys(): + if key[:2]=="hs" and (key=="hs" or key[2:].isdigit()): + all_hs = sum_numbers_in_list(miner_stats[key]) + try: + if not "hashrates" in output['miners'][miner_names[idx]]: + output['miners'][miner_names[idx]]["hashrates"]=[] + if isinstance(all_hs, (float, int)): + output['miners'][miner_names[idx]]["hashrates"].append(all_hs) + else: + output['miners'][miner_names[idx]]["hashrates"].append(0) + except Exception as e: + pass + return output \ No newline at end of file