# NOTE(review): this file was reviewed from a copy whose newlines and
# indentation had been collapsed; the block structure below was reconstructed
# from syntax. Where nesting was ambiguous (e.g. which `if` a statement belongs
# to in handle_container_cache(), main() and partner_service()), diff against
# version control. Text is also missing inside submit_specs() -- see the note
# there.
from lib import config as config_module
from lib import logging as logging_lib
from lib import nvidia_driver_update
from lib import log_streaming_task
from lib import run_startup_script
from lib import hive_miner_interface
from lib import docker_interface
from lib import background_job
from lib import docker_deploy
from lib import clore_partner
from lib import clore_partner_socket
from lib import docker_pull
from lib import get_specs
from lib import utils
from lib import nvml
log = logging_lib.log

from clore_hosting import docker_configurator
from clore_hosting import api_interface
from clore_hosting import ws_interface
from clore_hosting import types

# NOTE(review): `Queue` and `threading` are not referenced in the visible code.
from queue import Queue
import concurrent.futures
import threading
import asyncio
import time
import json
from aiofiles import os as async_os
import os

specs = get_specs.Specs()

# Queue shared between WebSocketClient (as its log_message_broker) and the
# log_streaming_task started in CloreClient.service().
container_log_broken = asyncio.Queue()

config = config_module.config
WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_broken)

#print(config)

async def configure_networks(containers, partner_forwarding_ips):
    """Run docker_configurator.configure() in a worker thread and wrap its result.

    Returns a types.DockerConfiguratorRes built from the first three elements of
    the configurator's result, or False if building that object raises.
    Exceptions raised by docker_configurator.configure() itself are NOT caught
    here; they propagate to the caller.
    """
    res = await asyncio.to_thread(docker_configurator.configure, containers, partner_forwarding_ips)
    try:
        fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2])
        return fin_res
    except Exception as e:
        return False

async def deploy_containers(validated_containers, allowed_running_containers, can_run_partner_workloads):
    """Run docker_deploy.deploy() in a worker thread.

    Returns types.DeployContainersRes holding the running and stopped container
    names reported by deploy(), or False if anything raises.
    """
    try:
        all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers, can_run_partner_workloads)
        return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names)
    except Exception as e:
        return False

async def get_local_images(no_latest_tag = False):
    """Return docker_interface.get_local_images(no_latest_tag), run in a worker thread."""
    res = await asyncio.to_thread(docker_interface.get_local_images, no_latest_tag)
    return res

async def set_oc(settings):
    """Apply overclock settings via nvml.set_oc() in a worker thread.

    Returns nvml.set_oc()'s result, or False (after logging an error) if it raises.
    """
    try:
        result = await asyncio.to_thread(nvml.set_oc, settings)
        return result
    except Exception as e:
        log.error(f"set_oc() | error | {e}")
        return False

async def set_hive_miner_status(enabled=False):
    """Call utils.hive_set_miner_status(enabled) in a worker thread.

    Returns True when the call does not raise (its return value is ignored),
    otherwise logs an error and returns False.
    """
    try:
        result = await asyncio.to_thread(utils.hive_set_miner_status, enabled)
        return True
    except Exception as e:
        log.error(f"set_hive_miner_status() | error | {e}")
        return False

class CloreClient:
    """Host agent: runs the background service coroutines and a watchdog.

    service() starts coroutines for container configuration/deployment (main),
    the image cache, startup scripts, log streaming, specs, overclocking,
    HiveOS PoW statistics and the partner integration. monitoring_service()
    terminates the whole process with os._exit(1) when one of them stops
    sending heartbeats.
    """

    def __init__(self, auth_key, xfs_state):
        """Initialise state and perform host-side setup.

        Args:
            auth_key: passed to WebSocketClient.set_auth() on main()'s first step.
            xfs_state: also passed to set_auth(); when equal to "active", main()
                additionally allows the "vastai/test" images.

        Side effects (all synchronous, at construction time): may reconfigure
        Docker's cgroup driver, calls docker_interface.verify_docker_version(),
        initialises NVML and reads the GPU OC specs.
        """
        self.auth_key=auth_key
        self.xfs_state = xfs_state

        # ws peer address -> {"expiration": unix timestamp or "immune"}; pruned by expire_ws_peers()
        self.ws_peers = {}
        self.last_checked_ws_peers=0
        self.containers={}
        # List of image_config dicts derived from the container list in main()
        self.needed_images=[]
        self.containers_set=False

        # None until the first successful server config arrives in main()
        self.allowed_images = None

        # Latest wanted-image list received by handle_container_cache() via pull_list
        self.p_needed_containers=[]
        # image name -> {"log": str, "last_update": time.time()}; published via WebSocketClient.set_pull_logs()
        self.last_pull_progress={}

        self.validated_containers_set=False
        self.validated_containers=[]

        self.all_running_container_names=[]
        self.all_stopped_container_names=[]

        # Back-dated by (1800-60) s; presumably so the first hardware-specs
        # submission happens ~60 s after start given a 1800 s interval. The
        # check that reads this is in the lost span of submit_specs/specs_service
        # -- TODO confirm.
        self.last_hw_specs_submit = time.time()-(1800-60)

        # service name -> unix timestamp of its last heartbeat (see monitoring_service())
        self.last_service_heartbeat = {
            "main": utils.unix_timestamp(),
            "handle_container_cache": utils.unix_timestamp(),
            "startup_script_runner": utils.unix_timestamp(),
            "log_streaming_task": utils.unix_timestamp(),
            "container_log_streaming_service": utils.unix_timestamp(),
            "specs_service": utils.unix_timestamp(),
            "oc_service": utils.unix_timestamp(),
            "background_pow_data_collection": utils.unix_timestamp(),
            "partner_service": utils.unix_timestamp()
        }
        # NOTE(review): not read anywhere in the visible code; monitoring_service()
        # uses config.maximum_service_loop_time / maximum_pull_service_loop_time instead.
        self.max_service_inactivity = 600 # seconds
        self.no_restart_services = ["partner_service", "specs_service"] # Services that are allowed to run indefinitely without heartbeats and without triggering an app restart

        # A debug peer never expires (expire_ws_peers() skips "immune" entries)
        if config.debug_ws_peer:
            self.ws_peers[str(config.debug_ws_peer)]={
                "expiration":"immune"
            }

        self.os_release = get_specs.get_os_release()
        self.restart_docker = False
        if "use_cgroupfs" in self.os_release:
            self.updated_exec_opts = True if docker_interface.configure_exec_opts("native.cgroupdriver","cgroupfs") else False
            if self.updated_exec_opts:
                docker_info = docker_interface.get_info()
                if "CgroupDriver" in docker_info and docker_info["CgroupDriver"]=="systemd":
                    # Restart docker when it's loaded under systemd (the actual restart happens in main() only while no order is running, to avoid disrupting workloads)
                    self.restart_docker = True

        docker_interface.verify_docker_version()

        self.dont_use_hive_binaries = True if 'DONT_USE_HIVE_BINARIES' in os.environ else False

        nvml.init(allow_hive_binaries=not self.dont_use_hive_binaries)

        self.extra_allowed_images = utils.get_extra_allowed_images()
        self.allowed_running_containers = utils.get_allowed_container_names()

        self.gpu_oc_specs = nvml.get_gpu_oc_specs()
        self.last_oc_service_submit = 0
        self.last_applied_oc = {}
        self.last_oc_apply_time = 0

        self.is_hive = get_specs.is_hive()
        self.use_hive_flightsheet = False

        self.hive_miner_interface = hive_miner_interface.hive_interface()
        self.next_pow_background_job_send_update = 0

        self.clore_partner_initiazized = False
        # [provider, forwarding] from the partner config; passed to configure_networks() by main()
        self.partner_forwarding_ips = []

        self.start_time = utils.unix_timestamp()

        self.runned_pull_selftest = False

    async def service(self):
        """Start every service coroutine plus the NVIDIA driver update loop and await them.

        The services loop forever, so this normally never returns.
        """
        global container_log_broken

        # pull_list: main() -> handle_container_cache() wanted-image lists.
        # monitoring: each service puts its own name as a heartbeat.
        pull_list = asyncio.Queue()
        monitoring = asyncio.Queue()

        task1 = asyncio.create_task(self.main(pull_list, monitoring))
        task2 = asyncio.create_task(self.handle_container_cache(pull_list, monitoring))
        task3 = asyncio.create_task(self.startup_script_runner(monitoring))
        task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken, monitoring, self.allowed_running_containers))
        task5 = asyncio.create_task(self.container_log_streaming_service(monitoring))
        task6 = asyncio.create_task(self.specs_service(monitoring))
        task7 = asyncio.create_task(self.oc_service(monitoring))
        task8 = asyncio.create_task(self.background_pow_data_collection(monitoring))
        task9 = asyncio.create_task(self.partner_service(monitoring))
        monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
        driver_update_task = asyncio.create_task(nvidia_driver_update.update_loop(self.is_hive))

        # Wait for all tasks (they loop forever, so this normally never completes)
        await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, task9, monitoring_task, driver_update_task)

    async def monitoring_service(self, monitoring):
        """Watchdog: every 5 s record heartbeats and exit the process if a service stalls.

        Heartbeats are service names read from the `monitoring` queue.
        "handle_container_cache" may be silent for config.maximum_pull_service_loop_time
        seconds, every other service for config.maximum_service_loop_time; services
        listed in self.no_restart_services are never checked. A stall calls os._exit(1).
        """
        while True:
            try:
                monitoring_data = []
                while not monitoring.empty():
                    monitoring_data.append(await monitoring.get())
                if len(monitoring_data)>0:
                    unique_monitoring = list(set(monitoring_data))
                    for service_name in unique_monitoring:
                        self.last_service_heartbeat[service_name]=utils.unix_timestamp()
                if config.debug:
                    log.success(self.last_service_heartbeat)
                for service_name in self.last_service_heartbeat.keys():
                    if not service_name in self.no_restart_services:
                        last_hearthbeat = self.last_service_heartbeat[service_name]
                        # Image pulls can legitimately block the cache service longer, hence its separate limit
                        if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache":
                            log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
                            os._exit(1)
                        elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache":
                            log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
                            os._exit(1)
            except Exception as e:
                log.debug(f"monitoring_service() | ERROR | {e}")
            await asyncio.sleep(5)

    async def container_log_streaming_service(self, monitoring):
        """Repeatedly await WebSocketClient.stream_container_logs(), with a 0.6 s pause between calls."""
        while True:
            try:
                await monitoring.put("container_log_streaming_service")
                await WebSocketClient.stream_container_logs()
            except Exception as e:
                log.debug(f"container_log_streaming_service() | ERROR | {e}")
            await asyncio.sleep(0.6)

    async def run_startup_scripts(self, startup_script_full_path, container_name):
        """Run one container's startup script via run_startup_script.run() in a worker thread.

        The script is passed together with the in-container path
        "/init-<container_name>.sh". Returns True on success, False if anything
        raises (the exception is swallowed without logging).
        """
        try:
            if config.debug:
                log.success(f"Runnin' {startup_script_full_path}")
                # NOTE(review): debug leftover -- logs the running-container list at error level
                log.error(self.all_running_container_names)
            await asyncio.to_thread(run_startup_script.run, container_name, startup_script_full_path, f"/init-{container_name}.sh")
            return True
        except Exception as e:
            return False

    async def startup_script_runner(self, monitoring):
        """Every 2 s, launch startup scripts for running containers.

        A file "<name>.sh" in config.startup_scripts_folder is started (as a
        background task) when container <name> is in
        self.all_running_container_names, no "<name>.finished" file exists in the
        same folder, and no task for that script path is already in flight.
        Presumably the ".finished" marker is written by run_startup_script.run()
        -- not visible here.
        """

        # script path -> in-flight asyncio.Task
        startup_script_ongoing_tasks = {}

        while True:
            try:
                await monitoring.put("startup_script_runner")
                startup_script_files = await async_os.listdir(config.startup_scripts_folder)
                for startup_script_file in startup_script_files:
                    if type(startup_script_file)==str and startup_script_file.endswith(".sh") and startup_script_file[:-3] in self.all_running_container_names:
                        if not f"{startup_script_file[:-3]}.finished" in startup_script_files:
                            full_startup_script_path = os.path.join(config.startup_scripts_folder, startup_script_file)
                            if os.path.isfile(full_startup_script_path) and full_startup_script_path not in startup_script_ongoing_tasks:
                                # Start processing the file immediately in a non-blocking way
                                startup_script_task = asyncio.create_task(self.run_startup_scripts(full_startup_script_path, startup_script_file[:-3]))
                                startup_script_ongoing_tasks[full_startup_script_path] = startup_script_task
                                # Attach a callback to clean up the task once it's done
                                # (`path=` default binds the current path, avoiding late binding)
                                startup_script_task.add_done_callback(lambda t, path=full_startup_script_path: startup_script_ongoing_tasks.pop(path, None))

                # Remove completed tasks
                # NOTE(review): redundant with the done-callback above; harmless because pop() uses a default
                completed_tasks = [path for path, task in startup_script_ongoing_tasks.items() if task.done()]
                for path in completed_tasks:
                    startup_script_ongoing_tasks.pop(path, None)
            except Exception as e:
                log.debug(f"ERROR | startup_script_runner() | {e}")
            await asyncio.sleep(2)

    async def pull_log_progress(self, log_dict, image_name):
        """Publish pull progress for `image_name` until cancelled.

        Every config.pull_log_streaming_interval milliseconds, render `log_dict`
        (filled by docker_pull.pull_image(), one "layer: status" line per entry)
        into self.last_pull_progress[image_name]. Never returns on its own; the
        caller cancels it.
        """
        while True:
            tmp_progress=''
            for layer, status in log_dict.items():
                first_char = '' if tmp_progress=='' else '\n'
                tmp_progress+=f"{first_char}{layer}: {status}"
            self.last_pull_progress[image_name]={"log":tmp_progress, "last_update":time.time()}
            await asyncio.sleep(config.pull_log_streaming_interval/1000)

    async def check_if_pulling_required(self, pulling_image, cancellation_event):
        """Every 0.5 s, set `cancellation_event` once `pulling_image` is no longer in self.needed_images.

        Never returns on its own; the caller cancels it after the pull finishes.
        """
        while True:
            try:
                matched_image = False
                for needed_image in self.needed_images:
                    if "image" in needed_image and needed_image["image"]==pulling_image:
                        matched_image=True
                        break

                if not matched_image:
                    cancellation_event.set()
            except Exception as e:
                pass
            await asyncio.sleep(0.5)

    async def handle_container_cache(self, pull_list, monitoring):
        """Image cache service: prune unneeded local images and pull missing ones.

        Each pass (about once per second):
          1. Drain `pull_list`, keeping only the most recent wanted-image list
             (image_config dicts put there by main()).
          2. If that list is non-empty, for every local image decide whether it is
             still needed. It is needed when wanted by a container, matching
             self.allowed_images (repository + tag or '*'), or matching a
             partner-allowed image. Otherwise remove it, subject to
             config.delete_unused_containers and config.max_remove_images_per_run.
          3. Drop at most one self.last_pull_progress entry older than 300 s.
          4. Sequentially pull every wanted image not present locally, streaming
             progress and cancelling the pull if the image stops being wanted.
        """
        while True:
            got_data = []
            while not pull_list.empty():
                got_data.append(await pull_list.get())
            await monitoring.put("handle_container_cache")
            if len(got_data)>0:
                self.p_needed_containers=got_data[len(got_data)-1]

            if len(self.p_needed_containers)>0:
                # presumably no_latest_tag=True strips ":latest" from names -- confirm in docker_interface
                local_images = await get_local_images(no_latest_tag=True)
                partner_images = await clore_partner.get_partner_allowed_images()
                for local_image in local_images:
                    self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()}
                    image_needed = False
                    # NOTE(review): BUG -- reset for every local image, so
                    # config.max_remove_images_per_run never limits removals across
                    # one pass; it should be initialised once before this loop.
                    removed_cnt = 0
                    try:
                        for p_needed_container in self.p_needed_containers:
                            if "image" in p_needed_container and local_image.replace(':latest','')==p_needed_container["image"].replace(':latest',''):
                                image_needed=True
                                break
                        # Nothing is removed until the server config has supplied allowed_images
                        if type(self.allowed_images)==list:
                            if image_needed==False:
                                after_split = local_image.split(':', 1)
                                local_image_name = after_split[0]
                                local_image_tag = ''
                                if len(after_split)>1:
                                    local_image_tag=after_split[1]
                                for allowed_image in self.allowed_images:
                                    if "repository" in allowed_image and "allowed_tags" in allowed_image and allowed_image["repository"]==local_image_name:
                                        for allowed_tag in allowed_image["allowed_tags"]:
                                            if local_image_tag==allowed_tag or allowed_tag=='*':
                                                image_needed=True
                                                break
                            if not image_needed and type(partner_images) == list:
                                # Partner images are kept but removed from the published pull progress
                                for partner_image in partner_images:
                                    if local_image.replace(':latest', '') == partner_image.replace(':latest', ''):
                                        image_needed = True
                                        del self.last_pull_progress[local_image]
                                        break
                                    # Local names with 3+ '/'-separated parts: compare the first two
                                    # parts with the partner image whose ':tag' is written as '_tag'.
                                    # (`deployment_type` is unused.)
                                    if len(local_image.split('/')) >= 3:
                                        partner_image_spl = partner_image.split(':')
                                        image, deployment_type = '/'.join(local_image.split('/', 2)[:2]), local_image.split('/', 2)[-1]
                                        if len(partner_image_spl) == 1:
                                            if image == partner_image_spl[0] or f"{image}" == f"{partner_image_spl[0]}_latest":
                                                image_needed = True
                                                del self.last_pull_progress[local_image]
                                                break
                                        elif len(partner_image_spl) == 2:
                                            if image.replace('_latest', '') == f"{partner_image_spl[0]}_{partner_image_spl[1]}".replace('_latest', ''):
                                                image_needed = True
                                                del self.last_pull_progress[local_image]
                                                break
                            # `partner_images != None` -- presumably None means the partner list
                            # could not be fetched, so nothing is deleted then (confirm in clore_partner)
                            if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers and partner_images != None:
                                log.success(f"GOING TO REMOVE {local_image}")
                                with concurrent.futures.ThreadPoolExecutor() as pool:
                                    r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image)
                                    if r:
                                        removed_cnt+=1
                                        del self.last_pull_progress[local_image]
                        #if config.debug:
                        #    log.success(f"{local_image} | {image_needed}")
                    except Exception as e:
                        image_needed=True
                        log.debug(f"ERROR | image_needed | {e}")
                # Expire stale progress entries; `break` after the first deletion
                # avoids mutating the dict while iterating it (one entry per pass).
                for lpp_image in self.last_pull_progress.keys():
                    log_info = self.last_pull_progress[lpp_image]
                    if log_info["last_update"] < time.time()-300:
                        del self.last_pull_progress[lpp_image]
                        break
                most_recent_wanted_state = self.p_needed_containers
                for wanted_image in most_recent_wanted_state:
                    if not wanted_image["image"] in local_images:
                        # NOTE(review): debug leftovers printing to stdout
                        print("Local", local_images)
                        print("W",wanted_image)
                        log.debug(f"Starting to pull \"{wanted_image}\"")

                        auth_config = {}
                        if "dockerhub_token" in wanted_image and "dockerhub_user" in wanted_image:
                            auth_config={
                                "username": wanted_image["dockerhub_user"],
                                "password": wanted_image["dockerhub_token"]
                            }

                        log_dict = {}
                        pull_cancellation_event = asyncio.Event()
                        loop = asyncio.get_running_loop()

                        # Run the image pull, log progress concurrently and cancel if not needed anymore
                        pull_task = asyncio.create_task(docker_pull.pull_image(wanted_image["image"], auth_config, log_dict, loop, pull_cancellation_event))
                        log_task = asyncio.create_task(self.pull_log_progress(log_dict, wanted_image["image"]))
                        check_if_pulling_required_task = asyncio.create_task(self.check_if_pulling_required(wanted_image["image"], pull_cancellation_event))

                        # Wait for the image pull to complete, then cancel the helper tasks
                        try:
                            await pull_task
                        except Exception as e:
                            # NOTE(review): BUG -- uses `local_image` (left over from the loop
                            # above) instead of wanted_image["image"], so the failure is
                            # reported under the wrong image. If no local image has been seen
                            # yet, `local_image` is unbound; the resulting NameError escapes
                            # this handler and ends this coroutine.
                            self.last_pull_progress[local_image]={"log":f"Can't pull image \"{local_image}\"", "last_update":time.time()}
                        log_task.cancel()
                        try:
                            await log_task
                        except asyncio.CancelledError:
                            # Expect the task to be cancelled, so pass here
                            pass
                        check_if_pulling_required_task.cancel()
                        try:
                            await check_if_pulling_required_task
                        except asyncio.CancelledError:
                            # Expect the task to be cancelled, so pass here
                            pass
            await asyncio.sleep(1)

    async def main(self, pull_list, monitoring):
        """Main control loop, running about once per second.

        Each step:
          - reads the container list from WebSocketClient and derives the
            wanted-image list (with Docker Hub credentials parsed from the
            container's "ip" field when present); changes are put on `pull_list`;
          - decides whether partner workloads may run (not while a non-spot order
            runs) and performs a pending Docker restart when no order is running;
          - gathers: a server-config refresh (every config.ws_peers_recheck_interval s),
            network configuration and pull-log streaming (once containers are
            known), and deployment (once containers are validated);
          - starts the WebSocket client on step 1 and, every 5th step, exits the
            process if no heartbeat arrived within config.max_ws_peer_heartbeat_interval;
          - applies the gathered results to the instance state.
        Also exits via os._exit(1) when the server config fails before
        allowed_images has ever been set.
        """
        step=0
        while True:
            try:
                step+=1

                await monitoring.put("main")

                if config.debug:
                    print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '')

                tasks = []

                running_order = False

                # appears to be (is_set, container_list) -- confirm in ws_interface
                container_conf = WebSocketClient.get_containers()

                can_run_partner_workloads = False

                if container_conf[0]:
                    self.containers_set=True
                    self.containers=container_conf[1]
                    tmp_images = []

                    is_order_spot = False

                    for idx, container in enumerate(self.containers):
                        if "spot" in container:
                            is_order_spot = True
                        # NOTE(review): `"image" in container` is tested twice (redundant)
                        if "image" in container and "image" in container and container["image"]!="cloreai/hive-use-flightsheet":
                            log_pull = False
                            if "name" in container:
                                if "-order-" in container["name"]:
                                    running_order=True
                                    log_pull=True
                            image_config = {
                                "image":container["image"],
                                "log":log_pull
                            }
                            # The "ip" field can carry a shell fragment starting with "; echo '<token>'"
                            # and containing "| docker login -u <user>...". The [:-17] trims a fixed
                            # 17-char suffix from the user part -- exact format not visible here, confirm.
                            if "ip" in container and "| docker login -u " in container["ip"] and container["ip"][:8]=="; echo '":
                                try:
                                    dockerhub_token = container["ip"][8:].split("'")[0]
                                    dockerhub_user = container["ip"].split('docker login -u ')[1].split(';')[0][:-17]
                                    image_config["dockerhub_token"]=dockerhub_token
                                    image_config["dockerhub_user"]=dockerhub_user
                                except Exception as e:
                                    log.error(e)
                                    pass

                            if not image_config in tmp_images:
                                tmp_images.append(image_config)

                    # Partner workloads are blocked only while a non-spot order is running
                    can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True
                    clore_partner_socket.set_can_deploy(can_run_partner_workloads)

                    if self.restart_docker and not running_order and len(self.containers)>0:
                        log.debug("Sending docker restart command")
                        utils.run_command_v2("systemctl restart docker")
                        self.restart_docker=False
                    if tmp_images!=self.needed_images:
                        self.needed_images=tmp_images
                        await pull_list.put(self.needed_images)
                    #self.containers.append({'name': 'clore-test', 'image': 'cloreai/monitoring:0.2', 'command': '', 'env': {'TOKEN': '22'}, 'gpus': True, 'network': 'clore-br69', 'ip': '172.22.0.23', 'network_subnet':'172.22.0.0/24', 'network_gateway':'172.22.0.1'})
                if (self.last_checked_ws_peers < (utils.unix_timestamp()-config.ws_peers_recheck_interval)):
                    tasks.append(api_interface.get_server_config())

                if self.containers_set:
                    tasks.append(configure_networks(self.containers, self.partner_forwarding_ips))
                    tasks.append(WebSocketClient.stream_pull_logs())

                if self.validated_containers_set:
                    tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers, can_run_partner_workloads))

                if step==1:
                    WebSocketClient.set_auth(self.auth_key, self.xfs_state)
                    asyncio.create_task(WebSocketClient.run())
                elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval):
                    log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds")
                    log.error("CLORE HOSTING | exiting ...")
                    os._exit(1)

                self.expire_ws_peers()
                WebSocketClient.set_ws_peers(self.ws_peers)
                WebSocketClient.set_pull_logs(self.last_pull_progress)

                if len(tasks)>0:
                    results = await asyncio.gather(*tasks)

                    # Dispatch each result on its type
                    for result in results:
                        if type(result)==types.ServerConfig:
                            if result.success:
                                self.last_checked_ws_peers = utils.unix_timestamp()
                                self.allowed_images=result.allowed_images+self.extra_allowed_images
                                if self.xfs_state == "active":
                                    self.allowed_images.append({
                                        "repository": "vastai/test",
                                        "allowed_tags": ["bandwidth-test-nvidia", "selftest"]
                                    })
                                if not config.debug_ws_peer:
                                    for pure_ws_peer in result.ws_peers:
                                        self.ws_peers[pure_ws_peer]={
                                            "expiration":utils.unix_timestamp()+900
                                        }
                            # A failed refresh is retried next step (last_checked_ws_peers
                            # unchanged); fatal only if no config was ever received.
                            elif self.allowed_images==None:
                                log.error("Can't contact clore.ai, restarting")
                                os._exit(1)
                        elif type(result)==types.DockerConfiguratorRes:
                            if result.validation_and_security:
                                self.validated_containers_set=True
                                self.validated_containers = result.valid_containers
                                self.use_hive_flightsheet = result.use_hive_flightsheet
                                log.debug(f"Use Hive flightsheet: {result.use_hive_flightsheet}")
                        elif type(result)==types.DeployContainersRes:
                            try:
                                self.all_running_container_names = result.all_running_container_names
                                self.all_stopped_container_names = result.all_stopped_container_names
                            except Exception as e:
                                pass
            except Exception as e:
                log.debug(f"main() | ERROR | {e}")
            await asyncio.sleep(1)

    async def submit_specs(self, current_specs):
        """Prepare hardware specs for submission (only partly visible -- see NOTE below).

        Visible part: when `current_specs` is a dict, set backend_version=20 and
        update_hw=True on it, then scan its NVIDIA GPU entries for "pcie_width",
        starting from 999 as the running minimum.
        """
        try:
            if type(current_specs) == dict:
                current_specs["backend_version"]=20
                current_specs["update_hw"]=True
                smallest_pcie_width = 999
                for gpu in current_specs["gpus"]["nvidia"]:
                    # NOTE(review): TEXT IS MISSING from the reviewed copy on the next line.
                    # Everything between `gpu["pcie_width"]` and
                    # `0 and not self.runned_pull_selftest:` was lost (it looks like a
                    # "<...>" span was stripped). Apparently lost with it: the rest of
                    # submit_specs() and the header/start of specs_service(). That method
                    # is started in service() but not otherwise visible, and the fragment
                    # below ends in its "FAIL | specs_service()" handler. The line is not
                    # valid Python as-is -- restore this region from version control.
                    if "pcie_width" in gpu and gpu["pcie_width"] 0 and not self.runned_pull_selftest:
                        await clore_partner.check_to_pull_selftest(current_specs)
                        self.runned_pull_selftest = True
                # NOTE(review): the handlers below have no visible matching `try`; they
                # belong to specs_service() in the lost span.
                except Exception as partner_exception:
                    pass
        except Exception as e:
            log.debug(f"FAIL | specs_service() | {e}")
        await asyncio.sleep(7)

    async def oc_service(self, monitoring):
        """Every 2 s, manage the HiveOS miner and GPU overclocking.

        - On Hive (with Hive binaries allowed): enable the Hive miner when a
          flightsheet is in use and background_job.is_enabled(); otherwise
          disable it. While the miner runs, no OC is applied.
        - Send self.gpu_oc_specs to the server when they differ from the copy the
          server reports, at most once per 240 s.
        - Apply the server's OC settings when they differ from the last applied
          settings, or re-apply them every 300 s.
        """
        while True:
            try:
                await monitoring.put("oc_service")
                oc_apply_allowed = True

                ### OC Service should also handle Hive stuff
                if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries and background_job.is_enabled():
                    await set_hive_miner_status(True)
                    oc_apply_allowed = False # Don't apply any OC when running HiveOS miner
                elif self.is_hive and not self.dont_use_hive_binaries:
                    await set_hive_miner_status(False)

                ### Run OC tasks
                # appears to be (is_set, server-known gpu_oc_specs, oc settings dict) -- confirm in ws_interface
                oc_conf = WebSocketClient.get_oc()
                if oc_conf[0] and type(self.gpu_oc_specs)==list and oc_conf[1]!=self.gpu_oc_specs and self.last_oc_service_submit+240 < utils.unix_timestamp():
                    log.debug("Submitting \"gpu_oc_specs\"")
                    self.last_oc_service_submit = utils.unix_timestamp()
                    await WebSocketClient.send({
                        "set_gpu_info":self.gpu_oc_specs,
                        "xorg_valid": True
                    })
                if oc_conf[0] and type(oc_conf[2])==dict and oc_apply_allowed:
                    if utils.normalize_rule(self.last_applied_oc)!=utils.normalize_rule(oc_conf[2]) or (self.last_oc_apply_time < utils.unix_timestamp()-300):
                        self.last_oc_apply_time = utils.unix_timestamp()
                        log.debug(f"Applying OC | {json.dumps(oc_conf[2], separators=(',',':'))}")
                        await set_oc(oc_conf[2])
                        # Recorded even if set_oc() returned False
                        self.last_applied_oc=oc_conf[2]
            except Exception as e:
                log.debug(f"FAIL | oc_service() | {e}")
            await asyncio.sleep(2)

    async def background_pow_data_collection(self, monitoring):
        """Every 6 s on HiveOS (Hive binaries allowed), report miner hashrates to the server.

        A report is sent while the miner uptime is between 0 and 60 s (exclusive)
        or once the next scheduled update time has passed. The next update is
        scheduled 5 minutes later, or 40 s later if sending failed.
        """
        while True:
            try:
                await monitoring.put("background_pow_data_collection")
                if not self.dont_use_hive_binaries and self.is_hive:
                    miner_config = await self.hive_miner_interface.export_miner_stats(get_hashrates=False)
                    if (miner_config["miner_uptime"]>0 and miner_config["miner_uptime"]<60) or self.next_pow_background_job_send_update < time.time():
                        self.next_pow_background_job_send_update = time.time()+(5*60)
                        current_statistics = await self.hive_miner_interface.export_miner_stats(get_hashrates=True)
                        submit_result = await WebSocketClient.send({"submit_hashrates": current_statistics})
                        if not submit_result:
                            self.next_pow_background_job_send_update = time.time()+40
            except Exception as e:
                log.debug(f"FAIL | background_pow_data_collection() | {e}")
            await asyncio.sleep(6)

    async def partner_service(self, monitoring):
        """Every 6 s, drive the clore partner integration.

        - From 180 s after start, measure forwarding latency on every pass and
          report it when the measurement is a list.
        - When a partner config is available: initialise clore_partner once
          (retrying until initialize() succeeds), then update
          self.partner_forwarding_ips (used by main() for configure_networks())
          and apply the config.
        """
        while True:
            try:
                await monitoring.put("partner_service")
                if self.start_time < utils.unix_timestamp() - 180:
                    forwarding_latency_measurment = await clore_partner.measure_forwarding_latency()
                    if type(forwarding_latency_measurment) == list:
                        await WebSocketClient.set_forwarding_latency_measurment(forwarding_latency_measurment)
                partner_config = WebSocketClient.get_clore_partner_config()
                if partner_config != None:
                    if self.clore_partner_initiazized == False:
                        ir = await clore_partner.initialize()
                        if ir:
                            self.clore_partner_initiazized = True
                    if self.clore_partner_initiazized == True:
                        if 'provider' in partner_config and 'forwarding' in partner_config:
                            self.partner_forwarding_ips = [partner_config['provider'], partner_config['forwarding']]
                        else:
                            self.partner_forwarding_ips = []
                        await clore_partner.configure(partner_config)
            except Exception as e:
                log.debug(f"FAIL | partner_service() | {e}")
            await asyncio.sleep(6)

    def expire_ws_peers(self):
        """Remove ws peers whose "expiration" timestamp has passed; "immune" peers are kept.

        Iterates over a snapshot of the keys, so popping during the loop is safe.
        Entries that fail the comparison are silently left in place.
        """
        for ws_peer_address in list(self.ws_peers.keys()):
            ws_peer_info = self.ws_peers[ws_peer_address]
            try:
                if ws_peer_info["expiration"]!="immune" and (ws_peer_info["expiration"] < utils.unix_timestamp()):
                    self.ws_peers.pop(ws_peer_address, None)
            except Exception as e:
                pass