Submit hashrates and algos of background PoW job
This commit is contained in:
parent
cab037526a
commit
6150cf48cb
|
@ -2,6 +2,7 @@ from lib import config as config_module
|
||||||
from lib import logging as logging_lib
|
from lib import logging as logging_lib
|
||||||
from lib import log_streaming_task
|
from lib import log_streaming_task
|
||||||
from lib import run_startup_script
|
from lib import run_startup_script
|
||||||
|
from lib import hive_miner_interface
|
||||||
from lib import docker_interface
|
from lib import docker_interface
|
||||||
from lib import docker_deploy
|
from lib import docker_deploy
|
||||||
from lib import docker_pull
|
from lib import docker_pull
|
||||||
|
@ -97,7 +98,8 @@ class CloreClient:
|
||||||
"log_streaming_task": utils.unix_timestamp(),
|
"log_streaming_task": utils.unix_timestamp(),
|
||||||
"container_log_streaming_service": utils.unix_timestamp(),
|
"container_log_streaming_service": utils.unix_timestamp(),
|
||||||
"specs_service": utils.unix_timestamp(),
|
"specs_service": utils.unix_timestamp(),
|
||||||
"oc_service": utils.unix_timestamp()
|
"oc_service": utils.unix_timestamp(),
|
||||||
|
"background_pow_data_collection": utils.unix_timestamp()
|
||||||
}
|
}
|
||||||
self.max_service_inactivity = 600 # seconds
|
self.max_service_inactivity = 600 # seconds
|
||||||
|
|
||||||
|
@ -132,6 +134,9 @@ class CloreClient:
|
||||||
self.is_hive = get_specs.is_hive()
|
self.is_hive = get_specs.is_hive()
|
||||||
self.use_hive_flightsheet = False
|
self.use_hive_flightsheet = False
|
||||||
|
|
||||||
|
self.hive_miner_interface = hive_miner_interface.hive_interface()
|
||||||
|
self.next_pow_background_job_send_update = 0
|
||||||
|
|
||||||
async def service(self):
|
async def service(self):
|
||||||
global container_log_broken
|
global container_log_broken
|
||||||
|
|
||||||
|
@ -145,10 +150,11 @@ class CloreClient:
|
||||||
task5 = asyncio.create_task(self.container_log_streaming_service(monitoring))
|
task5 = asyncio.create_task(self.container_log_streaming_service(monitoring))
|
||||||
task6 = asyncio.create_task(self.specs_service(monitoring))
|
task6 = asyncio.create_task(self.specs_service(monitoring))
|
||||||
task7 = asyncio.create_task(self.oc_service(monitoring))
|
task7 = asyncio.create_task(self.oc_service(monitoring))
|
||||||
|
task8 = asyncio.create_task(self.background_pow_data_collection(monitoring))
|
||||||
monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
|
monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
|
||||||
|
|
||||||
# Wait for both tasks to complete (they won't in this case)
|
# Wait for both tasks to complete (they won't in this case)
|
||||||
await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, monitoring_task)
|
await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, monitoring_task)
|
||||||
|
|
||||||
async def monitoring_service(self, monitoring):
|
async def monitoring_service(self, monitoring):
|
||||||
while True:
|
while True:
|
||||||
|
@ -522,6 +528,22 @@ class CloreClient:
|
||||||
log.debug(f"FAIL | oc_service() | {e}")
|
log.debug(f"FAIL | oc_service() | {e}")
|
||||||
await asyncio.sleep(2)
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
async def background_pow_data_collection(self, monitoring):
    """Background task: periodically submit miner hashrates/algos to the backend.

    Every ~6 s it probes the Hive miner interface; full statistics
    (including hashrates) are pushed when either the miner has just
    (re)started (uptime < 60 s) or the scheduled resend time has passed.
    """
    while True:
        try:
            await monitoring.put("background_pow_data_collection")
            if not self.dont_use_hive_binaries and self.is_hive:
                # Cheap probe first (no hashrate scraping) just to read the uptime.
                miner_config = await self.hive_miner_interface.export_miner_stats(get_hashrates=False)
                # FIX: the original compared `self.next_pow_background_job_send_update > time.time()`,
                # which is inverted — after a successful send the condition stayed true for the whole
                # 5-minute window, so the task spammed submissions every 6 s instead of waiting
                # for the window to elapse.
                if (miner_config["miner_uptime"] > 0 and miner_config["miner_uptime"] < 60) or time.time() > self.next_pow_background_job_send_update:
                    # Schedule the next regular update 5 minutes out.
                    self.next_pow_background_job_send_update = time.time() + (5 * 60)
                    current_statistics = await self.hive_miner_interface.export_miner_stats(get_hashrates=True)
                    submit_result = await WebSocketClient.send({"submit_hashrates": current_statistics})
                    if not submit_result:
                        # Submission failed — retry sooner (40 s) instead of waiting 5 min.
                        self.next_pow_background_job_send_update = time.time() + 40
        except Exception as e:
            log.debug(f"FAIL | background_pow_data_collection() | {e}")
        await asyncio.sleep(6)
|
||||||
|
|
||||||
def expire_ws_peers(self):
|
def expire_ws_peers(self):
|
||||||
for ws_peer_address in list(self.ws_peers.keys()):
|
for ws_peer_address in list(self.ws_peers.keys()):
|
||||||
ws_peer_info = self.ws_peers[ws_peer_address]
|
ws_peer_info = self.ws_peers[ws_peer_address]
|
||||||
|
|
|
@ -0,0 +1,177 @@
|
||||||
|
|
||||||
|
import aiofiles
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import os
|
||||||
|
|
||||||
|
async def async_run_bash_command(command):
    """Run *command* through /bin/bash and capture its output.

    Returns a tuple ``(stdout, stderr, returncode)`` with both streams
    decoded as text and stripped of surrounding whitespace.
    """
    proc = await asyncio.create_subprocess_exec(
        '/bin/bash', '-c', command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    raw_out, raw_err = await proc.communicate()
    return raw_out.decode().strip(), raw_err.decode().strip(), proc.returncode
|
||||||
|
|
||||||
|
async def check_and_read_file(file_path):
    """Asynchronously read *file_path* and return its contents.

    Returns the sentinel string "fail" when the file does not exist or
    cannot be read (callers check for this value rather than catching).
    """
    if not os.path.exists(file_path):
        return "fail"
    try:
        async with aiofiles.open(file_path, mode='r') as fh:
            return await fh.read()
    except Exception:
        return "fail"
|
||||||
|
|
||||||
|
async def get_session_start_time(pid):
    """Return the absolute Unix timestamp at which process *pid* started.

    Combines the process start time (in clock ticks since boot, from
    /proc/<pid>/stat) with the system boot time (btime line of /proc/stat).
    Returns None when the process does not exist or on any other error.
    """
    try:
        async with aiofiles.open(f'/proc/{pid}/stat', 'r') as file:
            raw = await file.read()
        # FIX: the comm field (2nd field, wrapped in parentheses) may itself
        # contain spaces, so the original naive raw.split()[21] could select
        # the wrong field. Split after the closing ')' instead: starttime is
        # field 22 overall, i.e. index 19 of the fields after comm (index 0
        # there is field 3, the process state).
        after_comm = raw.rsplit(')', 1)[1].split()
        start_time_ticks = int(after_comm[19])

        clock_ticks_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        start_time_seconds = start_time_ticks / clock_ticks_per_sec

        boot_time = None
        async with aiofiles.open('/proc/stat', 'r') as file:
            async for line in file:
                if line.startswith('btime'):
                    boot_time = int(line.split()[1])
                    break

        if boot_time is None:
            raise ValueError("Failed to find boot time in /proc/stat")

        # Absolute start time = boot epoch + seconds-since-boot.
        start_time = (boot_time + start_time_seconds)
        return start_time

    except FileNotFoundError:
        # Process already exited (or never existed).
        return None
    except Exception as e:
        print(f"Error retrieving session start time: {e}")
        return None
|
||||||
|
|
||||||
|
async def get_miner_stats(miner_dir, api_timeout=15):
    """Source the Hive miner helper scripts in *miner_dir* and return the
    parsed `$stats` JSON object, or the string 'fail' on any error.

    The bash snippet runs h-manifest.conf / h-config.sh / h-stats.sh in a
    subshell, reads its output with a timeout of API_TIMEOUT+5 seconds, and
    kills the subshell if the read timed out.
    NOTE(review): miner_dir is spliced into the shell command unquoted —
    assumed safe because it comes from trusted local Hive config; confirm
    all callers pass trusted paths.
    """
    stdout, stderr, code = await async_run_bash_command(f'export API_TIMEOUT={api_timeout}'+''' && read -t $((API_TIMEOUT + 5)) -d "" pid khs stats < <(function run_miner_scripts { echo "$BASHPID"; cd '''+miner_dir+''' || exit; source h-manifest.conf; source h-config.sh; source h-stats.sh; printf "%q\n" "$khs"; echo "$stats"; }; run_miner_scripts) && [[ $? -ge 128 ]] && [[ ! -z "$pid" && "$pid" -ne $$ ]] && kill -9 "$pid" 2>/dev/null ; echo $stats''')
    try:
        # Only trust the output when the command exited cleanly with no stderr.
        if stdout and not stderr and code == 0:
            stats = json.loads(stdout)
            return stats
        else:
            return 'fail'
    except Exception as e:
        # json.loads failed — the miner emitted something that isn't valid JSON.
        return 'fail'
|
||||||
|
|
||||||
|
async def get_miner_uptime_stats():
    """Locate the Hive ".miner" screen session and return its start timestamp.

    Parses `screen -ls` output, where session lines are tab-indented and
    formatted as "\t<pid>.<name>\t...". Returns None when no miner session
    is found or the screen command fails.
    """
    stdout, stderr, code = await async_run_bash_command("screen -ls")
    if stderr or code != 0:
        return None
    for line in stdout.split('\n'):
        if line[:1] != '\t':
            continue
        session_field = line.split('\t')[1]
        if not session_field.endswith(".miner"):
            continue
        pid_part = session_field.split('.')[0]
        if pid_part.isdigit():
            return await get_session_start_time(pid_part)
    return None
|
||||||
|
|
||||||
|
def extract_miner_names(rig_conf):
    """Pull the configured miner names out of a Hive rig.conf blob.

    Recognizes the keys MINER, MINER2, MINER3, ... (MINER followed only by
    digits) and returns their values in file order, with surrounding double
    quotes removed.
    """
    names = []
    for raw_line in rig_conf.split('\n'):
        if '=' not in raw_line:
            continue
        key, value = raw_line.split('=', 1)
        is_miner_key = key[:5] == "MINER" and (len(key) == 5 or str(key[5:]).isdigit())
        if not is_miner_key:
            continue
        if value.startswith('"') and value.endswith('"'):
            value = value.strip('"')
        names.append(value)
    return names
|
||||||
|
|
||||||
|
def extract_miner_config(miner_names, wallet_conf):
    """Extract per-miner settings from a Hive wallet.conf blob.

    For each miner in *miner_names*, collects the values of its
    <MINER>_*ALGO* keys and any coins declared for it in the JSON "META"
    entry. Returns {miner_name: {"algos": [...], "coins": [...]}}.
    """
    lines = wallet_conf.split('\n')
    meta_config = {}
    miners = {}
    for miner_name in miner_names:
        # Lines not claimed by this miner are carried over to the next pass.
        remaining_lines = []
        algos = []
        for line in lines:
            if not line.startswith('#') and '=' in line:
                key, value = line.split('=', 1)
                # Unwrap one layer of surrounding double or single quotes.
                if value.startswith('"') and value.endswith('"'):
                    value = value.strip('"')
                if value.startswith("'") and value.endswith("'"):
                    value = value.strip("'")
                # Keys are prefixed with the miner name ('-' mapped to '_').
                if key[:len(miner_name.replace('-','_'))+1].lower() == f"{miner_name.replace('-','_')}_".lower():
                    # Any key whose last '_' segment starts with "algo"
                    # (ALGO, ALGO2, ...) counts as an algo entry.
                    if key.split('_')[-1][:4].lower()=="algo":
                        algos.append(value)
                elif key.lower()=="meta":
                    # META holds a JSON blob with per-miner metadata (coins).
                    try:
                        meta_config=json.loads(value)
                    except Exception as e:
                        pass
                else:
                    remaining_lines.append(line)
        lines = remaining_lines
        miners[miner_name]={
            "algos":algos,
            "coins":[]
        }
    # Second pass: pull coin declarations for each miner out of META.
    for miner_name in miner_names:
        if miner_name in meta_config and type(meta_config[miner_name]) == dict:
            for key in meta_config[miner_name].keys():
                if "coin" in key:
                    miners[miner_name]["coins"].append(meta_config[miner_name][key])
    return miners
|
||||||
|
|
||||||
|
def sum_numbers_in_list(lst):
    """Sum *lst* when every element is numeric; otherwise return an
    explanatory string (callers branch on the type of the result)."""
    for item in lst:
        if not isinstance(item, (int, float)):
            return "The list contains non-numeric elements."
    return sum(lst)
|
||||||
|
|
||||||
|
class hive_interface:
    """Facade over the Hive OS on-disk miner configuration and helper scripts."""

    def __init__(self):
        # Standard Hive OS filesystem locations.
        self.hive_miners_dir = "/hive/miners"
        self.hive_rig_config = "/hive-config/rig.conf"
        self.hive_wallet_config = "/hive-config/wallet.conf"

    async def get_miners_stats(self, miner_names):
        """Scrape stats for every miner in *miner_names* concurrently.

        Returns a list aligned with miner_names; each entry is the parsed
        stats dict or the string 'fail' (see get_miner_stats).
        """
        scrape_tasks = []
        for miner_name in miner_names:
            scrape_tasks.append(get_miner_stats(os.path.join(self.hive_miners_dir, miner_name)))
        results = await asyncio.gather(*scrape_tasks)
        return results

    async def get_configured_miners(self):
        """Read rig.conf and wallet.conf and return (miner_names, miner_config)."""
        rig_conf, wallet_conf = await asyncio.gather(*[check_and_read_file(self.hive_rig_config), check_and_read_file(self.hive_wallet_config)])
        miner_names = extract_miner_names(rig_conf)
        miner_config = extract_miner_config(miner_names, wallet_conf)
        return miner_names, miner_config

    async def export_miner_stats(self, get_hashrates=False):
        """Build the statistics payload submitted to the backend.

        Always includes "miner_uptime" (seconds, 0 if unknown) and the
        per-miner config under "miners"; when *get_hashrates* is True each
        miner additionally gets a "hashrates" list summed from its hs/hsN
        stats fields.
        """
        output = {
            "miner_uptime": 0
        }
        miner_start_ts = await get_miner_uptime_stats()
        if miner_start_ts:
            output["miner_uptime"] = int(time.time()-miner_start_ts)
        miner_names, miner_config = await self.get_configured_miners()
        output["miners"]=miner_config
        if get_hashrates:
            miners_stats = await self.get_miners_stats(miner_names)
            for idx, miner_stats in enumerate(miners_stats):
                if type(miner_stats) == dict:
                    for key in miner_stats.keys():
                        # hs / hs2 / hs3 ... fields carry per-GPU hashrate lists.
                        if key[:2]=="hs" and (key=="hs" or key[2:].isdigit()):
                            all_hs = sum_numbers_in_list(miner_stats[key])
                            try:
                                if not "hashrates" in output['miners'][miner_names[idx]]:
                                    output['miners'][miner_names[idx]]["hashrates"]=[]
                                if isinstance(all_hs, (float, int)):
                                    output['miners'][miner_names[idx]]["hashrates"].append(all_hs)
                                else:
                                    # Non-numeric stats -> record 0 to keep list alignment.
                                    output['miners'][miner_names[idx]]["hashrates"].append(0)
                            except Exception as e:
                                pass
        return output
|
Loading…
Reference in New Issue