V5.2.9 | XFS, hosting to partner platforms #1

Merged
clore merged 6 commits from xfs into main 2024-12-08 22:24:00 +00:00
19 changed files with 1200 additions and 64 deletions

View File

@ -32,7 +32,7 @@ def get_last_ip_occurrence_and_text(input_string):
else:
return None, None
def configure(containers):
def configure(containers, partner_forwarding_ips):
valid_containers = []
newly_created_networks = []
containers_required_networks = []
@ -141,7 +141,7 @@ def configure(containers):
if config.log_containers_strings:
print("FROM DOCKER CONFIGURATOR", valid_containers)
validation_and_security = docker_interface.validate_and_secure_networks()
validation_and_security = docker_interface.validate_and_secure_networks(partner_forwarding_ips)
if startup_sctipt_creation_fail:
validation_and_security=False
return validation_and_security, valid_containers, use_hive_flightsheet

View File

@ -4,7 +4,10 @@ from lib import log_streaming_task
from lib import run_startup_script
from lib import hive_miner_interface
from lib import docker_interface
from lib import background_job
from lib import docker_deploy
from lib import clore_partner
from lib import clore_partner_socket
from lib import docker_pull
from lib import get_specs
from lib import utils
@ -34,17 +37,17 @@ WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_
#print(config)
async def configure_networks(containers):
res = await asyncio.to_thread(docker_configurator.configure, containers)
async def configure_networks(containers, partner_forwarding_ips):
res = await asyncio.to_thread(docker_configurator.configure, containers, partner_forwarding_ips)
try:
fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2])
return fin_res
except Exception as e:
return False
async def deploy_containers(validated_containers, allowed_running_containers):
async def deploy_containers(validated_containers, allowed_running_containers, can_run_partner_workloads):
try:
all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers)
all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers, can_run_partner_workloads)
return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names)
except Exception as e:
return False
@ -70,8 +73,10 @@ async def set_hive_miner_status(enabled=False):
return False
class CloreClient:
def __init__(self, auth_key):
def __init__(self, auth_key, xfs_state):
self.auth_key=auth_key
self.xfs_state = xfs_state
self.ws_peers = {}
self.last_checked_ws_peers=0
self.containers={}
@ -99,9 +104,11 @@ class CloreClient:
"container_log_streaming_service": utils.unix_timestamp(),
"specs_service": utils.unix_timestamp(),
"oc_service": utils.unix_timestamp(),
"background_pow_data_collection": utils.unix_timestamp()
"background_pow_data_collection": utils.unix_timestamp(),
"partner_service": utils.unix_timestamp()
}
self.max_service_inactivity = 600 # seconds
self.no_restart_services = ["partner_service"] # Services that are allowed to run indefinetly without triggering the app to restart
if config.debug_ws_peer:
self.ws_peers[str(config.debug_ws_peer)]={
@ -137,6 +144,10 @@ class CloreClient:
self.hive_miner_interface = hive_miner_interface.hive_interface()
self.next_pow_background_job_send_update = 0
self.clore_partner_initiazized = False
self.partner_forwarding_ips = []
self.start_time = utils.unix_timestamp()
async def service(self):
global container_log_broken
@ -151,10 +162,11 @@ class CloreClient:
task6 = asyncio.create_task(self.specs_service(monitoring))
task7 = asyncio.create_task(self.oc_service(monitoring))
task8 = asyncio.create_task(self.background_pow_data_collection(monitoring))
task9 = asyncio.create_task(self.partner_service(monitoring))
monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
# Wait for both tasks to complete (they won't in this case)
await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, monitoring_task)
await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, task9, monitoring_task)
async def monitoring_service(self, monitoring):
while True:
@ -169,6 +181,7 @@ class CloreClient:
if config.debug:
log.success(self.last_service_heartbeat)
for service_name in self.last_service_heartbeat.keys():
if not service_name in self.no_restart_services:
last_hearthbeat = self.last_service_heartbeat[service_name]
if last_hearthbeat < utils.unix_timestamp()-config.maximum_pull_service_loop_time and service_name=="handle_container_cache":
log.error(f"\"{service_name}\" service is stuck for {utils.unix_timestamp()-last_hearthbeat} s, Restarting...")
@ -260,6 +273,7 @@ class CloreClient:
if len(self.p_needed_containers)>0:
local_images = await get_local_images(no_latest_tag=True)
partner_images = await clore_partner.get_partner_allowed_images()
for local_image in local_images:
self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()}
image_needed = False
@ -283,11 +297,32 @@ class CloreClient:
if local_image_tag==allowed_tag or allowed_tag=='*':
image_needed=True
break
if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers:
if not image_needed and type(partner_images) == list:
for partner_image in partner_images:
if local_image.replace(':latest', '') == partner_image.replace(':latest', ''):
image_needed = True
del self.last_pull_progress[local_image]
break
if len(local_image.split('/')) >= 3:
partner_image_spl = partner_image.split(':')
image, deployment_type = '/'.join(local_image.split('/', 2)[:2]), local_image.split('/', 2)[-1]
if len(partner_image_spl) == 1:
if image == partner_image_spl[0] or f"{image}" == f"{partner_image_spl[0]}_latest":
image_needed = True
del self.last_pull_progress[local_image]
break
elif len(partner_image_spl) == 2:
if image.replace('_latest', '') == f"{partner_image_spl[0]}_{partner_image_spl[1]}".replace('_latest', ''):
image_needed = True
del self.last_pull_progress[local_image]
break
if not image_needed and removed_cnt < config.max_remove_images_per_run and config.delete_unused_containers and partner_images != None:
log.success(f"GOING TO REMOVE {local_image}")
with concurrent.futures.ThreadPoolExecutor() as pool:
r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image)
if r:
removed_cnt+=1
del self.last_pull_progress[local_image]
#if config.debug:
# log.success(f"{local_image} | {image_needed}")
@ -327,7 +362,7 @@ class CloreClient:
try:
await pull_task
except Exception as e:
self.last_pull_progress[local_image]={f"log":"Can't pull image \"{local_image}\"", "last_update":time.time()}
self.last_pull_progress[local_image]={"log":f"Can't pull image \"{local_image}\"", "last_update":time.time()}
log_task.cancel()
try:
await log_task
@ -358,11 +393,18 @@ class CloreClient:
container_conf = WebSocketClient.get_containers()
can_run_partner_workloads = False
if container_conf[0]:
self.containers_set=True
self.containers=container_conf[1]
tmp_images = []
for container in self.containers:
is_order_spot = False
for idx, container in enumerate(self.containers):
if "spot" in container:
is_order_spot = True
if "image" in container and "image" in container and container["image"]!="cloreai/hive-use-flightsheet":
log_pull = False
if "name" in container:
@ -386,6 +428,9 @@ class CloreClient:
if not image_config in tmp_images:
tmp_images.append(image_config)
can_run_partner_workloads = False if ((not is_order_spot) and running_order) else True
clore_partner_socket.set_can_deploy(can_run_partner_workloads)
if self.restart_docker and not running_order and len(self.containers)>0:
log.debug("Sending docker restart command")
utils.run_command_v2("systemctl restart docker")
@ -400,14 +445,14 @@ class CloreClient:
tasks.append(api_interface.get_server_config())
if self.containers_set:
tasks.append(configure_networks(self.containers))
tasks.append(configure_networks(self.containers, self.partner_forwarding_ips))
tasks.append(WebSocketClient.stream_pull_logs())
if self.validated_containers_set:
tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers))
tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers, can_run_partner_workloads))
if step==1:
WebSocketClient.set_auth(self.auth_key)
WebSocketClient.set_auth(self.auth_key, self.xfs_state)
asyncio.create_task(WebSocketClient.run())
elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval):
log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds")
@ -427,6 +472,11 @@ class CloreClient:
if result.success:
self.last_checked_ws_peers = utils.unix_timestamp()
self.allowed_images=result.allowed_images+self.extra_allowed_images
if self.xfs_state == "active":
self.allowed_images.append({
"repository": "vastai/test",
"allowed_tags": ["bandwidth-test-nvidia"]
})
if not config.debug_ws_peer:
for pure_ws_peer in result.ws_peers:
self.ws_peers[pure_ws_peer]={
@ -455,7 +505,7 @@ class CloreClient:
async def submit_specs(self, current_specs):
try:
if type(current_specs) == dict:
current_specs["backend_version"]=18
current_specs["backend_version"]=19
current_specs["update_hw"]=True
smallest_pcie_width = 999
for gpu in current_specs["gpus"]["nvidia"]:
@ -504,7 +554,7 @@ class CloreClient:
await monitoring.put("oc_service")
oc_apply_allowed = True
### OC Service should also hande Hive stuff
if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries:
if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries and background_job.is_enabled():
await set_hive_miner_status(True)
oc_apply_allowed = False # Don't apply any OC when running HiveOS miner
elif self.is_hive and not self.dont_use_hive_binaries:
@ -544,6 +594,30 @@ class CloreClient:
log.debug(f"FAIL | background_pow_data_collection() | {e}")
await asyncio.sleep(6)
async def partner_service(self, monitoring):
while True:
try:
await monitoring.put("partner_service")
if self.start_time < utils.unix_timestamp() - 180:
forwarding_latency_measurment = await clore_partner.measure_forwarding_latency()
if type(forwarding_latency_measurment) == list:
await WebSocketClient.set_forwarding_latency_measurment(forwarding_latency_measurment)
partner_config = WebSocketClient.get_clore_partner_config()
if partner_config != None:
if self.clore_partner_initiazized == False:
ir = await clore_partner.initialize()
if ir:
self.clore_partner_initiazized = True
if self.clore_partner_initiazized == True:
if 'provider' in partner_config and 'forwarding' in partner_config:
self.partner_forwarding_ips = [partner_config['provider'], partner_config['forwarding']]
else:
self.partner_forwarding_ips = []
await clore_partner.configure(partner_config)
except Exception as e:
log.debug(f"FAIL | partner_service() | {e}")
await asyncio.sleep(6)
def expire_ws_peers(self):
for ws_peer_address in list(self.ws_peers.keys()):
ws_peer_info = self.ws_peers[ws_peer_address]

View File

@ -1,4 +1,5 @@
from concurrent.futures import ThreadPoolExecutor
from lib import clore_partner
import asyncio
import random
import websockets
@ -31,6 +32,7 @@ class WebSocketClient:
self.connected = False
self.authorized = False
self.auth = auth
self.xfs_state = None
self.log_auth_fail = True
self.last_heartbeat = clore_utils.unix_timestamp()
self.containers={}
@ -51,15 +53,30 @@ class WebSocketClient:
self.last_gpu_oc_specs = []
self.last_set_oc = {}
self.clore_partner_config = None
self.forwarding_latency_measurment = None
def get_last_heartbeat(self):
return self.last_heartbeat
def get_containers(self):
return self.containers_set, self.containers
partner_container_config = clore_partner.get_partner_container_config()
return self.containers_set, ((self.containers + [partner_container_config]) if partner_container_config else self.containers)
def get_oc(self):
return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc
def get_clore_partner_config(self):
return self.clore_partner_config
async def set_forwarding_latency_measurment(self, forwarding_latency_measurment):
await self.send(json.dumps(
{
"forwarding_latency_measurment": forwarding_latency_measurment
}
))
self.forwarding_latency_measurment = forwarding_latency_measurment
def set_ws_peers(self, ws_peers):
tmp_ws_peers=[]
for ws_peer in list(ws_peers.keys()):
@ -68,8 +85,9 @@ class WebSocketClient:
self.ws_peers = tmp_ws_peers
def set_auth(self, auth):
def set_auth(self, auth, xfs_state):
self.auth=auth
self.xfs_state=xfs_state
def set_pull_logs(self, pull_logs):
self.pull_logs=pull_logs
@ -93,7 +111,9 @@ class WebSocketClient:
log.debug(f"CLOREWS | Connected to {random_ws_peer}")
await self.send(json.dumps({
"login":str(self.auth),
"type":"python"
"xfs_state": self.xfs_state,
"type":"python",
"clore_partner_support": True
}))
except Exception as e:
log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e}")
@ -136,6 +156,16 @@ class WebSocketClient:
pass
elif message=="KEEPALIVE":
self.last_heartbeat = clore_utils.unix_timestamp()
try:
if self.forwarding_latency_measurment:
await self.send(json.dumps(
{
"forwarding_latency_measurment": self.forwarding_latency_measurment
}
))
self.forwarding_latency_measurment = None
except Exception as e:
pass
elif message=="NEWER_LOGIN" or message=="WAIT":
await self.close_websocket()
elif message[:10]=="PROVEPULL;":
@ -148,13 +178,16 @@ class WebSocketClient:
else:
try:
parsed_json = json.loads(message)
if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list:
if "type" in parsed_json and parsed_json["type"]=="partner_config" and "partner_config" in parsed_json and type(parsed_json["partner_config"])==dict:
self.clore_partner_config = parsed_json["partner_config"]
await self.send(json.dumps({"partner_config":parsed_json["partner_config"]}))
elif "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json and type(parsed_json["new_containers"])==list:
self.last_heartbeat = clore_utils.unix_timestamp()
container_str = json.dumps({"containers":parsed_json["new_containers"]})
await self.send(container_str)
if len(parsed_json["new_containers"]) > 0: # There should be at least one container
self.containers_set = True
self.containers=parsed_json["new_containers"]
self.containers=clore_partner.filter_partner_dummy_workload_container(parsed_json["new_containers"])
#log.success(container_str)
elif "allow_oc" in parsed_json: # Enable OC
self.oc_enabled=True

View File

@ -1,5 +1,6 @@
from lib import config as config_module
from lib import init_server
from lib import xfs
from lib import utils
from clore_hosting import main as clore_hosting
import asyncio, os
@ -29,7 +30,15 @@ elif config.reset:
log.success("Client login reseted")
elif config.service:
if len(auth)==32+48+1:
clore_client = clore_hosting.CloreClient(auth_key=auth)
utils.run_command("sysctl -w net.ipv4.ip_forward=1")
xfs_state = xfs.init()
if os.path.isfile(config.restart_docker_flag_file):
utils.run_command("systemctl restart docker")
os.remove(config.restart_docker_flag_file)
clore_client = clore_hosting.CloreClient(auth_key=auth, xfs_state=xfs_state)
asyncio.run(clore_client.service())
else:
print("TODO: Firstly config auth")

22
lib/background_job.py Normal file
View File

@ -0,0 +1,22 @@
import time
import re
disabled_till = 0
def is_background_job_container_name(string):
if type(string) != str:
return False
pattern = r"^clore-default-\d+$"
return bool(re.match(pattern, string))
def temporarly_disable(seconds):
global disabled_till
disabled_till = time.time() + seconds
def enable():
global disabled_till
disabled_till=0
def is_enabled():
global disabled_till
return True if disabled_till < time.time() else False

238
lib/clore_partner.py Normal file
View File

@ -0,0 +1,238 @@
from lib import ensure_packages_installed
from lib import config as config_module
from lib import logging as logging_lib
from lib import clore_partner_socket
from lib import latency_test
from lib import openvpn
from lib import utils
import asyncio
import random
import json
import time
import re
import os
import aiofiles.os
from aiohttp import ClientSession, ClientTimeout
config = config_module.config
log = logging_lib.log
MANDATORY_PACKEGES = ['dmidecode', 'openvpn', 'iproute2']
DUMMY_WORKLOAD_CONTAINER = "cloreai/partner-dummy-workload"
host_facts_location = os.path.join(config.clore_partner_base_dir, "host_facts")
partner_cache_location = os.path.join(config.clore_partner_base_dir, "partner_cache")
next_ensupe_packages_check = 0
is_socket_running = False
partner_container_config = None
async def initialize():
global next_ensupe_packages_check
global is_socket_running
try:
await aiofiles.os.makedirs(host_facts_location, exist_ok=True)
await aiofiles.os.makedirs(partner_cache_location, exist_ok=True)
await aiofiles.os.makedirs("/etc/openvpn/client", exist_ok=True)
if not is_socket_running:
is_socket_running=True
asyncio.create_task(clore_partner_socket.socket_service(
location=os.path.join(host_facts_location, "partner_interface.socket")
))
if next_ensupe_packages_check < time.time():
success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None)
next_ensupe_packages_check = float('inf') if success else time.time() + 60*60 # if did not succeeed -> retry in 1hr
if not success:
return False
elif next_ensupe_packages_check != float('inf'):
return False
code, stdout, stderr = await utils.async_run_command(
"dmidecode -t 2 2>&1",
20
)
if code == 0 and not stderr:
async with aiofiles.open(os.path.join(host_facts_location, "dmidecode_t2.txt"), mode='w') as file:
await file.write(stdout)
else:
return False
code, stdout, stderr = await utils.async_run_command(
"dmidecode 2>&1",
20
)
if code == 0 and not stderr:
async with aiofiles.open(os.path.join(host_facts_location, "dmidecode.txt"), mode='w') as file:
await file.write(stdout)
else:
return False
return True
except Exception as e:
log.error(f"FAIL | clore_partner.initialize | {e}")
return False
async def get_partner_allowed_images():
try:
file_exists = await aiofiles.os.path.exists(os.path.join(partner_cache_location, "container_list.json"))
if not file_exists:
return []
images = []
async with aiofiles.open(os.path.join(partner_cache_location, "container_list.json"), mode='r') as file:
content = await file.read()
containers = json.loads(content)
for container in containers:
image = container.get("Config", {}).get("Image", None)
if image and not image in images:
images.append(image)
return images
except Exception as e:
return None
def validate_partner_container_name(name):
if type(name) != str:
return False
elif name==config.clore_partner_container_name:
return True
pattern = r"^C\.\d+$"
return bool(re.match(pattern, name))
def validate_partner_workload_container_name(name):
if type(name) != str:
return False
pattern = r"^C\.\d+$"
return bool(re.match(pattern, name))
last_openvpn_config = None
def get_partner_container_config():
global partner_container_config
return partner_container_config
async def configure(partner_config):
global last_openvpn_config
global partner_container_config
if last_openvpn_config != partner_config:
partner_container_config = {
"image": partner_config["partner_image"],
"name": config.clore_partner_container_name,
"hostname": f"{partner_config['partner_id'][:16]}-m{partner_config['machine_id']}",
"env": {
"AUTH": partner_config['partner_id'],
"ip_addr": partner_config['openvpn_host'],
"port_range": f'{partner_config['ports'][0]}-{partner_config['ports'][1]}'
},
"volumes": {
f"{host_facts_location}": {"bind": "/var/lib/vastai_kaalia/specs_source"},
f"{partner_cache_location}": {"bind": "/var/lib/vastai_kaalia/data"},
f"/var/lib/docker": {"bind": "/var/lib/docker"},
f"/var/run/docker.sock": {"bind": "/var/run/docker.sock"}
},
"gpus": True,
"command": '',
"network": "clore-partner-br0",
"ip": "172.19.0.254",
"cap_add": ["SYS_ADMIN"],
"devices": ["/dev/fuse"],
#"security_opt": ["apparmor:unconfined"],
"ports": [f"{partner_config['ports'][1]}:{partner_config['ports'][1]}"],
}
r = await openvpn.clore_partner_configure(partner_config)
if r:
last_openvpn_config = partner_config
# -----------------------------------------
next_latency_measurment = 0
async def fetch_forwarding_nodes():
url = "https://api.clore.ai/v1/get_relay"
timeout = ClientTimeout(total=30)
async with ClientSession(timeout=timeout) as session:
try:
async with session.get(url) as response:
response.raise_for_status()
data = await response.json()
return data
except Exception as e:
print(f"An error occurred: {e}")
return None
async def set_next_latency_measurment(ts):
global next_latency_measurment
try:
next_latency_measurment=ts
async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='w') as file:
await file.write(str(ts))
except Exception as e:
pass
async def measure_forwarding_latency():
global next_latency_measurment
if next_latency_measurment > time.time():
return False
try:
await aiofiles.os.makedirs(config.clore_partner_base_dir, exist_ok=True)
file_exists = await aiofiles.os.path.exists(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"))
if file_exists:
async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='r') as file:
content = await file.read()
if content.isdigit():
next_latency_measurment = int(content)
if next_latency_measurment < time.time():
node_info = await fetch_forwarding_nodes()
if type(node_info) == dict and node_info.get("country") and type(node_info.get("nodes")) == dict and node_info.get("code") == 0:
to_test_nodes = []
ip_to_region = {}
valid_regions = []
for node_region in node_info.get("nodes").keys():
nodes_ip_list = node_info.get("nodes")[node_region]
if type(nodes_ip_list) == list and len(nodes_ip_list) > 0:
to_test_nodes = to_test_nodes + nodes_ip_list
for node_ip in nodes_ip_list:
ip_to_region[node_ip]=node_region
if len(to_test_nodes) > 0:
measurment_result = await latency_test.measure_latency_icmp(to_test_nodes)
if measurment_result:
for idx, res in enumerate(measurment_result):
if res["received"] > 2 and not ip_to_region.get(res["host"]) in valid_regions:
valid_regions.append(ip_to_region.get(res["host"]))
measurment_result[idx]["region"] = ip_to_region.get(res["host"])
if len(valid_regions) == len(ip_to_region.keys()):
await set_next_latency_measurment(int(
time.time() + 60*60*24*30 # Re run in 30 days, because measurment succeeded
))
return measurment_result
else:
await set_next_latency_measurment(int(
time.time() + 60*60*24 # Retry in 24hr, all regions in country should be reacheable
))
else:
await set_next_latency_measurment(int(
time.time() + 60*60*72 # Retry in 72hr (clore partner service is not available in host country yet)
))
return False
else:
await set_next_latency_measurment(int(
time.time() + 60*60*12 # Retry in 12hr, the response was not matching the required format
))
return False
return False
except Exception as e:
return False
def filter_partner_dummy_workload_container(containers):
try:
remaining_containers = []
for container in containers:
if container["image"] != DUMMY_WORKLOAD_CONTAINER:
remaining_containers.append(container)
return remaining_containers
except Exception as e:
return containers

View File

@ -0,0 +1,94 @@
from lib import config as config_module
from lib import logging as logging_lib
from lib import background_job
from lib import utils
import asyncio
import json
import os
import aiofiles.os
config = config_module.config
log = logging_lib.log
dangerous_chars = [';', '|', '&', '`', '$', '>', '<', '(', ')', '\\', '!', '"', '\'', '[', ']', '{', '}']
allowed_commands = ["df"]
can_deploy = True
async def remove_socket_file(path):
try:
file_exists = await aiofiles.os.path.exists(path)
if file_exists:
await aiofiles.os.remove(path)
except Exception:
pass
async def handle_client(reader, writer):
global can_deploy
try:
while True:
data = await reader.read(1024*64)
try:
if not data:
break
#print("DATA", data, data.decode())
parsed_data = json.loads(data.decode())
if "run_command" in parsed_data and type(parsed_data["run_command"])==str:
allowed = False
for allowed_command in allowed_commands:
if f"{allowed_command} " == parsed_data["run_command"][:len(allowed_command)+1] or allowed_command==parsed_data["run_command"]:
allowed = True
break
if allowed and any(char in parsed_data["run_command"] for char in dangerous_chars):
allowed = False
log.debug(f"clore_partner_socket | Received \"{parsed_data["run_command"]}\" | {'allowed' if allowed else 'denied'}")
if allowed:
code, stdout, stderr = await utils.async_run_command(
parsed_data["run_command"]
)
writer.write(json.dumps({
"code": code,
"stderr": stderr,
"stdout": stdout
}).encode())
else:
writer.write(json.dumps({
"code": -1,
"stderr": 'Command not allowed',
"stdout": 'Command not allowed'
}).encode())
elif "can_deploy" in parsed_data:
writer.write(json.dumps({
"can_deploy": can_deploy
}).encode())
elif "stop_background_job" in parsed_data and "time" in parsed_data:
try:
if isinstance(parsed_data["time"], int):
background_job.temporarly_disable(parsed_data["time"])
except Exception as e:
pass
else:
writer.write('?'.encode())
await writer.drain()
except Exception as data_exception:
pass
break
except asyncio.CancelledError:
log.debug(f"clore partner socket | Client handler canceled.")
finally:
log.debug(f"clore partner socket | Closing client connection.")
writer.close()
await writer.wait_closed()
def set_can_deploy(state):
global can_deploy
can_deploy = state
async def socket_service(location):
await remove_socket_file(location)
server = await asyncio.start_unix_server(handle_client, path=location)
log.debug(f"clore partner socket | running at {location}")
async with server:
await server.serve_forever()

View File

@ -9,6 +9,11 @@ hard_config = {
"name": "clore-br0",
"subnet": "172.18.0.0/16",
"gateway": "172.18.0.1"
},
{
"name": "clore-partner-br0",
"subnet": "172.19.0.0/20",
"gateway": "172.19.0.1"
}
],
"run_iptables_with_sudo":True,
@ -33,7 +38,11 @@ hard_config = {
"maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app
"maximum_pull_service_loop_time": 14400, # Exception for image pulling
"creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk
"allow_mixed_gpus": True
"allow_mixed_gpus": True,
"openvpn_forwarding_tun_device": "tun1313",
"forwarding_ip_route_table_id": 100,
"clore_partner_container_name": "clore-partner-service",
"restart_docker_flag_file": "/opt/clore-hosting/.restart_docker"
}
parser = argparse.ArgumentParser(description='Example argparse usage')
@ -50,6 +59,7 @@ parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hostin
parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)")
parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes")
parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software")
parser.add_argument('--clore-partner-base-dir', type=str, default="/opt/clore-hosting/.clore-partner")
# Parse arguments, ignoring any non-defined arguments
args, _ = parser.parse_known_args()

View File

@ -28,6 +28,14 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
for cap in container_options["cap_add"]:
command.extend(["--cap-add", cap])
if "devices" in container_options:
for device in container_options["devices"]:
command.extend(["--device", device])
if "security_opt" in container_options:
for security_opt in container_options["security_opt"]:
command.extend(["--security-opt", security_opt])
if "volumes" in container_options:
for volume_host, volume_container in container_options["volumes"].items():
bind = f"{volume_host}:{volume_container['bind']}"
@ -75,6 +83,8 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
if ip:
command.extend(["--ip", ip])
command.append('--stop-timeout')
command.append('0')
command.append(container_options["image"])
try:

View File

@ -1,7 +1,9 @@
from lib import config as config_module
from lib import logging as logging_lib
from lib import docker_cli_wrapper
from lib import background_job
from lib import docker_interface
from lib import clore_partner
from lib import get_specs
from lib import utils
import docker
@ -14,7 +16,7 @@ client = docker_interface.client
config = config_module.config
log = logging_lib.log
def deploy(validated_containers, allowed_running_containers=[]):
def deploy(validated_containers, allowed_running_containers=[], can_run_partner_workloads=False):
local_images = docker_interface.get_local_images()
all_containers = docker_interface.get_containers(all=True)
@ -67,7 +69,9 @@ def deploy(validated_containers, allowed_running_containers=[]):
'tty': True,
'network_mode': 'clore-br0',
'cap_add': [],
'volumes': {},
'devices': [],
'security_opt': [],
'volumes': validated_container["volumes"] if "volumes" in validated_container else {},
'ports': {},
'device_requests': [],
'environment': validated_container["env"] if "env" in validated_container else {},
@ -80,6 +84,15 @@ def deploy(validated_containers, allowed_running_containers=[]):
)
}
if "security_opt" in validated_container:
container_options["security_opt"] = validated_container["security_opt"]
if "devices" in validated_container:
container_options["devices"] = validated_container["devices"]
if "cap_add" in validated_container:
container_options["cap_add"] = validated_container["cap_add"]
if "hostname" in validated_container:
container_options["hostname"]=validated_container["hostname"]
elif "clore-order-" in validated_container["name"]:
@ -136,7 +149,7 @@ def deploy(validated_containers, allowed_running_containers=[]):
container_options["shm_size"] = f"{SHM_SIZE}m"
if not validated_container["name"] in created_container_names and image_ready:
if not validated_container["name"] in created_container_names and image_ready and not (not background_job.is_enabled() and background_job.is_background_job_container_name(validated_container["name"])):
if config.creation_engine == "wrapper":
docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus)
else:
@ -159,7 +172,10 @@ def deploy(validated_containers, allowed_running_containers=[]):
all_running_container_names.append(container.name)
else:
all_stopped_container_names.append(container.name)
if container.name in needed_running_names and container.status != 'running':
if background_job.is_background_job_container_name(container.name) and not background_job.is_enabled():
if container.status == "running":
container.stop()
elif container.name in needed_running_names and container.status != 'running':
try:
attached_networks = container.attrs['NetworkSettings']['Networks']
if "bridge" in attached_networks.keys() or len(attached_networks.keys())==0: # Ip was not attached, remove container
@ -174,17 +190,22 @@ def deploy(validated_containers, allowed_running_containers=[]):
container.stop()
except Exception as e:
pass
elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running':
elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running' and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name):
try:
container.stop()
container.remove()
except Exception as e:
pass
elif container.name not in paused_names+needed_running_names+allowed_running_containers:
elif container.name not in paused_names+needed_running_names+allowed_running_containers and not clore_partner.validate_partner_container_name(container.name) and not docker_interface.is_docker_default_name_lenient(container.name):
try:
container.remove()
except Exception as e:
pass
elif not can_run_partner_workloads and container.status == "running" and clore_partner.validate_partner_workload_container_name(container.name):
try:
container.stop()
except Exception as e:
pass
return all_running_container_names, all_stopped_container_names
#print(validated_containers)

View File

@ -11,6 +11,13 @@ from typing import List, Optional
import docker
import json
import os
import re
partner_bridge_subnet = ''
for clore_network in config.clore_default_networks:
if clore_network["name"] == "clore-partner-br0":
partner_bridge_subnet = clore_network["subnet"]
try:
os.makedirs(config.startup_scripts_folder, exist_ok=True)
@ -59,6 +66,18 @@ def get_info():
except Exception as e:
return {}
def stop_all_containers():
try:
# List all containers
containers = client.containers.list(all=True) # Use all=True to include stopped containers
for container in containers:
log.info(f"stop_all_containers() | Stopping container: {container.name} (ID: {container.id})")
container.stop() # Stop the container
log.success("stop_all_containers() | All containers have been stopped.")
except Exception as e:
log.error(f"stop_all_containers() |An error occurred: {e}")
return True
def check_docker_connection():
try:
client.ping()
@ -95,7 +114,7 @@ def get_local_images(no_latest_tag=False):
return image_list
except Exception as e:
log.error(f"DOCKER | Can't get local images | {e}")
log.error(f"DOCKER | Can't get local images | {e} | {'y' if no_latest_tag else 'n'}")
os._exit(1)
def get_containers(all=False):
@ -184,7 +203,8 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"):
gateway=gateway
)]
),
check_duplicate=True
check_duplicate=True,
#options={'com.docker.network.bridge.enable_ip_masq': 'false'} if 'clore-partner-' in network_name else {}
)
log.debug(f"Network {network_name} created successfully.")
return True
@ -192,7 +212,7 @@ def create_docker_network(network_name, subnet, gateway, driver="bridge"):
log.error(f"DOCKER | Failed to create network {network_name}: {e}")
return False
def validate_and_secure_networks():
def validate_and_secure_networks(partner_forwarding_ips):
try:
failed_appending_iptables_rule = False
@ -238,6 +258,13 @@ def validate_and_secure_networks():
#print(this_ipv4_range)
outside_ranges_ip_network = networking.exclude_network(this_ipv4_range)
if this_ipv4_range == partner_bridge_subnet:
for partner_forwarding_ip in partner_forwarding_ips:
outside_ranges = []
for ip_range in outside_ranges_ip_network:
outside_ranges.append(str(ip_range))
outside_ranges_ip_network = networking.exclude_network(f"{partner_forwarding_ip}/32", input_ranges=outside_ranges)
outside_ranges = []
for outside_range_ip_network in outside_ranges_ip_network:
outside_ranges.append(str(outside_range_ip_network))
@ -265,7 +292,7 @@ def validate_and_secure_networks():
succesfully_appended = networking.add_iptables_rule(needed_iptables_rule)
if not succesfully_appended:
failed_appending_iptables_rule = True
else:
elif this_ipv4_range != partner_bridge_subnet:
needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name)
for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule
for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule))
@ -413,3 +440,7 @@ def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"):
return False
else:
return False
def is_docker_default_name_lenient(container_name): # Not a perfect solution, but it will do the job,
pattern = r'^[a-z]+_[a-z]+$'
return re.match(pattern, container_name) is not None

View File

@ -0,0 +1,71 @@
from lib import logging as logging_lib
from typing import List
from lib import utils
import time
log = logging_lib.log
async def ensure_packages_installed(
packages: List[str] = [],
total_timeout: float = 300
) -> bool:
non_interactive_env = {
'DEBIAN_FRONTEND': 'noninteractive',
'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
}
start_time = time.time()
packages_to_install = []
for package in packages:
check_cmd = f"dpkg -s {package} > /dev/null 2>&1"
return_code, _, _ = await utils.async_run_command(check_cmd, env=non_interactive_env)
if return_code != 0:
packages_to_install.append(package)
if not packages_to_install:
log.debug("All packages are already installed.")
return True
update_cmd = (
"apt-get update "
"-y "
"--no-install-recommends"
)
return_code, stdout, stderr = await utils.async_run_command(
update_cmd,
timeout=None if total_timeout == None else 180,
env=non_interactive_env
)
if return_code != 0:
log.error(f"Failed to update package lists: {stderr}")
return False
install_cmd = (
"apt-get install "
"-y "
"--no-install-recommends "
"--assume-yes "
"-o Dpkg::Options::='--force-confdef' " # Default to existing config
"-o Dpkg::Options::='--force-confold' " # Keep existing config
f"{' '.join(packages_to_install)}"
)
# Calculate remaining timeout
remaining_timeout = None if total_timeout == None else max(0, total_timeout - (time.time() - start_time))
# Install packages
return_code, stdout, stderr = await utils.async_run_command(
install_cmd,
timeout=remaining_timeout,
env=non_interactive_env
)
if return_code == 0:
log.debug(f"Successfully installed packages: {packages_to_install}")
return True
else:
log.error(f"Failed to install packages: {stderr}")
return False

View File

@ -2,7 +2,7 @@ from aiofiles.os import stat as aio_stat
from pydantic import BaseModel, Field, constr
import xml.etree.ElementTree as ET
from lib import docker_interface
from typing import Dict, List
from typing import Dict, List, Optional
from lib import utils
import subprocess
import speedtest
@ -311,7 +311,7 @@ def get_gpu_info():
class DockerDaemonConfig(BaseModel):
data_root: str = Field(alias="data-root")
storage_driver: str = Field(alias="storage-driver")
storage_opts: List[str] = Field(alias="storage-opts")
storage_opts: Optional[List[str]] = Field(alias="storage-opts")
class Specs:
def __init__(self):
@ -336,26 +336,14 @@ class Specs:
else:
overlay_total_size=None
disk_type=""
disk_usage_source_path = '/'
try:
validated_config = DockerDaemonConfig(**docker_daemon_config)
disk_udevadm = get_disk_udevadm(validated_config.data_root)
for udevadm_line in disk_udevadm.split('\n'):
try:
key, value=udevadm_line.split('=',1)
if "id_model" in key.lower():
disk_type=value[:24]
elif "devpath" in key.lower() and "/virtual/" in value:
disk_type="Virtual"
except Exception as e_int:
pass
for storage_opt in validated_config.storage_opts:
if storage_opt[:14]=="overlay2.size=" and "GB" in storage_opt[14:]:
numeric_size = round(float(filter_non_numeric(storage_opt[14:])), 4)
overlay_total_size=numeric_size
if "storage-driver" in docker_daemon_config and docker_daemon_config["storage-driver"] == "overlay2" and "data-root" in docker_daemon_config:
disk_usage_source_path = docker_daemon_config["data-root"]
except Exception as e:
pass
if overlay_total_size==None:
total, used, free = shutil.disk_usage("/")
total, used, free = shutil.disk_usage(disk_usage_source_path)
disk_udevadm = get_disk_udevadm("/")
for udevadm_line in disk_udevadm.split('\n'):
try:

66
lib/latency_test.py Normal file
View File

@ -0,0 +1,66 @@
from lib import ensure_packages_installed
from lib import utils
import asyncio
import socket
import re
MANDATORY_PACKEGES = ['iputils-ping']
def is_valid_ipv4_or_hostname(string):
def is_valid_ipv4(ip):
try:
socket.inet_aton(ip)
return True
except socket.error:
return False
hostname_regex = re.compile(
r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$"
)
def is_valid_hostname(hostname):
if len(hostname) > 255:
return False
return all(hostname_regex.match(part) for part in hostname.split("."))
return is_valid_ipv4(string) or is_valid_hostname(string)
async def measure_latency_icmp(hosts, max_concurent_measurments=2, count=4):
success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None)
if success:
outcome = []
concurent_jobs = [hosts[i:i + max_concurent_measurments] for i in range(0, len(hosts), max_concurent_measurments)]
for concurent_job in concurent_jobs:
tasks = []
for host in concurent_job:
if is_valid_ipv4_or_hostname(host):
tasks.append(
utils.async_run_command(
f"ping -c {str(count)} -W 2 -i 0.3 {host}",
20 * count,
{"LANG": "C", "LC_ALL": "C"}
)
)
if len(tasks) > 0:
r = await asyncio.gather(*tasks)
try:
for output in r:
results = []
code, stdout, stderr = output
if code == 0:
for line in stdout.split('\n'):
match = re.search(r"time=(\d+\.?\d*) ms", line)
if match:
results.append(float(match.group(1)))
outcome.append(
{
"host": hosts[len(outcome)],
"received": len(results),
"avg_latency": sum(results) / len(results) if len(results)>0 else 0
}
)
except Exception as e:
print(e)
return outcome
else:
return False

View File

@ -1,6 +1,7 @@
from lib import docker_interface
from lib import config as config_module
from lib import logging as logging_lib
from lib import clore_partner
config = config_module.config
log = logging_lib.log
@ -29,7 +30,7 @@ async def log_streaming_task(message_broker, monitoring, do_not_stream_container
# Start tasks for new containers
for container_name, container in current_containers.items():
if not container_name in do_not_stream_containers:
if not container_name in do_not_stream_containers and not clore_partner.validate_partner_container_name(container_name):
log_container_names.append(container_name)
if container_name not in tasks:
log.debug(f"log_streaming_task() | Starting task for {container_name}")

View File

@ -6,6 +6,7 @@ import ipaddress
import socket
import psutil
import sys
import os
config = config_module.config
log = logging_lib.log
@ -25,12 +26,15 @@ def get_network_interfaces_with_subnet():
except Exception as e:
return str(e)
def exclude_network(excluded_network):
def exclude_network(excluded_network, input_ranges=None):
# Convert exclude_network to ip_network object
excluded_network = ip_network(excluded_network)
if not input_ranges:
input_ranges=config.local_ipv4_ranges
# Remove the excluded network from the local_ranges list
local_ranges = [ip_network(range_) for range_ in config.local_ipv4_ranges if ip_network(range_) != exclude_network]
local_ranges = [ip_network(range_) for range_ in input_ranges if ip_network(range_) != exclude_network]
ranges_outside_exclude = []
for local_range in local_ranges:
@ -93,3 +97,19 @@ def is_ip_in_network(ip: str, network: str) -> bool:
# If there's an error with the input values, print the error and return False
log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}")
return False
def purge_clore_interfaces():
network_interfaces = get_network_interfaces_with_subnet()
if type(network_interfaces) != dict:
log.error("Failed to load network interfaces, restarting...")
os._exit(1)
clore_subnets = [ "172.17.0.1/16" ] # I can include the docker default subnet here
for clore_default_network in config.clore_default_networks:
clore_subnets.append(clore_default_network["subnet"])
for network_interface in network_interfaces.keys():
if network_interface == "docker0" or network_interface[:3] == "br-":
subnet = network_interfaces[network_interface]
if subnet in clore_subnets or network_interface == "docker0":
utils.run_command(f"ip link delete {network_interface}")

187
lib/openvpn.py Normal file
View File

@ -0,0 +1,187 @@
from lib import config as config_module
from lib import logging as logging_lib
from lib import utils
import os
import aiofiles.os
config = config_module.config
log = logging_lib.log
CLIENT_CONFIGS_LOCATION = "/etc/openvpn/client"
PARTNER_CONFIG_NAME = "clore_partner.conf"
def generate_openvpn_config(
local_ip='10.1.0.2',
server_ip='10.1.0.1',
server_hostname='example.com',
udp_port=1194,
vpn_secret_key='YOUR_VPN_SECRET_KEY'
):
openvpn_config = f"""nobind
proto udp4
remote {server_hostname} {udp_port}
resolv-retry infinite
auth SHA256
cipher AES-256-CBC
dev {config.openvpn_forwarding_tun_device}
ifconfig {local_ip} {server_ip}
<secret>
-----BEGIN OpenVPN Static key V1-----
{vpn_secret_key}
-----END OpenVPN Static key V1-----
</secret>
fragment 1300
mssfix 1300
sndbuf 524288
rcvbuf 524288
user nobody
group nogroup
ping 15
ping-restart 45
ping-timer-rem
persist-tun
persist-key
verb 0"""
return openvpn_config
async def get_iptables_forward_rules():
code, stdout, stderr = await utils.async_run_command(
f"LC_ALL=C {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -L PREROUTING -n -v --line-numbers"
)
rules = []
if code == 0:
collumns = []
for idx, line in enumerate(stdout.split('\n')):
if "num" in collumns and "target" in collumns and "in" in collumns:
items = line.split(maxsplit=len(collumns)+1)
rule = {}
for idx, name in enumerate(collumns):
rule[name]=items[idx]
rule["desc"] = items[len(collumns)+1]
rules.append(rule)
else:
collumns = line.split()
return rules
async def remove_iptables_rule(rule_dict):
cmd = f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables"
if rule_dict.get('target') == 'DNAT':
cmd += " -t nat"
cmd += " -D PREROUTING"
if rule_dict.get('prot') and rule_dict['prot'] != '--':
cmd += f" -p {rule_dict['prot']}"
if rule_dict.get('in') and rule_dict['in'] != '*':
cmd += f" -i {rule_dict['in']}"
if rule_dict.get('out') and rule_dict['out'] != '*':
cmd += f" -o {rule_dict['out']}"
if rule_dict.get('source') and rule_dict['source'] != '0.0.0.0/0':
cmd += f" -s {rule_dict['source']}"
if rule_dict.get('destination') and rule_dict['destination'] != '0.0.0.0/0':
cmd += f" -d {rule_dict['destination']}"
if rule_dict.get('target') == 'DNAT':
if 'dports' in rule_dict.get('desc', ''):
port_info = rule_dict['desc'].split('dports ')[1].split(' ')[0]
if ':' in port_info:
cmd += f" -m multiport --dports {port_info}"
else:
cmd += f" --dport {port_info}"
if 'to:' in rule_dict.get('desc', ''):
dest_ip = rule_dict['desc'].split('to:')[1].split()[0]
cmd += f" -j DNAT --to-destination {dest_ip}"
await utils.async_run_command(cmd)
async def clore_partner_configure(clore_partner_config):
try:
if clore_partner_config:
docker_restart_required = False
needed_openvpn_config = generate_openvpn_config(
local_ip=clore_partner_config["provider"],
server_ip=clore_partner_config["forwarding"],
server_hostname=clore_partner_config["openvpn_host"],
udp_port=clore_partner_config["openvpn_port"],
vpn_secret_key=clore_partner_config["secret"]
)
saved_config=''
config_exists = await aiofiles.os.path.exists(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME))
if config_exists:
async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='r') as file:
saved_config = await file.read()
if saved_config != needed_openvpn_config:
async with aiofiles.open(os.path.join(CLIENT_CONFIGS_LOCATION, PARTNER_CONFIG_NAME), mode='w') as file:
await file.write(needed_openvpn_config)
is_active_code, is_active_stdout, is_active_stderr = await utils.async_run_command(
f"systemctl is-active openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
if is_active_code == 0 and saved_config != needed_openvpn_config:
code, stdout, stderr = await utils.async_run_command(
f"systemctl restart openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
docker_restart_required = False if code != 0 else True
elif is_active_code != 0:
code, stdout, stderr = await utils.async_run_command(
f"systemctl start openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
docker_restart_required = False if code != 0 else True
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route show table {str(config.forwarding_ip_route_table_id)}"
)
ip_route_configured = False
if code == 0:
for line in stdout.split('\n'):
items = line.split(' ')
if clore_partner_config["provider"] in items and config.openvpn_forwarding_tun_device in items:
ip_route_configured = True
break
if not ip_route_configured:
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}ip route add 0.0.0.0/0 dev {config.openvpn_forwarding_tun_device} src {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id} && ip rule add from {clore_partner_config['provider']} table {config.forwarding_ip_route_table_id}"
)
ip_tables_configured = False
rules = await get_iptables_forward_rules()
for rule in rules:
try:
if rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and f"{clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]}" in rule["desc"] and f"to:{clore_partner_config['provider']}" in rule["desc"]:
ip_tables_configured = True
elif rule["in"] == config.openvpn_forwarding_tun_device and rule["target"].lower()=="dnat" and "to:10." in rule["desc"] and "dports " in rule["desc"]:
print("REMOVE RULE", rule)
await remove_iptables_rule(rule)
except Exception as ei:
log.error(f"clore_partner_configure() | ei | {ei}")
if ip_tables_configured == False:
code, stdout, stderr = await utils.async_run_command(
f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p tcp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']} && {'sudo ' if config.run_iptables_with_sudo else ''}iptables -t nat -A PREROUTING -i {config.openvpn_forwarding_tun_device} -p udp -m multiport --dports {clore_partner_config['ports'][0]}:{clore_partner_config['ports'][1]} -j DNAT --to-destination {clore_partner_config['provider']}"
)
if docker_restart_required:
async with aiofiles.open(config.restart_docker_flag_file, mode='w') as file:
await file.write("")
os._exit(0) # We close clore hosting, because it's mandatory to restart docker after starting the up to date version of VPN, docker will be restarted on next start of clore hosting
else:
code, stdout, stderr = await utils.async_run_command(
f"systemctl stop openvpn-client@{PARTNER_CONFIG_NAME.replace('.conf','')}"
)
return True
except Exception as e:
log.error(f"FAIL | openvpn.clore_partner_configure | {e}")
return False

View File

@ -1,10 +1,13 @@
from typing import Optional, Tuple, Dict
from lib import config as config_module
from lib import logging as logging_lib
from lib import nvml
import subprocess
import hashlib
import asyncio
import random
import string
import shutil
import shlex
import time
import math
@ -145,6 +148,63 @@ def get_extra_allowed_images():
else:
return []
async def async_run_command(
command: str,
timeout: Optional[float] = None,
env: Optional[Dict[str, str]] = None
) -> Tuple[int, str, str]:
command_env = env if env is not None else {}
try:
proc = await asyncio.create_subprocess_shell(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=command_env
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(),
timeout=timeout
)
stdout_str = stdout.decode('utf-8').strip() if stdout else ''
stderr_str = stderr.decode('utf-8').strip() if stderr else ''
return proc.returncode, stdout_str, stderr_str
except asyncio.TimeoutError:
# Handle timeout: terminate the process gracefully first
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=5) # Wait for it to exit
except asyncio.TimeoutError:
# Force kill the process if it doesn't terminate
proc.kill()
await proc.wait()
return -1, '', f'Command timed out after {timeout} seconds'
except Exception as e:
return -1, '', str(e)
def get_free_space_mb(path):
"""Get free space in MB for the given path."""
total, used, free = shutil.disk_usage(path)
return free // (1024 * 1024) # Convert bytes to MB
def get_directory_size_mb(path):
"""Get the size of a directory in MB."""
total_size = 0
for dirpath, dirnames, filenames in os.walk(path):
for f in filenames:
fp = os.path.join(dirpath, f)
# Skip if the file doesn't exist (symlinks, etc.)
if not os.path.islink(fp) and os.path.exists(fp):
total_size += os.path.getsize(fp)
return total_size // (1024 * 1024) # Convert bytes to MB
class shm_calculator:
def __init__(self, total_ram):
self.total_ram = total_ram

201
lib/xfs.py Normal file
View File

@ -0,0 +1,201 @@
# Library to setup XFS partition for docker
from lib import ensure_packages_installed
from lib import logging as logging_lib
from lib import docker_interface
from lib import networking
from lib import utils
import asyncio
import json
import os
log = logging_lib.log
DOCKER_ROOT = "/var/lib/docker"
DOCKER_DATA_IMG = "/opt/clore-hosting/data.img"
LEAVE_FREE_SPACE_MB = 1024*24 # 24 GB
MIN_XFS_PARTITION_SIZE = 1024*24 # 24 GB
XFS_STATE_FILE = "/opt/clore-hosting/xfs_state"
MANDATORY_PACKAGES = [
"xfsprogs",
"dmidecode",
"openvpn",
"iproute2",
"iputils-ping",
"util-linux"
]
# This code is runned on start of clore hosting to migrate docker to XFS partition system
# sudo fallocate -l 300G /docker-storage.img
# sudo mkfs.xfs /docker-storage.img
# mount -o loop,pquota /docker-storage.img /mnt/docker-storage
def migrate():
docker_xfs_state = validate_docker_xfs()
#print(docker_xfs_state)
if docker_xfs_state == "skip":
return
elif docker_xfs_state == "valid":
return 'success'
packages_available = asyncio.run(ensure_packages_installed.ensure_packages_installed(
MANDATORY_PACKAGES
))
if not packages_available:
return 'packages-missing'
log.info("Starting migration to xfs")
docker_interface.stop_all_containers()
if os.path.exists(DOCKER_DATA_IMG):
try:
os.remove(DOCKER_DATA_IMG)
except Exception as e:
print(f"Error while trying to remove {DOCKER_DATA_IMG}: {e}")
return "failure"
max_free_space = utils.get_free_space_mb('/') + utils.get_directory_size_mb(DOCKER_ROOT)
data_img_size = int(max_free_space - LEAVE_FREE_SPACE_MB)
if data_img_size < MIN_XFS_PARTITION_SIZE:
return 'not-enough-space'
docker_config_success = False
fstab_config_success = False
code, stdout, stderr = utils.run_command(
f"systemctl stop docker && rm -rf {DOCKER_ROOT} && fallocate -l {str(data_img_size)}M {DOCKER_DATA_IMG} && mkfs.xfs {DOCKER_DATA_IMG}"
)
networking.purge_clore_interfaces()
if code == 0:
docker_config_success = configure_docker_daemon()
if code == 0 and docker_config_success:
fstab_config_success = configure_fstab()
if code == 0 and fstab_config_success:
code, stdout, stderr = utils.run_command(
f"mkdir {DOCKER_ROOT} && systemctl daemon-reload && mount -a"
)
if code==0:
return 'success'
else:
configure_docker_daemon(remove=True)
configure_fstab(remove=True)
return 'failure'
else:
utils.run_command(
f"mkdir {DOCKER_ROOT}"
)
if os.path.exists(DOCKER_DATA_IMG):
try:
os.remove(DOCKER_DATA_IMG)
except Exception as e:
pass
return 'failure'
def validate_docker_xfs():
code_root, stdout_root, stderr_root = utils.run_command("df -T /")
code, stdout, stderr = utils.run_command(f"df -T {DOCKER_ROOT}")
#print([code, stderr, stdout])
if code_root == 0 and stderr_root == '' and ((code == 0 and stderr == '') or (code == 1 and f" {DOCKER_ROOT}: " in stderr and stdout == '')):
root_blocks = None
docker_root_blocks = None
docker_root_format = ''
for idx, line in enumerate(stdout_root.split('\n')):
if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric():
root_blocks = int(line.split()[2])
if code == 1:
docker_root_blocks = root_blocks
else:
for idx, line in enumerate(stdout.split('\n')):
if idx == 1 and len(line.split()) >= 7 and line.split()[2].isnumeric():
docker_root_blocks = int(line.split()[2])
docker_root_format = line.split()[1]
if root_blocks == None or docker_root_blocks == None:
return "skip"
elif docker_root_format=="xfs" and root_blocks > docker_root_blocks:
return "valid"
else:
return "default"
else:
return "skip"
def configure_docker_daemon(remove=False):
try:
daemon_json_path = "/etc/docker/daemon.json"
with open(daemon_json_path, 'r') as file:
raw_content = file.read()
daemon_config = json.loads(raw_content)
if remove:
if daemon_config.get("data-root") == DOCKER_ROOT:
del daemon_config["data-root"]
if daemon_config.get("storage-driver") == "overlay2":
del daemon_config["storage-driver"]
elif daemon_config.get("data-root") != DOCKER_ROOT or daemon_config.get("storage-driver") != "overlay2":
daemon_config["data-root"] = DOCKER_ROOT
daemon_config["storage-driver"] = "overlay2"
with open(daemon_json_path, 'w') as file:
file.write(json.dumps(daemon_config,indent=4))
return True
except Exception as e:
return False
def configure_fstab(remove=False):
try:
file_path = "/etc/fstab"
mount_line = f"{DOCKER_DATA_IMG} {DOCKER_ROOT} xfs loop,pquota 0 0"
with open(file_path, 'r') as file:
raw_content = file.read()
if remove:
if mount_line in raw_content:
raw_content = raw_content.replace(f"\n{mount_line}\n", '')
with open(file_path, 'w') as file:
file.write(raw_content)
elif not mount_line in raw_content:
raw_content += f"\n{mount_line}\n"
with open(file_path, 'w') as file:
file.write(raw_content)
return True
except Exception as e:
return False
def init():
try:
if os.path.exists(XFS_STATE_FILE):
with open(XFS_STATE_FILE, 'r') as file:
raw_content = file.read()
if "enabled" in raw_content:
migarion_status = migrate()
if migarion_status == "success":
with open(XFS_STATE_FILE, 'w') as file:
file.write("active")
return "active"
elif migarion_status == "not-enough-space":
with open(XFS_STATE_FILE, 'w') as file:
file.write("not-enough-space")
return 'not-enough-space'
else:
with open(XFS_STATE_FILE, 'w') as file:
file.write("failed-migration")
return 'failed'
elif 'not-enough-space' in raw_content:
return 'not-enough-space'
elif "active" in raw_content:
return "active"
else:
return "failed"
else:
return "disabled"
except Exception as e:
print(e)
pass