diff --git a/clore_hosting/main.py b/clore_hosting/main.py index ceb3a57..988b965 100644 --- a/clore_hosting/main.py +++ b/clore_hosting/main.py @@ -27,6 +27,7 @@ import asyncio import time import json from aiofiles import os as async_os +import aiofiles import os specs = get_specs.Specs() @@ -151,6 +152,9 @@ class CloreClient: self.runned_pull_selftest = False + WebSocketClient.set_gpu_list(nvml.get_gpu_name_list()) + WebSocketClient.set_is_hive(self.is_hive) + async def service(self): global container_log_broken @@ -435,6 +439,12 @@ class CloreClient: can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True clore_partner_socket.set_can_deploy(can_run_partner_workloads) + if not running_order and self.xfs_state == "disabled": + async with aiofiles.open("/opt/clore-hosting/xfs_state", mode='w') as file: + await file.write("enabled") + log.info("No order running, requesting XFS migration") + os._exit(0) + if self.restart_docker and not running_order and len(self.containers)>0: log.debug("Sending docker restart command") utils.run_command_v2("systemctl restart docker") @@ -509,7 +519,7 @@ class CloreClient: async def submit_specs(self, current_specs): try: if type(current_specs) == dict: - current_specs["backend_version"]=20 + current_specs["backend_version"]=21 current_specs["update_hw"]=True smallest_pcie_width = 999 for gpu in current_specs["gpus"]["nvidia"]: diff --git a/clore_hosting/ws_interface.py b/clore_hosting/ws_interface.py index d3c5dba..429b3fa 100644 --- a/clore_hosting/ws_interface.py +++ b/clore_hosting/ws_interface.py @@ -55,7 +55,16 @@ class WebSocketClient: self.clore_partner_config = None self.forwarding_latency_measurment = None + + self.gpu_list = [] + self.is_hive = False + def set_gpu_list(self, gpu_list): + self.gpu_list = gpu_list + + def set_is_hive(self, is_hive): + self.is_hive = is_hive + def get_last_heartbeat(self): return self.last_heartbeat @@ -113,7 +122,9 @@ class WebSocketClient: "login":str(self.auth), "xfs_state": self.xfs_state, "type":"python", - "clore_partner_support": True + "clore_partner_support": True, + "gpu_list": self.gpu_list, + "is_hive": self.is_hive })) except Exception as e: log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e} ❌") diff --git a/lib/docker_cli_wrapper.py b/lib/docker_cli_wrapper.py index 6c48989..d590427 100644 --- a/lib/docker_cli_wrapper.py +++ b/lib/docker_cli_wrapper.py @@ -9,11 +9,15 @@ import docker config = config_module.config log = logging_lib.log -def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30): +def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30, paused=False): # Sanitize and validate input container_options = sanitize_input(container_options) - command = ["docker", "run", "--detach", "--tty"] + command = ["docker", ("create" if paused else "run")] + + if not paused: + command.append("--detach") + command.append("--tty") if "name" in container_options: command.extend(["--name", container_options["name"]]) diff --git a/lib/docker_deploy.py b/lib/docker_deploy.py index 8116085..5304293 100644 --- a/lib/docker_deploy.py +++ b/lib/docker_deploy.py @@ -151,7 +151,7 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_ if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])): if config.creation_engine == "wrapper": - docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus) + docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus, paused="paused" in validated_container) else: container = client.containers.create(**container_options) if "ip" in validated_container: diff --git a/lib/ensure_packages_installed.py b/lib/ensure_packages_installed.py index df78a7b..c31b616 100644 --- a/lib/ensure_packages_installed.py +++ b/lib/ensure_packages_installed.py @@ -1,11 +1,17 @@ from lib import logging as logging_lib +import aiofiles +import os +from datetime import datetime + from typing import List from lib import utils import time log = logging_lib.log +LOGGING_ENABLED = True + async def ensure_packages_installed( packages: List[str] = [], total_timeout: float = 300 @@ -30,26 +36,26 @@ async def ensure_packages_installed( return True update_cmd = ( - "apt-get update " - "-y " - "--no-install-recommends" + "apt-get update -y --no-install-recommends" ) return_code, stdout, stderr = await utils.async_run_command( update_cmd, timeout=None if total_timeout == None else 180, env=non_interactive_env ) + + if LOGGING_ENABLED: + await ensure_packages_installed_log(f"update stdout: {stdout}") + await ensure_packages_installed_log(f"update stderr: {stderr}\ncode: {str(return_code)}") + if return_code != 0: log.error(f"Failed to update package lists: {stderr}") return False install_cmd = ( - "apt-get install " - "-y " - "--no-install-recommends " - "--assume-yes " - "-o Dpkg::Options::='--force-confdef' " # Default to existing config - "-o Dpkg::Options::='--force-confold' " # Keep existing config + "apt-get install -y --no-install-recommends --assume-yes "+ + "-o Dpkg::Options::='--force-confdef' "+ # Default to existing config + "-o Dpkg::Options::='--force-confold' "+ # Keep existing config f"{' '.join(packages_to_install)}" ) @@ -62,10 +68,49 @@ async def ensure_packages_installed( timeout=remaining_timeout, env=non_interactive_env ) + + if LOGGING_ENABLED: + await ensure_packages_installed_log(f"install stdout: {stdout}") + await ensure_packages_installed_log(f"install stderr: {stderr}\ncode: {str(return_code)}") if return_code == 0: log.debug(f"Successfully installed packages: {packages_to_install}") return True + elif return_code == 100: + dpkg_rc, dpkg_stdout, dpkg_stderr = await utils.async_run_command( + "sudo dpkg --configure -a", + timeout=200, + env=non_interactive_env + ) + + # Install packages + return_code, stdout, stderr = await utils.async_run_command( + install_cmd, + timeout=remaining_timeout, + env=non_interactive_env + ) + + if LOGGING_ENABLED: + await ensure_packages_installed_log(f"post-dpkg install stdout: {stdout}") + await ensure_packages_installed_log(f"post-dpkg install stderr: {stderr}\ncode: {str(return_code)}") + + if return_code == 0: + log.debug(f"Successfully installed packages: {packages_to_install}") + return True + else: + log.error(f"Failed to install packages: {stderr}") + return False else: log.error(f"Failed to install packages: {stderr}") - return False \ No newline at end of file + return False + +async def ensure_packages_installed_log(msg): + try: + log_file_path = "/opt/clore-hosting/ensure-packages-installed-log.txt" + os.makedirs(os.path.dirname(log_file_path), exist_ok=True) + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_message = f"{current_time} | {msg}\n" + async with aiofiles.open(log_file_path, "a") as log_file: + await log_file.write(log_message) + except Exception as e: + pass \ No newline at end of file diff --git a/lib/get_specs.py b/lib/get_specs.py index eac7726..5092333 100644 --- a/lib/get_specs.py +++ b/lib/get_specs.py @@ -459,4 +459,22 @@ class Specs: total_swap_gb = total_swap_kb / (1024) / 1000 # Convert KB to GB return round(total_swap_gb, 4) except Exception as e: - return 0 \ No newline at end of file + return 0 + +def get_root_device(): + try: + mount_info = subprocess.check_output(['findmnt', '-n', '-o', 'SOURCE', '/']).decode().strip() + return mount_info + except subprocess.CalledProcessError: + return None + +def is_usb_device(device): + try: + lsblk_output = subprocess.check_output(['lsblk', '-o', 'NAME,TRAN', '-n']).decode().strip() + for line in lsblk_output.splitlines(): + parts = line.split() + if len(parts) == 2 and device.endswith(parts[0]): + return parts[1] == 'usb' + except subprocess.CalledProcessError: + return True + return False \ No newline at end of file diff --git a/lib/nvml.py b/lib/nvml.py index 8049af6..59dade3 100644 --- a/lib/nvml.py +++ b/lib/nvml.py @@ -68,10 +68,11 @@ GPU_CORE_ALLOWED_OC_RANGES = { # Known to be problematic GPUs is_hive = False all_gpus_data_list=[] +gpu_name_list=[] get_data_fail=False def init(gpu_specs_file=None, allow_hive_binaries=True): - global is_hive, all_gpus_data_list, get_data_fail + global is_hive, all_gpus_data_list, get_data_fail, gpu_name_list log.info("Loading GPU OC specs [ working ]") try: pynvml.nvmlInit() @@ -96,6 +97,7 @@ def init(gpu_specs_file=None, allow_hive_binaries=True): break gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i) gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle) + gpu_name_list.append(pynvml.nvmlDeviceGetName(gpu_handle)) if not f"{i}-{gpu_uuid}" in parsed_specs_keys: parsed_specs={} regenerate_specs=True @@ -117,6 +119,7 @@ def init(gpu_specs_file=None, allow_hive_binaries=True): gpu_spec["default_power_limit"] = int(pynvml.nvmlDeviceGetPowerManagementDefaultLimit(gpu_handle) / 1000.0) gpu_spec["power_limits"] = [min_power_limit, max_power_limit] gpu_spec["name"] = pynvml.nvmlDeviceGetName(gpu_handle) + gpu_name_list.append(gpu_spec["name"]) gpu_spec["locks"] = mem_to_core_allowed_locks pci_info = pynvml.nvmlDeviceGetPciInfo(gpu_handle) @@ -201,6 +204,10 @@ def init(gpu_specs_file=None, allow_hive_binaries=True): print(all_gpus_data_list) # Load GPU specs +def get_gpu_name_list(): + global gpu_name_list + return gpu_name_list + def get_gpu_oc_specs(): global get_data_fail if get_data_fail: diff --git a/lib/xfs.py b/lib/xfs.py index 4d7a0ac..5fcc844 100644 --- a/lib/xfs.py +++ b/lib/xfs.py @@ -3,8 +3,10 @@ from lib import ensure_packages_installed from lib import logging as logging_lib from lib import docker_interface from lib import networking +from lib import get_specs from lib import utils +from datetime import datetime import asyncio import json import os @@ -13,12 +15,20 @@ log = logging_lib.log DOCKER_ROOT = "/var/lib/docker" DOCKER_DATA_IMG = "/opt/clore-hosting/data.img" -LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB -MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB +HP_LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB +HP_MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB + +GENERIC_LEAVE_FREE_SPACE_MB = 1024*8 # 8 GB +GENERIC_MIN_XFS_PARTITION_SIZE = 1024*10 # 10 GB XFS_STATE_FILE = "/opt/clore-hosting/xfs_state" +HIGH_PERFORMANCE_GPUS = [ + "NVIDIA GeForce RTX 4090", + "NVIDIA GeForce RTX 3090" +] + MANDATORY_PACKAGES = [ "xfsprogs", "dmidecode", @@ -34,12 +44,33 @@ MANDATORY_PACKAGES = [ # sudo mkfs.xfs /docker-storage.img # mount -o loop,pquota /docker-storage.img /mnt/docker-storage +def get_to_use_storage_values(max_free_space): + gpu_str, gpu_mem, gpus, nvml_err = get_specs.get_gpu_info() + if nvml_err: + return None, None + try: + gpu_names = [] + for gpu in gpus["nvidia"]: + gpu_names.append(gpu["name"]) + if len(gpu_names) > 0: + all_gpus_same = all(item == gpu_names[0] for item in gpu_names) + if (all_gpus_same and gpu_names[0] in HIGH_PERFORMANCE_GPUS) or max_free_space > 1024 * 70: + return HP_LEAVE_FREE_SPACE_MB, HP_MIN_XFS_PARTITION_SIZE + else: + return GENERIC_LEAVE_FREE_SPACE_MB, GENERIC_MIN_XFS_PARTITION_SIZE + else: + return "no-gpus", "no-gpus" + except Exception as e: + return None, None + + def migrate(): docker_xfs_state = validate_docker_xfs() - #print(docker_xfs_state) if docker_xfs_state == "skip": + migrate_log("skipping migration") return elif docker_xfs_state == "valid": + migrate_log("migration succeeded") return 'success' packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed( @@ -47,8 +78,20 @@ def migrate(): )) if not packages_available: + migrate_log("packages missing") return 'packages-missing' + root_device = get_specs.get_root_device() + if not root_device: + migrate_log("not supported boot device") + return "not-supported-boot-device" + + device_name = os.path.basename(root_device).split('p')[0].rstrip('0123456789') + + if get_specs.is_usb_device(device_name): + migrate_log("not supported boot device") + return "not-supported-boot-device" + log.info("Starting migration to xfs") docker_interface.stop_all_containers() @@ -56,13 +99,22 @@ def migrate(): try: os.remove(DOCKER_DATA_IMG) except Exception as e: - print(f"Error while trying to remove {DOCKER_DATA_IMG}: {e}") + migrate_log("Failed to remove DOCKER_DATA_IMG") return "failure" max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT) + + leave_free_space, min_xfs_size = get_to_use_storage_values(max_free_space) + if leave_free_space == "no-gpus": + return "nvidia-failure" + + if leave_free_space == None: + migrate_log("can't get free space") + return "failure" - data_img_size = int(max_free_space - LEAVE_FREE_SPACE_MB) - if data_img_size < MIN_XFS_PARTITION_SIZE: + data_img_size = int(max_free_space - leave_free_space) + if data_img_size < min_xfs_size: + migrate_log("not enought free space") return 'not-enough-space' docker_config_success = False @@ -85,6 +137,7 @@ def migrate(): if code==0: return 'success' else: + migrate_log("failed to migrate v1") configure_docker_daemon(remove=True) configure_fstab(remove=True) return 'failure' @@ -97,7 +150,16 @@ def migrate(): os.remove(DOCKER_DATA_IMG) except Exception as e: pass + migrate_log("failed to migrate v2") return 'failure' + +def migrate_log(msg): + log_file_path = "/opt/clore-hosting/migrate-log.txt" + os.makedirs(os.path.dirname(log_file_path), exist_ok=True) + current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_message = f"{current_time} | {msg}\n" + with open(log_file_path, "a") as log_file: + log_file.write(log_message) def validate_docker_xfs(): code_root, stdout_root, stderr_root = utils.run_command("df -T /") @@ -184,6 +246,12 @@ def init(): with open(XFS_STATE_FILE, 'w') as file: file.write("not-enough-space") return 'not-enough-space' + elif migarion_status == "not-supported-boot-device": + with open(XFS_STATE_FILE, 'w') as file: + file.write("not-supported-boot-device") + return 'failed' + elif migarion_status == "nvidia-failure": + return 'failed' else: with open(XFS_STATE_FILE, 'w') as file: file.write("failed-migration") @@ -198,4 +266,4 @@ def init(): return "disabled" except Exception as e: print(e) - pass \ No newline at end of file + return "failed" \ No newline at end of file