508 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			508 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
| from lib import config as config_module
 | |
| from lib import logging as logging_lib
 | |
| from lib import log_streaming_task
 | |
| from lib import run_startup_script
 | |
| from lib import docker_interface
 | |
| from lib import docker_deploy
 | |
| from lib import docker_pull
 | |
| from lib import get_specs
 | |
| from lib import utils
 | |
| from lib import nvml
 | |
| log = logging_lib.log
 | |
| 
 | |
| from clore_hosting import docker_configurator
 | |
| from clore_hosting import api_interface
 | |
| from clore_hosting import ws_interface
 | |
| from clore_hosting import types
 | |
| 
 | |
| from queue import Queue
 | |
| import concurrent.futures
 | |
| import threading
 | |
| import asyncio
 | |
| import time
 | |
| import json
 | |
| from aiofiles import os as async_os
 | |
| import os
 | |
| 
 | |
# Spec collector queried by the specs service on every pass.
specs = get_specs.Specs()

# Queue shared by the WebSocket client (as its log message broker) and the
# log streaming task started in CloreClient.service().
container_log_broken = asyncio.Queue()

# Process-wide configuration object.
config = config_module.config

# Single WebSocket client instance used by every service in this module.
WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_broken)
 | |
| 
 | |
| #print(config)
 | |
| 
 | |
async def configure_networks(containers):
    """Run the docker configurator in a worker thread and wrap its result.

    Args:
        containers: container definitions passed unchanged to
            ``docker_configurator.configure``.

    Returns:
        A ``types.DockerConfiguratorRes`` built from the first three items of
        the configurator result, or ``False`` when the configurator raises or
        its result cannot be wrapped.
    """
    try:
        # The call is inside the ``try`` so that a configurator failure turns
        # into ``False`` instead of escaping into the caller's gather().
        res = await asyncio.to_thread(docker_configurator.configure, containers)
        return types.DockerConfiguratorRes(
            validation_and_security=res[0],
            valid_containers=res[1],
            use_hive_flightsheet=res[2],
        )
    except Exception as e:
        log.error(f"configure_networks() | error | {e}")
        return False
 | |
|     
 | |
async def deploy_containers(validated_containers):
    """Deploy the validated containers in a worker thread.

    Args:
        validated_containers: containers passed unchanged to
            ``docker_deploy.deploy``.

    Returns:
        A ``types.DeployContainersRes`` holding the running and stopped
        container names reported by the deploy step, or ``False`` on failure.
    """
    try:
        running_names, stopped_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers)
        return types.DeployContainersRes(
            all_running_container_names=running_names,
            all_stopped_container_names=stopped_names,
        )
    except Exception as e:
        # Log like set_oc()/set_hive_miner_status() do, so that a failing
        # deployment is visible instead of silently returning False.
        log.error(f"deploy_containers() | error | {e}")
        return False
 | |
| 
 | |
async def get_local_images(no_latest_tag = False):
    """Return ``docker_interface.get_local_images(no_latest_tag)``, run in a worker thread."""
    return await asyncio.to_thread(docker_interface.get_local_images, no_latest_tag)
 | |
| 
 | |
async def set_oc(settings):
    """Apply overclock settings through ``nvml.set_oc`` in a worker thread.

    Returns whatever ``nvml.set_oc`` returns, or ``False`` (after logging the
    error) when it raises.
    """
    try:
        return await asyncio.to_thread(nvml.set_oc, settings)
    except Exception as oc_error:
        log.error(f"set_oc() | error | {oc_error}")
    return False
 | |
|     
 | |
async def set_hive_miner_status(enabled=False):
    """Call ``utils.hive_set_miner_status(enabled)`` in a worker thread.

    Returns ``True`` when the call completes without raising (its own return
    value is ignored) and ``False`` after logging the error otherwise.
    """
    try:
        await asyncio.to_thread(utils.hive_set_miner_status, enabled)
    except Exception as hive_error:
        log.error(f"set_hive_miner_status() | error | {hive_error}")
        return False
    return True
 | |
| 
 | |
| class CloreClient:
 | |
|     def __init__(self, auth_key):
 | |
|         self.auth_key=auth_key
 | |
|         self.ws_peers = {}
 | |
|         self.last_checked_ws_peers=0
 | |
|         self.containers={}
 | |
|         self.needed_images=[]
 | |
|         self.containers_set=False
 | |
| 
 | |
|         self.allowed_images = None
 | |
| 
 | |
|         self.p_needed_containers=[]
 | |
|         self.last_pull_progress={}
 | |
| 
 | |
|         self.validated_containers_set=False
 | |
|         self.validated_containers=[]
 | |
| 
 | |
|         self.all_running_container_names=[]
 | |
|         self.all_stopped_container_names=[]
 | |
| 
 | |
|         self.last_hw_specs_submit = time.time()-(1800-60)
 | |
| 
 | |
|         self.last_service_heartbeat = {
 | |
|             "main": utils.unix_timestamp(),
 | |
|             "handle_container_cache": utils.unix_timestamp(),
 | |
|             "startup_script_runner": utils.unix_timestamp(),
 | |
|             "log_streaming_task": utils.unix_timestamp(),
 | |
|             "container_log_streaming_service": utils.unix_timestamp(),
 | |
|             "specs_service": utils.unix_timestamp(),
 | |
|             "oc_service": utils.unix_timestamp()
 | |
|         }
 | |
|         self.max_service_inactivity = 600 # seconds
 | |
| 
 | |
|         if config.debug_ws_peer:
 | |
|             self.ws_peers[str(config.debug_ws_peer)]={
 | |
|                 "expiration":"immune"
 | |
|             }
 | |
|         
 | |
|         docker_interface.verify_docker_version()
 | |
|         nvml.init()
 | |
| 
 | |
|         self.gpu_oc_specs = nvml.get_gpu_oc_specs()
 | |
|         self.last_oc_service_submit = 0
 | |
|         self.last_applied_oc = {}
 | |
|         self.last_oc_apply_time = 0
 | |
| 
 | |
|         self.is_hive = get_specs.is_hive()
 | |
|         self.use_hive_flightsheet = False
 | |
| 
 | |
|     async def service(self):
 | |
|         global container_log_broken
 | |
| 
 | |
|         pull_list = asyncio.Queue()
 | |
|         monitoring = asyncio.Queue()
 | |
| 
 | |
|         task1 = asyncio.create_task(self.main(pull_list, monitoring))
 | |
|         task2 = asyncio.create_task(self.handle_container_cache(pull_list, monitoring))
 | |
|         task3 = asyncio.create_task(self.startup_script_runner(monitoring))
 | |
|         task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken, monitoring))
 | |
|         task5 = asyncio.create_task(self.container_log_streaming_service(monitoring))
 | |
|         task6 = asyncio.create_task(self.specs_service(monitoring))
 | |
|         task7 = asyncio.create_task(self.oc_service(monitoring))
 | |
|         monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
 | |
| 
 | |
|         # Wait for both tasks to complete (they won't in this case)
 | |
|         await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, monitoring_task)
 | |
| 
 | |
|     async def monitoring_service(self, monitoring):
 | |
|         while True:
 | |
|             try:
 | |
|                 monitoring_data = []
 | |
|                 while not monitoring.empty():
 | |
|                     monitoring_data.append(await monitoring.get())
 | |
|                 if len(monitoring_data)>0:
 | |
|                     unique_monitoring = list(set(monitoring_data))
 | |
|                     for service_name in unique_monitoring:
 | |
|                         self.last_service_heartbeat[service_name]=utils.unix_timestamp()
 | |
|                 if config.debug:
 | |
|                     log.success(self.last_service_heartbeat)
 | |
|                 for service_name in self.last_service_heartbeat.keys():
 | |
|                     last_hearthbeat = self.last_service_heartbeat[service_name]
 | |
|                     if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache":
 | |
|                         log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
 | |
|                         os._exit(1)
 | |
|                     elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache":
 | |
|                         log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
 | |
|                         os._exit(1)
 | |
|             except Exception as e:
 | |
|                 log.debug(f"monitoring_service() | ERROR | {e}")
 | |
|             await asyncio.sleep(5)
 | |
| 
 | |
|     async def container_log_streaming_service(self, monitoring):
 | |
|         while True:
 | |
|             try:
 | |
|                 await monitoring.put("container_log_streaming_service")
 | |
|                 await WebSocketClient.stream_container_logs()
 | |
|             except Exception as e:
 | |
|                 log.debug(f"container_log_streaming_service() | ERROR | {e}")
 | |
|             await asyncio.sleep(0.6)
 | |
|     async def run_startup_scripts(self, startup_script_full_path, container_name):
 | |
|         try:
 | |
|             if config.debug:
 | |
|                 log.success(f"Runnin' {startup_script_full_path}")
 | |
|                 log.error(self.all_running_container_names)
 | |
|             await asyncio.to_thread(run_startup_script.run, container_name, startup_script_full_path, f"/init-{container_name}.sh")
 | |
|             return True
 | |
|         except Exception as e:
 | |
|             return False
 | |
| 
 | |
|     async def startup_script_runner(self, monitoring):
 | |
| 
 | |
|         startup_script_ongoing_tasks = {}
 | |
| 
 | |
|         while True:
 | |
|             try:
 | |
|                 await monitoring.put("startup_script_runner")
 | |
|                 startup_script_files = await async_os.listdir(config.startup_scripts_folder)
 | |
|                 for startup_script_file in startup_script_files:
 | |
|                     if type(startup_script_file)==str and startup_script_file.endswith(".sh") and startup_script_file[:-3] in self.all_running_container_names:
 | |
|                         if not f"{startup_script_file[:-3]}.finished" in startup_script_files:
 | |
|                             full_startup_script_path = os.path.join(config.startup_scripts_folder, startup_script_file)
 | |
|                             if os.path.isfile(full_startup_script_path) and full_startup_script_path not in startup_script_ongoing_tasks:
 | |
|                                 # Start processing the file immediately in a non-blocking way
 | |
|                                 startup_script_task = asyncio.create_task(self.run_startup_scripts(full_startup_script_path, startup_script_file[:-3]))
 | |
|                                 startup_script_ongoing_tasks[full_startup_script_path] = startup_script_task
 | |
|                                 # Attach a callback to clean up the task once it's done
 | |
|                                 startup_script_task.add_done_callback(lambda t, path=full_startup_script_path: startup_script_ongoing_tasks.pop(path, None))
 | |
| 
 | |
|                 # Remove completed tasks
 | |
|                 completed_tasks = [path for path, task in startup_script_ongoing_tasks.items() if task.done()]
 | |
|                 for path in completed_tasks:
 | |
|                     startup_script_ongoing_tasks.pop(path, None)
 | |
|             except Exception as e:
 | |
|                 log.debug(f"ERROR | startup_script_runner() | {e}")
 | |
|             await asyncio.sleep(2)
 | |
| 
 | |
|     async def pull_log_progress(self, log_dict, image_name):
 | |
|         while True:
 | |
|             tmp_progress=''
 | |
|             for layer, status in log_dict.items():
 | |
|                 first_char = '' if tmp_progress=='' else '\n'
 | |
|                 tmp_progress+=f"{first_char}{layer}: {status}"
 | |
|             self.last_pull_progress[image_name]={"log":tmp_progress, "last_update":time.time()}
 | |
|             await asyncio.sleep(config.pull_log_streaming_interval/1000)
 | |
| 
 | |
|     async def check_if_pulling_required(self, pulling_image, cancellation_event):
 | |
|         while True:
 | |
|             try:
 | |
|                 matched_image = False
 | |
|                 for needed_image in self.needed_images:
 | |
|                     if "image" in needed_image and needed_image["image"]==pulling_image:
 | |
|                         matched_image=True
 | |
|                         break
 | |
| 
 | |
|                 if not matched_image:
 | |
|                     cancellation_event.set()
 | |
|             except Exception as e:
 | |
|                 pass
 | |
|             await asyncio.sleep(0.5)
 | |
| 
 | |
|     async def handle_container_cache(self, pull_list, monitoring):
 | |
|         while True:
 | |
|             got_data = []
 | |
|             while not pull_list.empty():
 | |
|                 got_data.append(await pull_list.get())
 | |
|             await monitoring.put("handle_container_cache")
 | |
|             if len(got_data)>0:
 | |
|                 self.p_needed_containers=got_data[len(got_data)-1]
 | |
| 
 | |
|             if len(self.p_needed_containers)>0:
 | |
|                 local_images = await get_local_images(no_latest_tag=True)
 | |
|                 for local_image in local_images:
 | |
|                     self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()}
 | |
|                     image_needed = False
 | |
|                     removed_cnt = 0
 | |
|                     try:
 | |
|                         for p_needed_container in self.p_needed_containers:
 | |
|                             if "image" in p_needed_container and local_image.replace(':latest','')==p_needed_container["image"].replace(':latest',''):
 | |
|                                 image_needed=True
 | |
|                                 break
 | |
|                         if type(self.allowed_images)==list:
 | |
|                             if image_needed==False:
 | |
|                                 after_split = local_image.split(':', 1)
 | |
|                                 local_image_name = after_split[0]
 | |
|                                 local_image_tag = ''
 | |
|                                 if len(after_split)>1:
 | |
|                                     local_image_tag=after_split[1]
 | |
| 
 | |
|                                 for allowed_image in self.allowed_images:
 | |
|                                     if "repository" in allowed_image and "allowed_tags" in allowed_image and allowed_image["repository"]==local_image_name:
 | |
|                                         for allowed_tag in allowed_image["allowed_tags"]:
 | |
|                                             if local_image_tag==allowed_tag or allowed_tag=='*':
 | |
|                                                 image_needed=True
 | |
|                                                 break
 | |
|                             if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers:
 | |
|                                 with concurrent.futures.ThreadPoolExecutor() as pool:
 | |
|                                     r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image)
 | |
|                                     if r:
 | |
|                                         removed_cnt+=1
 | |
|                             #if config.debug:
 | |
|                             #    log.success(f"{local_image} | {image_needed}")
 | |
| 
 | |
|                     except Exception as e:
 | |
|                         image_needed=True
 | |
|                         log.debug(f"ERROR | image_needed | {e}")
 | |
| 
 | |
|                 for lpp_image in self.last_pull_progress.keys():
 | |
|                     log_info = self.last_pull_progress[lpp_image]
 | |
|                     if log_info["last_update"] < time.time()-300:
 | |
|                         del self.last_pull_progress[lpp_image]
 | |
|                         break
 | |
|                 most_recent_wanted_state = self.p_needed_containers
 | |
|                 for wanted_image in most_recent_wanted_state:
 | |
|                     if not wanted_image["image"] in local_images:
 | |
|                         print("Local", local_images)
 | |
|                         print("W",wanted_image)
 | |
|                         log.debug(f"Starting to pull \"{wanted_image}\"")
 | |
|                         
 | |
|                         auth_config = {}
 | |
|                         if "dockerhub_token" in wanted_image and "dockerhub_user" in wanted_image:
 | |
|                             auth_config={
 | |
|                                 "username": wanted_image["dockerhub_user"],
 | |
|                                 "password": wanted_image["dockerhub_token"]
 | |
|                             }
 | |
| 
 | |
|                         log_dict = {}
 | |
|                         pull_cancellation_event = asyncio.Event()
 | |
|                         loop = asyncio.get_running_loop()
 | |
| 
 | |
|                         # Run the image pull, log progress concurrently and cancel if not needed anymore
 | |
|                         pull_task = asyncio.create_task(docker_pull.pull_image(wanted_image["image"], auth_config, log_dict, loop, pull_cancellation_event))
 | |
|                         log_task = asyncio.create_task(self.pull_log_progress(log_dict, wanted_image["image"]))
 | |
|                         check_if_pulling_required_task = asyncio.create_task(self.check_if_pulling_required(wanted_image["image"], pull_cancellation_event))
 | |
| 
 | |
|                         # Wait for the image pull to complete, then cancel the log progress task
 | |
|                         try:
 | |
|                             await pull_task
 | |
|                         except Exception as e:
 | |
|                             self.last_pull_progress[local_image]={f"log":"Can't pull image \"{local_image}\"", "last_update":time.time()}
 | |
|                         log_task.cancel()
 | |
|                         try:
 | |
|                             await log_task
 | |
|                         except asyncio.CancelledError:
 | |
|                             # Expect the task to be cancelled, so pass here
 | |
|                             pass
 | |
|                         check_if_pulling_required_task.cancel()
 | |
|                         try:
 | |
|                             await check_if_pulling_required_task
 | |
|                         except asyncio.CancelledError:
 | |
|                             # Expect the task to be cancelled, so pass here
 | |
|                             pass
 | |
|             await asyncio.sleep(1)
 | |
| 
 | |
|     async def main(self, pull_list, monitoring):
 | |
|         step=0
 | |
|         while True:
 | |
|             try:
 | |
|                 step+=1
 | |
| 
 | |
|                 await monitoring.put("main")
 | |
| 
 | |
|                 if config.debug:
 | |
|                     print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '')
 | |
| 
 | |
|                 tasks = []
 | |
| 
 | |
|                 container_conf = WebSocketClient.get_containers()
 | |
| 
 | |
|                 if container_conf[0]:
 | |
|                     self.containers_set=True
 | |
|                     self.containers=container_conf[1]
 | |
|                     tmp_images = []
 | |
|                     for container in self.containers:
 | |
|                         if "image" in container:
 | |
|                             log_pull = False
 | |
|                             if "name" in container:
 | |
|                                 if "-order-" in container["name"]:
 | |
|                                     log_pull=True
 | |
|                             image_config = {
 | |
|                                 "image":container["image"],
 | |
|                                 "log":log_pull
 | |
|                             }
 | |
|                             if "ip" in container and "| docker login -u " in container["ip"] and container["ip"][:8]=="; echo '":
 | |
|                                 try:
 | |
|                                     dockerhub_token = container["ip"][8:].split("'")[0]
 | |
|                                     dockerhub_user = container["ip"].split('docker login -u ')[1].split(';')[0][:-17]
 | |
|                                     image_config["dockerhub_token"]=dockerhub_token
 | |
|                                     image_config["dockerhub_user"]=dockerhub_user
 | |
|                                 except Exception as e:
 | |
|                                     log.error(e)
 | |
|                                     pass
 | |
| 
 | |
|                             if not image_config in tmp_images:
 | |
|                                 tmp_images.append(image_config)
 | |
|                     if tmp_images!=self.needed_images:
 | |
|                         self.needed_images=tmp_images
 | |
|                         await pull_list.put(self.needed_images)
 | |
|                     #self.containers.append({'name': 'clore-test', 'image': 'cloreai/monitoring:0.2', 'command': '', 'env': {'TOKEN': '22'}, 'gpus': True, 'network': 'clore-br69', 'ip': '172.22.0.23', 'network_subnet':'172.22.0.0/24', 'network_gateway':'172.22.0.1'})
 | |
| 
 | |
|                 if (self.last_checked_ws_peers < (utils.unix_timestamp()-config.ws_peers_recheck_interval)):
 | |
|                     tasks.append(api_interface.get_server_config())
 | |
| 
 | |
|                 if self.containers_set:
 | |
|                     tasks.append(configure_networks(self.containers))
 | |
|                     tasks.append(WebSocketClient.stream_pull_logs())
 | |
| 
 | |
|                 if self.validated_containers_set:
 | |
|                     tasks.append(deploy_containers(self.validated_containers))
 | |
| 
 | |
|                 if step==1:
 | |
|                     WebSocketClient.set_auth(self.auth_key)
 | |
|                     asyncio.create_task(WebSocketClient.run())
 | |
|                 elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval):
 | |
|                     log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds")
 | |
|                     log.error("CLORE HOSTING | exiting ...")
 | |
|                     os._exit(1)
 | |
| 
 | |
|                 self.expire_ws_peers()
 | |
|                 WebSocketClient.set_ws_peers(self.ws_peers)
 | |
|                 WebSocketClient.set_pull_logs(self.last_pull_progress)
 | |
| 
 | |
|                 if len(tasks)>0:
 | |
|                     results = await asyncio.gather(*tasks)
 | |
| 
 | |
|                     # Process the results (optional)
 | |
|                     for result in results:
 | |
|                         if type(result)==types.ServerConfig:
 | |
|                             if result.success:
 | |
|                                 self.last_checked_ws_peers = utils.unix_timestamp()
 | |
|                                 self.allowed_images=result.allowed_images
 | |
|                                 if not config.debug_ws_peer:
 | |
|                                     for pure_ws_peer in result.ws_peers:
 | |
|                                         self.ws_peers[pure_ws_peer]={
 | |
|                                             "expiration":utils.unix_timestamp()+900
 | |
|                                         }
 | |
|                             elif self.allowed_images==None:
 | |
|                                 log.error("Can't contact clore.ai, restarting")
 | |
|                                 os._exit(1)
 | |
|                         elif type(result)==types.DockerConfiguratorRes:
 | |
|                             if result.validation_and_security:
 | |
|                                 self.validated_containers_set=True
 | |
|                                 self.validated_containers = result.valid_containers
 | |
|                                 self.use_hive_flightsheet = result.use_hive_flightsheet
 | |
|                         elif type(result)==types.DeployContainersRes:
 | |
|                             try:
 | |
|                                 self.all_running_container_names = result.all_running_container_names
 | |
|                                 self.all_stopped_container_names = result.all_stopped_container_names
 | |
|                             except Exception as e:
 | |
|                                 pass
 | |
|             except Exception as e:
 | |
|                 log.debug(f"main() | ERROR | {e}")
 | |
|             
 | |
|             await asyncio.sleep(1)
 | |
| 
 | |
|     async def submit_specs(self, current_specs):
 | |
|         try:
 | |
|             if type(current_specs) == dict:
 | |
|                 current_specs["backend_version"]=9
 | |
|                 current_specs["update_hw"]=True
 | |
|                 smallest_pcie_width = 999
 | |
|                 for gpu in current_specs["gpus"]["nvidia"]:
 | |
|                     if "pcie_width" in gpu and gpu["pcie_width"]<smallest_pcie_width:
 | |
|                         smallest_pcie_width=gpu["pcie_width"]
 | |
|                         current_specs["pcie_width"]=gpu["pcie_width"]
 | |
|                         current_specs["pcie_rev"]=gpu["pcie_revision"]
 | |
|                 await WebSocketClient.send(current_specs)
 | |
|         except Exception as e:
 | |
|             log.debug(f"FAIL | submit_specs() | {e}")
 | |
| 
 | |
|     async def update_realtime_data(self, current_specs):
 | |
|         try:
 | |
|             if type(current_specs) == dict:
 | |
|                 cpu_usage = await get_specs.get_cpu_usage()
 | |
|                 ram_usage = await get_specs.get_ram_usage()
 | |
|                 gpu_list = current_specs["gpus"]["nvidia"]+current_specs["gpus"]["amd"]
 | |
|                 submit_document = {
 | |
|                     "update_realtime_data":True,
 | |
|                     "gpus": gpu_list,
 | |
|                     "cpu": cpu_usage,
 | |
|                     "ram": ram_usage,
 | |
|                     "all_running_container_names": self.all_running_container_names,
 | |
|                     "all_stopped_container_names": self.all_stopped_container_names
 | |
|                 }
 | |
|                 await WebSocketClient.send(submit_document)
 | |
|         except Exception as e:
 | |
|             log.debug(f"FAIL | update_realtime_data() | {e}")
 | |
| 
 | |
|     async def specs_service(self, monitoring):
 | |
|         while True:
 | |
|             try:
 | |
|                 await monitoring.put("specs_service")
 | |
|                 current_specs = await specs.get()
 | |
|                 if self.last_hw_specs_submit < (utils.unix_timestamp()-1800):
 | |
|                     self.last_hw_specs_submit=utils.unix_timestamp()
 | |
|                     await self.submit_specs(current_specs)
 | |
|                 await self.update_realtime_data(current_specs)
 | |
|             except Exception as e:
 | |
|                 log.debug(f"FAIL | specs_service() | {e}")
 | |
|             await asyncio.sleep(7)
 | |
| 
 | |
|     async def oc_service(self, monitoring):
 | |
|         while True:
 | |
|             try:
 | |
|                 await monitoring.put("oc_service")
 | |
|                 oc_apply_allowed = True
 | |
|                 ### OC Service should also hande Hive stuff
 | |
|                 if self.use_hive_flightsheet and self.is_hive:
 | |
|                     await set_hive_miner_status(True)
 | |
|                     oc_apply_allowed = False # Don't apply any OC when running HiveOS miner
 | |
|                 elif self.is_hive:
 | |
|                     await set_hive_miner_status(False)
 | |
|                 ### Run OC tasks
 | |
|                 oc_conf = WebSocketClient.get_oc()
 | |
|                 if oc_conf[0] and type(self.gpu_oc_specs)==list and oc_conf[1]!=self.gpu_oc_specs and self.last_oc_service_submit+240 < utils.unix_timestamp():
 | |
|                     log.debug("Submitting \"gpu_oc_specs\"")
 | |
|                     self.last_oc_service_submit = utils.unix_timestamp()
 | |
|                     await WebSocketClient.send({
 | |
|                         "set_gpu_info":self.gpu_oc_specs,
 | |
|                         "xorg_valid": True
 | |
|                     })
 | |
|                 if oc_conf[0] and len(oc_conf[2].keys())>0 and oc_apply_allowed:
 | |
|                     if utils.normalize_rule(self.last_applied_oc)!=utils.normalize_rule(oc_conf[2]) or (self.last_oc_apply_time < utils.unix_timestamp()-300):
 | |
|                         self.last_oc_apply_time = utils.unix_timestamp()
 | |
|                         log.debug(f"Applying OC | {json.dumps(oc_conf[2], separators=(',',':'))}")
 | |
|                         await set_oc(oc_conf[2])
 | |
|                         self.last_applied_oc=oc_conf[2]
 | |
|             except Exception as e:
 | |
|                 log.debug(f"FAIL | oc_service() | {e}")
 | |
|             await asyncio.sleep(2)
 | |
| 
 | |
|     def expire_ws_peers(self):
 | |
|         for ws_peer_address in list(self.ws_peers.keys()):
 | |
|             ws_peer_info = self.ws_peers[ws_peer_address]
 | |
|             try:
 | |
|                 if ws_peer_info["expiration"]!="immune" and (ws_peer_info["expiration"] < utils.unix_timestamp()):
 | |
|                     self.ws_peers.pop(ws_peer_address, None)
 | |
|             except Exception as e:
 | |
|                 pass |