V5.2.9 | XFS, hosting to partner platforms #1
	|  | @ -32,7 +32,7 @@ def get_last_ip_occurrence_and_text(input_string): | |||
|     else: | ||||
|         return None, None | ||||
| 
 | ||||
| def configure(containers): | ||||
| def configure(containers, partner_forwarding_ips): | ||||
|     valid_containers = [] | ||||
|     newly_created_networks = [] | ||||
|     containers_required_networks = [] | ||||
|  | @ -141,7 +141,7 @@ def configure(containers): | |||
|     if config.log_containers_strings: | ||||
|         print("FROM DOCKER CONFIGURATOR", valid_containers) | ||||
| 
 | ||||
|     validation_and_security = docker_interface.validate_and_secure_networks() | ||||
|     validation_and_security = docker_interface.validate_and_secure_networks(partner_forwarding_ips) | ||||
|     if startup_sctipt_creation_fail: | ||||
|         validation_and_security=False | ||||
|     return validation_and_security, valid_containers, use_hive_flightsheet | ||||
|  | @ -4,7 +4,10 @@ from lib import log_streaming_task | |||
| from lib import run_startup_script | ||||
| from lib import hive_miner_interface | ||||
| from lib import docker_interface | ||||
| from lib import background_job | ||||
| from lib import docker_deploy | ||||
| from lib import clore_partner | ||||
| from lib import clore_partner_socket | ||||
| from lib import docker_pull | ||||
| from lib import get_specs | ||||
| from lib import utils | ||||
|  | @ -34,17 +37,17 @@ WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_ | |||
| 
 | ||||
| #print(config) | ||||
| 
 | ||||
| async def configure_networks(containers): | ||||
|     res = await asyncio.to_thread(docker_configurator.configure, containers) | ||||
| async def configure_networks(containers, partner_forwarding_ips): | ||||
|     res = await asyncio.to_thread(docker_configurator.configure, containers, partner_forwarding_ips) | ||||
|     try: | ||||
|         fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2]) | ||||
|         return fin_res | ||||
|     except Exception as e: | ||||
|         return False | ||||
|      | ||||
| async def deploy_containers(validated_containers, allowed_running_containers): | ||||
| async def deploy_containers(validated_containers, allowed_running_containers, can_run_partner_workloads): | ||||
|     try: | ||||
|         all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers) | ||||
|         all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers, can_run_partner_workloads) | ||||
|         return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names) | ||||
|     except Exception as e: | ||||
|         return False | ||||
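Both wrappers above share one pattern: run the blocking docker call on a worker thread with asyncio.to_thread, fold the positional tuple it returns into a typed result object, and collapse any failure to False so callers can branch on a single sentinel. A minimal self-contained sketch of that pattern (names here are illustrative, not the project's own):

import asyncio
from dataclasses import dataclass

@dataclass
class DeployRes:
    running: list
    stopped: list

def blocking_deploy(wanted):                    # stands in for docker_deploy.deploy(...)
    return [f"run-{w}" for w in wanted], []     # (running, stopped)

async def deploy_wrapper(wanted):
    res = await asyncio.to_thread(blocking_deploy, wanted)
    try:
        return DeployRes(running=res[0], stopped=res[1])
    except Exception:
        return False                            # callers must check for False before use

print(asyncio.run(deploy_wrapper(["a", "b"])))  # DeployRes(running=['run-a', 'run-b'], stopped=[])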
|  | @ -70,8 +73,10 @@ async def set_hive_miner_status(enabled=False): | |||
|         return False | ||||
| 
 | ||||
| class CloreClient: | ||||
|     def __init__(self, auth_key): | ||||
|     def __init__(self, auth_key, xfs_state): | ||||
|         self.auth_key=auth_key | ||||
|         self.xfs_state = xfs_state | ||||
| 
 | ||||
|         self.ws_peers = {} | ||||
|         self.last_checked_ws_peers=0 | ||||
|         self.containers={} | ||||
|  | @ -99,9 +104,11 @@ class CloreClient: | |||
|             "container_log_streaming_service": utils.unix_timestamp(), | ||||
|             "specs_service": utils.unix_timestamp(), | ||||
|             "oc_service": utils.unix_timestamp(), | ||||
|             "background_pow_data_collection": utils.unix_timestamp() | ||||
|             "background_pow_data_collection": utils.unix_timestamp(), | ||||
|             "partner_service": utils.unix_timestamp() | ||||
|         } | ||||
|         self.max_service_inactivity = 600 # seconds | ||||
|         self.no_restart_services = ["partner_service"] # Services that are allowed to run indefinitely without triggering the app to restart | ||||
| 
 | ||||
|         if config.debug_ws_peer: | ||||
|             self.ws_peers[str(config.debug_ws_peer)]={ | ||||
|  | @ -137,6 +144,10 @@ class CloreClient: | |||
|         self.hive_miner_interface = hive_miner_interface.hive_interface() | ||||
|         self.next_pow_background_job_send_update = 0 | ||||
| 
 | ||||
|         self.clore_partner_initialized = False | ||||
|         self.partner_forwarding_ips = [] | ||||
|         self.start_time = utils.unix_timestamp() | ||||
| 
 | ||||
|     async def service(self): | ||||
|         global container_log_broken | ||||
| 
 | ||||
|  | @ -151,10 +162,11 @@ class CloreClient: | |||
|         task6 = asyncio.create_task(self.specs_service(monitoring)) | ||||
|         task7 = asyncio.create_task(self.oc_service(monitoring)) | ||||
|         task8 = asyncio.create_task(self.background_pow_data_collection(monitoring)) | ||||
|         task9 = asyncio.create_task(self.partner_service(monitoring)) | ||||
|         monitoring_task = asyncio.create_task(self.monitoring_service(monitoring)) | ||||
| 
 | ||||
|         # Wait for all tasks to complete (they won't in this case) | ||||
|         await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, monitoring_task) | ||||
|         await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, task9, monitoring_task) | ||||
| 
 | ||||
|     async def monitoring_service(self, monitoring): | ||||
|         while True: | ||||
|  | @ -169,13 +181,14 @@ class CloreClient: | |||
|                 if config.debug: | ||||
|                     log.success(self.last_service_heartbeat) | ||||
|                 for service_name in self.last_service_heartbeat.keys(): | ||||
|                     last_hearthbeat = self.last_service_heartbeat[service_name] | ||||
|                     if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache": | ||||
|                         log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") | ||||
|                         os._exit(1) | ||||
|                     elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache": | ||||
|                         log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") | ||||
|                         os._exit(1) | ||||
|                     if service_name not in self.no_restart_services: | ||||
|                         last_heartbeat = self.last_service_heartbeat[service_name] | ||||
|                         if last_heartbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache": | ||||
|                             log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_heartbeat} s, Restarting...") | ||||
|                             os._exit(1) | ||||
|                         elif last_heartbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache": | ||||
|                             log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_heartbeat} s, Restarting...") | ||||
|                             os._exit(1) | ||||
|             except Exception as e: | ||||
|                 log.debug(f"monitoring_service() | ERROR | {e}") | ||||
|             await asyncio.sleep(5) | ||||
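The monitoring loop acts as a process-level watchdog: every service periodically puts its name on a shared queue, the monitor stamps the arrival time, and a heartbeat older than the configured limit triggers os._exit(1) so the supervisor restarts the whole client. Names in no_restart_services are exempt because they may legitimately sit idle. A stripped-down sketch of the same mechanism (timeout value assumed, not the real config):

import asyncio, os, time

NO_RESTART = {"partner_service"}   # exempt from the stuck check
MAX_LOOP_TIME = 900                # assumed stand-in for config.maximum_service_loop_time

async def worker(name, monitoring):
    while True:
        await monitoring.put(name)          # heartbeat
        await asyncio.sleep(5)              # ... real work would happen here ...

async def monitor(monitoring):
    heartbeats = {}
    while True:
        while not monitoring.empty():
            heartbeats[await monitoring.get()] = time.time()
        for name, ts in heartbeats.items():
            if name not in NO_RESTART and ts < time.time() - MAX_LOOP_TIME:
                print(f'"{name}" stuck for {time.time() - ts:.0f} s, restarting')
                os._exit(1)
        await asyncio.sleep(5)

async def main():
    q = asyncio.Queue()
    await asyncio.gather(worker("partner_service", q), monitor(q))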
|  | @ -260,6 +273,7 @@ class CloreClient: | |||
| 
 | ||||
|             if len(self.p_needed_containers)>0: | ||||
|                 local_images = await get_local_images(no_latest_tag=True) | ||||
|                 partner_images = await clore_partner.get_partner_allowed_images() | ||||
|                 for local_image in local_images: | ||||
|                     self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()} | ||||
|                     image_needed = False | ||||
|  | @ -283,11 +297,32 @@ class CloreClient: | |||
|                                             if local_image_tag==allowed_tag or allowed_tag=='*': | ||||
|                                                 image_needed=True | ||||
|                                                 break | ||||
|                             if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers: | ||||
|                                 if not image_needed and type(partner_images) == list: | ||||
|                                     for partner_image in partner_images: | ||||
|                                         if local_image.replace(':latest', '') == partner_image.replace(':latest', ''): | ||||
|                                             image_needed = True | ||||
|                                             del self.last_pull_progress[local_image] | ||||
|                                             break | ||||
|                                         if len(local_image.split('/')) >= 3: | ||||
|                                             partner_image_spl = partner_image.split(':') | ||||
|                                             image, deployment_type = '/'.join(local_image.split('/', 2)[:2]), local_image.split('/', 2)[-1] | ||||
|                                             if len(partner_image_spl) == 1: | ||||
|                                                 if image == partner_image_spl[0] or image == f"{partner_image_spl[0]}_latest": | ||||
|                                                     image_needed = True | ||||
|                                                     del self.last_pull_progress[local_image] | ||||
|                                                     break | ||||
|                                             elif len(partner_image_spl) == 2: | ||||
|                                                 if image.replace('_latest', '') == f"{partner_image_spl[0]}_{partner_image_spl[1]}".replace('_latest', ''): | ||||
|                                                     image_needed = True | ||||
|                                                     del self.last_pull_progress[local_image] | ||||
|                                                     break | ||||
|                             if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers and partner_images is not None: | ||||
|                                 log.success(f"GOING TO REMOVE {local_image}") | ||||
|                                 with concurrent.futures.ThreadPoolExecutor() as pool: | ||||
|                                     r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image) | ||||
|                                     if r: | ||||
|                                         removed_cnt+=1 | ||||
|                                         del self.last_pull_progress[local_image] | ||||
|                             #if config.debug: | ||||
|                             #    log.success(f"{local_image} | {image_needed}") | ||||
| 
 | ||||
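The partner allow-list matching above is dense; restated as a standalone predicate (a hedged reading of the branch, not a function that exists in the codebase): a local image survives cleanup if it equals a partner entry modulo a ':latest' tag, or, for registry-style names with three or more path components, if its first two components equal the partner's "repo" or "repo_tag" form with any trailing '_latest' normalized away.

def partner_image_match(local_image: str, partner_image: str) -> bool:
    # Exact match, ignoring a ':latest' tag on either side.
    if local_image.replace(':latest', '') == partner_image.replace(':latest', ''):
        return True
    # Registry-style local names (>= 3 path components): compare the first two
    # components against "repo" / "repo_tag", normalizing a '_latest' suffix.
    if len(local_image.split('/')) >= 3:
        prefix = '/'.join(local_image.split('/', 2)[:2])
        repo_tag = partner_image.split(':')
        if len(repo_tag) == 1:
            return prefix in (repo_tag[0], f"{repo_tag[0]}_latest")
        if len(repo_tag) == 2:
            return (prefix.replace('_latest', '')
                    == f"{repo_tag[0]}_{repo_tag[1]}".replace('_latest', ''))
    return False

assert partner_image_match("vastai/test:latest", "vastai/test")
assert partner_image_match("reg.io/app_latest/layer", "reg.io/app")
assert not partner_image_match("other/image:1.0", "vastai/test")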
|  | @ -327,7 +362,7 @@ class CloreClient: | |||
|                         try: | ||||
|                             await pull_task | ||||
|                         except Exception as e: | ||||
|                             self.last_pull_progress[local_image]={f"log":"Can't pull image \"{local_image}\"", "last_update":time.time()} | ||||
|                             self.last_pull_progress[local_image]={"log":f"Can't pull image \"{local_image}\"", "last_update":time.time()} | ||||
|                         log_task.cancel() | ||||
|                         try: | ||||
|                             await log_task | ||||
|  | @ -358,11 +393,18 @@ class CloreClient: | |||
| 
 | ||||
|                 container_conf = WebSocketClient.get_containers() | ||||
| 
 | ||||
|                 can_run_partner_workloads = False | ||||
| 
 | ||||
|                 if container_conf[0]: | ||||
|                     self.containers_set=True | ||||
|                     self.containers=container_conf[1] | ||||
|                     tmp_images = [] | ||||
|                     for container in self.containers: | ||||
| 
 | ||||
|                     is_order_spot = False | ||||
| 
 | ||||
|                     for idx, container in enumerate(self.containers): | ||||
|                         if "spot" in container: | ||||
|                             is_order_spot = True | ||||
|                         if "image" in container and "image" in container and container["image"]!="cloreai/hive-use-flightsheet": | ||||
|                             log_pull = False | ||||
|                             if "name" in container: | ||||
|  | @ -386,6 +428,9 @@ class CloreClient: | |||
|                             if not image_config in tmp_images: | ||||
|                                 tmp_images.append(image_config) | ||||
| 
 | ||||
|                     can_run_partner_workloads = is_order_spot or not running_order | ||||
|                     clore_partner_socket.set_can_deploy(can_run_partner_workloads) | ||||
| 
 | ||||
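Stated plainly: partner workloads may run unless a non-spot (on-demand) order is active, and the result is pushed through clore_partner_socket so the partner side can poll it before deploying. As a predicate with its truth table (a sketch, not shipped code):

def can_run_partner(is_order_spot: bool, running_order: bool) -> bool:
    return is_order_spot or not running_order

assert can_run_partner(False, False)        # idle machine     -> allowed
assert can_run_partner(True, True)          # spot order       -> allowed
assert not can_run_partner(False, True)     # on-demand order  -> blocked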
|                     if self.restart_docker and not running_order and len(self.containers)>0: | ||||
|                         log.debug("Sending docker restart command") | ||||
|                         utils.run_command_v2("systemctl restart docker") | ||||
|  | @ -400,14 +445,14 @@ class CloreClient: | |||
|                     tasks.append(api_interface.get_server_config()) | ||||
| 
 | ||||
|                 if self.containers_set: | ||||
|                     tasks.append(configure_networks(self.containers)) | ||||
|                     tasks.append(configure_networks(self.containers, self.partner_forwarding_ips)) | ||||
|                     tasks.append(WebSocketClient.stream_pull_logs()) | ||||
| 
 | ||||
|                 if self.validated_containers_set: | ||||
|                     tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers)) | ||||
|                     tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers, can_run_partner_workloads)) | ||||
| 
 | ||||
|                 if step==1: | ||||
|                     WebSocketClient.set_auth(self.auth_key) | ||||
|                     WebSocketClient.set_auth(self.auth_key, self.xfs_state) | ||||
|                     asyncio.create_task(WebSocketClient.run()) | ||||
|                 elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval): | ||||
|                     log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds") | ||||
|  | @ -427,6 +472,11 @@ class CloreClient: | |||
|                             if result.success: | ||||
|                                 self.last_checked_ws_peers = utils.unix_timestamp() | ||||
|                                 self.allowed_images=result.allowed_images+self.extra_allowed_images | ||||
|                                 if self.xfs_state == "active": | ||||
|                                     self.allowed_images.append({ | ||||
|                                         "repository": "vastai/test", | ||||
|                                         "allowed_tags": ["bandwidth-test-nvidia"] | ||||
|                                     }) | ||||
|                                 if not config.debug_ws_peer: | ||||
|                                     for pure_ws_peer in result.ws_peers: | ||||
|                                         self.ws_peers[pure_ws_peer]={ | ||||
|  | @ -455,7 +505,7 @@ class CloreClient: | |||
|     async def submit_specs(self, current_specs): | ||||
|         try: | ||||
|             if type(current_specs) == dict: | ||||
|                 current_specs["backend_version"]=18 | ||||
|                 current_specs["backend_version"]=19 | ||||
|                 current_specs["update_hw"]=True | ||||
|                 smallest_pcie_width = 999 | ||||
|                 for gpu in current_specs["gpus"]["nvidia"]: | ||||
|  | @ -504,7 +554,7 @@ class CloreClient: | |||
|                 await monitoring.put("oc_service") | ||||
|                 oc_apply_allowed = True | ||||
|                 ### OC Service should also handle Hive stuff | ||||
|                 if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries: | ||||
|                 if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries and background_job.is_enabled(): | ||||
|                     await set_hive_miner_status(True) | ||||
|                     oc_apply_allowed = False # Don't apply any OC when running HiveOS miner | ||||
|                 elif self.is_hive and not self.dont_use_hive_binaries: | ||||
|  | @ -544,6 +594,30 @@ class CloreClient: | |||
|                 log.debug(f"FAIL | background_pow_data_collection() | {e}") | ||||
|             await asyncio.sleep(6) | ||||
| 
 | ||||
|     async def partner_service(self, monitoring): | ||||
|         while True: | ||||
|             try: | ||||
|                 await monitoring.put("partner_service") | ||||
|                 if self.start_time < utils.unix_timestamp() - 180: # wait ~3 minutes after startup before measuring | ||||
|                     forwarding_latency_measurment = await clore_partner.measure_forwarding_latency() | ||||
|                     if type(forwarding_latency_measurment) == list: | ||||
|                         await WebSocketClient.set_forwarding_latency_measurment(forwarding_latency_measurment) | ||||
|                 partner_config = WebSocketClient.get_clore_partner_config() | ||||
|                 if partner_config is not None: | ||||
|                     if not self.clore_partner_initialized: | ||||
|                         ir = await clore_partner.initialize() | ||||
|                         if ir: | ||||
|                             self.clore_partner_initialized = True | ||||
|                     if self.clore_partner_initialized: | ||||
|                         if 'provider' in partner_config and 'forwarding' in partner_config: | ||||
|                             self.partner_forwarding_ips = [partner_config['provider'], partner_config['forwarding']] | ||||
|                         else: | ||||
|                             self.partner_forwarding_ips = [] | ||||
|                         await clore_partner.configure(partner_config) | ||||
|             except Exception as e: | ||||
|                 log.debug(f"FAIL | partner_service() | {e}") | ||||
|             await asyncio.sleep(6) | ||||
| 
 | ||||
|     def expire_ws_peers(self): | ||||
|         for ws_peer_address in list(self.ws_peers.keys()): | ||||
|             ws_peer_info = self.ws_peers[ws_peer_address] | ||||
|  |  | |||
|  | @ -1,4 +1,5 @@ | |||
| from concurrent.futures import ThreadPoolExecutor | ||||
| from lib import clore_partner | ||||
| import asyncio | ||||
| import random | ||||
| import websockets | ||||
|  | @ -31,6 +32,7 @@ class WebSocketClient: | |||
|         self.connected = False | ||||
|         self.authorized = False | ||||
|         self.auth = auth | ||||
|         self.xfs_state = None | ||||
|         self.log_auth_fail = True | ||||
|         self.last_heartbeat = clore_utils.unix_timestamp() | ||||
|         self.containers={} | ||||
|  | @ -51,15 +53,30 @@ class WebSocketClient: | |||
|         self.last_gpu_oc_specs = [] | ||||
|         self.last_set_oc = {} | ||||
| 
 | ||||
|         self.clore_partner_config = None | ||||
|         self.forwarding_latency_measurment = None | ||||
|      | ||||
|     def get_last_heartbeat(self): | ||||
|         return self.last_heartbeat | ||||
| 
 | ||||
|     def get_containers(self): | ||||
|         return self.containers_set, self.containers | ||||
|         partner_container_config = clore_partner.get_partner_container_config() | ||||
|         return self.containers_set, ((self.containers + [partner_container_config]) if partner_container_config else self.containers) | ||||
|      | ||||
|     def get_oc(self): | ||||
|         return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc | ||||
| 
 | ||||
|     def get_clore_partner_config(self): | ||||
|         return self.clore_partner_config | ||||
|      | ||||
|     async def set_forwarding_latency_measurment(self, forwarding_latency_measurment): | ||||
|         await self.send(json.dumps( | ||||
|             { | ||||
|                 "forwarding_latency_measurment": forwarding_latency_measurment | ||||
|             } | ||||
|         )) | ||||
|         self.forwarding_latency_measurment = forwarding_latency_measurment | ||||
| 
 | ||||
|     def set_ws_peers(self, ws_peers): | ||||
|         tmp_ws_peers=[] | ||||
|         for ws_peer in list(ws_peers.keys()): | ||||
|  | @ -68,8 +85,9 @@ class WebSocketClient: | |||
| 
 | ||||
|         self.ws_peers = tmp_ws_peers | ||||
|      | ||||
|     def set_auth(self, auth): | ||||
|     def set_auth(self, auth, xfs_state): | ||||
|         self.auth=auth | ||||
|         self.xfs_state=xfs_state | ||||
| 
 | ||||
|     def set_pull_logs(self, pull_logs): | ||||
|         self.pull_logs=pull_logs | ||||
|  | @ -93,7 +111,9 @@ class WebSocketClient: | |||
|                 log.debug(f"CLOREWS | Connected to {random_ws_peer} ✅") | ||||
|                 await self.send(json.dumps({ | ||||
|                     "login":str(self.auth), | ||||
|                     "type":"python" | ||||
|                     "xfs_state": self.xfs_state, | ||||
|                     "type":"python", | ||||
|                     "clore_partner_support": True | ||||
|                 })) | ||||
|             except Exception as e: | ||||
|                 log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e} ❌") | ||||
|  | @ -136,6 +156,16 @@ class WebSocketClient: | |||
|                         pass | ||||
|                 elif message=="KEEPALIVE": | ||||
|                     self.last_heartbeat = clore_utils.unix_timestamp() | ||||
|                     try: | ||||
|                         if self.forwarding_latency_measurment: | ||||
|                             await self.send(json.dumps( | ||||
|                                 { | ||||
|                                     "forwarding_latency_measurment": self.forwarding_latency_measurment | ||||
|                                 } | ||||
|                             )) | ||||
|                             self.forwarding_latency_measurment = None | ||||
|                     except Exception as e: | ||||
|                         pass | ||||
|                 elif message=="NEWER_LOGIN" or message=="WAIT": | ||||
|                     await self.close_websocket() | ||||
|                 elif message[:10]=="PROVEPULL;": | ||||
|  | @ -148,13 +178,16 @@ class WebSocketClient: | |||
|                 else: | ||||
|                     try: | ||||
|                         parsed_json = json.loads(message) | ||||
|                         if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list: | ||||
|                         if "type" in parsed_json and parsed_json["type"]=="partner_config" and "partner_config" in parsed_json and type(parsed_json["partner_config"])==dict: | ||||
|                             self.clore_partner_config = parsed_json["partner_config"] | ||||
|                             await self.send(json.dumps({"partner_config":parsed_json["partner_config"]})) | ||||
|                         elif "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list: | ||||
|                             self.last_heartbeat = clore_utils.unix_timestamp() | ||||
|                             container_str = json.dumps({"containers":parsed_json["new_containers"]}) | ||||
|                             await self.send(container_str) | ||||
|                             if len(parsed_json["new_containers"]) > 0: # There should be at least one container | ||||
|                                 self.containers_set = True | ||||
|                                 self.containers=parsed_json["new_containers"] | ||||
|                                 self.containers=clore_partner.filter_partner_dummy_workload_container(parsed_json["new_containers"]) | ||||
|                             #log.success(container_str) | ||||
|                         elif "allow_oc" in parsed_json: # Enable OC | ||||
|                             self.oc_enabled=True | ||||
|  |  | |||
hosting.py (11 changes)
							|  | @ -1,5 +1,6 @@ | |||
| from lib import config as config_module | ||||
| from lib import init_server | ||||
| from lib import xfs | ||||
| from lib import utils | ||||
| from clore_hosting import main as clore_hosting | ||||
| import asyncio, os | ||||
|  | @ -29,7 +30,15 @@ elif config.reset: | |||
|             log.success("Client login reseted") | ||||
| elif config.service: | ||||
|     if len(auth)==32+48+1: | ||||
|         clore_client = clore_hosting.CloreClient(auth_key=auth) | ||||
|         utils.run_command("sysctl -w net.ipv4.ip_forward=1") | ||||
| 
 | ||||
|         xfs_state = xfs.init() | ||||
| 
 | ||||
|         if os.path.isfile(config.restart_docker_flag_file): | ||||
|             utils.run_command("systemctl restart docker") | ||||
|             os.remove(config.restart_docker_flag_file) | ||||
| 
 | ||||
|         clore_client = clore_hosting.CloreClient(auth_key=auth, xfs_state=xfs_state) | ||||
|         asyncio.run(clore_client.service()) | ||||
|     else: | ||||
|         print("TODO: Firstly config auth") | ||||
|  |  | |||
|  | @ -0,0 +1,22 @@ | |||
| import time | ||||
| import re | ||||
| 
 | ||||
| disabled_till = 0 | ||||
| 
 | ||||
| def is_background_job_container_name(string): | ||||
|     if type(string) != str: | ||||
|         return False | ||||
|     pattern = r"^clore-default-\d+$" | ||||
|     return bool(re.match(pattern, string)) | ||||
| 
 | ||||
| def temporarly_disable(seconds): | ||||
|     global disabled_till | ||||
|     disabled_till = time.time() + seconds | ||||
| 
 | ||||
| def enable(): | ||||
|     global disabled_till | ||||
|     disabled_till=0 | ||||
| 
 | ||||
| def is_enabled(): | ||||
|     return disabled_till < time.time() | ||||
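In short, background_job is a module-level kill switch keyed on wall-clock time; docker_deploy consults it to skip creating clore-default-N containers and to stop running ones, and the partner socket can pause it on request. Usage sketch:

from lib import background_job   # module added in this PR

background_job.temporarly_disable(3600)       # e.g. partner requested a quiet hour
assert not background_job.is_enabled()
assert background_job.is_background_job_container_name("clore-default-0")
assert not background_job.is_background_job_container_name("clore-order-1")
background_job.enable()                       # lift the pause immediately
assert background_job.is_enabled()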
|  | @ -0,0 +1,238 @@ | |||
| from lib import ensure_packages_installed | ||||
| from lib import config as config_module | ||||
| from lib import logging as logging_lib | ||||
| from lib import clore_partner_socket | ||||
| from lib import latency_test | ||||
| from lib import openvpn | ||||
| from lib import utils | ||||
| import asyncio | ||||
| import random | ||||
| import json | ||||
| import time | ||||
| import re | ||||
| 
 | ||||
| import os | ||||
| import aiofiles.os | ||||
| from aiohttp import ClientSession, ClientTimeout | ||||
| 
 | ||||
| config = config_module.config | ||||
| log = logging_lib.log | ||||
| 
 | ||||
| MANDATORY_PACKAGES = ['dmidecode', 'openvpn', 'iproute2'] | ||||
| 
 | ||||
| DUMMY_WORKLOAD_CONTAINER = "cloreai/partner-dummy-workload" | ||||
| 
 | ||||
| host_facts_location = os.path.join(config.clore_partner_base_dir, "host_facts") | ||||
| partner_cache_location = os.path.join(config.clore_partner_base_dir, "partner_cache") | ||||
| 
 | ||||
| next_ensure_packages_check = 0 | ||||
| is_socket_running = False | ||||
| 
 | ||||
| partner_container_config = None | ||||
| 
 | ||||
| async def initialize(): | ||||
|     global next_ensure_packages_check | ||||
|     global is_socket_running | ||||
|     try: | ||||
|         await aiofiles.os.makedirs(host_facts_location, exist_ok=True) | ||||
|         await aiofiles.os.makedirs(partner_cache_location, exist_ok=True) | ||||
|         await aiofiles.os.makedirs("/etc/openvpn/client", exist_ok=True) | ||||
|         if not is_socket_running: | ||||
|             is_socket_running=True | ||||
|             asyncio.create_task(clore_partner_socket.socket_service( | ||||
|                 location=os.path.join(host_facts_location, "partner_interface.socket") | ||||
|             )) | ||||
|         if next_ensure_packages_check < time.time(): | ||||
|             success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKAGES, None) | ||||
|             next_ensure_packages_check = float('inf') if success else time.time() + 60*60 # if it did not succeed -> retry in 1 hr | ||||
|             if not success: | ||||
|                 return False | ||||
|         elif next_ensure_packages_check != float('inf'): | ||||
|             return False | ||||
| 
 | ||||
|         code, stdout, stderr = await utils.async_run_command( | ||||
|             "dmidecode -t 2 2>&1", | ||||
|             20 | ||||
|         ) | ||||
|         if code == 0 and not stderr: | ||||
|             async with aiofiles.open(os.path.join(host_facts_location, "dmidecode_t2.txt"), mode='w') as file: | ||||
|                 await file.write(stdout) | ||||
|         else: | ||||
|             return False | ||||
|         code, stdout, stderr = await utils.async_run_command( | ||||
|             "dmidecode 2>&1", | ||||
|             20 | ||||
|         ) | ||||
|         if code == 0 and not stderr: | ||||
|             async with aiofiles.open(os.path.join(host_facts_location, "dmidecode.txt"), mode='w') as file: | ||||
|                 await file.write(stdout) | ||||
|         else: | ||||
|             return False | ||||
|         return True | ||||
|     except Exception as e: | ||||
|         log.error(f"FAIL | clore_partner.initialize | {e}") | ||||
|         return False | ||||
| 
 | ||||
| async def get_partner_allowed_images(): | ||||
|     try: | ||||
|         file_exists = await aiofiles.os.path.exists(os.path.join(partner_cache_location, "container_list.json")) | ||||
|         if not file_exists: | ||||
|             return [] | ||||
|         images = [] | ||||
|         async with aiofiles.open(os.path.join(partner_cache_location, "container_list.json"), mode='r') as file: | ||||
|             content = await file.read() | ||||
|             containers = json.loads(content) | ||||
|             for container in containers: | ||||
|                 image = container.get("Config", {}).get("Image", None) | ||||
|                 if image and image not in images: | ||||
|                     images.append(image) | ||||
|         return images | ||||
|     except Exception as e: | ||||
|         return None | ||||
| 
 | ||||
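Note the three return shapes: [] when the partner cache file does not exist yet, a list of image names on success, and None on any read/parse error. The image-cleanup loop in clore_hosting relies on that distinction — deletion is skipped whenever partner_images is None, so a transiently unreadable cache cannot get partner images garbage-collected. A sketch of a caller honoring the contract (illustrative helper, not in the codebase):

async def images_safe_to_delete(local_images):
    partner_images = await get_partner_allowed_images()
    if partner_images is None:
        return []          # cache unreadable: delete nothing this round
    keep = {p.replace(':latest', '') for p in partner_images}
    return [img for img in local_images
            if img.replace(':latest', '') not in keep]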
| def validate_partner_container_name(name): | ||||
|     if type(name) != str: | ||||
|         return False | ||||
|     elif name==config.clore_partner_container_name: | ||||
|         return True | ||||
|     pattern = r"^C\.\d+$" | ||||
|     return bool(re.match(pattern, name)) | ||||
| 
 | ||||
| def validate_partner_workload_container_name(name): | ||||
|     if type(name) != str: | ||||
|         return False  | ||||
|     pattern = r"^C\.\d+$" | ||||
|     return bool(re.match(pattern, name)) | ||||
| 
 | ||||
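Both validators accept kaalia-style names of the form C.<digits>; the container-name variant additionally whitelists the fixed partner service container. Examples (assuming config.clore_partner_container_name stays "clore-partner-service"):

assert validate_partner_container_name("clore-partner-service")
assert validate_partner_container_name("C.12345")
assert not validate_partner_container_name("C.12x")          # digits only after "C."
assert validate_partner_workload_container_name("C.7")
assert not validate_partner_workload_container_name("clore-partner-service")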
| last_openvpn_config = None | ||||
| 
 | ||||
| def get_partner_container_config(): | ||||
|     global partner_container_config | ||||
|     return partner_container_config | ||||
| 
 | ||||
| async def configure(partner_config): | ||||
|     global last_openvpn_config | ||||
|     global partner_container_config | ||||
|     if last_openvpn_config != partner_config: | ||||
|         partner_container_config = { | ||||
|             "image": partner_config["partner_image"], | ||||
|             "name": config.clore_partner_container_name, | ||||
|             "hostname": f"{partner_config['partner_id'][:16]}-m{partner_config['machine_id']}", | ||||
|             "env": { | ||||
|                 "AUTH": partner_config['partner_id'], | ||||
|                 "ip_addr": partner_config['openvpn_host'], | ||||
|                 "port_range": f'{partner_config['ports'][0]}-{partner_config['ports'][1]}' | ||||
|             }, | ||||
|             "volumes": { | ||||
|                 f"{host_facts_location}": {"bind": "/var/lib/vastai_kaalia/specs_source"}, | ||||
|                 f"{partner_cache_location}": {"bind": "/var/lib/vastai_kaalia/data"}, | ||||
|                 f"/var/lib/docker": {"bind": "/var/lib/docker"}, | ||||
|                 f"/var/run/docker.sock": {"bind": "/var/run/docker.sock"} | ||||
|             }, | ||||
|             "gpus": True, | ||||
|             "command": '', | ||||
|             "network": "clore-partner-br0", | ||||
|             "ip": "172.19.0.254", | ||||
|             "cap_add": ["SYS_ADMIN"], | ||||
|             "devices": ["/dev/fuse"], | ||||
|             #"security_opt": ["apparmor:unconfined"], | ||||
|             "ports": [f"{partner_config['ports'][1]}:{partner_config['ports'][1]}"], | ||||
|         } | ||||
|         r = await openvpn.clore_partner_configure(partner_config) | ||||
|         if r: | ||||
|             last_openvpn_config = partner_config | ||||
| 
 | ||||
| # ----------------------------------------- | ||||
| 
 | ||||
| next_latency_measurment = 0 | ||||
| 
 | ||||
| async def fetch_forwarding_nodes(): | ||||
|     url = "https://api.clore.ai/v1/get_relay" | ||||
|     timeout = ClientTimeout(total=30) | ||||
| 
 | ||||
|     async with ClientSession(timeout=timeout) as session: | ||||
|         try: | ||||
|             async with session.get(url) as response: | ||||
|                 response.raise_for_status() | ||||
|                 data = await response.json() | ||||
|                 return data | ||||
|         except Exception as e: | ||||
|             print(f"An error occurred: {e}") | ||||
|             return None | ||||
| 
 | ||||
| async def set_next_latency_measurment(ts): | ||||
|     global next_latency_measurment | ||||
|     try: | ||||
|         next_latency_measurment=ts | ||||
|         async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='w') as file: | ||||
|             await file.write(str(ts)) | ||||
|     except Exception as e: | ||||
|         pass | ||||
| 
 | ||||
| async def measure_forwarding_latency(): | ||||
|     global next_latency_measurment | ||||
| 
 | ||||
|     if next_latency_measurment > time.time(): | ||||
|         return False | ||||
|     try: | ||||
|         await aiofiles.os.makedirs(config.clore_partner_base_dir, exist_ok=True) | ||||
|         file_exists = await aiofiles.os.path.exists(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment")) | ||||
|         if file_exists: | ||||
|             async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='r') as file: | ||||
|                 content = await file.read() | ||||
|                 if content.isdigit(): | ||||
|                     next_latency_measurment = int(content) | ||||
|         if next_latency_measurment < time.time(): | ||||
|             node_info = await fetch_forwarding_nodes() | ||||
|             if type(node_info) == dict and node_info.get("country") and type(node_info.get("nodes")) == dict and node_info.get("code") == 0: | ||||
|                 to_test_nodes = [] | ||||
|                 ip_to_region = {} | ||||
| 
 | ||||
|                 valid_regions = [] | ||||
| 
 | ||||
|                 for node_region in node_info.get("nodes").keys(): | ||||
|                     nodes_ip_list = node_info.get("nodes")[node_region] | ||||
|                     if type(nodes_ip_list) == list and len(nodes_ip_list) > 0: | ||||
|                         to_test_nodes = to_test_nodes + nodes_ip_list | ||||
|                         for node_ip in nodes_ip_list: | ||||
|                             ip_to_region[node_ip]=node_region | ||||
| 
 | ||||
|                 if len(to_test_nodes) > 0: | ||||
|                     measurment_result = await latency_test.measure_latency_icmp(to_test_nodes) | ||||
|                     if measurment_result: | ||||
|                         for idx, res in enumerate(measurment_result): | ||||
|                             if res["received"] > 2 and not ip_to_region.get(res["host"]) in valid_regions: | ||||
|                                 valid_regions.append(ip_to_region.get(res["host"])) | ||||
|                             measurment_result[idx]["region"] = ip_to_region.get(res["host"]) | ||||
|                     if len(valid_regions) == len(ip_to_region.keys()): | ||||
|                         await set_next_latency_measurment(int( | ||||
|                             time.time() + 60*60*24*30 # Re-run in 30 days, because the measurement succeeded | ||||
|                         )) | ||||
|                         return measurment_result | ||||
|                     else: | ||||
|                         await set_next_latency_measurment(int( | ||||
|                             time.time() + 60*60*24 # Retry in 24 hr, all regions in the country should be reachable | ||||
|                         )) | ||||
|                 else: | ||||
|                     await set_next_latency_measurment(int( | ||||
|                         time.time() + 60*60*72 # Retry in 72 hr (clore partner service is not available in the host country yet) | ||||
|                     )) | ||||
|                 return False | ||||
|             else: | ||||
|                 await set_next_latency_measurment(int( | ||||
|                     time.time() + 60*60*12 # Retry in 12 hr, the response did not match the required format | ||||
|                 )) | ||||
|                 return False | ||||
|         return False | ||||
|     except Exception as e: | ||||
|         return False | ||||
| 
 | ||||
| def filter_partner_dummy_workload_container(containers): | ||||
|     try: | ||||
|         remaining_containers = [] | ||||
|         for container in containers: | ||||
|             if container["image"] != DUMMY_WORKLOAD_CONTAINER: | ||||
|                 remaining_containers.append(container) | ||||
|         return remaining_containers | ||||
|     except Exception as e: | ||||
|         return containers | ||||
|  | @ -0,0 +1,94 @@ | |||
| from lib import config as config_module | ||||
| from lib import logging as logging_lib | ||||
| from lib import background_job | ||||
| from lib import utils | ||||
| import asyncio | ||||
| import json | ||||
| 
 | ||||
| import os | ||||
| import aiofiles.os | ||||
| 
 | ||||
| config = config_module.config | ||||
| log = logging_lib.log | ||||
| 
 | ||||
| dangerous_chars = [';', '|', '&', '`', '$', '>', '<', '(', ')', '\\', '!', '"', '\'', '[', ']', '{', '}'] | ||||
| allowed_commands = ["df"] | ||||
| 
 | ||||
| can_deploy = True | ||||
| 
 | ||||
| async def remove_socket_file(path): | ||||
|     try: | ||||
|         file_exists = await aiofiles.os.path.exists(path) | ||||
|         if file_exists: | ||||
|             await aiofiles.os.remove(path) | ||||
|     except Exception: | ||||
|         pass | ||||
| 
 | ||||
| async def handle_client(reader, writer): | ||||
|     global can_deploy | ||||
|     try: | ||||
|         while True: | ||||
|             data = await reader.read(1024*64) | ||||
|             try: | ||||
|                 if not data: | ||||
|                     break | ||||
|                 #print("DATA", data, data.decode()) | ||||
|                 parsed_data = json.loads(data.decode()) | ||||
|                 if "run_command" in parsed_data and type(parsed_data["run_command"])==str: | ||||
|                     allowed = False | ||||
|                     for allowed_command in allowed_commands: | ||||
|                         if f"{allowed_command} " == parsed_data["run_command"][:len(allowed_command)+1] or allowed_command==parsed_data["run_command"]: | ||||
|                             allowed = True | ||||
|                             break | ||||
|                     if allowed and any(char in parsed_data["run_command"] for char in dangerous_chars): | ||||
|                         allowed = False | ||||
|                     log.debug(f"clore_partner_socket | Received \"{parsed_data["run_command"]}\" | {'allowed' if allowed else 'denied'}") | ||||
|                     if allowed: | ||||
|                         code, stdout, stderr = await utils.async_run_command( | ||||
|                             parsed_data["run_command"] | ||||
|                         ) | ||||
|                         writer.write(json.dumps({ | ||||
|                             "code": code, | ||||
|                             "stderr": stderr, | ||||
|                             "stdout": stdout | ||||
|                         }).encode()) | ||||
|                     else: | ||||
|                         writer.write(json.dumps({ | ||||
|                             "code": -1, | ||||
|                             "stderr": 'Command not allowed', | ||||
|                             "stdout": 'Command not allowed' | ||||
|                         }).encode()) | ||||
|                 elif "can_deploy" in parsed_data: | ||||
|                     writer.write(json.dumps({ | ||||
|                         "can_deploy": can_deploy | ||||
|                     }).encode()) | ||||
|                 elif "stop_background_job" in parsed_data and "time" in parsed_data: | ||||
|                     try: | ||||
|                         if isinstance(parsed_data["time"], int): | ||||
|                             background_job.temporarly_disable(parsed_data["time"]) | ||||
|                     except Exception as e: | ||||
|                         pass | ||||
|                 else: | ||||
|                     writer.write('?'.encode()) | ||||
|                 await writer.drain() | ||||
|             except Exception: | ||||
|                 break | ||||
|     except asyncio.CancelledError: | ||||
|         log.debug(f"clore partner socket | Client handler canceled.") | ||||
|     finally: | ||||
|         log.debug(f"clore partner socket | Closing client connection.") | ||||
|         writer.close() | ||||
|         await writer.wait_closed() | ||||
| 
 | ||||
| def set_can_deploy(state): | ||||
|     global can_deploy | ||||
|     can_deploy = state | ||||
| 
 | ||||
| async def socket_service(location): | ||||
|     await remove_socket_file(location) | ||||
|     server = await asyncio.start_unix_server(handle_client, path=location) | ||||
| 
 | ||||
|     log.debug(f"clore partner socket | running at {location}") | ||||
|     async with server: | ||||
|         await server.serve_forever() | ||||
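The socket speaks single-shot JSON both ways. A hedged client sketch a partner-side process could use (the socket path assumes the default --clore-partner-base-dir; this is not shipped code):

import asyncio, json

SOCKET_PATH = "/opt/clore-hosting/.clore-partner/host_facts/partner_interface.socket"

async def ask(payload: dict):
    reader, writer = await asyncio.open_unix_connection(SOCKET_PATH)
    writer.write(json.dumps(payload).encode())
    await writer.drain()
    raw = await reader.read(1024 * 64)
    writer.close()
    await writer.wait_closed()
    try:
        return json.loads(raw.decode())
    except json.JSONDecodeError:
        return raw.decode()                      # the server answers '?' to unknown requests

# asyncio.run(ask({"can_deploy": True}))         # -> {"can_deploy": true|false}
# asyncio.run(ask({"run_command": "df -h"}))     # -> {"code": 0, "stdout": ..., "stderr": ...}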
|  | @ -9,6 +9,11 @@ hard_config = { | |||
|             "name": "clore-br0", | ||||
|             "subnet": "172.18.0.0/16", | ||||
|             "gateway": "172.18.0.1" | ||||
|         }, | ||||
|         { | ||||
|             "name": "clore-partner-br0", | ||||
|             "subnet": "172.19.0.0/20", | ||||
|             "gateway": "172.19.0.1" | ||||
|         } | ||||
|     ], | ||||
|     "run_iptables_with_sudo":True, | ||||
|  | @ -33,7 +38,11 @@ hard_config = { | |||
|     "maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app | ||||
|     "maximum_pull_service_loop_time": 14400, # Exception for image pulling | ||||
|     "creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk | ||||
|     "allow_mixed_gpus": True | ||||
|     "allow_mixed_gpus": True, | ||||
|     "openvpn_forwarding_tun_device": "tun1313", | ||||
|     "forwarding_ip_route_table_id": 100, | ||||
|     "clore_partner_container_name": "clore-partner-service", | ||||
|     "restart_docker_flag_file": "/opt/clore-hosting/.restart_docker" | ||||
| } | ||||
| 
 | ||||
| parser = argparse.ArgumentParser(description='Example argparse usage') | ||||
|  | @ -50,6 +59,7 @@ parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hostin | |||
| parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") | ||||
| parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes") | ||||
| parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software") | ||||
| parser.add_argument('--clore-partner-base-dir', type=str, default="/opt/clore-hosting/.clore-partner") | ||||
| 
 | ||||
| # Parse arguments, ignoring any non-defined arguments | ||||
| args, _ = parser.parse_known_args() | ||||
|  |  | |||
|  | @ -28,6 +28,14 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, | |||
|         for cap in container_options["cap_add"]: | ||||
|             command.extend(["--cap-add", cap]) | ||||
|      | ||||
|     if "devices" in container_options: | ||||
|         for device in container_options["devices"]: | ||||
|             command.extend(["--device", device]) | ||||
| 
 | ||||
|     if "security_opt" in container_options: | ||||
|         for security_opt in container_options["security_opt"]: | ||||
|             command.extend(["--security-opt", security_opt]) | ||||
| 
 | ||||
|     if "volumes" in container_options: | ||||
|         for volume_host, volume_container in container_options["volumes"].items(): | ||||
|             bind = f"{volume_host}:{volume_container['bind']}" | ||||
|  | @ -75,6 +83,8 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, | |||
|     if ip: | ||||
|         command.extend(["--ip", ip]) | ||||
| 
 | ||||
|     command.append('--stop-timeout') | ||||
|     command.append('0') | ||||
|     command.append(container_options["image"]) | ||||
| 
 | ||||
|     try: | ||||
|  |  | |||
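For orientation, with the new flags the wrapper assembles an argv along these lines for the partner service container (illustrative and abridged; the real command also carries env vars, volumes, ports, shm-size, and GPU flags):

command = [
    "docker", "run",
    "--name", "clore-partner-service",
    "--cap-add", "SYS_ADMIN",
    "--device", "/dev/fuse",
    "--network", "clore-partner-br0",
    "--ip", "172.19.0.254",
    "--stop-timeout", "0",
    "<partner_image>",             # from partner_config["partner_image"]
]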
|  | @ -1,7 +1,9 @@ | |||
| from lib import config as config_module | ||||
| from lib import logging as logging_lib | ||||
| from lib import docker_cli_wrapper | ||||
| from lib import background_job | ||||
| from lib import docker_interface | ||||
| from lib import clore_partner | ||||
| from lib import get_specs | ||||
| from lib import utils | ||||
| import docker | ||||
|  | @ -14,7 +16,7 @@ client = docker_interface.client | |||
| config = config_module.config | ||||
| log = logging_lib.log | ||||
| 
 | ||||
| def deploy(validated_containers, allowed_running_containers=[]): | ||||
| def deploy(validated_containers, allowed_running_containers=[], can_run_partner_workloads=False): | ||||
|     local_images = docker_interface.get_local_images() | ||||
|     all_containers = docker_interface.get_containers(all=True) | ||||
| 
 | ||||
|  | @ -67,7 +69,9 @@ def deploy(validated_containers, allowed_running_containers=[]): | |||
|                 'tty': True, | ||||
|                 'network_mode': 'clore-br0', | ||||
|                 'cap_add': [], | ||||
|                 'volumes': {}, | ||||
|                 'devices': [], | ||||
|                 'security_opt': [], | ||||
|                 'volumes': validated_container["volumes"] if "volumes" in validated_container else {}, | ||||
|                 'ports': {}, | ||||
|                 'device_requests': [], | ||||
|                 'environment': validated_container["env"] if "env" in validated_container else {}, | ||||
|  | @ -80,6 +84,15 @@ def deploy(validated_containers, allowed_running_containers=[]): | |||
|                 ) | ||||
|             } | ||||
| 
 | ||||
|             if "security_opt" in validated_container: | ||||
|                 container_options["security_opt"] = validated_container["security_opt"] | ||||
|              | ||||
|             if "devices" in validated_container: | ||||
|                 container_options["devices"] = validated_container["devices"] | ||||
| 
 | ||||
|             if "cap_add" in validated_container: | ||||
|                 container_options["cap_add"] = validated_container["cap_add"] | ||||
| 
 | ||||
|             if "hostname" in validated_container: | ||||
|                 container_options["hostname"]=validated_container["hostname"] | ||||
|             elif "clore-order-" in validated_container["name"]: | ||||
|  | @ -136,7 +149,7 @@ def deploy(validated_containers, allowed_running_containers=[]): | |||
| 
 | ||||
|             container_options["shm_size"] = f"{SHM_SIZE}m" | ||||
| 
 | ||||
|             if not validated_container["name"] in created_container_names and image_ready: | ||||
|             if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])): | ||||
|                 if config.creation_engine == "wrapper": | ||||
|                     docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus) | ||||
|                 else: | ||||
|  | @ -159,7 +172,10 @@ def deploy(validated_containers, allowed_running_containers=[]): | |||
|                 all_running_container_names.append(container.name) | ||||
|             else: | ||||
|                 all_stopped_container_names.append(container.name) | ||||
|             if container.name in needed_running_names and container.status != 'running': | ||||
|             if background_job.is_background_job_container_name(container.name) and not background_job.is_enabled(): | ||||
|                 if container.status == "running": | ||||
|                     container.stop() | ||||
|             elif container.name in needed_running_names and container.status != 'running': | ||||
|                 try: | ||||
|                     attached_networks = container.attrs['NetworkSettings']['Networks'] | ||||
|                     if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container | ||||
|  | @ -174,17 +190,22 @@ def deploy(validated_containers, allowed_running_containers=[]): | |||
|                     container.stop() | ||||
|                 except Exception as e: | ||||
|                     pass | ||||
|             elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running': | ||||
|             elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running' and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name): | ||||
|                 try: | ||||
|                     container.stop() | ||||
|                     container.remove() | ||||
|                 except Exception as e: | ||||
|                     pass | ||||
|             elif container.name not in paused_names+needed_running_names+allowed_running_containers: | ||||
|             elif container.name not in paused_names+needed_running_names+allowed_running_containers and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name): | ||||
|                 try: | ||||
|                     container.remove() | ||||
|                 except Exception as e: | ||||
|                     pass | ||||
|             elif not can_run_partner_workloads and container.status == "running" and clore_partner.validate_partner_workload_container_name(container.name): | ||||
|                 try: | ||||
|                     container.stop() | ||||
|                 except Exception as e: | ||||
|                     pass | ||||
| 
 | ||||
|     return all_running_container_names, all_stopped_container_names | ||||
|     #print(validated_containers) | ||||
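The reconciliation ladder above now encodes a priority order worth stating plainly: background-job containers are stopped whenever background jobs are disabled; wanted containers get (re)started; unknown running containers are stopped and removed unless they look like partner containers or docker-default names; and partner workload containers are stopped while an on-demand order is active. A condensed sketch of that priority order (a reading of the ladder, eliding the paused-container and failed-network-attach branches):

def disposition(name, status, wanted, allowed, bg_enabled, partner_ok,
                is_bg_name, is_partner_name, is_workload_name, is_default_name):
    # Returns the action the loop would take for one container.
    if is_bg_name and not bg_enabled:
        return "stop" if status == "running" else "keep"
    if name in wanted and status != "running":
        return "start"
    protected = is_partner_name or is_default_name
    if name not in wanted + allowed and status == "running" and not protected:
        return "stop+remove"
    if name not in wanted + allowed and not protected:
        return "remove"
    if not partner_ok and status == "running" and is_workload_name:
        return "stop"
    return "keep"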
|  |  | |||
|  | @ -11,6 +11,13 @@ from typing import List, Optional | |||
| import docker | ||||
| import json | ||||
| import os | ||||
| import re | ||||
| 
 | ||||
| partner_bridge_subnet = '' | ||||
| 
 | ||||
| for clore_network in config.clore_default_networks: | ||||
|     if clore_network["name"] == "clore-partner-br0": | ||||
|         partner_bridge_subnet = clore_network["subnet"] | ||||
| 
 | ||||
| try: | ||||
|     os.makedirs(config.startup_scripts_folder, exist_ok=True) | ||||
|  | @ -59,6 +66,18 @@ def get_info(): | |||
|     except Exception as e: | ||||
|         return {} | ||||
| 
 | ||||
| def stop_all_containers(): | ||||
|     try: | ||||
|         # List all containers | ||||
|         containers = client.containers.list(all=True)  # Use all=True to include stopped containers | ||||
|         for container in containers: | ||||
|             log.info(f"stop_all_containers() | Stopping container: {container.name} (ID: {container.id})") | ||||
|             container.stop()  # Stop the container | ||||
|         log.success("stop_all_containers() | All containers have been stopped.") | ||||
|     except Exception as e: | ||||
|         log.error(f"stop_all_containers() |An error occurred: {e}") | ||||
|     return True | ||||
| 
 | ||||
| def check_docker_connection(): | ||||
|     try: | ||||
|         client.ping() | ||||
|  | @ -95,7 +114,7 @@ def get_local_images(no_latest_tag=False): | |||
| 
 | ||||
|         return image_list | ||||
|     except Exception as e: | ||||
|         log.error(f"DOCKER | Can't get local images | {e}") | ||||
|         log.error(f"DOCKER | Can't get local images | {e} | {'y' if no_latest_tag else 'n'}") | ||||
|         os._exit(1) | ||||
|      | ||||
| def get_containers(all=False): | ||||
|  | @ -184,7 +203,8 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"): | |||
|                     gateway=gateway | ||||
|                 )] | ||||
|             ), | ||||
|             check_duplicate=True | ||||
|             check_duplicate=True, | ||||
|             #options={'com.docker.network.bridge.enable_ip_masq': 'false'} if 'clore-partner-' in network_name else {} | ||||
|         ) | ||||
|         log.debug(f"Network {network_name} created successfully.") | ||||
|         return True | ||||
|  | @ -192,7 +212,7 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"): | |||
|         log.error(f"DOCKER | Failed to create network {network_name}: {e}") | ||||
|         return False | ||||
| 
 | ||||
| def validate_and_secure_networks(): | ||||
| def validate_and_secure_networks(partner_forwarding_ips): | ||||
|     try: | ||||
|         failed_appending_iptables_rule = False | ||||
| 
 | ||||
|  | @ -238,6 +258,13 @@ def validate_and_secure_networks(): | |||
|                                 #print(this_ipv4_range) | ||||
| 
 | ||||
|                                 outside_ranges_ip_network = networking.exclude_network(this_ipv4_range) | ||||
|                                 if this_ipv4_range == partner_bridge_subnet: | ||||
|                                     for partner_forwarding_ip in partner_forwarding_ips: | ||||
|                                         outside_ranges = [] | ||||
|                                         for ip_range in outside_ranges_ip_network: | ||||
|                                             outside_ranges.append(str(ip_range)) | ||||
|                                         outside_ranges_ip_network = networking.exclude_network(f"{partner_forwarding_ip}/32", input_ranges=outside_ranges) | ||||
| 
 | ||||
|                                 outside_ranges = [] | ||||
|                                 for outside_range_ip_network in outside_ranges_ip_network: | ||||
|                                     outside_ranges.append(str(outside_range_ip_network)) | ||||
|  | @ -265,7 +292,7 @@ def validate_and_secure_networks(): | |||
|                                                 successfully_appended = networking.add_iptables_rule(needed_iptables_rule) | ||||
|                                                 if not successfully_appended: | ||||
|                                                     failed_appending_iptables_rule = True | ||||
|                                     else: | ||||
|                                     elif this_ipv4_range != partner_bridge_subnet: | ||||
|                                         needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name) | ||||
|                                         for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule | ||||
|                                         for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule)) | ||||
|  | @ -413,3 +440,7 @@ def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"): | |||
|             return False | ||||
|     else: | ||||
|         return False | ||||
| 
 | ||||
| def is_docker_default_name_lenient(container_name): # Heuristic match for Docker's default "adjective_surname" names (e.g. "nice_tesla"); not perfect, but good enough here | ||||
|     pattern = r'^[a-z]+_[a-z]+$' | ||||
|     return re.match(pattern, container_name) is not None | ||||
|  | @ -0,0 +1,71 @@ | |||
| from lib import logging as logging_lib | ||||
| 
 | ||||
| from typing import List | ||||
| from lib import utils | ||||
| import time | ||||
| 
 | ||||
| log = logging_lib.log | ||||
| 
 | ||||
| async def ensure_packages_installed( | ||||
|     packages: List[str] = [],  | ||||
|     total_timeout: float = 300 | ||||
| ) -> bool: | ||||
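|     """ | ||||
|     Check via dpkg which of the given apt packages are missing and install them | ||||
|     non-interactively; returns True when everything is (already) installed. | ||||
|     """ | ||||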
|     non_interactive_env = { | ||||
|         'DEBIAN_FRONTEND': 'noninteractive', | ||||
|         'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin', | ||||
|     } | ||||
|      | ||||
|     start_time = time.time() | ||||
|      | ||||
|     packages_to_install = [] | ||||
|     for package in packages: | ||||
|         check_cmd = f"dpkg -s {package} > /dev/null 2>&1" | ||||
|         return_code, _, _ = await utils.async_run_command(check_cmd, env=non_interactive_env) | ||||
|          | ||||
|         if return_code != 0: | ||||
|             packages_to_install.append(package) | ||||
| 
 | ||||
|     if not packages_to_install: | ||||
|         log.debug("All packages are already installed.") | ||||
|         return True | ||||
|      | ||||
|     update_cmd = ( | ||||
|         "apt-get update " | ||||
|         "-y " | ||||
|         "--no-install-recommends" | ||||
|     ) | ||||
|     return_code, stdout, stderr = await utils.async_run_command( | ||||
|         update_cmd,  | ||||
|         timeout=None if total_timeout is None else 180, | ||||
|         env=non_interactive_env | ||||
|     ) | ||||
|     if return_code != 0: | ||||
|         log.error(f"Failed to update package lists: {stderr}") | ||||
|         return False | ||||
| 
 | ||||
|     install_cmd = ( | ||||
|         "apt-get install " | ||||
|         "-y " | ||||
|         "--no-install-recommends " | ||||
|         "--assume-yes " | ||||
|         "-o Dpkg::Options::='--force-confdef' "  # Default to existing config | ||||
|         "-o Dpkg::Options::='--force-confold' "  # Keep existing config | ||||
|         f"{' '.join(packages_to_install)}" | ||||
|     ) | ||||
|      | ||||
|     # Calculate remaining timeout | ||||
|     remaining_timeout = None if total_timeout is None else max(0, total_timeout - (time.time() - start_time)) | ||||
|      | ||||
|     # Install packages | ||||
|     return_code, stdout, stderr = await utils.async_run_command( | ||||
|         install_cmd,  | ||||
|         timeout=remaining_timeout,  | ||||
|         env=non_interactive_env | ||||
|     ) | ||||
|      | ||||
|     if return_code == 0: | ||||
|         log.debug(f"Successfully installed packages: {packages_to_install}") | ||||
|         return True | ||||
|     else: | ||||
|         log.error(f"Failed to install packages: {stderr}") | ||||
|         return False | ||||
|  | @ -2,7 +2,7 @@ from aiofiles.os import stat as aio_stat | |||
| from pydantic import BaseModel, Field, constr | ||||
| import xml.etree.ElementTree as ET | ||||
| from lib import docker_interface | ||||
| from typing import Dict, List | ||||
| from typing import Dict, List, Optional | ||||
| from lib import utils | ||||
| import subprocess | ||||
| import speedtest | ||||
|  | @ -311,7 +311,7 @@ def get_gpu_info(): | |||
| class DockerDaemonConfig(BaseModel): | ||||
|     data_root: str = Field(alias="data-root") | ||||
|     storage_driver: str = Field(alias="storage-driver") | ||||
|     storage_opts: List[str] = Field(alias="storage-opts") | ||||
|     storage_opts: Optional[List[str]] = Field(default=None, alias="storage-opts") | ||||
| 
 | ||||
| class Specs: | ||||
|     def __init__(self): | ||||
|  | @ -336,26 +336,14 @@ class Specs: | |||
|         else: | ||||
|             overlay_total_size=None | ||||
|             disk_type="" | ||||
|             disk_usage_source_path = '/' | ||||
|             try: | ||||
|                 validated_config = DockerDaemonConfig(**docker_daemon_config) | ||||
|                 disk_udevadm = get_disk_udevadm(validated_config.data_root) | ||||
|                 for udevadm_line in disk_udevadm.split('\n'): | ||||
|                     try: | ||||
|                         key, value=udevadm_line.split('=',1) | ||||
|                         if "id_model" in key.lower(): | ||||
|                             disk_type=value[:24] | ||||
|                         elif "devpath" in key.lower() and "/virtual/" in value: | ||||
|                             disk_type="Virtual" | ||||
|                     except Exception as e_int: | ||||
|                         pass | ||||
|                 for storage_opt in (validated_config.storage_opts or []):  # storage-opts may be absent now that it is Optional | ||||
|                     if storage_opt[:14]=="overlay2.size=" and "GB" in storage_opt[14:]: | ||||
|                         numeric_size = round(float(filter_non_numeric(storage_opt[14:])), 4) | ||||
|                         overlay_total_size=numeric_size | ||||
|                 if "storage-driver" in docker_daemon_config and docker_daemon_config["storage-driver"] == "overlay2" and "data-root" in docker_daemon_config: | ||||
|                     disk_usage_source_path = docker_daemon_config["data-root"] | ||||
|             except Exception as e: | ||||
|                 pass | ||||
|             if overlay_total_size==None: | ||||
|                 total, used, free = shutil.disk_usage("/") | ||||
|                 total, used, free = shutil.disk_usage(disk_usage_source_path) | ||||
|                 disk_udevadm = get_disk_udevadm("/") | ||||
|                 for udevadm_line in disk_udevadm.split('\n'): | ||||
|                     try: | ||||
|  |  | |||
|  | @ -0,0 +1,66 @@ | |||
| from lib import ensure_packages_installed | ||||
| from lib import utils | ||||
| 
 | ||||
| import asyncio | ||||
| import socket | ||||
| import re | ||||
| 
 | ||||
| MANDATORY_PACKAGES = ['iputils-ping'] | ||||
| 
 | ||||
| def is_valid_ipv4_or_hostname(string): | ||||
|     def is_valid_ipv4(ip): | ||||
|         try: | ||||
|             socket.inet_aton(ip) | ||||
|             return True | ||||
|         except socket.error: | ||||
|             return False | ||||
| 
 | ||||
|     hostname_regex = re.compile( | ||||
|         r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$" | ||||
|     ) | ||||
|     def is_valid_hostname(hostname): | ||||
|         if len(hostname) > 255: | ||||
|             return False | ||||
|         return all(hostname_regex.match(part) for part in hostname.split(".")) | ||||
| 
 | ||||
|     return is_valid_ipv4(string) or is_valid_hostname(string) | ||||
| 
 | ||||
| async def measure_latency_icmp(hosts, max_concurrent_measurements=2, count=4): | ||||
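|     """ | ||||
|     Ping the given hosts in batches of max_concurrent_measurements and return a | ||||
|     list of {"host", "received", "avg_latency"} dicts (latency in ms, 0 when no | ||||
|     replies); returns False if iputils-ping could not be installed. | ||||
|     """ | ||||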
|     success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKAGES, None) | ||||
|     if success: | ||||
|         outcome = [] | ||||
|         concurrent_jobs = [hosts[i:i + max_concurrent_measurements] for i in range(0, len(hosts), max_concurrent_measurements)] | ||||
|         for concurrent_job in concurrent_jobs: | ||||
|             tasks = [] | ||||
|             job_hosts = []  # Hosts that actually get a ping task, kept in task order | ||||
|             for host in concurrent_job: | ||||
|                 if is_valid_ipv4_or_hostname(host): | ||||
|                     job_hosts.append(host) | ||||
|                     tasks.append( | ||||
|                         utils.async_run_command( | ||||
|                             f"ping -c {str(count)} -W 2 -i 0.3 {host}", | ||||
|                             20 * count, | ||||
|                             {"LANG": "C", "LC_ALL": "C"} | ||||
|                         ) | ||||
|                     ) | ||||
|             if len(tasks) > 0: | ||||
|                 r = await asyncio.gather(*tasks) | ||||
|                 try: | ||||
|                     # Pair each result with its own host; indexing the original | ||||
|                     # hosts list would drift whenever an invalid host is skipped. | ||||
|                     for host, output in zip(job_hosts, r): | ||||
|                         results = [] | ||||
|                         code, stdout, stderr = output | ||||
|                         if code == 0: | ||||
|                             for line in stdout.split('\n'): | ||||
|                                 match = re.search(r"time=(\d+\.?\d*) ms", line) | ||||
|                                 if match: | ||||
|                                     results.append(float(match.group(1))) | ||||
|                         outcome.append( | ||||
|                             { | ||||
|                                 "host": host, | ||||
|                                 "received": len(results), | ||||
|                                 "avg_latency": sum(results) / len(results) if len(results) > 0 else 0 | ||||
|                             } | ||||
|                         ) | ||||
|                 except Exception as e: | ||||
|                     print(e) | ||||
|         return outcome | ||||
|     else: | ||||
|         return False | ||||
|  | @ -1,6 +1,7 @@ | |||
| from lib import docker_interface | ||||
| from lib import config as config_module | ||||
| from lib import logging as logging_lib | ||||
| from lib import clore_partner | ||||
| config = config_module.config | ||||
| log = logging_lib.log | ||||
| 
 | ||||
|  | @ -29,7 +30,7 @@ async def log_streaming_task(message_broker, monitoring, do_not_stream_container | |||
| 
 | ||||
|             # Start tasks for new containers | ||||
|             for container_name, container in current_containers.items(): | ||||
|                 if not container_name in do_not_stream_containers: | ||||
|                 if not container_name in do_not_stream_containers and not clore_partner.validate_partner_container_name(container_name): | ||||
|                     log_container_names.append(container_name) | ||||
|                     if container_name not in tasks: | ||||
|                         log.debug(f"log_streaming_task() | Starting task for {container_name}") | ||||
|  |  | |||
|  | @ -6,6 +6,7 @@ import ipaddress | |||
| import socket | ||||
| import psutil | ||||
| import sys | ||||
| import os | ||||
| 
 | ||||
| config = config_module.config | ||||
| log = logging_lib.log | ||||
|  | @ -25,12 +26,15 @@ def get_network_interfaces_with_subnet(): | |||
|     except Exception as e: | ||||
|         return str(e) | ||||
| 
 | ||||
| def exclude_network(excluded_network): | ||||
| def exclude_network(excluded_network, input_ranges=None): | ||||
|     # Convert excluded_network to an ip_network object | ||||
|     excluded_network = ip_network(excluded_network) | ||||
| 
 | ||||
|     if not input_ranges: | ||||
|         input_ranges=config.local_ipv4_ranges | ||||
| 
 | ||||
|     # Remove the excluded network from the local_ranges list | ||||
|     local_ranges = [ip_network(range_) for range_ in config.local_ipv4_ranges if ip_network(range_) != exclude_network] | ||||
|     local_ranges = [ip_network(range_) for range_ in input_ranges if ip_network(range_) != excluded_network]  # compare against the parameter, not the exclude_network function itself | ||||
| 
 | ||||
|     ranges_outside_exclude = [] | ||||
|     for local_range in local_ranges: | ||||
|  | @ -93,3 +97,19 @@ def is_ip_in_network(ip: str, network: str) -> bool: | |||
|             # If there's an error with the input values, print the error and return False | ||||
|             log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}") | ||||
|             return False | ||||
| 
 | ||||
| def purge_clore_interfaces(): | ||||
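|     # Deletes docker0 and any br-* bridge whose subnet matches a clore-managed | ||||
|     # network, so they can be recreated cleanly (used during the XFS migration). | ||||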
|     network_interfaces = get_network_interfaces_with_subnet() | ||||
|     if not isinstance(network_interfaces, dict): | ||||
|         log.error("Failed to load network interfaces, restarting...") | ||||
|         os._exit(1) | ||||
| 
 | ||||
|     clore_subnets = [ "172.17.0.1/16" ]  # Docker's default bridge subnet | ||||
|     for clore_default_network in config.clore_default_networks: | ||||
|         clore_subnets.append(clore_default_network["subnet"]) | ||||
| 
 | ||||
|     for network_interface in network_interfaces.keys(): | ||||
|         if network_interface == "docker0" or network_interface[:3] == "br-": | ||||
|             subnet = network_interfaces[network_interface] | ||||
|             if subnet in clore_subnets or network_interface == "docker0": | ||||
|                 utils.run_command(f"ip link delete {network_interface}") | ||||
|  | @ -0,0 +1,187 @@ | |||
| from lib import config as config_module | ||||
| from lib import logging as logging_lib | ||||
| from lib import utils | ||||
| 
 | ||||
| import os | ||||
| import aiofiles.os | ||||
| 
 | ||||
| config = config_module.config | ||||
| log = logging_lib.log | ||||
| 
 | ||||
| CLIENT_CONFIGS_LOCATION = "/etc/openvpn/client" | ||||
| PARTNER_CONFIG_NAME = "clore_partner.conf" | ||||
| 
 | ||||
| def generate_openvpn_config( | ||||
|     local_ip='10.1.0.2',  | ||||
|     server_ip='10.1.0.1',  | ||||
|     server_hostname='example.com',  | ||||
|     udp_port=1194,  | ||||
|     vpn_secret_key='YOUR_VPN_SECRET_KEY' | ||||
| ): | ||||
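|     """ | ||||
|     Render a point-to-point OpenVPN client config that authenticates with a | ||||
|     static shared key (no PKI); the tunnel carries forwarded partner traffic. | ||||
|     """ | ||||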
|     openvpn_config = f"""nobind | ||||
| proto udp4 | ||||
| remote {server_hostname} {udp_port} | ||||
| resolv-retry infinite | ||||
| 
 | ||||
| auth SHA256 | ||||
| cipher AES-256-CBC | ||||
| 
 | ||||
| dev {config.openvpn_forwarding_tun_device} | ||||
| ifconfig {local_ip} {server_ip} | ||||
| 
 | ||||
| <secret> | ||||
| -----BEGIN OpenVPN Static key V1----- | ||||
| {vpn_secret_key} | ||||
| -----END OpenVPN Static key V1----- | ||||
| </secret> | ||||
| 
 | ||||
| fragment 1300 | ||||
| mssfix 1300 | ||||
| sndbuf 524288 | ||||
| rcvbuf 524288 | ||||
| 
 | ||||
| user nobody | ||||
| group nogroup | ||||
| 
 | ||||
| ping 15 | ||||
| ping-restart 45 | ||||
| ping-timer-rem | ||||
| persist-tun | ||||
| persist-key | ||||
| 
 | ||||
| verb 0""" | ||||
|      | ||||
|     return openvpn_config | ||||
| 
 | ||||
| async def get_iptables_forward_rules(): | ||||
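|     # Parses `iptables -t nat -L PREROUTING -n -v --line-numbers` output into | ||||
|     # dicts keyed by the header columns (num, target, in, ...), keeping the | ||||
|     # trailing match/target details under "desc". | ||||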
|     code, stdout, stderr = await utils.async_run_command( | ||||
|         f"LC_ALL=C {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -L PREROUTING -n -v --line-numbers" | ||||
|     ) | ||||
|     rules = [] | ||||
|     if code == 0: | ||||
|         columns = [] | ||||
|         for line in stdout.split('\n'): | ||||
|             if "num" in columns and "target" in columns and "in" in columns: | ||||
|                 items = line.split(maxsplit=len(columns)+1) | ||||
|                 rule = {} | ||||
|                 for idx, name in enumerate(columns): | ||||
|                     rule[name] = items[idx] | ||||
|                 rule["desc"] = items[len(columns)+1] | ||||
|                 rules.append(rule) | ||||
|             else: | ||||
|                 columns = line.split() | ||||
|     return rules | ||||
| 
 | ||||
| async def remove_iptables_rule(rule_dict): | ||||
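|     # Rebuilds an `iptables -t nat -D PREROUTING ...` command from a rule dict | ||||
|     # produced by get_iptables_forward_rules() and executes it. | ||||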
|     cmd = f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables" | ||||
| 
 | ||||
|     if rule_dict.get('target') == 'DNAT': | ||||
|         cmd += " -t nat" | ||||
|     cmd += " -D PREROUTING" | ||||
|     if rule_dict.get('prot') and rule_dict['prot'] != '--': | ||||
|         cmd += f" -p {rule_dict['prot']}" | ||||
|     if rule_dict.get('in') and rule_dict['in'] != '*': | ||||
|         cmd += f" -i {rule_dict['in']}" | ||||
|     if rule_dict.get('out') and rule_dict['out'] != '*': | ||||
|         cmd += f" -o {rule_dict['out']}" | ||||
|     if rule_dict.get('source') and rule_dict['source'] != '0.0.0.0/0': | ||||
|         cmd += f" -s {rule_dict['source']}" | ||||
|     if rule_dict.get('destination') and rule_dict['destination'] != '0.0.0.0/0': | ||||
|         cmd += f" -d {rule_dict['destination']}" | ||||
|     if rule_dict.get('target') == 'DNAT': | ||||
|         if 'dports' in rule_dict.get('desc', ''): | ||||
|             port_info = rule_dict['desc'].split('dports ')[1].split(' ')[0] | ||||
|             if ':' in port_info: | ||||
|                 cmd += f" -m multiport --dports {port_info}" | ||||
|             else: | ||||
|                 cmd += f" --dport {port_info}" | ||||
|         if 'to:' in rule_dict.get('desc', ''): | ||||
|             dest_ip = rule_dict['desc'].split('to:')[1].split()[0] | ||||
|             cmd += f" -j DNAT --to-destination {dest_ip}" | ||||
|     await utils.async_run_command(cmd) | ||||
|      | ||||
| 
 | ||||
| async def clore_partner_configure(clore_partner_config): | ||||
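|     """ | ||||
|     Bring partner forwarding in line with the given config: write the OpenVPN | ||||
|     client config, (re)start its systemd unit, set up the policy-routing table | ||||
|     and the DNAT port-forwarding rules, and flag a Docker restart when the VPN | ||||
|     config changed. A falsy config stops the VPN client instead. | ||||
|     """ | ||||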
|     try: | ||||
|         if clore_partner_config: | ||||
|             docker_restart_required = False | ||||
| 
 | ||||
|             needed_openvpn_config = generate_openvpn_config( | ||||
|                 local_ip=clore_partner_config["provider"], | ||||
|                 server_ip=clore_partner_config["forwarding"], | ||||
|                 server_hostname=clore_partner_config["openvpn_host"], | ||||
|                 udp_port=clore_partner_config["openvpn_port"], | ||||
|                 vpn_secret_key=clore_partner_config["secret"] | ||||
|             ) | ||||
| 
 | ||||
|             saved_config='' | ||||
| 
 | ||||
|             config_exists = await aiofiles.os.path.exists(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME)) | ||||
|             if config_exists: | ||||
|                 async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='r') as file: | ||||
|                     saved_config = await file.read() | ||||
| 
 | ||||
|             if saved_config != needed_openvpn_config: | ||||
|                 async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='w') as file: | ||||
|                     await file.write(needed_openvpn_config) | ||||
| 
 | ||||
|             is_active_code, is_active_stdout, is_active_stderr = await utils.async_run_command( | ||||
|                 f"systemctl is-active openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" | ||||
|             ) | ||||
| 
 | ||||
|             if is_active_code == 0 and saved_config != needed_openvpn_config: | ||||
|                 code, stdout, stderr = await utils.async_run_command( | ||||
|                     f"systemctl restart openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" | ||||
|                 ) | ||||
|                 docker_restart_required = (code == 0) | ||||
|             elif is_active_code != 0: | ||||
|                 code, stdout, stderr = await utils.async_run_command( | ||||
|                     f"systemctl start openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" | ||||
|                 ) | ||||
|                 docker_restart_required = (code == 0) | ||||
| 
 | ||||
|             code, stdout, stderr = await utils.async_run_command( | ||||
|                 f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route show table {str(config.forwarding_ip_route_table_id)}" | ||||
|             ) | ||||
|             ip_route_configured = False | ||||
|             if code == 0: | ||||
|                 for line in stdout.split('\n'): | ||||
|                     items = line.split(' ') | ||||
|                     if clore_partner_config["provider"] in items and config.openvpn_forwarding_tun_device in items: | ||||
|                         ip_route_configured = True | ||||
|                         break | ||||
|             if not ip_route_configured: | ||||
|                 code, stdout, stderr = await utils.async_run_command( | ||||
|                     f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route add 0.0.0.0/0 dev {config.openvpn_forwarding_tun_device} src {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id} && {'sudo ' if config.run_iptables_with_sudo else ''}ip rule add from {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id}" | ||||
|                 ) | ||||
|             ip_tables_configured = False | ||||
| 
 | ||||
|             rules = await get_iptables_forward_rules() | ||||
| 
 | ||||
|             for rule in rules: | ||||
|                 try: | ||||
|                     if rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and f"{clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]}" in rule["desc"] and f"to:{clore_partner_config['provider']}" in rule["desc"]: | ||||
|                         ip_tables_configured = True | ||||
|                     elif rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and "to:10." in rule["desc"] and "dports " in rule["desc"]: | ||||
|                         log.debug(f"clore_partner_configure() | Removing stale forwarding rule: {rule}") | ||||
|                         await remove_iptables_rule(rule) | ||||
|                 except Exception as ei: | ||||
|                     log.error(f"clore_partner_configure() | ei | {ei}") | ||||
| 
 | ||||
|             if not ip_tables_configured: | ||||
|                 code, stdout, stderr = await utils.async_run_command( | ||||
|                     f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p tcp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']} && {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p udp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']}" | ||||
|                 ) | ||||
| 
 | ||||
|             if docker_restart_required: | ||||
|                 async with aiofiles.open(config.restart_docker_flag_file, mode='w') as file: | ||||
|                     await file.write("") | ||||
|                 os._exit(0)  # Exit clore hosting here: Docker must be restarted after the updated VPN config comes up, and it will be restarted on the next start of clore hosting | ||||
|         else: | ||||
|             code, stdout, stderr = await utils.async_run_command( | ||||
|                 f"systemctl stop openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" | ||||
|             ) | ||||
|         return True | ||||
|     except Exception as e: | ||||
|         log.error(f"FAIL | openvpn.clore_partner_configure | {e}") | ||||
|         return False | ||||
lib/utils.py
							|  | @ -1,10 +1,13 @@ | |||
| from typing import Optional, Tuple, Dict | ||||
| from lib import config as config_module | ||||
| from lib import logging as logging_lib | ||||
| from lib import nvml | ||||
| import subprocess | ||||
| import hashlib | ||||
| import asyncio | ||||
| import random | ||||
| import string | ||||
| import shutil | ||||
| import shlex | ||||
| import time | ||||
| import math | ||||
| import os  # needed by get_directory_size_mb below | ||||
|  | @ -145,6 +148,63 @@ def get_extra_allowed_images(): | |||
|     else: | ||||
|         return [] | ||||
| 
 | ||||
| async def async_run_command( | ||||
|     command: str,  | ||||
|     timeout: Optional[float] = None, | ||||
|     env: Optional[Dict[str, str]] = None | ||||
| ) -> Tuple[int, str, str]: | ||||
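|     """ | ||||
|     Run a shell command and return (return_code, stdout, stderr). On timeout the | ||||
|     process is terminated (then killed) and (-1, '', <message>) is returned; any | ||||
|     other failure also maps to -1. Note: env replaces the whole environment, so | ||||
|     callers must pass PATH themselves when they need it. | ||||
|     """ | ||||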
|     command_env = env if env is not None else {} | ||||
|      | ||||
|     try: | ||||
|         proc = await asyncio.create_subprocess_shell( | ||||
|             command, | ||||
|             stdout=subprocess.PIPE, | ||||
|             stderr=subprocess.PIPE, | ||||
|             env=command_env | ||||
|         ) | ||||
|          | ||||
|         try: | ||||
|             stdout, stderr = await asyncio.wait_for( | ||||
|                 proc.communicate(),  | ||||
|                 timeout=timeout | ||||
|             ) | ||||
| 
 | ||||
|             stdout_str = stdout.decode('utf-8').strip() if stdout else '' | ||||
|             stderr_str = stderr.decode('utf-8').strip() if stderr else '' | ||||
|              | ||||
|             return proc.returncode, stdout_str, stderr_str | ||||
|          | ||||
|         except asyncio.TimeoutError: | ||||
|             # Handle timeout: terminate the process gracefully first | ||||
|             proc.terminate() | ||||
|             try: | ||||
|                 await asyncio.wait_for(proc.wait(), timeout=5)  # Wait for it to exit | ||||
|             except asyncio.TimeoutError: | ||||
|                 # Force kill the process if it doesn't terminate | ||||
|                 proc.kill() | ||||
|                 await proc.wait() | ||||
| 
 | ||||
|             return -1, '', f'Command timed out after {timeout} seconds' | ||||
|      | ||||
|     except Exception as e: | ||||
|         return -1, '', str(e) | ||||
| 
 | ||||
| def get_free_space_mb(path): | ||||
|     """Get free space in MB for the given path.""" | ||||
|     total, used, free = shutil.disk_usage(path) | ||||
|     return free // (1024 * 1024)  # Convert bytes to MB | ||||
| 
 | ||||
| def get_directory_size_mb(path): | ||||
|     """Get the size of a directory in MB.""" | ||||
|     total_size = 0 | ||||
|     for dirpath, dirnames, filenames in os.walk(path): | ||||
|         for f in filenames: | ||||
|             fp = os.path.join(dirpath, f) | ||||
|             # Skip if the file doesn't exist (symlinks, etc.) | ||||
|             if not os.path.islink(fp) and os.path.exists(fp): | ||||
|                 total_size += os.path.getsize(fp) | ||||
|     return total_size // (1024 * 1024)  # Convert bytes to MB | ||||
|      | ||||
| class shm_calculator: | ||||
|     def __init__(self, total_ram): | ||||
|         self.total_ram = total_ram | ||||
|  |  | |||
|  | @ -0,0 +1,201 @@ | |||
| # Library to setup XFS partition for docker | ||||
| from lib import ensure_packages_installed | ||||
| from lib import logging as logging_lib | ||||
| from lib import docker_interface | ||||
| from lib import networking | ||||
| from lib import utils | ||||
| 
 | ||||
| import asyncio | ||||
| import json | ||||
| import os | ||||
| 
 | ||||
| log = logging_lib.log | ||||
| 
 | ||||
| DOCKER_ROOT = "/var/lib/docker" | ||||
| DOCKER_DATA_IMG = "/opt/clore-hosting/data.img" | ||||
| LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB | ||||
| 
 | ||||
| MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB | ||||
| 
 | ||||
| XFS_STATE_FILE = "/opt/clore-hosting/xfs_state" | ||||
| 
 | ||||
| MANDATORY_PACKAGES = [ | ||||
|     "xfsprogs", | ||||
|     "dmidecode", | ||||
|     "openvpn", | ||||
|     "iproute2", | ||||
|     "iputils-ping", | ||||
|     "util-linux" | ||||
| ] | ||||
| 
 | ||||
| # This code runs at the start of clore hosting to migrate Docker onto an XFS partition | ||||
| 
 | ||||
| # sudo fallocate -l 300G /docker-storage.img | ||||
| # sudo mkfs.xfs /docker-storage.img | ||||
| # mount -o loop,pquota /docker-storage.img /mnt/docker-storage | ||||
| 
 | ||||
| def migrate(): | ||||
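|     # Flow: bail out early when the XFS setup is already valid (or can't be | ||||
|     # checked), stop all containers, allocate an XFS image sized to the free | ||||
|     # space minus LEAVE_FREE_SPACE_MB, point Docker's data-root at it via | ||||
|     # daemon.json + fstab, then mount it; roll back the config on failure. | ||||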
|     docker_xfs_state = validate_docker_xfs() | ||||
|     #print(docker_xfs_state) | ||||
|     if docker_xfs_state == "skip": | ||||
|         return | ||||
|     elif docker_xfs_state == "valid": | ||||
|         return 'success' | ||||
|      | ||||
|     packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed( | ||||
|         MANDATORY_PACKAGES | ||||
|     )) | ||||
| 
 | ||||
|     if not packages_available: | ||||
|         return 'packages-missing' | ||||
| 
 | ||||
|     log.info("Starting migration to xfs") | ||||
|     docker_interface.stop_all_containers() | ||||
| 
 | ||||
|     if os.path.exists(DOCKER_DATA_IMG): | ||||
|         try: | ||||
|             os.remove(DOCKER_DATA_IMG) | ||||
|         except Exception as e: | ||||
|             log.error(f"Failed to remove {DOCKER_DATA_IMG}: {e}") | ||||
|             return "failure" | ||||
| 
 | ||||
|     max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT) | ||||
|      | ||||
|     data_img_size = int(max_free_space - LEAVE_FREE_SPACE_MB) | ||||
|     if data_img_size < MIN_XFS_PARTITION_SIZE: | ||||
|         return 'not-enough-space' | ||||
|      | ||||
|     docker_config_success = False | ||||
|     fstab_config_success = False | ||||
| 
 | ||||
|     code, stdout, stderr = utils.run_command( | ||||
|         f"systemctl stop docker && rm -rf {DOCKER_ROOT} && fallocate -l {str(data_img_size)}M {DOCKER_DATA_IMG} && mkfs.xfs {DOCKER_DATA_IMG}" | ||||
|     ) | ||||
| 
 | ||||
|     networking.purge_clore_interfaces() | ||||
| 
 | ||||
|     if code == 0: | ||||
|         docker_config_success = configure_docker_daemon() | ||||
|     if code == 0 and docker_config_success: | ||||
|         fstab_config_success = configure_fstab() | ||||
|     if code == 0 and fstab_config_success: | ||||
|         code, stdout, stderr = utils.run_command( | ||||
|             f"mkdir {DOCKER_ROOT} && systemctl daemon-reload && mount -a" | ||||
|         ) | ||||
|         if code==0: | ||||
|             return 'success' | ||||
|         else: | ||||
|             configure_docker_daemon(remove=True) | ||||
|             configure_fstab(remove=True) | ||||
|             return 'failure' | ||||
|     else: | ||||
|         utils.run_command( | ||||
|             f"mkdir {DOCKER_ROOT}" | ||||
|         ) | ||||
|         if os.path.exists(DOCKER_DATA_IMG): | ||||
|             try: | ||||
|                 os.remove(DOCKER_DATA_IMG) | ||||
|             except Exception as e: | ||||
|                 pass | ||||
|         return 'failure' | ||||
| 
 | ||||
| def validate_docker_xfs(): | ||||
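|     # Compares `df -T` for / and DOCKER_ROOT: "valid" when DOCKER_ROOT sits on | ||||
|     # its own (smaller) XFS filesystem, "default" when it still lives on the | ||||
|     # root filesystem, "skip" when the df output can't be interpreted. | ||||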
|     code_root, stdout_root, stderr_root = utils.run_command("df -T /") | ||||
|     code, stdout, stderr = utils.run_command(f"df -T {DOCKER_ROOT}") | ||||
|     #print([code, stderr, stdout]) | ||||
|     if code_root == 0 and stderr_root == '' and ((code == 0 and stderr == '') or (code == 1 and f" {DOCKER_ROOT}: " in stderr and stdout == '')): | ||||
|         root_blocks = None | ||||
|         docker_root_blocks = None | ||||
|         docker_root_format = '' | ||||
|         for idx, line in enumerate(stdout_root.split('\n')): | ||||
|             if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric(): | ||||
|                 root_blocks = int(line.split()[2]) | ||||
|         if code == 1: | ||||
|             docker_root_blocks = root_blocks | ||||
|         else: | ||||
|             for idx, line in enumerate(stdout.split('\n')): | ||||
|                 if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric(): | ||||
|                     docker_root_blocks = int(line.split()[2]) | ||||
|                     docker_root_format = line.split()[1] | ||||
|         if root_blocks == None or docker_root_blocks == None: | ||||
|             return "skip" | ||||
|         elif docker_root_format=="xfs" and root_blocks > docker_root_blocks: | ||||
|             return "valid" | ||||
|         else: | ||||
|             return "default" | ||||
|     else: | ||||
|         return "skip" | ||||
| 
 | ||||
| def configure_docker_daemon(remove=False): | ||||
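|     # Points data-root at DOCKER_ROOT with the overlay2 driver in | ||||
|     # /etc/docker/daemon.json, or with remove=True undoes exactly those keys. | ||||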
|     try: | ||||
|         daemon_json_path = "/etc/docker/daemon.json" | ||||
| 
 | ||||
|         with open(daemon_json_path, 'r') as file: | ||||
|             raw_content = file.read() | ||||
|             daemon_config = json.loads(raw_content) | ||||
|         if remove: | ||||
|             if daemon_config.get("data-root") == DOCKER_ROOT: | ||||
|                 del daemon_config["data-root"] | ||||
|             if daemon_config.get("storage-driver") == "overlay2": | ||||
|                 del daemon_config["storage-driver"] | ||||
|         elif daemon_config.get("data-root") != DOCKER_ROOT or daemon_config.get("storage-driver") != "overlay2": | ||||
|             daemon_config["data-root"] = DOCKER_ROOT | ||||
|             daemon_config["storage-driver"] = "overlay2" | ||||
|         with open(daemon_json_path, 'w') as file: | ||||
|             file.write(json.dumps(daemon_config,indent=4)) | ||||
|         return True | ||||
|     except Exception as e: | ||||
|         return False | ||||
| 
 | ||||
| def configure_fstab(remove=False): | ||||
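|     # Adds (or removes) the loopback XFS mount line for DOCKER_DATA_IMG in | ||||
|     # /etc/fstab so the Docker data image is mounted with pquota on boot. | ||||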
|     try: | ||||
|         file_path = "/etc/fstab" | ||||
| 
 | ||||
|         mount_line = f"{DOCKER_DATA_IMG} {DOCKER_ROOT} xfs loop,pquota 0 0" | ||||
| 
 | ||||
|         with open(file_path, 'r') as file: | ||||
|             raw_content = file.read() | ||||
| 
 | ||||
|         if remove: | ||||
|             if mount_line in raw_content: | ||||
|                 raw_content = raw_content.replace(f"\n{mount_line}\n", '') | ||||
|                 with open(file_path, 'w') as file: | ||||
|                     file.write(raw_content) | ||||
|         elif not mount_line in raw_content: | ||||
|             raw_content += f"\n{mount_line}\n" | ||||
|             with open(file_path, 'w') as file: | ||||
|                 file.write(raw_content) | ||||
|         return True | ||||
|     except Exception as e: | ||||
|         return False | ||||
| 
 | ||||
| def init(): | ||||
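|     # State machine persisted in XFS_STATE_FILE: "enabled" triggers a migration | ||||
|     # attempt and is rewritten to "active", "not-enough-space" or | ||||
|     # "failed-migration"; the returned string becomes the client's xfs_state. | ||||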
|     try: | ||||
|         if os.path.exists(XFS_STATE_FILE): | ||||
|             with open(XFS_STATE_FILE, 'r') as file: | ||||
|                 raw_content = file.read() | ||||
|             if "enabled" in raw_content: | ||||
|                 migration_status = migrate() | ||||
|                 if migration_status == "success": | ||||
|                     with open(XFS_STATE_FILE, 'w') as file: | ||||
|                         file.write("active") | ||||
|                     return "active" | ||||
|                 elif migration_status == "not-enough-space": | ||||
|                     with open(XFS_STATE_FILE, 'w') as file: | ||||
|                         file.write("not-enough-space") | ||||
|                     return 'not-enough-space' | ||||
|                 else: | ||||
|                     with open(XFS_STATE_FILE, 'w') as file: | ||||
|                         file.write("failed-migration") | ||||
|                     return 'failed' | ||||
|             elif 'not-enough-space' in raw_content: | ||||
|                 return 'not-enough-space' | ||||
|             elif "active" in raw_content: | ||||
|                 return "active" | ||||
|             else: | ||||
|                 return "failed" | ||||
|         else: | ||||
|             return "disabled" | ||||
|     except Exception as e: | ||||
|         log.error(f"xfs.init() | {e}") | ||||
|         return "failed" | ||||