Compare commits

..

8 Commits

8 changed files with 187 additions and 24 deletions

View File

@ -27,6 +27,7 @@ import asyncio
import time
import json
from aiofiles import os as async_os
import aiofiles
import os
specs = get_specs.Specs()
@ -151,6 +152,9 @@ class CloreClient:
self.runned_pull_selftest = False
WebSocketClient.set_gpu_list(nvml.get_gpu_name_list())
WebSocketClient.set_is_hive(self.is_hive)
async def service(self):
global container_log_broken
@ -435,6 +439,12 @@ class CloreClient:
can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True
clore_partner_socket.set_can_deploy(can_run_partner_workloads)
if not running_order and self.xfs_state == "disabled":
async with aiofiles.open("/opt/clore-hosting/xfs_state", mode='w') as file:
await file.write("enabled")
log.info("No order running, requesting XFS migration")
os._exit(0)
if self.restart_docker and not running_order and len(self.containers)>0:
log.debug("Sending docker restart command")
utils.run_command_v2("systemctl restart docker")
@ -509,7 +519,7 @@ class CloreClient:
async def submit_specs(self, current_specs):
try:
if type(current_specs) == dict:
current_specs["backend_version"]=20
current_specs["backend_version"]=21
current_specs["update_hw"]=True
smallest_pcie_width = 999
for gpu in current_specs["gpus"]["nvidia"]:

View File

@ -56,6 +56,15 @@ class WebSocketClient:
self.clore_partner_config = None
self.forwarding_latency_measurment = None
self.gpu_list = []
self.is_hive = False
def set_gpu_list(self, gpu_list):
self.gpu_list = gpu_list
def set_is_hive(self, is_hive):
self.is_hive = is_hive
def get_last_heartbeat(self):
return self.last_heartbeat
@ -113,7 +122,9 @@ class WebSocketClient:
"login":str(self.auth),
"xfs_state": self.xfs_state,
"type":"python",
"clore_partner_support": True
"clore_partner_support": True,
"gpu_list": self.gpu_list,
"is_hive": self.is_hive
}))
except Exception as e:
log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e}")

View File

@ -9,11 +9,15 @@ import docker
config = config_module.config
log = logging_lib.log
def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30):
def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30, paused=False):
# Sanitize and validate input
container_options = sanitize_input(container_options)
command = ["docker", "run", "--detach", "--tty"]
command = ["docker", ("create" if paused else "run")]
if not paused:
command.append("--detach")
command.append("--tty")
if "name" in container_options:
command.extend(["--name", container_options["name"]])

View File

@ -151,7 +151,7 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_
if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])):
if config.creation_engine == "wrapper":
docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus)
docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus, paused="paused" in validated_container)
else:
container = client.containers.create(**container_options)
if "ip" in validated_container:

View File

@ -1,11 +1,17 @@
from lib import logging as logging_lib
import aiofiles
import os
from datetime import datetime
from typing import List
from lib import utils
import time
log = logging_lib.log
LOGGING_ENABLED = True
async def ensure_packages_installed(
packages: List[str] = [],
total_timeout: float = 300
@ -30,26 +36,26 @@ async def ensure_packages_installed(
return True
update_cmd = (
"apt-get update "
"-y "
"--no-install-recommends"
"apt-get update -y --no-install-recommends"
)
return_code, stdout, stderr = await utils.async_run_command(
update_cmd,
timeout=None if total_timeout == None else 180,
env=non_interactive_env
)
if LOGGING_ENABLED:
await ensure_packages_installed_log(f"update stdout: {stdout}")
await ensure_packages_installed_log(f"update stderr: {stderr}\ncode: {str(return_code)}")
if return_code != 0:
log.error(f"Failed to update package lists: {stderr}")
return False
install_cmd = (
"apt-get install "
"-y "
"--no-install-recommends "
"--assume-yes "
"-o Dpkg::Options::='--force-confdef' " # Default to existing config
"-o Dpkg::Options::='--force-confold' " # Keep existing config
"apt-get install -y --no-install-recommends --assume-yes "+
"-o Dpkg::Options::='--force-confdef' "+ # Default to existing config
"-o Dpkg::Options::='--force-confold' "+ # Keep existing config
f"{' '.join(packages_to_install)}"
)
@ -63,9 +69,48 @@ async def ensure_packages_installed(
env=non_interactive_env
)
if LOGGING_ENABLED:
await ensure_packages_installed_log(f"install stdout: {stdout}")
await ensure_packages_installed_log(f"install stderr: {stderr}\ncode: {str(return_code)}")
if return_code == 0:
log.debug(f"Successfully installed packages: {packages_to_install}")
return True
elif return_code == 100:
dpkg_rc, dpkg_stdout, dpkg_stderr = await utils.async_run_command(
"sudo dpkg --configure -a",
timeout=200,
env=non_interactive_env
)
# Install packages
return_code, stdout, stderr = await utils.async_run_command(
install_cmd,
timeout=remaining_timeout,
env=non_interactive_env
)
if LOGGING_ENABLED:
await ensure_packages_installed_log(f"post-dpkg install stdout: {stdout}")
await ensure_packages_installed_log(f"post-dpkg install stderr: {stderr}\ncode: {str(return_code)}")
if return_code == 0:
log.debug(f"Successfully installed packages: {packages_to_install}")
return True
else:
log.error(f"Failed to install packages: {stderr}")
return False
else:
log.error(f"Failed to install packages: {stderr}")
return False
async def ensure_packages_installed_log(msg):
try:
log_file_path = "/opt/clore-hosting/ensure-packages-installed-log.txt"
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"{current_time} | {msg}\n"
async with aiofiles.open(log_file_path, "a") as log_file:
await log_file.write(log_message)
except Exception as e:
pass

View File

@ -460,3 +460,21 @@ class Specs:
return round(total_swap_gb, 4)
except Exception as e:
return 0
def get_root_device():
try:
mount_info = subprocess.check_output(['findmnt', '-n', '-o', 'SOURCE', '/']).decode().strip()
return mount_info
except subprocess.CalledProcessError:
return None
def is_usb_device(device):
try:
lsblk_output = subprocess.check_output(['lsblk', '-o', 'NAME,TRAN', '-n']).decode().strip()
for line in lsblk_output.splitlines():
parts = line.split()
if len(parts) == 2 and device.endswith(parts[0]):
return parts[1] == 'usb'
except subprocess.CalledProcessError:
return True
return False

View File

@ -68,10 +68,11 @@ GPU_CORE_ALLOWED_OC_RANGES = { # Known to be problematic GPUs
is_hive = False
all_gpus_data_list=[]
gpu_name_list=[]
get_data_fail=False
def init(gpu_specs_file=None, allow_hive_binaries=True):
global is_hive, all_gpus_data_list, get_data_fail
global is_hive, all_gpus_data_list, get_data_fail, gpu_name_list
log.info("Loading GPU OC specs [ working ]")
try:
pynvml.nvmlInit()
@ -96,6 +97,7 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
break
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i)
gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle)
gpu_name_list.append(pynvml.nvmlDeviceGetName(gpu_handle))
if not f"{i}-{gpu_uuid}" in parsed_specs_keys:
parsed_specs={}
regenerate_specs=True
@ -117,6 +119,7 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
gpu_spec["default_power_limit"] = int(pynvml.nvmlDeviceGetPowerManagementDefaultLimit(gpu_handle) / 1000.0)
gpu_spec["power_limits"] = [min_power_limit, max_power_limit]
gpu_spec["name"] = pynvml.nvmlDeviceGetName(gpu_handle)
gpu_name_list.append(gpu_spec["name"])
gpu_spec["locks"] = mem_to_core_allowed_locks
pci_info = pynvml.nvmlDeviceGetPciInfo(gpu_handle)
@ -201,6 +204,10 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
print(all_gpus_data_list)
# Load GPU specs
def get_gpu_name_list():
global gpu_name_list
return gpu_name_list
def get_gpu_oc_specs():
global get_data_fail
if get_data_fail:

View File

@ -3,8 +3,10 @@ from lib import ensure_packages_installed
from lib import logging as logging_lib
from lib import docker_interface
from lib import networking
from lib import get_specs
from lib import utils
from datetime import datetime
import asyncio
import json
import os
@ -13,12 +15,20 @@ log = logging_lib.log
DOCKER_ROOT = "/var/lib/docker"
DOCKER_DATA_IMG = "/opt/clore-hosting/data.img"
LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB
MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB
HP_LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB
HP_MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB
GENERIC_LEAVE_FREE_SPACE_MB = 1024*8 # 8 GB
GENERIC_MIN_XFS_PARTITION_SIZE = 1024*10 # 10 GB
XFS_STATE_FILE = "/opt/clore-hosting/xfs_state"
HIGH_PERFORMANCE_GPUS = [
"NVIDIA GeForce RTX 4090",
"NVIDIA GeForce RTX 3090"
]
MANDATORY_PACKAGES = [
"xfsprogs",
"dmidecode",
@ -34,12 +44,33 @@ MANDATORY_PACKAGES = [
# sudo mkfs.xfs /docker-storage.img
# mount -o loop,pquota /docker-storage.img /mnt/docker-storage
def get_to_use_storage_values(max_free_space):
gpu_str, gpu_mem, gpus, nvml_err = get_specs.get_gpu_info()
if nvml_err:
return None, None
try:
gpu_names = []
for gpu in gpus["nvidia"]:
gpu_names.append(gpu["name"])
if len(gpu_names) > 0:
all_gpus_same = all(item == gpu_names[0] for item in gpu_names)
if (all_gpus_same and gpu_names[0] in HIGH_PERFORMANCE_GPUS) or max_free_space > 1024 * 70:
return HP_LEAVE_FREE_SPACE_MB, HP_MIN_XFS_PARTITION_SIZE
else:
return GENERIC_LEAVE_FREE_SPACE_MB, GENERIC_MIN_XFS_PARTITION_SIZE
else:
return "no-gpus", "no-gpus"
except Exception as e:
return None, None
def migrate():
docker_xfs_state = validate_docker_xfs()
#print(docker_xfs_state)
if docker_xfs_state == "skip":
migrate_log("skipping migration")
return
elif docker_xfs_state == "valid":
migrate_log("migration succeeded")
return 'success'
packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed(
@ -47,8 +78,20 @@ def migrate():
))
if not packages_available:
migrate_log("packages missing")
return 'packages-missing'
root_device = get_specs.get_root_device()
if not root_device:
migrate_log("not supported boot device")
return "not-supported-boot-device"
device_name = os.path.basename(root_device).split('p')[0].rstrip('0123456789')
if get_specs.is_usb_device(device_name):
migrate_log("not supported boot device")
return "not-supported-boot-device"
log.info("Starting migration to xfs")
docker_interface.stop_all_containers()
@ -56,13 +99,22 @@ def migrate():
try:
os.remove(DOCKER_DATA_IMG)
except Exception as e:
print(f"Error while trying to remove {DOCKER_DATA_IMG}: {e}")
migrate_log("Failed to remove DOCKER_DATA_IMG")
return "failure"
max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT)
data_img_size = int(max_free_space - LEAVE_FREE_SPACE_MB)
if data_img_size < MIN_XFS_PARTITION_SIZE:
leave_free_space, min_xfs_size = get_to_use_storage_values(max_free_space)
if leave_free_space == "no-gpus":
return "nvidia-failure"
if leave_free_space == None:
migrate_log("can't get free space")
return "failure"
data_img_size = int(max_free_space - leave_free_space)
if data_img_size < min_xfs_size:
migrate_log("not enought free space")
return 'not-enough-space'
docker_config_success = False
@ -85,6 +137,7 @@ def migrate():
if code==0:
return 'success'
else:
migrate_log("failed to migrate v1")
configure_docker_daemon(remove=True)
configure_fstab(remove=True)
return 'failure'
@ -97,8 +150,17 @@ def migrate():
os.remove(DOCKER_DATA_IMG)
except Exception as e:
pass
migrate_log("failed to migrate v2")
return 'failure'
def migrate_log(msg):
log_file_path = "/opt/clore-hosting/migrate-log.txt"
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"{current_time} | {msg}\n"
with open(log_file_path, "a") as log_file:
log_file.write(log_message)
def validate_docker_xfs():
code_root, stdout_root, stderr_root = utils.run_command("df -T /")
code, stdout, stderr = utils.run_command(f"df -T {DOCKER_ROOT}")
@ -184,6 +246,12 @@ def init():
with open(XFS_STATE_FILE, 'w') as file:
file.write("not-enough-space")
return 'not-enough-space'
elif migarion_status == "not-supported-boot-device":
with open(XFS_STATE_FILE, 'w') as file:
file.write("not-supported-boot-device")
return 'failed'
elif migarion_status == "nvidia-failure":
return 'failed'
else:
with open(XFS_STATE_FILE, 'w') as file:
file.write("failed-migration")
@ -198,4 +266,4 @@ def init():
return "disabled"
except Exception as e:
print(e)
pass
return "failed"