diff --git a/clore_hosting/docker_configurator.py b/clore_hosting/docker_configurator.py index 107395d..052507f 100644 --- a/clore_hosting/docker_configurator.py +++ b/clore_hosting/docker_configurator.py @@ -32,7 +32,7 @@ def get_last_ip_occurrence_and_text(input_string): else: return None, None -def configure(containers): +def configure(containers, partner_forwarding_ips): valid_containers = [] newly_created_networks = [] containers_required_networks = [] @@ -141,7 +141,7 @@ def configure(containers): if config.log_containers_strings: print("FROM DOCKER CONFIGURATOR", valid_containers) - validation_and_security = docker_interface.validate_and_secure_networks() + validation_and_security = docker_interface.validate_and_secure_networks(partner_forwarding_ips) if startup_sctipt_creation_fail: validation_and_security=False return validation_and_security, valid_containers, use_hive_flightsheet \ No newline at end of file diff --git a/clore_hosting/main.py b/clore_hosting/main.py index 9859a5d..f9031c5 100644 --- a/clore_hosting/main.py +++ b/clore_hosting/main.py @@ -4,7 +4,10 @@ from lib import log_streaming_task from lib import run_startup_script from lib import hive_miner_interface from lib import docker_interface +from lib import background_job from lib import docker_deploy +from lib import clore_partner +from lib import clore_partner_socket from lib import docker_pull from lib import get_specs from lib import utils @@ -34,17 +37,17 @@ WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_ #print(config) -async def configure_networks(containers): - res = await asyncio.to_thread(docker_configurator.configure, containers) +async def configure_networks(containers, partner_forwarding_ips): + res = await asyncio.to_thread(docker_configurator.configure, containers, partner_forwarding_ips) try: fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2]) return fin_res except Exception as e: return False -async def deploy_containers(validated_containers, allowed_running_containers): +async def deploy_containers(validated_containers, allowed_running_containers, can_run_partner_workloads): try: - all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers) + all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers, can_run_partner_workloads) return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names) except Exception as e: return False @@ -70,8 +73,10 @@ async def set_hive_miner_status(enabled=False): return False class CloreClient: - def __init__(self, auth_key): + def __init__(self, auth_key, xfs_state): self.auth_key=auth_key + self.xfs_state = xfs_state + self.ws_peers = {} self.last_checked_ws_peers=0 self.containers={} @@ -99,9 +104,11 @@ class CloreClient: "container_log_streaming_service": utils.unix_timestamp(), "specs_service": utils.unix_timestamp(), "oc_service": utils.unix_timestamp(), - "background_pow_data_collection": utils.unix_timestamp() + "background_pow_data_collection": utils.unix_timestamp(), + "partner_service": utils.unix_timestamp() } self.max_service_inactivity = 600 # seconds + self.no_restart_services = ["partner_service"] # Services that are allowed to run indefinetly without triggering the app to restart if config.debug_ws_peer: self.ws_peers[str(config.debug_ws_peer)]={ @@ -137,6 +144,10 @@ class CloreClient: self.hive_miner_interface = hive_miner_interface.hive_interface() self.next_pow_background_job_send_update = 0 + self.clore_partner_initiazized = False + self.partner_forwarding_ips = [] + self.start_time = utils.unix_timestamp() + async def service(self): global container_log_broken @@ -151,10 +162,11 @@ class CloreClient: task6 = asyncio.create_task(self.specs_service(monitoring)) task7 = asyncio.create_task(self.oc_service(monitoring)) task8 = asyncio.create_task(self.background_pow_data_collection(monitoring)) + task9 = asyncio.create_task(self.partner_service(monitoring)) monitoring_task = asyncio.create_task(self.monitoring_service(monitoring)) # Wait for both tasks to complete (they won't in this case) - await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, monitoring_task) + await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, task9, monitoring_task) async def monitoring_service(self, monitoring): while True: @@ -169,13 +181,14 @@ class CloreClient: if config.debug: log.success(self.last_service_heartbeat) for service_name in self.last_service_heartbeat.keys(): - last_hearthbeat = self.last_service_heartbeat[service_name] - if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache": - log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") - os._exit(1) - elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache": - log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") - os._exit(1) + if not service_name in self.no_restart_services: + last_hearthbeat = self.last_service_heartbeat[service_name] + if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache": + log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") + os._exit(1) + elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache": + log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") + os._exit(1) except Exception as e: log.debug(f"monitoring_service() | ERROR | {e}") await asyncio.sleep(5) @@ -260,6 +273,7 @@ class CloreClient: if len(self.p_needed_containers)>0: local_images = await get_local_images(no_latest_tag=True) + partner_images = await clore_partner.get_partner_allowed_images() for local_image in local_images: self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()} image_needed = False @@ -283,11 +297,32 @@ class CloreClient: if local_image_tag==allowed_tag or allowed_tag=='*': image_needed=True break - if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers: + if not image_needed and type(partner_images) == list: + for partner_image in partner_images: + if local_image.replace(':latest', '') == partner_image.replace(':latest', ''): + image_needed = True + del self.last_pull_progress[local_image] + break + if len(local_image.split('/')) >= 3: + partner_image_spl = partner_image.split(':') + image, deployment_type = '/'.join(local_image.split('/', 2)[:2]), local_image.split('/', 2)[-1] + if len(partner_image_spl) == 1: + if image == partner_image_spl[0] or f"{image}" == f"{partner_image_spl[0]}_latest": + image_needed = True + del self.last_pull_progress[local_image] + break + elif len(partner_image_spl) == 2: + if image.replace('_latest', '') == f"{partner_image_spl[0]}_{partner_image_spl[1]}".replace('_latest', ''): + image_needed = True + del self.last_pull_progress[local_image] + break + if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers and partner_images != None: + log.success(f"GOING TO REMOVE {local_image}") with concurrent.futures.ThreadPoolExecutor() as pool: r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image) if r: removed_cnt+=1 + del self.last_pull_progress[local_image] #if config.debug: # log.success(f"{local_image} | {image_needed}") @@ -327,7 +362,7 @@ class CloreClient: try: await pull_task except Exception as e: - self.last_pull_progress[local_image]={f"log":"Can't pull image \"{local_image}\"", "last_update":time.time()} + self.last_pull_progress[local_image]={"log":f"Can't pull image \"{local_image}\"", "last_update":time.time()} log_task.cancel() try: await log_task @@ -358,11 +393,18 @@ class CloreClient: container_conf = WebSocketClient.get_containers() + can_run_partner_workloads = False + if container_conf[0]: self.containers_set=True self.containers=container_conf[1] tmp_images = [] - for container in self.containers: + + is_order_spot = False + + for idx, container in enumerate(self.containers): + if "spot" in container: + is_order_spot = True if "image" in container and "image" in container and container["image"]!="cloreai/hive-use-flightsheet": log_pull = False if "name" in container: @@ -386,6 +428,9 @@ class CloreClient: if not image_config in tmp_images: tmp_images.append(image_config) + can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True + clore_partner_socket.set_can_deploy(can_run_partner_workloads) + if self.restart_docker and not running_order and len(self.containers)>0: log.debug("Sending docker restart command") utils.run_command_v2("systemctl restart docker") @@ -400,14 +445,14 @@ class CloreClient: tasks.append(api_interface.get_server_config()) if self.containers_set: - tasks.append(configure_networks(self.containers)) + tasks.append(configure_networks(self.containers, self.partner_forwarding_ips)) tasks.append(WebSocketClient.stream_pull_logs()) if self.validated_containers_set: - tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers)) + tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers, can_run_partner_workloads)) if step==1: - WebSocketClient.set_auth(self.auth_key) + WebSocketClient.set_auth(self.auth_key, self.xfs_state) asyncio.create_task(WebSocketClient.run()) elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval): log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds") @@ -427,6 +472,11 @@ class CloreClient: if result.success: self.last_checked_ws_peers = utils.unix_timestamp() self.allowed_images=result.allowed_images+self.extra_allowed_images + if self.xfs_state == "active": + self.allowed_images.append({ + "repository": "vastai/test", + "allowed_tags": ["bandwidth-test-nvidia"] + }) if not config.debug_ws_peer: for pure_ws_peer in result.ws_peers: self.ws_peers[pure_ws_peer]={ @@ -455,7 +505,7 @@ class CloreClient: async def submit_specs(self, current_specs): try: if type(current_specs) == dict: - current_specs["backend_version"]=18 + current_specs["backend_version"]=19 current_specs["update_hw"]=True smallest_pcie_width = 999 for gpu in current_specs["gpus"]["nvidia"]: @@ -504,7 +554,7 @@ class CloreClient: await monitoring.put("oc_service") oc_apply_allowed = True ### OC Service should also hande Hive stuff - if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries: + if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries and background_job.is_enabled(): await set_hive_miner_status(True) oc_apply_allowed = False # Don't apply any OC when running HiveOS miner elif self.is_hive and not self.dont_use_hive_binaries: @@ -544,6 +594,30 @@ class CloreClient: log.debug(f"FAIL | background_pow_data_collection() | {e}") await asyncio.sleep(6) + async def partner_service(self, monitoring): + while True: + try: + await monitoring.put("partner_service") + if self.start_time < utils.unix_timestamp() - 180: + forwarding_latency_measurment = await clore_partner.measure_forwarding_latency() + if type(forwarding_latency_measurment) == list: + await WebSocketClient.set_forwarding_latency_measurment(forwarding_latency_measurment) + partner_config = WebSocketClient.get_clore_partner_config() + if partner_config != None: + if self.clore_partner_initiazized == False: + ir = await clore_partner.initialize() + if ir: + self.clore_partner_initiazized = True + if self.clore_partner_initiazized == True: + if 'provider' in partner_config and 'forwarding' in partner_config: + self.partner_forwarding_ips = [partner_config['provider'], partner_config['forwarding']] + else: + self.partner_forwarding_ips = [] + await clore_partner.configure(partner_config) + except Exception as e: + log.debug(f"FAIL | partner_service() | {e}") + await asyncio.sleep(6) + def expire_ws_peers(self): for ws_peer_address in list(self.ws_peers.keys()): ws_peer_info = self.ws_peers[ws_peer_address] diff --git a/clore_hosting/ws_interface.py b/clore_hosting/ws_interface.py index cf54793..d3c5dba 100644 --- a/clore_hosting/ws_interface.py +++ b/clore_hosting/ws_interface.py @@ -1,4 +1,5 @@ from concurrent.futures import ThreadPoolExecutor +from lib import clore_partner import asyncio import random import websockets @@ -31,6 +32,7 @@ class WebSocketClient: self.connected = False self.authorized = False self.auth = auth + self.xfs_state = None self.log_auth_fail = True self.last_heartbeat = clore_utils.unix_timestamp() self.containers={} @@ -50,16 +52,31 @@ class WebSocketClient: self.oc_enabled = False self.last_gpu_oc_specs = [] self.last_set_oc = {} + + self.clore_partner_config = None + self.forwarding_latency_measurment = None def get_last_heartbeat(self): return self.last_heartbeat def get_containers(self): - return self.containers_set, self.containers + partner_container_config = clore_partner.get_partner_container_config() + return self.containers_set, ((self.containers + [partner_container_config]) if partner_container_config else self.containers) def get_oc(self): return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc + def get_clore_partner_config(self): + return self.clore_partner_config + + async def set_forwarding_latency_measurment(self, forwarding_latency_measurment): + await self.send(json.dumps( + { + "forwarding_latency_measurment": forwarding_latency_measurment + } + )) + self.forwarding_latency_measurment = forwarding_latency_measurment + def set_ws_peers(self, ws_peers): tmp_ws_peers=[] for ws_peer in list(ws_peers.keys()): @@ -68,8 +85,9 @@ class WebSocketClient: self.ws_peers = tmp_ws_peers - def set_auth(self, auth): + def set_auth(self, auth, xfs_state): self.auth=auth + self.xfs_state=xfs_state def set_pull_logs(self, pull_logs): self.pull_logs=pull_logs @@ -93,7 +111,9 @@ class WebSocketClient: log.debug(f"CLOREWS | Connected to {random_ws_peer} ✅") await self.send(json.dumps({ "login":str(self.auth), - "type":"python" + "xfs_state": self.xfs_state, + "type":"python", + "clore_partner_support": True })) except Exception as e: log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e} ❌") @@ -136,6 +156,16 @@ class WebSocketClient: pass elif message=="KEEPALIVE": self.last_heartbeat = clore_utils.unix_timestamp() + try: + if self.forwarding_latency_measurment: + await self.send(json.dumps( + { + "forwarding_latency_measurment": self.forwarding_latency_measurment + } + )) + self.forwarding_latency_measurment = None + except Exception as e: + pass elif message=="NEWER_LOGIN" or message=="WAIT": await self.close_websocket() elif message[:10]=="PROVEPULL;": @@ -148,13 +178,16 @@ class WebSocketClient: else: try: parsed_json = json.loads(message) - if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list: + if "type" in parsed_json and parsed_json["type"]=="partner_config" and "partner_config" in parsed_json and type(parsed_json["partner_config"])==dict: + self.clore_partner_config = parsed_json["partner_config"] + await self.send(json.dumps({"partner_config":parsed_json["partner_config"]})) + elif "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list: self.last_heartbeat = clore_utils.unix_timestamp() container_str = json.dumps({"containers":parsed_json["new_containers"]}) await self.send(container_str) if len(parsed_json["new_containers"]) > 0: # There should be at least one container self.containers_set = True - self.containers=parsed_json["new_containers"] + self.containers=clore_partner.filter_partner_dummy_workload_container(parsed_json["new_containers"]) #log.success(container_str) elif "allow_oc" in parsed_json: # Enable OC self.oc_enabled=True diff --git a/hosting.py b/hosting.py index d787f01..e662f69 100644 --- a/hosting.py +++ b/hosting.py @@ -1,5 +1,6 @@ from lib import config as config_module from lib import init_server +from lib import xfs from lib import utils from clore_hosting import main as clore_hosting import asyncio, os @@ -29,7 +30,15 @@ elif config.reset: log.success("Client login reseted") elif config.service: if len(auth)==32+48+1: - clore_client = clore_hosting.CloreClient(auth_key=auth) + utils.run_command("sysctl -w net.ipv4.ip_forward=1") + + xfs_state = xfs.init() + + if os.path.isfile(config.restart_docker_flag_file): + utils.run_command("systemctl restart docker") + os.remove(config.restart_docker_flag_file) + + clore_client = clore_hosting.CloreClient(auth_key=auth, xfs_state=xfs_state) asyncio.run(clore_client.service()) else: print("TODO: Firstly config auth") diff --git a/lib/background_job.py b/lib/background_job.py new file mode 100644 index 0000000..177d4a4 --- /dev/null +++ b/lib/background_job.py @@ -0,0 +1,22 @@ +import time +import re + +disabled_till = 0 + +def is_background_job_container_name(string): + if type(string) != str: + return False + pattern = r"^clore-default-\d+$" + return bool(re.match(pattern, string)) + +def temporarly_disable(seconds): + global disabled_till + disabled_till = time.time() + seconds + +def enable(): + global disabled_till + disabled_till=0 + +def is_enabled(): + global disabled_till + return True if disabled_till < time.time() else False \ No newline at end of file diff --git a/lib/clore_partner.py b/lib/clore_partner.py new file mode 100644 index 0000000..1e971ed --- /dev/null +++ b/lib/clore_partner.py @@ -0,0 +1,238 @@ +from lib import ensure_packages_installed +from lib import config as config_module +from lib import logging as logging_lib +from lib import clore_partner_socket +from lib import latency_test +from lib import openvpn +from lib import utils +import asyncio +import random +import json +import time +import re + +import os +import aiofiles.os +from aiohttp import ClientSession, ClientTimeout + +config = config_module.config +log = logging_lib.log + +MANDATORY_PACKEGES = ['dmidecode', 'openvpn', 'iproute2'] + +DUMMY_WORKLOAD_CONTAINER = "cloreai/partner-dummy-workload" + +host_facts_location = os.path.join(config.clore_partner_base_dir, "host_facts") +partner_cache_location = os.path.join(config.clore_partner_base_dir, "partner_cache") + +next_ensupe_packages_check = 0 +is_socket_running = False + +partner_container_config = None + +async def initialize(): + global next_ensupe_packages_check + global is_socket_running + try: + await aiofiles.os.makedirs(host_facts_location, exist_ok=True) + await aiofiles.os.makedirs(partner_cache_location, exist_ok=True) + await aiofiles.os.makedirs("/etc/openvpn/client", exist_ok=True) + if not is_socket_running: + is_socket_running=True + asyncio.create_task(clore_partner_socket.socket_service( + location=os.path.join(host_facts_location, "partner_interface.socket") + )) + if next_ensupe_packages_check < time.time(): + success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None) + next_ensupe_packages_check = float('inf') if success else time.time() + 60*60 # if did not succeeed -> retry in 1hr + if not success: + return False + elif next_ensupe_packages_check != float('inf'): + return False + + code, stdout, stderr = await utils.async_run_command( + "dmidecode -t 2 2>&1", + 20 + ) + if code == 0 and not stderr: + async with aiofiles.open(os.path.join(host_facts_location, "dmidecode_t2.txt"), mode='w') as file: + await file.write(stdout) + else: + return False + code, stdout, stderr = await utils.async_run_command( + "dmidecode 2>&1", + 20 + ) + if code == 0 and not stderr: + async with aiofiles.open(os.path.join(host_facts_location, "dmidecode.txt"), mode='w') as file: + await file.write(stdout) + else: + return False + return True + except Exception as e: + log.error(f"FAIL | clore_partner.initialize | {e}") + return False + +async def get_partner_allowed_images(): + try: + file_exists = await aiofiles.os.path.exists(os.path.join(partner_cache_location, "container_list.json")) + if not file_exists: + return [] + images = [] + async with aiofiles.open(os.path.join(partner_cache_location, "container_list.json"), mode='r') as file: + content = await file.read() + containers = json.loads(content) + for container in containers: + image = container.get("Config", {}).get("Image", None) + if image and not image in images: + images.append(image) + return images + except Exception as e: + return None + +def validate_partner_container_name(name): + if type(name) != str: + return False + elif name==config.clore_partner_container_name: + return True + pattern = r"^C\.\d+$" + return bool(re.match(pattern, name)) + +def validate_partner_workload_container_name(name): + if type(name) != str: + return False + pattern = r"^C\.\d+$" + return bool(re.match(pattern, name)) + +last_openvpn_config = None + +def get_partner_container_config(): + global partner_container_config + return partner_container_config + +async def configure(partner_config): + global last_openvpn_config + global partner_container_config + if last_openvpn_config != partner_config: + partner_container_config = { + "image": partner_config["partner_image"], + "name": config.clore_partner_container_name, + "hostname": f"{partner_config['partner_id'][:16]}-m{partner_config['machine_id']}", + "env": { + "AUTH": partner_config['partner_id'], + "ip_addr": partner_config['openvpn_host'], + "port_range": f'{partner_config['ports'][0]}-{partner_config['ports'][1]}' + }, + "volumes": { + f"{host_facts_location}": {"bind": "/var/lib/vastai_kaalia/specs_source"}, + f"{partner_cache_location}": {"bind": "/var/lib/vastai_kaalia/data"}, + f"/var/lib/docker": {"bind": "/var/lib/docker"}, + f"/var/run/docker.sock": {"bind": "/var/run/docker.sock"} + }, + "gpus": True, + "command": '', + "network": "clore-partner-br0", + "ip": "172.19.0.254", + "cap_add": ["SYS_ADMIN"], + "devices": ["/dev/fuse"], + #"security_opt": ["apparmor:unconfined"], + "ports": [f"{partner_config['ports'][1]}:{partner_config['ports'][1]}"], + } + r = await openvpn.clore_partner_configure(partner_config) + if r: + last_openvpn_config = partner_config + +# ----------------------------------------- + +next_latency_measurment = 0 + +async def fetch_forwarding_nodes(): + url = "https://api.clore.ai/v1/get_relay" + timeout = ClientTimeout(total=30) + + async with ClientSession(timeout=timeout) as session: + try: + async with session.get(url) as response: + response.raise_for_status() + data = await response.json() + return data + except Exception as e: + print(f"An error occurred: {e}") + return None + +async def set_next_latency_measurment(ts): + global next_latency_measurment + try: + next_latency_measurment=ts + async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='w') as file: + await file.write(str(ts)) + except Exception as e: + pass + +async def measure_forwarding_latency(): + global next_latency_measurment + + if next_latency_measurment > time.time(): + return False + try: + await aiofiles.os.makedirs(config.clore_partner_base_dir, exist_ok=True) + file_exists = await aiofiles.os.path.exists(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment")) + if file_exists: + async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='r') as file: + content = await file.read() + if content.isdigit(): + next_latency_measurment = int(content) + if next_latency_measurment < time.time(): + node_info = await fetch_forwarding_nodes() + if type(node_info) == dict and node_info.get("country") and type(node_info.get("nodes")) == dict and node_info.get("code") == 0: + to_test_nodes = [] + ip_to_region = {} + + valid_regions = [] + + for node_region in node_info.get("nodes").keys(): + nodes_ip_list = node_info.get("nodes")[node_region] + if type(nodes_ip_list) == list and len(nodes_ip_list) > 0: + to_test_nodes = to_test_nodes + nodes_ip_list + for node_ip in nodes_ip_list: + ip_to_region[node_ip]=node_region + + if len(to_test_nodes) > 0: + measurment_result = await latency_test.measure_latency_icmp(to_test_nodes) + if measurment_result: + for idx, res in enumerate(measurment_result): + if res["received"] > 2 and not ip_to_region.get(res["host"]) in valid_regions: + valid_regions.append(ip_to_region.get(res["host"])) + measurment_result[idx]["region"] = ip_to_region.get(res["host"]) + if len(valid_regions) == len(ip_to_region.keys()): + await set_next_latency_measurment(int( + time.time() + 60*60*24*30 # Re run in 30 days, because measurment succeeded + )) + return measurment_result + else: + await set_next_latency_measurment(int( + time.time() + 60*60*24 # Retry in 24hr, all regions in country should be reacheable + )) + else: + await set_next_latency_measurment(int( + time.time() + 60*60*72 # Retry in 72hr (clore partner service is not available in host country yet) + )) + return False + else: + await set_next_latency_measurment(int( + time.time() + 60*60*12 # Retry in 12hr, the response was not matching the required format + )) + return False + return False + except Exception as e: + return False + +def filter_partner_dummy_workload_container(containers): + try: + remaining_containers = [] + for container in containers: + if container["image"] != DUMMY_WORKLOAD_CONTAINER: + remaining_containers.append(container) + return remaining_containers + except Exception as e: + return containers \ No newline at end of file diff --git a/lib/clore_partner_socket.py b/lib/clore_partner_socket.py new file mode 100644 index 0000000..66f5dce --- /dev/null +++ b/lib/clore_partner_socket.py @@ -0,0 +1,94 @@ +from lib import config as config_module +from lib import logging as logging_lib +from lib import background_job +from lib import utils +import asyncio +import json + +import os +import aiofiles.os + +config = config_module.config +log = logging_lib.log + +dangerous_chars = [';', '|', '&', '`', '$', '>', '<', '(', ')', '\\', '!', '"', '\'', '[', ']', '{', '}'] +allowed_commands = ["df"] + +can_deploy = True + +async def remove_socket_file(path): + try: + file_exists = await aiofiles.os.path.exists(path) + if file_exists: + await aiofiles.os.remove(path) + except Exception: + pass + +async def handle_client(reader, writer): + global can_deploy + try: + while True: + data = await reader.read(1024*64) + try: + if not data: + break + #print("DATA", data, data.decode()) + parsed_data = json.loads(data.decode()) + if "run_command" in parsed_data and type(parsed_data["run_command"])==str: + allowed = False + for allowed_command in allowed_commands: + if f"{allowed_command} " == parsed_data["run_command"][:len(allowed_command)+1] or allowed_command==parsed_data["run_command"]: + allowed = True + break + if allowed and any(char in parsed_data["run_command"] for char in dangerous_chars): + allowed = False + log.debug(f"clore_partner_socket | Received \"{parsed_data["run_command"]}\" | {'allowed' if allowed else 'denied'}") + if allowed: + code, stdout, stderr = await utils.async_run_command( + parsed_data["run_command"] + ) + writer.write(json.dumps({ + "code": code, + "stderr": stderr, + "stdout": stdout + }).encode()) + else: + writer.write(json.dumps({ + "code": -1, + "stderr": 'Command not allowed', + "stdout": 'Command not allowed' + }).encode()) + elif "can_deploy" in parsed_data: + writer.write(json.dumps({ + "can_deploy": can_deploy + }).encode()) + elif "stop_background_job" in parsed_data and "time" in parsed_data: + try: + if isinstance(parsed_data["time"], int): + background_job.temporarly_disable(parsed_data["time"]) + except Exception as e: + pass + else: + writer.write('?'.encode()) + await writer.drain() + except Exception as data_exception: + pass + break + except asyncio.CancelledError: + log.debug(f"clore partner socket | Client handler canceled.") + finally: + log.debug(f"clore partner socket | Closing client connection.") + writer.close() + await writer.wait_closed() + +def set_can_deploy(state): + global can_deploy + can_deploy = state + +async def socket_service(location): + await remove_socket_file(location) + server = await asyncio.start_unix_server(handle_client, path=location) + + log.debug(f"clore partner socket | running at {location}") + async with server: + await server.serve_forever() \ No newline at end of file diff --git a/lib/config.py b/lib/config.py index 655fe0d..2df2c5c 100644 --- a/lib/config.py +++ b/lib/config.py @@ -9,6 +9,11 @@ hard_config = { "name": "clore-br0", "subnet": "172.18.0.0/16", "gateway": "172.18.0.1" + }, + { + "name": "clore-partner-br0", + "subnet": "172.19.0.0/20", + "gateway": "172.19.0.1" } ], "run_iptables_with_sudo":True, @@ -33,7 +38,11 @@ hard_config = { "maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app "maximum_pull_service_loop_time": 14400, # Exception for image pulling "creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk - "allow_mixed_gpus": True + "allow_mixed_gpus": True, + "openvpn_forwarding_tun_device": "tun1313", + "forwarding_ip_route_table_id": 100, + "clore_partner_container_name": "clore-partner-service", + "restart_docker_flag_file": "/opt/clore-hosting/.restart_docker" } parser = argparse.ArgumentParser(description='Example argparse usage') @@ -50,6 +59,7 @@ parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hostin parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes") parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software") +parser.add_argument('--clore-partner-base-dir', type=str, default="/opt/clore-hosting/.clore-partner") # Parse arguments, ignoring any non-defined arguments args, _ = parser.parse_known_args() diff --git a/lib/docker_cli_wrapper.py b/lib/docker_cli_wrapper.py index 1e1f4f5..6c48989 100644 --- a/lib/docker_cli_wrapper.py +++ b/lib/docker_cli_wrapper.py @@ -27,6 +27,14 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, if "cap_add" in container_options: for cap in container_options["cap_add"]: command.extend(["--cap-add", cap]) + + if "devices" in container_options: + for device in container_options["devices"]: + command.extend(["--device", device]) + + if "security_opt" in container_options: + for security_opt in container_options["security_opt"]: + command.extend(["--security-opt", security_opt]) if "volumes" in container_options: for volume_host, volume_container in container_options["volumes"].items(): @@ -75,6 +83,8 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, if ip: command.extend(["--ip", ip]) + command.append('--stop-timeout') + command.append('0') command.append(container_options["image"]) try: diff --git a/lib/docker_deploy.py b/lib/docker_deploy.py index c696066..8116085 100644 --- a/lib/docker_deploy.py +++ b/lib/docker_deploy.py @@ -1,7 +1,9 @@ from lib import config as config_module from lib import logging as logging_lib from lib import docker_cli_wrapper +from lib import background_job from lib import docker_interface +from lib import clore_partner from lib import get_specs from lib import utils import docker @@ -14,7 +16,7 @@ client = docker_interface.client config = config_module.config log = logging_lib.log -def deploy(validated_containers, allowed_running_containers=[]): +def deploy(validated_containers, allowed_running_containers=[], can_run_partner_workloads=False): local_images = docker_interface.get_local_images() all_containers = docker_interface.get_containers(all=True) @@ -67,7 +69,9 @@ def deploy(validated_containers, allowed_running_containers=[]): 'tty': True, 'network_mode': 'clore-br0', 'cap_add': [], - 'volumes': {}, + 'devices': [], + 'security_opt': [], + 'volumes': validated_container["volumes"] if "volumes" in validated_container else {}, 'ports': {}, 'device_requests': [], 'environment': validated_container["env"] if "env" in validated_container else {}, @@ -80,6 +84,15 @@ def deploy(validated_containers, allowed_running_containers=[]): ) } + if "security_opt" in validated_container: + container_options["security_opt"] = validated_container["security_opt"] + + if "devices" in validated_container: + container_options["devices"] = validated_container["devices"] + + if "cap_add" in validated_container: + container_options["cap_add"] = validated_container["cap_add"] + if "hostname" in validated_container: container_options["hostname"]=validated_container["hostname"] elif "clore-order-" in validated_container["name"]: @@ -136,7 +149,7 @@ def deploy(validated_containers, allowed_running_containers=[]): container_options["shm_size"] = f"{SHM_SIZE}m" - if not validated_container["name"] in created_container_names and image_ready: + if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])): if config.creation_engine == "wrapper": docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus) else: @@ -159,7 +172,10 @@ def deploy(validated_containers, allowed_running_containers=[]): all_running_container_names.append(container.name) else: all_stopped_container_names.append(container.name) - if container.name in needed_running_names and container.status != 'running': + if background_job.is_background_job_container_name(container.name) and not background_job.is_enabled(): + if container.status == "running": + container.stop() + elif container.name in needed_running_names and container.status != 'running': try: attached_networks = container.attrs['NetworkSettings']['Networks'] if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container @@ -174,17 +190,22 @@ def deploy(validated_containers, allowed_running_containers=[]): container.stop() except Exception as e: pass - elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running': + elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running' and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name): try: container.stop() container.remove() except Exception as e: pass - elif container.name not in paused_names+needed_running_names+allowed_running_containers: + elif container.name not in paused_names+needed_running_names+allowed_running_containers and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name): try: container.remove() except Exception as e: pass + elif not can_run_partner_workloads and container.status == "running" and clore_partner.validate_partner_workload_container_name(container.name): + try: + container.stop() + except Exception as e: + pass return all_running_container_names, all_stopped_container_names #print(validated_containers) diff --git a/lib/docker_interface.py b/lib/docker_interface.py index 60e4c9f..de27b52 100644 --- a/lib/docker_interface.py +++ b/lib/docker_interface.py @@ -11,6 +11,13 @@ from typing import List, Optional import docker import json import os +import re + +partner_bridge_subnet = '' + +for clore_network in config.clore_default_networks: + if clore_network["name"] == "clore-partner-br0": + partner_bridge_subnet = clore_network["subnet"] try: os.makedirs(config.startup_scripts_folder, exist_ok=True) @@ -59,6 +66,18 @@ def get_info(): except Exception as e: return {} +def stop_all_containers(): + try: + # List all containers + containers = client.containers.list(all=True) # Use all=True to include stopped containers + for container in containers: + log.info(f"stop_all_containers() | Stopping container: {container.name} (ID: {container.id})") + container.stop() # Stop the container + log.success("stop_all_containers() | All containers have been stopped.") + except Exception as e: + log.error(f"stop_all_containers() |An error occurred: {e}") + return True + def check_docker_connection(): try: client.ping() @@ -95,7 +114,7 @@ def get_local_images(no_latest_tag=False): return image_list except Exception as e: - log.error(f"DOCKER | Can't get local images | {e}") + log.error(f"DOCKER | Can't get local images | {e} | {'y' if no_latest_tag else 'n'}") os._exit(1) def get_containers(all=False): @@ -184,7 +203,8 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"): gateway=gateway )] ), - check_duplicate=True + check_duplicate=True, + #options={'com.docker.network.bridge.enable_ip_masq': 'false'} if 'clore-partner-' in network_name else {} ) log.debug(f"Network {network_name} created successfully.") return True @@ -192,7 +212,7 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"): log.error(f"DOCKER | Failed to create network {network_name}: {e}") return False -def validate_and_secure_networks(): +def validate_and_secure_networks(partner_forwarding_ips): try: failed_appending_iptables_rule = False @@ -238,6 +258,13 @@ def validate_and_secure_networks(): #print(this_ipv4_range) outside_ranges_ip_network = networking.exclude_network(this_ipv4_range) + if this_ipv4_range == partner_bridge_subnet: + for partner_forwarding_ip in partner_forwarding_ips: + outside_ranges = [] + for ip_range in outside_ranges_ip_network: + outside_ranges.append(str(ip_range)) + outside_ranges_ip_network = networking.exclude_network(f"{partner_forwarding_ip}/32", input_ranges=outside_ranges) + outside_ranges = [] for outside_range_ip_network in outside_ranges_ip_network: outside_ranges.append(str(outside_range_ip_network)) @@ -265,7 +292,7 @@ def validate_and_secure_networks(): succesfully_appended = networking.add_iptables_rule(needed_iptables_rule) if not succesfully_appended: failed_appending_iptables_rule = True - else: + elif this_ipv4_range != partner_bridge_subnet: needed_iptables_rule = rule_template.replace("",this_ipv4_range).replace("",this_if_name) for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule)) @@ -412,4 +439,8 @@ def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"): log.error(f"Failed 'configure_exec_opts' | {e}") return False else: - return False \ No newline at end of file + return False + +def is_docker_default_name_lenient(container_name): # Not a perfect solution, but it will do the job, + pattern = r'^[a-z]+_[a-z]+$' + return re.match(pattern, container_name) is not None \ No newline at end of file diff --git a/lib/ensure_packages_installed.py b/lib/ensure_packages_installed.py new file mode 100644 index 0000000..df78a7b --- /dev/null +++ b/lib/ensure_packages_installed.py @@ -0,0 +1,71 @@ +from lib import logging as logging_lib + +from typing import List +from lib import utils +import time + +log = logging_lib.log + +async def ensure_packages_installed( + packages: List[str] = [], + total_timeout: float = 300 +) -> bool: + non_interactive_env = { + 'DEBIAN_FRONTEND': 'noninteractive', + 'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin', + } + + start_time = time.time() + + packages_to_install = [] + for package in packages: + check_cmd = f"dpkg -s {package} > /dev/null 2>&1" + return_code, _, _ = await utils.async_run_command(check_cmd, env=non_interactive_env) + + if return_code != 0: + packages_to_install.append(package) + + if not packages_to_install: + log.debug("All packages are already installed.") + return True + + update_cmd = ( + "apt-get update " + "-y " + "--no-install-recommends" + ) + return_code, stdout, stderr = await utils.async_run_command( + update_cmd, + timeout=None if total_timeout == None else 180, + env=non_interactive_env + ) + if return_code != 0: + log.error(f"Failed to update package lists: {stderr}") + return False + + install_cmd = ( + "apt-get install " + "-y " + "--no-install-recommends " + "--assume-yes " + "-o Dpkg::Options::='--force-confdef' " # Default to existing config + "-o Dpkg::Options::='--force-confold' " # Keep existing config + f"{' '.join(packages_to_install)}" + ) + + # Calculate remaining timeout + remaining_timeout = None if total_timeout == None else max(0, total_timeout - (time.time() - start_time)) + + # Install packages + return_code, stdout, stderr = await utils.async_run_command( + install_cmd, + timeout=remaining_timeout, + env=non_interactive_env + ) + + if return_code == 0: + log.debug(f"Successfully installed packages: {packages_to_install}") + return True + else: + log.error(f"Failed to install packages: {stderr}") + return False \ No newline at end of file diff --git a/lib/get_specs.py b/lib/get_specs.py index 6b6fac1..eac7726 100644 --- a/lib/get_specs.py +++ b/lib/get_specs.py @@ -2,7 +2,7 @@ from aiofiles.os import stat as aio_stat from pydantic import BaseModel, Field, constr import xml.etree.ElementTree as ET from lib import docker_interface -from typing import Dict, List +from typing import Dict, List, Optional from lib import utils import subprocess import speedtest @@ -311,7 +311,7 @@ def get_gpu_info(): class DockerDaemonConfig(BaseModel): data_root: str = Field(alias="data-root") storage_driver: str = Field(alias="storage-driver") - storage_opts: List[str] = Field(alias="storage-opts") + storage_opts: Optional[List[str]] = Field(alias="storage-opts") class Specs: def __init__(self): @@ -336,26 +336,14 @@ class Specs: else: overlay_total_size=None disk_type="" + disk_usage_source_path = '/' try: - validated_config = DockerDaemonConfig(**docker_daemon_config) - disk_udevadm = get_disk_udevadm(validated_config.data_root) - for udevadm_line in disk_udevadm.split('\n'): - try: - key, value=udevadm_line.split('=',1) - if "id_model" in key.lower(): - disk_type=value[:24] - elif "devpath" in key.lower() and "/virtual/" in value: - disk_type="Virtual" - except Exception as e_int: - pass - for storage_opt in validated_config.storage_opts: - if storage_opt[:14]=="overlay2.size=" and "GB" in storage_opt[14:]: - numeric_size = round(float(filter_non_numeric(storage_opt[14:])), 4) - overlay_total_size=numeric_size + if "storage-driver" in docker_daemon_config and docker_daemon_config["storage-driver"] == "overlay2" and "data-root" in docker_daemon_config: + disk_usage_source_path = docker_daemon_config["data-root"] except Exception as e: pass if overlay_total_size==None: - total, used, free = shutil.disk_usage("/") + total, used, free = shutil.disk_usage(disk_usage_source_path) disk_udevadm = get_disk_udevadm("/") for udevadm_line in disk_udevadm.split('\n'): try: diff --git a/lib/latency_test.py b/lib/latency_test.py new file mode 100644 index 0000000..a34d614 --- /dev/null +++ b/lib/latency_test.py @@ -0,0 +1,66 @@ +from lib import ensure_packages_installed +from lib import utils + +import asyncio +import socket +import re + +MANDATORY_PACKEGES = ['iputils-ping'] + +def is_valid_ipv4_or_hostname(string): + def is_valid_ipv4(ip): + try: + socket.inet_aton(ip) + return True + except socket.error: + return False + + hostname_regex = re.compile( + r"^(?!-)[A-Za-z0-9-]{1,63}(? 255: + return False + return all(hostname_regex.match(part) for part in hostname.split(".")) + + return is_valid_ipv4(string) or is_valid_hostname(string) + +async def measure_latency_icmp(hosts, max_concurent_measurments=2, count=4): + success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None) + if success: + outcome = [] + concurent_jobs = [hosts[i:i + max_concurent_measurments] for i in range(0, len(hosts), max_concurent_measurments)] + for concurent_job in concurent_jobs: + tasks = [] + for host in concurent_job: + if is_valid_ipv4_or_hostname(host): + tasks.append( + utils.async_run_command( + f"ping -c {str(count)} -W 2 -i 0.3 {host}", + 20 * count, + {"LANG": "C", "LC_ALL": "C"} + ) + ) + if len(tasks) > 0: + r = await asyncio.gather(*tasks) + try: + for output in r: + results = [] + code, stdout, stderr = output + if code == 0: + for line in stdout.split('\n'): + match = re.search(r"time=(\d+\.?\d*) ms", line) + if match: + results.append(float(match.group(1))) + outcome.append( + { + "host": hosts[len(outcome)], + "received": len(results), + "avg_latency": sum(results) / len(results) if len(results)>0 else 0 + } + ) + except Exception as e: + print(e) + return outcome + else: + return False \ No newline at end of file diff --git a/lib/log_streaming_task.py b/lib/log_streaming_task.py index 97073da..343a3a4 100644 --- a/lib/log_streaming_task.py +++ b/lib/log_streaming_task.py @@ -1,6 +1,7 @@ from lib import docker_interface from lib import config as config_module from lib import logging as logging_lib +from lib import clore_partner config = config_module.config log = logging_lib.log @@ -29,7 +30,7 @@ async def log_streaming_task(message_broker, monitoring, do_not_stream_container # Start tasks for new containers for container_name, container in current_containers.items(): - if not container_name in do_not_stream_containers: + if not container_name in do_not_stream_containers and not clore_partner.validate_partner_container_name(container_name): log_container_names.append(container_name) if container_name not in tasks: log.debug(f"log_streaming_task() | Starting task for {container_name}") diff --git a/lib/networking.py b/lib/networking.py index 5c27afc..43992b0 100644 --- a/lib/networking.py +++ b/lib/networking.py @@ -6,6 +6,7 @@ import ipaddress import socket import psutil import sys +import os config = config_module.config log = logging_lib.log @@ -25,12 +26,15 @@ def get_network_interfaces_with_subnet(): except Exception as e: return str(e) -def exclude_network(excluded_network): +def exclude_network(excluded_network, input_ranges=None): # Convert exclude_network to ip_network object excluded_network = ip_network(excluded_network) + if not input_ranges: + input_ranges=config.local_ipv4_ranges + # Remove the excluded network from the local_ranges list - local_ranges = [ip_network(range_) for range_ in config.local_ipv4_ranges if ip_network(range_) != exclude_network] + local_ranges = [ip_network(range_) for range_ in input_ranges if ip_network(range_) != exclude_network] ranges_outside_exclude = [] for local_range in local_ranges: @@ -92,4 +96,20 @@ def is_ip_in_network(ip: str, network: str) -> bool: except ValueError as e: # If there's an error with the input values, print the error and return False log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}") - return False \ No newline at end of file + return False + +def purge_clore_interfaces(): + network_interfaces = get_network_interfaces_with_subnet() + if type(network_interfaces) != dict: + log.error("Failed to load network interfaces, restarting...") + os._exit(1) + + clore_subnets = [ "172.17.0.1/16" ] # I can include the docker default subnet here + for clore_default_network in config.clore_default_networks: + clore_subnets.append(clore_default_network["subnet"]) + + for network_interface in network_interfaces.keys(): + if network_interface == "docker0" or network_interface[:3] == "br-": + subnet = network_interfaces[network_interface] + if subnet in clore_subnets or network_interface == "docker0": + utils.run_command(f"ip link delete {network_interface}") \ No newline at end of file diff --git a/lib/openvpn.py b/lib/openvpn.py new file mode 100644 index 0000000..f417483 --- /dev/null +++ b/lib/openvpn.py @@ -0,0 +1,187 @@ +from lib import config as config_module +from lib import logging as logging_lib +from lib import utils + +import os +import aiofiles.os + +config = config_module.config +log = logging_lib.log + +CLIENT_CONFIGS_LOCATION = "/etc/openvpn/client" +PARTNER_CONFIG_NAME = "clore_partner.conf" + +def generate_openvpn_config( + local_ip='10.1.0.2', + server_ip='10.1.0.1', + server_hostname='example.com', + udp_port=1194, + vpn_secret_key='YOUR_VPN_SECRET_KEY' +): + openvpn_config = f"""nobind +proto udp4 +remote {server_hostname} {udp_port} +resolv-retry infinite + +auth SHA256 +cipher AES-256-CBC + +dev {config.openvpn_forwarding_tun_device} +ifconfig {local_ip} {server_ip} + + +-----BEGIN OpenVPN Static key V1----- +{vpn_secret_key} +-----END OpenVPN Static key V1----- + + +fragment 1300 +mssfix 1300 +sndbuf 524288 +rcvbuf 524288 + +user nobody +group nogroup + +ping 15 +ping-restart 45 +ping-timer-rem +persist-tun +persist-key + +verb 0""" + + return openvpn_config + +async def get_iptables_forward_rules(): + code, stdout, stderr = await utils.async_run_command( + f"LC_ALL=C {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -L PREROUTING -n -v --line-numbers" + ) + rules = [] + if code == 0: + collumns = [] + for idx, line in enumerate(stdout.split('\n')): + if "num" in collumns and "target" in collumns and "in" in collumns: + items = line.split(maxsplit=len(collumns)+1) + rule = {} + for idx, name in enumerate(collumns): + rule[name]=items[idx] + rule["desc"] = items[len(collumns)+1] + rules.append(rule) + else: + collumns = line.split() + return rules + +async def remove_iptables_rule(rule_dict): + cmd = f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables" + + if rule_dict.get('target') == 'DNAT': + cmd += " -t nat" + cmd += " -D PREROUTING" + if rule_dict.get('prot') and rule_dict['prot'] != '--': + cmd += f" -p {rule_dict['prot']}" + if rule_dict.get('in') and rule_dict['in'] != '*': + cmd += f" -i {rule_dict['in']}" + if rule_dict.get('out') and rule_dict['out'] != '*': + cmd += f" -o {rule_dict['out']}" + if rule_dict.get('source') and rule_dict['source'] != '0.0.0.0/0': + cmd += f" -s {rule_dict['source']}" + if rule_dict.get('destination') and rule_dict['destination'] != '0.0.0.0/0': + cmd += f" -d {rule_dict['destination']}" + if rule_dict.get('target') == 'DNAT': + if 'dports' in rule_dict.get('desc', ''): + port_info = rule_dict['desc'].split('dports ')[1].split(' ')[0] + if ':' in port_info: + cmd += f" -m multiport --dports {port_info}" + else: + cmd += f" --dport {port_info}" + if 'to:' in rule_dict.get('desc', ''): + dest_ip = rule_dict['desc'].split('to:')[1].split()[0] + cmd += f" -j DNAT --to-destination {dest_ip}" + await utils.async_run_command(cmd) + + +async def clore_partner_configure(clore_partner_config): + try: + if clore_partner_config: + docker_restart_required = False + + needed_openvpn_config = generate_openvpn_config( + local_ip=clore_partner_config["provider"], + server_ip=clore_partner_config["forwarding"], + server_hostname=clore_partner_config["openvpn_host"], + udp_port=clore_partner_config["openvpn_port"], + vpn_secret_key=clore_partner_config["secret"] + ) + + saved_config='' + + config_exists = await aiofiles.os.path.exists(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME)) + if config_exists: + async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='r') as file: + saved_config = await file.read() + + if saved_config != needed_openvpn_config: + async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='w') as file: + await file.write(needed_openvpn_config) + + is_active_code, is_active_stdout, is_active_stderr = await utils.async_run_command( + f"systemctl is-active openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" + ) + + if is_active_code == 0 and saved_config != needed_openvpn_config: + code, stdout, stderr = await utils.async_run_command( + f"systemctl restart openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" + ) + docker_restart_required = False if code != 0 else True + elif is_active_code != 0: + code, stdout, stderr = await utils.async_run_command( + f"systemctl start openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" + ) + docker_restart_required = False if code != 0 else True + + code, stdout, stderr = await utils.async_run_command( + f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route show table {str(config.forwarding_ip_route_table_id)}" + ) + ip_route_configured = False + if code == 0: + for line in stdout.split('\n'): + items = line.split(' ') + if clore_partner_config["provider"] in items and config.openvpn_forwarding_tun_device in items: + ip_route_configured = True + break + if not ip_route_configured: + code, stdout, stderr = await utils.async_run_command( + f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route add 0.0.0.0/0 dev {config.openvpn_forwarding_tun_device} src {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id} && ip rule add from {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id}" + ) + ip_tables_configured = False + + rules = await get_iptables_forward_rules() + + for rule in rules: + try: + if rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and f"{clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]}" in rule["desc"] and f"to:{clore_partner_config['provider']}" in rule["desc"]: + ip_tables_configured = True + elif rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and "to:10." in rule["desc"] and "dports " in rule["desc"]: + print("REMOVE RULE", rule) + await remove_iptables_rule(rule) + except Exception as ei: + log.error(f"clore_partner_configure() | ei | {ei}") + + if ip_tables_configured == False: + code, stdout, stderr = await utils.async_run_command( + f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p tcp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']} && {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p udp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']}" + ) + + if docker_restart_required: + async with aiofiles.open(config.restart_docker_flag_file, mode='w') as file: + await file.write("") + os._exit(0) # We close clore hosting, because it's mandatory to restart docker after starting the up to date version of VPN, docker will be restarted on next start of clore hosting + else: + code, stdout, stderr = await utils.async_run_command( + f"systemctl stop openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}" + ) + return True + except Exception as e: + log.error(f"FAIL | openvpn.clore_partner_configure | {e}") + return False \ No newline at end of file diff --git a/lib/utils.py b/lib/utils.py index 42c2176..e5cebbe 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -1,10 +1,13 @@ +from typing import Optional, Tuple, Dict from lib import config as config_module from lib import logging as logging_lib from lib import nvml import subprocess import hashlib +import asyncio import random import string +import shutil import shlex import time import math @@ -144,6 +147,63 @@ def get_extra_allowed_images(): return [] else: return [] + +async def async_run_command( + command: str, + timeout: Optional[float] = None, + env: Optional[Dict[str, str]] = None +) -> Tuple[int, str, str]: + command_env = env if env is not None else {} + + try: + proc = await asyncio.create_subprocess_shell( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=command_env + ) + + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(), + timeout=timeout + ) + + stdout_str = stdout.decode('utf-8').strip() if stdout else '' + stderr_str = stderr.decode('utf-8').strip() if stderr else '' + + return proc.returncode, stdout_str, stderr_str + + except asyncio.TimeoutError: + # Handle timeout: terminate the process gracefully first + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=5) # Wait for it to exit + except asyncio.TimeoutError: + # Force kill the process if it doesn't terminate + proc.kill() + await proc.wait() + + return -1, '', f'Command timed out after {timeout} seconds' + + except Exception as e: + return -1, '', str(e) + +def get_free_space_mb(path): + """Get free space in MB for the given path.""" + total, used, free = shutil.disk_usage(path) + return free // (1024 * 1024) # Convert bytes to MB + +def get_directory_size_mb(path): + """Get the size of a directory in MB.""" + total_size = 0 + for dirpath, dirnames, filenames in os.walk(path): + for f in filenames: + fp = os.path.join(dirpath, f) + # Skip if the file doesn't exist (symlinks, etc.) + if not os.path.islink(fp) and os.path.exists(fp): + total_size += os.path.getsize(fp) + return total_size // (1024 * 1024) # Convert bytes to MB class shm_calculator: def __init__(self, total_ram): diff --git a/lib/xfs.py b/lib/xfs.py new file mode 100644 index 0000000..4d7a0ac --- /dev/null +++ b/lib/xfs.py @@ -0,0 +1,201 @@ +# Library to setup XFS partition for docker +from lib import ensure_packages_installed +from lib import logging as logging_lib +from lib import docker_interface +from lib import networking +from lib import utils + +import asyncio +import json +import os + +log = logging_lib.log + +DOCKER_ROOT = "/var/lib/docker" +DOCKER_DATA_IMG = "/opt/clore-hosting/data.img" +LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB + +MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB + +XFS_STATE_FILE = "/opt/clore-hosting/xfs_state" + +MANDATORY_PACKAGES = [ + "xfsprogs", + "dmidecode", + "openvpn", + "iproute2", + "iputils-ping", + "util-linux" +] + +# This code is runned on start of clore hosting to migrate docker to XFS partition system + +# sudo fallocate -l 300G /docker-storage.img +# sudo mkfs.xfs /docker-storage.img +# mount -o loop,pquota /docker-storage.img /mnt/docker-storage + +def migrate(): + docker_xfs_state = validate_docker_xfs() + #print(docker_xfs_state) + if docker_xfs_state == "skip": + return + elif docker_xfs_state == "valid": + return 'success' + + packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed( + MANDATORY_PACKAGES + )) + + if not packages_available: + return 'packages-missing' + + log.info("Starting migration to xfs") + docker_interface.stop_all_containers() + + if os.path.exists(DOCKER_DATA_IMG): + try: + os.remove(DOCKER_DATA_IMG) + except Exception as e: + print(f"Error while trying to remove {DOCKER_DATA_IMG}: {e}") + return "failure" + + max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT) + + data_img_size = int(max_free_space - LEAVE_FREE_SPACE_MB) + if data_img_size < MIN_XFS_PARTITION_SIZE: + return 'not-enough-space' + + docker_config_success = False + fstab_config_success = False + + code, stdout, stderr = utils.run_command( + f"systemctl stop docker && rm -rf {DOCKER_ROOT} && fallocate -l {str(data_img_size)}M {DOCKER_DATA_IMG} && mkfs.xfs {DOCKER_DATA_IMG}" + ) + + networking.purge_clore_interfaces() + + if code == 0: + docker_config_success = configure_docker_daemon() + if code == 0 and docker_config_success: + fstab_config_success = configure_fstab() + if code == 0 and fstab_config_success: + code, stdout, stderr = utils.run_command( + f"mkdir {DOCKER_ROOT} && systemctl daemon-reload && mount -a" + ) + if code==0: + return 'success' + else: + configure_docker_daemon(remove=True) + configure_fstab(remove=True) + return 'failure' + else: + utils.run_command( + f"mkdir {DOCKER_ROOT}" + ) + if os.path.exists(DOCKER_DATA_IMG): + try: + os.remove(DOCKER_DATA_IMG) + except Exception as e: + pass + return 'failure' + +def validate_docker_xfs(): + code_root, stdout_root, stderr_root = utils.run_command("df -T /") + code, stdout, stderr = utils.run_command(f"df -T {DOCKER_ROOT}") + #print([code, stderr, stdout]) + if code_root == 0 and stderr_root == '' and ((code == 0 and stderr == '') or (code == 1 and f" {DOCKER_ROOT}: " in stderr and stdout == '')): + root_blocks = None + docker_root_blocks = None + docker_root_format = '' + for idx, line in enumerate(stdout_root.split('\n')): + if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric(): + root_blocks = int(line.split()[2]) + if code == 1: + docker_root_blocks = root_blocks + else: + for idx, line in enumerate(stdout.split('\n')): + if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric(): + docker_root_blocks = int(line.split()[2]) + docker_root_format = line.split()[1] + if root_blocks == None or docker_root_blocks == None: + return "skip" + elif docker_root_format=="xfs" and root_blocks > docker_root_blocks: + return "valid" + else: + return "default" + else: + return "skip" + +def configure_docker_daemon(remove=False): + try: + daemon_json_path = "/etc/docker/daemon.json" + + with open(daemon_json_path, 'r') as file: + raw_content = file.read() + daemon_config = json.loads(raw_content) + if remove: + if daemon_config.get("data-root") == DOCKER_ROOT: + del daemon_config["data-root"] + if daemon_config.get("storage-driver") == "overlay2": + del daemon_config["storage-driver"] + elif daemon_config.get("data-root") != DOCKER_ROOT or daemon_config.get("storage-driver") != "overlay2": + daemon_config["data-root"] = DOCKER_ROOT + daemon_config["storage-driver"] = "overlay2" + with open(daemon_json_path, 'w') as file: + file.write(json.dumps(daemon_config,indent=4)) + return True + except Exception as e: + return False + +def configure_fstab(remove=False): + try: + file_path = "/etc/fstab" + + mount_line = f"{DOCKER_DATA_IMG} {DOCKER_ROOT} xfs loop,pquota 0 0" + + with open(file_path, 'r') as file: + raw_content = file.read() + + if remove: + if mount_line in raw_content: + raw_content = raw_content.replace(f"\n{mount_line}\n", '') + with open(file_path, 'w') as file: + file.write(raw_content) + elif not mount_line in raw_content: + raw_content += f"\n{mount_line}\n" + with open(file_path, 'w') as file: + file.write(raw_content) + return True + except Exception as e: + return False + +def init(): + try: + if os.path.exists(XFS_STATE_FILE): + with open(XFS_STATE_FILE, 'r') as file: + raw_content = file.read() + if "enabled" in raw_content: + migarion_status = migrate() + if migarion_status == "success": + with open(XFS_STATE_FILE, 'w') as file: + file.write("active") + return "active" + elif migarion_status == "not-enough-space": + with open(XFS_STATE_FILE, 'w') as file: + file.write("not-enough-space") + return 'not-enough-space' + else: + with open(XFS_STATE_FILE, 'w') as file: + file.write("failed-migration") + return 'failed' + elif 'not-enough-space' in raw_content: + return 'not-enough-space' + elif "active" in raw_content: + return "active" + else: + return "failed" + else: + return "disabled" + except Exception as e: + print(e) + pass \ No newline at end of file