648 lines
34 KiB
Python
648 lines
34 KiB
Python
from lib import config as config_module
|
|
from lib import logging as logging_lib
|
|
from lib import nvidia_driver_update
|
|
from lib import log_streaming_task
|
|
from lib import run_startup_script
|
|
from lib import hive_miner_interface
|
|
from lib import docker_interface
|
|
from lib import background_job
|
|
from lib import docker_deploy
|
|
from lib import clore_partner
|
|
from lib import clore_partner_socket
|
|
from lib import docker_pull
|
|
from lib import get_specs
|
|
from lib import utils
|
|
from lib import nvml
|
|
log = logging_lib.log
|
|
|
|
from clore_hosting import docker_configurator
|
|
from clore_hosting import api_interface
|
|
from clore_hosting import ws_interface
|
|
from clore_hosting import types
|
|
|
|
from queue import Queue
|
|
import concurrent.futures
|
|
import threading
|
|
import asyncio
|
|
import time
|
|
import json
|
|
from aiofiles import os as async_os
|
|
import aiofiles
|
|
import os
|
|
|
|
# Hardware/spec collector instance; specs_service() awaits specs.get() on it.
specs = get_specs.Specs()
|
|
|
|
# Queue handed to the WebSocket client as its log_message_broker and to
# log_streaming_task in CloreClient.service().
# NOTE(review): the name is presumably a typo for "broker"; it is referenced
# elsewhere in this file, so it is left unchanged.
container_log_broken = asyncio.Queue()
|
|
|
|
# Shorthand for the configuration object exposed by lib.config.
config = config_module.config
|
|
# Module-wide WebSocket client *instance* (despite the class-style name),
# used by every service below.
WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_broken)
|
|
|
|
#print(config)
|
|
|
|
async def configure_networks(containers, partner_forwarding_ips):
    """Run the Docker network configurator in a worker thread.

    Args:
        containers: Container definitions forwarded to
            ``docker_configurator.configure``.
        partner_forwarding_ips: Partner forwarding addresses forwarded to
            ``docker_configurator.configure``.

    Returns:
        A ``types.DockerConfiguratorRes`` built from the first three items of
        the configurator result, or ``False`` when the configurator raises or
        its result cannot be unpacked.
    """
    try:
        # The configurator call lives inside the ``try`` so that a failure
        # yields the documented ``False`` instead of aborting the caller's
        # ``asyncio.gather`` (and with it the other results of that step).
        res = await asyncio.to_thread(docker_configurator.configure, containers, partner_forwarding_ips)
        return types.DockerConfiguratorRes(
            validation_and_security=res[0],
            valid_containers=res[1],
            use_hive_flightsheet=res[2],
        )
    except Exception as e:
        log.error(f"configure_networks() | error | {e}")
        return False
|
|
|
|
async def deploy_containers(validated_containers, allowed_running_containers, can_run_partner_workloads):
    """Run ``docker_deploy.deploy`` in a worker thread.

    Args:
        validated_containers: Container definitions to deploy.
        allowed_running_containers: Container names forwarded to the deployer.
        can_run_partner_workloads: Flag forwarded to the deployer.

    Returns:
        A ``types.DeployContainersRes`` carrying the running and stopped
        container names reported by the deployer, or ``False`` on any error.
    """
    try:
        running_names, stopped_names = await asyncio.to_thread(
            docker_deploy.deploy,
            validated_containers,
            allowed_running_containers,
            can_run_partner_workloads,
        )
        return types.DeployContainersRes(
            all_running_container_names=running_names,
            all_stopped_container_names=stopped_names,
        )
    except Exception as e:
        # Previously swallowed silently; the failure is now visible in the log.
        log.error(f"deploy_containers() | error | {e}")
        return False
|
|
|
|
async def get_local_images(no_latest_tag=False):
    """Fetch the local Docker image list without blocking the event loop.

    ``no_latest_tag`` is passed through unchanged to
    ``docker_interface.get_local_images``, whose result is returned as-is.
    """
    return await asyncio.to_thread(docker_interface.get_local_images, no_latest_tag)
|
|
|
|
async def set_oc(settings):
    """Apply overclock ``settings`` through ``nvml.set_oc`` in a worker thread.

    Returns whatever ``nvml.set_oc`` returns, or ``False`` (after logging the
    error) when it raises.
    """
    try:
        return await asyncio.to_thread(nvml.set_oc, settings)
    except Exception as exc:
        log.error(f"set_oc() | error | {exc}")
    return False
|
|
|
|
async def set_hive_miner_status(enabled=False):
    """Switch the Hive miner on or off via ``utils.hive_set_miner_status``.

    The helper runs in a worker thread and its return value is ignored.

    Returns:
        ``True`` when the helper finished without raising, ``False`` (after
        logging the error) otherwise.
    """
    try:
        await asyncio.to_thread(utils.hive_set_miner_status, enabled)
    except Exception as exc:
        log.error(f"set_hive_miner_status() | error | {exc}")
        return False
    return True
|
|
|
|
class CloreClient:
|
|
def __init__(self, auth_key, xfs_state):
    """Initialise client state and probe the host (Docker, NVML, HiveOS).

    Args:
        auth_key: Credential later handed to ``WebSocketClient.set_auth``.
        xfs_state: XFS state string; ``main()`` compares it against
            ``"disabled"`` and ``"active"``.
    """
    self.auth_key = auth_key
    self.xfs_state = xfs_state

    self.ws_peers = {}
    self.last_checked_ws_peers = 0
    self.containers = {}
    self.needed_images = []
    self.containers_set = False

    # Stays None until the first successful server-config fetch in main().
    self.allowed_images = None

    self.p_needed_containers = []
    self.last_pull_progress = {}

    self.validated_containers_set = False
    self.validated_containers = []

    self.all_running_container_names = []
    self.all_stopped_container_names = []

    # Back-dated so the first hardware-spec submit (due every 1800 s in
    # specs_service()) happens roughly one minute after start-up.
    self.last_hw_specs_submit = time.time() - (1800 - 60)

    # One heartbeat timestamp per background service; refreshed and checked
    # by monitoring_service().
    self.last_service_heartbeat = {
        service_name: utils.unix_timestamp()
        for service_name in (
            "main",
            "handle_container_cache",
            "startup_script_runner",
            "log_streaming_task",
            "container_log_streaming_service",
            "specs_service",
            "oc_service",
            "background_pow_data_collection",
            "partner_service",
        )
    }
    self.max_service_inactivity = 600  # seconds; not read by the code visible in this file
    # Services that may stall indefinitely without triggering an app restart.
    self.no_restart_services = ["partner_service", "specs_service"]

    if config.debug_ws_peer:
        # A debug peer never expires (see expire_ws_peers()).
        self.ws_peers[str(config.debug_ws_peer)] = {
            "expiration": "immune"
        }

    self.os_release = get_specs.get_os_release()
    self.restart_docker = False
    # Defined unconditionally so the attribute exists even when os-release
    # does not request cgroupfs (it used to be left undefined in that case).
    self.updated_exec_opts = False
    if "use_cgroupfs" in self.os_release:
        self.updated_exec_opts = bool(
            docker_interface.configure_exec_opts("native.cgroupdriver", "cgroupfs")
        )
        if self.updated_exec_opts:
            docker_info = docker_interface.get_info()
            if "CgroupDriver" in docker_info and docker_info["CgroupDriver"] == "systemd":
                # Docker is still running under systemd cgroups; main()
                # restarts it only while no order is running, so workloads
                # are not disrupted.
                self.restart_docker = True

    docker_interface.verify_docker_version()

    self.dont_use_hive_binaries = 'DONT_USE_HIVE_BINARIES' in os.environ

    nvml.init(allow_hive_binaries=not self.dont_use_hive_binaries)

    self.extra_allowed_images = utils.get_extra_allowed_images()
    self.allowed_running_containers = utils.get_allowed_container_names()

    self.gpu_oc_specs = nvml.get_gpu_oc_specs()
    self.last_oc_service_submit = 0
    self.last_applied_oc = {}
    self.last_oc_apply_time = 0

    self.is_hive = get_specs.is_hive()
    self.use_hive_flightsheet = False

    self.hive_miner_interface = hive_miner_interface.hive_interface()
    self.next_pow_background_job_send_update = 0

    self.clore_partner_initiazized = False
    self.partner_forwarding_ips = []
    self.start_time = utils.unix_timestamp()

    self.runned_pull_selftest = False

    WebSocketClient.set_gpu_list(nvml.get_gpu_name_list())
    WebSocketClient.set_is_hive(self.is_hive)
|
|
|
|
async def service(self):
    """Start every background service as a task and wait on all of them.

    Two queues are shared between the tasks: ``pull_list`` carries wanted
    image lists from ``main()`` to ``handle_container_cache()``, and
    ``monitoring`` carries heartbeats to ``monitoring_service()``.
    """
    pull_list = asyncio.Queue()
    monitoring = asyncio.Queue()

    # Tasks are created in the same order as before; each loop is written to
    # run forever, so the gather below is not expected to return.
    background_tasks = [
        asyncio.create_task(self.main(pull_list, monitoring)),
        asyncio.create_task(self.handle_container_cache(pull_list, monitoring)),
        asyncio.create_task(self.startup_script_runner(monitoring)),
        asyncio.create_task(
            log_streaming_task.log_streaming_task(
                container_log_broken, monitoring, self.allowed_running_containers
            )
        ),
        asyncio.create_task(self.container_log_streaming_service(monitoring)),
        asyncio.create_task(self.specs_service(monitoring)),
        asyncio.create_task(self.oc_service(monitoring)),
        asyncio.create_task(self.background_pow_data_collection(monitoring)),
        asyncio.create_task(self.partner_service(monitoring)),
        asyncio.create_task(self.monitoring_service(monitoring)),
        asyncio.create_task(nvidia_driver_update.update_loop(self.is_hive)),
    ]

    await asyncio.gather(*background_tasks)
|
|
|
|
async def monitoring_service(self, monitoring):
    """Watchdog: record service heartbeats and exit when a service stalls.

    Every 5 seconds the ``monitoring`` queue is drained and the reported
    service names get a fresh timestamp. Any service not listed in
    ``self.no_restart_services`` whose last heartbeat is older than its
    allowed gap terminates the process with ``os._exit(1)``.
    """
    while True:
        try:
            reported = []
            while not monitoring.empty():
                reported.append(await monitoring.get())
            if reported:
                for service_name in set(reported):
                    self.last_service_heartbeat[service_name] = utils.unix_timestamp()
            if config.debug:
                log.success(self.last_service_heartbeat)

            for service_name, last_beat in self.last_service_heartbeat.items():
                if service_name in self.no_restart_services:
                    continue
                # The image-cache service gets its own (pull) time budget.
                if service_name == "handle_container_cache":
                    allowed_gap = config.maximum_pull_service_loop_time
                else:
                    allowed_gap = config.maximum_service_loop_time
                if last_beat < utils.unix_timestamp() - allowed_gap:
                    log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_beat} s, Restarting...")
                    os._exit(1)
        except Exception as exc:
            log.debug(f"monitoring_service() | ERROR | {exc}")
        await asyncio.sleep(5)
|
|
|
|
async def container_log_streaming_service(self, monitoring):
    """Report a heartbeat and stream container logs, every 0.6 seconds.

    Errors from either step are logged at debug level and the loop goes on.
    """
    heartbeat_name = "container_log_streaming_service"
    while True:
        try:
            await monitoring.put(heartbeat_name)
            await WebSocketClient.stream_container_logs()
        except Exception as exc:
            log.debug(f"container_log_streaming_service() | ERROR | {exc}")
        await asyncio.sleep(0.6)
|
|
async def run_startup_scripts(self, startup_script_full_path, container_name):
    """Run one startup script for a container in a worker thread.

    Args:
        startup_script_full_path: Path of the script on the host.
        container_name: Target container; also used to build the
            ``/init-<container_name>.sh`` argument given to
            ``run_startup_script.run``.

    Returns:
        ``True`` when ``run_startup_script.run`` returned without raising,
        ``False`` otherwise.
    """
    try:
        if config.debug:
            log.success(f"Runnin' {startup_script_full_path}")
            # Diagnostic output only; it used to be emitted at error level.
            log.debug(self.all_running_container_names)
        await asyncio.to_thread(
            run_startup_script.run,
            container_name,
            startup_script_full_path,
            f"/init-{container_name}.sh",
        )
        return True
    except Exception as e:
        # Previously swallowed silently, leaving failed scripts untraceable.
        log.debug(f"run_startup_scripts() | ERROR | {e}")
        return False
|
|
|
|
async def startup_script_runner(self, monitoring):
    """Launch pending container startup scripts, polling every 2 seconds.

    A ``<name>.sh`` file in ``config.startup_scripts_folder`` is started when
    ``<name>`` is a running container, no ``<name>.finished`` entry exists in
    the folder and the script is not already being run by this service.
    """
    in_flight = {}  # script path -> task currently running it

    while True:
        try:
            await monitoring.put("startup_script_runner")
            folder_entries = await async_os.listdir(config.startup_scripts_folder)
            for entry in folder_entries:
                if type(entry) is not str or not entry.endswith(".sh"):
                    continue
                container_name = entry[:-3]
                if container_name not in self.all_running_container_names:
                    continue
                if f"{container_name}.finished" in folder_entries:
                    continue
                script_path = os.path.join(config.startup_scripts_folder, entry)
                if not os.path.isfile(script_path) or script_path in in_flight:
                    continue

                # Fire and forget: the script runs concurrently with this loop.
                script_task = asyncio.create_task(self.run_startup_scripts(script_path, container_name))
                in_flight[script_path] = script_task
                # Drop the bookkeeping entry as soon as the task completes.
                script_task.add_done_callback(lambda _task, path=script_path: in_flight.pop(path, None))

            # Second sweep in case a finished task is still registered.
            for finished_path in [path for path, task in in_flight.items() if task.done()]:
                in_flight.pop(finished_path, None)
        except Exception as exc:
            log.debug(f"ERROR | startup_script_runner() | {exc}")
        await asyncio.sleep(2)
|
|
|
|
async def pull_log_progress(self, log_dict, image_name):
    """Publish a text snapshot of ``log_dict`` for ``image_name`` forever.

    Every ``config.pull_log_streaming_interval`` milliseconds the layer/status
    pairs are rendered one per line as ``"<layer>: <status>"`` and stored,
    with the current time, in ``self.last_pull_progress[image_name]``.
    The coroutine only stops when its task is cancelled.
    """
    while True:
        rendered = "\n".join(f"{layer}: {status}" for layer, status in log_dict.items())
        self.last_pull_progress[image_name] = {"log": rendered, "last_update": time.time()}
        await asyncio.sleep(config.pull_log_streaming_interval / 1000)
|
|
|
|
async def check_if_pulling_required(self, pulling_image, cancellation_event):
    """Set ``cancellation_event`` once ``pulling_image`` is no longer wanted.

    Twice per second ``self.needed_images`` is scanned for an entry whose
    ``"image"`` equals ``pulling_image``; when none is found the event is
    set. Errors during the scan are ignored. Runs until cancelled.
    """
    while True:
        try:
            still_wanted = any(
                "image" in entry and entry["image"] == pulling_image
                for entry in self.needed_images
            )
            if not still_wanted:
                cancellation_event.set()
        except Exception:
            pass
        await asyncio.sleep(0.5)
|
|
|
|
async def handle_container_cache(self, pull_list, monitoring):
    """Keep the local Docker image cache in line with the wanted images.

    Each pass (about once per second, longer while pulling):

    1. drains ``pull_list`` and keeps only the newest wanted state,
    2. reports a heartbeat on ``monitoring``,
    3. removes local images that nothing references (only when an allow-list
       is known and removal is enabled in ``config``),
    4. expires pull-progress entries older than 300 seconds,
    5. pulls every wanted image that is not present locally.

    Args:
        pull_list: ``asyncio.Queue`` fed by ``main()`` with lists of image
            config dicts (``"image"``, ``"log"`` and optional Docker Hub
            credentials).
        monitoring: ``asyncio.Queue`` receiving the service heartbeat.
    """
    while True:
        got_data = []
        while not pull_list.empty():
            got_data.append(await pull_list.get())
        await monitoring.put("handle_container_cache")
        if got_data:
            # Older queued states are obsolete; only the newest one matters.
            self.p_needed_containers = got_data[-1]

        if len(self.p_needed_containers) > 0:
            local_images = await get_local_images(no_latest_tag=True)
            partner_images = await clore_partner.get_partner_allowed_images()

            # Counted per pass, not per image: the counter used to be reset
            # for every image, so config.max_remove_images_per_run never
            # limited anything.
            removed_cnt = 0
            for local_image in local_images:
                self.last_pull_progress[local_image] = {"log": "Pull complete", "last_update": time.time()}
                try:
                    # Without a known allow-list nothing is ever removed.
                    if not isinstance(self.allowed_images, list):
                        continue
                    if self._local_image_is_needed(local_image, partner_images):
                        continue
                    if (
                        removed_cnt < config.max_remove_images_per_run
                        and config.delete_unused_containers
                        and partner_images is not None
                    ):
                        log.success(f"GOING TO REMOVE {local_image}")
                        removed = await asyncio.to_thread(docker_interface.remove_docker_image, local_image)
                        if removed:
                            removed_cnt += 1
                            self.last_pull_progress.pop(local_image, None)
                except Exception as e:
                    # Any error while judging an image means "keep it".
                    log.debug(f"ERROR | image_needed | {e}")

            # Expire progress entries not refreshed for 5 minutes. Iterating
            # over a snapshot allows deleting all of them in one pass.
            stale_before = time.time() - 300
            for lpp_image in list(self.last_pull_progress):
                if self.last_pull_progress[lpp_image]["last_update"] < stale_before:
                    del self.last_pull_progress[lpp_image]

            for wanted_image in self.p_needed_containers:
                if wanted_image["image"] not in local_images:
                    await self._pull_wanted_image(wanted_image)
        await asyncio.sleep(1)

def _local_image_is_needed(self, local_image, partner_images):
    """Return True when ``local_image`` must be kept in the local cache.

    An image is kept when it is currently wanted, when it matches an entry of
    ``self.allowed_images`` (which the caller guarantees to be a list), or
    when it matches one of ``partner_images``. A partner match also drops the
    image's entry from ``self.last_pull_progress``.
    """
    bare_local = local_image.replace(':latest', '')

    # 1. Wanted by the current state (":latest" is ignored on both sides).
    for needed in self.p_needed_containers:
        if "image" in needed and bare_local == needed["image"].replace(':latest', ''):
            return True

    # 2. On the allow-list: same repository and a matching or wildcard tag.
    repository, _, tag = local_image.partition(':')
    for allowed in self.allowed_images:
        if (
            "repository" in allowed
            and "allowed_tags" in allowed
            and allowed["repository"] == repository
            and any(tag == allowed_tag or allowed_tag == '*' for allowed_tag in allowed["allowed_tags"])
        ):
            return True

    # 3. Partner image, either by plain name or as "<repo>_<tag>/..." when
    #    the local name has at least three path components.
    if isinstance(partner_images, list):
        local_parts = local_image.split('/', 2)
        for partner_image in partner_images:
            matched = bare_local == partner_image.replace(':latest', '')
            if not matched and len(local_parts) == 3:
                image = '/'.join(local_parts[:2])
                partner_parts = partner_image.split(':')
                if len(partner_parts) == 1:
                    matched = image == partner_parts[0] or image == f"{partner_parts[0]}_latest"
                elif len(partner_parts) == 2:
                    matched = (
                        image.replace('_latest', '')
                        == f"{partner_parts[0]}_{partner_parts[1]}".replace('_latest', '')
                    )
            if matched:
                self.last_pull_progress.pop(local_image, None)
                return True
    return False

async def _pull_wanted_image(self, wanted_image):
    """Pull one wanted image while streaming progress and watching for cancellation.

    ``wanted_image`` is an image config dict; ``"dockerhub_user"`` and
    ``"dockerhub_token"`` are used as registry credentials when both exist.
    A failed pull is recorded in ``self.last_pull_progress`` under the image
    that was being pulled.
    """
    image_name = wanted_image["image"]
    # Only the image name is logged: the config dict may hold a registry
    # token, which used to be printed and logged in full.
    log.debug(f"Starting to pull \"{image_name}\"")

    auth_config = {}
    if "dockerhub_token" in wanted_image and "dockerhub_user" in wanted_image:
        auth_config = {
            "username": wanted_image["dockerhub_user"],
            "password": wanted_image["dockerhub_token"],
        }

    log_dict = {}
    pull_cancellation_event = asyncio.Event()
    loop = asyncio.get_running_loop()

    # Pull, progress publishing and the "still wanted?" watcher run concurrently.
    pull_task = asyncio.create_task(
        docker_pull.pull_image(image_name, auth_config, log_dict, loop, pull_cancellation_event)
    )
    log_task = asyncio.create_task(self.pull_log_progress(log_dict, image_name))
    watcher_task = asyncio.create_task(self.check_if_pulling_required(image_name, pull_cancellation_event))

    pull_error = None
    try:
        await pull_task
    except Exception as e:
        pull_error = e
    finally:
        # Always stop the helper loops, even when this service is cancelled
        # mid-pull, so they cannot outlive the pull.
        for helper_task in (log_task, watcher_task):
            helper_task.cancel()
            try:
                await helper_task
            except asyncio.CancelledError:
                # Expected: the helper was cancelled on purpose.
                pass
            except Exception as helper_error:
                log.debug(f"ERROR | pull helper task | {helper_error}")

    if pull_error is not None:
        # Written after the progress task has stopped so it cannot be
        # overwritten, and keyed by the image that failed (the old code used
        # the leftover loop variable of the clean-up loop).
        log.debug(f"ERROR | pull_image | {pull_error}")
        self.last_pull_progress[image_name] = {
            "log": f"Can't pull image \"{image_name}\"",
            "last_update": time.time(),
        }
|
|
|
|
async def main(self, pull_list, monitoring):
    """Main control loop; one step per second.

    Each step reads the container configuration from the WebSocket client,
    derives the wanted image list (queued on ``pull_list`` when it changes),
    schedules the server-config / network-configuration / deployment
    coroutines, and applies their results. The first step also starts the
    WebSocket client; every fifth step checks its heartbeat and exits the
    process when it is older than ``config.max_ws_peer_heartbeat_interval``.

    Args:
        pull_list: ``asyncio.Queue`` consumed by ``handle_container_cache()``.
        monitoring: ``asyncio.Queue`` receiving the service heartbeat.
    """
    step = 0
    while True:
        try:
            step += 1

            await monitoring.put("main")

            if config.debug:
                print("STEP", step, '|', self.containers_set, self.containers if config.log_containers_strings else '')

            tasks = []
            can_run_partner_workloads = False

            container_conf = WebSocketClient.get_containers()
            if container_conf[0]:
                self.containers_set = True
                self.containers = container_conf[1]

                tmp_images, running_order, is_order_spot = self._collect_needed_images()

                # Partner workloads are blocked only by a running non-spot order.
                can_run_partner_workloads = is_order_spot or not running_order
                clore_partner_socket.set_can_deploy(can_run_partner_workloads)

                if not running_order and self.xfs_state == "disabled":
                    async with aiofiles.open("/opt/clore-hosting/xfs_state", mode='w') as file:
                        await file.write("enabled")
                    # Exit only after the file has been closed: os._exit()
                    # skips all clean-up, including flushing open files.
                    log.info("No order running, requesting XFS migration")
                    os._exit(0)

                if self.restart_docker and not running_order and len(self.containers) > 0:
                    log.debug("Sending docker restart command")
                    utils.run_command_v2("systemctl restart docker")
                    self.restart_docker = False

                if tmp_images != self.needed_images:
                    self.needed_images = tmp_images
                    await pull_list.put(self.needed_images)

            if self.last_checked_ws_peers < (utils.unix_timestamp() - config.ws_peers_recheck_interval):
                tasks.append(api_interface.get_server_config())

            if self.containers_set:
                tasks.append(configure_networks(self.containers, self.partner_forwarding_ips))
                tasks.append(WebSocketClient.stream_pull_logs())

            if self.validated_containers_set:
                tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers, can_run_partner_workloads))

            if step == 1:
                WebSocketClient.set_auth(self.auth_key, self.xfs_state)
                # Keep a strong reference: the event loop only holds weak
                # references to tasks, so an unreferenced task may be
                # garbage-collected while it is still running.
                self.ws_client_task = asyncio.create_task(WebSocketClient.run())
            elif step % 5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp() - config.max_ws_peer_heartbeat_interval):
                log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds")
                log.error("CLORE HOSTING | exiting ...")
                os._exit(1)

            self.expire_ws_peers()
            WebSocketClient.set_ws_peers(self.ws_peers)
            WebSocketClient.set_pull_logs(self.last_pull_progress)

            if tasks:
                results = await asyncio.gather(*tasks)
                self._apply_task_results(results)
        except Exception as e:
            log.debug(f"main() | ERROR | {e}")

        await asyncio.sleep(1)

def _collect_needed_images(self):
    """Derive the wanted image list and order flags from ``self.containers``.

    Returns:
        ``(images, running_order, is_order_spot)`` where ``images`` is a
        de-duplicated list of image config dicts, ``running_order`` is True
        when a container name contains ``"-order-"`` and ``is_order_spot`` is
        True when any container has a ``"spot"`` key.
    """
    images = []
    running_order = False
    is_order_spot = False

    for container in self.containers:
        if "spot" in container:
            is_order_spot = True
        # The Hive flightsheet pseudo-image is never pulled.
        if "image" not in container or container["image"] == "cloreai/hive-use-flightsheet":
            continue

        is_order = "name" in container and "-order-" in container["name"]
        if is_order:
            running_order = True
        # Pull progress is only logged for order containers.
        image_config = {
            "image": container["image"],
            "log": is_order,
        }

        # Docker Hub credentials arrive embedded in the "ip" field as a
        # "; echo '<token>' ... | docker login -u <user>..." command string.
        if "ip" in container and "| docker login -u " in container["ip"] and container["ip"][:8] == "; echo '":
            try:
                dockerhub_token = container["ip"][8:].split("'")[0]
                dockerhub_user = container["ip"].split('docker login -u ')[1].split(';')[0][:-17]
                image_config["dockerhub_token"] = dockerhub_token
                image_config["dockerhub_user"] = dockerhub_user
            except Exception as e:
                log.error(e)

        if image_config not in images:
            images.append(image_config)

    return images, running_order, is_order_spot

def _apply_task_results(self, results):
    """Apply the results gathered by one ``main()`` step to the client state."""
    for result in results:
        if isinstance(result, types.ServerConfig):
            if result.success:
                self.last_checked_ws_peers = utils.unix_timestamp()
                self.allowed_images = result.allowed_images + self.extra_allowed_images
                if self.xfs_state == "active":
                    self.allowed_images.append({
                        "repository": "vastai/test",
                        "allowed_tags": ["bandwidth-test-nvidia", "selftest"]
                    })
                if not config.debug_ws_peer:
                    for pure_ws_peer in result.ws_peers:
                        self.ws_peers[pure_ws_peer] = {
                            "expiration": utils.unix_timestamp() + 900
                        }
            elif self.allowed_images is None:
                # No configuration has ever been received: start over.
                log.error("Can't contact clore.ai, restarting")
                os._exit(1)
        elif isinstance(result, types.DockerConfiguratorRes):
            if result.validation_and_security:
                self.validated_containers_set = True
                self.validated_containers = result.valid_containers
                self.use_hive_flightsheet = result.use_hive_flightsheet
                log.debug(f"Use Hive flightsheet: {result.use_hive_flightsheet}")
        elif isinstance(result, types.DeployContainersRes):
            try:
                self.all_running_container_names = result.all_running_container_names
                self.all_stopped_container_names = result.all_stopped_container_names
            except Exception as e:
                log.debug(f"main() | ERROR | {e}")
|
|
|
|
async def submit_specs(self, current_specs):
    """Send the hardware specs document over the WebSocket.

    Only ``dict`` specs are sent. The dict is updated in place with
    ``backend_version``, ``update_hw`` and the PCIe width/revision of the
    NVIDIA GPU reporting the smallest ``pcie_width`` (below 999). Errors are
    logged at debug level and otherwise ignored.
    """
    try:
        if type(current_specs) is not dict:
            return
        current_specs["backend_version"] = 21
        current_specs["update_hw"] = True

        narrowest_width = 999
        for gpu in current_specs["gpus"]["nvidia"]:
            if "pcie_width" in gpu and gpu["pcie_width"] < narrowest_width:
                narrowest_width = gpu["pcie_width"]
                current_specs["pcie_width"] = gpu["pcie_width"]
                current_specs["pcie_rev"] = gpu["pcie_revision"]

        await WebSocketClient.send(current_specs)
    except Exception as exc:
        log.debug(f"FAIL | submit_specs() | {exc}")
|
|
|
|
async def update_realtime_data(self, current_specs):
    """Send current CPU/RAM usage, GPU list and container names over the WebSocket.

    Nothing is sent unless ``current_specs`` is a ``dict``. Errors are logged
    at debug level and otherwise ignored.
    """
    try:
        if type(current_specs) is not dict:
            return
        cpu_usage = await get_specs.get_cpu_usage()
        ram_usage = await get_specs.get_ram_usage()
        gpus = current_specs["gpus"]
        await WebSocketClient.send({
            "update_realtime_data": True,
            "gpus": gpus["nvidia"] + gpus["amd"],
            "cpu": cpu_usage,
            "ram": ram_usage.percent,
            "all_running_container_names": self.all_running_container_names,
            "all_stopped_container_names": self.all_stopped_container_names,
        })
    except Exception as exc:
        log.debug(f"FAIL | update_realtime_data() | {exc}")
|
|
|
|
async def specs_service(self, monitoring):
    """Collect specs every 7 seconds and publish them.

    Each pass reports a heartbeat, fetches the current specs, submits the
    full hardware document when the last submit is older than 1800 seconds,
    and always sends the realtime data. With ``xfs_state == "active"`` and at
    least one NVIDIA GPU, the partner self-test pull check is run until it
    has completed once without raising.
    """
    while True:
        try:
            await monitoring.put("specs_service")
            current_specs = await specs.get()
            if self.last_hw_specs_submit < (utils.unix_timestamp() - 1800):
                self.last_hw_specs_submit = utils.unix_timestamp()
                await self.submit_specs(current_specs)
            await self.update_realtime_data(current_specs)
            try:
                # Cheap flags first: the specs dict is only inspected while
                # the self-test is still pending, so a malformed dict cannot
                # produce a log line on every pass afterwards.
                if (
                    not self.runned_pull_selftest
                    and self.xfs_state == "active"
                    and len(current_specs["gpus"]["nvidia"]) > 0
                ):
                    await clore_partner.check_to_pull_selftest(current_specs)
                    self.runned_pull_selftest = True
            except Exception as partner_exception:
                # Best effort, retried on the next pass; previously the error
                # was dropped without any trace.
                log.debug(f"FAIL | specs_service() | selftest | {partner_exception}")
        except Exception as e:
            log.debug(f"FAIL | specs_service() | {e}")
        await asyncio.sleep(7)
|
|
|
|
async def oc_service(self, monitoring):
    """Drive the Hive miner state and GPU overclocking, every 2 seconds.

    On HiveOS hosts (unless Hive binaries are disabled) the miner is switched
    on while the Hive flightsheet is in use and the background job is
    enabled, and switched off otherwise; overclocks are not applied while
    the miner runs. The GPU OC specs are re-submitted at most every 240
    seconds when the server copy differs, and the server's OC rule is
    applied when it changed or was last applied more than 300 seconds ago.
    """
    while True:
        try:
            await monitoring.put("oc_service")
            oc_apply_allowed = True

            # Hive miner handling
            if self.is_hive and not self.dont_use_hive_binaries:
                if self.use_hive_flightsheet and background_job.is_enabled():
                    await set_hive_miner_status(True)
                    oc_apply_allowed = False  # never overclock under a running HiveOS miner
                else:
                    await set_hive_miner_status(False)

            # OC tasks
            oc_conf = WebSocketClient.get_oc()
            if (
                oc_conf[0]
                and type(self.gpu_oc_specs) is list
                and oc_conf[1] != self.gpu_oc_specs
                and self.last_oc_service_submit + 240 < utils.unix_timestamp()
            ):
                log.debug("Submitting \"gpu_oc_specs\"")
                self.last_oc_service_submit = utils.unix_timestamp()
                await WebSocketClient.send({
                    "set_gpu_info": self.gpu_oc_specs,
                    "xorg_valid": True
                })

            if oc_conf[0] and type(oc_conf[2]) is dict and oc_apply_allowed:
                wanted_rule = oc_conf[2]
                rule_changed = utils.normalize_rule(self.last_applied_oc) != utils.normalize_rule(wanted_rule)
                if rule_changed or (self.last_oc_apply_time < utils.unix_timestamp() - 300):
                    self.last_oc_apply_time = utils.unix_timestamp()
                    log.debug(f"Applying OC | {json.dumps(wanted_rule, separators=(',',':'))}")
                    await set_oc(wanted_rule)
                    self.last_applied_oc = wanted_rule
        except Exception as exc:
            log.debug(f"FAIL | oc_service() | {exc}")
        await asyncio.sleep(2)
|
|
|
|
async def background_pow_data_collection(self, monitoring):
    """Report Hive miner hashrates, polling every 6 seconds.

    Active only on HiveOS hosts with Hive binaries allowed. Hashrates are
    submitted when the miner uptime is between 0 and 60 seconds (exclusive)
    or when the next scheduled update is due; the next update is scheduled
    5 minutes ahead, or 40 seconds ahead when the submit result is falsy.
    """
    while True:
        try:
            await monitoring.put("background_pow_data_collection")
            if self.is_hive and not self.dont_use_hive_binaries:
                miner_config = await self.hive_miner_interface.export_miner_stats(get_hashrates=False)
                freshly_started = 0 < miner_config["miner_uptime"] < 60
                if freshly_started or self.next_pow_background_job_send_update < time.time():
                    self.next_pow_background_job_send_update = time.time() + (5 * 60)
                    current_statistics = await self.hive_miner_interface.export_miner_stats(get_hashrates=True)
                    delivered = await WebSocketClient.send({"submit_hashrates": current_statistics})
                    if not delivered:
                        # Retry sooner after a failed submit.
                        self.next_pow_background_job_send_update = time.time() + 40
        except Exception as exc:
            log.debug(f"FAIL | background_pow_data_collection() | {exc}")
        await asyncio.sleep(6)
|
|
|
|
async def partner_service(self, monitoring):
    """Maintain the Clore partner integration, polling every 6 seconds.

    Once the client has been up for more than 180 seconds, forwarding latency
    is measured and reported when the measurement is a list. When the
    WebSocket client provides a partner config, the partner module is
    initialised (retried until it reports success), the forwarding IPs are
    refreshed from the config and the config is applied.
    """
    while True:
        try:
            await monitoring.put("partner_service")

            if self.start_time < utils.unix_timestamp() - 180:
                latency = await clore_partner.measure_forwarding_latency()
                if type(latency) is list:
                    await WebSocketClient.set_forwarding_latency_measurment(latency)

            partner_config = WebSocketClient.get_clore_partner_config()
            if partner_config is not None:
                if not self.clore_partner_initiazized:
                    if await clore_partner.initialize():
                        self.clore_partner_initiazized = True
                if self.clore_partner_initiazized:
                    has_both_ips = 'provider' in partner_config and 'forwarding' in partner_config
                    self.partner_forwarding_ips = (
                        [partner_config['provider'], partner_config['forwarding']] if has_both_ips else []
                    )
                    await clore_partner.configure(partner_config)
        except Exception as exc:
            log.debug(f"FAIL | partner_service() | {exc}")
        await asyncio.sleep(6)
|
|
|
|
def expire_ws_peers(self):
    """Drop every WebSocket peer whose expiration timestamp has passed.

    Peers whose ``"expiration"`` is the string ``"immune"`` are never
    removed; malformed entries are skipped silently.
    """
    # Iterate over a snapshot because entries are removed along the way.
    for address, peer_info in list(self.ws_peers.items()):
        try:
            expiration = peer_info["expiration"]
            if expiration != "immune" and expiration < utils.unix_timestamp():
                self.ws_peers.pop(address, None)
        except Exception:
            pass