Compare commits
8 Commits
Author | SHA1 | Date |
---|---|---|
clore | 6665aa8fbb | |
clore | 76bc70ad56 | |
clore | 777365adf8 | |
clore | a9d57c18ae | |
clore | 49f8d46b45 | |
clore | 73cb1ca67e | |
clore | bce71c4574 | |
clore | 4e1e72da25 |
|
@ -27,6 +27,7 @@ import asyncio
|
|||
import time
|
||||
import json
|
||||
from aiofiles import os as async_os
|
||||
import aiofiles
|
||||
import os
|
||||
|
||||
specs = get_specs.Specs()
|
||||
|
@ -151,6 +152,9 @@ class CloreClient:
|
|||
|
||||
self.runned_pull_selftest = False
|
||||
|
||||
WebSocketClient.set_gpu_list(nvml.get_gpu_name_list())
|
||||
WebSocketClient.set_is_hive(self.is_hive)
|
||||
|
||||
async def service(self):
|
||||
global container_log_broken
|
||||
|
||||
|
@ -435,6 +439,12 @@ class CloreClient:
|
|||
can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True
|
||||
clore_partner_socket.set_can_deploy(can_run_partner_workloads)
|
||||
|
||||
if not running_order and self.xfs_state == "disabled":
|
||||
async with aiofiles.open("/opt/clore-hosting/xfs_state", mode='w') as file:
|
||||
await file.write("enabled")
|
||||
log.info("No order running, requesting XFS migration")
|
||||
os._exit(0)
|
||||
|
||||
if self.restart_docker and not running_order and len(self.containers)>0:
|
||||
log.debug("Sending docker restart command")
|
||||
utils.run_command_v2("systemctl restart docker")
|
||||
|
@ -509,7 +519,7 @@ class CloreClient:
|
|||
async def submit_specs(self, current_specs):
|
||||
try:
|
||||
if type(current_specs) == dict:
|
||||
current_specs["backend_version"]=20
|
||||
current_specs["backend_version"]=21
|
||||
current_specs["update_hw"]=True
|
||||
smallest_pcie_width = 999
|
||||
for gpu in current_specs["gpus"]["nvidia"]:
|
||||
|
|
|
@ -56,6 +56,15 @@ class WebSocketClient:
|
|||
self.clore_partner_config = None
|
||||
self.forwarding_latency_measurment = None
|
||||
|
||||
self.gpu_list = []
|
||||
self.is_hive = False
|
||||
|
||||
def set_gpu_list(self, gpu_list):
|
||||
self.gpu_list = gpu_list
|
||||
|
||||
def set_is_hive(self, is_hive):
|
||||
self.is_hive = is_hive
|
||||
|
||||
def get_last_heartbeat(self):
|
||||
return self.last_heartbeat
|
||||
|
||||
|
@ -113,7 +122,9 @@ class WebSocketClient:
|
|||
"login":str(self.auth),
|
||||
"xfs_state": self.xfs_state,
|
||||
"type":"python",
|
||||
"clore_partner_support": True
|
||||
"clore_partner_support": True,
|
||||
"gpu_list": self.gpu_list,
|
||||
"is_hive": self.is_hive
|
||||
}))
|
||||
except Exception as e:
|
||||
log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e} ❌")
|
||||
|
|
|
@ -9,11 +9,15 @@ import docker
|
|||
config = config_module.config
|
||||
log = logging_lib.log
|
||||
|
||||
def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30):
|
||||
def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30, paused=False):
|
||||
# Sanitize and validate input
|
||||
container_options = sanitize_input(container_options)
|
||||
|
||||
command = ["docker", "run", "--detach", "--tty"]
|
||||
command = ["docker", ("create" if paused else "run")]
|
||||
|
||||
if not paused:
|
||||
command.append("--detach")
|
||||
command.append("--tty")
|
||||
|
||||
if "name" in container_options:
|
||||
command.extend(["--name", container_options["name"]])
|
||||
|
|
|
@ -151,7 +151,7 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_
|
|||
|
||||
if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])):
|
||||
if config.creation_engine == "wrapper":
|
||||
docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus)
|
||||
docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus, paused="paused" in validated_container)
|
||||
else:
|
||||
container = client.containers.create(**container_options)
|
||||
if "ip" in validated_container:
|
||||
|
|
|
@ -1,11 +1,17 @@
|
|||
from lib import logging as logging_lib
|
||||
|
||||
import aiofiles
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from typing import List
|
||||
from lib import utils
|
||||
import time
|
||||
|
||||
log = logging_lib.log
|
||||
|
||||
LOGGING_ENABLED = True
|
||||
|
||||
async def ensure_packages_installed(
|
||||
packages: List[str] = [],
|
||||
total_timeout: float = 300
|
||||
|
@ -30,26 +36,26 @@ async def ensure_packages_installed(
|
|||
return True
|
||||
|
||||
update_cmd = (
|
||||
"apt-get update "
|
||||
"-y "
|
||||
"--no-install-recommends"
|
||||
"apt-get update -y --no-install-recommends"
|
||||
)
|
||||
return_code, stdout, stderr = await utils.async_run_command(
|
||||
update_cmd,
|
||||
timeout=None if total_timeout == None else 180,
|
||||
env=non_interactive_env
|
||||
)
|
||||
|
||||
if LOGGING_ENABLED:
|
||||
await ensure_packages_installed_log(f"update stdout: {stdout}")
|
||||
await ensure_packages_installed_log(f"update stderr: {stderr}\ncode: {str(return_code)}")
|
||||
|
||||
if return_code != 0:
|
||||
log.error(f"Failed to update package lists: {stderr}")
|
||||
return False
|
||||
|
||||
install_cmd = (
|
||||
"apt-get install "
|
||||
"-y "
|
||||
"--no-install-recommends "
|
||||
"--assume-yes "
|
||||
"-o Dpkg::Options::='--force-confdef' " # Default to existing config
|
||||
"-o Dpkg::Options::='--force-confold' " # Keep existing config
|
||||
"apt-get install -y --no-install-recommends --assume-yes "+
|
||||
"-o Dpkg::Options::='--force-confdef' "+ # Default to existing config
|
||||
"-o Dpkg::Options::='--force-confold' "+ # Keep existing config
|
||||
f"{' '.join(packages_to_install)}"
|
||||
)
|
||||
|
||||
|
@ -63,9 +69,48 @@ async def ensure_packages_installed(
|
|||
env=non_interactive_env
|
||||
)
|
||||
|
||||
if LOGGING_ENABLED:
|
||||
await ensure_packages_installed_log(f"install stdout: {stdout}")
|
||||
await ensure_packages_installed_log(f"install stderr: {stderr}\ncode: {str(return_code)}")
|
||||
|
||||
if return_code == 0:
|
||||
log.debug(f"Successfully installed packages: {packages_to_install}")
|
||||
return True
|
||||
elif return_code == 100:
|
||||
dpkg_rc, dpkg_stdout, dpkg_stderr = await utils.async_run_command(
|
||||
"sudo dpkg --configure -a",
|
||||
timeout=200,
|
||||
env=non_interactive_env
|
||||
)
|
||||
|
||||
# Install packages
|
||||
return_code, stdout, stderr = await utils.async_run_command(
|
||||
install_cmd,
|
||||
timeout=remaining_timeout,
|
||||
env=non_interactive_env
|
||||
)
|
||||
|
||||
if LOGGING_ENABLED:
|
||||
await ensure_packages_installed_log(f"post-dpkg install stdout: {stdout}")
|
||||
await ensure_packages_installed_log(f"post-dpkg install stderr: {stderr}\ncode: {str(return_code)}")
|
||||
|
||||
if return_code == 0:
|
||||
log.debug(f"Successfully installed packages: {packages_to_install}")
|
||||
return True
|
||||
else:
|
||||
log.error(f"Failed to install packages: {stderr}")
|
||||
return False
|
||||
else:
|
||||
log.error(f"Failed to install packages: {stderr}")
|
||||
return False
|
||||
|
||||
async def ensure_packages_installed_log(msg):
|
||||
try:
|
||||
log_file_path = "/opt/clore-hosting/ensure-packages-installed-log.txt"
|
||||
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
|
||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
log_message = f"{current_time} | {msg}\n"
|
||||
async with aiofiles.open(log_file_path, "a") as log_file:
|
||||
await log_file.write(log_message)
|
||||
except Exception as e:
|
||||
pass
|
|
@ -460,3 +460,21 @@ class Specs:
|
|||
return round(total_swap_gb, 4)
|
||||
except Exception as e:
|
||||
return 0
|
||||
|
||||
def get_root_device():
|
||||
try:
|
||||
mount_info = subprocess.check_output(['findmnt', '-n', '-o', 'SOURCE', '/']).decode().strip()
|
||||
return mount_info
|
||||
except subprocess.CalledProcessError:
|
||||
return None
|
||||
|
||||
def is_usb_device(device):
|
||||
try:
|
||||
lsblk_output = subprocess.check_output(['lsblk', '-o', 'NAME,TRAN', '-n']).decode().strip()
|
||||
for line in lsblk_output.splitlines():
|
||||
parts = line.split()
|
||||
if len(parts) == 2 and device.endswith(parts[0]):
|
||||
return parts[1] == 'usb'
|
||||
except subprocess.CalledProcessError:
|
||||
return True
|
||||
return False
|
|
@ -68,10 +68,11 @@ GPU_CORE_ALLOWED_OC_RANGES = { # Known to be problematic GPUs
|
|||
|
||||
is_hive = False
|
||||
all_gpus_data_list=[]
|
||||
gpu_name_list=[]
|
||||
get_data_fail=False
|
||||
|
||||
def init(gpu_specs_file=None, allow_hive_binaries=True):
|
||||
global is_hive, all_gpus_data_list, get_data_fail
|
||||
global is_hive, all_gpus_data_list, get_data_fail, gpu_name_list
|
||||
log.info("Loading GPU OC specs [ working ]")
|
||||
try:
|
||||
pynvml.nvmlInit()
|
||||
|
@ -96,6 +97,7 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
|
|||
break
|
||||
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i)
|
||||
gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle)
|
||||
gpu_name_list.append(pynvml.nvmlDeviceGetName(gpu_handle))
|
||||
if not f"{i}-{gpu_uuid}" in parsed_specs_keys:
|
||||
parsed_specs={}
|
||||
regenerate_specs=True
|
||||
|
@ -117,6 +119,7 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
|
|||
gpu_spec["default_power_limit"] = int(pynvml.nvmlDeviceGetPowerManagementDefaultLimit(gpu_handle) / 1000.0)
|
||||
gpu_spec["power_limits"] = [min_power_limit, max_power_limit]
|
||||
gpu_spec["name"] = pynvml.nvmlDeviceGetName(gpu_handle)
|
||||
gpu_name_list.append(gpu_spec["name"])
|
||||
gpu_spec["locks"] = mem_to_core_allowed_locks
|
||||
|
||||
pci_info = pynvml.nvmlDeviceGetPciInfo(gpu_handle)
|
||||
|
@ -201,6 +204,10 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
|
|||
print(all_gpus_data_list)
|
||||
# Load GPU specs
|
||||
|
||||
def get_gpu_name_list():
|
||||
global gpu_name_list
|
||||
return gpu_name_list
|
||||
|
||||
def get_gpu_oc_specs():
|
||||
global get_data_fail
|
||||
if get_data_fail:
|
||||
|
|
82
lib/xfs.py
82
lib/xfs.py
|
@ -3,8 +3,10 @@ from lib import ensure_packages_installed
|
|||
from lib import logging as logging_lib
|
||||
from lib import docker_interface
|
||||
from lib import networking
|
||||
from lib import get_specs
|
||||
from lib import utils
|
||||
|
||||
from datetime import datetime
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
|
@ -13,12 +15,20 @@ log = logging_lib.log
|
|||
|
||||
DOCKER_ROOT = "/var/lib/docker"
|
||||
DOCKER_DATA_IMG = "/opt/clore-hosting/data.img"
|
||||
LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB
|
||||
|
||||
MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB
|
||||
HP_LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB
|
||||
HP_MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB
|
||||
|
||||
GENERIC_LEAVE_FREE_SPACE_MB = 1024*8 # 8 GB
|
||||
GENERIC_MIN_XFS_PARTITION_SIZE = 1024*10 # 10 GB
|
||||
|
||||
XFS_STATE_FILE = "/opt/clore-hosting/xfs_state"
|
||||
|
||||
HIGH_PERFORMANCE_GPUS = [
|
||||
"NVIDIA GeForce RTX 4090",
|
||||
"NVIDIA GeForce RTX 3090"
|
||||
]
|
||||
|
||||
MANDATORY_PACKAGES = [
|
||||
"xfsprogs",
|
||||
"dmidecode",
|
||||
|
@ -34,12 +44,33 @@ MANDATORY_PACKAGES = [
|
|||
# sudo mkfs.xfs /docker-storage.img
|
||||
# mount -o loop,pquota /docker-storage.img /mnt/docker-storage
|
||||
|
||||
def get_to_use_storage_values(max_free_space):
|
||||
gpu_str, gpu_mem, gpus, nvml_err = get_specs.get_gpu_info()
|
||||
if nvml_err:
|
||||
return None, None
|
||||
try:
|
||||
gpu_names = []
|
||||
for gpu in gpus["nvidia"]:
|
||||
gpu_names.append(gpu["name"])
|
||||
if len(gpu_names) > 0:
|
||||
all_gpus_same = all(item == gpu_names[0] for item in gpu_names)
|
||||
if (all_gpus_same and gpu_names[0] in HIGH_PERFORMANCE_GPUS) or max_free_space > 1024 * 70:
|
||||
return HP_LEAVE_FREE_SPACE_MB, HP_MIN_XFS_PARTITION_SIZE
|
||||
else:
|
||||
return GENERIC_LEAVE_FREE_SPACE_MB, GENERIC_MIN_XFS_PARTITION_SIZE
|
||||
else:
|
||||
return "no-gpus", "no-gpus"
|
||||
except Exception as e:
|
||||
return None, None
|
||||
|
||||
|
||||
def migrate():
|
||||
docker_xfs_state = validate_docker_xfs()
|
||||
#print(docker_xfs_state)
|
||||
if docker_xfs_state == "skip":
|
||||
migrate_log("skipping migration")
|
||||
return
|
||||
elif docker_xfs_state == "valid":
|
||||
migrate_log("migration succeeded")
|
||||
return 'success'
|
||||
|
||||
packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed(
|
||||
|
@ -47,8 +78,20 @@ def migrate():
|
|||
))
|
||||
|
||||
if not packages_available:
|
||||
migrate_log("packages missing")
|
||||
return 'packages-missing'
|
||||
|
||||
root_device = get_specs.get_root_device()
|
||||
if not root_device:
|
||||
migrate_log("not supported boot device")
|
||||
return "not-supported-boot-device"
|
||||
|
||||
device_name = os.path.basename(root_device).split('p')[0].rstrip('0123456789')
|
||||
|
||||
if get_specs.is_usb_device(device_name):
|
||||
migrate_log("not supported boot device")
|
||||
return "not-supported-boot-device"
|
||||
|
||||
log.info("Starting migration to xfs")
|
||||
docker_interface.stop_all_containers()
|
||||
|
||||
|
@ -56,13 +99,22 @@ def migrate():
|
|||
try:
|
||||
os.remove(DOCKER_DATA_IMG)
|
||||
except Exception as e:
|
||||
print(f"Error while trying to remove {DOCKER_DATA_IMG}: {e}")
|
||||
migrate_log("Failed to remove DOCKER_DATA_IMG")
|
||||
return "failure"
|
||||
|
||||
max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT)
|
||||
|
||||
data_img_size = int(max_free_space - LEAVE_FREE_SPACE_MB)
|
||||
if data_img_size < MIN_XFS_PARTITION_SIZE:
|
||||
leave_free_space, min_xfs_size = get_to_use_storage_values(max_free_space)
|
||||
if leave_free_space == "no-gpus":
|
||||
return "nvidia-failure"
|
||||
|
||||
if leave_free_space == None:
|
||||
migrate_log("can't get free space")
|
||||
return "failure"
|
||||
|
||||
data_img_size = int(max_free_space - leave_free_space)
|
||||
if data_img_size < min_xfs_size:
|
||||
migrate_log("not enought free space")
|
||||
return 'not-enough-space'
|
||||
|
||||
docker_config_success = False
|
||||
|
@ -85,6 +137,7 @@ def migrate():
|
|||
if code==0:
|
||||
return 'success'
|
||||
else:
|
||||
migrate_log("failed to migrate v1")
|
||||
configure_docker_daemon(remove=True)
|
||||
configure_fstab(remove=True)
|
||||
return 'failure'
|
||||
|
@ -97,8 +150,17 @@ def migrate():
|
|||
os.remove(DOCKER_DATA_IMG)
|
||||
except Exception as e:
|
||||
pass
|
||||
migrate_log("failed to migrate v2")
|
||||
return 'failure'
|
||||
|
||||
def migrate_log(msg):
|
||||
log_file_path = "/opt/clore-hosting/migrate-log.txt"
|
||||
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
|
||||
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
log_message = f"{current_time} | {msg}\n"
|
||||
with open(log_file_path, "a") as log_file:
|
||||
log_file.write(log_message)
|
||||
|
||||
def validate_docker_xfs():
|
||||
code_root, stdout_root, stderr_root = utils.run_command("df -T /")
|
||||
code, stdout, stderr = utils.run_command(f"df -T {DOCKER_ROOT}")
|
||||
|
@ -184,6 +246,12 @@ def init():
|
|||
with open(XFS_STATE_FILE, 'w') as file:
|
||||
file.write("not-enough-space")
|
||||
return 'not-enough-space'
|
||||
elif migarion_status == "not-supported-boot-device":
|
||||
with open(XFS_STATE_FILE, 'w') as file:
|
||||
file.write("not-supported-boot-device")
|
||||
return 'failed'
|
||||
elif migarion_status == "nvidia-failure":
|
||||
return 'failed'
|
||||
else:
|
||||
with open(XFS_STATE_FILE, 'w') as file:
|
||||
file.write("failed-migration")
|
||||
|
@ -198,4 +266,4 @@ def init():
|
|||
return "disabled"
|
||||
except Exception as e:
|
||||
print(e)
|
||||
pass
|
||||
return "failed"
|
Loading…
Reference in New Issue