238 lines
9.3 KiB
Python
238 lines
9.3 KiB
Python
|
from lib import ensure_packages_installed
|
||
|
from lib import config as config_module
|
||
|
from lib import logging as logging_lib
|
||
|
from lib import clore_partner_socket
|
||
|
from lib import latency_test
|
||
|
from lib import openvpn
|
||
|
from lib import utils
|
||
|
import asyncio
|
||
|
import random
|
||
|
import json
|
||
|
import time
|
||
|
import re
|
||
|
|
||
|
import os
|
||
|
import aiofiles.os
|
||
|
from aiohttp import ClientSession, ClientTimeout
|
||
|
|
||
|
# Shared configuration object and logger re-exported from the project libs.
config = config_module.config
log = logging_lib.log

# Packages that initialize() passes to ensure_packages_installed.
MANDATORY_PACKEGES = ['dmidecode', 'openvpn', 'iproute2']

# Image name that filter_partner_dummy_workload_container() drops from container lists.
DUMMY_WORKLOAD_CONTAINER = "cloreai/partner-dummy-workload"

# Directory where initialize() stores dmidecode output and the partner socket;
# configure() bind-mounts it into the partner container.
host_facts_location = os.path.join(config.clore_partner_base_dir, "host_facts")
# Directory read by get_partner_allowed_images() (container_list.json);
# configure() bind-mounts it into the partner container.
partner_cache_location = os.path.join(config.clore_partner_base_dir, "partner_cache")

# Timestamp after which initialize() re-runs the package check;
# set to float('inf') once the check has succeeded.
next_ensupe_packages_check = 0
# Set to True by initialize() once the partner socket service task has been scheduled.
is_socket_running = False

# Container definition built by configure(); returned by get_partner_container_config().
partner_container_config = None
|
||
|
|
||
|
async def _write_command_output(command, file_name):
    """Run *command* and save its stdout as *file_name* in host_facts_location.

    Returns True when the command exited with code 0 and produced no stderr
    (stdout is then written to the file), False otherwise.
    """
    # 20 is forwarded as the second positional argument of async_run_command
    # (presumably a timeout in seconds - confirm against lib.utils).
    code, stdout, stderr = await utils.async_run_command(command, 20)
    if code != 0 or stderr:
        return False
    async with aiofiles.open(os.path.join(host_facts_location, file_name), mode='w') as file:
        await file.write(stdout)
    return True


async def initialize():
    """Prepare the host for the partner integration.

    Creates the working directories, schedules the partner socket service
    once, makes sure the mandatory packages are installed and dumps
    ``dmidecode`` output into ``host_facts_location``.

    Returns:
        True when every step succeeded, False when a step failed, when the
        package check is still waiting for its retry time, or when an
        exception was raised (the exception is logged).
    """
    global next_ensupe_packages_check
    global is_socket_running
    global _socket_service_task
    try:
        await aiofiles.os.makedirs(host_facts_location, exist_ok=True)
        await aiofiles.os.makedirs(partner_cache_location, exist_ok=True)
        await aiofiles.os.makedirs("/etc/openvpn/client", exist_ok=True)

        if not is_socket_running:
            is_socket_running = True
            # Keep a strong reference: the event loop only holds weak
            # references to tasks, so an unreferenced task may be garbage
            # collected before it finishes.
            _socket_service_task = asyncio.create_task(clore_partner_socket.socket_service(
                location=os.path.join(host_facts_location, "partner_interface.socket")
            ))

        if next_ensupe_packages_check < time.time():
            success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None)
            # Success disables further checks; failure schedules a retry in 1 hour.
            next_ensupe_packages_check = float('inf') if success else time.time() + 60*60
            if not success:
                return False
        elif next_ensupe_packages_check != float('inf'):
            # A previous check failed and its retry time has not been reached yet.
            return False

        for command, file_name in (
            ("dmidecode -t 2", "dmidecode_t2.txt"),
            ("dmidecode", "dmidecode.txt"),
        ):
            if not await _write_command_output(command, file_name):
                return False
        return True
    except Exception as e:
        log.error(f"FAIL | clore_partner.initialize | {e}")
        return False
|
||
|
|
||
|
async def get_partner_allowed_images():
    """Collect the distinct image names listed in the partner container cache.

    Reads ``container_list.json`` from ``partner_cache_location`` and gathers
    the ``Config.Image`` value of every entry, keeping first-seen order.

    Returns:
        A list of image names, an empty list when the cache file does not
        exist, or None when the file could not be read or parsed (the error
        is logged).
    """
    container_list_path = os.path.join(partner_cache_location, "container_list.json")
    try:
        if not await aiofiles.os.path.exists(container_list_path):
            return []
        async with aiofiles.open(container_list_path, mode='r') as file:
            content = await file.read()
        containers = json.loads(content)

        images = []
        for container in containers:
            image = container.get("Config", {}).get("Image", None)
            if image and image not in images:
                images.append(image)
        return images
    except Exception as e:
        # None (as opposed to []) tells the caller the cache was unreadable.
        log.error(f"FAIL | clore_partner.get_partner_allowed_images | {e}")
        return None
|
||
|
|
||
|
def validate_partner_container_name(name):
    """Return True if *name* is a container name owned by the partner.

    Accepted names are the configured partner container name
    (``config.clore_partner_container_name``) or ``C.<digits>``.
    Non-string input is rejected.
    """
    if not isinstance(name, str):
        return False
    if name == config.clore_partner_container_name:
        return True
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "C.1\n" through.
    return bool(re.fullmatch(r"C\.\d+", name))
|
||
|
|
||
|
def validate_partner_workload_container_name(name):
    """Return True if *name* has the partner workload form ``C.<digits>``.

    Non-string input is rejected.
    """
    if not isinstance(name, str):
        return False
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "C.1\n" through.
    return bool(re.fullmatch(r"C\.\d+", name))
|
||
|
|
||
|
# Last partner_config for which openvpn.clore_partner_configure() returned a
# truthy result; configure() skips its work while the incoming config equals it.
last_openvpn_config = None
|
||
|
|
||
|
def get_partner_container_config():
    """Return the module-level partner container definition.

    The value is whatever ``partner_container_config`` currently holds:
    None initially, or the dict most recently built by ``configure``.
    """
    # Reading a module global needs no ``global`` declaration.
    return partner_container_config
|
||
|
|
||
|
async def configure(partner_config):
    """Apply a partner configuration if it differs from the last applied one.

    Rebuilds the module-level ``partner_container_config`` and calls
    ``openvpn.clore_partner_configure``. The configuration is remembered as
    applied only when that call returns a truthy result, so a failed attempt
    is retried on the next call.

    Args:
        partner_config: mapping read for the keys ``partner_image``,
            ``partner_id`` (sliced, so presumably a string), ``machine_id``,
            ``openvpn_host`` and ``ports`` (indexes 0 and 1 are used).
    """
    global last_openvpn_config
    global partner_container_config

    if last_openvpn_config == partner_config:
        return

    ports = partner_config["ports"]
    partner_container_config = {
        "image": partner_config["partner_image"],
        "name": config.clore_partner_container_name,
        "hostname": f"{partner_config['partner_id'][:4]}-m{partner_config['machine_id']}",
        "env": {
            "AUTH": partner_config["partner_id"],
            "ip_addr": partner_config["openvpn_host"],
            # Double-quoted outer string: reusing the same quote character
            # inside an f-string is a SyntaxError before Python 3.12.
            "port_range": f"{ports[0]}-{ports[1]}",
        },
        "volumes": {
            f"{host_facts_location}": {"bind": "/var/lib/vastai_kaalia/specs_source"},
            f"{partner_cache_location}": {"bind": "/var/lib/vastai_kaalia/data"},
            "/var/lib/docker": {"bind": "/var/lib/docker"},
            "/var/run/docker.sock": {"bind": "/var/run/docker.sock"},
        },
        "gpus": True,
        "command": '',
        "network": "clore-partner-br0",
        "ip": "172.19.0.254",
        "cap_add": ["SYS_ADMIN"],
        "devices": ["/dev/fuse"],
        #"security_opt": ["apparmor:unconfined"],
        # Only the second entry of ``ports`` is published on the host.
        "ports": [f"{ports[1]}:{ports[1]}"],
    }

    applied = await openvpn.clore_partner_configure(partner_config)
    if applied:
        last_openvpn_config = partner_config
|
||
|
|
||
|
# -----------------------------------------
|
||
|
|
||
|
# Timestamp before which measure_forwarding_latency() returns False without
# measuring; updated (and persisted to disk) by set_next_latency_measurment().
next_latency_measurment = 0
|
||
|
|
||
|
async def fetch_forwarding_nodes():
    """Fetch the relay node description from the clore.ai API.

    Performs a GET on ``https://api.clore.ai/v1/get_relay`` with a 30 second
    total timeout.

    Returns:
        The decoded JSON body, or None when the request failed, returned an
        error status, or the body could not be decoded (the error is logged).
    """
    url = "https://api.clore.ai/v1/get_relay"
    timeout = ClientTimeout(total=30)

    async with ClientSession(timeout=timeout) as session:
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()
        except Exception as e:
            # Report through the module logger, as initialize() does,
            # rather than printing to stdout.
            log.error(f"FAIL | clore_partner.fetch_forwarding_nodes | {e}")
            return None
|
||
|
|
||
|
async def set_next_latency_measurment(ts):
    """Set the next allowed latency measurement time and persist it.

    Updates the module-level ``next_latency_measurment`` and writes ``ts`` to
    ``.next_latency_measurment`` inside ``config.clore_partner_base_dir``.
    Persisting is best-effort: a write failure is logged and the in-memory
    value still applies.

    Args:
        ts: timestamp to store; written to disk as ``str(ts)``.
    """
    global next_latency_measurment
    # Update memory first so the new schedule holds even if the write fails.
    next_latency_measurment = ts
    try:
        async with aiofiles.open(os.path.join(config.clore_partner_base_dir, ".next_latency_measurment"), mode='w') as file:
            await file.write(str(ts))
    except Exception as e:
        log.error(f"FAIL | clore_partner.set_next_latency_measurment | {e}")
|
||
|
|
||
|
async def measure_forwarding_latency():
    """Measure ICMP latency to the forwarding nodes when a measurement is due.

    The next allowed measurement time is kept in ``next_latency_measurment``
    and in the ``.next_latency_measurment`` file; nothing is measured before
    that time. After an attempt the next time is rescheduled:

    * 30 days - every region that has nodes was reachable,
    * 24 hours - at least one region was not reachable,
    * 72 hours - the API listed no nodes to test,
    * 12 hours - the API response did not have the expected format.

    Returns:
        The measurement result (each entry annotated with a ``"region"``
        key) when every region was reachable, otherwise False. Exceptions
        are logged and also yield False.
    """
    global next_latency_measurment

    if next_latency_measurment > time.time():
        return False
    try:
        await aiofiles.os.makedirs(config.clore_partner_base_dir, exist_ok=True)
        schedule_path = os.path.join(config.clore_partner_base_dir, ".next_latency_measurment")
        if await aiofiles.os.path.exists(schedule_path):
            async with aiofiles.open(schedule_path, mode='r') as file:
                # strip(): tolerate a trailing newline in the schedule file.
                content = (await file.read()).strip()
            if content.isdigit():
                next_latency_measurment = int(content)

        if next_latency_measurment >= time.time():
            return False

        node_info = await fetch_forwarding_nodes()
        response_ok = (
            isinstance(node_info, dict)
            and node_info.get("country")
            and isinstance(node_info.get("nodes"), dict)
            and node_info.get("code") == 0
        )
        if not response_ok:
            # Retry in 12hr, the response was not matching the required format
            await set_next_latency_measurment(int(time.time() + 60*60*12))
            return False

        to_test_nodes = []
        ip_to_region = {}
        for node_region, nodes_ip_list in node_info["nodes"].items():
            if isinstance(nodes_ip_list, list) and nodes_ip_list:
                to_test_nodes.extend(nodes_ip_list)
                for node_ip in nodes_ip_list:
                    ip_to_region[node_ip] = node_region

        if not to_test_nodes:
            # Retry in 72hr (clore partner service is not available in host country yet)
            await set_next_latency_measurment(int(time.time() + 60*60*72))
            return False

        measurment_result = await latency_test.measure_latency_icmp(to_test_nodes)
        valid_regions = set()
        if measurment_result:
            for res in measurment_result:
                region = ip_to_region.get(res["host"])
                # A region counts as reachable once one of its hosts got
                # more than 2 replies.
                if res["received"] > 2:
                    valid_regions.add(region)
                res["region"] = region

        # NOTE(review): rescheduling runs even when no measurement result
        # came back, so a failed ICMP run is retried in 24hr rather than on
        # every call - confirm this matches the intended nesting.
        # Compare against the distinct regions, not the number of IPs: a
        # region with several nodes must not make success impossible.
        expected_regions = set(ip_to_region.values())
        if valid_regions >= expected_regions:
            # Re run in 30 days, because measurment succeeded
            await set_next_latency_measurment(int(time.time() + 60*60*24*30))
            return measurment_result

        # Retry in 24hr, all regions in country should be reacheable
        await set_next_latency_measurment(int(time.time() + 60*60*24))
        return False
    except Exception as e:
        log.error(f"FAIL | clore_partner.measure_forwarding_latency | {e}")
        return False
|
||
|
|
||
|
def filter_partner_dummy_workload_container(containers):
    """Return *containers* without the partner dummy workload entries.

    An entry is dropped when its ``"image"`` equals
    ``DUMMY_WORKLOAD_CONTAINER``. If filtering raises for any reason (for
    example an entry without an ``"image"`` key, or a non-iterable
    argument), the input is returned unchanged.
    """
    try:
        return [
            entry
            for entry in containers
            if entry["image"] != DUMMY_WORKLOAD_CONTAINER
        ]
    except Exception:
        return containers
|