Merge pull request 'V5.2.9 | XFS, hosting to partner platforms' (#1) from xfs into main

Reviewed-on: #1
This commit is contained in:
clore 2024-12-08 22:24:00 +00:00
commit c7b9047f9b
19 changed files with 1200 additions and 64 deletions

View File

@ -32,7 +32,7 @@ def get_last_ip_occurrence_and_text(input_string):
else: else:
return None, None return None, None
def configure(containers): def configure(containers, partner_forwarding_ips):
valid_containers = [] valid_containers = []
newly_created_networks = [] newly_created_networks = []
containers_required_networks = [] containers_required_networks = []
@ -141,7 +141,7 @@ def configure(containers):
if config.log_containers_strings: if config.log_containers_strings:
print("FROM DOCKER CONFIGURATOR", valid_containers) print("FROM DOCKER CONFIGURATOR", valid_containers)
validation_and_security = docker_interface.validate_and_secure_networks() validation_and_security = docker_interface.validate_and_secure_networks(partner_forwarding_ips)
if startup_sctipt_creation_fail: if startup_sctipt_creation_fail:
validation_and_security=False validation_and_security=False
return validation_and_security, valid_containers, use_hive_flightsheet return validation_and_security, valid_containers, use_hive_flightsheet

View File

@ -4,7 +4,10 @@ from lib import log_streaming_task
from lib import run_startup_script from lib import run_startup_script
from lib import hive_miner_interface from lib import hive_miner_interface
from lib import docker_interface from lib import docker_interface
from lib import background_job
from lib import docker_deploy from lib import docker_deploy
from lib import clore_partner
from lib import clore_partner_socket
from lib import docker_pull from lib import docker_pull
from lib import get_specs from lib import get_specs
from lib import utils from lib import utils
@ -34,17 +37,17 @@ WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_
#print(config) #print(config)
async def configure_networks(containers): async def configure_networks(containers, partner_forwarding_ips):
res = await asyncio.to_thread(docker_configurator.configure, containers) res = await asyncio.to_thread(docker_configurator.configure, containers, partner_forwarding_ips)
try: try:
fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2]) fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2])
return fin_res return fin_res
except Exception as e: except Exception as e:
return False return False
async def deploy_containers(validated_containers, allowed_running_containers): async def deploy_containers(validated_containers, allowed_running_containers, can_run_partner_workloads):
try: try:
all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers) all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers, can_run_partner_workloads)
return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names) return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names)
except Exception as e: except Exception as e:
return False return False
@ -70,8 +73,10 @@ async def set_hive_miner_status(enabled=False):
return False return False
class CloreClient: class CloreClient:
def __init__(self, auth_key): def __init__(self, auth_key, xfs_state):
self.auth_key=auth_key self.auth_key=auth_key
self.xfs_state = xfs_state
self.ws_peers = {} self.ws_peers = {}
self.last_checked_ws_peers=0 self.last_checked_ws_peers=0
self.containers={} self.containers={}
@ -99,9 +104,11 @@ class CloreClient:
"container_log_streaming_service": utils.unix_timestamp(), "container_log_streaming_service": utils.unix_timestamp(),
"specs_service": utils.unix_timestamp(), "specs_service": utils.unix_timestamp(),
"oc_service": utils.unix_timestamp(), "oc_service": utils.unix_timestamp(),
"background_pow_data_collection": utils.unix_timestamp() "background_pow_data_collection": utils.unix_timestamp(),
"partner_service": utils.unix_timestamp()
} }
self.max_service_inactivity = 600 # seconds self.max_service_inactivity = 600 # seconds
self.no_restart_services = ["partner_service"] # Services that are allowed to run indefinetly without triggering the app to restart
if config.debug_ws_peer: if config.debug_ws_peer:
self.ws_peers[str(config.debug_ws_peer)]={ self.ws_peers[str(config.debug_ws_peer)]={
@ -137,6 +144,10 @@ class CloreClient:
self.hive_miner_interface = hive_miner_interface.hive_interface() self.hive_miner_interface = hive_miner_interface.hive_interface()
self.next_pow_background_job_send_update = 0 self.next_pow_background_job_send_update = 0
self.clore_partner_initiazized = False
self.partner_forwarding_ips = []
self.start_time = utils.unix_timestamp()
async def service(self): async def service(self):
global container_log_broken global container_log_broken
@ -151,10 +162,11 @@ class CloreClient:
task6 = asyncio.create_task(self.specs_service(monitoring)) task6 = asyncio.create_task(self.specs_service(monitoring))
task7 = asyncio.create_task(self.oc_service(monitoring)) task7 = asyncio.create_task(self.oc_service(monitoring))
task8 = asyncio.create_task(self.background_pow_data_collection(monitoring)) task8 = asyncio.create_task(self.background_pow_data_collection(monitoring))
task9 = asyncio.create_task(self.partner_service(monitoring))
monitoring_task = asyncio.create_task(self.monitoring_service(monitoring)) monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
# Wait for all tasks to complete (they won't in this case) # Wait for all tasks to complete (they won't in this case)
await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, monitoring_task) await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, task9, monitoring_task)
async def monitoring_service(self, monitoring): async def monitoring_service(self, monitoring):
while True: while True:
@ -169,13 +181,14 @@ class CloreClient:
if config.debug: if config.debug:
log.success(self.last_service_heartbeat) log.success(self.last_service_heartbeat)
for service_name in self.last_service_heartbeat.keys(): for service_name in self.last_service_heartbeat.keys():
last_hearthbeat = self.last_service_heartbeat[service_name] if not service_name in self.no_restart_services:
if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache": last_hearthbeat = self.last_service_heartbeat[service_name]
log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache":
os._exit(1) log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache": os._exit(1)
log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...") elif last_hearthbeat < utils.unix_timestamp()-config.maximum_service_loop_time and service_name!="handle_container_cache":
os._exit(1) log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
os._exit(1)
except Exception as e: except Exception as e:
log.debug(f"monitoring_service() | ERROR | {e}") log.debug(f"monitoring_service() | ERROR | {e}")
await asyncio.sleep(5) await asyncio.sleep(5)
@ -260,6 +273,7 @@ class CloreClient:
if len(self.p_needed_containers)>0: if len(self.p_needed_containers)>0:
local_images = await get_local_images(no_latest_tag=True) local_images = await get_local_images(no_latest_tag=True)
partner_images = await clore_partner.get_partner_allowed_images()
for local_image in local_images: for local_image in local_images:
self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()} self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()}
image_needed = False image_needed = False
@ -283,11 +297,32 @@ class CloreClient:
if local_image_tag==allowed_tag or allowed_tag=='*': if local_image_tag==allowed_tag or allowed_tag=='*':
image_needed=True image_needed=True
break break
if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers: if not image_needed and type(partner_images) == list:
for partner_image in partner_images:
if local_image.replace(':latest', '') == partner_image.replace(':latest', ''):
image_needed = True
del self.last_pull_progress[local_image]
break
if len(local_image.split('/')) >= 3:
partner_image_spl = partner_image.split(':')
image, deployment_type = '/'.join(local_image.split('/', 2)[:2]), local_image.split('/', 2)[-1]
if len(partner_image_spl) == 1:
if image == partner_image_spl[0] or f"{image}" == f"{partner_image_spl[0]}_latest":
image_needed = True
del self.last_pull_progress[local_image]
break
elif len(partner_image_spl) == 2:
if image.replace('_latest', '') == f"{partner_image_spl[0]}_{partner_image_spl[1]}".replace('_latest', ''):
image_needed = True
del self.last_pull_progress[local_image]
break
if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers and partner_images != None:
log.success(f"GOING TO REMOVE {local_image}")
with concurrent.futures.ThreadPoolExecutor() as pool: with concurrent.futures.ThreadPoolExecutor() as pool:
r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image) r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image)
if r: if r:
removed_cnt+=1 removed_cnt+=1
del self.last_pull_progress[local_image]
#if config.debug: #if config.debug:
# log.success(f"{local_image} | {image_needed}") # log.success(f"{local_image} | {image_needed}")
@ -327,7 +362,7 @@ class CloreClient:
try: try:
await pull_task await pull_task
except Exception as e: except Exception as e:
self.last_pull_progress[local_image]={f"log":"Can't pull image \"{local_image}\"", "last_update":time.time()} self.last_pull_progress[local_image]={"log":f"Can't pull image \"{local_image}\"", "last_update":time.time()}
log_task.cancel() log_task.cancel()
try: try:
await log_task await log_task
@ -358,11 +393,18 @@ class CloreClient:
container_conf = WebSocketClient.get_containers() container_conf = WebSocketClient.get_containers()
can_run_partner_workloads = False
if container_conf[0]: if container_conf[0]:
self.containers_set=True self.containers_set=True
self.containers=container_conf[1] self.containers=container_conf[1]
tmp_images = [] tmp_images = []
for container in self.containers:
is_order_spot = False
for idx, container in enumerate(self.containers):
if "spot" in container:
is_order_spot = True
if "image" in container and "image" in container and container["image"]!="cloreai/hive-use-flightsheet": if "image" in container and "image" in container and container["image"]!="cloreai/hive-use-flightsheet":
log_pull = False log_pull = False
if "name" in container: if "name" in container:
@ -386,6 +428,9 @@ class CloreClient:
if not image_config in tmp_images: if not image_config in tmp_images:
tmp_images.append(image_config) tmp_images.append(image_config)
can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True
clore_partner_socket.set_can_deploy(can_run_partner_workloads)
if self.restart_docker and not running_order and len(self.containers)>0: if self.restart_docker and not running_order and len(self.containers)>0:
log.debug("Sending docker restart command") log.debug("Sending docker restart command")
utils.run_command_v2("systemctl restart docker") utils.run_command_v2("systemctl restart docker")
@ -400,14 +445,14 @@ class CloreClient:
tasks.append(api_interface.get_server_config()) tasks.append(api_interface.get_server_config())
if self.containers_set: if self.containers_set:
tasks.append(configure_networks(self.containers)) tasks.append(configure_networks(self.containers, self.partner_forwarding_ips))
tasks.append(WebSocketClient.stream_pull_logs()) tasks.append(WebSocketClient.stream_pull_logs())
if self.validated_containers_set: if self.validated_containers_set:
tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers)) tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers, can_run_partner_workloads))
if step==1: if step==1:
WebSocketClient.set_auth(self.auth_key) WebSocketClient.set_auth(self.auth_key, self.xfs_state)
asyncio.create_task(WebSocketClient.run()) asyncio.create_task(WebSocketClient.run())
elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval): elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval):
log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds") log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds")
@ -427,6 +472,11 @@ class CloreClient:
if result.success: if result.success:
self.last_checked_ws_peers = utils.unix_timestamp() self.last_checked_ws_peers = utils.unix_timestamp()
self.allowed_images=result.allowed_images+self.extra_allowed_images self.allowed_images=result.allowed_images+self.extra_allowed_images
if self.xfs_state == "active":
self.allowed_images.append({
"repository": "vastai/test",
"allowed_tags": ["bandwidth-test-nvidia"]
})
if not config.debug_ws_peer: if not config.debug_ws_peer:
for pure_ws_peer in result.ws_peers: for pure_ws_peer in result.ws_peers:
self.ws_peers[pure_ws_peer]={ self.ws_peers[pure_ws_peer]={
@ -455,7 +505,7 @@ class CloreClient:
async def submit_specs(self, current_specs): async def submit_specs(self, current_specs):
try: try:
if type(current_specs) == dict: if type(current_specs) == dict:
current_specs["backend_version"]=18 current_specs["backend_version"]=19
current_specs["update_hw"]=True current_specs["update_hw"]=True
smallest_pcie_width = 999 smallest_pcie_width = 999
for gpu in current_specs["gpus"]["nvidia"]: for gpu in current_specs["gpus"]["nvidia"]:
@ -504,7 +554,7 @@ class CloreClient:
await monitoring.put("oc_service") await monitoring.put("oc_service")
oc_apply_allowed = True oc_apply_allowed = True
### OC Service should also handle Hive stuff ### OC Service should also handle Hive stuff
if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries: if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries and background_job.is_enabled():
await set_hive_miner_status(True) await set_hive_miner_status(True)
oc_apply_allowed = False # Don't apply any OC when running HiveOS miner oc_apply_allowed = False # Don't apply any OC when running HiveOS miner
elif self.is_hive and not self.dont_use_hive_binaries: elif self.is_hive and not self.dont_use_hive_binaries:
@ -544,6 +594,30 @@ class CloreClient:
log.debug(f"FAIL | background_pow_data_collection() | {e}") log.debug(f"FAIL | background_pow_data_collection() | {e}")
await asyncio.sleep(6) await asyncio.sleep(6)
async def partner_service(self, monitoring):
while True:
try:
await monitoring.put("partner_service")
if self.start_time < utils.unix_timestamp() - 180:
forwarding_latency_measurment = await clore_partner.measure_forwarding_latency()
if type(forwarding_latency_measurment) == list:
await WebSocketClient.set_forwarding_latency_measurment(forwarding_latency_measurment)
partner_config = WebSocketClient.get_clore_partner_config()
if partner_config != None:
if self.clore_partner_initiazized == False:
ir = await clore_partner.initialize()
if ir:
self.clore_partner_initiazized = True
if self.clore_partner_initiazized == True:
if 'provider' in partner_config and 'forwarding' in partner_config:
self.partner_forwarding_ips = [partner_config['provider'], partner_config['forwarding']]
else:
self.partner_forwarding_ips = []
await clore_partner.configure(partner_config)
except Exception as e:
log.debug(f"FAIL | partner_service() | {e}")
await asyncio.sleep(6)
def expire_ws_peers(self): def expire_ws_peers(self):
for ws_peer_address in list(self.ws_peers.keys()): for ws_peer_address in list(self.ws_peers.keys()):
ws_peer_info = self.ws_peers[ws_peer_address] ws_peer_info = self.ws_peers[ws_peer_address]

View File

@ -1,4 +1,5 @@
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from lib import clore_partner
import asyncio import asyncio
import random import random
import websockets import websockets
@ -31,6 +32,7 @@ class WebSocketClient:
self.connected = False self.connected = False
self.authorized = False self.authorized = False
self.auth = auth self.auth = auth
self.xfs_state = None
self.log_auth_fail = True self.log_auth_fail = True
self.last_heartbeat = clore_utils.unix_timestamp() self.last_heartbeat = clore_utils.unix_timestamp()
self.containers={} self.containers={}
@ -51,15 +53,30 @@ class WebSocketClient:
self.last_gpu_oc_specs = [] self.last_gpu_oc_specs = []
self.last_set_oc = {} self.last_set_oc = {}
self.clore_partner_config = None
self.forwarding_latency_measurment = None
def get_last_heartbeat(self): def get_last_heartbeat(self):
return self.last_heartbeat return self.last_heartbeat
def get_containers(self): def get_containers(self):
return self.containers_set, self.containers partner_container_config = clore_partner.get_partner_container_config()
return self.containers_set, ((self.containers + [partner_container_config]) if partner_container_config else self.containers)
def get_oc(self): def get_oc(self):
return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc
def get_clore_partner_config(self):
return self.clore_partner_config
async def set_forwarding_latency_measurment(self, forwarding_latency_measurment):
await self.send(json.dumps(
{
"forwarding_latency_measurment": forwarding_latency_measurment
}
))
self.forwarding_latency_measurment = forwarding_latency_measurment
def set_ws_peers(self, ws_peers): def set_ws_peers(self, ws_peers):
tmp_ws_peers=[] tmp_ws_peers=[]
for ws_peer in list(ws_peers.keys()): for ws_peer in list(ws_peers.keys()):
@ -68,8 +85,9 @@ class WebSocketClient:
self.ws_peers = tmp_ws_peers self.ws_peers = tmp_ws_peers
def set_auth(self, auth): def set_auth(self, auth, xfs_state):
self.auth=auth self.auth=auth
self.xfs_state=xfs_state
def set_pull_logs(self, pull_logs): def set_pull_logs(self, pull_logs):
self.pull_logs=pull_logs self.pull_logs=pull_logs
@ -93,7 +111,9 @@ class WebSocketClient:
log.debug(f"CLOREWS | Connected to {random_ws_peer}") log.debug(f"CLOREWS | Connected to {random_ws_peer}")
await self.send(json.dumps({ await self.send(json.dumps({
"login":str(self.auth), "login":str(self.auth),
"type":"python" "xfs_state": self.xfs_state,
"type":"python",
"clore_partner_support": True
})) }))
except Exception as e: except Exception as e:
log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e}") log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e}")
@ -136,6 +156,16 @@ class WebSocketClient:
pass pass
elif message=="KEEPALIVE": elif message=="KEEPALIVE":
self.last_heartbeat = clore_utils.unix_timestamp() self.last_heartbeat = clore_utils.unix_timestamp()
try:
if self.forwarding_latency_measurment:
await self.send(json.dumps(
{
"forwarding_latency_measurment": self.forwarding_latency_measurment
}
))
self.forwarding_latency_measurment = None
except Exception as e:
pass
elif message=="NEWER_LOGIN" or message=="WAIT": elif message=="NEWER_LOGIN" or message=="WAIT":
await self.close_websocket() await self.close_websocket()
elif message[:10]=="PROVEPULL;": elif message[:10]=="PROVEPULL;":
@ -148,13 +178,16 @@ class WebSocketClient:
else: else:
try: try:
parsed_json = json.loads(message) parsed_json = json.loads(message)
if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list: if "type" in parsed_json and parsed_json["type"]=="partner_config" and "partner_config" in parsed_json and type(parsed_json["partner_config"])==dict:
self.clore_partner_config = parsed_json["partner_config"]
await self.send(json.dumps({"partner_config":parsed_json["partner_config"]}))
elif "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list:
self.last_heartbeat = clore_utils.unix_timestamp() self.last_heartbeat = clore_utils.unix_timestamp()
container_str = json.dumps({"containers":parsed_json["new_containers"]}) container_str = json.dumps({"containers":parsed_json["new_containers"]})
await self.send(container_str) await self.send(container_str)
if len(parsed_json["new_containers"]) > 0: # There should be at least one container if len(parsed_json["new_containers"]) > 0: # There should be at least one container
self.containers_set = True self.containers_set = True
self.containers=parsed_json["new_containers"] self.containers=clore_partner.filter_partner_dummy_workload_container(parsed_json["new_containers"])
#log.success(container_str) #log.success(container_str)
elif "allow_oc" in parsed_json: # Enable OC elif "allow_oc" in parsed_json: # Enable OC
self.oc_enabled=True self.oc_enabled=True
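For reference, a hedged sketch of the partner_config message handled above: the outer keys ("type", "partner_config") come from the handler itself, the inner field names are inferred from clore_partner.configure() and openvpn.clore_partner_configure(), and every value is a made-up placeholder.
# Editor's sketch, not part of the diff; all values are placeholders.
import json

example_partner_config_message = json.dumps({
    "type": "partner_config",
    "partner_config": {
        "partner_image": "partner/controller:latest",      # hypothetical image name
        "partner_id": "0123456789abcdef0123456789abcdef",  # placeholder id
        "machine_id": 1234,
        "openvpn_host": "vpn.example.com",
        "openvpn_port": 1194,
        "ports": [30000, 30100],
        "provider": "10.1.0.2",    # local tunnel IP
        "forwarding": "10.1.0.1",  # remote tunnel IP
        "secret": "<openvpn static key body>"
    }
})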

View File

@ -1,5 +1,6 @@
from lib import config as config_module from lib import config as config_module
from lib import init_server from lib import init_server
from lib import xfs
from lib import utils from lib import utils
from clore_hosting import main as clore_hosting from clore_hosting import main as clore_hosting
import asyncio, os import asyncio, os
@ -29,7 +30,15 @@ elif config.reset:
log.success("Client login reseted") log.success("Client login reseted")
elif config.service: elif config.service:
if len(auth)==32+48+1: if len(auth)==32+48+1:
clore_client = clore_hosting.CloreClient(auth_key=auth) utils.run_command("sysctl -w net.ipv4.ip_forward=1")
xfs_state = xfs.init()
if os.path.isfile(config.restart_docker_flag_file):
utils.run_command("systemctl restart docker")
os.remove(config.restart_docker_flag_file)
clore_client = clore_hosting.CloreClient(auth_key=auth, xfs_state=xfs_state)
asyncio.run(clore_client.service()) asyncio.run(clore_client.service())
else: else:
print("TODO: Firstly config auth") print("TODO: Firstly config auth")

lib/background_job.py (new file, 22 lines)
View File

@ -0,0 +1,22 @@
import time
import re
disabled_till = 0
def is_background_job_container_name(string):
if type(string) != str:
return False
pattern = r"^clore-default-\d+$"
return bool(re.match(pattern, string))
def temporarly_disable(seconds):
global disabled_till
disabled_till = time.time() + seconds
def enable():
global disabled_till
disabled_till=0
def is_enabled():
global disabled_till
return True if disabled_till < time.time() else False
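A minimal usage sketch of the helpers above, assuming the module is imported as lib.background_job (as main.py and docker_deploy.py do):
# Editor's sketch, not part of the diff.
from lib import background_job

background_job.temporarly_disable(600)  # partner asks to pause background jobs for 10 minutes

# The deploy loop skips/stops clore-default-<n> containers while disabled.
if background_job.is_background_job_container_name("clore-default-0") and not background_job.is_enabled():
    print("background job container stays stopped")

background_job.enable()  # re-enable without waiting for the timer to expire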

lib/clore_partner.py (new file, 238 lines)
View File

@ -0,0 +1,238 @@
from lib import ensure_packages_installed
from lib import config as config_module
from lib import logging as logging_lib
from lib import clore_partner_socket
from lib import latency_test
from lib import openvpn
from lib import utils
import asyncio
import random
import json
import time
import re
import os
import aiofiles.os
from aiohttp import ClientSession, ClientTimeout
config = config_module.config
log = logging_lib.log
MANDATORY_PACKEGES = ['dmidecode', 'openvpn', 'iproute2']
DUMMY_WORKLOAD_CONTAINER = "cloreai/partner-dummy-workload"
host_facts_location = os.path.join(config.clore_partner_base_dir, "host_facts")
partner_cache_location = os.path.join(config.clore_partner_base_dir, "partner_cache")
next_ensupe_packages_check = 0
is_socket_running = False
partner_container_config = None
async def initialize():
global next_ensupe_packages_check
global is_socket_running
try:
await aiofiles.os.makedirs(host_facts_location, exist_ok=True)
await aiofiles.os.makedirs(partner_cache_location, exist_ok=True)
await aiofiles.os.makedirs("/etc/openvpn/client", exist_ok=True)
if not is_socket_running:
is_socket_running=True
asyncio.create_task(clore_partner_socket.socket_service(
location=os.path.join(host_facts_location, "partner_interface.socket")
))
if next_ensupe_packages_check < time.time():
success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None)
next_ensupe_packages_check = float('inf') if success else time.time() + 60*60 # if it did not succeed -> retry in 1 hr
if not success:
return False
elif next_ensupe_packages_check != float('inf'):
return False
code, stdout, stderr = await utils.async_run_command(
"dmidecode -t 2 2>&1",
20
)
if code == 0 and not stderr:
async with aiofiles.open(os.path.join(host_facts_location, "dmidecode_t2.txt"), mode='w') as file:
await file.write(stdout)
else:
return False
code, stdout, stderr = await utils.async_run_command(
"dmidecode 2>&1",
20
)
if code == 0 and not stderr:
async with aiofiles.open(os.path.join(host_facts_location, "dmidecode.txt"), mode='w') as file:
await file.write(stdout)
else:
return False
return True
except Exception as e:
log.error(f"FAIL | clore_partner.initialize | {e}")
return False
async def get_partner_allowed_images():
try:
file_exists = await aiofiles.os.path.exists(os.path.join(partner_cache_location, "container_list.json"))
if not file_exists:
return []
images = []
async with aiofiles.open(os.path.join(partner_cache_location, "container_list.json"), mode='r') as file:
content = await file.read()
containers = json.loads(content)
for container in containers:
image = container.get("Config", {}).get("Image", None)
if image and not image in images:
images.append(image)
return images
except Exception as e:
return None
def validate_partner_container_name(name):
if type(name) != str:
return False
elif name==config.clore_partner_container_name:
return True
pattern = r"^C\.\d+$"
return bool(re.match(pattern, name))
def validate_partner_workload_container_name(name):
if type(name) != str:
return False
pattern = r"^C\.\d+$"
return bool(re.match(pattern, name))
last_openvpn_config = None
def get_partner_container_config():
global partner_container_config
return partner_container_config
async def configure(partner_config):
global last_openvpn_config
global partner_container_config
if last_openvpn_config != partner_config:
partner_container_config = {
"image": partner_config["partner_image"],
"name": config.clore_partner_container_name,
"hostname": f"{partner_config['partner_id'][:16]}-m{partner_config['machine_id']}",
"env": {
"AUTH": partner_config['partner_id'],
"ip_addr": partner_config['openvpn_host'],
"port_range": f'{partner_config['ports'][0]}-{partner_config['ports'][1]}'
},
"volumes": {
f"{host_facts_location}": {"bind": "/var/lib/vastai_kaalia/specs_source"},
f"{partner_cache_location}": {"bind": "/var/lib/vastai_kaalia/data"},
f"/var/lib/docker": {"bind": "/var/lib/docker"},
f"/var/run/docker.sock": {"bind": "/var/run/docker.sock"}
},
"gpus": True,
"command": '',
"network": "clore-partner-br0",
"ip": "172.19.0.254",
"cap_add": ["SYS_ADMIN"],
"devices": ["/dev/fuse"],
#"security_opt": ["apparmor:unconfined"],
"ports": [f"{partner_config['ports'][1]}:{partner_config['ports'][1]}"],
}
r = await openvpn.clore_partner_configure(partner_config)
if r:
last_openvpn_config = partner_config
# -----------------------------------------
next_latency_measurment = 0
async def fetch_forwarding_nodes():
url = "https://api.clore.ai/v1/get_relay"
timeout = ClientTimeout(total=30)
async with ClientSession(timeout=timeout) as session:
try:
async with session.get(url) as response:
response.raise_for_status()
data = await response.json()
return data
except Exception as e:
print(f"An error occurred: {e}")
return None
async def set_next_latency_measurment(ts):
global next_latency_measurment
try:
next_latency_measurment=ts
async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='w') as file:
await file.write(str(ts))
except Exception as e:
pass
async def measure_forwarding_latency():
global next_latency_measurment
if next_latency_measurment > time.time():
return False
try:
await aiofiles.os.makedirs(config.clore_partner_base_dir, exist_ok=True)
file_exists = await aiofiles.os.path.exists(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"))
if file_exists:
async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='r') as file:
content = await file.read()
if content.isdigit():
next_latency_measurment = int(content)
if next_latency_measurment < time.time():
node_info = await fetch_forwarding_nodes()
if type(node_info) == dict and node_info.get("country") and type(node_info.get("nodes")) == dict and node_info.get("code") == 0:
to_test_nodes = []
ip_to_region = {}
valid_regions = []
for node_region in node_info.get("nodes").keys():
nodes_ip_list = node_info.get("nodes")[node_region]
if type(nodes_ip_list) == list and len(nodes_ip_list) > 0:
to_test_nodes = to_test_nodes + nodes_ip_list
for node_ip in nodes_ip_list:
ip_to_region[node_ip]=node_region
if len(to_test_nodes) > 0:
measurment_result = await latency_test.measure_latency_icmp(to_test_nodes)
if measurment_result:
for idx, res in enumerate(measurment_result):
if res["received"] > 2 and not ip_to_region.get(res["host"]) in valid_regions:
valid_regions.append(ip_to_region.get(res["host"]))
measurment_result[idx]["region"] = ip_to_region.get(res["host"])
if len(valid_regions) == len(ip_to_region.keys()):
await set_next_latency_measurment(int(
time.time() + 60*60*24*30 # Re-run in 30 days, because the measurement succeeded
))
return measurment_result
else:
await set_next_latency_measurment(int(
time.time() + 60*60*24 # Retry in 24 hr, all regions in the host country should be reachable
))
else:
await set_next_latency_measurment(int(
time.time() + 60*60*72 # Retry in 72hr (clore partner service is not available in host country yet)
))
return False
else:
await set_next_latency_measurment(int(
time.time() + 60*60*12 # Retry in 12 hr, the response did not match the required format
))
return False
return False
except Exception as e:
return False
def filter_partner_dummy_workload_container(containers):
try:
remaining_containers = []
for container in containers:
if container["image"] != DUMMY_WORKLOAD_CONTAINER:
remaining_containers.append(container)
return remaining_containers
except Exception as e:
return containers
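A short sketch of the naming helpers and the dummy-workload filter defined above; the non-partner image name is a placeholder:
# Editor's sketch, not part of the diff.
from lib import clore_partner

clore_partner.validate_partner_container_name("clore-partner-service")           # True (partner service container)
clore_partner.validate_partner_container_name("C.12345")                         # True (partner workload)
clore_partner.validate_partner_workload_container_name("clore-partner-service")  # False
clore_partner.validate_partner_workload_container_name("C.12345")                # True

containers = [
    {"image": "cloreai/partner-dummy-workload", "name": "C.1"},
    {"image": "example/workload:latest", "name": "clore-order-1"},  # placeholder image
]
# Drops the dummy workload entry, keeps everything else.
print(clore_partner.filter_partner_dummy_workload_container(containers))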

View File

@ -0,0 +1,94 @@
from lib import config as config_module
from lib import logging as logging_lib
from lib import background_job
from lib import utils
import asyncio
import json
import os
import aiofiles.os
config = config_module.config
log = logging_lib.log
dangerous_chars = [';', '|', '&', '`', '$', '>', '<', '(', ')', '\\', '!', '"', '\'', '[', ']', '{', '}']
allowed_commands = ["df"]
can_deploy = True
async def remove_socket_file(path):
try:
file_exists = await aiofiles.os.path.exists(path)
if file_exists:
await aiofiles.os.remove(path)
except Exception:
pass
async def handle_client(reader, writer):
global can_deploy
try:
while True:
data = await reader.read(1024*64)
try:
if not data:
break
#print("DATA", data, data.decode())
parsed_data = json.loads(data.decode())
if "run_command" in parsed_data and type(parsed_data["run_command"])==str:
allowed = False
for allowed_command in allowed_commands:
if f"{allowed_command} " == parsed_data["run_command"][:len(allowed_command)+1] or allowed_command==parsed_data["run_command"]:
allowed = True
break
if allowed and any(char in parsed_data["run_command"] for char in dangerous_chars):
allowed = False
log.debug(f"clore_partner_socket | Received \"{parsed_data["run_command"]}\" | {'allowed' if allowed else 'denied'}")
if allowed:
code, stdout, stderr = await utils.async_run_command(
parsed_data["run_command"]
)
writer.write(json.dumps({
"code": code,
"stderr": stderr,
"stdout": stdout
}).encode())
else:
writer.write(json.dumps({
"code": -1,
"stderr": 'Command not allowed',
"stdout": 'Command not allowed'
}).encode())
elif "can_deploy" in parsed_data:
writer.write(json.dumps({
"can_deploy": can_deploy
}).encode())
elif "stop_background_job" in parsed_data and "time" in parsed_data:
try:
if isinstance(parsed_data["time"], int):
background_job.temporarly_disable(parsed_data["time"])
except Exception as e:
pass
else:
writer.write('?'.encode())
await writer.drain()
except Exception as data_exception:
pass
break
except asyncio.CancelledError:
log.debug(f"clore partner socket | Client handler canceled.")
finally:
log.debug(f"clore partner socket | Closing client connection.")
writer.close()
await writer.wait_closed()
def set_can_deploy(state):
global can_deploy
can_deploy = state
async def socket_service(location):
await remove_socket_file(location)
server = await asyncio.start_unix_server(handle_client, path=location)
log.debug(f"clore partner socket | running at {location}")
async with server:
await server.serve_forever()
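A hedged sketch of a client for the socket service above (the real partner-side client lives outside this repo); it connects to the unix socket created by clore_partner.initialize() and runs one whitelisted command:
# Editor's sketch, not part of the diff; the socket path assumes the default --clore-partner-base-dir.
import asyncio
import json

async def query_partner_socket(path="/opt/clore-hosting/.clore-partner/host_facts/partner_interface.socket"):
    reader, writer = await asyncio.open_unix_connection(path)
    writer.write(json.dumps({"run_command": "df -h"}).encode())  # "df" is on the allow-list
    await writer.drain()
    reply = await reader.read(1024 * 64)
    print(json.loads(reply.decode()))  # {"code": ..., "stderr": ..., "stdout": ...}
    writer.close()
    await writer.wait_closed()

# asyncio.run(query_partner_socket())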

View File

@ -9,6 +9,11 @@ hard_config = {
"name": "clore-br0", "name": "clore-br0",
"subnet": "172.18.0.0/16", "subnet": "172.18.0.0/16",
"gateway": "172.18.0.1" "gateway": "172.18.0.1"
},
{
"name": "clore-partner-br0",
"subnet": "172.19.0.0/20",
"gateway": "172.19.0.1"
} }
], ],
"run_iptables_with_sudo":True, "run_iptables_with_sudo":True,
@ -33,7 +38,11 @@ hard_config = {
"maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app "maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app
"maximum_pull_service_loop_time": 14400, # Exception for image pulling "maximum_pull_service_loop_time": 14400, # Exception for image pulling
"creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk "creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk
"allow_mixed_gpus": True "allow_mixed_gpus": True,
"openvpn_forwarding_tun_device": "tun1313",
"forwarding_ip_route_table_id": 100,
"clore_partner_container_name": "clore-partner-service",
"restart_docker_flag_file": "/opt/clore-hosting/.restart_docker"
} }
parser = argparse.ArgumentParser(description='Example argparse usage') parser = argparse.ArgumentParser(description='Example argparse usage')
@ -50,6 +59,7 @@ parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hostin
parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)")
parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes") parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes")
parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software") parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software")
parser.add_argument('--clore-partner-base-dir', type=str, default="/opt/clore-hosting/.clore-partner")
# Parse arguments, ignoring any non-defined arguments # Parse arguments, ignoring any non-defined arguments
args, _ = parser.parse_known_args() args, _ = parser.parse_known_args()

View File

@ -28,6 +28,14 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
for cap in container_options["cap_add"]: for cap in container_options["cap_add"]:
command.extend(["--cap-add", cap]) command.extend(["--cap-add", cap])
if "devices" in container_options:
for device in container_options["devices"]:
command.extend(["--device", device])
if "security_opt" in container_options:
for security_opt in container_options["security_opt"]:
command.extend(["--security-opt", security_opt])
if "volumes" in container_options: if "volumes" in container_options:
for volume_host, volume_container in container_options["volumes"].items(): for volume_host, volume_container in container_options["volumes"].items():
bind = f"{volume_host}:{volume_container['bind']}" bind = f"{volume_host}:{volume_container['bind']}"
@ -75,6 +83,8 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
if ip: if ip:
command.extend(["--ip", ip]) command.extend(["--ip", ip])
command.append('--stop-timeout')
command.append('0')
command.append(container_options["image"]) command.append(container_options["image"])
try: try:

View File

@ -1,7 +1,9 @@
from lib import config as config_module from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import docker_cli_wrapper from lib import docker_cli_wrapper
from lib import background_job
from lib import docker_interface from lib import docker_interface
from lib import clore_partner
from lib import get_specs from lib import get_specs
from lib import utils from lib import utils
import docker import docker
@ -14,7 +16,7 @@ client = docker_interface.client
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
def deploy(validated_containers, allowed_running_containers=[]): def deploy(validated_containers, allowed_running_containers=[], can_run_partner_workloads=False):
local_images = docker_interface.get_local_images() local_images = docker_interface.get_local_images()
all_containers = docker_interface.get_containers(all=True) all_containers = docker_interface.get_containers(all=True)
@ -67,7 +69,9 @@ def deploy(validated_containers, allowed_running_containers=[]):
'tty': True, 'tty': True,
'network_mode': 'clore-br0', 'network_mode': 'clore-br0',
'cap_add': [], 'cap_add': [],
'volumes': {}, 'devices': [],
'security_opt': [],
'volumes': validated_container["volumes"] if "volumes" in validated_container else {},
'ports': {}, 'ports': {},
'device_requests': [], 'device_requests': [],
'environment': validated_container["env"] if "env" in validated_container else {}, 'environment': validated_container["env"] if "env" in validated_container else {},
@ -80,6 +84,15 @@ def deploy(validated_containers, allowed_running_containers=[]):
) )
} }
if "security_opt" in validated_container:
container_options["security_opt"] = validated_container["security_opt"]
if "devices" in validated_container:
container_options["devices"] = validated_container["devices"]
if "cap_add" in validated_container:
container_options["cap_add"] = validated_container["cap_add"]
if "hostname" in validated_container: if "hostname" in validated_container:
container_options["hostname"]=validated_container["hostname"] container_options["hostname"]=validated_container["hostname"]
elif "clore-order-" in validated_container["name"]: elif "clore-order-" in validated_container["name"]:
@ -136,7 +149,7 @@ def deploy(validated_containers, allowed_running_containers=[]):
container_options["shm_size"] = f"{SHM_SIZE}m" container_options["shm_size"] = f"{SHM_SIZE}m"
if not validated_container["name"] in created_container_names and image_ready: if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])):
if config.creation_engine == "wrapper": if config.creation_engine == "wrapper":
docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus) docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus)
else: else:
@ -159,7 +172,10 @@ def deploy(validated_containers, allowed_running_containers=[]):
all_running_container_names.append(container.name) all_running_container_names.append(container.name)
else: else:
all_stopped_container_names.append(container.name) all_stopped_container_names.append(container.name)
if container.name in needed_running_names and container.status != 'running': if background_job.is_background_job_container_name(container.name) and not background_job.is_enabled():
if container.status == "running":
container.stop()
elif container.name in needed_running_names and container.status != 'running':
try: try:
attached_networks = container.attrs['NetworkSettings']['Networks'] attached_networks = container.attrs['NetworkSettings']['Networks']
if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container
@ -174,17 +190,22 @@ def deploy(validated_containers, allowed_running_containers=[]):
container.stop() container.stop()
except Exception as e: except Exception as e:
pass pass
elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running': elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running' and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name):
try: try:
container.stop() container.stop()
container.remove() container.remove()
except Exception as e: except Exception as e:
pass pass
elif container.name not in paused_names+needed_running_names+allowed_running_containers: elif container.name not in paused_names+needed_running_names+allowed_running_containers and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name):
try: try:
container.remove() container.remove()
except Exception as e: except Exception as e:
pass pass
elif not can_run_partner_workloads and container.status == "running" and clore_partner.validate_partner_workload_container_name(container.name):
try:
container.stop()
except Exception as e:
pass
return all_running_container_names, all_stopped_container_names return all_running_container_names, all_stopped_container_names
#print(validated_containers) #print(validated_containers)

View File

@ -11,6 +11,13 @@ from typing import List, Optional
import docker import docker
import json import json
import os import os
import re
partner_bridge_subnet = ''
for clore_network in config.clore_default_networks:
if clore_network["name"] == "clore-partner-br0":
partner_bridge_subnet = clore_network["subnet"]
try: try:
os.makedirs(config.startup_scripts_folder, exist_ok=True) os.makedirs(config.startup_scripts_folder, exist_ok=True)
@ -59,6 +66,18 @@ def get_info():
except Exception as e: except Exception as e:
return {} return {}
def stop_all_containers():
try:
# List all containers
containers = client.containers.list(all=True) # Use all=True to include stopped containers
for container in containers:
log.info(f"stop_all_containers() | Stopping container: {container.name} (ID: {container.id})")
container.stop() # Stop the container
log.success("stop_all_containers() | All containers have been stopped.")
except Exception as e:
log.error(f"stop_all_containers() |An error occurred: {e}")
return True
def check_docker_connection(): def check_docker_connection():
try: try:
client.ping() client.ping()
@ -95,7 +114,7 @@ def get_local_images(no_latest_tag=False):
return image_list return image_list
except Exception as e: except Exception as e:
log.error(f"DOCKER | Can't get local images | {e}") log.error(f"DOCKER | Can't get local images | {e} | {'y' if no_latest_tag else 'n'}")
os._exit(1) os._exit(1)
def get_containers(all=False): def get_containers(all=False):
@ -184,7 +203,8 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"):
gateway=gateway gateway=gateway
)] )]
), ),
check_duplicate=True check_duplicate=True,
#options={'com.docker.network.bridge.enable_ip_masq': 'false'} if 'clore-partner-' in network_name else {}
) )
log.debug(f"Network {network_name} created successfully.") log.debug(f"Network {network_name} created successfully.")
return True return True
@ -192,7 +212,7 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"):
log.error(f"DOCKER | Failed to create network {network_name}: {e}") log.error(f"DOCKER | Failed to create network {network_name}: {e}")
return False return False
def validate_and_secure_networks(): def validate_and_secure_networks(partner_forwarding_ips):
try: try:
failed_appending_iptables_rule = False failed_appending_iptables_rule = False
@ -238,6 +258,13 @@ def validate_and_secure_networks():
#print(this_ipv4_range) #print(this_ipv4_range)
outside_ranges_ip_network = networking.exclude_network(this_ipv4_range) outside_ranges_ip_network = networking.exclude_network(this_ipv4_range)
if this_ipv4_range == partner_bridge_subnet:
for partner_forwarding_ip in partner_forwarding_ips:
outside_ranges = []
for ip_range in outside_ranges_ip_network:
outside_ranges.append(str(ip_range))
outside_ranges_ip_network = networking.exclude_network(f"{partner_forwarding_ip}/32", input_ranges=outside_ranges)
outside_ranges = [] outside_ranges = []
for outside_range_ip_network in outside_ranges_ip_network: for outside_range_ip_network in outside_ranges_ip_network:
outside_ranges.append(str(outside_range_ip_network)) outside_ranges.append(str(outside_range_ip_network))
@ -265,7 +292,7 @@ def validate_and_secure_networks():
succesfully_appended = networking.add_iptables_rule(needed_iptables_rule) succesfully_appended = networking.add_iptables_rule(needed_iptables_rule)
if not succesfully_appended: if not succesfully_appended:
failed_appending_iptables_rule = True failed_appending_iptables_rule = True
else: elif this_ipv4_range != partner_bridge_subnet:
needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name) needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name)
for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule
for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule)) for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule))
@ -413,3 +440,7 @@ def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"):
return False return False
else: else:
return False return False
def is_docker_default_name_lenient(container_name): # Not a perfect solution, but it will do the job
pattern = r'^[a-z]+_[a-z]+$'
return re.match(pattern, container_name) is not None
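A quick sketch of what the lenient check above accepts; docker's auto-generated adjective_surname names match, clore-managed names do not:
# Editor's sketch, not part of the diff.
from lib import docker_interface

docker_interface.is_docker_default_name_lenient("admiring_turing")  # True  (docker default-style name)
docker_interface.is_docker_default_name_lenient("clore-order-123")  # False (hyphens and digits don't match)
docker_interface.is_docker_default_name_lenient("C.12345")          # False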

View File

@ -0,0 +1,71 @@
from lib import logging as logging_lib
from typing import List
from lib import utils
import time
log = logging_lib.log
async def ensure_packages_installed(
packages: List[str] = [],
total_timeout: float = 300
) -> bool:
non_interactive_env = {
'DEBIAN_FRONTEND': 'noninteractive',
'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
}
start_time = time.time()
packages_to_install = []
for package in packages:
check_cmd = f"dpkg -s {package} > /dev/null 2>&1"
return_code, _, _ = await utils.async_run_command(check_cmd, env=non_interactive_env)
if return_code != 0:
packages_to_install.append(package)
if not packages_to_install:
log.debug("All packages are already installed.")
return True
update_cmd = (
"apt-get update "
"-y "
"--no-install-recommends"
)
return_code, stdout, stderr = await utils.async_run_command(
update_cmd,
timeout=None if total_timeout == None else 180,
env=non_interactive_env
)
if return_code != 0:
log.error(f"Failed to update package lists: {stderr}")
return False
install_cmd = (
"apt-get install "
"-y "
"--no-install-recommends "
"--assume-yes "
"-o Dpkg::Options::='--force-confdef' " # Default to existing config
"-o Dpkg::Options::='--force-confold' " # Keep existing config
f"{' '.join(packages_to_install)}"
)
# Calculate remaining timeout
remaining_timeout = None if total_timeout == None else max(0, total_timeout - (time.time() - start_time))
# Install packages
return_code, stdout, stderr = await utils.async_run_command(
install_cmd,
timeout=remaining_timeout,
env=non_interactive_env
)
if return_code == 0:
log.debug(f"Successfully installed packages: {packages_to_install}")
return True
else:
log.error(f"Failed to install packages: {stderr}")
return False
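A usage sketch, mirroring how clore_partner.initialize() calls this helper with no total timeout:
# Editor's sketch, not part of the diff.
import asyncio
from lib import ensure_packages_installed

ok = asyncio.run(ensure_packages_installed.ensure_packages_installed(['dmidecode', 'openvpn', 'iproute2'], None))
print("packages ready" if ok else "apt install failed, will retry later")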

View File

@ -2,7 +2,7 @@ from aiofiles.os import stat as aio_stat
from pydantic import BaseModel, Field, constr from pydantic import BaseModel, Field, constr
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from lib import docker_interface from lib import docker_interface
from typing import Dict, List from typing import Dict, List, Optional
from lib import utils from lib import utils
import subprocess import subprocess
import speedtest import speedtest
@ -311,7 +311,7 @@ def get_gpu_info():
class DockerDaemonConfig(BaseModel): class DockerDaemonConfig(BaseModel):
data_root: str = Field(alias="data-root") data_root: str = Field(alias="data-root")
storage_driver: str = Field(alias="storage-driver") storage_driver: str = Field(alias="storage-driver")
storage_opts: List[str] = Field(alias="storage-opts") storage_opts: Optional[List[str]] = Field(alias="storage-opts")
class Specs: class Specs:
def __init__(self): def __init__(self):
@ -336,26 +336,14 @@ class Specs:
else: else:
overlay_total_size=None overlay_total_size=None
disk_type="" disk_type=""
disk_usage_source_path = '/'
try: try:
validated_config = DockerDaemonConfig(**docker_daemon_config) if "storage-driver" in docker_daemon_config and docker_daemon_config["storage-driver"] == "overlay2" and "data-root" in docker_daemon_config:
disk_udevadm = get_disk_udevadm(validated_config.data_root) disk_usage_source_path = docker_daemon_config["data-root"]
for udevadm_line in disk_udevadm.split('\n'):
try:
key, value=udevadm_line.split('=',1)
if "id_model" in key.lower():
disk_type=value[:24]
elif "devpath" in key.lower() and "/virtual/" in value:
disk_type="Virtual"
except Exception as e_int:
pass
for storage_opt in validated_config.storage_opts:
if storage_opt[:14]=="overlay2.size=" and "GB" in storage_opt[14:]:
numeric_size = round(float(filter_non_numeric(storage_opt[14:])), 4)
overlay_total_size=numeric_size
except Exception as e: except Exception as e:
pass pass
if overlay_total_size==None: if overlay_total_size==None:
total, used, free = shutil.disk_usage("/") total, used, free = shutil.disk_usage(disk_usage_source_path)
disk_udevadm = get_disk_udevadm("/") disk_udevadm = get_disk_udevadm("/")
for udevadm_line in disk_udevadm.split('\n'): for udevadm_line in disk_udevadm.split('\n'):
try: try:

lib/latency_test.py (new file, 66 lines)
View File

@ -0,0 +1,66 @@
from lib import ensure_packages_installed
from lib import utils
import asyncio
import socket
import re
MANDATORY_PACKEGES = ['iputils-ping']
def is_valid_ipv4_or_hostname(string):
def is_valid_ipv4(ip):
try:
socket.inet_aton(ip)
return True
except socket.error:
return False
hostname_regex = re.compile(
r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$"
)
def is_valid_hostname(hostname):
if len(hostname) > 255:
return False
return all(hostname_regex.match(part) for part in hostname.split("."))
return is_valid_ipv4(string) or is_valid_hostname(string)
async def measure_latency_icmp(hosts, max_concurent_measurments=2, count=4):
success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None)
if success:
outcome = []
concurent_jobs = [hosts[i:i + max_concurent_measurments] for i in range(0, len(hosts), max_concurent_measurments)]
for concurent_job in concurent_jobs:
tasks = []
for host in concurent_job:
if is_valid_ipv4_or_hostname(host):
tasks.append(
utils.async_run_command(
f"ping -c {str(count)} -W 2 -i 0.3 {host}",
20 * count,
{"LANG": "C", "LC_ALL": "C"}
)
)
if len(tasks) > 0:
r = await asyncio.gather(*tasks)
try:
for output in r:
results = []
code, stdout, stderr = output
if code == 0:
for line in stdout.split('\n'):
match = re.search(r"time=(\d+\.?\d*) ms", line)
if match:
results.append(float(match.group(1)))
outcome.append(
{
"host": hosts[len(outcome)],
"received": len(results),
"avg_latency": sum(results) / len(results) if len(results)>0 else 0
}
)
except Exception as e:
print(e)
return outcome
else:
return False
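A usage sketch; the target hosts are placeholders, and working ICMP/ping access is assumed:
# Editor's sketch, not part of the diff.
import asyncio
from lib import latency_test

results = asyncio.run(latency_test.measure_latency_icmp(["1.1.1.1", "8.8.8.8"], count=4))
# e.g. [{'host': '1.1.1.1', 'received': 4, 'avg_latency': 12.3}, ...],
# or False if iputils-ping could not be installed.
print(results)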

View File

@ -1,6 +1,7 @@
from lib import docker_interface from lib import docker_interface
from lib import config as config_module from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import clore_partner
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
@ -29,7 +30,7 @@ async def log_streaming_task(message_broker, monitoring, do_not_stream_container
# Start tasks for new containers # Start tasks for new containers
for container_name, container in current_containers.items(): for container_name, container in current_containers.items():
if not container_name in do_not_stream_containers: if not container_name in do_not_stream_containers and not clore_partner.validate_partner_container_name(container_name):
log_container_names.append(container_name) log_container_names.append(container_name)
if container_name not in tasks: if container_name not in tasks:
log.debug(f"log_streaming_task() | Starting task for {container_name}") log.debug(f"log_streaming_task() | Starting task for {container_name}")

View File

@ -6,6 +6,7 @@ import ipaddress
import socket import socket
import psutil import psutil
import sys import sys
import os
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
@ -25,12 +26,15 @@ def get_network_interfaces_with_subnet():
except Exception as e: except Exception as e:
return str(e) return str(e)
def exclude_network(excluded_network): def exclude_network(excluded_network, input_ranges=None):
# Convert exclude_network to ip_network object # Convert exclude_network to ip_network object
excluded_network = ip_network(excluded_network) excluded_network = ip_network(excluded_network)
if not input_ranges:
input_ranges=config.local_ipv4_ranges
# Remove the excluded network from the local_ranges list # Remove the excluded network from the local_ranges list
local_ranges = [ip_network(range_) for range_ in config.local_ipv4_ranges if ip_network(range_) != exclude_network] local_ranges = [ip_network(range_) for range_ in input_ranges if ip_network(range_) != exclude_network]
ranges_outside_exclude = [] ranges_outside_exclude = []
for local_range in local_ranges: for local_range in local_ranges:
@ -93,3 +97,19 @@ def is_ip_in_network(ip: str, network: str) -> bool:
# If there's an error with the input values, print the error and return False # If there's an error with the input values, print the error and return False
log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}") log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}")
return False return False
def purge_clore_interfaces():
network_interfaces = get_network_interfaces_with_subnet()
if type(network_interfaces) != dict:
log.error("Failed to load network interfaces, restarting...")
os._exit(1)
clore_subnets = [ "172.17.0.1/16" ] # I can include the docker default subnet here
for clore_default_network in config.clore_default_networks:
clore_subnets.append(clore_default_network["subnet"])
for network_interface in network_interfaces.keys():
if network_interface == "docker0" or network_interface[:3] == "br-":
subnet = network_interfaces[network_interface]
if subnet in clore_subnets or network_interface == "docker0":
utils.run_command(f"ip link delete {network_interface}")

lib/openvpn.py (new file, 187 lines)
View File

@ -0,0 +1,187 @@
from lib import config as config_module
from lib import logging as logging_lib
from lib import utils
import os
import aiofiles.os
config = config_module.config
log = logging_lib.log
CLIENT_CONFIGS_LOCATION = "/etc/openvpn/client"
PARTNER_CONFIG_NAME = "clore_partner.conf"
def generate_openvpn_config(
local_ip='10.1.0.2',
server_ip='10.1.0.1',
server_hostname='example.com',
udp_port=1194,
vpn_secret_key='YOUR_VPN_SECRET_KEY'
):
openvpn_config = f"""nobind
proto udp4
remote {server_hostname} {udp_port}
resolv-retry infinite
auth SHA256
cipher AES-256-CBC
dev {config.openvpn_forwarding_tun_device}
ifconfig {local_ip} {server_ip}
<secret>
-----BEGIN OpenVPN Static key V1-----
{vpn_secret_key}
-----END OpenVPN Static key V1-----
</secret>
fragment 1300
mssfix 1300
sndbuf 524288
rcvbuf 524288
user nobody
group nogroup
ping 15
ping-restart 45
ping-timer-rem
persist-tun
persist-key
verb 0"""
return openvpn_config
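# Example usage (illustrative only; all values are placeholders):
#
#   cfg_text = generate_openvpn_config(
#       local_ip="10.1.0.2",
#       server_ip="10.1.0.1",
#       server_hostname="partner-vpn.example.com",
#       udp_port=1194,
#       vpn_secret_key="<contents of the OpenVPN static key>",
#   )
#   assert "-----BEGIN OpenVPN Static key V1-----" in cfg_text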
async def get_iptables_forward_rules():
code, stdout, stderr = await utils.async_run_command(
f"LC_ALL=C {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -L PREROUTING -n -v --line-numbers"
)
rules = []
if code == 0:
columns = []
for line in stdout.split('\n'):
if "num" in columns and "target" in columns and "in" in columns:
items = line.split(maxsplit=len(columns)+1)
rule = {}
for col_idx, name in enumerate(columns):
rule[name]=items[col_idx]
rule["desc"] = items[len(columns)+1]
rules.append(rule)
else:
columns = line.split()
return rules
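# Illustrative shape of one parsed rule (values are hypothetical; "desc" holds the trailing
# match/target text after the standard columns of
# `iptables -t nat -L PREROUTING -n -v --line-numbers`):
#
#   {"num": "1", "pkts": "0", "bytes": "0", "target": "DNAT", "prot": "tcp", "opt": "--",
#    "in": "tun0", "out": "*", "source": "0.0.0.0/0", "destination": "0.0.0.0/0",
#    "desc": "dports 20000:20100 to:10.1.0.2"}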
async def remove_iptables_rule(rule_dict):
cmd = f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables"
if rule_dict.get('target') == 'DNAT':
cmd += " -t nat"
cmd += " -D PREROUTING"
if rule_dict.get('prot') and rule_dict['prot'] != '--':
cmd += f" -p {rule_dict['prot']}"
if rule_dict.get('in') and rule_dict['in'] != '*':
cmd += f" -i {rule_dict['in']}"
if rule_dict.get('out') and rule_dict['out'] != '*':
cmd += f" -o {rule_dict['out']}"
if rule_dict.get('source') and rule_dict['source'] != '0.0.0.0/0':
cmd += f" -s {rule_dict['source']}"
if rule_dict.get('destination') and rule_dict['destination'] != '0.0.0.0/0':
cmd += f" -d {rule_dict['destination']}"
if rule_dict.get('target') == 'DNAT':
if 'dports' in rule_dict.get('desc', ''):
port_info = rule_dict['desc'].split('dports ')[1].split(' ')[0]
if ':' in port_info:
cmd += f" -m multiport --dports {port_info}"
else:
cmd += f" --dport {port_info}"
if 'to:' in rule_dict.get('desc', ''):
dest_ip = rule_dict['desc'].split('to:')[1].split()[0]
cmd += f" -j DNAT --to-destination {dest_ip}"
await utils.async_run_command(cmd)
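# Sketch of the delete command rebuilt for the rule dict shown above (sudo prefix omitted):
#
#   iptables -t nat -D PREROUTING -p tcp -i tun0 -m multiport --dports 20000:20100 \
#       -j DNAT --to-destination 10.1.0.2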
async def clore_partner_configure(clore_partner_config):
try:
if clore_partner_config:
docker_restart_required = False
needed_openvpn_config = generate_openvpn_config(
local_ip=clore_partner_config["provider"],
server_ip=clore_partner_config["forwarding"],
server_hostname=clore_partner_config["openvpn_host"],
udp_port=clore_partner_config["openvpn_port"],
vpn_secret_key=clore_partner_config["secret"]
)
saved_config=''
config_exists = await aiofiles.os.path.exists(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME))
if config_exists:
async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='r') as file:
saved_config = await file.read()
if saved_config != needed_openvpn_config:
async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='w') as file:
await file.write(needed_openvpn_config)
is_active_code, is_active_stdout, is_active_stderr = await utils.async_run_command(
f"systemctl is-active openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
if is_active_code == 0 and saved_config != needed_openvpn_config:
code, stdout, stderr = await utils.async_run_command(
f"systemctl restart openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
docker_restart_required = code == 0
elif is_active_code != 0:
code, stdout, stderr = await utils.async_run_command(
f"systemctl start openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
docker_restart_required = code == 0
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route show table {str(config.forwarding_ip_route_table_id)}"
)
ip_route_configured = False
if code == 0:
for line in stdout.split('\n'):
items = line.split(' ')
if clore_partner_config["provider"] in items and config.openvpn_forwarding_tun_device in items:
ip_route_configured = True
break
if not ip_route_configured:
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route add 0.0.0.0/0 dev {config.openvpn_forwarding_tun_device} src {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id} && ip rule add from {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id}"
)
ip_tables_configured = False
rules = await get_iptables_forward_rules()
for rule in rules:
try:
if rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and f"{clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]}" in rule["desc"] and f"to:{clore_partner_config['provider']}" in rule["desc"]:
ip_tables_configured = True
elif rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and "to:10." in rule["desc"] and "dports " in rule["desc"]:
print("REMOVE RULE", rule)
await remove_iptables_rule(rule)
except Exception as ei:
log.error(f"clore_partner_configure() | ei | {ei}")
if not ip_tables_configured:
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p tcp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']} && {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p udp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']}"
)
if docker_restart_required:
async with aiofiles.open(config.restart_docker_flag_file, mode='w') as file:
await file.write("")
os._exit(0) # Exit clore hosting here: docker must be restarted after the VPN config update, and it will be restarted on the next start of clore hosting
else:
code, stdout, stderr = await utils.async_run_command(
f"systemctl stop openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
return True
except Exception as e:
log.error(f"FAIL | openvpn.clore_partner_configure | {e}")
return False
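# Example invocation (illustrative; field values are placeholders, the keys mirror the
# ones read above):
#
#   partner_cfg = {
#       "provider": "10.1.0.2",
#       "forwarding": "10.1.0.1",
#       "openvpn_host": "partner-vpn.example.com",
#       "openvpn_port": 1194,
#       "secret": "<OpenVPN static key body>",
#       "ports": [20000, 20100],
#   }
#   # await clore_partner_configure(partner_cfg)   # set up VPN, routing and port forwarding
#   # await clore_partner_configure(None)          # stop the partner VPN client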

View File

@ -1,10 +1,13 @@
from typing import Optional, Tuple, Dict
from lib import config as config_module from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import nvml from lib import nvml
import subprocess import subprocess
import hashlib import hashlib
import asyncio
import random import random
import string import string
import shutil
import shlex import shlex
import time import time
import math import math
@ -145,6 +148,63 @@ def get_extra_allowed_images():
else: else:
return [] return []
async def async_run_command(
command: str,
timeout: Optional[float] = None,
env: Optional[Dict[str, str]] = None
) -> Tuple[int, str, str]:
command_env = env if env is not None else {}
try:
proc = await asyncio.create_subprocess_shell(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=command_env
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(),
timeout=timeout
)
stdout_str = stdout.decode('utf-8').strip() if stdout else ''
stderr_str = stderr.decode('utf-8').strip() if stderr else ''
return proc.returncode, stdout_str, stderr_str
except asyncio.TimeoutError:
# Handle timeout: terminate the process gracefully first
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=5) # Wait for it to exit
except asyncio.TimeoutError:
# Force kill the process if it doesn't terminate
proc.kill()
await proc.wait()
return -1, '', f'Command timed out after {timeout} seconds'
except Exception as e:
return -1, '', str(e)
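# Example usage (illustrative): run a shell command with a timeout from async code.
#
#   code, out, err = await async_run_command("uname -r", timeout=10)
#   if code == 0:
#       print(out)
#
# Note: unless `env` is passed, the child process starts with an empty environment.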
def get_free_space_mb(path):
"""Get free space in MB for the given path."""
total, used, free = shutil.disk_usage(path)
return free // (1024 * 1024) # Convert bytes to MB
def get_directory_size_mb(path):
"""Get the size of a directory in MB."""
total_size = 0
for dirpath, dirnames, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
# Skip if the file doesn't exist (symlinks, etc.)
if not os.path.islink(fp) and os.path.exists(fp):
total_size += os.path.getsize(fp)
return total_size // (1024 * 1024) # Convert bytes to MB
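# Example (illustrative): lib/xfs.py sizes the docker data image from the free space on /
# plus the current size of the docker root, i.e.:
#
#   reclaimable_mb = get_free_space_mb("/") + get_directory_size_mb("/var/lib/docker")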
class shm_calculator: class shm_calculator:
def __init__(self, total_ram): def __init__(self, total_ram):
self.total_ram = total_ram self.total_ram = total_ram

lib/xfs.py Normal file (201 lines added)
View File

@ -0,0 +1,201 @@
# Library to setup XFS partition for docker
from lib import ensure_packages_installed
from lib import logging as logging_lib
from lib import docker_interface
from lib import networking
from lib import utils
import asyncio
import json
import os
log = logging_lib.log
DOCKER_ROOT = "/var/lib/docker"
DOCKER_DATA_IMG = "/opt/clore-hosting/data.img"
LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB
MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB
XFS_STATE_FILE = "/opt/clore-hosting/xfs_state"
MANDATORY_PACKAGES = [
"xfsprogs",
"dmidecode",
"openvpn",
"iproute2",
"iputils-ping",
"util-linux"
]
# This code is run on start of clore hosting to migrate docker to the XFS partition system
# sudo fallocate -l 300G /docker-storage.img
# sudo mkfs.xfs /docker-storage.img
# mount -o loop,pquota /docker-storage.img /mnt/docker-storage
def migrate():
docker_xfs_state = validate_docker_xfs()
#print(docker_xfs_state)
if docker_xfs_state == "skip":
return
elif docker_xfs_state == "valid":
return 'success'
packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed(
MANDATORY_PACKAGES
))
if not packages_available:
return 'packages-missing'
log.info("Starting migration to xfs")
docker_interface.stop_all_containers()
if os.path.exists(DOCKER_DATA_IMG):
try:
os.remove(DOCKER_DATA_IMG)
except Exception as e:
print(f"Error while trying to remove {DOCKER_DATA_IMG}: {e}")
return "failure"
max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT)
data_img_size = int(max_free_space - LEAVE_FREE_SPACE_MB)
if data_img_size < MIN_XFS_PARTITION_SIZE:
return 'not-enough-space'
docker_config_success = False
fstab_config_success = False
code, stdout, stderr = utils.run_command(
f"systemctl stop docker && rm -rf {DOCKER_ROOT} && fallocate -l {str(data_img_size)}M {DOCKER_DATA_IMG} && mkfs.xfs {DOCKER_DATA_IMG}"
)
networking.purge_clore_interfaces()
if code == 0:
docker_config_success = configure_docker_daemon()
if code == 0 and docker_config_success:
fstab_config_success = configure_fstab()
if code == 0 and fstab_config_success:
code, stdout, stderr = utils.run_command(
f"mkdir {DOCKER_ROOT} && systemctl daemon-reload && mount -a"
)
if code==0:
return 'success'
else:
configure_docker_daemon(remove=True)
configure_fstab(remove=True)
return 'failure'
else:
utils.run_command(
f"mkdir {DOCKER_ROOT}"
)
if os.path.exists(DOCKER_DATA_IMG):
try:
os.remove(DOCKER_DATA_IMG)
except Exception as e:
pass
return 'failure'
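# For reference: migrate() returns 'success', 'packages-missing', 'not-enough-space' or
# 'failure', and returns None when validate_docker_xfs() reports "skip".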
def validate_docker_xfs():
code_root, stdout_root, stderr_root = utils.run_command("df -T /")
code, stdout, stderr = utils.run_command(f"df -T {DOCKER_ROOT}")
#print([code, stderr, stdout])
if code_root == 0 and stderr_root == '' and ((code == 0 and stderr == '') or (code == 1 and f" {DOCKER_ROOT}: " in stderr and stdout == '')):
root_blocks = None
docker_root_blocks = None
docker_root_format = ''
for idx, line in enumerate(stdout_root.split('\n')):
if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric():
root_blocks = int(line.split()[2])
if code == 1:
docker_root_blocks = root_blocks
else:
for idx, line in enumerate(stdout.split('\n')):
if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric():
docker_root_blocks = int(line.split()[2])
docker_root_format = line.split()[1]
if root_blocks is None or docker_root_blocks is None:
return "skip"
elif docker_root_format=="xfs" and root_blocks > docker_root_blocks:
return "valid"
else:
return "default"
else:
return "skip"
def configure_docker_daemon(remove=False):
try:
daemon_json_path = "/etc/docker/daemon.json"
with open(daemon_json_path, 'r') as file:
raw_content = file.read()
daemon_config = json.loads(raw_content)
if remove:
if daemon_config.get("data-root") == DOCKER_ROOT:
del daemon_config["data-root"]
if daemon_config.get("storage-driver") == "overlay2":
del daemon_config["storage-driver"]
elif daemon_config.get("data-root") != DOCKER_ROOT or daemon_config.get("storage-driver") != "overlay2":
daemon_config["data-root"] = DOCKER_ROOT
daemon_config["storage-driver"] = "overlay2"
with open(daemon_json_path, 'w') as file:
file.write(json.dumps(daemon_config,indent=4))
return True
except Exception as e:
return False
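# Resulting /etc/docker/daemon.json after a successful run (illustrative; any other
# pre-existing keys are preserved):
#
#   {
#       "data-root": "/var/lib/docker",
#       "storage-driver": "overlay2"
#   }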
def configure_fstab(remove=False):
try:
file_path = "/etc/fstab"
mount_line = f"{DOCKER_DATA_IMG} {DOCKER_ROOT} xfs loop,pquota 0 0"
with open(file_path, 'r') as file:
raw_content = file.read()
if remove:
if mount_line in raw_content:
raw_content = raw_content.replace(f"\n{mount_line}\n", '')
with open(file_path, 'w') as file:
file.write(raw_content)
elif not mount_line in raw_content:
raw_content += f"\n{mount_line}\n"
with open(file_path, 'w') as file:
file.write(raw_content)
return True
except Exception as e:
return False
def init():
try:
if os.path.exists(XFS_STATE_FILE):
with open(XFS_STATE_FILE, 'r') as file:
raw_content = file.read()
if "enabled" in raw_content:
migration_status = migrate()
if migration_status == "success":
with open(XFS_STATE_FILE, 'w') as file:
file.write("active")
return "active"
elif migration_status == "not-enough-space":
with open(XFS_STATE_FILE, 'w') as file:
file.write("not-enough-space")
return 'not-enough-space'
else:
with open(XFS_STATE_FILE, 'w') as file:
file.write("failed-migration")
return 'failed'
elif 'not-enough-space' in raw_content:
return 'not-enough-space'
elif "active" in raw_content:
return "active"
else:
return "failed"
else:
return "disabled"
except Exception as e:
print(e)
return "failed"
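# Usage sketch (illustrative): hosting startup can branch on the state returned by init(),
# which is one of "active", "not-enough-space", "failed" or "disabled".
#
#   xfs_state = init()
#   if xfs_state == "active":
#       pass  # docker data root is on the XFS-backed image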