diff --git a/clore_hosting/main.py b/clore_hosting/main.py index e581ff1..e23b697 100644 --- a/clore_hosting/main.py +++ b/clore_hosting/main.py @@ -47,8 +47,8 @@ async def deploy_containers(validated_containers): except Exception as e: return False -async def get_local_images(): - res = await asyncio.to_thread(docker_interface.get_local_images) +async def get_local_images(no_latest_tag = False): + res = await asyncio.to_thread(docker_interface.get_local_images, no_latest_tag) return res class CloreClient: @@ -73,6 +73,16 @@ class CloreClient: self.last_hw_specs_submit = time.time()-(1800-60) + self.last_service_heartbeat = { + "main": utils.unix_timestamp(), + "handle_container_cache": utils.unix_timestamp(), + "startup_script_runner": utils.unix_timestamp(), + "log_streaming_task": utils.unix_timestamp(), + "container_log_streaming_service": utils.unix_timestamp(), + "specs_service": utils.unix_timestamp() + } + self.max_service_inactivity = 600 # seconds + if config.debug_ws_peer: self.ws_peers[str(config.debug_ws_peer)]={ "expiration":"immune" @@ -82,40 +92,67 @@ class CloreClient: global container_log_broken pull_list = asyncio.Queue() - pull_logs = asyncio.Queue() + monitoring = asyncio.Queue() - task1 = asyncio.create_task(self.main(pull_list, pull_logs)) - task2 = asyncio.create_task(self.handle_container_cache(pull_list, pull_logs)) - task3 = asyncio.create_task(self.startup_script_runner()) - task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken)) - task5 = asyncio.create_task(self.container_log_streaming_service()) - task6 = asyncio.create_task(self.specs_service()) + task1 = asyncio.create_task(self.main(pull_list, monitoring)) + task2 = asyncio.create_task(self.handle_container_cache(pull_list, monitoring)) + task3 = asyncio.create_task(self.startup_script_runner(monitoring)) + task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken, monitoring)) + task5 = asyncio.create_task(self.container_log_streaming_service(monitoring)) + task6 = asyncio.create_task(self.specs_service(monitoring)) + monitoring_task = asyncio.create_task(self.monitoring_service(monitoring)) # Wait for both tasks to complete (they won't in this case) - await asyncio.gather(task1, task2, task3, task4, task5, task6) + await asyncio.gather(task1, task2, task3, task4, task5, task6, monitoring_task) - async def container_log_streaming_service(self): + async def monitoring_service(self, monitoring): while True: try: + monitoring_data = [] + while not monitoring.empty(): + monitoring_data.append(await monitoring.get()) + if len(monitoring_data)>0: + unique_monitoring = list(set(monitoring_data)) + for service_name in unique_monitoring: + self.last_service_heartbeat[service_name]=utils.unix_timestamp() + log.success(self.last_service_heartbeat) + for service_name in self.last_service_heartbeat.keys(): + last_hearthbeat = self.last_service_heartbeat[service_name] + if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache": + log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") + os._exit(1) + elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache": + log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") + os._exit(1) + except Exception as e: + log.debug(f"monitoring_service() | ERROR | {e}") + await asyncio.sleep(5) + + async def container_log_streaming_service(self, monitoring): + while True: + try: + await monitoring.put("container_log_streaming_service") await WebSocketClient.stream_container_logs() except Exception as e: log.debug(f"container_log_streaming_service() | ERROR | {e}") await asyncio.sleep(0.6) async def run_startup_scripts(self, startup_script_full_path, container_name): try: - log.success(f"Runnin' {startup_script_full_path}") - log.error(self.all_running_container_names) + if config.debug: + log.success(f"Runnin' {startup_script_full_path}") + log.error(self.all_running_container_names) await asyncio.to_thread(run_startup_script.run, container_name, startup_script_full_path, f"/init-{container_name}.sh") return True except Exception as e: return False - async def startup_script_runner(self): + async def startup_script_runner(self, monitoring): startup_script_ongoing_tasks = {} while True: try: + await monitoring.put("startup_script_runner") startup_script_files = await async_os.listdir(config.startup_scripts_folder) for startup_script_file in startup_script_files: if type(startup_script_file)==str and startup_script_file.endswith(".sh") and startup_script_file[:-3] in self.all_running_container_names: @@ -160,16 +197,17 @@ class CloreClient: pass await asyncio.sleep(0.5) - async def handle_container_cache(self, pull_list, pull_logs): + async def handle_container_cache(self, pull_list, monitoring): while True: got_data = [] while not pull_list.empty(): got_data.append(await pull_list.get()) + await monitoring.put("handle_container_cache") if len(got_data)>0: self.p_needed_containers=got_data[len(got_data)-1] if len(self.p_needed_containers)>0: - local_images = await get_local_images() + local_images = await get_local_images(no_latest_tag=True) for local_image in local_images: self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()} image_needed = False @@ -198,7 +236,8 @@ class CloreClient: r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image) if r: removed_cnt+=1 - log.success(f"{local_image} | {image_needed}") + #if config.debug: + # log.success(f"{local_image} | {image_needed}") except Exception as e: image_needed=True @@ -212,6 +251,8 @@ class CloreClient: most_recent_wanted_state = self.p_needed_containers for wanted_image in most_recent_wanted_state: if not wanted_image["image"] in local_images: + print("Local", local_images) + print("W",wanted_image) log.debug(f"Starting to pull \"{wanted_image}\"") auth_config = {} @@ -249,97 +290,104 @@ class CloreClient: pass await asyncio.sleep(1) - async def main(self, pull_list, pull_logs): + async def main(self, pull_list, monitoring): step=0 while True: - print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '') - - step+=1 + try: + step+=1 - tasks = [] + await monitoring.put("main") - container_conf = WebSocketClient.get_containers() + if config.debug: + print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '') - if container_conf[0]: - self.containers_set=True - self.containers=container_conf[1] - tmp_images = [] - for container in self.containers: - if "image" in container: - log_pull = False - if "name" in container: - if "-order-" in container["name"]: - log_pull=True - image_config = { - "image":container["image"], - "log":log_pull - } - if "ip" in container and "| docker login -u " in container["ip"] and container["ip"][:8]=="; echo '": + tasks = [] + + container_conf = WebSocketClient.get_containers() + + if container_conf[0]: + self.containers_set=True + self.containers=container_conf[1] + tmp_images = [] + for container in self.containers: + if "image" in container: + log_pull = False + if "name" in container: + if "-order-" in container["name"]: + log_pull=True + image_config = { + "image":container["image"], + "log":log_pull + } + if "ip" in container and "| docker login -u " in container["ip"] and container["ip"][:8]=="; echo '": + try: + dockerhub_token = container["ip"][8:].split("'")[0] + dockerhub_user = container["ip"].split('docker login -u ')[1].split(';')[0][:-17] + image_config["dockerhub_token"]=dockerhub_token + image_config["dockerhub_user"]=dockerhub_user + except Exception as e: + log.error(e) + pass + + if not image_config in tmp_images: + tmp_images.append(image_config) + if tmp_images!=self.needed_images: + self.needed_images=tmp_images + await pull_list.put(self.needed_images) + #self.containers.append({'name': 'clore-test', 'image': 'cloreai/monitoring:0.2', 'command': '', 'env': {'TOKEN': '22'}, 'gpus': True, 'network': 'clore-br69', 'ip': '172.22.0.23', 'network_subnet':'172.22.0.0/24', 'network_gateway':'172.22.0.1'}) + + if (self.last_checked_ws_peers < (utils.unix_timestamp()-config.ws_peers_recheck_interval)): + tasks.append(api_interface.get_server_config()) + + if self.containers_set: + tasks.append(configure_networks(self.containers)) + tasks.append(WebSocketClient.stream_pull_logs()) + + if self.validated_containers_set: + tasks.append(deploy_containers(self.validated_containers)) + + if step==1: + WebSocketClient.set_auth(self.auth_key) + asyncio.create_task(WebSocketClient.run()) + elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval): + log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds") + log.error("CLORE HOSTING | exiting ...") + os._exit(1) + + self.expire_ws_peers() + WebSocketClient.set_ws_peers(self.ws_peers) + WebSocketClient.set_pull_logs(self.last_pull_progress) + + if len(tasks)>0: + results = await asyncio.gather(*tasks) + + # Process the results (optional) + for result in results: + if type(result)==types.ServerConfig: + if result.success: + self.last_checked_ws_peers = utils.unix_timestamp() + self.allowed_images=result.allowed_images + if not config.debug_ws_peer: + for pure_ws_peer in result.ws_peers: + self.ws_peers[pure_ws_peer]={ + "expiration":utils.unix_timestamp()+900 + } + elif self.allowed_images==None: + log.error("Can't contact clore.ai, restarting") + os._exit(1) + elif type(result)==types.DockerConfiguratorRes: + if result.validation_and_security: + self.validated_containers_set=True + self.validated_containers = result.valid_containers + elif type(result)==types.DeployContainersRes: try: - dockerhub_token = container["ip"][8:].split("'")[0] - dockerhub_user = container["ip"].split('docker login -u ')[1].split(';')[0][:-17] - image_config["dockerhub_token"]=dockerhub_token - image_config["dockerhub_user"]=dockerhub_user + self.all_running_container_names = result.all_running_container_names + self.all_stopped_container_names = result.all_stopped_container_names except Exception as e: - log.error(e) pass - - if not image_config in tmp_images: - tmp_images.append(image_config) - if tmp_images!=self.needed_images: - self.needed_images=tmp_images - await pull_list.put(self.needed_images) - #self.containers.append({'name': 'clore-test', 'image': 'cloreai/monitoring:0.2', 'command': '', 'env': {'TOKEN': '22'}, 'gpus': True, 'network': 'clore-br69', 'ip': '172.22.0.23', 'network_subnet':'172.22.0.0/24', 'network_gateway':'172.22.0.1'}) - - if (self.last_checked_ws_peers < (utils.unix_timestamp()-config.ws_peers_recheck_interval)): - tasks.append(api_interface.get_server_config()) - - if self.containers_set: - tasks.append(configure_networks(self.containers)) - tasks.append(WebSocketClient.stream_pull_logs()) - - if self.validated_containers_set: - tasks.append(deploy_containers(self.validated_containers)) - - if step==1: - WebSocketClient.set_auth(self.auth_key) - asyncio.create_task(WebSocketClient.run()) - elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval): - log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds") - log.error("CLORE HOSTING | exiting ...") - os._exit(1) - - self.expire_ws_peers() - WebSocketClient.set_ws_peers(self.ws_peers) - WebSocketClient.set_pull_logs(self.last_pull_progress) - - if len(tasks)>0: - results = await asyncio.gather(*tasks) - - # Process the results (optional) - for result in results: - if type(result)==types.ServerConfig: - if result.success: - self.last_checked_ws_peers = utils.unix_timestamp() - self.allowed_images=result.allowed_images - if not config.debug_ws_peer: - for pure_ws_peer in result.ws_peers: - self.ws_peers[pure_ws_peer]={ - "expiration":utils.unix_timestamp()+900 - } - elif self.allowed_images==None: - log.error("Can't contact clore.ai, restarting") - os._exit(1) - elif type(result)==types.DockerConfiguratorRes: - if result.validation_and_security: - self.validated_containers_set=True - self.validated_containers = result.valid_containers - elif type(result)==types.DeployContainersRes: - try: - self.all_running_container_names = result.all_running_container_names - self.all_stopped_container_names = result.all_stopped_container_names - except Exception as e: - pass + except Exception as e: + log.debug(f"main() | ERROR | {e}") + await asyncio.sleep(1) async def submit_specs(self, current_specs): @@ -375,9 +423,10 @@ class CloreClient: except Exception as e: log.debug(f"FAIL | update_realtime_data() | {e}") - async def specs_service(self): + async def specs_service(self, monitoring): while True: try: + await monitoring.put("specs_service") current_specs = await specs.get() if self.last_hw_specs_submit < (utils.unix_timestamp()-1800): self.last_hw_specs_submit=utils.unix_timestamp() diff --git a/clore_hosting/ws_interface.py b/clore_hosting/ws_interface.py index 10888e1..e9ec821 100644 --- a/clore_hosting/ws_interface.py +++ b/clore_hosting/ws_interface.py @@ -141,12 +141,13 @@ class WebSocketClient: else: try: parsed_json = json.loads(message) - if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json: + if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list: self.last_heartbeat = clore_utils.unix_timestamp() container_str = json.dumps({"containers":parsed_json["new_containers"]}) await self.send(container_str) - self.containers_set = True - self.containers=parsed_json["new_containers"] + if len(parsed_json["new_containers"]) > 0: # There should be at least one container + self.containers_set = True + self.containers=parsed_json["new_containers"] #log.success(container_str) elif "allow_oc" in parsed_json: # Enable OC await self.send(json.dumps({"allow_oc":True})) diff --git a/lib/config.py b/lib/config.py index 660f819..f2c2243 100644 --- a/lib/config.py +++ b/lib/config.py @@ -30,6 +30,8 @@ hard_config = { "max_pull_log_size": 24576, # Characters "max_container_log_size": 262144, # Characters "container_log_streaming_interval": 2, # Seconds + "maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app + "maximum_pull_service_loop_time": 14400 # Exception for image pulling } parser = argparse.ArgumentParser(description='Example argparse usage') diff --git a/lib/docker_interface.py b/lib/docker_interface.py index de20019..f6308a0 100644 --- a/lib/docker_interface.py +++ b/lib/docker_interface.py @@ -68,7 +68,7 @@ def get_docker_networks(): except docker.errors.DockerException as e: return (f"Error: {e}") -def get_local_images(): +def get_local_images(no_latest_tag=False): try: images = client.images.list() @@ -79,7 +79,10 @@ def get_local_images(): tags = image.tags if image.tags else [':'] for tag in tags: if tag!=":": - image_list.append(tag) + if no_latest_tag: + image_list.append(tag.replace(':latest','')) + else: + image_list.append(tag) return image_list except Exception as e: diff --git a/lib/log_streaming_task.py b/lib/log_streaming_task.py index 7d172fa..66522bb 100644 --- a/lib/log_streaming_task.py +++ b/lib/log_streaming_task.py @@ -10,7 +10,7 @@ from lib import container_logs from concurrent.futures import ThreadPoolExecutor import queue # Import the synchronous queue module -async def log_streaming_task(message_broker): +async def log_streaming_task(message_broker, monitoring): client = docker_interface.client executor = ThreadPoolExecutor(max_workers=4) tasks = {} @@ -18,6 +18,7 @@ async def log_streaming_task(message_broker): while True: try: + await monitoring.put("log_streaming_task") current_containers = await asyncio.get_event_loop().run_in_executor( executor, lambda: {container.name: container for container in client.containers.list() if container.status == 'running'}