Compare commits

No commits in common. "main" and "nvml_integration" have entirely different histories.

26 changed files with 123 additions and 2132 deletions

View File

@ -4,7 +4,6 @@ from lib import custom_entrypoint
from lib import networking from lib import networking
from lib import wireguard from lib import wireguard
from lib import logging as logging_lib from lib import logging as logging_lib
from clore_hosting import utils as hosting_utils
import shutil import shutil
import os import os
import re import re
@ -32,7 +31,7 @@ def get_last_ip_occurrence_and_text(input_string):
else: else:
return None, None return None, None
def configure(containers, partner_forwarding_ips): def configure(containers):
valid_containers = [] valid_containers = []
newly_created_networks = [] newly_created_networks = []
containers_required_networks = [] containers_required_networks = []
@ -54,12 +53,9 @@ def configure(containers, partner_forwarding_ips):
for index, container in enumerate(containers): for index, container in enumerate(containers):
ok_custom_entrypoint = False ok_custom_entrypoint = False
invalid_hostname = False
if index < len(custom_entrypoint_state): if index < len(custom_entrypoint_state):
ok_custom_entrypoint = custom_entrypoint_state[index] ok_custom_entrypoint = custom_entrypoint_state[index]
startup_script_name = f"{container['name']}.sh" startup_script_name = f"{container['name']}.sh"
if "hostname" in container and not hosting_utils.validate_hostname(container["hostname"]):
invalid_hostname = True
if "ip" in container and len(container["ip"])>6 and type(container["ip"])==str: if "ip" in container and len(container["ip"])>6 and type(container["ip"])==str:
if container["ip"][:8] == "; echo '": if container["ip"][:8] == "; echo '":
last_occurrence, text_after_last_ip = get_last_ip_occurrence_and_text(container["ip"]) last_occurrence, text_after_last_ip = get_last_ip_occurrence_and_text(container["ip"])
@ -99,14 +95,14 @@ def configure(containers, partner_forwarding_ips):
newly_created_networks.append(container["network"]) newly_created_networks.append(container["network"])
else: else:
any_fail=True any_fail=True
if not any_fail and ok_custom_entrypoint and not invalid_hostname: if not any_fail and ok_custom_entrypoint:
valid_containers.append(container) valid_containers.append(container)
elif "network" in container and container["network"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: # Subnet & gateway not defined, must be some of default networks, otherwise dump it elif "network" in container and container["network"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: # Subnet & gateway not defined, must be some of default networks, otherwise dump it
if container["network"] in default_network_names: if container["network"] in default_network_names:
for docker_network in docker_networks: for docker_network in docker_networks:
if docker_network["Name"]==container["network"]: if docker_network["Name"]==container["network"]:
for ipam in docker_network["IPAM"]: for ipam in docker_network["IPAM"]:
if not ok_custom_entrypoint or invalid_hostname: if not ok_custom_entrypoint:
break break
elif not "ip" in container: elif not "ip" in container:
valid_containers.append(container) valid_containers.append(container)
@ -141,7 +137,7 @@ def configure(containers, partner_forwarding_ips):
if config.log_containers_strings: if config.log_containers_strings:
print("FROM DOCKER CONFIGURATOR", valid_containers) print("FROM DOCKER CONFIGURATOR", valid_containers)
validation_and_security = docker_interface.validate_and_secure_networks(partner_forwarding_ips) validation_and_security = docker_interface.validate_and_secure_networks()
if startup_sctipt_creation_fail: if startup_sctipt_creation_fail:
validation_and_security=False validation_and_security=False
return validation_and_security, valid_containers, use_hive_flightsheet return validation_and_security, valid_containers, use_hive_flightsheet
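
The sanitization branch above checks for "; echo '" prefixes and calls get_last_ip_occurrence_and_text(), whose body sits outside the hunk. A rough sketch of what such a helper plausibly does, inferred only from its name and the "return None, None" fallback visible in the context (this is not the repository's implementation):

    import re

    def get_last_ip_occurrence_and_text(input_string):
        # Return the last IPv4-looking literal in the string plus the text after it.
        matches = list(re.finditer(r"(?:\d{1,3}\.){3}\d{1,3}", input_string))
        if not matches:
            return None, None
        last = matches[-1]
        return last.group(0), input_string[last.end():]

    print(get_last_ip_occurrence_and_text("10.0.0.5; echo 'payload'"))
    # ('10.0.0.5', "; echo 'payload'")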

View File

@ -1,14 +1,9 @@
from lib import config as config_module from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import nvidia_driver_update
from lib import log_streaming_task from lib import log_streaming_task
from lib import run_startup_script from lib import run_startup_script
from lib import hive_miner_interface
from lib import docker_interface from lib import docker_interface
from lib import background_job
from lib import docker_deploy from lib import docker_deploy
from lib import clore_partner
from lib import clore_partner_socket
from lib import docker_pull from lib import docker_pull
from lib import get_specs from lib import get_specs
from lib import utils from lib import utils
@ -27,7 +22,6 @@ import asyncio
import time import time
import json import json
from aiofiles import os as async_os from aiofiles import os as async_os
import aiofiles
import os import os
specs = get_specs.Specs() specs = get_specs.Specs()
@ -39,17 +33,17 @@ WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_
#print(config) #print(config)
async def configure_networks(containers, partner_forwarding_ips): async def configure_networks(containers):
res = await asyncio.to_thread(docker_configurator.configure, containers, partner_forwarding_ips) res = await asyncio.to_thread(docker_configurator.configure, containers)
try: try:
fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2]) fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2])
return fin_res return fin_res
except Exception as e: except Exception as e:
return False return False
async def deploy_containers(validated_containers, allowed_running_containers, can_run_partner_workloads): async def deploy_containers(validated_containers):
try: try:
all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers, can_run_partner_workloads) all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers)
return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names) return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names)
except Exception as e: except Exception as e:
return False return False
@ -75,10 +69,8 @@ async def set_hive_miner_status(enabled=False):
return False return False
class CloreClient: class CloreClient:
def __init__(self, auth_key, xfs_state): def __init__(self, auth_key):
self.auth_key=auth_key self.auth_key=auth_key
self.xfs_state = xfs_state
self.ws_peers = {} self.ws_peers = {}
self.last_checked_ws_peers=0 self.last_checked_ws_peers=0
self.containers={} self.containers={}
@ -105,35 +97,17 @@ class CloreClient:
"log_streaming_task": utils.unix_timestamp(), "log_streaming_task": utils.unix_timestamp(),
"container_log_streaming_service": utils.unix_timestamp(), "container_log_streaming_service": utils.unix_timestamp(),
"specs_service": utils.unix_timestamp(), "specs_service": utils.unix_timestamp(),
"oc_service": utils.unix_timestamp(), "oc_service": utils.unix_timestamp()
"background_pow_data_collection": utils.unix_timestamp(),
"partner_service": utils.unix_timestamp()
} }
self.max_service_inactivity = 600 # seconds self.max_service_inactivity = 600 # seconds
self.no_restart_services = ["partner_service", "specs_service"] # Services that are allowed to run indefinitely without triggering the app to restart
if config.debug_ws_peer: if config.debug_ws_peer:
self.ws_peers[str(config.debug_ws_peer)]={ self.ws_peers[str(config.debug_ws_peer)]={
"expiration":"immune" "expiration":"immune"
} }
self.os_release = get_specs.get_os_release()
self.restart_docker = False
if "use_cgroupfs" in self.os_release:
self.updated_exec_opts = True if docker_interface.configure_exec_opts("native.cgroupdriver","cgroupfs") else False
if self.updated_exec_opts:
docker_info = docker_interface.get_info()
if "CgroupDriver" in docker_info and docker_info["CgroupDriver"]=="systemd":
self.restart_docker = True # Restart docker when it's loaded under systemd (actual restart will happen only if no orders are running, so workloads are not disrupted)
docker_interface.verify_docker_version() docker_interface.verify_docker_version()
nvml.init()
self.dont_use_hive_binaries = True if 'DONT_USE_HIVE_BINARIES' in os.environ else False
nvml.init(allow_hive_binaries=not self.dont_use_hive_binaries)
self.extra_allowed_images = utils.get_extra_allowed_images()
self.allowed_running_containers = utils.get_allowed_container_names()
self.gpu_oc_specs = nvml.get_gpu_oc_specs() self.gpu_oc_specs = nvml.get_gpu_oc_specs()
self.last_oc_service_submit = 0 self.last_oc_service_submit = 0
@ -143,18 +117,6 @@ class CloreClient:
self.is_hive = get_specs.is_hive() self.is_hive = get_specs.is_hive()
self.use_hive_flightsheet = False self.use_hive_flightsheet = False
self.hive_miner_interface = hive_miner_interface.hive_interface()
self.next_pow_background_job_send_update = 0
self.clore_partner_initiazized = False
self.partner_forwarding_ips = []
self.start_time = utils.unix_timestamp()
self.runned_pull_selftest = False
WebSocketClient.set_gpu_list(nvml.get_gpu_name_list())
WebSocketClient.set_is_hive(self.is_hive)
async def service(self): async def service(self):
global container_log_broken global container_log_broken
@ -164,17 +126,14 @@ class CloreClient:
task1 = asyncio.create_task(self.main(pull_list, monitoring)) task1 = asyncio.create_task(self.main(pull_list, monitoring))
task2 = asyncio.create_task(self.handle_container_cache(pull_list, monitoring)) task2 = asyncio.create_task(self.handle_container_cache(pull_list, monitoring))
task3 = asyncio.create_task(self.startup_script_runner(monitoring)) task3 = asyncio.create_task(self.startup_script_runner(monitoring))
task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken, monitoring, self.allowed_running_containers)) task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken, monitoring))
task5 = asyncio.create_task(self.container_log_streaming_service(monitoring)) task5 = asyncio.create_task(self.container_log_streaming_service(monitoring))
task6 = asyncio.create_task(self.specs_service(monitoring)) task6 = asyncio.create_task(self.specs_service(monitoring))
task7 = asyncio.create_task(self.oc_service(monitoring)) task7 = asyncio.create_task(self.oc_service(monitoring))
task8 = asyncio.create_task(self.background_pow_data_collection(monitoring))
task9 = asyncio.create_task(self.partner_service(monitoring))
monitoring_task = asyncio.create_task(self.monitoring_service(monitoring)) monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
driver_update_task = asyncio.create_task(nvidia_driver_update.update_loop(self.is_hive))
# Wait for both tasks to complete (they won't in this case) # Wait for both tasks to complete (they won't in this case)
await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, task9, monitoring_task, driver_update_task) await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, monitoring_task)
async def monitoring_service(self, monitoring): async def monitoring_service(self, monitoring):
while True: while True:
@ -189,14 +148,13 @@ class CloreClient:
if config.debug: if config.debug:
log.success(self.last_service_heartbeat) log.success(self.last_service_heartbeat)
for service_name in self.last_service_heartbeat.keys(): for service_name in self.last_service_heartbeat.keys():
if not service_name in self.no_restart_services: last_hearthbeat = self.last_service_heartbeat[service_name]
last_hearthbeat = self.last_service_heartbeat[service_name] if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache":
if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache": log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") os._exit(1)
os._exit(1) elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache":
elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache": log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") os._exit(1)
os._exit(1)
except Exception as e: except Exception as e:
log.debug(f"monitoring_service() | ERROR | {e}") log.debug(f"monitoring_service() | ERROR | {e}")
await asyncio.sleep(5) await asyncio.sleep(5)
@ -281,7 +239,6 @@ class CloreClient:
if len(self.p_needed_containers)>0: if len(self.p_needed_containers)>0:
local_images = await get_local_images(no_latest_tag=True) local_images = await get_local_images(no_latest_tag=True)
partner_images = await clore_partner.get_partner_allowed_images()
for local_image in local_images: for local_image in local_images:
self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()} self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()}
image_needed = False image_needed = False
@ -305,32 +262,11 @@ class CloreClient:
if local_image_tag==allowed_tag or allowed_tag=='*': if local_image_tag==allowed_tag or allowed_tag=='*':
image_needed=True image_needed=True
break break
if not image_needed and type(partner_images) == list: if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers:
for partner_image in partner_images:
if local_image.replace(':latest', '') == partner_image.replace(':latest', ''):
image_needed = True
del self.last_pull_progress[local_image]
break
if len(local_image.split('/')) >= 3:
partner_image_spl = partner_image.split(':')
image, deployment_type = '/'.join(local_image.split('/', 2)[:2]), local_image.split('/', 2)[-1]
if len(partner_image_spl) == 1:
if image == partner_image_spl[0] or f"{image}" == f"{partner_image_spl[0]}_latest":
image_needed = True
del self.last_pull_progress[local_image]
break
elif len(partner_image_spl) == 2:
if image.replace('_latest', '') == f"{partner_image_spl[0]}_{partner_image_spl[1]}".replace('_latest', ''):
image_needed = True
del self.last_pull_progress[local_image]
break
if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers and partner_images != None:
log.success(f"GOING TO REMOVE {local_image}")
with concurrent.futures.ThreadPoolExecutor() as pool: with concurrent.futures.ThreadPoolExecutor() as pool:
r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image) r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image)
if r: if r:
removed_cnt+=1 removed_cnt+=1
del self.last_pull_progress[local_image]
#if config.debug: #if config.debug:
# log.success(f"{local_image} | {image_needed}") # log.success(f"{local_image} | {image_needed}")
@ -370,7 +306,7 @@ class CloreClient:
try: try:
await pull_task await pull_task
except Exception as e: except Exception as e:
self.last_pull_progress[local_image]={"log":f"Can't pull image \"{local_image}\"", "last_update":time.time()} self.last_pull_progress[local_image]={f"log":"Can't pull image \"{local_image}\"", "last_update":time.time()}
log_task.cancel() log_task.cancel()
try: try:
await log_task await log_task
@ -397,27 +333,18 @@ class CloreClient:
print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '') print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '')
tasks = [] tasks = []
running_order = False
container_conf = WebSocketClient.get_containers() container_conf = WebSocketClient.get_containers()
can_run_partner_workloads = False
if container_conf[0]: if container_conf[0]:
self.containers_set=True self.containers_set=True
self.containers=container_conf[1] self.containers=container_conf[1]
tmp_images = [] tmp_images = []
for container in self.containers:
is_order_spot = False if "image" in container:
for idx, container in enumerate(self.containers):
if "spot" in container:
is_order_spot = True
if "image" in container and "image" in container and container["image"]!="cloreai/hive-use-flightsheet":
log_pull = False log_pull = False
if "name" in container: if "name" in container:
if "-order-" in container["name"]: if "-order-" in container["name"]:
running_order=True
log_pull=True log_pull=True
image_config = { image_config = {
"image":container["image"], "image":container["image"],
@ -435,21 +362,6 @@ class CloreClient:
if not image_config in tmp_images: if not image_config in tmp_images:
tmp_images.append(image_config) tmp_images.append(image_config)
can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True
clore_partner_socket.set_can_deploy(can_run_partner_workloads)
if not running_order and self.xfs_state == "disabled":
async with aiofiles.open("/opt/clore-hosting/xfs_state", mode='w') as file:
await file.write("enabled")
log.info("No order running, requesting XFS migration")
os._exit(0)
if self.restart_docker and not running_order and len(self.containers)>0:
log.debug("Sending docker restart command")
utils.run_command_v2("systemctl restart docker")
self.restart_docker=False
if tmp_images!=self.needed_images: if tmp_images!=self.needed_images:
self.needed_images=tmp_images self.needed_images=tmp_images
await pull_list.put(self.needed_images) await pull_list.put(self.needed_images)
@ -459,14 +371,14 @@ class CloreClient:
tasks.append(api_interface.get_server_config()) tasks.append(api_interface.get_server_config())
if self.containers_set: if self.containers_set:
tasks.append(configure_networks(self.containers, self.partner_forwarding_ips)) tasks.append(configure_networks(self.containers))
tasks.append(WebSocketClient.stream_pull_logs()) tasks.append(WebSocketClient.stream_pull_logs())
if self.validated_containers_set: if self.validated_containers_set:
tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers, can_run_partner_workloads)) tasks.append(deploy_containers(self.validated_containers))
if step==1: if step==1:
WebSocketClient.set_auth(self.auth_key, self.xfs_state) WebSocketClient.set_auth(self.auth_key)
asyncio.create_task(WebSocketClient.run()) asyncio.create_task(WebSocketClient.run())
elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval): elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval):
log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds") log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds")
@ -485,12 +397,7 @@ class CloreClient:
if type(result)==types.ServerConfig: if type(result)==types.ServerConfig:
if result.success: if result.success:
self.last_checked_ws_peers = utils.unix_timestamp() self.last_checked_ws_peers = utils.unix_timestamp()
self.allowed_images=result.allowed_images+self.extra_allowed_images self.allowed_images=result.allowed_images
if self.xfs_state == "active":
self.allowed_images.append({
"repository": "vastai/test",
"allowed_tags": ["bandwidth-test-nvidia", "selftest"]
})
if not config.debug_ws_peer: if not config.debug_ws_peer:
for pure_ws_peer in result.ws_peers: for pure_ws_peer in result.ws_peers:
self.ws_peers[pure_ws_peer]={ self.ws_peers[pure_ws_peer]={
@ -504,7 +411,6 @@ class CloreClient:
self.validated_containers_set=True self.validated_containers_set=True
self.validated_containers = result.valid_containers self.validated_containers = result.valid_containers
self.use_hive_flightsheet = result.use_hive_flightsheet self.use_hive_flightsheet = result.use_hive_flightsheet
log.debug(f"Use Hive flightsheet: {result.use_hive_flightsheet}")
elif type(result)==types.DeployContainersRes: elif type(result)==types.DeployContainersRes:
try: try:
self.all_running_container_names = result.all_running_container_names self.all_running_container_names = result.all_running_container_names
@ -519,7 +425,7 @@ class CloreClient:
async def submit_specs(self, current_specs): async def submit_specs(self, current_specs):
try: try:
if type(current_specs) == dict: if type(current_specs) == dict:
current_specs["backend_version"]=21 current_specs["backend_version"]=9
current_specs["update_hw"]=True current_specs["update_hw"]=True
smallest_pcie_width = 999 smallest_pcie_width = 999
for gpu in current_specs["gpus"]["nvidia"]: for gpu in current_specs["gpus"]["nvidia"]:
@ -541,7 +447,7 @@ class CloreClient:
"update_realtime_data":True, "update_realtime_data":True,
"gpus": gpu_list, "gpus": gpu_list,
"cpu": cpu_usage, "cpu": cpu_usage,
"ram": ram_usage.percent, "ram": ram_usage,
"all_running_container_names": self.all_running_container_names, "all_running_container_names": self.all_running_container_names,
"all_stopped_container_names": self.all_stopped_container_names "all_stopped_container_names": self.all_stopped_container_names
} }
@ -558,12 +464,6 @@ class CloreClient:
self.last_hw_specs_submit=utils.unix_timestamp() self.last_hw_specs_submit=utils.unix_timestamp()
await self.submit_specs(current_specs) await self.submit_specs(current_specs)
await self.update_realtime_data(current_specs) await self.update_realtime_data(current_specs)
try:
if self.xfs_state == "active" and len(current_specs["gpus"]["nvidia"]) > 0 and not self.runned_pull_selftest:
await clore_partner.check_to_pull_selftest(current_specs)
self.runned_pull_selftest = True
except Exception as partner_exception:
pass
except Exception as e: except Exception as e:
log.debug(f"FAIL | specs_service() | {e}") log.debug(f"FAIL | specs_service() | {e}")
await asyncio.sleep(7) await asyncio.sleep(7)
@ -574,10 +474,10 @@ class CloreClient:
await monitoring.put("oc_service") await monitoring.put("oc_service")
oc_apply_allowed = True oc_apply_allowed = True
### OC Service should also handle Hive stuff ### OC Service should also handle Hive stuff
if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries and background_job.is_enabled(): if self.use_hive_flightsheet and self.is_hive:
await set_hive_miner_status(True) await set_hive_miner_status(True)
oc_apply_allowed = False # Don't apply any OC when running HiveOS miner oc_apply_allowed = False # Don't apply any OC when running HiveOS miner
elif self.is_hive and not self.dont_use_hive_binaries: elif self.is_hive:
await set_hive_miner_status(False) await set_hive_miner_status(False)
### Run OC tasks ### Run OC tasks
oc_conf = WebSocketClient.get_oc() oc_conf = WebSocketClient.get_oc()
@ -598,46 +498,6 @@ class CloreClient:
log.debug(f"FAIL | oc_service() | {e}") log.debug(f"FAIL | oc_service() | {e}")
await asyncio.sleep(2) await asyncio.sleep(2)
async def background_pow_data_collection(self, monitoring):
while True:
try:
await monitoring.put("background_pow_data_collection")
if not self.dont_use_hive_binaries and self.is_hive:
miner_config = await self.hive_miner_interface.export_miner_stats(get_hashrates=False)
if (miner_config["miner_uptime"]>0 and miner_config["miner_uptime"]<60) or self.next_pow_background_job_send_update < time.time():
self.next_pow_background_job_send_update = time.time()+(5*60)
current_statistics = await self.hive_miner_interface.export_miner_stats(get_hashrates=True)
submit_result = await WebSocketClient.send({"submit_hashrates": current_statistics})
if not submit_result:
self.next_pow_background_job_send_update = time.time()+40
except Exception as e:
log.debug(f"FAIL | background_pow_data_collection() | {e}")
await asyncio.sleep(6)
async def partner_service(self, monitoring):
while True:
try:
await monitoring.put("partner_service")
if self.start_time < utils.unix_timestamp() - 180:
forwarding_latency_measurment = await clore_partner.measure_forwarding_latency()
if type(forwarding_latency_measurment) == list:
await WebSocketClient.set_forwarding_latency_measurment(forwarding_latency_measurment)
partner_config = WebSocketClient.get_clore_partner_config()
if partner_config != None:
if self.clore_partner_initiazized == False:
ir = await clore_partner.initialize()
if ir:
self.clore_partner_initiazized = True
if self.clore_partner_initiazized == True:
if 'provider' in partner_config and 'forwarding' in partner_config:
self.partner_forwarding_ips = [partner_config['provider'], partner_config['forwarding']]
else:
self.partner_forwarding_ips = []
await clore_partner.configure(partner_config)
except Exception as e:
log.debug(f"FAIL | partner_service() | {e}")
await asyncio.sleep(6)
def expire_ws_peers(self): def expire_ws_peers(self):
for ws_peer_address in list(self.ws_peers.keys()): for ws_peer_address in list(self.ws_peers.keys()):
ws_peer_info = self.ws_peers[ws_peer_address] ws_peer_info = self.ws_peers[ws_peer_address]
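
Taken together, the monitoring hunks above implement a heartbeat watchdog: every service loop puts its name on the shared monitoring queue, and a service whose heartbeat goes stale past its limit causes a hard process exit (the client is then presumably restarted by whatever supervises it). A condensed sketch of the pattern, reusing the threshold names and default values from config.py further down, not the exact code:

    import asyncio, os, time

    MAX_LOOP = 900          # "maximum_service_loop_time"
    MAX_PULL_LOOP = 14400   # "maximum_pull_service_loop_time", image pulls only

    async def watchdog(monitoring: asyncio.Queue, last_heartbeat: dict):
        while True:
            # Drain heartbeats queued by the other service loops.
            try:
                while True:
                    last_heartbeat[monitoring.get_nowait()] = time.time()
            except asyncio.QueueEmpty:
                pass
            for name, ts in last_heartbeat.items():
                limit = MAX_PULL_LOOP if name == "handle_container_cache" else MAX_LOOP
                if ts < time.time() - limit:
                    print(f'"{name}" service is stuck for {time.time() - ts:.0f} s, restarting...')
                    os._exit(1)  # hard exit; an external supervisor is assumed to restart the client
            await asyncio.sleep(5)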

View File

@ -10,13 +10,5 @@ def is_valid_websocket_url(url):
return True return True
return False return False
def validate_hostname(hostname):
# Define the regular expression pattern for a valid hostname
pattern = re.compile(r'^[a-zA-Z0-9._-]{1,63}$')
if pattern.match(hostname):
return True
else:
return False
def unix_timestamp(): def unix_timestamp():
return int(time.time()) return int(time.time())
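
For reference, a quick illustration of the validate_hostname() pattern removed above, using the exact regex shown in the hunk:

    import re

    pattern = re.compile(r'^[a-zA-Z0-9._-]{1,63}$')

    for candidate in ["gpu-node-01", "node_1.local", "bad host!", "a" * 64]:
        print(candidate, bool(pattern.match(candidate)))
    # "gpu-node-01" and "node_1.local" pass; the other two are rejected
    # (disallowed characters, and the {1,63} length bound, respectively).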

View File

@ -1,5 +1,4 @@
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from lib import clore_partner
import asyncio import asyncio
import random import random
import websockets import websockets
@ -32,7 +31,6 @@ class WebSocketClient:
self.connected = False self.connected = False
self.authorized = False self.authorized = False
self.auth = auth self.auth = auth
self.xfs_state = None
self.log_auth_fail = True self.log_auth_fail = True
self.last_heartbeat = clore_utils.unix_timestamp() self.last_heartbeat = clore_utils.unix_timestamp()
self.containers={} self.containers={}
@ -53,39 +51,15 @@ class WebSocketClient:
self.last_gpu_oc_specs = [] self.last_gpu_oc_specs = []
self.last_set_oc = {} self.last_set_oc = {}
self.clore_partner_config = None
self.forwarding_latency_measurment = None
self.gpu_list = []
self.is_hive = False
def set_gpu_list(self, gpu_list):
self.gpu_list = gpu_list
def set_is_hive(self, is_hive):
self.is_hive = is_hive
def get_last_heartbeat(self): def get_last_heartbeat(self):
return self.last_heartbeat return self.last_heartbeat
def get_containers(self): def get_containers(self):
partner_container_config = clore_partner.get_partner_container_config() return self.containers_set, self.containers
return self.containers_set, ((self.containers + [partner_container_config]) if partner_container_config else self.containers)
def get_oc(self): def get_oc(self):
return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc
def get_clore_partner_config(self):
return self.clore_partner_config
async def set_forwarding_latency_measurment(self, forwarding_latency_measurment):
await self.send(json.dumps(
{
"forwarding_latency_measurment": forwarding_latency_measurment
}
))
self.forwarding_latency_measurment = forwarding_latency_measurment
def set_ws_peers(self, ws_peers): def set_ws_peers(self, ws_peers):
tmp_ws_peers=[] tmp_ws_peers=[]
for ws_peer in list(ws_peers.keys()): for ws_peer in list(ws_peers.keys()):
@ -94,9 +68,8 @@ class WebSocketClient:
self.ws_peers = tmp_ws_peers self.ws_peers = tmp_ws_peers
def set_auth(self, auth, xfs_state): def set_auth(self, auth):
self.auth=auth self.auth=auth
self.xfs_state=xfs_state
def set_pull_logs(self, pull_logs): def set_pull_logs(self, pull_logs):
self.pull_logs=pull_logs self.pull_logs=pull_logs
@ -120,11 +93,7 @@ class WebSocketClient:
log.debug(f"CLOREWS | Connected to {random_ws_peer}") log.debug(f"CLOREWS | Connected to {random_ws_peer}")
await self.send(json.dumps({ await self.send(json.dumps({
"login":str(self.auth), "login":str(self.auth),
"xfs_state": self.xfs_state, "type":"python"
"type":"python",
"clore_partner_support": True,
"gpu_list": self.gpu_list,
"is_hive": self.is_hive
})) }))
except Exception as e: except Exception as e:
log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e}") log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e}")
@ -167,16 +136,6 @@ class WebSocketClient:
pass pass
elif message=="KEEPALIVE": elif message=="KEEPALIVE":
self.last_heartbeat = clore_utils.unix_timestamp() self.last_heartbeat = clore_utils.unix_timestamp()
try:
if self.forwarding_latency_measurment:
await self.send(json.dumps(
{
"forwarding_latency_measurment": self.forwarding_latency_measurment
}
))
self.forwarding_latency_measurment = None
except Exception as e:
pass
elif message=="NEWER_LOGIN" or message=="WAIT": elif message=="NEWER_LOGIN" or message=="WAIT":
await self.close_websocket() await self.close_websocket()
elif message[:10]=="PROVEPULL;": elif message[:10]=="PROVEPULL;":
@ -189,16 +148,13 @@ class WebSocketClient:
else: else:
try: try:
parsed_json = json.loads(message) parsed_json = json.loads(message)
if "type" in parsed_json and parsed_json["type"]=="partner_config" and "partner_config" in parsed_json and type(parsed_json["partner_config"])==dict: if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list:
self.clore_partner_config = parsed_json["partner_config"]
await self.send(json.dumps({"partner_config":parsed_json["partner_config"]}))
elif "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list:
self.last_heartbeat = clore_utils.unix_timestamp() self.last_heartbeat = clore_utils.unix_timestamp()
container_str = json.dumps({"containers":parsed_json["new_containers"]}) container_str = json.dumps({"containers":parsed_json["new_containers"]})
await self.send(container_str) await self.send(container_str)
if len(parsed_json["new_containers"]) > 0: # There should be at least one container if len(parsed_json["new_containers"]) > 0: # There should be at least one container
self.containers_set = True self.containers_set = True
self.containers=clore_partner.filter_partner_dummy_workload_container(parsed_json["new_containers"]) self.containers=parsed_json["new_containers"]
#log.success(container_str) #log.success(container_str)
elif "allow_oc" in parsed_json: # Enable OC elif "allow_oc" in parsed_json: # Enable OC
self.oc_enabled=True self.oc_enabled=True
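
The right-hand branch keeps only the plain set_containers path shown above. A small illustration of the exchange it implies; the field names come from the diff, the container entry itself is made up:

    import json

    # What a ws peer sends to update the container list:
    incoming = json.dumps({
        "type": "set_containers",
        "new_containers": [{"name": "clore-order-123", "image": "cloreai/jupyter:latest"}],
    })

    parsed = json.loads(incoming)
    if parsed.get("type") == "set_containers" and isinstance(parsed.get("new_containers"), list):
        # The client acknowledges by echoing the list back under "containers":
        print(json.dumps({"containers": parsed["new_containers"]}))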

View File

@ -1,6 +1,5 @@
from lib import config as config_module from lib import config as config_module
from lib import init_server from lib import init_server
from lib import xfs
from lib import utils from lib import utils
from clore_hosting import main as clore_hosting from clore_hosting import main as clore_hosting
import asyncio, os import asyncio, os
@ -30,23 +29,7 @@ elif config.reset:
log.success("Client login reseted") log.success("Client login reseted")
elif config.service: elif config.service:
if len(auth)==32+48+1: if len(auth)==32+48+1:
utils.run_command("sysctl -w net.ipv4.ip_forward=1") clore_client = clore_hosting.CloreClient(auth_key=auth)
xfs_state = xfs.init()
if os.path.isfile("/opt/clore-hosting/.run_hive_driver_update"):
utils.run_command("systemctl stop docker && PATH=/hive/bin:/hive/sbin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:./ nvidia-driver-update http://45.12.132.34/NVIDIA-Linux-x86_64-550.135.run --force")
utils.run_command("systemctl restart docker")
os.remove("/opt/clore-hosting/.run_hive_driver_update")
try:
os.remove(config.update_driver_550_flag)
except Exception as e:
pass
if os.path.isfile(config.restart_docker_flag_file):
utils.run_command("systemctl restart docker")
os.remove(config.restart_docker_flag_file)
clore_client = clore_hosting.CloreClient(auth_key=auth, xfs_state=xfs_state)
asyncio.run(clore_client.service()) asyncio.run(clore_client.service())
else: else:
print("TODO: Firstly config auth") print("TODO: Firstly config auth")

View File

@ -1,22 +0,0 @@
import time
import re
disabled_till = 0
def is_background_job_container_name(string):
if type(string) != str:
return False
pattern = r"^clore-default-\d+$"
return bool(re.match(pattern, string))
def temporarly_disable(seconds):
global disabled_till
disabled_till = time.time() + seconds
def enable():
global disabled_till
disabled_till=0
def is_enabled():
global disabled_till
return True if disabled_till < time.time() else False
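
Since the whole module is visible above, its intended use is easy to sketch (the import path matches how other files in this diff pull it in):

    import time
    from lib import background_job

    background_job.temporarly_disable(2)   # suspend background jobs for 2 seconds
    print(background_job.is_enabled())     # False while the window is active
    time.sleep(2.1)
    print(background_job.is_enabled())     # True again once the window expires
    print(background_job.is_background_job_container_name("clore-default-7"))  # True
    print(background_job.is_background_job_container_name("clore-order-7"))    # False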

View File

@ -1,266 +0,0 @@
from lib import ensure_packages_installed
from lib import config as config_module
from lib import logging as logging_lib
from lib import clore_partner_socket
from lib import latency_test
from lib import openvpn
from lib import utils
import asyncio
import random
import json
import time
import re
import os
import aiofiles.os
from aiohttp import ClientSession, ClientTimeout
config = config_module.config
log = logging_lib.log
MANDATORY_PACKEGES = ['dmidecode', 'openvpn', 'iproute2']
DUMMY_WORKLOAD_CONTAINER = "cloreai/partner-dummy-workload"
non_interactive_env = {
'DEBIAN_FRONTEND': 'noninteractive',
'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
}
host_facts_location = os.path.join(config.clore_partner_base_dir, "host_facts")
partner_cache_location = os.path.join(config.clore_partner_base_dir, "partner_cache")
next_ensupe_packages_check = 0
is_socket_running = False
partner_container_config = None
async def initialize():
global next_ensupe_packages_check
global is_socket_running
try:
await aiofiles.os.makedirs(host_facts_location, exist_ok=True)
await aiofiles.os.makedirs(partner_cache_location, exist_ok=True)
await aiofiles.os.makedirs("/etc/openvpn/client", exist_ok=True)
if not is_socket_running:
is_socket_running=True
asyncio.create_task(clore_partner_socket.socket_service(
location=os.path.join(host_facts_location, "partner_interface.socket")
))
if next_ensupe_packages_check < time.time():
success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None)
next_ensupe_packages_check = float('inf') if success else time.time() + 60*60 # if it did not succeed -> retry in 1hr
if not success:
return False
elif next_ensupe_packages_check != float('inf'):
return False
code, stdout, stderr = await utils.async_run_command(
"dmidecode -t 2 2>&1",
20
)
if code == 0 and not stderr:
async with aiofiles.open(os.path.join(host_facts_location, "dmidecode_t2.txt"), mode='w') as file:
await file.write(stdout)
else:
return False
code, stdout, stderr = await utils.async_run_command(
"dmidecode 2>&1",
20
)
if code == 0 and not stderr:
async with aiofiles.open(os.path.join(host_facts_location, "dmidecode.txt"), mode='w') as file:
await file.write(stdout)
else:
return False
return True
except Exception as e:
log.error(f"FAIL | clore_partner.initialize | {e}")
return False
async def get_partner_allowed_images():
try:
file_exists = await aiofiles.os.path.exists(os.path.join(partner_cache_location, "container_list.json"))
if not file_exists:
return []
images = []
async with aiofiles.open(os.path.join(partner_cache_location, "container_list.json"), mode='r') as file:
content = await file.read()
containers = json.loads(content)
for container in containers:
image = container.get("Config", {}).get("Image", None)
if image and not image in images:
images.append(image)
return images
except Exception as e:
return None
def validate_partner_container_name(name):
if type(name) != str:
return False
elif name==config.clore_partner_container_name:
return True
pattern = r"^C\.\d+$"
return bool(re.match(pattern, name))
def validate_partner_workload_container_name(name):
if type(name) != str:
return False
pattern = r"^C\.\d+$"
return bool(re.match(pattern, name))
last_openvpn_config = None
def get_partner_container_config():
global partner_container_config
return partner_container_config
async def configure(partner_config):
global last_openvpn_config
global partner_container_config
if last_openvpn_config != partner_config:
partner_container_config = {
"image": partner_config["partner_image"],
"name": config.clore_partner_container_name,
"hostname": f"{partner_config['partner_id'][:16]}-m{partner_config['machine_id']}",
"env": {
"AUTH": partner_config['partner_id'],
"ip_addr": partner_config['openvpn_host'],
"port_range": f'{partner_config['ports'][0]}-{partner_config['ports'][1]}'
},
"volumes": {
f"{host_facts_location}": {"bind": "/var/lib/vastai_kaalia/specs_source"},
f"{partner_cache_location}": {"bind": "/var/lib/vastai_kaalia/data"},
f"/var/lib/docker": {"bind": "/var/lib/docker"},
f"/var/run/docker.sock": {"bind": "/var/run/docker.sock"}
},
"gpus": True,
"command": '',
"network": "clore-partner-br0",
"ip": "172.19.0.254",
"cap_add": ["SYS_ADMIN"],
"devices": ["/dev/fuse"],
#"security_opt": ["apparmor:unconfined"],
"ports": [f"{partner_config['ports'][1]}:{partner_config['ports'][1]}"],
}
r = await openvpn.clore_partner_configure(partner_config)
if r:
last_openvpn_config = partner_config
# -----------------------------------------
next_latency_measurment = 0
async def fetch_forwarding_nodes():
url = "https://api.clore.ai/v1/get_relay"
timeout = ClientTimeout(total=30)
async with ClientSession(timeout=timeout) as session:
try:
async with session.get(url) as response:
response.raise_for_status()
data = await response.json()
return data
except Exception as e:
print(f"An error occurred: {e}")
return None
async def set_next_latency_measurment(ts):
global next_latency_measurment
try:
next_latency_measurment=ts
async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='w') as file:
await file.write(str(ts))
except Exception as e:
pass
async def measure_forwarding_latency():
global next_latency_measurment
if next_latency_measurment > time.time():
return False
try:
await aiofiles.os.makedirs(config.clore_partner_base_dir, exist_ok=True)
file_exists = await aiofiles.os.path.exists(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"))
if file_exists:
async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='r') as file:
content = await file.read()
if content.isdigit():
next_latency_measurment = int(content)
if next_latency_measurment < time.time():
node_info = await fetch_forwarding_nodes()
if type(node_info) == dict and node_info.get("country") and type(node_info.get("nodes")) == dict and node_info.get("code") == 0:
to_test_nodes = []
ip_to_region = {}
valid_regions = []
for node_region in node_info.get("nodes").keys():
nodes_ip_list = node_info.get("nodes")[node_region]
if type(nodes_ip_list) == list and len(nodes_ip_list) > 0:
to_test_nodes = to_test_nodes + nodes_ip_list
for node_ip in nodes_ip_list:
ip_to_region[node_ip]=node_region
if len(to_test_nodes) > 0:
measurment_result = await latency_test.measure_latency_icmp(to_test_nodes)
if measurment_result:
for idx, res in enumerate(measurment_result):
if res["received"] > 2 and not ip_to_region.get(res["host"]) in valid_regions:
valid_regions.append(ip_to_region.get(res["host"]))
measurment_result[idx]["region"] = ip_to_region.get(res["host"])
if len(valid_regions) == len(ip_to_region.keys()):
await set_next_latency_measurment(int(
time.time() + 60*60*24*30 # Re run in 30 days, because measurment succeeded
))
return measurment_result
else:
await set_next_latency_measurment(int(
time.time() + 60*60*24 # Retry in 24hr, all regions in country should be reacheable
))
else:
await set_next_latency_measurment(int(
time.time() + 60*60*72 # Retry in 72hr (clore partner service is not available in host country yet)
))
return False
else:
await set_next_latency_measurment(int(
time.time() + 60*60*12 # Retry in 12hr, the response was not matching the required format
))
return False
return False
except Exception as e:
return False
def filter_partner_dummy_workload_container(containers):
try:
remaining_containers = []
for container in containers:
if container["image"] != DUMMY_WORKLOAD_CONTAINER:
remaining_containers.append(container)
return remaining_containers
except Exception as e:
return containers
auto_pull_selftest_gpus = ["NVIDIA GeForce RTX 3090", "NVIDIA GeForce RTX 4090"]
async def check_to_pull_selftest(current_specs):
try:
min_width = 16
gpu_total_vram = 0
gpu_name = ''
mixed_cards = False
driver_version = 0
for idx, nvidia_gpu in enumerate(current_specs["gpus"]["nvidia"]):
if idx > 0 and nvidia_gpu["name"] != gpu_name:
mixed_cards = True
gpu_name = nvidia_gpu["name"]
driver_version = int(nvidia_gpu["driver"].split('.')[0])
if nvidia_gpu["pcie_width"] < min_width:
min_width = nvidia_gpu["pcie_width"]
if " MiB" in nvidia_gpu["mem_total"]:
gpu_total_vram += int(nvidia_gpu["mem_total"].replace(" MiB", ''))
if driver_version >= 550 and gpu_name in auto_pull_selftest_gpus and current_specs["ram"] > 7 and int(current_specs["cpus"].split('/')[0]) >= 4 and not mixed_cards and min_width > 1 and gpu_total_vram < current_specs["ram"] * 1024 and float(current_specs["disk"].split(' ')[-1].replace("GB", '')) > 25:
await utils.async_run_command("docker pull vastai/test:selftest", 14400, non_interactive_env)
except Exception as e:
pass
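
A short illustration of the container-name checks defined above; the pattern is copied from the deleted module, the names themselves are invented:

    import re

    workload_pattern = r"^C\.\d+$"
    for name in ["C.12345", "C.7", "clore-partner-service", "clore-order-55"]:
        print(name, bool(re.match(workload_pattern, name)))
    # Only "C.12345" and "C.7" match the workload pattern;
    # validate_partner_container_name() additionally accepts the configured
    # clore_partner_container_name ("clore-partner-service" in config.py).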

View File

@ -1,94 +0,0 @@
from lib import config as config_module
from lib import logging as logging_lib
from lib import background_job
from lib import utils
import asyncio
import json
import os
import aiofiles.os
config = config_module.config
log = logging_lib.log
dangerous_chars = [';', '|', '&', '`', '$', '>', '<', '(', ')', '\\', '!', '"', '\'', '[', ']', '{', '}']
allowed_commands = ["df"]
can_deploy = True
async def remove_socket_file(path):
try:
file_exists = await aiofiles.os.path.exists(path)
if file_exists:
await aiofiles.os.remove(path)
except Exception:
pass
async def handle_client(reader, writer):
global can_deploy
try:
while True:
data = await reader.read(1024*64)
try:
if not data:
break
#print("DATA", data, data.decode())
parsed_data = json.loads(data.decode())
if "run_command" in parsed_data and type(parsed_data["run_command"])==str:
allowed = False
for allowed_command in allowed_commands:
if f"{allowed_command} " == parsed_data["run_command"][:len(allowed_command)+1] or allowed_command==parsed_data["run_command"]:
allowed = True
break
if allowed and any(char in parsed_data["run_command"] for char in dangerous_chars):
allowed = False
log.debug(f"clore_partner_socket | Received \"{parsed_data["run_command"]}\" | {'allowed' if allowed else 'denied'}")
if allowed:
code, stdout, stderr = await utils.async_run_command(
parsed_data["run_command"]
)
writer.write(json.dumps({
"code": code,
"stderr": stderr,
"stdout": stdout
}).encode())
else:
writer.write(json.dumps({
"code": -1,
"stderr": 'Command not allowed',
"stdout": 'Command not allowed'
}).encode())
elif "can_deploy" in parsed_data:
writer.write(json.dumps({
"can_deploy": can_deploy
}).encode())
elif "stop_background_job" in parsed_data and "time" in parsed_data:
try:
if isinstance(parsed_data["time"], int):
background_job.temporarly_disable(parsed_data["time"])
except Exception as e:
pass
else:
writer.write('?'.encode())
await writer.drain()
except Exception as data_exception:
pass
break
except asyncio.CancelledError:
log.debug(f"clore partner socket | Client handler canceled.")
finally:
log.debug(f"clore partner socket | Closing client connection.")
writer.close()
await writer.wait_closed()
def set_can_deploy(state):
global can_deploy
can_deploy = state
async def socket_service(location):
await remove_socket_file(location)
server = await asyncio.start_unix_server(handle_client, path=location)
log.debug(f"clore partner socket | running at {location}")
async with server:
await server.serve_forever()
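
A minimal client sketch for the JSON-over-Unix-socket protocol above. The socket path is assembled from the defaults visible in this diff (--clore-partner-base-dir in config.py plus the host_facts subdirectory used by clore_partner.py); adjust it if those defaults differ:

    import asyncio
    import json

    SOCKET_PATH = "/opt/clore-hosting/.clore-partner/host_facts/partner_interface.socket"

    async def query_can_deploy():
        reader, writer = await asyncio.open_unix_connection(SOCKET_PATH)
        writer.write(json.dumps({"can_deploy": True}).encode())
        await writer.drain()
        reply = await reader.read(64 * 1024)
        print(json.loads(reply.decode()))   # e.g. {"can_deploy": true}
        writer.close()
        await writer.wait_closed()

    asyncio.run(query_can_deploy())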

View File

@ -9,11 +9,6 @@ hard_config = {
"name": "clore-br0", "name": "clore-br0",
"subnet": "172.18.0.0/16", "subnet": "172.18.0.0/16",
"gateway": "172.18.0.1" "gateway": "172.18.0.1"
},
{
"name": "clore-partner-br0",
"subnet": "172.19.0.0/20",
"gateway": "172.19.0.1"
} }
], ],
"run_iptables_with_sudo":True, "run_iptables_with_sudo":True,
@ -38,12 +33,7 @@ hard_config = {
"maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app "maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app
"maximum_pull_service_loop_time": 14400, # Exception for image pulling "maximum_pull_service_loop_time": 14400, # Exception for image pulling
"creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk "creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk
"allow_mixed_gpus": True, "allow_mixed_gpus": False
"openvpn_forwarding_tun_device": "tun1313",
"forwarding_ip_route_table_id": 100,
"clore_partner_container_name": "clore-partner-service",
"restart_docker_flag_file": "/opt/clore-hosting/.restart_docker",
"update_driver_550_flag": "/opt/clore-hosting/.update_550"
} }
parser = argparse.ArgumentParser(description='Example argparse usage') parser = argparse.ArgumentParser(description='Example argparse usage')
@ -58,9 +48,7 @@ parser.add_argument('--startup-scripts-folder', type=str, default='/opt/clore-ho
parser.add_argument('--wireguard-config-folder', type=str, default='/opt/clore-hosting/wireguard/configs', help='Folder with wireguard configs') parser.add_argument('--wireguard-config-folder', type=str, default='/opt/clore-hosting/wireguard/configs', help='Folder with wireguard configs')
parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hosting/entrypoints', help='Folder with custom entrypoints') parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hosting/entrypoints', help='Folder with custom entrypoints')
parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)")
parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes") parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json' ,help="Cache with specs of GPU possible OC/Power limit changes")
parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software")
parser.add_argument('--clore-partner-base-dir', type=str, default="/opt/clore-hosting/.clore-partner")
# Parse arguments, ignoring any non-defined arguments # Parse arguments, ignoring any non-defined arguments
args, _ = parser.parse_known_args() args, _ = parser.parse_known_args()
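
The comment above relies on parse_known_args() ignoring flags the parser does not define; a tiny standalone demonstration of that behaviour (flag values here are made up):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--debug-ws-peer', type=str)
    args, unknown = parser.parse_known_args(['--debug-ws-peer', 'wss://peer.example', '--not-defined', '1'])
    print(args.debug_ws_peer)   # wss://peer.example
    print(unknown)              # ['--not-defined', '1']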

View File

@ -63,7 +63,7 @@ def cache_entrypoints(containers):
else: else:
valid_conf.append(True) valid_conf.append(True)
for remaining_file in entrypoint_files: # We can remove files that are not needed anymore for remaining_file in entrypoint_files: # We can remove files that are not needed anymore
os.remove(os.path.join(config.entrypoints_folder,remaining_file)) os.remove(remaining_file)
return valid_conf return valid_conf
except Exception as e: except Exception as e:
return 'e' return 'e'

View File

@ -9,15 +9,11 @@ import docker
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30, paused=False): def create_container(container_options, ip=None, docker_gpus=False, timeout=30):
# Sanitize and validate input # Sanitize and validate input
container_options = sanitize_input(container_options) container_options = sanitize_input(container_options)
command = ["docker", ("create" if paused else "run")] command = ["docker", "run", "--detach", "--tty"]
if not paused:
command.append("--detach")
command.append("--tty")
if "name" in container_options: if "name" in container_options:
command.extend(["--name", container_options["name"]]) command.extend(["--name", container_options["name"]])
@ -25,21 +21,10 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
if "network_mode" in container_options: if "network_mode" in container_options:
command.extend(["--network", container_options["network_mode"]]) command.extend(["--network", container_options["network_mode"]])
if "hostname" in container_options:
command.extend(["--hostname", container_options["hostname"]])
if "cap_add" in container_options: if "cap_add" in container_options:
for cap in container_options["cap_add"]: for cap in container_options["cap_add"]:
command.extend(["--cap-add", cap]) command.extend(["--cap-add", cap])
if "devices" in container_options:
for device in container_options["devices"]:
command.extend(["--device", device])
if "security_opt" in container_options:
for security_opt in container_options["security_opt"]:
command.extend(["--security-opt", security_opt])
if "volumes" in container_options: if "volumes" in container_options:
for volume_host, volume_container in container_options["volumes"].items(): for volume_host, volume_container in container_options["volumes"].items():
bind = f"{volume_host}:{volume_container['bind']}" bind = f"{volume_host}:{volume_container['bind']}"
@ -67,10 +52,6 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
if "runtime" in container_options: if "runtime" in container_options:
command.extend(["--runtime", container_options["runtime"]]) command.extend(["--runtime", container_options["runtime"]])
if shm_size != 64:
command.extend(["--shm-size", f"{shm_size}m"])
if docker_gpus: if docker_gpus:
if type(docker_gpus)==list: if type(docker_gpus)==list:
command.extend(['--gpus', '"device=' + ','.join(str(gpu_id) for gpu_id in docker_gpus) + '"']) command.extend(['--gpus', '"device=' + ','.join(str(gpu_id) for gpu_id in docker_gpus) + '"'])
@ -87,8 +68,6 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
if ip: if ip:
command.extend(["--ip", ip]) command.extend(["--ip", ip])
command.append('--stop-timeout')
command.append('0')
command.append(container_options["image"]) command.append(container_options["image"])
try: try:
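
For orientation, roughly the argv that the fuller (left-hand) create_container() assembles for a small options dict; the dict values are invented and only a subset of the supported flags is shown:

    container_options = {
        "name": "clore-order-42",
        "network_mode": "clore-br0",
        "hostname": "O-42",
        "cap_add": ["SYS_ADMIN"],
        "image": "cloreai/jupyter:latest",
    }

    command = ["docker", "run", "--detach", "--tty"]
    command.extend(["--name", container_options["name"]])
    command.extend(["--network", container_options["network_mode"]])
    command.extend(["--hostname", container_options["hostname"]])
    for cap in container_options["cap_add"]:
        command.extend(["--cap-add", cap])
    command.extend(["--stop-timeout", "0"])
    command.append(container_options["image"])
    print(" ".join(command))
    # docker run --detach --tty --name clore-order-42 --network clore-br0 --hostname O-42 --cap-add SYS_ADMIN --stop-timeout 0 cloreai/jupyter:latest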

View File

@ -1,22 +1,17 @@
from lib import config as config_module from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import docker_cli_wrapper from lib import docker_cli_wrapper
from lib import background_job
from lib import docker_interface from lib import docker_interface
from lib import clore_partner
from lib import get_specs from lib import get_specs
from lib import utils
import docker import docker
from docker.types import EndpointConfig, NetworkingConfig from docker.types import EndpointConfig, NetworkingConfig
import os import os
shm_calculator = utils.shm_calculator(get_specs.get_total_ram_mb())
client = docker_interface.client client = docker_interface.client
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
def deploy(validated_containers, allowed_running_containers=[], can_run_partner_workloads=False): def deploy(validated_containers):
local_images = docker_interface.get_local_images() local_images = docker_interface.get_local_images()
all_containers = docker_interface.get_containers(all=True) all_containers = docker_interface.get_containers(all=True)
@ -48,7 +43,6 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_
for validated_container in validated_containers: for validated_container in validated_containers:
try: try:
SHM_SIZE = 64 # MB - default
image_ready = False image_ready = False
docker_gpus = None docker_gpus = None
@ -69,9 +63,7 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_
'tty': True, 'tty': True,
'network_mode': 'clore-br0', 'network_mode': 'clore-br0',
'cap_add': [], 'cap_add': [],
'devices': [], 'volumes': {},
'security_opt': [],
'volumes': validated_container["volumes"] if "volumes" in validated_container else {},
'ports': {}, 'ports': {},
'device_requests': [], 'device_requests': [],
'environment': validated_container["env"] if "env" in validated_container else {}, 'environment': validated_container["env"] if "env" in validated_container else {},
@ -84,30 +76,12 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_
) )
} }
if "security_opt" in validated_container:
container_options["security_opt"] = validated_container["security_opt"]
if "devices" in validated_container:
container_options["devices"] = validated_container["devices"]
if "cap_add" in validated_container:
container_options["cap_add"] = validated_container["cap_add"]
if "hostname" in validated_container:
container_options["hostname"]=validated_container["hostname"]
elif "clore-order-" in validated_container["name"]:
try:
container_options["hostname"] = f"O-{int(validated_container["name"][12:])}"
except Exception as eon:
pass
if "network" in validated_container: if "network" in validated_container:
container_options["network_mode"]=validated_container["network"] container_options["network_mode"]=validated_container["network"]
if "ip" in validated_container and config.creation_engine=="sdk": if "ip" in validated_container and config.creation_engine=="sdk":
del container_options["network_mode"] del container_options["network_mode"]
if "gpus" in validated_container and type(validated_container["gpus"])==bool: if "gpus" in validated_container and type(validated_container["gpus"])==bool:
if "clore-order-" in validated_container["name"]:
SHM_SIZE = shm_calculator.calculate('*')
container_options["runtime"]="nvidia" container_options["runtime"]="nvidia"
docker_gpus=True docker_gpus=True
container_options["device_requests"].append(docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])) container_options["device_requests"].append(docker.types.DeviceRequest(count=-1, capabilities=[['gpu']]))
@ -147,11 +121,9 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_
elif "entrypoint_command" in validated_container and type(validated_container["entrypoint_command"])==str and len(validated_container["entrypoint_command"])>0: elif "entrypoint_command" in validated_container and type(validated_container["entrypoint_command"])==str and len(validated_container["entrypoint_command"])>0:
container_options["entrypoint"]=validated_container["entrypoint_command"] container_options["entrypoint"]=validated_container["entrypoint_command"]
container_options["shm_size"] = f"{SHM_SIZE}m" if not validated_container["name"] in created_container_names and image_ready:
if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])):
if config.creation_engine == "wrapper": if config.creation_engine == "wrapper":
docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus, paused="paused" in validated_container) docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), docker_gpus=docker_gpus)
else: else:
container = client.containers.create(**container_options) container = client.containers.create(**container_options)
if "ip" in validated_container: if "ip" in validated_container:
@ -172,10 +144,7 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_
all_running_container_names.append(container.name) all_running_container_names.append(container.name)
else: else:
all_stopped_container_names.append(container.name) all_stopped_container_names.append(container.name)
if background_job.is_background_job_container_name(container.name) and not background_job.is_enabled(): if container.name in needed_running_names and container.status != 'running':
if container.status == "running":
container.stop()
elif container.name in needed_running_names and container.status != 'running':
try: try:
attached_networks = container.attrs['NetworkSettings']['Networks'] attached_networks = container.attrs['NetworkSettings']['Networks']
if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container
@ -190,22 +159,17 @@ def deploy(validated_containers, allowed_running_containers=[], can_run_partner_
container.stop() container.stop()
except Exception as e: except Exception as e:
pass pass
elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running' and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name): elif container.name not in paused_names+needed_running_names and container.status == 'running':
try: try:
container.stop() container.stop()
container.remove() container.remove()
except Exception as e: except Exception as e:
pass pass
elif container.name not in paused_names+needed_running_names+allowed_running_containers and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name): elif container.name not in paused_names+needed_running_names:
try: try:
container.remove() container.remove()
except Exception as e: except Exception as e:
pass pass
elif not can_run_partner_workloads and container.status == "running" and clore_partner.validate_partner_workload_container_name(container.name):
try:
container.stop()
except Exception as e:
pass
return all_running_container_names, all_stopped_container_names return all_running_container_names, all_stopped_container_names
#print(validated_containers) #print(validated_containers)
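
A small sketch of the fallback hostname derivation added near the top of this hunk (the container name below is a placeholder):

name = "clore-order-12345"
hostname = f"O-{int(name[12:])}"   # "clore-order-" is 12 characters, so this yields "O-12345"
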

View File

@ -11,13 +11,6 @@ from typing import List, Optional
import docker import docker
import json import json
import os import os
import re
partner_bridge_subnet = ''
for clore_network in config.clore_default_networks:
if clore_network["name"] == "clore-partner-br0":
partner_bridge_subnet = clore_network["subnet"]
try: try:
os.makedirs(config.startup_scripts_folder, exist_ok=True) os.makedirs(config.startup_scripts_folder, exist_ok=True)
@ -57,26 +50,6 @@ class DockerNetwork(BaseModel):
client = docker.from_env() client = docker.from_env()
low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock') low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock')
daemon_config_path = "/etc/docker/daemon.json"
def get_info():
try:
client_info = client.info()
return client_info
except Exception as e:
return {}
def stop_all_containers():
try:
# List all containers
containers = client.containers.list(all=True) # Use all=True to include stopped containers
for container in containers:
log.info(f"stop_all_containers() | Stopping container: {container.name} (ID: {container.id})")
container.stop() # Stop the container
log.success("stop_all_containers() | All containers have been stopped.")
except Exception as e:
log.error(f"stop_all_containers() |An error occurred: {e}")
return True
def check_docker_connection(): def check_docker_connection():
try: try:
@ -114,7 +87,7 @@ def get_local_images(no_latest_tag=False):
return image_list return image_list
except Exception as e: except Exception as e:
log.error(f"DOCKER | Can't get local images | {e} | {'y' if no_latest_tag else 'n'}") log.error(f"DOCKER | Can't get local images | {e}")
os._exit(1) os._exit(1)
def get_containers(all=False): def get_containers(all=False):
@ -203,8 +176,7 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"):
gateway=gateway gateway=gateway
)] )]
), ),
check_duplicate=True, check_duplicate=True
#options={'com.docker.network.bridge.enable_ip_masq': 'false'} if 'clore-partner-' in network_name else {}
) )
log.debug(f"Network {network_name} created successfully.") log.debug(f"Network {network_name} created successfully.")
return True return True
@ -212,7 +184,7 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"):
log.error(f"DOCKER | Failed to create network {network_name}: {e}") log.error(f"DOCKER | Failed to create network {network_name}: {e}")
return False return False
def validate_and_secure_networks(partner_forwarding_ips): def validate_and_secure_networks():
try: try:
failed_appending_iptables_rule = False failed_appending_iptables_rule = False
@ -258,13 +230,6 @@ def validate_and_secure_networks(partner_forwarding_ips):
#print(this_ipv4_range) #print(this_ipv4_range)
outside_ranges_ip_network = networking.exclude_network(this_ipv4_range) outside_ranges_ip_network = networking.exclude_network(this_ipv4_range)
if this_ipv4_range == partner_bridge_subnet:
for partner_forwarding_ip in partner_forwarding_ips:
outside_ranges = []
for ip_range in outside_ranges_ip_network:
outside_ranges.append(str(ip_range))
outside_ranges_ip_network = networking.exclude_network(f"{partner_forwarding_ip}/32", input_ranges=outside_ranges)
outside_ranges = [] outside_ranges = []
for outside_range_ip_network in outside_ranges_ip_network: for outside_range_ip_network in outside_ranges_ip_network:
outside_ranges.append(str(outside_range_ip_network)) outside_ranges.append(str(outside_range_ip_network))
@ -292,7 +257,7 @@ def validate_and_secure_networks(partner_forwarding_ips):
succesfully_appended = networking.add_iptables_rule(needed_iptables_rule) succesfully_appended = networking.add_iptables_rule(needed_iptables_rule)
if not succesfully_appended: if not succesfully_appended:
failed_appending_iptables_rule = True failed_appending_iptables_rule = True
elif this_ipv4_range != partner_bridge_subnet: else:
needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name) needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name)
for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule
for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule)) for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule))
@ -381,15 +346,16 @@ def validate_and_secure_networks(partner_forwarding_ips):
def get_daemon_config(): def get_daemon_config():
config_path = "/etc/docker/daemon.json"
try: try:
with open(daemon_config_path, 'r') as file: with open(config_path, 'r') as file:
config_data = json.load(file) config_data = json.load(file)
return config_data return config_data
except FileNotFoundError: except FileNotFoundError:
print(f"Error: {daemon_config_path} not found.") print(f"Error: {config_path} not found.")
return None return None
except json.JSONDecodeError: except json.JSONDecodeError:
print(f"Error: Failed to parse JSON from {daemon_config_path}.") print(f"Error: Failed to parse JSON from {config_path}.")
return None return None
def verify_docker_version(min_version="17.06"): def verify_docker_version(min_version="17.06"):
@ -402,45 +368,3 @@ def verify_docker_version(min_version="17.06"):
except Exception as e: except Exception as e:
log.error(f"Failed to verify docker version | {e}") log.error(f"Failed to verify docker version | {e}")
os._exit(1) os._exit(1)
def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"):
deamon_config = get_daemon_config()
if deamon_config:
try:
if (not "exec-opts" in deamon_config or type(deamon_config["exec-opts"])!=list) and value!=None:
deamon_config["exec-opts"]=[f"{key}={value}"]
elif "exec-opts" in deamon_config:
new_exec_opts=[]
matched_key=False
for exec_opt in deamon_config["exec-opts"]:
if '=' in exec_opt:
exec_opt_key, exec_opt_value = exec_opt.split('=',1)
if exec_opt_key==key:
matched_key=True
if value!=None:
new_exec_opts.append(f"{key}={value}")
else:
new_exec_opts.append(exec_opt)
else:
new_exec_opts.append(exec_opt)
if not matched_key:
new_exec_opts.append(f"{key}={value}")
if len(new_exec_opts)==0:
del deamon_config["exec-opts"]
else:
if deamon_config["exec-opts"] == new_exec_opts:
return "Same"
deamon_config["exec-opts"]=new_exec_opts
json_string = json.dumps(deamon_config, indent=4)
with open(daemon_config_path, 'w') as file:
file.write(json_string)
return True
except Exception as e:
log.error(f"Failed 'configure_exec_opts' | {e}")
return False
else:
return False
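
For reference, a minimal sketch of the daemon.json end state that configure_exec_opts("native.cgroupdriver", "cgroupfs") is meant to produce (the pre-existing key shown is a placeholder, not taken from the repository):

import json

daemon_config = {"data-root": "/var/lib/docker"}                 # pre-existing settings are preserved
daemon_config["exec-opts"] = ["native.cgroupdriver=cgroupfs"]    # key=value entry added or updated
print(json.dumps(daemon_config, indent=4))
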
def is_docker_default_name_lenient(container_name): # Not a perfect solution, but it will do the job
pattern = r'^[a-z]+_[a-z]+$'
return re.match(pattern, container_name) is not None

View File

@ -1,116 +0,0 @@
from lib import logging as logging_lib
import aiofiles
import os
from datetime import datetime
from typing import List
from lib import utils
import time
log = logging_lib.log
LOGGING_ENABLED = True
async def ensure_packages_installed(
packages: List[str] = [],
total_timeout: float = 300
) -> bool:
non_interactive_env = {
'DEBIAN_FRONTEND': 'noninteractive',
'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
}
start_time = time.time()
packages_to_install = []
for package in packages:
check_cmd = f"dpkg -s {package} > /dev/null 2>&1"
return_code, _, _ = await utils.async_run_command(check_cmd, env=non_interactive_env)
if return_code != 0:
packages_to_install.append(package)
if not packages_to_install:
log.debug("All packages are already installed.")
return True
update_cmd = (
"apt-get update -y --no-install-recommends"
)
return_code, stdout, stderr = await utils.async_run_command(
update_cmd,
timeout=None if total_timeout == None else 180,
env=non_interactive_env
)
if LOGGING_ENABLED:
await ensure_packages_installed_log(f"update stdout: {stdout}")
await ensure_packages_installed_log(f"update stderr: {stderr}\ncode: {str(return_code)}")
if return_code != 0:
log.error(f"Failed to update package lists: {stderr}")
return False
install_cmd = (
"apt-get install -y --no-install-recommends --assume-yes "+
"-o Dpkg::Options::='--force-confdef' "+ # Default to existing config
"-o Dpkg::Options::='--force-confold' "+ # Keep existing config
f"{' '.join(packages_to_install)}"
)
# Calculate remaining timeout
remaining_timeout = None if total_timeout == None else max(0, total_timeout - (time.time() - start_time))
# Install packages
return_code, stdout, stderr = await utils.async_run_command(
install_cmd,
timeout=remaining_timeout,
env=non_interactive_env
)
if LOGGING_ENABLED:
await ensure_packages_installed_log(f"install stdout: {stdout}")
await ensure_packages_installed_log(f"install stderr: {stderr}\ncode: {str(return_code)}")
if return_code == 0:
log.debug(f"Successfully installed packages: {packages_to_install}")
return True
elif return_code == 100:
dpkg_rc, dpkg_stdout, dpkg_stderr = await utils.async_run_command(
"sudo dpkg --configure -a",
timeout=200,
env=non_interactive_env
)
# Install packages
return_code, stdout, stderr = await utils.async_run_command(
install_cmd,
timeout=remaining_timeout,
env=non_interactive_env
)
if LOGGING_ENABLED:
await ensure_packages_installed_log(f"post-dpkg install stdout: {stdout}")
await ensure_packages_installed_log(f"post-dpkg install stderr: {stderr}\ncode: {str(return_code)}")
if return_code == 0:
log.debug(f"Successfully installed packages: {packages_to_install}")
return True
else:
log.error(f"Failed to install packages: {stderr}")
return False
else:
log.error(f"Failed to install packages: {stderr}")
return False
async def ensure_packages_installed_log(msg):
try:
log_file_path = "/opt/clore-hosting/ensure-packages-installed-log.txt"
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"{current_time} | {msg}\n"
async with aiofiles.open(log_file_path, "a") as log_file:
await log_file.write(log_message)
except Exception as e:
pass
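
A hedged usage sketch for the installer helper above (the package list is illustrative and an asyncio entrypoint is assumed):

import asyncio
from lib import ensure_packages_installed

async def main():
    ok = await ensure_packages_installed.ensure_packages_installed(["iputils-ping"], total_timeout=300)
    print("packages ready" if ok else "install failed")

asyncio.run(main())
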

View File

@ -2,7 +2,7 @@ from aiofiles.os import stat as aio_stat
from pydantic import BaseModel, Field, constr from pydantic import BaseModel, Field, constr
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from lib import docker_interface from lib import docker_interface
from typing import Dict, List, Optional from typing import Dict, List
from lib import utils from lib import utils
import subprocess import subprocess
import speedtest import speedtest
@ -43,32 +43,6 @@ def get_kernel():
def is_hive(): def is_hive():
return "hive" in get_kernel() return "hive" in get_kernel()
def get_total_ram_mb():
total_ram = psutil.virtual_memory().total
return total_ram / (1024 ** 2)
def get_os_release():
try:
with open("/etc/os-release") as f:
os_info = f.read()
os_release = {}
for line in os_info.split('\n'):
if '=' in line:
key, value = line.split('=', 1)
if value[:1]=='"' and value.endswith('"'):
value = value[1:len(value)-1]
os_release[key]=value
needed_cgroupfs_versions = ["22.04", "22.10"] # Mitigate issue https://github.com/NVIDIA/nvidia-docker/issues/1730
if "NAME" in os_release and "VERSION_ID" in os_release:
if os_release["NAME"].lower() == "ubuntu" and os_release["VERSION_ID"] in needed_cgroupfs_versions:
os_release["use_cgroupfs"]=True
return os_release
except Exception as e:
return {}
def drop_caches(): def drop_caches():
try: try:
with open('/proc/sys/vm/drop_caches', 'w') as f: with open('/proc/sys/vm/drop_caches', 'w') as f:
@ -311,7 +285,7 @@ def get_gpu_info():
class DockerDaemonConfig(BaseModel): class DockerDaemonConfig(BaseModel):
data_root: str = Field(alias="data-root") data_root: str = Field(alias="data-root")
storage_driver: str = Field(alias="storage-driver") storage_driver: str = Field(alias="storage-driver")
storage_opts: Optional[List[str]] = Field(alias="storage-opts") storage_opts: List[str] = Field(alias="storage-opts")
class Specs: class Specs:
def __init__(self): def __init__(self):
@ -322,7 +296,7 @@ class Specs:
gpu_str, gpu_mem, gpus, nvml_err = get_gpu_info() gpu_str, gpu_mem, gpus, nvml_err = get_gpu_info()
if require_same_gpus: if require_same_gpus:
last_gpu_name='' last_gpu_name=''
for gpu in gpus["nvidia"]: for gpu in gpus:
if not last_gpu_name: if not last_gpu_name:
last_gpu_name=gpu["name"] last_gpu_name=gpu["name"]
elif last_gpu_name!=gpu["name"]: elif last_gpu_name!=gpu["name"]:
@ -336,14 +310,26 @@ class Specs:
else: else:
overlay_total_size=None overlay_total_size=None
disk_type="" disk_type=""
disk_usage_source_path = '/'
try: try:
if "storage-driver" in docker_daemon_config and docker_daemon_config["storage-driver"] == "overlay2" and "data-root" in docker_daemon_config: validated_config = DockerDaemonConfig(**docker_daemon_config)
disk_usage_source_path = docker_daemon_config["data-root"] disk_udevadm = get_disk_udevadm(validated_config.data_root)
for udevadm_line in disk_udevadm.split('\n'):
try:
key, value=udevadm_line.split('=',1)
if "id_model" in key.lower():
disk_type=value[:24]
elif "devpath" in key.lower() and "/virtual/" in value:
disk_type="Virtual"
except Exception as e_int:
pass
for storage_opt in validated_config.storage_opts:
if storage_opt[:14]=="overlay2.size=" and "GB" in storage_opt[14:]:
numeric_size = round(float(filter_non_numeric(storage_opt[14:])), 4)
overlay_total_size=numeric_size
except Exception as e: except Exception as e:
pass pass
if overlay_total_size==None: if overlay_total_size==None:
total, used, free = shutil.disk_usage(disk_usage_source_path) total, used, free = shutil.disk_usage("/")
disk_udevadm = get_disk_udevadm("/") disk_udevadm = get_disk_udevadm("/")
for udevadm_line in disk_udevadm.split('\n'): for udevadm_line in disk_udevadm.split('\n'):
try: try:
@ -460,21 +446,3 @@ class Specs:
return round(total_swap_gb, 4) return round(total_swap_gb, 4)
except Exception as e: except Exception as e:
return 0 return 0
def get_root_device():
try:
mount_info = subprocess.check_output(['findmnt', '-n', '-o', 'SOURCE', '/']).decode().strip()
return mount_info
except subprocess.CalledProcessError:
return None
def is_usb_device(device):
try:
lsblk_output = subprocess.check_output(['lsblk', '-o', 'NAME,TRAN', '-n']).decode().strip()
for line in lsblk_output.splitlines():
parts = line.split()
if len(parts) == 2 and device.endswith(parts[0]):
return parts[1] == 'usb'
except subprocess.CalledProcessError:
return True
return False
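
A short illustrative sketch combining the two removed helpers above (names as defined in this file; output depends on the host's block devices):

root_dev = get_root_device()                 # e.g. "/dev/sda2" via `findmnt -n -o SOURCE /`
if root_dev and is_usb_device(root_dev):
    print(f"Root filesystem {root_dev} appears to live on a USB device")
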

View File

@ -1,205 +0,0 @@
import aiofiles
import asyncio
import json
import time
import os
import re
def extract_json_with_key(text, key):
json_pattern = r'({.*?})'
json_strings = re.findall(json_pattern, text, re.DOTALL)
json_objects_with_key = []
for json_str in json_strings:
try:
json_obj = json.loads(json_str)
if key in json.dumps(json_obj):
json_objects_with_key.append(json_obj)
except json.JSONDecodeError:
continue
return json_objects_with_key
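
An illustrative call to the helper above (the input string is made up; nested JSON objects would not survive the non-greedy regex):

text = 'noise {"hs": [123.4], "hs_units": "khs"} more noise {"foo": 1}'
print(extract_json_with_key(text, "hs_units"))
# -> [{'hs': [123.4], 'hs_units': 'khs'}]
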
async def async_run_bash_command(command):
process = await asyncio.create_subprocess_exec(
'/bin/bash', '-c', command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
return stdout.decode().strip(), stderr.decode().strip(), process.returncode
async def check_and_read_file(file_path):
try:
if os.path.exists(file_path):
async with aiofiles.open(file_path, mode='r') as file:
contents = await file.read()
return contents
else:
return "fail"
except Exception as e:
return "fail"
async def get_session_start_time(pid):
try:
async with aiofiles.open(f'/proc/{pid}/stat', 'r') as file:
stat_info = (await file.read()).split()
start_time_ticks = int(stat_info[21])
clock_ticks_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
start_time_seconds = start_time_ticks / clock_ticks_per_sec
boot_time = None
async with aiofiles.open('/proc/stat', 'r') as file:
async for line in file:
if line.startswith('btime'):
boot_time = int(line.split()[1])
break
if boot_time is None:
raise ValueError("Failed to find boot time in /proc/stat")
start_time = (boot_time + start_time_seconds)
return start_time
except FileNotFoundError:
return None
except Exception as e:
print(f"Error retrieving session start time: {e}")
return None
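
The arithmetic above in isolation (all values are placeholders): the absolute start time is the boot time plus the process start ticks divided by the clock tick rate.

boot_time = 1_700_000_000        # btime field from /proc/stat
start_time_ticks = 123456        # field 22 of /proc/<pid>/stat
clock_ticks_per_sec = 100        # os.sysconf('SC_CLK_TCK') on most Linux systems
start_time = boot_time + start_time_ticks / clock_ticks_per_sec   # seconds since epoch
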
async def get_miner_stats(miner_dir, api_timeout=15):
stdout, stderr, code = await async_run_bash_command(f'export PATH=$PATH:/hive/sbin:/hive/bin && export API_TIMEOUT={api_timeout}'+''' && read -t $((API_TIMEOUT + 5)) -d "" pid khs stats < <(function run_miner_scripts { echo "$BASHPID"; cd '''+miner_dir+''' || exit; cd '''+miner_dir+'''; source h-manifest.conf; cd '''+miner_dir+'''; source h-stats.sh; printf "%q\n" "$khs"; echo "$stats"; }; run_miner_scripts) && [[ $? -ge 128 ]] && [[ ! -z "$pid" && "$pid" -ne $$ ]] && kill -9 "$pid" 2>/dev/null ; echo $stats''')
try:
if stdout and not stderr and code == 0:
stats = json.loads(stdout)
return stats
else:
return 'fail'
except Exception as e:
try:
miner_stat_jsons = extract_json_with_key(stdout, "hs_units")
return miner_stat_jsons[0] if len(miner_stat_jsons) > 0 else 'fail'
except Exception :
return 'fail'
async def get_miner_uptime_stats():
stdout, stderr, code = await async_run_bash_command("screen -ls")
if not stderr and code == 0:
for line in stdout.split('\n'):
if line[:1]=='\t':
if line.split('\t')[1].endswith(".miner"):
miner_screen_pid = line.split('\t')[1].split('.')[0]
if miner_screen_pid.isdigit():
miner_start_time = await get_session_start_time(miner_screen_pid)
return miner_start_time
return None
def extract_miner_names(rig_conf):
miner_names=[]
for line in rig_conf.split('\n'):
if '=' in line:
key, value = line.split('=', 1)
if key[:5]=="MINER" and (len(key)==5 or str(key[5:]).isdigit()):
if value.startswith('"') and value.endswith('"'):
value = value.strip('"')
if len(value)>0:
miner_names.append(value)
return miner_names
def extract_miner_config(miner_names, wallet_conf):
lines = wallet_conf.split('\n')
meta_config = {}
miners = {}
for miner_idx, miner_name in enumerate(miner_names):
remaining_lines = []
algos = []
for line in lines:
if not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
if value.startswith('"') and value.endswith('"'):
value = value.strip('"')
if value.startswith("'") and value.endswith("'"):
value = value.strip("'")
if key[:len(miner_name.replace('-','_'))+1].lower() == f"{miner_name.replace('-','_')}_".lower():
if key.split('_')[-1][:4].lower()=="algo":
algos.append(value)
elif miner_name.lower()=="custom" and key.lower()=="custom_miner":
miner_names[miner_idx] = os.path.join(miner_names[miner_idx],value)
elif key.lower()=="meta":
try:
meta_config=json.loads(value)
except Exception as e:
pass
else:
remaining_lines.append(line)
lines = remaining_lines
miners[miner_name]={
"algos":algos,
"coins":[]
}
for miner_name in miner_names:
if "custom/" in miner_name.lower():
miner_name = "custom"
if miner_name in meta_config and type(meta_config[miner_name]) == dict:
for key in meta_config[miner_name].keys():
if "coin" in key:
miners[miner_name]["coins"].append(meta_config[miner_name][key])
return miner_names, miners
def sum_numbers_in_list(lst):
if all(isinstance(i, (int, float)) for i in lst):
return sum(lst)
else:
return "The list contains non-numeric elements."
class hive_interface:
def __init__(self):
self.hive_miners_dir = "/hive/miners"
self.hive_rig_config = "/hive-config/rig.conf"
self.hive_wallet_config = "/hive-config/wallet.conf"
async def get_miners_stats(self, miner_names):
scrape_tasks = []
for miner_name in miner_names:
scrape_tasks.append(get_miner_stats(os.path.join(self.hive_miners_dir, miner_name)))
results = await asyncio.gather(*scrape_tasks)
return results
async def get_configured_miners(self):
rig_conf, wallet_conf = await asyncio.gather(*[check_and_read_file(self.hive_rig_config), check_and_read_file(self.hive_wallet_config)])
miner_names = extract_miner_names(rig_conf)
miner_names, miner_config = extract_miner_config(miner_names, wallet_conf)
return miner_names, miner_config
async def export_miner_stats(self, get_hashrates=False):
output = {
"miner_uptime": 0
}
miner_start_ts = await get_miner_uptime_stats()
if miner_start_ts:
output["miner_uptime"] = int(time.time()-miner_start_ts)
miner_names, miner_config = await self.get_configured_miners()
output["miners"]=miner_config
if get_hashrates:
miners_stats = await self.get_miners_stats(miner_names)
for idx, miner_name in enumerate(miner_names):
miner_names[idx] = "custom" if "custom/" in miner_name.lower() else miner_name
for idx, miner_stats in enumerate(miners_stats):
if type(miner_stats) == dict:
for key in miner_stats.keys():
if key[:2]=="hs" and (key=="hs" or key[2:].isdigit()):
all_hs = sum_numbers_in_list(miner_stats[key])
try:
if "hs_units" in miner_stats:
if miner_stats["hs_units"]=="hs":
all_hs = all_hs/1000
if not "hashrates" in output['miners'][miner_names[idx]]:
output['miners'][miner_names[idx]]["hashrates"]=[]
if isinstance(all_hs, (float, int)):
output['miners'][miner_names[idx]]["hashrates"].append(all_hs)
else:
output['miners'][miner_names[idx]]["hashrates"].append(0)
except Exception as e:
pass
return output
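
A hedged usage sketch for the interface above (only meaningful on a Hive OS rig; the printed shape depends on the configured miners):

import asyncio

hive = hive_interface()
stats = asyncio.run(hive.export_miner_stats(get_hashrates=True))
print(stats["miner_uptime"], stats["miners"])
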

View File

@ -3,7 +3,6 @@ from lib import logging as logging_lib
from lib import get_specs from lib import get_specs
from lib import utils from lib import utils
import threading import threading
import socket
import aiohttp import aiohttp
import asyncio import asyncio
import json import json
@ -48,11 +47,9 @@ async def register_server(data):
"Content-Type": "application/json" "Content-Type": "application/json"
} }
connector = aiohttp.TCPConnector(family=socket.AF_INET) async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=connector) as session:
try: try:
async with session.post(url, data=json_data, headers=headers, timeout=15) as response: async with session.post(url, data=json_data, headers=headers, timeout=5) as response:
if response.status == 200: if response.status == 200:
# Successful response # Successful response
response_data = await response.json() response_data = await response.json()

View File

@ -1,66 +0,0 @@
from lib import ensure_packages_installed
from lib import utils
import asyncio
import socket
import re
MANDATORY_PACKAGES = ['iputils-ping']
def is_valid_ipv4_or_hostname(string):
def is_valid_ipv4(ip):
try:
socket.inet_aton(ip)
return True
except socket.error:
return False
hostname_regex = re.compile(
r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$"
)
def is_valid_hostname(hostname):
if len(hostname) > 255:
return False
return all(hostname_regex.match(part) for part in hostname.split("."))
return is_valid_ipv4(string) or is_valid_hostname(string)
async def measure_latency_icmp(hosts, max_concurent_measurments=2, count=4):
success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKAGES, None)
if success:
outcome = []
concurent_jobs = [hosts[i:i + max_concurent_measurments] for i in range(0, len(hosts), max_concurent_measurments)]
for concurent_job in concurent_jobs:
tasks = []
for host in concurent_job:
if is_valid_ipv4_or_hostname(host):
tasks.append(
utils.async_run_command(
f"ping -c {str(count)} -W 2 -i 0.3 {host}",
20 * count,
{"LANG": "C", "LC_ALL": "C"}
)
)
if len(tasks) > 0:
r = await asyncio.gather(*tasks)
try:
for output in r:
results = []
code, stdout, stderr = output
if code == 0:
for line in stdout.split('\n'):
match = re.search(r"time=(\d+\.?\d*) ms", line)
if match:
results.append(float(match.group(1)))
outcome.append(
{
"host": hosts[len(outcome)],
"received": len(results),
"avg_latency": sum(results) / len(results) if len(results)>0 else 0
}
)
except Exception as e:
print(e)
return outcome
else:
return False
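
A hedged usage sketch for measure_latency_icmp (the host list is a placeholder; an apt-based system where iputils-ping can be installed is assumed):

import asyncio

results = asyncio.run(measure_latency_icmp(["1.1.1.1", "example.com"], count=4))
# e.g. [{'host': '1.1.1.1', 'received': 4, 'avg_latency': 12.3}, ...]
print(results)
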

View File

@ -1,7 +1,6 @@
from lib import docker_interface from lib import docker_interface
from lib import config as config_module from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import clore_partner
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
@ -11,7 +10,7 @@ from lib import container_logs
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import queue # Import the synchronous queue module import queue # Import the synchronous queue module
async def log_streaming_task(message_broker, monitoring, do_not_stream_containers): async def log_streaming_task(message_broker, monitoring):
client = docker_interface.client client = docker_interface.client
executor = ThreadPoolExecutor(max_workers=4) executor = ThreadPoolExecutor(max_workers=4)
tasks = {} tasks = {}
@ -30,15 +29,14 @@ async def log_streaming_task(message_broker, monitoring, do_not_stream_container
# Start tasks for new containers # Start tasks for new containers
for container_name, container in current_containers.items(): for container_name, container in current_containers.items():
if not container_name in do_not_stream_containers and not clore_partner.validate_partner_container_name(container_name): log_container_names.append(container_name)
log_container_names.append(container_name) if container_name not in tasks:
if container_name not in tasks: log.debug(f"log_streaming_task() | Starting task for {container_name}")
log.debug(f"log_streaming_task() | Starting task for {container_name}") sync_queue = queue.Queue()
sync_queue = queue.Queue() task = asyncio.ensure_future(asyncio.get_event_loop().run_in_executor(
task = asyncio.ensure_future(asyncio.get_event_loop().run_in_executor( executor, container_logs.stream_logs, container_name, sync_queue))
executor, container_logs.stream_logs, container_name, sync_queue)) tasks[container_name] = task
tasks[container_name] = task queues[container_name] = sync_queue
queues[container_name] = sync_queue
await message_broker.put(log_container_names) await message_broker.put(log_container_names)

View File

@ -6,7 +6,6 @@ import ipaddress
import socket import socket
import psutil import psutil
import sys import sys
import os
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
@ -26,15 +25,12 @@ def get_network_interfaces_with_subnet():
except Exception as e: except Exception as e:
return str(e) return str(e)
def exclude_network(excluded_network, input_ranges=None): def exclude_network(excluded_network):
# Convert exclude_network to ip_network object # Convert exclude_network to ip_network object
excluded_network = ip_network(excluded_network) excluded_network = ip_network(excluded_network)
if not input_ranges:
input_ranges=config.local_ipv4_ranges
# Remove the excluded network from the local_ranges list # Remove the excluded network from the local_ranges list
local_ranges = [ip_network(range_) for range_ in input_ranges if ip_network(range_) != exclude_network] local_ranges = [ip_network(range_) for range_ in config.local_ipv4_ranges if ip_network(range_) != exclude_network]
ranges_outside_exclude = [] ranges_outside_exclude = []
for local_range in local_ranges: for local_range in local_ranges:
@ -97,19 +93,3 @@ def is_ip_in_network(ip: str, network: str) -> bool:
# If there's an error with the input values, print the error and return False # If there's an error with the input values, print the error and return False
log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}") log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}")
return False return False
def purge_clore_interfaces():
network_interfaces = get_network_interfaces_with_subnet()
if type(network_interfaces) != dict:
log.error("Failed to load network interfaces, restarting...")
os._exit(1)
clore_subnets = [ "172.17.0.1/16" ] # I can include the docker default subnet here
for clore_default_network in config.clore_default_networks:
clore_subnets.append(clore_default_network["subnet"])
for network_interface in network_interfaces.keys():
if network_interface == "docker0" or network_interface[:3] == "br-":
subnet = network_interfaces[network_interface]
if subnet in clore_subnets or network_interface == "docker0":
utils.run_command(f"ip link delete {network_interface}")

View File

@ -1,70 +0,0 @@
from lib import config as config_module
from lib import docker_interface
from lib import background_job
from lib import utils
import asyncio
import aiofiles
import os
config = config_module.config
non_interactive_env = {
'DEBIAN_FRONTEND': 'noninteractive',
'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
}
non_interactive_env_hive = {
'DEBIAN_FRONTEND': 'noninteractive',
'PATH': '/hive/bin:/hive/sbin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:./',
}
INTENDED_DRIVER_VERSION = 550
async def get_running_containers():
try:
containers = await asyncio.to_thread(docker_interface.get_containers, False)
return containers
except Exception as e:
return False
async def run_update(is_hive = False):
try:
code, stdout, stderr = await utils.async_run_command("nvidia-smi --query-gpu=driver_version --format=csv,noheader", 20, env=non_interactive_env)
if code == 0 and stderr == '' and stdout:
driver_version = stdout.split('\n')[0].split('.')
if len(driver_version) >= 2 and driver_version[0].isdigit() and int(driver_version[0]) > 300 and int(driver_version[0]) < INTENDED_DRIVER_VERSION:
running_containers = await get_running_containers()
if type(running_containers) == list:
order_running = False
for container in running_containers:
if container.name[:11] == "clore-order" or container.name[:2] == "C.":
order_running = True
break
if not order_running:
if is_hive:
background_job.temporarly_disable(14400)
driver_update_code, driver_update_stdout, driver_update_stderr = await utils.async_run_command("nvidia-driver-update http://45.12.132.34/NVIDIA-Linux-x86_64-550.135.run --force", 14400, non_interactive_env_hive)
if driver_update_code == 1 and "Unload modules failed" in driver_update_stdout:
async with aiofiles.open("/opt/clore-hosting/.run_hive_driver_update", mode='w') as file:
await file.write("")
os._exit(0)
background_job.enable()
if driver_update_code == 0:
async with aiofiles.open(config.restart_docker_flag_file, mode='w') as file:
await file.write("")
await aiofiles.os.remove(config.update_driver_550_flag)
os._exit(0)
else:
driver_update_code, driver_update_stdout, driver_update_stderr = await utils.async_run_command("apt update -y && apt install nvidia-driver-550 -y", 14400, non_interactive_env)
if driver_update_code == 0:
await aiofiles.os.remove(config.update_driver_550_flag)
await utils.async_run_command("reboot") # On ubuntu it's just safest to reboot to get new driver version working properly
except Exception as e:
pass
async def update_loop(is_hive):
while True:
flag_exists = await aiofiles.os.path.exists(config.update_driver_550_flag)
if flag_exists:
await run_update(is_hive)
await asyncio.sleep(300)
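
A sketch of the driver-version check performed by run_update above (the nvidia-smi output below is a placeholder):

INTENDED_DRIVER_VERSION = 550            # as defined at the top of this file
stdout = "535.183.01"                    # e.g. from `nvidia-smi --query-gpu=driver_version --format=csv,noheader`
driver_version = stdout.split('\n')[0].split('.')
needs_update = (
    len(driver_version) >= 2
    and driver_version[0].isdigit()
    and 300 < int(driver_version[0]) < INTENDED_DRIVER_VERSION
)
print(needs_update)  # True for a 535.x driver
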

View File

@ -6,78 +6,20 @@ config = config_module.config
log = logging_lib.log log = logging_lib.log
import subprocess import subprocess
import clore_pynvml as pynvml import pynvml
import json import json
import math
HIVE_PATH="/hive/bin:/hive/sbin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:./"
GPU_MEM_ALLOWED_OC_RANGES = { # Known to be problematic GPUs
"NVIDIA P102-100": [-2000, 2000],
"NVIDIA P104-100": [-2000, 2000],
"NVIDIA P106-090": [-2000, 2000],
"NVIDIA P106-100": [-2000, 2000],
"NVIDIA GeForce GTX 1050 Ti": [-2000, 2000],
"NVIDIA GeForce GTX 1060 3GB": [-2000, 2000],
"NVIDIA GeForce GTX 1060 6GB": [-2000, 2000],
"NVIDIA GeForce GTX 1070": [-2000, 2000],
"NVIDIA GeForce GTX 1070 Ti": [-2000, 2000],
"NVIDIA GeForce GTX 1080": [-2000, 2000],
"NVIDIA GeForce GTX 1080 Ti": [-2000, 2000],
"NVIDIA CMP 30HX": [-2000, 6000],
"NVIDIA CMP 40HX": [-2000, 6000],
"NVIDIA CMP 50HX": [-2000, 6000],
"NVIDIA CMP 90HX": [-2000, 6000],
"NVIDIA GeForce GTX 1650": [-2000, 6000],
"NVIDIA GeForce GTX 1660 SUPER": [-2000, 6000],
"NVIDIA GeForce GTX 1660 Ti": [-2000, 6000],
"NVIDIA GeForce RTX 2060": [-2000, 6000],
"NVIDIA GeForce RTX 2060 SUPER": [-2000, 6000],
"NVIDIA GeForce RTX 2070": [-2000, 6000],
"NVIDIA GeForce RTX 2070 SUPER": [-2000, 6000],
"NVIDIA GeForce RTX 2080": [-2000, 6000],
"NVIDIA GeForce RTX 2080 Ti": [-2000, 6000]
}
GPU_CORE_ALLOWED_OC_RANGES = { # Known to be problematic GPUs
"NVIDIA P102-100": [-200, 1200],
"NVIDIA P104-100": [-200, 1200],
"NVIDIA P106-090": [-200, 1200],
"NVIDIA P106-100": [-200, 1200],
"NVIDIA GeForce GTX 1050 Ti": [-200, 1200],
"NVIDIA GeForce GTX 1060 3GB": [-200, 1200],
"NVIDIA GeForce GTX 1060 6GB": [-200, 1200],
"NVIDIA GeForce GTX 1070": [-200, 1200],
"NVIDIA GeForce GTX 1070 Ti": [-200, 1200],
"NVIDIA GeForce GTX 1080": [-200, 1200],
"NVIDIA GeForce GTX 1080 Ti": [-200, 1200],
"NVIDIA CMP 30HX": [-1000, 1000],
"NVIDIA CMP 40HX": [-1000, 1000],
"NVIDIA CMP 50HX": [-1000, 1000],
"NVIDIA CMP 90HX": [-1000, 1000],
"NVIDIA GeForce GTX 1650": [-1000, 1000],
"NVIDIA GeForce GTX 1660 SUPER": [-1000, 1000],
"NVIDIA GeForce GTX 1660 Ti": [-1000, 1000],
"NVIDIA GeForce RTX 2060": [-1000, 1000],
"NVIDIA GeForce RTX 2060 SUPER": [-1000, 1000],
"NVIDIA GeForce RTX 2070": [-1000, 1000],
"NVIDIA GeForce RTX 2070 SUPER": [-1000, 1000],
"NVIDIA GeForce RTX 2080": [-1000, 1000],
"NVIDIA GeForce RTX 2080 Ti": [-1000, 1000]
}
is_hive = False is_hive = False
all_gpus_data_list=[] all_gpus_data_list=[]
gpu_name_list=[]
get_data_fail=False get_data_fail=False
def init(gpu_specs_file=None, allow_hive_binaries=True): def init(gpu_specs_file=None):
global is_hive, all_gpus_data_list, get_data_fail, gpu_name_list global is_hive, all_gpus_data_list, get_data_fail
log.info("Loading GPU OC specs [ working ]") log.info("Loading GPU OC specs [ working ]")
try: try:
pynvml.nvmlInit() pynvml.nvmlInit()
kernel = get_specs.get_kernel() kernel = get_specs.get_kernel()
if "hive" in kernel and allow_hive_binaries: if "hive" in kernel:
is_hive=True is_hive=True
specs_file_loc = gpu_specs_file if gpu_specs_file else config.gpu_specs_file specs_file_loc = gpu_specs_file if gpu_specs_file else config.gpu_specs_file
@ -97,20 +39,14 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
break break
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i) gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i)
gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle) gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle)
gpu_name_list.append(pynvml.nvmlDeviceGetName(gpu_handle))
if not f"{i}-{gpu_uuid}" in parsed_specs_keys: if not f"{i}-{gpu_uuid}" in parsed_specs_keys:
parsed_specs={} parsed_specs={}
regenerate_specs=True regenerate_specs=True
break break
elif not "locks" in parsed_specs[f"{i}-{gpu_uuid}"]:
parsed_specs={}
regenerate_specs=True
break
if regenerate_specs: if regenerate_specs:
for i in range(0,gpu_count): for i in range(0,gpu_count):
gpu_spec={} gpu_spec={}
mem_to_core_allowed_locks = get_gpu_locked_clocks(i)
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i) gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i)
gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle) gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle)
power_limits = pynvml.nvmlDeviceGetPowerManagementLimitConstraints(gpu_handle) power_limits = pynvml.nvmlDeviceGetPowerManagementLimitConstraints(gpu_handle)
@ -119,8 +55,6 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
gpu_spec["default_power_limit"] = int(pynvml.nvmlDeviceGetPowerManagementDefaultLimit(gpu_handle) / 1000.0) gpu_spec["default_power_limit"] = int(pynvml.nvmlDeviceGetPowerManagementDefaultLimit(gpu_handle) / 1000.0)
gpu_spec["power_limits"] = [min_power_limit, max_power_limit] gpu_spec["power_limits"] = [min_power_limit, max_power_limit]
gpu_spec["name"] = pynvml.nvmlDeviceGetName(gpu_handle) gpu_spec["name"] = pynvml.nvmlDeviceGetName(gpu_handle)
gpu_name_list.append(gpu_spec["name"])
gpu_spec["locks"] = mem_to_core_allowed_locks
pci_info = pynvml.nvmlDeviceGetPciInfo(gpu_handle) pci_info = pynvml.nvmlDeviceGetPciInfo(gpu_handle)
pci_bus_id = pci_info.bus pci_bus_id = pci_info.bus
@ -130,57 +64,22 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
mem_range = get_hive_clock_range(is_hive, i, "mem") mem_range = get_hive_clock_range(is_hive, i, "mem")
core_range = get_hive_clock_range(is_hive, i, "core") core_range = get_hive_clock_range(is_hive, i, "core")
try: if type(mem_range) != list:
if type(mem_range) != list: pynvml.nvmlDeviceSetMemoryLockedClocks(gpu_handle, 200, 300) # Force low clocks, so the GPU can't crash when testing if under load
pynvml.nvmlDeviceSetMemoryLockedClocks(gpu_handle, 200, 300) # Force low clocks, so the GPU can't crash when testing if under load failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle)
failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle) failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle)
failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle) if (not failure_min) and (not failure_max):
if (not failure_min) and (not failure_max): mem_range=[min_oc_solution, max_oc_solution]
mem_range=[min_oc_solution, max_oc_solution] pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, 0)
pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, 0) pynvml.nvmlDeviceResetMemoryLockedClocks(gpu_handle)
pynvml.nvmlDeviceResetMemoryLockedClocks(gpu_handle) if type(core_range) != list:
if type(core_range) != list: pynvml.nvmlDeviceSetGpuLockedClocks(gpu_handle, 300, 350) # Force low clocks, so the GPU can't crash when testing if under load
pynvml.nvmlDeviceSetGpuLockedClocks(gpu_handle, 300, 350) # Force low clocks, so the GPU can't crash when testing if under load failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle, True)
failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle, True) failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle, True)
failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle, True) if (not failure_min) and (not failure_max):
if (not failure_min) and (not failure_max): core_range=[min_oc_solution, max_oc_solution]
core_range=[min_oc_solution, max_oc_solution] pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, 0)
pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, 0) pynvml.nvmlDeviceResetGpuLockedClocks(gpu_handle)
pynvml.nvmlDeviceResetGpuLockedClocks(gpu_handle)
except Exception as e_pinpointing:
if "not supported" in str(e_pinpointing).lower():
try:
min_core_offset, max_core_offset = pynvml.nvmlDeviceGetGpcClkMinMaxVfOffset(gpu_handle)
if min_core_offset>0:
min_core_offset = min_core_offset - math.floor((2**32)/1000)
if min_core_offset > -20000 and min_core_offset <= 0 and max_core_offset>=0 and min_core_offset < 20000:
core_range=[min_core_offset, max_core_offset]
else:
core_range=[0,0]
min_mem_offset, max_mem_offset = pynvml.nvmlDeviceGetMemClkMinMaxVfOffset(gpu_handle)
if min_mem_offset>0:
min_mem_offset = min_mem_offset - math.floor((2**32)/1000)
if min_mem_offset==0 and max_mem_offset==0:
if gpu_spec["name"] in GPU_MEM_ALLOWED_OC_RANGES:
mem_range = GPU_MEM_ALLOWED_OC_RANGES[gpu_spec["name"]]
else:
mem_range = [0,0]
elif min_mem_offset > -20000 and min_mem_offset <= 0 and max_mem_offset>=0 and max_mem_offset < 20000:
mem_range=[min_mem_offset, max_mem_offset]
else:
mem_range=[0,0]
except Exception as e2:
if "function not found" in str(e2).lower():
if gpu_spec["name"] in GPU_MEM_ALLOWED_OC_RANGES:
mem_range = GPU_MEM_ALLOWED_OC_RANGES[gpu_spec["name"]]
else:
mem_range = [0,0]
if gpu_spec["name"] in GPU_CORE_ALLOWED_OC_RANGES:
core_range = GPU_CORE_ALLOWED_OC_RANGES[gpu_spec["name"]]
else:
core_range = [0,0]
else:
get_data_fail=True
if type(mem_range) == list and type(core_range) == list and len(mem_range)==2 and len(core_range)==2: if type(mem_range) == list and type(core_range) == list and len(mem_range)==2 and len(core_range)==2:
gpu_spec["mem"]=mem_range gpu_spec["mem"]=mem_range
gpu_spec["core"]=core_range gpu_spec["core"]=core_range
@ -204,10 +103,6 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
print(all_gpus_data_list) print(all_gpus_data_list)
# Load GPU specs # Load GPU specs
def get_gpu_name_list():
global gpu_name_list
return gpu_name_list
def get_gpu_oc_specs(): def get_gpu_oc_specs():
global get_data_fail global get_data_fail
if get_data_fail: if get_data_fail:
@ -218,19 +113,6 @@ def get_gpu_oc_specs():
def shutdown(): def shutdown():
pynvml.nvmlShutdown() pynvml.nvmlShutdown()
def get_gpu_locked_clocks(gpu_index):
try:
handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
mem_clocks = pynvml.nvmlDeviceGetSupportedMemoryClocks(handle)
mem_to_core = {}
for idx, mem_clock in enumerate(mem_clocks):
if idx < 12 or idx == len(mem_clocks)-1:
graphics_clocks = pynvml.nvmlDeviceGetSupportedGraphicsClocks(handle, mem_clock)
mem_to_core[str(mem_clock)] = [min(graphics_clocks), max(graphics_clocks)]
return mem_to_core
except Exception as e:
return {}
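
An illustrative call to the helper above (requires an initialised NVML session and at least one NVIDIA GPU; the clock values shown are placeholders):

locks = get_gpu_locked_clocks(0)
# e.g. {"9501": [210, 2130], "9251": [210, 2100], ...}  -> memory clock mapped to [min, max] supported core clock
print(locks)
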
def handle_nn(input_int): def handle_nn(input_int):
if abs(4293967-input_int) < 10000: if abs(4293967-input_int) < 10000:
return input_int-4293967 return input_int-4293967
@ -336,7 +218,6 @@ def pinpoint_oc_limits_positive(gpu_handle, core=False):
return failure, found_solution return failure, found_solution
def set_oc(settings): def set_oc(settings):
global is_hive
try: try:
gpu_count = pynvml.nvmlDeviceGetCount() gpu_count = pynvml.nvmlDeviceGetCount()
settings_keys = settings.keys() settings_keys = settings.keys()
@ -350,10 +231,6 @@ def set_oc(settings):
} }
settings_keys = settings.keys() settings_keys = settings.keys()
log.debug(f"Rewriting settings with: {json.dumps(settings)}") log.debug(f"Rewriting settings with: {json.dumps(settings)}")
core_locks = []
mem_locks = []
any_lock_failure = False
for oc_gpu_index in settings_keys: for oc_gpu_index in settings_keys:
if oc_gpu_index.isdigit(): if oc_gpu_index.isdigit():
oc_gpu_index=int(oc_gpu_index) oc_gpu_index=int(oc_gpu_index)
@ -361,42 +238,13 @@ def set_oc(settings):
gpu_oc_config = settings[str(oc_gpu_index)] gpu_oc_config = settings[str(oc_gpu_index)]
gpu_possible_ranges = all_gpus_data_list[oc_gpu_index] gpu_possible_ranges = all_gpus_data_list[oc_gpu_index]
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(oc_gpu_index) gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(oc_gpu_index)
if "core" in gpu_oc_config:
if "core_lock" in gpu_oc_config:
core_lock = int(gpu_oc_config["core_lock"])
core_locks.append(str(core_lock))
try:
pynvml.nvmlDeviceSetGpuLockedClocks(gpu_handle, core_lock, core_lock)
except Exception as core_lock_exception:
any_lock_failure=True
else:
core_locks.append('0')
try:
pynvml.nvmlDeviceResetGpuLockedClocks(gpu_handle)
except Exception as core_lock_exception:
any_lock_failure=True
if "mem_lock" in gpu_oc_config:
mem_lock = int(gpu_oc_config["mem_lock"])
mem_locks.append(str(mem_lock))
try:
pynvml.nvmlDeviceSetMemoryLockedClocks(gpu_handle, mem_lock, mem_lock)
except Exception as mem_lock_exception:
any_lock_failure=True
else:
mem_locks.append('0')
try:
pynvml.nvmlDeviceResetMemoryLockedClocks(gpu_handle)
except Exception as mem_lock_exception:
any_lock_failure=True
if "core" in gpu_oc_config: # Core offset
wanted_core_clock = int(round(gpu_oc_config["core"]*2)) wanted_core_clock = int(round(gpu_oc_config["core"]*2))
if gpu_possible_ranges["core"][0] <= wanted_core_clock and wanted_core_clock <= gpu_possible_ranges["core"][1]: if gpu_possible_ranges["core"][0] <= wanted_core_clock and wanted_core_clock <= gpu_possible_ranges["core"][1]:
pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, wanted_core_clock) pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, wanted_core_clock)
else: else:
log.error(f"Requested OC for GPU:{oc_gpu_index} (CORE) out of bound | {wanted_core_clock} | [{gpu_possible_ranges["core"][0]}, {gpu_possible_ranges["core"][1]}]") log.error(f"Requested OC for GPU:{oc_gpu_index} (CORE) out of bound | {wanted_core_clock} | [{gpu_possible_ranges["core"][0]}, {gpu_possible_ranges["core"][1]}]")
if "mem" in gpu_oc_config: # Memory offset if "mem" in gpu_oc_config:
wanted_mem_clock = int(round(gpu_oc_config["mem"]*2)) wanted_mem_clock = int(round(gpu_oc_config["mem"]*2))
if gpu_possible_ranges["mem"][0] <= wanted_mem_clock and wanted_mem_clock <= gpu_possible_ranges["mem"][1]: if gpu_possible_ranges["mem"][0] <= wanted_mem_clock and wanted_mem_clock <= gpu_possible_ranges["mem"][1]:
pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, wanted_mem_clock) pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, wanted_mem_clock)
@ -408,17 +256,6 @@ def set_oc(settings):
pynvml.nvmlDeviceSetPowerManagementLimit(gpu_handle, wanted_power_limit_milliwatts) pynvml.nvmlDeviceSetPowerManagementLimit(gpu_handle, wanted_power_limit_milliwatts)
else: else:
log.error(f"Requested OC for GPU:{oc_gpu_index} (POWER LIMIT) out of bound | {gpu_oc_config["pl"]} | [{gpu_possible_ranges["power_limits"][0]}, {gpu_possible_ranges["power_limits"][1]}]") log.error(f"Requested OC for GPU:{oc_gpu_index} (POWER LIMIT) out of bound | {gpu_oc_config["pl"]} | [{gpu_possible_ranges["power_limits"][0]}, {gpu_possible_ranges["power_limits"][1]}]")
if is_hive and any_lock_failure and len(mem_locks)==len(core_locks):
try:
nvtool_commands = []
for idx, mem_lock in enumerate(mem_locks):
core_lock = core_locks[idx]
nvtool_commands.append(f"nvtool -i {str(idx)} --setmem {mem_lock} --setcore {core_lock}")
cmd = ["bash",'-c',f"PATH={HIVE_PATH} && sudo {' && '.join(nvtool_commands)}"]
#print(cmd)
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as hive_oc_settings:
pass
return True return True
except Exception as e: except Exception as e:
log.error(f"set_oc | ERROR | {e}") log.error(f"set_oc | ERROR | {e}")
@ -430,7 +267,7 @@ def get_hive_clock_range(is_hive, gpu_index, part):
if is_hive: if is_hive:
try: try:
flag = "--setmemoffset" if part=="mem" else "--setcoreoffset" flag = "--setmemoffset" if part=="mem" else "--setcoreoffset"
cmd = ["bash",'-c',f"PATH={HIVE_PATH} && sudo nvtool -i {gpu_index} {flag} -100000"] cmd = ["bash",'-c',f"nvtool -i 0 {flag} -100000"]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
lines = result.stdout.decode().splitlines() lines = result.stdout.decode().splitlines()
@ -455,16 +292,3 @@ def get_hive_clock_range(is_hive, gpu_index, part):
return False return False
else: else:
return False return False
def get_vram_per_gpu():
vram_per_gpu = []
try:
gpu_count = pynvml.nvmlDeviceGetCount()
for i in range(0,gpu_count):
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i)
mem_info = pynvml.nvmlDeviceGetMemoryInfo(gpu_handle)
vram_per_gpu.append(mem_info.total / 1024 ** 2)
except Exception as e:
log.error(f"Failed loading get_vram_per_gpu() | {e}")
pass
return vram_per_gpu

View File

@ -1,187 +0,0 @@
from lib import config as config_module
from lib import logging as logging_lib
from lib import utils
import os
import aiofiles.os
config = config_module.config
log = logging_lib.log
CLIENT_CONFIGS_LOCATION = "/etc/openvpn/client"
PARTNER_CONFIG_NAME = "clore_partner.conf"
def generate_openvpn_config(
local_ip='10.1.0.2',
server_ip='10.1.0.1',
server_hostname='example.com',
udp_port=1194,
vpn_secret_key='YOUR_VPN_SECRET_KEY'
):
openvpn_config = f"""nobind
proto udp4
remote {server_hostname} {udp_port}
resolv-retry infinite
auth SHA256
cipher AES-256-CBC
dev {config.openvpn_forwarding_tun_device}
ifconfig {local_ip} {server_ip}
<secret>
-----BEGIN OpenVPN Static key V1-----
{vpn_secret_key}
-----END OpenVPN Static key V1-----
</secret>
fragment 1300
mssfix 1300
sndbuf 524288
rcvbuf 524288
user nobody
group nogroup
ping 15
ping-restart 45
ping-timer-rem
persist-tun
persist-key
verb 0"""
return openvpn_config
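
A hedged usage sketch for generate_openvpn_config (every value below is a placeholder):

conf = generate_openvpn_config(
    local_ip="10.1.0.2",
    server_ip="10.1.0.1",
    server_hostname="partner.example.com",
    udp_port=1194,
    vpn_secret_key="<static key body>",
)
print(conf.splitlines()[0])  # -> "nobind"
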
async def get_iptables_forward_rules():
code, stdout, stderr = await utils.async_run_command(
f"LC_ALL=C {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -L PREROUTING -n -v --line-numbers"
)
rules = []
if code == 0:
collumns = []
for idx, line in enumerate(stdout.split('\n')):
if "num" in collumns and "target" in collumns and "in" in collumns:
items = line.split(maxsplit=len(collumns)+1)
rule = {}
for idx, name in enumerate(collumns):
rule[name]=items[idx]
rule["desc"] = items[len(collumns)+1]
rules.append(rule)
else:
collumns = line.split()
return rules
async def remove_iptables_rule(rule_dict):
cmd = f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables"
if rule_dict.get('target') == 'DNAT':
cmd += " -t nat"
cmd += " -D PREROUTING"
if rule_dict.get('prot') and rule_dict['prot'] != '--':
cmd += f" -p {rule_dict['prot']}"
if rule_dict.get('in') and rule_dict['in'] != '*':
cmd += f" -i {rule_dict['in']}"
if rule_dict.get('out') and rule_dict['out'] != '*':
cmd += f" -o {rule_dict['out']}"
if rule_dict.get('source') and rule_dict['source'] != '0.0.0.0/0':
cmd += f" -s {rule_dict['source']}"
if rule_dict.get('destination') and rule_dict['destination'] != '0.0.0.0/0':
cmd += f" -d {rule_dict['destination']}"
if rule_dict.get('target') == 'DNAT':
if 'dports' in rule_dict.get('desc', ''):
port_info = rule_dict['desc'].split('dports ')[1].split(' ')[0]
if ':' in port_info:
cmd += f" -m multiport --dports {port_info}"
else:
cmd += f" --dport {port_info}"
if 'to:' in rule_dict.get('desc', ''):
dest_ip = rule_dict['desc'].split('to:')[1].split()[0]
cmd += f" -j DNAT --to-destination {dest_ip}"
await utils.async_run_command(cmd)
async def clore_partner_configure(clore_partner_config):
try:
if clore_partner_config:
docker_restart_required = False
needed_openvpn_config = generate_openvpn_config(
local_ip=clore_partner_config["provider"],
server_ip=clore_partner_config["forwarding"],
server_hostname=clore_partner_config["openvpn_host"],
udp_port=clore_partner_config["openvpn_port"],
vpn_secret_key=clore_partner_config["secret"]
)
saved_config=''
config_exists = await aiofiles.os.path.exists(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME))
if config_exists:
async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='r') as file:
saved_config = await file.read()
if saved_config != needed_openvpn_config:
async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='w') as file:
await file.write(needed_openvpn_config)
is_active_code, is_active_stdout, is_active_stderr = await utils.async_run_command(
f"systemctl is-active openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
if is_active_code == 0 and saved_config != needed_openvpn_config:
code, stdout, stderr = await utils.async_run_command(
f"systemctl restart openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
docker_restart_required = False if code != 0 else True
elif is_active_code != 0:
code, stdout, stderr = await utils.async_run_command(
f"systemctl start openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
docker_restart_required = False if code != 0 else True
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route show table {str(config.forwarding_ip_route_table_id)}"
)
ip_route_configured = False
if code == 0:
for line in stdout.split('\n'):
items = line.split(' ')
if clore_partner_config["provider"] in items and config.openvpn_forwarding_tun_device in items:
ip_route_configured = True
break
if not ip_route_configured:
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route add 0.0.0.0/0 dev {config.openvpn_forwarding_tun_device} src {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id} && ip rule add from {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id}"
)
ip_tables_configured = False
rules = await get_iptables_forward_rules()
for rule in rules:
try:
if rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and f"{clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]}" in rule["desc"] and f"to:{clore_partner_config['provider']}" in rule["desc"]:
ip_tables_configured = True
elif rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and "to:10." in rule["desc"] and "dports " in rule["desc"]:
print("REMOVE RULE", rule)
await remove_iptables_rule(rule)
except Exception as ei:
log.error(f"clore_partner_configure() | ei | {ei}")
if ip_tables_configured == False:
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p tcp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']} && {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p udp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']}"
)
if docker_restart_required:
async with aiofiles.open(config.restart_docker_flag_file, mode='w') as file:
await file.write("")
os._exit(0) # We close clore hosting because it's mandatory to restart docker after starting the up-to-date version of the VPN; docker will be restarted on the next start of clore hosting
else:
code, stdout, stderr = await utils.async_run_command(
f"systemctl stop openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
return True
except Exception as e:
log.error(f"FAIL | openvpn.clore_partner_configure | {e}")
return False

View File

@ -1,16 +1,11 @@
from typing import Optional, Tuple, Dict
from lib import config as config_module from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import nvml
import subprocess import subprocess
import hashlib import hashlib
import asyncio
import random import random
import string import string
import shutil
import shlex import shlex
import time import time
import math
import json import json
import os import os
@ -46,8 +41,6 @@ def normalize_rule(rule_dict):
def get_auth(): def get_auth():
try: try:
if 'AUTH_TOKEN' in os.environ:
return os.environ['AUTH_TOKEN']
auth_str = '' auth_str = ''
with open(config.auth_file, "r", encoding="utf-8") as file: with open(config.auth_file, "r", encoding="utf-8") as file:
auth_str = file.read().strip() auth_str = file.read().strip()
@ -55,12 +48,6 @@ def get_auth():
except Exception as e: except Exception as e:
return '' return ''
def get_allowed_container_names():
allowed_container_names = os.getenv("ALLOWED_CONTAINER_NAMES")
if type(allowed_container_names)==str and len(allowed_container_names)>0:
return [x for x in allowed_container_names.split(',') if x]
return []
def unix_timestamp(): def unix_timestamp():
return int(time.time()) return int(time.time())
@ -104,127 +91,18 @@ def generate_random_string(length):
characters = string.ascii_letters + string.digits characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for _ in range(length)) return ''.join(random.choice(characters) for _ in range(length))
HIVE_PATH="/hive/bin:/hive/sbin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:./"
def hive_set_miner_status(enabled=False): def hive_set_miner_status(enabled=False):
### control miner state - OFF/ON ### control miner state - OFF/ON
screen_out = run_command("screen -ls") screen_out = run_command("screen -ls")
miner_screen_running = False miner_screen_running = False
miner_screen_session_pids = []
if screen_out[0] == 0 or screen_out[0] == 1: if screen_out[0] == 0 or screen_out[0] == 1:
screen_lines=screen_out[1].split('\n') screen_lines=screen_out[1].split('\n')
for screen_line in screen_lines: for screen_line in screen_lines:
screen_line_parts=screen_line.replace('\t', '', 1).split('\t') screen_line_parts=screen_line.replace('\t', '', 1).split('\t')
if len(screen_line_parts)>2 and '.' in screen_line_parts[0]: if len(screen_line_parts)>2 and '.' in screen_line_parts[0]:
if screen_line_parts[0].split('.',1)[1]=="miner": if screen_line_parts[0].split('.',1)[1]=="miner":
miner_screen_session_pids.append(screen_line_parts[0].split('.',1)[0])
miner_screen_running=True miner_screen_running=True
if len(miner_screen_session_pids) > 1: ## Something is really wrong, destroy all miner instances if miner_screen_running and not enabled:
for idx, miner_screen_session_pid in enumerate(miner_screen_session_pids): run_command("miner stop")
run_command(f"kill -9 {miner_screen_session_pid}{' && screen -wipe' if idx==len(miner_screen_session_pids)-1 else ''}")
elif miner_screen_running and not enabled:
run_command(f"/bin/bash -c \"PATH={HIVE_PATH} && sudo /hive/bin/miner stop\"")
elif enabled and not miner_screen_running: elif enabled and not miner_screen_running:
run_command(f"/bin/bash -c \"export PATH={HIVE_PATH} && sudo /hive/sbin/nvidia-oc && source ~/.bashrc ; sudo /hive/bin/miner start\"") run_command("nvidia-oc && miner start")
def get_extra_allowed_images():
if os.path.exists(config.extra_allowed_images_file):
try:
with open(config.extra_allowed_images_file, 'r') as file:
content = file.read()
data = json.loads(content)
if isinstance(data, list):
if all(isinstance(item, dict) and set(item.keys()) == {'repository', 'allowed_tags'} and isinstance(item['repository'], str) and isinstance(item['allowed_tags'], list) and all(isinstance(tag, str) for tag in item['allowed_tags']) for item in data):
return data
else:
return []
else:
return []
except Exception as e:
log.error(f"get_extra_allowed_images() | error: {e}")
return []
else:
return []
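get_extra_allowed_images() accepts only a JSON list whose items have exactly the keys repository (string) and allowed_tags (list of strings); anything else yields an empty list. An illustrative file that would pass validation (the repositories are examples, not a shipped default):

import json

example = [
    {"repository": "docker.io/library/ubuntu", "allowed_tags": ["22.04", "latest"]},  # illustrative
    {"repository": "ghcr.io/example/worker", "allowed_tags": ["*"]},                  # illustrative
]

# Written to config.extra_allowed_images_file, this is returned as-is by get_extra_allowed_images();
# extra keys, non-string tags or a top-level object would make the function return [].
print(json.dumps(example, indent=4))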
async def async_run_command(
command: str,
timeout: Optional[float] = None,
env: Optional[Dict[str, str]] = None
) -> Tuple[int, str, str]:
command_env = env if env is not None else {}
try:
proc = await asyncio.create_subprocess_shell(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=command_env
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(),
timeout=timeout
)
stdout_str = stdout.decode('utf-8').strip() if stdout else ''
stderr_str = stderr.decode('utf-8').strip() if stderr else ''
return proc.returncode, stdout_str, stderr_str
except asyncio.TimeoutError:
# Handle timeout: terminate the process gracefully first
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=5) # Wait for it to exit
except asyncio.TimeoutError:
# Force kill the process if it doesn't terminate
proc.kill()
await proc.wait()
return -1, '', f'Command timed out after {timeout} seconds'
except Exception as e:
return -1, '', str(e)
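async_run_command() always returns a (return_code, stdout, stderr) tuple and folds both exceptions and timeouts into a -1 return code with the reason in stderr. A minimal usage sketch, calling it through lib.utils the way the rest of the codebase does:

import asyncio
from lib import utils

async def main():
    # Successful command: return code 0, trimmed stdout, empty stderr.
    code, out, err = await utils.async_run_command("uname -r", timeout=5)
    print(code, out, err)

    # Timed-out command: return code -1 and "Command timed out after 1 seconds" in stderr.
    code, out, err = await utils.async_run_command("sleep 30", timeout=1)
    print(code, out, err)

asyncio.run(main())

Note that unless env is passed, the subprocess runs with an empty environment rather than inheriting the caller's.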
def get_free_space_mb(path):
"""Get free space in MB for the given path."""
total, used, free = shutil.disk_usage(path)
return free // (1024 * 1024) # Convert bytes to MB
def get_directory_size_mb(path):
"""Get the size of a directory in MB."""
total_size = 0
for dirpath, dirnames, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
# Skip if the file doesn't exist (symlinks, etc.)
if not os.path.islink(fp) and os.path.exists(fp):
total_size += os.path.getsize(fp)
return total_size // (1024 * 1024) # Convert bytes to MB
class shm_calculator:
def __init__(self, total_ram):
self.total_ram = total_ram
self.gpu_vram_sizes = []
def calculate(self, used_gpu_ids):
assume_ram_utilised = 2500 #MB
default_shm_size = 64 #MB
if len(self.gpu_vram_sizes) == 0:
self.gpu_vram_sizes = nvml.get_vram_per_gpu()
instance_vram_total = 0
total_vram_size = sum(self.gpu_vram_sizes)
for idx, value in enumerate(self.gpu_vram_sizes):
if used_gpu_ids == '*' or idx in used_gpu_ids:
instance_vram_total += value
if instance_vram_total == 0 or total_vram_size == 0:
return default_shm_size
shm_size = instance_vram_total * 1.5 if instance_vram_total * 1.5 < self.total_ram - assume_ram_utilised else (
instance_vram_total/total_vram_size * (self.total_ram - assume_ram_utilised)
)
return math.floor(shm_size if shm_size > default_shm_size else default_shm_size)
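The sizing rule in calculate() is: give the instance 1.5x the VRAM of its assigned GPUs as shared memory, unless that exceeds total RAM minus an assumed 2500 MB of base usage, in which case it gets a share of that remainder proportional to its slice of total VRAM, never less than 64 MB. A worked sketch with made-up hardware (two 24 GB GPUs, 64 GB of RAM):

calc = shm_calculator(total_ram=65536)
calc.gpu_vram_sizes = [24576, 24576]  # pre-filled so this sketch does not query nvml

print(calc.calculate([0]))   # one GPU:   1.5 * 24576 = 36864 MB, fits under 65536 - 2500
print(calc.calculate('*'))   # both GPUs: 1.5 * 49152 > 63036, so 49152/49152 * 63036 = 63036 MB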

View File

@ -1,269 +0,0 @@
# Library to set up XFS partition for docker
from lib import ensure_packages_installed
from lib import logging as logging_lib
from lib import docker_interface
from lib import networking
from lib import get_specs
from lib import utils
from datetime import datetime
import asyncio
import json
import os
log = logging_lib.log
DOCKER_ROOT = "/var/lib/docker"
DOCKER_DATA_IMG = "/opt/clore-hosting/data.img"
HP_LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB
HP_MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB
GENERIC_LEAVE_FREE_SPACE_MB = 1024*8 # 8 GB
GENERIC_MIN_XFS_PARTITION_SIZE = 1024*10 # 10 GB
XFS_STATE_FILE = "/opt/clore-hosting/xfs_state"
HIGH_PERFORMANCE_GPUS = [
"NVIDIA GeForce RTX 4090",
"NVIDIA GeForce RTX 3090"
]
MANDATORY_PACKAGES = [
"xfsprogs",
"dmidecode",
"openvpn",
"iproute2",
"iputils-ping",
"util-linux"
]
# This code is run on start of clore hosting to migrate docker to the XFS partition system
# sudo fallocate -l 300G /docker-storage.img
# sudo mkfs.xfs /docker-storage.img
# mount -o loop,pquota /docker-storage.img /mnt/docker-storage
def get_to_use_storage_values(max_free_space):
gpu_str, gpu_mem, gpus, nvml_err = get_specs.get_gpu_info()
if nvml_err:
return None, None
try:
gpu_names = []
for gpu in gpus["nvidia"]:
gpu_names.append(gpu["name"])
if len(gpu_names) > 0:
all_gpus_same = all(item == gpu_names[0] for item in gpu_names)
if (all_gpus_same and gpu_names[0] in HIGH_PERFORMANCE_GPUS) or max_free_space > 1024 * 70:
return HP_LEAVE_FREE_SPACE_MB, HP_MIN_XFS_PARTITION_SIZE
else:
return GENERIC_LEAVE_FREE_SPACE_MB, GENERIC_MIN_XFS_PARTITION_SIZE
else:
return "no-gpus", "no-gpus"
except Exception as e:
return None, None
def migrate():
docker_xfs_state = validate_docker_xfs()
if docker_xfs_state == "skip":
migrate_log("skipping migration")
return
elif docker_xfs_state == "valid":
migrate_log("migration succeeded")
return 'success'
packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed(
MANDATORY_PACKAGES
))
if not packages_available:
migrate_log("packages missing")
return 'packages-missing'
root_device = get_specs.get_root_device()
if not root_device:
migrate_log("not supported boot device")
return "not-supported-boot-device"
device_name = os.path.basename(root_device).split('p')[0].rstrip('0123456789')
if get_specs.is_usb_device(device_name):
migrate_log("not supported boot device")
return "not-supported-boot-device"
log.info("Starting migration to xfs")
docker_interface.stop_all_containers()
if os.path.exists(DOCKER_DATA_IMG):
try:
os.remove(DOCKER_DATA_IMG)
except Exception as e:
migrate_log("Failed to remove DOCKER_DATA_IMG")
return "failure"
max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT)
leave_free_space, min_xfs_size = get_to_use_storage_values(max_free_space)
if leave_free_space == "no-gpus":
return "nvidia-failure"
if leave_free_space == None:
migrate_log("can't get free space")
return "failure"
data_img_size = int(max_free_space - leave_free_space)
if data_img_size < min_xfs_size:
migrate_log("not enought free space")
return 'not-enough-space'
docker_config_success = False
fstab_config_success = False
code, stdout, stderr = utils.run_command(
f"systemctl stop docker && rm -rf {DOCKER_ROOT} && fallocate -l {str(data_img_size)}M {DOCKER_DATA_IMG} && mkfs.xfs {DOCKER_DATA_IMG}"
)
networking.purge_clore_interfaces()
if code == 0:
docker_config_success = configure_docker_daemon()
if code == 0 and docker_config_success:
fstab_config_success = configure_fstab()
if code == 0 and fstab_config_success:
code, stdout, stderr = utils.run_command(
f"mkdir {DOCKER_ROOT} && systemctl daemon-reload && mount -a"
)
if code==0:
return 'success'
else:
migrate_log("failed to migrate v1")
configure_docker_daemon(remove=True)
configure_fstab(remove=True)
return 'failure'
else:
utils.run_command(
f"mkdir {DOCKER_ROOT}"
)
if os.path.exists(DOCKER_DATA_IMG):
try:
os.remove(DOCKER_DATA_IMG)
except Exception as e:
pass
migrate_log("failed to migrate v2")
return 'failure'
def migrate_log(msg):
log_file_path = "/opt/clore-hosting/migrate-log.txt"
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_message = f"{current_time} | {msg}\n"
with open(log_file_path, "a") as log_file:
log_file.write(log_message)
def validate_docker_xfs():
code_root, stdout_root, stderr_root = utils.run_command("df -T /")
code, stdout, stderr = utils.run_command(f"df -T {DOCKER_ROOT}")
#print([code, stderr, stdout])
if code_root == 0 and stderr_root == '' and ((code == 0 and stderr == '') or (code == 1 and f" {DOCKER_ROOT}: " in stderr and stdout == '')):
root_blocks = None
docker_root_blocks = None
docker_root_format = ''
for idx, line in enumerate(stdout_root.split('\n')):
if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric():
root_blocks = int(line.split()[2])
if code == 1:
docker_root_blocks = root_blocks
else:
for idx, line in enumerate(stdout.split('\n')):
if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric():
docker_root_blocks = int(line.split()[2])
docker_root_format = line.split()[1]
if root_blocks == None or docker_root_blocks == None:
return "skip"
elif docker_root_format=="xfs" and root_blocks > docker_root_blocks:
return "valid"
else:
return "default"
else:
return "skip"
def configure_docker_daemon(remove=False):
try:
daemon_json_path = "/etc/docker/daemon.json"
with open(daemon_json_path, 'r') as file:
raw_content = file.read()
daemon_config = json.loads(raw_content)
if remove:
if daemon_config.get("data-root") == DOCKER_ROOT:
del daemon_config["data-root"]
if daemon_config.get("storage-driver") == "overlay2":
del daemon_config["storage-driver"]
elif daemon_config.get("data-root") != DOCKER_ROOT or daemon_config.get("storage-driver") != "overlay2":
daemon_config["data-root"] = DOCKER_ROOT
daemon_config["storage-driver"] = "overlay2"
with open(daemon_json_path, 'w') as file:
file.write(json.dumps(daemon_config,indent=4))
return True
except Exception as e:
return False
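configure_docker_daemon() touches exactly two keys in /etc/docker/daemon.json and preserves everything else already present. After a successful migration the file contains at least:

expected_keys = {
    "data-root": "/var/lib/docker",   # DOCKER_ROOT
    "storage-driver": "overlay2",
}

# configure_docker_daemon(remove=True) deletes these two keys again, and only when they still
# hold these values, so rolling back leaves unrelated daemon settings untouched.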
def configure_fstab(remove=False):
try:
file_path = "/etc/fstab"
mount_line = f"{DOCKER_DATA_IMG} {DOCKER_ROOT} xfs loop,pquota 0 0"
with open(file_path, 'r') as file:
raw_content = file.read()
if remove:
if mount_line in raw_content:
raw_content = raw_content.replace(f"\n{mount_line}\n", '')
with open(file_path, 'w') as file:
file.write(raw_content)
elif not mount_line in raw_content:
raw_content += f"\n{mount_line}\n"
with open(file_path, 'w') as file:
file.write(raw_content)
return True
except Exception as e:
return False
def init():
try:
if os.path.exists(XFS_STATE_FILE):
with open(XFS_STATE_FILE, 'r') as file:
raw_content = file.read()
if "enabled" in raw_content:
migration_status = migrate()
if migration_status == "success":
with open(XFS_STATE_FILE, 'w') as file:
file.write("active")
return "active"
elif migarion_status == "not-enough-space":
with open(XFS_STATE_FILE, 'w') as file:
file.write("not-enough-space")
return 'not-enough-space'
elif migarion_status == "not-supported-boot-device":
with open(XFS_STATE_FILE, 'w') as file:
file.write("not-supported-boot-device")
return 'failed'
elif migarion_status == "nvidia-failure":
return 'failed'
else:
with open(XFS_STATE_FILE, 'w') as file:
file.write("failed-migration")
return 'failed'
elif 'not-enough-space' in raw_content:
return 'not-enough-space'
elif "active" in raw_content:
return "active"
else:
return "failed"
else:
return "disabled"
except Exception as e:
print(e)
return "failed"

View File

@ -7,5 +7,4 @@ psutil==5.9.0
python-iptables==1.0.1 python-iptables==1.0.1
websockets==12.0 websockets==12.0
packaging==23.2 packaging==23.2
clore-pynvml==11.5.4 git+https://git.clore.ai/clore/pynvml.git@main
requests==2.31.0