V5.2.9 | XFS, hosting to partner platforms #1
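Summary: wires clore partner platform support through the hosting client and bumps `backend_version` to 19. A new `partner_service` loop consumes a `partner_config` pushed over the websocket, provisions the partner service container (`clore-partner-service`, OpenVPN-forwarded, on a new `clore-partner-br0` bridge), exposes a unix-socket control interface to it, protects partner images from the unused-image cleanup, and reports XFS state plus one-off forwarding-latency measurements back to clore.ai.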
@@ -32,7 +32,7 @@ def get_last_ip_occurrence_and_text(input_string):
     else:
         return None, None
 
-def configure(containers):
+def configure(containers, partner_forwarding_ips):
     valid_containers = []
     newly_created_networks = []
     containers_required_networks = []
@@ -141,7 +141,7 @@ def configure(containers):
     if config.log_containers_strings:
         print("FROM DOCKER CONFIGURATOR", valid_containers)
 
-    validation_and_security = docker_interface.validate_and_secure_networks()
+    validation_and_security = docker_interface.validate_and_secure_networks(partner_forwarding_ips)
     if startup_sctipt_creation_fail:
         validation_and_security=False
     return validation_and_security, valid_containers, use_hive_flightsheet
@@ -4,7 +4,10 @@ from lib import log_streaming_task
 from lib import run_startup_script
 from lib import hive_miner_interface
 from lib import docker_interface
+from lib import background_job
 from lib import docker_deploy
+from lib import clore_partner
+from lib import clore_partner_socket
 from lib import docker_pull
 from lib import get_specs
 from lib import utils
@@ -34,17 +37,17 @@ WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_
 
 #print(config)
 
-async def configure_networks(containers):
-    res = await asyncio.to_thread(docker_configurator.configure, containers)
+async def configure_networks(containers, partner_forwarding_ips):
+    res = await asyncio.to_thread(docker_configurator.configure, containers, partner_forwarding_ips)
     try:
         fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2])
         return fin_res
     except Exception as e:
         return False
 
-async def deploy_containers(validated_containers, allowed_running_containers):
+async def deploy_containers(validated_containers, allowed_running_containers, can_run_partner_workloads):
     try:
-        all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers)
+        all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers, can_run_partner_workloads)
         return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names)
     except Exception as e:
         return False
@@ -70,8 +73,10 @@ async def set_hive_miner_status(enabled=False):
         return False
 
 class CloreClient:
-    def __init__(self, auth_key):
+    def __init__(self, auth_key, xfs_state):
         self.auth_key=auth_key
+        self.xfs_state = xfs_state
+
         self.ws_peers = {}
         self.last_checked_ws_peers=0
         self.containers={}
@@ -99,9 +104,11 @@ class CloreClient:
             "container_log_streaming_service": utils.unix_timestamp(),
             "specs_service": utils.unix_timestamp(),
             "oc_service": utils.unix_timestamp(),
-            "background_pow_data_collection": utils.unix_timestamp()
+            "background_pow_data_collection": utils.unix_timestamp(),
+            "partner_service": utils.unix_timestamp()
         }
         self.max_service_inactivity = 600 # seconds
+        self.no_restart_services = ["partner_service"] # Services that are allowed to run indefinitely without triggering the app to restart
 
         if config.debug_ws_peer:
             self.ws_peers[str(config.debug_ws_peer)]={
@@ -137,6 +144,10 @@ class CloreClient:
         self.hive_miner_interface = hive_miner_interface.hive_interface()
         self.next_pow_background_job_send_update = 0
 
+        self.clore_partner_initiazized = False
+        self.partner_forwarding_ips = []
+        self.start_time = utils.unix_timestamp()
+
     async def service(self):
         global container_log_broken
 
@@ -151,10 +162,11 @@ class CloreClient:
         task6 = asyncio.create_task(self.specs_service(monitoring))
         task7 = asyncio.create_task(self.oc_service(monitoring))
         task8 = asyncio.create_task(self.background_pow_data_collection(monitoring))
+        task9 = asyncio.create_task(self.partner_service(monitoring))
         monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
 
         # Wait for both tasks to complete (they won't in this case)
-        await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, monitoring_task)
+        await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, task9, monitoring_task)
 
     async def monitoring_service(self, monitoring):
         while True:
@@ -169,13 +181,14 @@ class CloreClient:
                 if config.debug:
                     log.success(self.last_service_heartbeat)
                 for service_name in self.last_service_heartbeat.keys():
-                    last_hearthbeat = self.last_service_heartbeat[service_name]
-                    if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache":
-                        log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
-                        os._exit(1)
-                    elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache":
-                        log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
-                        os._exit(1)
+                    if not service_name in self.no_restart_services:
+                        last_hearthbeat = self.last_service_heartbeat[service_name]
+                        if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache":
+                            log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
+                            os._exit(1)
+                        elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache":
+                            log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
+                            os._exit(1)
             except Exception as e:
                 log.debug(f"monitoring_service() | ERROR | {e}")
             await asyncio.sleep(5)
@@ -260,6 +273,7 @@ class CloreClient:
 
             if len(self.p_needed_containers)>0:
                 local_images = await get_local_images(no_latest_tag=True)
+                partner_images = await clore_partner.get_partner_allowed_images()
                 for local_image in local_images:
                     self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()}
                     image_needed = False
@@ -283,11 +297,32 @@ class CloreClient:
                                             if local_image_tag==allowed_tag or allowed_tag=='*':
                                                 image_needed=True
                                                 break
-                            if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers:
+                                if not image_needed and type(partner_images) == list:
+                                    for partner_image in partner_images:
+                                        if local_image.replace(':latest', '') == partner_image.replace(':latest', ''):
+                                            image_needed = True
+                                            del self.last_pull_progress[local_image]
+                                            break
+                                        if len(local_image.split('/')) >= 3:
+                                            partner_image_spl = partner_image.split(':')
+                                            image, deployment_type = '/'.join(local_image.split('/', 2)[:2]), local_image.split('/', 2)[-1]
+                                            if len(partner_image_spl) == 1:
+                                                if image == partner_image_spl[0] or f"{image}" == f"{partner_image_spl[0]}_latest":
+                                                    image_needed = True
+                                                    del self.last_pull_progress[local_image]
+                                                    break
+                                            elif len(partner_image_spl) == 2:
+                                                if image.replace('_latest', '') == f"{partner_image_spl[0]}_{partner_image_spl[1]}".replace('_latest', ''):
+                                                    image_needed = True
+                                                    del self.last_pull_progress[local_image]
+                                                    break
+                            if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers and partner_images != None:
+                                log.success(f"GOING TO REMOVE {local_image}")
                                 with concurrent.futures.ThreadPoolExecutor() as pool:
                                     r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image)
                                     if r:
                                         removed_cnt+=1
+                                        del self.last_pull_progress[local_image]
                             #if config.debug:
                             #    log.success(f"{local_image} | {image_needed}")
 
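Review note: the partner-image matching above is dense, so here is a hedged walkthrough with hypothetical image names (none of them appear in this diff):

```python
# 1) Plain match, ":latest" stripped on both sides:
#    local "foo/bar:latest" keeps matching partner "foo/bar".
# 2) Registry-style local images (>= 3 path segments) get split:
local_image = "registry.example.com/partner/worker_latest"
image = '/'.join(local_image.split('/', 2)[:2])   # "registry.example.com/partner"
deployment_type = local_image.split('/', 2)[-1]   # "worker_latest" (unused afterwards)
# An untagged partner image matches when `image` equals it outright, or equals it
# with "_latest" appended; a tagged partner image "repo:tag" is compared as
# "repo_tag" with any trailing "_latest" stripped from both sides.
```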
@@ -327,7 +362,7 @@ class CloreClient:
                         try:
                             await pull_task
                         except Exception as e:
-                            self.last_pull_progress[local_image]={f"log":"Can't pull image \"{local_image}\"", "last_update":time.time()}
+                            self.last_pull_progress[local_image]={"log":f"Can't pull image \"{local_image}\"", "last_update":time.time()}
                         log_task.cancel()
                         try:
                             await log_task
@@ -358,11 +393,18 @@ class CloreClient:
 
                 container_conf = WebSocketClient.get_containers()
 
+                can_run_partner_workloads = False
+
                 if container_conf[0]:
                     self.containers_set=True
                     self.containers=container_conf[1]
                     tmp_images = []
-                    for container in self.containers:
+
+                    is_order_spot = False
+
+                    for idx, container in enumerate(self.containers):
+                        if "spot" in container:
+                            is_order_spot = True
                         if "image" in container and container["image"]!="cloreai/hive-use-flightsheet":
                             log_pull = False
                             if "name" in container:
@@ -386,6 +428,9 @@ class CloreClient:
                             if not image_config in tmp_images:
                                 tmp_images.append(image_config)
 
+                    can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True
+                    clore_partner_socket.set_can_deploy(can_run_partner_workloads)
+
                     if self.restart_docker and not running_order and len(self.containers)>0:
                         log.debug("Sending docker restart command")
                         utils.run_command_v2("systemctl restart docker")
@@ -400,14 +445,14 @@ class CloreClient:
                     tasks.append(api_interface.get_server_config())
 
                 if self.containers_set:
-                    tasks.append(configure_networks(self.containers))
+                    tasks.append(configure_networks(self.containers, self.partner_forwarding_ips))
                     tasks.append(WebSocketClient.stream_pull_logs())
 
                 if self.validated_containers_set:
-                    tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers))
+                    tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers, can_run_partner_workloads))
 
                 if step==1:
-                    WebSocketClient.set_auth(self.auth_key)
+                    WebSocketClient.set_auth(self.auth_key, self.xfs_state)
                     asyncio.create_task(WebSocketClient.run())
                 elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval):
                     log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds")
@@ -427,6 +472,11 @@ class CloreClient:
                             if result.success:
                                 self.last_checked_ws_peers = utils.unix_timestamp()
                                 self.allowed_images=result.allowed_images+self.extra_allowed_images
+                                if self.xfs_state == "active":
+                                    self.allowed_images.append({
+                                        "repository": "vastai/test",
+                                        "allowed_tags": ["bandwidth-test-nvidia"]
+                                    })
                                 if not config.debug_ws_peer:
                                     for pure_ws_peer in result.ws_peers:
                                         self.ws_peers[pure_ws_peer]={
@@ -455,7 +505,7 @@ class CloreClient:
     async def submit_specs(self, current_specs):
         try:
             if type(current_specs) == dict:
-                current_specs["backend_version"]=18
+                current_specs["backend_version"]=19
                 current_specs["update_hw"]=True
                 smallest_pcie_width = 999
                 for gpu in current_specs["gpus"]["nvidia"]:
@@ -504,7 +554,7 @@ class CloreClient:
                 await monitoring.put("oc_service")
                 oc_apply_allowed = True
                 ### OC Service should also hande Hive stuff
-                if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries:
+                if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries and background_job.is_enabled():
                     await set_hive_miner_status(True)
                     oc_apply_allowed = False # Don't apply any OC when running HiveOS miner
                 elif self.is_hive and not self.dont_use_hive_binaries:
@@ -544,6 +594,30 @@ class CloreClient:
                 log.debug(f"FAIL | background_pow_data_collection() | {e}")
             await asyncio.sleep(6)
 
+    async def partner_service(self, monitoring):
+        while True:
+            try:
+                await monitoring.put("partner_service")
+                if self.start_time < utils.unix_timestamp() - 180:
+                    forwarding_latency_measurment = await clore_partner.measure_forwarding_latency()
+                    if type(forwarding_latency_measurment) == list:
+                        await WebSocketClient.set_forwarding_latency_measurment(forwarding_latency_measurment)
+                partner_config = WebSocketClient.get_clore_partner_config()
+                if partner_config != None:
+                    if self.clore_partner_initiazized == False:
+                        ir = await clore_partner.initialize()
+                        if ir:
+                            self.clore_partner_initiazized = True
+                    if self.clore_partner_initiazized == True:
+                        if 'provider' in partner_config and 'forwarding' in partner_config:
+                            self.partner_forwarding_ips = [partner_config['provider'], partner_config['forwarding']]
+                        else:
+                            self.partner_forwarding_ips = []
+                        await clore_partner.configure(partner_config)
+            except Exception as e:
+                log.debug(f"FAIL | partner_service() | {e}")
+            await asyncio.sleep(6)
+
     def expire_ws_peers(self):
         for ws_peer_address in list(self.ws_peers.keys()):
             ws_peer_info = self.ws_peers[ws_peer_address]
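Review note: `partner_config` arrives via the new `partner_config` websocket message (handler further down). The shape this loop and `clore_partner.configure()` rely on, reconstructed from the keys accessed in this diff (all values hypothetical):

```python
partner_config = {
    "provider": "203.0.113.10",            # -> partner_forwarding_ips[0]
    "forwarding": "203.0.113.20",          # -> partner_forwarding_ips[1]
    "partner_image": "partner/worker:latest",
    "partner_id": "0123456789abcdefexample",  # first 16 chars become the hostname prefix
    "machine_id": 1234,
    "openvpn_host": "198.51.100.5",
    "ports": [30000, 30100],               # forwarded port range
}
```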
@@ -1,4 +1,5 @@
 from concurrent.futures import ThreadPoolExecutor
+from lib import clore_partner
 import asyncio
 import random
 import websockets
@@ -31,6 +32,7 @@ class WebSocketClient:
         self.connected = False
         self.authorized = False
         self.auth = auth
+        self.xfs_state = None
         self.log_auth_fail = True
         self.last_heartbeat = clore_utils.unix_timestamp()
         self.containers={}
@@ -50,16 +52,31 @@ class WebSocketClient:
         self.oc_enabled = False
         self.last_gpu_oc_specs = []
         self.last_set_oc = {}
+
+        self.clore_partner_config = None
+        self.forwarding_latency_measurment = None
 
     def get_last_heartbeat(self):
         return self.last_heartbeat
 
     def get_containers(self):
-        return self.containers_set, self.containers
+        partner_container_config = clore_partner.get_partner_container_config()
+        return self.containers_set, ((self.containers + [partner_container_config]) if partner_container_config else self.containers)
 
     def get_oc(self):
         return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc
 
+    def get_clore_partner_config(self):
+        return self.clore_partner_config
+
+    async def set_forwarding_latency_measurment(self, forwarding_latency_measurment):
+        await self.send(json.dumps(
+            {
+                "forwarding_latency_measurment": forwarding_latency_measurment
+            }
+        ))
+        self.forwarding_latency_measurment = forwarding_latency_measurment
+
     def set_ws_peers(self, ws_peers):
         tmp_ws_peers=[]
         for ws_peer in list(ws_peers.keys()):
@@ -68,8 +85,9 @@ class WebSocketClient:
 
         self.ws_peers = tmp_ws_peers
 
-    def set_auth(self, auth):
+    def set_auth(self, auth, xfs_state):
         self.auth=auth
+        self.xfs_state=xfs_state
 
     def set_pull_logs(self, pull_logs):
         self.pull_logs=pull_logs
@@ -93,7 +111,9 @@ class WebSocketClient:
                 log.debug(f"CLOREWS | Connected to {random_ws_peer} ✅")
                 await self.send(json.dumps({
                     "login":str(self.auth),
-                    "type":"python"
+                    "xfs_state": self.xfs_state,
+                    "type":"python",
+                    "clore_partner_support": True
                 }))
             except Exception as e:
                 log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e} ❌")
@@ -136,6 +156,16 @@ class WebSocketClient:
                         pass
                 elif message=="KEEPALIVE":
                     self.last_heartbeat = clore_utils.unix_timestamp()
+                    try:
+                        if self.forwarding_latency_measurment:
+                            await self.send(json.dumps(
+                                {
+                                    "forwarding_latency_measurment": self.forwarding_latency_measurment
+                                }
+                            ))
+                            self.forwarding_latency_measurment = None
+                    except Exception as e:
+                        pass
                 elif message=="NEWER_LOGIN" or message=="WAIT":
                     await self.close_websocket()
                 elif message[:10]=="PROVEPULL;":
@@ -148,13 +178,16 @@ class WebSocketClient:
                 else:
                     try:
                         parsed_json = json.loads(message)
-                        if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list:
+                        if "type" in parsed_json and parsed_json["type"]=="partner_config" and "partner_config" in parsed_json and type(parsed_json["partner_config"])==dict:
+                            self.clore_partner_config = parsed_json["partner_config"]
+                            await self.send(json.dumps({"partner_config":parsed_json["partner_config"]}))
+                        elif "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list:
                             self.last_heartbeat = clore_utils.unix_timestamp()
                             container_str = json.dumps({"containers":parsed_json["new_containers"]})
                             await self.send(container_str)
                             if len(parsed_json["new_containers"]) > 0: # There should be at least one container
                                 self.containers_set = True
-                                self.containers=parsed_json["new_containers"]
+                                self.containers=clore_partner.filter_partner_dummy_workload_container(parsed_json["new_containers"])
                             #log.success(container_str)
                         elif "allow_oc" in parsed_json: # Enable OC
                             self.oc_enabled=True
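Review note: the two new message shapes on this socket, reconstructed from the handlers above (keys from the diff, values hypothetical):

```python
login_payload = {                 # client -> server on connect
    "login": "<auth key>",
    "xfs_state": "active",        # whatever xfs.init() returned at startup
    "type": "python",
    "clore_partner_support": True,
}
partner_config_message = {        # server -> client; stored, then echoed back verbatim
    "type": "partner_config",
    "partner_config": {},         # see the shape sketched after partner_service() above
}
```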
hosting.py
@@ -1,5 +1,6 @@
 from lib import config as config_module
 from lib import init_server
+from lib import xfs
 from lib import utils
 from clore_hosting import main as clore_hosting
 import asyncio, os
@@ -29,7 +30,15 @@ elif config.reset:
             log.success("Client login reseted")
 elif config.service:
     if len(auth)==32+48+1:
-        clore_client = clore_hosting.CloreClient(auth_key=auth)
+        utils.run_command("sysctl -w net.ipv4.ip_forward=1")
+
+        xfs_state = xfs.init()
+
+        if os.path.isfile(config.restart_docker_flag_file):
+            utils.run_command("systemctl restart docker")
+            os.remove(config.restart_docker_flag_file)
+
+        clore_client = clore_hosting.CloreClient(auth_key=auth, xfs_state=xfs_state)
         asyncio.run(clore_client.service())
     else:
         print("TODO: Firstly config auth")
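Review note: `lib/xfs` itself is not part of this diff; from its uses here, `xfs.init()` must return a state string, and `"active"` is the only value ever checked (it gates the `vastai/test` bandwidth-test image whitelist above). A hedged sketch of the assumed contract:

```python
# Assumed contract of xfs.init() (module not shown in this diff):
xfs_state = xfs.init()   # e.g. "active" once XFS-backed storage is ready
clore_client = clore_hosting.CloreClient(auth_key=auth, xfs_state=xfs_state)
# CloreClient later checks only one value:
#   if self.xfs_state == "active": whitelist vastai/test:bandwidth-test-nvidia
```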
@@ -0,0 +1,22 @@
+import time
+import re
+
+disabled_till = 0
+
+def is_background_job_container_name(string):
+    if type(string) != str:
+        return False
+    pattern = r"^clore-default-\d+$"
+    return bool(re.match(pattern, string))
+
+def temporarly_disable(seconds):
+    global disabled_till
+    disabled_till = time.time() + seconds
+
+def enable():
+    global disabled_till
+    disabled_till=0
+
+def is_enabled():
+    global disabled_till
+    return True if disabled_till < time.time() else False
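Review note: this new file (evidently `lib/background_job.py`, given the `from lib import background_job` import above) is a plain time-gate. How it is wired up in this diff:

```python
from lib import background_job

background_job.is_background_job_container_name("clore-default-7")   # True
background_job.is_background_job_container_name("clore-default-7a")  # False

background_job.temporarly_disable(600)  # what the socket's "stop_background_job" does
background_job.is_enabled()             # False for the next 600 s -> oc_service skips
                                        # enabling the Hive miner for that window
background_job.enable()                 # lift the pause early
```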
@@ -0,0 +1,238 @@
+from lib import ensure_packages_installed
+from lib import config as config_module
+from lib import logging as logging_lib
+from lib import clore_partner_socket
+from lib import latency_test
+from lib import openvpn
+from lib import utils
+import asyncio
+import random
+import json
+import time
+import re
+
+import os
+import aiofiles.os
+from aiohttp import ClientSession, ClientTimeout
+
+config = config_module.config
+log = logging_lib.log
+
+MANDATORY_PACKEGES = ['dmidecode', 'openvpn', 'iproute2']
+
+DUMMY_WORKLOAD_CONTAINER = "cloreai/partner-dummy-workload"
+
+host_facts_location = os.path.join(config.clore_partner_base_dir, "host_facts")
+partner_cache_location = os.path.join(config.clore_partner_base_dir, "partner_cache")
+
+next_ensupe_packages_check = 0
+is_socket_running = False
+
+partner_container_config = None
+
+async def initialize():
+    global next_ensupe_packages_check
+    global is_socket_running
+    try:
+        await aiofiles.os.makedirs(host_facts_location, exist_ok=True)
+        await aiofiles.os.makedirs(partner_cache_location, exist_ok=True)
+        await aiofiles.os.makedirs("/etc/openvpn/client", exist_ok=True)
+        if not is_socket_running:
+            is_socket_running=True
+            asyncio.create_task(clore_partner_socket.socket_service(
+                location=os.path.join(host_facts_location, "partner_interface.socket")
+            ))
+        if next_ensupe_packages_check < time.time():
+            success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None)
+            next_ensupe_packages_check = float('inf') if success else time.time() + 60*60 # if it did not succeed -> retry in 1 hr
+            if not success:
+                return False
+        elif next_ensupe_packages_check != float('inf'):
+            return False
+
+        code, stdout, stderr = await utils.async_run_command(
+            "dmidecode -t 2 2>&1",
+            20
+        )
+        if code == 0 and not stderr:
+            async with aiofiles.open(os.path.join(host_facts_location, "dmidecode_t2.txt"), mode='w') as file:
+                await file.write(stdout)
+        else:
+            return False
+        code, stdout, stderr = await utils.async_run_command(
+            "dmidecode 2>&1",
+            20
+        )
+        if code == 0 and not stderr:
+            async with aiofiles.open(os.path.join(host_facts_location, "dmidecode.txt"), mode='w') as file:
+                await file.write(stdout)
+        else:
+            return False
+        return True
+    except Exception as e:
+        log.error(f"FAIL | clore_partner.initialize | {e}")
+        return False
+
+async def get_partner_allowed_images():
+    try:
+        file_exists = await aiofiles.os.path.exists(os.path.join(partner_cache_location, "container_list.json"))
+        if not file_exists:
+            return []
+        images = []
+        async with aiofiles.open(os.path.join(partner_cache_location, "container_list.json"), mode='r') as file:
+            content = await file.read()
+            containers = json.loads(content)
+            for container in containers:
+                image = container.get("Config", {}).get("Image", None)
+                if image and not image in images:
+                    images.append(image)
+        return images
+    except Exception as e:
+        return None
+
 | ||||||
|  | def validate_partner_container_name(name): | ||||||
|  |     if type(name) != str: | ||||||
|  |         return False | ||||||
|  |     elif name==config.clore_partner_container_name: | ||||||
|  |         return True | ||||||
|  |     pattern = r"^C\.\d+$" | ||||||
|  |     return bool(re.match(pattern, name)) | ||||||
|  | 
 | ||||||
|  | def validate_partner_workload_container_name(name): | ||||||
|  |     if type(name) != str: | ||||||
|  |         return False  | ||||||
|  |     pattern = r"^C\.\d+$" | ||||||
|  |     return bool(re.match(pattern, name)) | ||||||
|  | 
 | ||||||
|  | last_openvpn_config = None | ||||||
|  | 
 | ||||||
|  | def get_partner_container_config(): | ||||||
|  |     global partner_container_config | ||||||
|  |     return partner_container_config | ||||||
|  | 
 | ||||||
+async def configure(partner_config):
+    global last_openvpn_config
+    global partner_container_config
+    if last_openvpn_config != partner_config:
+        partner_container_config = {
+            "image": partner_config["partner_image"],
+            "name": config.clore_partner_container_name,
+            "hostname": f"{partner_config['partner_id'][:16]}-m{partner_config['machine_id']}",
+            "env": {
+                "AUTH": partner_config['partner_id'],
+                "ip_addr": partner_config['openvpn_host'],
+                "port_range": f"{partner_config['ports'][0]}-{partner_config['ports'][1]}"
+            },
+            "volumes": {
+                f"{host_facts_location}": {"bind": "/var/lib/vastai_kaalia/specs_source"},
+                f"{partner_cache_location}": {"bind": "/var/lib/vastai_kaalia/data"},
+                "/var/lib/docker": {"bind": "/var/lib/docker"},
+                "/var/run/docker.sock": {"bind": "/var/run/docker.sock"}
+            },
+            "gpus": True,
+            "command": '',
+            "network": "clore-partner-br0",
+            "ip": "172.19.0.254",
+            "cap_add": ["SYS_ADMIN"],
+            "devices": ["/dev/fuse"],
+            #"security_opt": ["apparmor:unconfined"],
+            "ports": [f"{partner_config['ports'][1]}:{partner_config['ports'][1]}"],
+        }
+        r = await openvpn.clore_partner_configure(partner_config)
+        if r:
+            last_openvpn_config = partner_config
+
 | ||||||
|  | # ----------------------------------------- | ||||||
|  | 
 | ||||||
|  | next_latency_measurment = 0 | ||||||
|  | 
 | ||||||
|  | async def fetch_forwarding_nodes(): | ||||||
|  |     url = "https://api.clore.ai/v1/get_relay" | ||||||
|  |     timeout = ClientTimeout(total=30) | ||||||
|  | 
 | ||||||
|  |     async with ClientSession(timeout=timeout) as session: | ||||||
|  |         try: | ||||||
|  |             async with session.get(url) as response: | ||||||
|  |                 response.raise_for_status() | ||||||
|  |                 data = await response.json() | ||||||
|  |                 return data | ||||||
|  |         except Exception as e: | ||||||
|  |             print(f"An error occurred: {e}") | ||||||
|  |             return None | ||||||
|  | 
 | ||||||
|  | async def set_next_latency_measurment(ts): | ||||||
|  |     global next_latency_measurment | ||||||
|  |     try: | ||||||
|  |         next_latency_measurment=ts | ||||||
|  |         async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='w') as file: | ||||||
|  |             await file.write(str(ts)) | ||||||
|  |     except Exception as e: | ||||||
|  |         pass | ||||||
|  | 
 | ||||||
|  | async def measure_forwarding_latency(): | ||||||
|  |     global next_latency_measurment | ||||||
|  | 
 | ||||||
|  |     if next_latency_measurment > time.time(): | ||||||
|  |         return False | ||||||
|  |     try: | ||||||
|  |         await aiofiles.os.makedirs(config.clore_partner_base_dir, exist_ok=True) | ||||||
|  |         file_exists = await aiofiles.os.path.exists(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment")) | ||||||
|  |         if file_exists: | ||||||
|  |             async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='r') as file: | ||||||
|  |                 content = await file.read() | ||||||
|  |                 if content.isdigit(): | ||||||
|  |                     next_latency_measurment = int(content) | ||||||
|  |         if next_latency_measurment < time.time(): | ||||||
|  |             node_info = await fetch_forwarding_nodes() | ||||||
|  |             if type(node_info) == dict and node_info.get("country") and type(node_info.get("nodes")) == dict and node_info.get("code") == 0: | ||||||
|  |                 to_test_nodes = [] | ||||||
|  |                 ip_to_region = {} | ||||||
|  | 
 | ||||||
|  |                 valid_regions = [] | ||||||
|  | 
 | ||||||
|  |                 for node_region in node_info.get("nodes").keys(): | ||||||
|  |                     nodes_ip_list = node_info.get("nodes")[node_region] | ||||||
|  |                     if type(nodes_ip_list) == list and len(nodes_ip_list) > 0: | ||||||
|  |                         to_test_nodes = to_test_nodes + nodes_ip_list | ||||||
|  |                         for node_ip in nodes_ip_list: | ||||||
|  |                             ip_to_region[node_ip]=node_region | ||||||
|  | 
 | ||||||
|  |                 if len(to_test_nodes) > 0: | ||||||
|  |                     measurment_result = await latency_test.measure_latency_icmp(to_test_nodes) | ||||||
|  |                     if measurment_result: | ||||||
|  |                         for idx, res in enumerate(measurment_result): | ||||||
|  |                             if res["received"] > 2 and not ip_to_region.get(res["host"]) in valid_regions: | ||||||
|  |                                 valid_regions.append(ip_to_region.get(res["host"])) | ||||||
|  |                             measurment_result[idx]["region"] = ip_to_region.get(res["host"]) | ||||||
|  |                     if len(valid_regions) == len(ip_to_region.keys()): | ||||||
|  |                         await set_next_latency_measurment(int( | ||||||
|  |                             time.time() + 60*60*24*30 # Re run in 30 days, because measurment succeeded | ||||||
|  |                         )) | ||||||
|  |                         return measurment_result | ||||||
|  |                     else: | ||||||
|  |                         await set_next_latency_measurment(int( | ||||||
|  |                             time.time() + 60*60*24 # Retry in 24hr, all regions in country should be reacheable | ||||||
|  |                         )) | ||||||
|  |                 else: | ||||||
|  |                     await set_next_latency_measurment(int( | ||||||
|  |                         time.time() + 60*60*72 # Retry in 72hr (clore partner service is not available in host country yet) | ||||||
|  |                     )) | ||||||
|  |                 return False | ||||||
|  |             else: | ||||||
|  |                 await set_next_latency_measurment(int( | ||||||
|  |                     time.time() + 60*60*12 # Retry in 12hr, the response was not matching the required format | ||||||
|  |                 )) | ||||||
|  |                 return False | ||||||
|  |         return False | ||||||
|  |     except Exception as e: | ||||||
|  |         return False | ||||||
|  | 
 | ||||||
|  | def filter_partner_dummy_workload_container(containers): | ||||||
|  |     try: | ||||||
|  |         remaining_containers = [] | ||||||
|  |         for container in containers: | ||||||
|  |             if container["image"] != DUMMY_WORKLOAD_CONTAINER: | ||||||
|  |                 remaining_containers.append(container) | ||||||
|  |         return remaining_containers | ||||||
|  |     except Exception as e: | ||||||
|  |         return containers | ||||||
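Review note: quick sanity checks for the validators and the dummy-workload filter above (container names hypothetical):

```python
validate_partner_container_name("clore-partner-service")   # True (the configured name)
validate_partner_container_name("C.12345")                 # True (matches ^C\.\d+$)
validate_partner_container_name("C.12a")                   # False

filter_partner_dummy_workload_container([
    {"image": "cloreai/partner-dummy-workload"},  # dropped
    {"image": "cloreai/jupyter"},                 # kept (hypothetical image)
])
# -> [{"image": "cloreai/jupyter"}]
```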
@@ -0,0 +1,94 @@
+from lib import config as config_module
+from lib import logging as logging_lib
+from lib import background_job
+from lib import utils
+import asyncio
+import json
+
+import os
+import aiofiles.os
+
+config = config_module.config
+log = logging_lib.log
+
+dangerous_chars = [';', '|', '&', '`', '$', '>', '<', '(', ')', '\\', '!', '"', '\'', '[', ']', '{', '}']
+allowed_commands = ["df"]
+
+can_deploy = True
+
+async def remove_socket_file(path):
+    try:
+        file_exists = await aiofiles.os.path.exists(path)
+        if file_exists:
+            await aiofiles.os.remove(path)
+    except Exception:
+        pass
+
+async def handle_client(reader, writer):
+    global can_deploy
+    try:
+        while True:
+            data = await reader.read(1024*64)
+            try:
+                if not data:
+                    break
+                #print("DATA", data, data.decode())
+                parsed_data = json.loads(data.decode())
+                if "run_command" in parsed_data and type(parsed_data["run_command"])==str:
+                    allowed = False
+                    for allowed_command in allowed_commands:
+                        if f"{allowed_command} " == parsed_data["run_command"][:len(allowed_command)+1] or allowed_command==parsed_data["run_command"]:
+                            allowed = True
+                            break
+                    if allowed and any(char in parsed_data["run_command"] for char in dangerous_chars):
+                        allowed = False
+                    log.debug(f"clore_partner_socket | Received \"{parsed_data['run_command']}\" | {'allowed' if allowed else 'denied'}")
+                    if allowed:
+                        code, stdout, stderr = await utils.async_run_command(
+                            parsed_data["run_command"]
+                        )
+                        writer.write(json.dumps({
+                            "code": code,
+                            "stderr": stderr,
+                            "stdout": stdout
+                        }).encode())
+                    else:
+                        writer.write(json.dumps({
+                            "code": -1,
+                            "stderr": 'Command not allowed',
+                            "stdout": 'Command not allowed'
+                        }).encode())
+                elif "can_deploy" in parsed_data:
+                    writer.write(json.dumps({
+                        "can_deploy": can_deploy
+                    }).encode())
+                elif "stop_background_job" in parsed_data and "time" in parsed_data:
+                    try:
+                        if isinstance(parsed_data["time"], int):
+                            background_job.temporarly_disable(parsed_data["time"])
+                    except Exception as e:
+                        pass
+                else:
+                    writer.write('?'.encode())
+                await writer.drain()
+            except Exception as data_exception:
+                break
+    except asyncio.CancelledError:
+        log.debug(f"clore partner socket | Client handler canceled.")
+    finally:
+        log.debug(f"clore partner socket | Closing client connection.")
+        writer.close()
+        await writer.wait_closed()
+
+def set_can_deploy(state):
+    global can_deploy
+    can_deploy = state
+
+async def socket_service(location):
+    await remove_socket_file(location)
+    server = await asyncio.start_unix_server(handle_client, path=location)
+
+    log.debug(f"clore partner socket | running at {location}")
+    async with server:
+        await server.serve_forever()
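Review note: a minimal client for this unix socket, assuming the default `--clore-partner-base-dir` (the socket path comes from `clore_partner.initialize()` above):

```python
import asyncio, json

SOCKET = "/opt/clore-hosting/.clore-partner/host_facts/partner_interface.socket"

async def query_can_deploy():
    reader, writer = await asyncio.open_unix_connection(SOCKET)
    writer.write(json.dumps({"can_deploy": True}).encode())
    await writer.drain()
    reply = json.loads(await reader.read(1024 * 64))
    writer.close()
    await writer.wait_closed()
    return reply["can_deploy"]

print(asyncio.run(query_can_deploy()))  # True unless a non-spot order is running
```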
@@ -9,6 +9,11 @@ hard_config = {
             "name": "clore-br0",
             "subnet": "172.18.0.0/16",
             "gateway": "172.18.0.1"
+        },
+        {
+            "name": "clore-partner-br0",
+            "subnet": "172.19.0.0/20",
+            "gateway": "172.19.0.1"
         }
     ],
     "run_iptables_with_sudo":True,
|  | @ -33,7 +38,11 @@ hard_config = { | ||||||
|     "maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app |     "maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app | ||||||
|     "maximum_pull_service_loop_time": 14400, # Exception for image pulling |     "maximum_pull_service_loop_time": 14400, # Exception for image pulling | ||||||
|     "creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk |     "creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk | ||||||
|     "allow_mixed_gpus": True |     "allow_mixed_gpus": True, | ||||||
|  |     "openvpn_forwarding_tun_device": "tun1313", # TUN device carrying forwarded partner traffic | ||||||
|  |     "forwarding_ip_route_table_id": 100, # Dedicated ip route table for partner traffic forwarding | ||||||
|  |     "clore_partner_container_name": "clore-partner-service", | ||||||
|  |     "restart_docker_flag_file": "/opt/clore-hosting/.restart_docker" # When this flag file exists, docker is restarted on the next start of the hosting client | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| parser = argparse.ArgumentParser(description='Example argparse usage') | parser = argparse.ArgumentParser(description='Example argparse usage') | ||||||
|  | @ -50,6 +59,7 @@ parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hostin | ||||||
| parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") | parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") | ||||||
| parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes") | parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes") | ||||||
| parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software") | parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software") | ||||||
|  | parser.add_argument('--clore-partner-base-dir', type=str, default="/opt/clore-hosting/.clore-partner") | ||||||
| 
 | 
 | ||||||
| # Parse arguments, ignoring any non-defined arguments | # Parse arguments, ignoring any non-defined arguments | ||||||
| args, _ = parser.parse_known_args() | args, _ = parser.parse_known_args() | ||||||
|  |  | ||||||
|  | @ -27,6 +27,14 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, | ||||||
|     if "cap_add" in container_options: |     if "cap_add" in container_options: | ||||||
|         for cap in container_options["cap_add"]: |         for cap in container_options["cap_add"]: | ||||||
|             command.extend(["--cap-add", cap]) |             command.extend(["--cap-add", cap]) | ||||||
|  |      | ||||||
|  |     if "devices" in container_options: | ||||||
|  |         for device in container_options["devices"]: | ||||||
|  |             command.extend(["--device", device]) | ||||||
|  | 
 | ||||||
|  |     if "security_opt" in container_options: | ||||||
|  |         for security_opt in container_options["security_opt"]: | ||||||
|  |             command.extend(["--security-opt", security_opt]) | ||||||
| 
 | 
 | ||||||
|     if "volumes" in container_options: |     if "volumes" in container_options: | ||||||
|         for volume_host, volume_container in container_options["volumes"].items(): |         for volume_host, volume_container in container_options["volumes"].items(): | ||||||
|  | @ -75,6 +83,8 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, | ||||||
|     if ip: |     if ip: | ||||||
|         command.extend(["--ip", ip]) |         command.extend(["--ip", ip]) | ||||||
| 
 | 
 | ||||||
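|  |     # --stop-timeout 0 makes docker kill the container immediately on stop instead of waiting out the default grace period | ||||||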
|  |     command.append('--stop-timeout') | ||||||
|  |     command.append('0') | ||||||
|     command.append(container_options["image"]) |     command.append(container_options["image"]) | ||||||
| 
 | 
 | ||||||
|     try: |     try: | ||||||
|  |  | ||||||
|  | @ -1,7 +1,9 @@ | ||||||
| from lib import config as config_module | from lib import config as config_module | ||||||
| from lib import logging as logging_lib | from lib import logging as logging_lib | ||||||
| from lib import docker_cli_wrapper | from lib import docker_cli_wrapper | ||||||
|  | from lib import background_job | ||||||
| from lib import docker_interface | from lib import docker_interface | ||||||
|  | from lib import clore_partner | ||||||
| from lib import get_specs | from lib import get_specs | ||||||
| from lib import utils | from lib import utils | ||||||
| import docker | import docker | ||||||
|  | @ -14,7 +16,7 @@ client = docker_interface.client | ||||||
| config = config_module.config | config = config_module.config | ||||||
| log = logging_lib.log | log = logging_lib.log | ||||||
| 
 | 
 | ||||||
| def deploy(validated_containers, allowed_running_containers=[]): | def deploy(validated_containers, allowed_running_containers=[], can_run_partner_workloads=False): | ||||||
|     local_images = docker_interface.get_local_images() |     local_images = docker_interface.get_local_images() | ||||||
|     all_containers = docker_interface.get_containers(all=True) |     all_containers = docker_interface.get_containers(all=True) | ||||||
| 
 | 
 | ||||||
|  | @ -67,7 +69,9 @@ def deploy(validated_containers, allowed_running_containers=[]): | ||||||
|                 'tty': True, |                 'tty': True, | ||||||
|                 'network_mode': 'clore-br0', |                 'network_mode': 'clore-br0', | ||||||
|                 'cap_add': [], |                 'cap_add': [], | ||||||
|                 'volumes': {}, |                 'devices': [], | ||||||
|  |                 'security_opt': [], | ||||||
|  |                 'volumes': validated_container["volumes"] if "volumes" in validated_container else {}, | ||||||
|                 'ports': {}, |                 'ports': {}, | ||||||
|                 'device_requests': [], |                 'device_requests': [], | ||||||
|                 'environment': validated_container["env"] if "env" in validated_container else {}, |                 'environment': validated_container["env"] if "env" in validated_container else {}, | ||||||
|  | @ -80,6 +84,15 @@ def deploy(validated_containers, allowed_running_containers=[]): | ||||||
|                 ) |                 ) | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|  |             if "security_opt" in validated_container: | ||||||
|  |                 container_options["security_opt"] = validated_container["security_opt"] | ||||||
|  |              | ||||||
|  |             if "devices" in validated_container: | ||||||
|  |                 container_options["devices"] = validated_container["devices"] | ||||||
|  | 
 | ||||||
|  |             if "cap_add" in validated_container: | ||||||
|  |                 container_options["cap_add"] = validated_container["cap_add"] | ||||||
|  | 
 | ||||||
|             if "hostname" in validated_container: |             if "hostname" in validated_container: | ||||||
|                 container_options["hostname"]=validated_container["hostname"] |                 container_options["hostname"]=validated_container["hostname"] | ||||||
|             elif "clore-order-" in validated_container["name"]: |             elif "clore-order-" in validated_container["name"]: | ||||||
|  | @ -136,7 +149,7 @@ def deploy(validated_containers, allowed_running_containers=[]): | ||||||
| 
 | 
 | ||||||
|             container_options["shm_size"] = f"{SHM_SIZE}m" |             container_options["shm_size"] = f"{SHM_SIZE}m" | ||||||
| 
 | 
 | ||||||
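|  |             # Skip creating background-job containers while background jobs are disabled | ||||||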
|             if not validated_container["name"] in created_container_names and image_ready: |             if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])): | ||||||
|                 if config.creation_engine == "wrapper": |                 if config.creation_engine == "wrapper": | ||||||
|                     docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus) |                     docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus) | ||||||
|                 else: |                 else: | ||||||
|  | @ -159,7 +172,10 @@ def deploy(validated_containers, allowed_running_containers=[]): | ||||||
|                 all_running_container_names.append(container.name) |                 all_running_container_names.append(container.name) | ||||||
|             else: |             else: | ||||||
|                 all_stopped_container_names.append(container.name) |                 all_stopped_container_names.append(container.name) | ||||||
|             if container.name in needed_running_names and container.status != 'running': |             if background_job.is_background_job_container_name(container.name) and not background_job.is_enabled(): | ||||||
|  |                 if container.status == "running": | ||||||
|  |                     container.stop() | ||||||
|  |             elif container.name in needed_running_names and container.status != 'running': | ||||||
|                 try: |                 try: | ||||||
|                     attached_networks = container.attrs['NetworkSettings']['Networks'] |                     attached_networks = container.attrs['NetworkSettings']['Networks'] | ||||||
|                     if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container |                     if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container | ||||||
|  | @ -174,17 +190,22 @@ def deploy(validated_containers, allowed_running_containers=[]): | ||||||
|                     container.stop() |                     container.stop() | ||||||
|                 except Exception as e: |                 except Exception as e: | ||||||
|                     pass |                     pass | ||||||
|             elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running': |             elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running' and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name): | ||||||
|                 try: |                 try: | ||||||
|                     container.stop() |                     container.stop() | ||||||
|                     container.remove() |                     container.remove() | ||||||
|                 except Exception as e: |                 except Exception as e: | ||||||
|                     pass |                     pass | ||||||
|             elif container.name not in paused_names+needed_running_names+allowed_running_containers: |             elif container.name not in paused_names+needed_running_names+allowed_running_containers and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name): | ||||||
|                 try: |                 try: | ||||||
|                     container.remove() |                     container.remove() | ||||||
|                 except Exception as e: |                 except Exception as e: | ||||||
|                     pass |                     pass | ||||||
|  |             elif not can_run_partner_workloads and container.status == "running" and clore_partner.validate_partner_workload_container_name(container.name): | ||||||
|  |                 try: | ||||||
|  |                     container.stop() | ||||||
|  |                 except Exception as e: | ||||||
|  |                     pass | ||||||
| 
 | 
 | ||||||
|     return all_running_container_names, all_stopped_container_names |     return all_running_container_names, all_stopped_container_names | ||||||
|     #print(validated_containers) |     #print(validated_containers) | ||||||
|  |  | ||||||
|  | @ -11,6 +11,13 @@ from typing import List, Optional | ||||||
| import docker | import docker | ||||||
| import json | import json | ||||||
| import os | import os | ||||||
|  | import re | ||||||
|  | 
 | ||||||
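|  | # Resolve the partner bridge subnet from the default clore networks defined in config | ||||||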
|  | partner_bridge_subnet = '' | ||||||
|  | 
 | ||||||
|  | for clore_network in config.clore_default_networks: | ||||||
|  |     if clore_network["name"] == "clore-partner-br0": | ||||||
|  |         partner_bridge_subnet = clore_network["subnet"] | ||||||
| 
 | 
 | ||||||
| try: | try: | ||||||
|     os.makedirs(config.startup_scripts_folder, exist_ok=True) |     os.makedirs(config.startup_scripts_folder, exist_ok=True) | ||||||
|  | @ -59,6 +66,18 @@ def get_info(): | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         return {} |         return {} | ||||||
| 
 | 
 | ||||||
|  | def stop_all_containers(): | ||||||
|  |     try: | ||||||
|  |         # List all containers | ||||||
|  |         containers = client.containers.list(all=True)  # Use all=True to include stopped containers | ||||||
|  |         for container in containers: | ||||||
|  |             log.info(f"stop_all_containers() | Stopping container: {container.name} (ID: {container.id})") | ||||||
|  |             container.stop()  # Stop the container | ||||||
|  |         log.success("stop_all_containers() | All containers have been stopped.") | ||||||
|  |     except Exception as e: | ||||||
|  |         log.error(f"stop_all_containers() | An error occurred: {e}") | ||||||
|  |     return True | ||||||
|  | 
 | ||||||
| def check_docker_connection(): | def check_docker_connection(): | ||||||
|     try: |     try: | ||||||
|         client.ping() |         client.ping() | ||||||
|  | @ -95,7 +114,7 @@ def get_local_images(no_latest_tag=False): | ||||||
| 
 | 
 | ||||||
|         return image_list |         return image_list | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         log.error(f"DOCKER | Can't get local images | {e}") |         log.error(f"DOCKER | Can't get local images | {e} | {'y' if no_latest_tag else 'n'}") | ||||||
|         os._exit(1) |         os._exit(1) | ||||||
|      |      | ||||||
| def get_containers(all=False): | def get_containers(all=False): | ||||||
|  | @ -184,7 +203,8 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"): | ||||||
|                     gateway=gateway |                     gateway=gateway | ||||||
|                 )] |                 )] | ||||||
|             ), |             ), | ||||||
|             check_duplicate=True |             check_duplicate=True, | ||||||
|  |             #options={'com.docker.network.bridge.enable_ip_masq': 'false'} if 'clore-partner-' in network_name else {} | ||||||
|         ) |         ) | ||||||
|         log.debug(f"Network {network_name} created successfully.") |         log.debug(f"Network {network_name} created successfully.") | ||||||
|         return True |         return True | ||||||
|  | @ -192,7 +212,7 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"): | ||||||
|         log.error(f"DOCKER | Failed to create network {network_name}: {e}") |         log.error(f"DOCKER | Failed to create network {network_name}: {e}") | ||||||
|         return False |         return False | ||||||
| 
 | 
 | ||||||
| def validate_and_secure_networks(): | def validate_and_secure_networks(partner_forwarding_ips): | ||||||
|     try: |     try: | ||||||
|         failed_appending_iptables_rule = False |         failed_appending_iptables_rule = False | ||||||
| 
 | 
 | ||||||
|  | @ -238,6 +258,13 @@ def validate_and_secure_networks(): | ||||||
|                                 #print(this_ipv4_range) |                                 #print(this_ipv4_range) | ||||||
| 
 | 
 | ||||||
|                                 outside_ranges_ip_network = networking.exclude_network(this_ipv4_range) |                                 outside_ranges_ip_network = networking.exclude_network(this_ipv4_range) | ||||||
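|  |                                 # For the partner bridge, also carve each partner forwarding IP (/32) out of the ranges that get firewalled, so forwarded traffic stays reachable | ||||||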
|  |                                 if this_ipv4_range == partner_bridge_subnet: | ||||||
|  |                                     for partner_forwarding_ip in partner_forwarding_ips: | ||||||
|  |                                         outside_ranges = [] | ||||||
|  |                                         for ip_range in outside_ranges_ip_network: | ||||||
|  |                                             outside_ranges.append(str(ip_range)) | ||||||
|  |                                         outside_ranges_ip_network = networking.exclude_network(f"{partner_forwarding_ip}/32", input_ranges=outside_ranges) | ||||||
|  | 
 | ||||||
|                                 outside_ranges = [] |                                 outside_ranges = [] | ||||||
|                                 for outside_range_ip_network in outside_ranges_ip_network: |                                 for outside_range_ip_network in outside_ranges_ip_network: | ||||||
|                                     outside_ranges.append(str(outside_range_ip_network)) |                                     outside_ranges.append(str(outside_range_ip_network)) | ||||||
|  | @ -265,7 +292,7 @@ def validate_and_secure_networks(): | ||||||
|                                                 succesfully_appended = networking.add_iptables_rule(needed_iptables_rule) |                                                 succesfully_appended = networking.add_iptables_rule(needed_iptables_rule) | ||||||
|                                                 if not succesfully_appended: |                                                 if not succesfully_appended: | ||||||
|                                                     failed_appending_iptables_rule = True |                                                     failed_appending_iptables_rule = True | ||||||
|                                     else: |                                     elif this_ipv4_range != partner_bridge_subnet: | ||||||
|                                         needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name) |                                         needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name) | ||||||
|                                         for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule |                                         for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule | ||||||
|                                         for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule)) |                                         for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule)) | ||||||
|  | @ -412,4 +439,8 @@ def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"): | ||||||
|             log.error(f"Failed 'configure_exec_opts' | {e}") |             log.error(f"Failed 'configure_exec_opts' | {e}") | ||||||
|             return False |             return False | ||||||
|     else: |     else: | ||||||
|         return False |         return False | ||||||
|  | 
 | ||||||
|  | def is_docker_default_name_lenient(container_name): # Lenient heuristic for Docker's auto-generated <adjective>_<surname> names; not perfect, but good enough here | ||||||
|  |     pattern = r'^[a-z]+_[a-z]+$' | ||||||
|  |     return re.match(pattern, container_name) is not None | ||||||
|  | @ -0,0 +1,71 @@ | ||||||
|  | from lib import logging as logging_lib | ||||||
|  | 
 | ||||||
|  | from typing import List | ||||||
|  | from lib import utils | ||||||
|  | import time | ||||||
|  | 
 | ||||||
|  | log = logging_lib.log | ||||||
|  | 
 | ||||||
|  | async def ensure_packages_installed( | ||||||
|  |     packages: List[str] = [],  | ||||||
|  |     total_timeout: float = 300 | ||||||
|  | ) -> bool: | ||||||
|  |     non_interactive_env = { | ||||||
|  |         'DEBIAN_FRONTEND': 'noninteractive', | ||||||
|  |         'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin', | ||||||
|  |     } | ||||||
|  |      | ||||||
|  |     start_time = time.time() | ||||||
|  |      | ||||||
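|  |     # Check with dpkg which of the requested packages are actually missing | ||||||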
|  |     packages_to_install = [] | ||||||
|  |     for package in packages: | ||||||
|  |         check_cmd = f"dpkg -s {package} > /dev/null 2>&1" | ||||||
|  |         return_code, _, _ = await utils.async_run_command(check_cmd, env=non_interactive_env) | ||||||
|  |          | ||||||
|  |         if return_code != 0: | ||||||
|  |             packages_to_install.append(package) | ||||||
|  | 
 | ||||||
|  |     if not packages_to_install: | ||||||
|  |         log.debug("All packages are already installed.") | ||||||
|  |         return True | ||||||
|  |      | ||||||
|  |     update_cmd = ( | ||||||
|  |         "apt-get update " | ||||||
|  |         "-y " | ||||||
|  |         "--no-install-recommends" | ||||||
|  |     ) | ||||||
|  |     return_code, stdout, stderr = await utils.async_run_command( | ||||||
|  |         update_cmd,  | ||||||
|  |         timeout=None if total_timeout is None else 180, | ||||||
|  |         env=non_interactive_env | ||||||
|  |     ) | ||||||
|  |     if return_code != 0: | ||||||
|  |         log.error(f"Failed to update package lists: {stderr}") | ||||||
|  |         return False | ||||||
|  | 
 | ||||||
|  |     install_cmd = ( | ||||||
|  |         "apt-get install " | ||||||
|  |         "-y " | ||||||
|  |         "--no-install-recommends " | ||||||
|  |         "--assume-yes " | ||||||
|  |         "-o Dpkg::Options::='--force-confdef' "  # Default to existing config | ||||||
|  |         "-o Dpkg::Options::='--force-confold' "  # Keep existing config | ||||||
|  |         f"{' '.join(packages_to_install)}" | ||||||
|  |     ) | ||||||
|  |      | ||||||
|  |     # Calculate remaining timeout | ||||||
|  |     remaining_timeout = None if total_timeout is None else max(0, total_timeout - (time.time() - start_time)) | ||||||
|  |      | ||||||
|  |     # Install packages | ||||||
|  |     return_code, stdout, stderr = await utils.async_run_command( | ||||||
|  |         install_cmd,  | ||||||
|  |         timeout=remaining_timeout,  | ||||||
|  |         env=non_interactive_env | ||||||
|  |     ) | ||||||
|  |      | ||||||
|  |     if return_code == 0: | ||||||
|  |         log.debug(f"Successfully installed packages: {packages_to_install}") | ||||||
|  |         return True | ||||||
|  |     else: | ||||||
|  |         log.error(f"Failed to install packages: {stderr}") | ||||||
|  |         return False | ||||||
|  | @ -2,7 +2,7 @@ from aiofiles.os import stat as aio_stat | ||||||
| from pydantic import BaseModel, Field, constr | from pydantic import BaseModel, Field, constr | ||||||
| import xml.etree.ElementTree as ET | import xml.etree.ElementTree as ET | ||||||
| from lib import docker_interface | from lib import docker_interface | ||||||
| from typing import Dict, List | from typing import Dict, List, Optional | ||||||
| from lib import utils | from lib import utils | ||||||
| import subprocess | import subprocess | ||||||
| import speedtest | import speedtest | ||||||
|  | @ -311,7 +311,7 @@ def get_gpu_info(): | ||||||
| class DockerDaemonConfig(BaseModel): | class DockerDaemonConfig(BaseModel): | ||||||
|     data_root: str = Field(alias="data-root") |     data_root: str = Field(alias="data-root") | ||||||
|     storage_driver: str = Field(alias="storage-driver") |     storage_driver: str = Field(alias="storage-driver") | ||||||
|     storage_opts: List[str] = Field(alias="storage-opts") |     storage_opts: Optional[List[str]] = Field(alias="storage-opts") | ||||||
| 
 | 
 | ||||||
| class Specs: | class Specs: | ||||||
|     def __init__(self): |     def __init__(self): | ||||||
|  | @ -336,26 +336,14 @@ class Specs: | ||||||
|         else: |         else: | ||||||
|             overlay_total_size=None |             overlay_total_size=None | ||||||
|             disk_type="" |             disk_type="" | ||||||
|  |             disk_usage_source_path = '/' | ||||||
|             try: |             try: | ||||||
|                 validated_config = DockerDaemonConfig(**docker_daemon_config) |                 if "storage-driver" in docker_daemon_config and docker_daemon_config["storage-driver"] == "overlay2" and "data-root" in docker_daemon_config: | ||||||
|                 disk_udevadm = get_disk_udevadm(validated_config.data_root) |                     disk_usage_source_path = docker_daemon_config["data-root"] | ||||||
|                 for udevadm_line in disk_udevadm.split('\n'): |  | ||||||
|                     try: |  | ||||||
|                         key, value=udevadm_line.split('=',1) |  | ||||||
|                         if "id_model" in key.lower(): |  | ||||||
|                             disk_type=value[:24] |  | ||||||
|                         elif "devpath" in key.lower() and "/virtual/" in value: |  | ||||||
|                             disk_type="Virtual" |  | ||||||
|                     except Exception as e_int: |  | ||||||
|                         pass |  | ||||||
|                 for storage_opt in validated_config.storage_opts: |  | ||||||
|                     if storage_opt[:14]=="overlay2.size=" and "GB" in storage_opt[14:]: |  | ||||||
|                         numeric_size = round(float(filter_non_numeric(storage_opt[14:])), 4) |  | ||||||
|                         overlay_total_size=numeric_size |  | ||||||
|             except Exception as e: |             except Exception as e: | ||||||
|                 pass |                 pass | ||||||
|             if overlay_total_size==None: |             if overlay_total_size==None: | ||||||
|                 total, used, free = shutil.disk_usage("/") |                 total, used, free = shutil.disk_usage(disk_usage_source_path) | ||||||
|                 disk_udevadm = get_disk_udevadm("/") |                 disk_udevadm = get_disk_udevadm("/") | ||||||
|                 for udevadm_line in disk_udevadm.split('\n'): |                 for udevadm_line in disk_udevadm.split('\n'): | ||||||
|                     try: |                     try: | ||||||
|  |  | ||||||
|  | @ -0,0 +1,66 @@ | ||||||
|  | from lib import ensure_packages_installed | ||||||
|  | from lib import utils | ||||||
|  | 
 | ||||||
|  | import asyncio | ||||||
|  | import socket | ||||||
|  | import re | ||||||
|  | 
 | ||||||
|  | MANDATORY_PACKAGES = ['iputils-ping'] | ||||||
|  | 
 | ||||||
|  | def is_valid_ipv4_or_hostname(string): | ||||||
|  |     def is_valid_ipv4(ip): | ||||||
|  |         try: | ||||||
|  |             socket.inet_aton(ip) | ||||||
|  |             return True | ||||||
|  |         except socket.error: | ||||||
|  |             return False | ||||||
|  | 
 | ||||||
|  |     hostname_regex = re.compile( | ||||||
|  |         r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$" | ||||||
|  |     ) | ||||||
|  |     def is_valid_hostname(hostname): | ||||||
|  |         if len(hostname) > 255: | ||||||
|  |             return False | ||||||
|  |         return all(hostname_regex.match(part) for part in hostname.split(".")) | ||||||
|  | 
 | ||||||
|  |     return is_valid_ipv4(string) or is_valid_hostname(string) | ||||||
|  | 
 | ||||||
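|  | # Ping the given hosts in small concurrent batches, returning per-host received packet counts and average latency | ||||||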
|  | async def measure_latency_icmp(hosts, max_concurent_measurments=2, count=4): | ||||||
|  |     success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKAGES, None) | ||||||
|  |     if success: | ||||||
|  |         outcome = [] | ||||||
|  |         concurrent_jobs = [hosts[i:i + max_concurent_measurments] for i in range(0, len(hosts), max_concurent_measurments)] | ||||||
|  |         for concurrent_job in concurrent_jobs: | ||||||
|  |             tasks = [] | ||||||
|  |             pinged_hosts = [] # Keep the pinged hosts aligned with the gathered results (invalid hosts are skipped) | ||||||
|  |             for host in concurrent_job: | ||||||
|  |                 if is_valid_ipv4_or_hostname(host): | ||||||
|  |                     tasks.append( | ||||||
|  |                         utils.async_run_command( | ||||||
|  |                             f"ping -c {str(count)} -W 2 -i 0.3 {host}", | ||||||
|  |                             20 * count, | ||||||
|  |                             {"LANG": "C", "LC_ALL": "C"} | ||||||
|  |                         ) | ||||||
|  |                     ) | ||||||
|  |                     pinged_hosts.append(host) | ||||||
|  |             if len(tasks) > 0: | ||||||
|  |                 r = await asyncio.gather(*tasks) | ||||||
|  |                 try: | ||||||
|  |                     for host, output in zip(pinged_hosts, r): | ||||||
|  |                         results = [] | ||||||
|  |                         code, stdout, stderr = output | ||||||
|  |                         if code == 0: | ||||||
|  |                             for line in stdout.split('\n'): | ||||||
|  |                                 match = re.search(r"time=(\d+\.?\d*) ms", line) | ||||||
|  |                                 if match: | ||||||
|  |                                     results.append(float(match.group(1))) | ||||||
|  |                         outcome.append( | ||||||
|  |                             { | ||||||
|  |                                 "host": host, | ||||||
|  |                                 "received": len(results), | ||||||
|  |                                 "avg_latency": sum(results) / len(results) if len(results)>0 else 0 | ||||||
|  |                             } | ||||||
|  |                         ) | ||||||
|  |                 except Exception as e: | ||||||
|  |                     print(e) | ||||||
|  |         return outcome | ||||||
|  |     else: | ||||||
|  |         return False | ||||||
|  | @ -1,6 +1,7 @@ | ||||||
| from lib import docker_interface | from lib import docker_interface | ||||||
| from lib import config as config_module | from lib import config as config_module | ||||||
| from lib import logging as logging_lib | from lib import logging as logging_lib | ||||||
|  | from lib import clore_partner | ||||||
| config = config_module.config | config = config_module.config | ||||||
| log = logging_lib.log | log = logging_lib.log | ||||||
| 
 | 
 | ||||||
|  | @ -29,7 +30,7 @@ async def log_streaming_task(message_broker, monitoring, do_not_stream_container | ||||||
| 
 | 
 | ||||||
|             # Start tasks for new containers |             # Start tasks for new containers | ||||||
|             for container_name, container in current_containers.items(): |             for container_name, container in current_containers.items(): | ||||||
|                 if not container_name in do_not_stream_containers: |                 if not container_name in do_not_stream_containers and not clore_partner.validate_partner_container_name(container_name): | ||||||
|                     log_container_names.append(container_name) |                     log_container_names.append(container_name) | ||||||
|                     if container_name not in tasks: |                     if container_name not in tasks: | ||||||
|                         log.debug(f"log_streaming_task() | Starting task for {container_name}") |                         log.debug(f"log_streaming_task() | Starting task for {container_name}") | ||||||
|  |  | ||||||
|  | @ -6,6 +6,7 @@ import ipaddress | ||||||
| import socket | import socket | ||||||
| import psutil | import psutil | ||||||
| import sys | import sys | ||||||
|  | import os | ||||||
| 
 | 
 | ||||||
| config = config_module.config | config = config_module.config | ||||||
| log = logging_lib.log | log = logging_lib.log | ||||||
|  | @ -25,12 +26,15 @@ def get_network_interfaces_with_subnet(): | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         return str(e) |         return str(e) | ||||||
| 
 | 
 | ||||||
| def exclude_network(excluded_network): | def exclude_network(excluded_network, input_ranges=None): | ||||||
|     # Convert exclude_network to ip_network object |     # Convert exclude_network to ip_network object | ||||||
|     excluded_network = ip_network(excluded_network) |     excluded_network = ip_network(excluded_network) | ||||||
| 
 | 
 | ||||||
|  |     if not input_ranges: | ||||||
|  |         input_ranges=config.local_ipv4_ranges | ||||||
|  | 
 | ||||||
|     # Remove the excluded network from the local_ranges list |     # Remove the excluded network from the local_ranges list | ||||||
|     local_ranges = [ip_network(range_) for range_ in config.local_ipv4_ranges if ip_network(range_) != exclude_network] |     local_ranges = [ip_network(range_) for range_ in input_ranges if ip_network(range_) != excluded_network] | ||||||
| 
 | 
 | ||||||
|     ranges_outside_exclude = [] |     ranges_outside_exclude = [] | ||||||
|     for local_range in local_ranges: |     for local_range in local_ranges: | ||||||
|  | @ -92,4 +96,20 @@ def is_ip_in_network(ip: str, network: str) -> bool: | ||||||
|         except ValueError as e: |         except ValueError as e: | ||||||
|             # If there's an error with the input values, print the error and return False |             # If there's an error with the input values, print the error and return False | ||||||
|             log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}") |             log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}") | ||||||
|             return False |             return False | ||||||
|  | 
 | ||||||
|  | def purge_clore_interfaces(): | ||||||
|  |     network_interfaces = get_network_interfaces_with_subnet() | ||||||
|  |     if type(network_interfaces) != dict: | ||||||
|  |         log.error("Failed to load network interfaces, restarting...") | ||||||
|  |         os._exit(1) | ||||||
|  | 
 | ||||||
|  |     clore_subnets = [ "172.17.0.1/16" ] # Also include the docker default bridge (docker0) subnet | ||||||
|  |     for clore_default_network in config.clore_default_networks: | ||||||
|  |         clore_subnets.append(clore_default_network["subnet"]) | ||||||
|  | 
 | ||||||
|  |     for network_interface in network_interfaces.keys(): | ||||||
|  |         if network_interface == "docker0" or network_interface[:3] == "br-": | ||||||
|  |             subnet = network_interfaces[network_interface] | ||||||
|  |             if subnet in clore_subnets or network_interface == "docker0": | ||||||
|  |                 utils.run_command(f"ip link delete {network_interface}") | ||||||
|  | @ -0,0 +1,187 @@ | ||||||
|  | from lib import config as config_module | ||||||
|  | from lib import logging as logging_lib | ||||||
|  | from lib import utils | ||||||
|  | 
 | ||||||
|  | import os | ||||||
|  | import aiofiles.os | ||||||
|  | 
 | ||||||
|  | config = config_module.config | ||||||
|  | log = logging_lib.log | ||||||
|  | 
 | ||||||
|  | CLIENT_CONFIGS_LOCATION = "/etc/openvpn/client" | ||||||
|  | PARTNER_CONFIG_NAME = "clore_partner.conf" | ||||||
|  | 
 | ||||||
|  | def generate_openvpn_config( | ||||||
|  |     local_ip='10.1.0.2',  | ||||||
|  |     server_ip='10.1.0.1',  | ||||||
|  |     server_hostname='example.com',  | ||||||
|  |     udp_port=1194,  | ||||||
|  |     vpn_secret_key='YOUR_VPN_SECRET_KEY' | ||||||
|  | ): | ||||||
|  |     openvpn_config = f"""nobind | ||||||
|  | proto udp4 | ||||||
|  | remote {server_hostname} {udp_port} | ||||||
|  | resolv-retry infinite | ||||||
|  | 
 | ||||||
|  | auth SHA256 | ||||||
|  | cipher AES-256-CBC | ||||||
|  | 
 | ||||||
|  | dev {config.openvpn_forwarding_tun_device} | ||||||
|  | ifconfig {local_ip} {server_ip} | ||||||
|  | 
 | ||||||
|  | <secret> | ||||||
|  | -----BEGIN OpenVPN Static key V1----- | ||||||
|  | {vpn_secret_key} | ||||||
|  | -----END OpenVPN Static key V1----- | ||||||
|  | </secret> | ||||||
|  | 
 | ||||||
|  | fragment 1300 | ||||||
|  | mssfix 1300 | ||||||
|  | sndbuf 524288 | ||||||
|  | rcvbuf 524288 | ||||||
|  | 
 | ||||||
|  | user nobody | ||||||
|  | group nogroup | ||||||
|  | 
 | ||||||
|  | ping 15 | ||||||
|  | ping-restart 45 | ||||||
|  | ping-timer-rem | ||||||
|  | persist-tun | ||||||
|  | persist-key | ||||||
|  | 
 | ||||||
|  | verb 0""" | ||||||
|  |      | ||||||
|  |     return openvpn_config | ||||||
|  | 
 | ||||||
|  | async def get_iptables_forward_rules(): | ||||||
|  |     code, stdout, stderr = await utils.async_run_command( | ||||||
|  |         f"LC_ALL=C {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -L PREROUTING -n -v --line-numbers" | ||||||
|  |     ) | ||||||
|  |     rules = [] | ||||||
|  |     if code == 0: | ||||||
|  |         columns = [] | ||||||
|  |         for line in stdout.split('\n'): | ||||||
|  |             # The first matching header line provides the column names; every following line is parsed into a dict | ||||||
|  |             if "num" in columns and "target" in columns and "in" in columns: | ||||||
|  |                 items = line.split(maxsplit=len(columns)+1) | ||||||
|  |                 if len(items) < len(columns)+2: | ||||||
|  |                     continue # Skip rule lines without a trailing description | ||||||
|  |                 rule = {} | ||||||
|  |                 for idx, name in enumerate(columns): | ||||||
|  |                     rule[name]=items[idx] | ||||||
|  |                 rule["desc"] = items[len(columns)+1] | ||||||
|  |                 rules.append(rule) | ||||||
|  |             else: | ||||||
|  |                 columns = line.split() | ||||||
|  |     return rules | ||||||
|  | 
 | ||||||
|  | async def remove_iptables_rule(rule_dict): | ||||||
|  |     cmd = f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables" | ||||||
|  | 
 | ||||||
|  |     if rule_dict.get('target') == 'DNAT': | ||||||
|  |         cmd += " -t nat" | ||||||
|  |     cmd += " -D PREROUTING" | ||||||
|  |     if rule_dict.get('prot') and rule_dict['prot'] != '--': | ||||||
|  |         cmd += f" -p {rule_dict['prot']}" | ||||||
|  |     if rule_dict.get('in') and rule_dict['in'] != '*': | ||||||
|  |         cmd += f" -i {rule_dict['in']}" | ||||||
|  |     if rule_dict.get('out') and rule_dict['out'] != '*': | ||||||
|  |         cmd += f" -o {rule_dict['out']}" | ||||||
|  |     if rule_dict.get('source') and rule_dict['source'] != '0.0.0.0/0': | ||||||
|  |         cmd += f" -s {rule_dict['source']}" | ||||||
|  |     if rule_dict.get('destination') and rule_dict['destination'] != '0.0.0.0/0': | ||||||
|  |         cmd += f" -d {rule_dict['destination']}" | ||||||
|  |     if rule_dict.get('target') == 'DNAT': | ||||||
|  |         if 'dports' in rule_dict.get('desc', ''): | ||||||
|  |             port_info = rule_dict['desc'].split('dports ')[1].split(' ')[0] | ||||||
|  |             if ':' in port_info: | ||||||
|  |                 cmd += f" -m multiport --dports {port_info}" | ||||||
|  |             else: | ||||||
|  |                 cmd += f" --dport {port_info}" | ||||||
|  |         if 'to:' in rule_dict.get('desc', ''): | ||||||
|  |             dest_ip = rule_dict['desc'].split('to:')[1].split()[0] | ||||||
|  |             cmd += f" -j DNAT --to-destination {dest_ip}" | ||||||
|  |     await utils.async_run_command(cmd) | ||||||
|  |      | ||||||
|  | 
 | ||||||
|  | async def clore_partner_configure(clore_partner_config): | ||||||
|  |     try: | ||||||
|  |         if clore_partner_config: | ||||||
|  |             docker_restart_required = False | ||||||
|  | 
 | ||||||
|  |             needed_openvpn_config = generate_openvpn_config( | ||||||
|  |                 local_ip=clore_partner_config["provider"], | ||||||
|  |                 server_ip=clore_partner_config["forwarding"], | ||||||
|  |                 server_hostname=clore_partner_config["openvpn_host"], | ||||||
|  |                 udp_port=clore_partner_config["openvpn_port"], | ||||||
|  |                 vpn_secret_key=clore_partner_config["secret"] | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             saved_config='' | ||||||
|  | 
 | ||||||
|  |             config_exists = await aiofiles.os.path.exists(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME)) | ||||||
|  |             if config_exists: | ||||||
|  |                 async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='r') as file: | ||||||
|  |                     saved_config = await file.read() | ||||||
|  | 
 | ||||||
|  |             if saved_config != needed_openvpn_config: | ||||||
|  |                 async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='w') as file: | ||||||
|  |                     await file.write(needed_openvpn_config) | ||||||
|  | 
 | ||||||
|  |             is_active_code, is_active_stdout, is_active_stderr = await utils.async_run_command( | ||||||
|  |                 f"systemctl is-active openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             if is_active_code == 0 and saved_config != needed_openvpn_config: | ||||||
|  |                 code, stdout, stderr = await utils.async_run_command( | ||||||
|  |                     f"systemctl restart openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" | ||||||
|  |                 ) | ||||||
|  |                 docker_restart_required = (code == 0) | ||||||
|  |             elif is_active_code != 0: | ||||||
|  |                 code, stdout, stderr = await utils.async_run_command( | ||||||
|  |                     f"systemctl start openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" | ||||||
|  |                 ) | ||||||
|  |                 docker_restart_required = (code == 0) | ||||||
|  | 
 | ||||||
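|  |             # Policy routing: traffic sourced from the partner IP must leave through the VPN tunnel, so keep a dedicated route table and rule in sync | ||||||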
|  |             code, stdout, stderr = await utils.async_run_command( | ||||||
|  |                 f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route show table {str(config.forwarding_ip_route_table_id)}" | ||||||
|  |             ) | ||||||
|  |             ip_route_configured = False | ||||||
|  |             if code == 0: | ||||||
|  |                 for line in stdout.split('\n'): | ||||||
|  |                     items = line.split(' ') | ||||||
|  |                     if clore_partner_config["provider"] in items and config.openvpn_forwarding_tun_device in items: | ||||||
|  |                         ip_route_configured = True | ||||||
|  |                         break | ||||||
|  |             if not ip_route_configured: | ||||||
|  |                 code, stdout, stderr = await utils.async_run_command( | ||||||
|  |                     f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route add 0.0.0.0/0 dev {config.openvpn_forwarding_tun_device} src {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id} && {'sudo ' if config.run_iptables_with_sudo else ''}ip rule add from {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id}" | ||||||
|  |                 ) | ||||||
|  |             ip_tables_configured = False | ||||||
|  | 
 | ||||||
|  |             rules = await get_iptables_forward_rules() | ||||||
|  | 
 | ||||||
|  |             for rule in rules: | ||||||
|  |                 try: | ||||||
|  |                     if rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and f"{clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]}" in rule["desc"] and f"to:{clore_partner_config['provider']}" in rule["desc"]: | ||||||
|  |                         ip_tables_configured = True | ||||||
|  |                     elif rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and "to:10." in rule["desc"] and "dports " in rule["desc"]: | ||||||
|  |                         log.debug(f"clore_partner_configure() | Removing stale forwarding rule | {rule}") | ||||||
|  |                         await remove_iptables_rule(rule) | ||||||
|  |                 except Exception as ei: | ||||||
|  |                     log.error(f"clore_partner_configure() | ei | {ei}") | ||||||
|  | 
 | ||||||
|  |             if not ip_tables_configured: | ||||||
|  |                 code, stdout, stderr = await utils.async_run_command( | ||||||
|  |                     f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p tcp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']} && {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p udp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']}" | ||||||
|  |                 )             | ||||||
|  | 
 | ||||||
|  |             if docker_restart_required: | ||||||
|  |                 async with aiofiles.open(config.restart_docker_flag_file, mode='w') as file: | ||||||
|  |                     await file.write("") | ||||||
|  |                 os._exit(0) # Exit clore hosting here - docker must be restarted after the updated VPN config comes up, and it will be restarted on the next start of clore hosting | ||||||
|  |         else: | ||||||
|  |             code, stdout, stderr = await utils.async_run_command( | ||||||
|  |                 f"systemctl stop openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" | ||||||
|  |             ) | ||||||
|  |         return True | ||||||
|  |     except Exception as e: | ||||||
|  |         log.error(f"FAIL | openvpn.clore_partner_configure | {e}") | ||||||
|  |         return False | ||||||
lib/utils.py
							|  | @ -1,10 +1,13 @@ | ||||||
|  | from typing import Optional, Tuple, Dict | ||||||
| from lib import config as config_module | from lib import config as config_module | ||||||
| from lib import logging as logging_lib | from lib import logging as logging_lib | ||||||
| from lib import nvml | from lib import nvml | ||||||
| import subprocess | import subprocess | ||||||
| import hashlib | import hashlib | ||||||
|  | import asyncio | ||||||
| import random | import random | ||||||
| import string | import string | ||||||
|  | import shutil | ||||||
| import shlex | import shlex | ||||||
| import time | import time | ||||||
| import math | import math | ||||||
|  | @ -144,6 +147,63 @@ def get_extra_allowed_images(): | ||||||
|             return [] |             return [] | ||||||
|     else: |     else: | ||||||
|         return [] |         return [] | ||||||
|  | 
 | ||||||
|  | async def async_run_command( | ||||||
|  |     command: str,  | ||||||
|  |     timeout: Optional[float] = None, | ||||||
|  |     env: Optional[Dict[str, str]] = None | ||||||
|  | ) -> Tuple[int, str, str]: | ||||||
|  |     command_env = env if env is not None else {} | ||||||
|  |      | ||||||
|  |     try: | ||||||
|  |         proc = await asyncio.create_subprocess_shell( | ||||||
|  |             command, | ||||||
|  |             stdout=subprocess.PIPE, | ||||||
|  |             stderr=subprocess.PIPE, | ||||||
|  |             env=command_env | ||||||
|  |         ) | ||||||
|  |          | ||||||
|  |         try: | ||||||
|  |             stdout, stderr = await asyncio.wait_for( | ||||||
|  |                 proc.communicate(),  | ||||||
|  |                 timeout=timeout | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             stdout_str = stdout.decode('utf-8').strip() if stdout else '' | ||||||
|  |             stderr_str = stderr.decode('utf-8').strip() if stderr else '' | ||||||
|  |              | ||||||
|  |             return proc.returncode, stdout_str, stderr_str | ||||||
|  |          | ||||||
|  |         except asyncio.TimeoutError: | ||||||
|  |             # Handle timeout: terminate the process gracefully first | ||||||
|  |             proc.terminate() | ||||||
|  |             try: | ||||||
|  |                 await asyncio.wait_for(proc.wait(), timeout=5)  # Wait for it to exit | ||||||
|  |             except asyncio.TimeoutError: | ||||||
|  |                 # Force kill the process if it doesn't terminate | ||||||
|  |                 proc.kill() | ||||||
|  |                 await proc.wait() | ||||||
|  | 
 | ||||||
|  |             return -1, '', f'Command timed out after {timeout} seconds' | ||||||
|  |      | ||||||
|  |     except Exception as e: | ||||||
|  |         return -1, '', str(e) | ||||||
|  | 
 | ||||||
|  | def get_free_space_mb(path): | ||||||
|  |     """Get free space in MB for the given path.""" | ||||||
|  |     total, used, free = shutil.disk_usage(path) | ||||||
|  |     return free // (1024 * 1024)  # Convert bytes to MB | ||||||
|  | 
 | ||||||
|  | def get_directory_size_mb(path): | ||||||
|  |     """Get the size of a directory in MB.""" | ||||||
|  |     total_size = 0 | ||||||
|  |     for dirpath, dirnames, filenames in os.walk(path): | ||||||
|  |         for f in filenames: | ||||||
|  |             fp = os.path.join(dirpath, f) | ||||||
|  |             # Skip if the file doesn't exist (symlinks, etc.) | ||||||
|  |             if not os.path.islink(fp) and os.path.exists(fp): | ||||||
|  |                 total_size += os.path.getsize(fp) | ||||||
|  |     return total_size // (1024 * 1024)  # Convert bytes to MB | ||||||
|      |      | ||||||
| class shm_calculator: | class shm_calculator: | ||||||
|     def __init__(self, total_ram): |     def __init__(self, total_ram): | ||||||
|  |  | ||||||
|  | @ -0,0 +1,201 @@ | ||||||
|  | # Library to setup XFS partition for docker | ||||||
|  | from lib import ensure_packages_installed | ||||||
|  | from lib import logging as logging_lib | ||||||
|  | from lib import docker_interface | ||||||
|  | from lib import networking | ||||||
|  | from lib import utils | ||||||
|  | 
 | ||||||
|  | import asyncio | ||||||
|  | import json | ||||||
|  | import os | ||||||
|  | 
 | ||||||
|  | log = logging_lib.log | ||||||
|  | 
 | ||||||
|  | DOCKER_ROOT = "/var/lib/docker" | ||||||
|  | DOCKER_DATA_IMG = "/opt/clore-hosting/data.img" | ||||||
|  | LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB | ||||||
|  | 
 | ||||||
|  | MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB | ||||||
|  | 
 | ||||||
|  | XFS_STATE_FILE = "/opt/clore-hosting/xfs_state" | ||||||
|  | 
 | ||||||
|  | MANDATORY_PACKAGES = [ | ||||||
|  |     "xfsprogs", | ||||||
|  |     "dmidecode", | ||||||
|  |     "openvpn", | ||||||
|  |     "iproute2", | ||||||
|  |     "iputils-ping", | ||||||
|  |     "util-linux" | ||||||
|  | ] | ||||||
|  | 
 | ||||||
|  | # This code is run on start of clore hosting to migrate docker to the XFS partition setup | ||||||
|  | 
 | ||||||
|  | # sudo fallocate -l 300G /docker-storage.img | ||||||
|  | # sudo mkfs.xfs /docker-storage.img | ||||||
|  | # mount -o loop,pquota /docker-storage.img /mnt/docker-storage | ||||||
|  | 
 | ||||||
|  | def migrate(): | ||||||
|  |     docker_xfs_state = validate_docker_xfs() | ||||||
|  |     #print(docker_xfs_state) | ||||||
|  |     if docker_xfs_state == "skip": | ||||||
|  |         return | ||||||
|  |     elif docker_xfs_state == "valid": | ||||||
|  |         return 'success' | ||||||
|  |      | ||||||
|  |     packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed( | ||||||
|  |         MANDATORY_PACKAGES | ||||||
|  |     )) | ||||||
|  | 
 | ||||||
|  |     if not packages_available: | ||||||
|  |         return 'packages-missing' | ||||||
|  | 
 | ||||||
|  |     log.info("Starting migration to xfs") | ||||||
|  |     docker_interface.stop_all_containers() | ||||||
|  | 
 | ||||||
|  |     if os.path.exists(DOCKER_DATA_IMG): | ||||||
|  |         try: | ||||||
|  |             os.remove(DOCKER_DATA_IMG) | ||||||
|  |         except Exception as e: | ||||||
|  |             log.error(f"Error while trying to remove {DOCKER_DATA_IMG}: {e}") | ||||||
|  |             return "failure" | ||||||
|  | 
 | ||||||
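|  |     # Reclaimable space = current free space plus what the docker root occupies (it gets wiped), minus a reserve for the host | ||||||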
|  |     max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT) | ||||||
|  |      | ||||||
|  |     data_img_size = int(max_free_space - LEAVE_FREE_SPACE_MB) | ||||||
|  |     if data_img_size < MIN_XFS_PARTITION_SIZE: | ||||||
|  |         return 'not-enough-space' | ||||||
|  |      | ||||||
|  |     docker_config_success = False | ||||||
|  |     fstab_config_success = False | ||||||
|  | 
 | ||||||
|  |     code, stdout, stderr = utils.run_command( | ||||||
|  |         f"systemctl stop docker && rm -rf {DOCKER_ROOT} && fallocate -l {str(data_img_size)}M {DOCKER_DATA_IMG} && mkfs.xfs {DOCKER_DATA_IMG}" | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     networking.purge_clore_interfaces() | ||||||
|  | 
 | ||||||
|  |     if code == 0: | ||||||
|  |         docker_config_success = configure_docker_daemon() | ||||||
|  |     if code == 0 and docker_config_success: | ||||||
|  |         fstab_config_success = configure_fstab() | ||||||
|  |     if code == 0 and fstab_config_success: | ||||||
|  |         code, stdout, stderr = utils.run_command( | ||||||
|  |             f"mkdir {DOCKER_ROOT} && systemctl daemon-reload && mount -a" | ||||||
|  |         ) | ||||||
|  |         if code==0: | ||||||
|  |             return 'success' | ||||||
|  |         else: | ||||||
|  |             configure_docker_daemon(remove=True) | ||||||
|  |             configure_fstab(remove=True) | ||||||
|  |             return 'failure' | ||||||
|  |     else: | ||||||
|  |         utils.run_command( | ||||||
|  |             f"mkdir {DOCKER_ROOT}" | ||||||
|  |         ) | ||||||
|  |         if os.path.exists(DOCKER_DATA_IMG): | ||||||
|  |             try: | ||||||
|  |                 os.remove(DOCKER_DATA_IMG) | ||||||
|  |             except Exception as e: | ||||||
|  |                 pass | ||||||
|  |         return 'failure' | ||||||
|  | 
 | ||||||
|  | def validate_docker_xfs(): | ||||||
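|  |     # Inspect the filesystems of / and DOCKER_ROOT: "valid" = docker already runs on its own XFS mount, | ||||||
|  |     # "default" = docker still lives on the root filesystem, "skip" = detection failed, migration is not attempted | ||||||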
|  |     code_root, stdout_root, stderr_root = utils.run_command("df -T /") | ||||||
|  |     code, stdout, stderr = utils.run_command(f"df -T {DOCKER_ROOT}") | ||||||
|  |     #print([code, stderr, stdout]) | ||||||
|  |     if code_root == 0 and stderr_root == '' and ((code == 0 and stderr == '') or (code == 1 and f" {DOCKER_ROOT}: " in stderr and stdout == '')): | ||||||
|  |         root_blocks = None | ||||||
|  |         docker_root_blocks = None | ||||||
|  |         docker_root_format = '' | ||||||
|  |         for idx, line in enumerate(stdout_root.split('\n')): | ||||||
|  |             if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric(): | ||||||
|  |                 root_blocks = int(line.split()[2]) | ||||||
|  |         if code == 1: | ||||||
|  |             docker_root_blocks = root_blocks | ||||||
|  |         else: | ||||||
|  |             for idx, line in enumerate(stdout.split('\n')): | ||||||
|  |                 if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric(): | ||||||
|  |                     docker_root_blocks = int(line.split()[2]) | ||||||
|  |                     docker_root_format = line.split()[1] | ||||||
|  |         if root_blocks is None or docker_root_blocks is None: | ||||||
|  |             return "skip" | ||||||
|  |         elif docker_root_format=="xfs" and root_blocks > docker_root_blocks: | ||||||
|  |             return "valid" | ||||||
|  |         else: | ||||||
|  |             return "default" | ||||||
|  |     else: | ||||||
|  |         return "skip" | ||||||
|  | 
 | ||||||
|  | def configure_docker_daemon(remove=False): | ||||||
|  |     try: | ||||||
|  |         daemon_json_path = "/etc/docker/daemon.json" | ||||||
|  | 
 | ||||||
|  |         with open(daemon_json_path, 'r') as file: | ||||||
|  |             raw_content = file.read() | ||||||
|  |             daemon_config = json.loads(raw_content) | ||||||
|  |         if remove: | ||||||
|  |             if daemon_config.get("data-root") == DOCKER_ROOT: | ||||||
|  |                 del daemon_config["data-root"] | ||||||
|  |             if daemon_config.get("storage-driver") == "overlay2": | ||||||
|  |                 del daemon_config["storage-driver"] | ||||||
|  |         elif daemon_config.get("data-root") != DOCKER_ROOT or daemon_config.get("storage-driver") != "overlay2": | ||||||
|  |             daemon_config["data-root"] = DOCKER_ROOT | ||||||
|  |             daemon_config["storage-driver"] = "overlay2" | ||||||
|  |         with open(daemon_json_path, 'w') as file: | ||||||
|  |             file.write(json.dumps(daemon_config,indent=4)) | ||||||
|  |         return True | ||||||
|  |     except Exception as e: | ||||||
|  |         return False | ||||||
|  | 
 | ||||||
|  | def configure_fstab(remove=False): | ||||||
|  |     try: | ||||||
|  |         file_path = "/etc/fstab" | ||||||
|  | 
 | ||||||
|  |         mount_line = f"{DOCKER_DATA_IMG} {DOCKER_ROOT} xfs loop,pquota 0 0" | ||||||
|  | 
 | ||||||
|  |         with open(file_path, 'r') as file: | ||||||
|  |             raw_content = file.read() | ||||||
|  | 
 | ||||||
|  |         if remove: | ||||||
|  |             if mount_line in raw_content: | ||||||
|  |                 raw_content = raw_content.replace(f"\n{mount_line}\n", '') | ||||||
|  |                 with open(file_path, 'w') as file: | ||||||
|  |                     file.write(raw_content) | ||||||
|  |         elif not mount_line in raw_content: | ||||||
|  |             raw_content += f"\n{mount_line}\n" | ||||||
|  |             with open(file_path, 'w') as file: | ||||||
|  |                 file.write(raw_content) | ||||||
|  |         return True | ||||||
|  |     except Exception as e: | ||||||
|  |         return False | ||||||
|  | 
 | ||||||
|  | def init(): | ||||||
|  |     try: | ||||||
|  |         if os.path.exists(XFS_STATE_FILE): | ||||||
|  |             with open(XFS_STATE_FILE, 'r') as file: | ||||||
|  |                 raw_content = file.read() | ||||||
|  |             if "enabled" in raw_content: | ||||||
|  |                 migration_status = migrate() | ||||||
|  |                 if migration_status == "success": | ||||||
|  |                     with open(XFS_STATE_FILE, 'w') as file: | ||||||
|  |                         file.write("active") | ||||||
|  |                     return "active" | ||||||
|  |                 elif migration_status == "not-enough-space": | ||||||
|  |                     with open(XFS_STATE_FILE, 'w') as file: | ||||||
|  |                         file.write("not-enough-space") | ||||||
|  |                     return 'not-enough-space' | ||||||
|  |                 else: | ||||||
|  |                     with open(XFS_STATE_FILE, 'w') as file: | ||||||
|  |                         file.write("failed-migration") | ||||||
|  |                     return 'failed' | ||||||
|  |             elif 'not-enough-space' in raw_content: | ||||||
|  |                 return 'not-enough-space' | ||||||
|  |             elif "active" in raw_content: | ||||||
|  |                 return "active" | ||||||
|  |             else: | ||||||
|  |                 return "failed" | ||||||
|  |         else: | ||||||
|  |             return "disabled" | ||||||
|  |     except Exception as e: | ||||||
|  |         log.error(f"xfs.init() | {e}") | ||||||