commit dcd48b9d0967faacd1add9ce00d5990bef2444e6 Author: clore Date: Thu Mar 21 01:28:02 2024 +0000 Initial version diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2fda43b --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +auth_local +auth +startup_scripts/ +entrypoints/ +tests/ +clore_hosting/__pycache__/ +lib/__pycache__/ +docker-tests.ipynb +wireguard/ +log-mon.ipynb +t.py +tests.ipynb +network-tests.ipynb \ No newline at end of file diff --git a/clore_hosting/api_interface.py b/clore_hosting/api_interface.py new file mode 100644 index 0000000..3d5f518 --- /dev/null +++ b/clore_hosting/api_interface.py @@ -0,0 +1,32 @@ +import aiohttp +import asyncio +from clore_hosting import types + +async def fetch_url(url, timeout=10): + client_timeout = aiohttp.ClientTimeout(total=timeout) + try: + async with aiohttp.ClientSession(timeout=client_timeout) as session: + async with session.get(url) as response: + response.raise_for_status() + return await response.json() + except aiohttp.ClientError as e: + return f"Client error occurred: {e}" + except asyncio.TimeoutError: + return "The request timed out." + except Exception as e: + return f"An error occurred: {e}" + +async def get_server_config(): + direct_res = await fetch_url('https://api.clore.ai/server-config.json') + if type(direct_res)==dict and "ws_peers" in direct_res and "allowed_images" in direct_res: + return types.ServerConfig(**{ + "success": True, + "ws_peers": direct_res["ws_peers"], + "allowed_images": direct_res["allowed_images"] + }) + else: + return types.ServerConfig(**{ + "success": False, + "ws_peers": [], + "allowed_images": [] + }) \ No newline at end of file diff --git a/clore_hosting/docker_configurator.py b/clore_hosting/docker_configurator.py new file mode 100644 index 0000000..2d33e37 --- /dev/null +++ b/clore_hosting/docker_configurator.py @@ -0,0 +1,119 @@ +from lib import config as config_module +from lib import docker_interface +from lib import custom_entrypoint +from lib import networking +from lib import wireguard +from lib import logging as logging_lib +import shutil +import os + +log = logging_lib.log +config = config_module.config + +default_network_names=[] +for default_network in config.clore_default_networks: + if "name" in default_network: + default_network_names.append(default_network["name"]) + + + +def configure(containers): + valid_containers = [] + newly_created_networks = [] + containers_required_networks = [] + docker_networks = docker_interface.get_docker_networks() + docker_containers = docker_interface.get_containers(all=True) + + current_startup_files = os.listdir(config.startup_scripts_folder) + current_wireguard_configs = os.listdir(config.wireguard_config_folder) + used_startup_files=[] + used_wireguard_configs=[] + startup_sctipt_creation_fail = False + + if type(containers) == list: + custom_entrypoint_state = custom_entrypoint.cache_entrypoints(containers) + + if type(custom_entrypoint_state)!=list: + return False, valid_containers + + for index, container in enumerate(containers): + ok_custom_entrypoint = False + if index < len(custom_entrypoint_state): + ok_custom_entrypoint = custom_entrypoint_state[index] + startup_script_name = f"{container['name']}.sh" + if "wireguard" in container and "name" in container: + wireguard.generate_config(container) + used_wireguard_configs.append(container["name"]) + + if "command" in container and container["command"]!='' and not startup_script_name in current_startup_files: + try: + with 
open(os.path.join(config.startup_scripts_folder, startup_script_name), 'w') as file: + file.write(container["command"]) + except Exception as e: + startup_sctipt_creation_fail=True + elif "command" in container and container["command"]!='' and startup_script_name in current_startup_files: + used_startup_files.append(startup_script_name) + used_startup_files.append(f"{container['name']}.finished") + + if "network" in container and "network_subnet" in container and "network_gateway" in container and container["network"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: + if not container["network"] in containers_required_networks: + containers_required_networks.append(container["network"]) + if not container["network"] in default_network_names: + is_network_created=False + any_fail = False + for docker_network in docker_networks: + if docker_network["Name"]==container["network"]: + is_network_created=True + break + if (not is_network_created) and container["network"] not in newly_created_networks: + creation_success = docker_interface.create_docker_network(container["network"], container["network_subnet"], container["network_gateway"]) + if creation_success: + newly_created_networks.append(container["network"]) + else: + any_fail=True + if not any_fail and ok_custom_entrypoint: + valid_containers.append(container) + elif "network" in container and container["network"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: # Subnet & gateway not defined, must be some of default networks, otherwise dump it + if container["network"] in default_network_names: + for docker_network in docker_networks: + if docker_network["Name"]==container["network"]: + for ipam in docker_network["IPAM"]: + if not ok_custom_entrypoint: + break + elif not "ip" in container: + valid_containers.append(container) + break + elif networking.is_ip_in_network(container["ip"], ipam["Subnet"]): + valid_containers.append(container) + break + + for docker_network in docker_networks: + if not docker_network["Name"] in containers_required_networks and not docker_network["Name"] in default_network_names: + if docker_network["Name"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: + docker_interface.remove_docker_network(docker_network["Name"]) + + for existing_wireguard_config in current_wireguard_configs: + if not existing_wireguard_config in used_wireguard_configs: + try: + directory_path = os.path.join(config.wireguard_config_folder, existing_wireguard_config) + shutil.rmtree(directory_path) + log.debug(f"DOCKER CONFIGURATOR | WIREGUARD CLEANUP | The directory {directory_path} has been removed successfully.") + except Exception as e: + log.error(f"DOCKER CONFIGURATOR | WIREGUARD CLEANUP | Error: {e}") + + for remaining_file in current_startup_files: + if not remaining_file in used_startup_files: + try: + if str(remaining_file).endswith(".sh") or str(remaining_file).endswith(".finished"): + log.debug(f"REMOVIN {os.path.join(config.startup_scripts_folder, str(remaining_file))}") + os.remove(os.path.join(config.startup_scripts_folder, str(remaining_file))) + except Exception as e: + pass + + if config.log_containers_strings: + print("FROM DOCKER CONFIGURATOR", valid_containers) + + validation_and_security = docker_interface.validate_and_secure_networks() + if startup_sctipt_creation_fail: + validation_and_security=False + return validation_and_security, valid_containers \ No newline at end of file diff --git a/clore_hosting/main.py b/clore_hosting/main.py new 
file mode 100644 index 0000000..e581ff1 --- /dev/null +++ b/clore_hosting/main.py @@ -0,0 +1,397 @@ +from lib import config as config_module +from lib import logging as logging_lib +from lib import log_streaming_task +from lib import run_startup_script +from lib import docker_interface +from lib import docker_deploy +from lib import docker_pull +from lib import get_specs +from lib import utils +log = logging_lib.log + +from clore_hosting import docker_configurator +from clore_hosting import api_interface +from clore_hosting import ws_interface +from clore_hosting import types + +from queue import Queue +import concurrent.futures +import threading +import asyncio +import time +import json +from aiofiles import os as async_os +import os + +specs = get_specs.Specs() + +container_log_broken = asyncio.Queue() + +config = config_module.config +WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_broken) + +#print(config) + +async def configure_networks(containers): + res = await asyncio.to_thread(docker_configurator.configure, containers) + try: + fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1]) + return fin_res + except Exception as e: + return False + +async def deploy_containers(validated_containers): + try: + all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers) + return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names) + except Exception as e: + return False + +async def get_local_images(): + res = await asyncio.to_thread(docker_interface.get_local_images) + return res + +class CloreClient: + def __init__(self, auth_key): + self.auth_key=auth_key + self.ws_peers = {} + self.last_checked_ws_peers=0 + self.containers={} + self.needed_images=[] + self.containers_set=False + + self.allowed_images = None + + self.p_needed_containers=[] + self.last_pull_progress={} + + self.validated_containers_set=False + self.validated_containers=[] + + self.all_running_container_names=[] + self.all_stopped_container_names=[] + + self.last_hw_specs_submit = time.time()-(1800-60) + + if config.debug_ws_peer: + self.ws_peers[str(config.debug_ws_peer)]={ + "expiration":"immune" + } + + async def service(self): + global container_log_broken + + pull_list = asyncio.Queue() + pull_logs = asyncio.Queue() + + task1 = asyncio.create_task(self.main(pull_list, pull_logs)) + task2 = asyncio.create_task(self.handle_container_cache(pull_list, pull_logs)) + task3 = asyncio.create_task(self.startup_script_runner()) + task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken)) + task5 = asyncio.create_task(self.container_log_streaming_service()) + task6 = asyncio.create_task(self.specs_service()) + + # Wait for both tasks to complete (they won't in this case) + await asyncio.gather(task1, task2, task3, task4, task5, task6) + + async def container_log_streaming_service(self): + while True: + try: + await WebSocketClient.stream_container_logs() + except Exception as e: + log.debug(f"container_log_streaming_service() | ERROR | {e}") + await asyncio.sleep(0.6) + async def run_startup_scripts(self, startup_script_full_path, container_name): + try: + log.success(f"Runnin' {startup_script_full_path}") + log.error(self.all_running_container_names) + await asyncio.to_thread(run_startup_script.run, container_name, startup_script_full_path, f"/init-{container_name}.sh") + return 
True + except Exception as e: + return False + + async def startup_script_runner(self): + + startup_script_ongoing_tasks = {} + + while True: + try: + startup_script_files = await async_os.listdir(config.startup_scripts_folder) + for startup_script_file in startup_script_files: + if type(startup_script_file)==str and startup_script_file.endswith(".sh") and startup_script_file[:-3] in self.all_running_container_names: + if not f"{startup_script_file[:-3]}.finished" in startup_script_files: + full_startup_script_path = os.path.join(config.startup_scripts_folder, startup_script_file) + if os.path.isfile(full_startup_script_path) and full_startup_script_path not in startup_script_ongoing_tasks: + # Start processing the file immediately in a non-blocking way + startup_script_task = asyncio.create_task(self.run_startup_scripts(full_startup_script_path, startup_script_file[:-3])) + startup_script_ongoing_tasks[full_startup_script_path] = startup_script_task + # Attach a callback to clean up the task once it's done + startup_script_task.add_done_callback(lambda t, path=full_startup_script_path: startup_script_ongoing_tasks.pop(path, None)) + + # Remove completed tasks + completed_tasks = [path for path, task in startup_script_ongoing_tasks.items() if task.done()] + for path in completed_tasks: + startup_script_ongoing_tasks.pop(path, None) + except Exception as e: + log.debug(f"ERROR | startup_script_runner() | {e}") + await asyncio.sleep(2) + + async def pull_log_progress(self, log_dict, image_name): + while True: + tmp_progress='' + for layer, status in log_dict.items(): + first_char = '' if tmp_progress=='' else '\n' + tmp_progress+=f"{first_char}{layer}: {status}" + self.last_pull_progress[image_name]={"log":tmp_progress, "last_update":time.time()} + await asyncio.sleep(config.pull_log_streaming_interval/1000) + + async def check_if_pulling_required(self, pulling_image, cancellation_event): + while True: + try: + matched_image = False + for needed_image in self.needed_images: + if "image" in needed_image and needed_image["image"]==pulling_image: + matched_image=True + break + + if not matched_image: + cancellation_event.set() + except Exception as e: + pass + await asyncio.sleep(0.5) + + async def handle_container_cache(self, pull_list, pull_logs): + while True: + got_data = [] + while not pull_list.empty(): + got_data.append(await pull_list.get()) + if len(got_data)>0: + self.p_needed_containers=got_data[len(got_data)-1] + + if len(self.p_needed_containers)>0: + local_images = await get_local_images() + for local_image in local_images: + self.last_pull_progress[local_image]={"log":"Pull complete", "last_update":time.time()} + image_needed = False + removed_cnt = 0 + try: + for p_needed_container in self.p_needed_containers: + if "image" in p_needed_container and local_image.replace(':latest','')==p_needed_container["image"].replace(':latest',''): + image_needed=True + break + if type(self.allowed_images)==list: + if image_needed==False: + after_split = local_image.split(':', 1) + local_image_name = after_split[0] + local_image_tag = '' + if len(after_split)>1: + local_image_tag=after_split[1] + + for allowed_image in self.allowed_images: + if "repository" in allowed_image and "allowed_tags" in allowed_image and allowed_image["repository"]==local_image_name: + for allowed_tag in allowed_image["allowed_tags"]: + if local_image_tag==allowed_tag or allowed_tag=='*': + image_needed=True + break + if not image_needed and removed_cnt < config.max_remove_images_per_run and 
config.delete_unused_containers: + with concurrent.futures.ThreadPoolExecutor() as pool: + r = await asyncio.get_running_loop().run_in_executor(pool, docker_interface.remove_docker_image, local_image) + if r: + removed_cnt+=1 + log.success(f"{local_image} | {image_needed}") + + except Exception as e: + image_needed=True + log.debug(f"ERROR | image_needed | {e}") + + for lpp_image in self.last_pull_progress.keys(): + log_info = self.last_pull_progress[lpp_image] + if log_info["last_update"] < time.time()-300: + del self.last_pull_progress[lpp_image] + break + most_recent_wanted_state = self.p_needed_containers + for wanted_image in most_recent_wanted_state: + if not wanted_image["image"] in local_images: + log.debug(f"Starting to pull \"{wanted_image}\"") + + auth_config = {} + if "dockerhub_token" in wanted_image and "dockerhub_user" in wanted_image: + auth_config={ + "username": wanted_image["dockerhub_user"], + "password": wanted_image["dockerhub_token"] + } + + log_dict = {} + pull_cancellation_event = asyncio.Event() + loop = asyncio.get_running_loop() + + # Run the image pull, log progress concurrently and cancel if not needed anymore + pull_task = asyncio.create_task(docker_pull.pull_image(wanted_image["image"], auth_config, log_dict, loop, pull_cancellation_event)) + log_task = asyncio.create_task(self.pull_log_progress(log_dict, wanted_image["image"])) + check_if_pulling_required_task = asyncio.create_task(self.check_if_pulling_required(wanted_image["image"], pull_cancellation_event)) + + # Wait for the image pull to complete, then cancel the log progress task + try: + await pull_task + except Exception as e: + self.last_pull_progress[local_image]={f"log":"Can't pull image \"{local_image}\"", "last_update":time.time()} + log_task.cancel() + try: + await log_task + except asyncio.CancelledError: + # Expect the task to be cancelled, so pass here + pass + check_if_pulling_required_task.cancel() + try: + await check_if_pulling_required_task + except asyncio.CancelledError: + # Expect the task to be cancelled, so pass here + pass + await asyncio.sleep(1) + + async def main(self, pull_list, pull_logs): + step=0 + while True: + print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '') + + step+=1 + + tasks = [] + + container_conf = WebSocketClient.get_containers() + + if container_conf[0]: + self.containers_set=True + self.containers=container_conf[1] + tmp_images = [] + for container in self.containers: + if "image" in container: + log_pull = False + if "name" in container: + if "-order-" in container["name"]: + log_pull=True + image_config = { + "image":container["image"], + "log":log_pull + } + if "ip" in container and "| docker login -u " in container["ip"] and container["ip"][:8]=="; echo '": + try: + dockerhub_token = container["ip"][8:].split("'")[0] + dockerhub_user = container["ip"].split('docker login -u ')[1].split(';')[0][:-17] + image_config["dockerhub_token"]=dockerhub_token + image_config["dockerhub_user"]=dockerhub_user + except Exception as e: + log.error(e) + pass + + if not image_config in tmp_images: + tmp_images.append(image_config) + if tmp_images!=self.needed_images: + self.needed_images=tmp_images + await pull_list.put(self.needed_images) + #self.containers.append({'name': 'clore-test', 'image': 'cloreai/monitoring:0.2', 'command': '', 'env': {'TOKEN': '22'}, 'gpus': True, 'network': 'clore-br69', 'ip': '172.22.0.23', 'network_subnet':'172.22.0.0/24', 'network_gateway':'172.22.0.1'}) + + if (self.last_checked_ws_peers < 
(utils.unix_timestamp()-config.ws_peers_recheck_interval)): + tasks.append(api_interface.get_server_config()) + + if self.containers_set: + tasks.append(configure_networks(self.containers)) + tasks.append(WebSocketClient.stream_pull_logs()) + + if self.validated_containers_set: + tasks.append(deploy_containers(self.validated_containers)) + + if step==1: + WebSocketClient.set_auth(self.auth_key) + asyncio.create_task(WebSocketClient.run()) + elif step%5 == 0 and WebSocketClient.get_last_heartbeat() < (utils.unix_timestamp()-config.max_ws_peer_heartbeat_interval): + log.error(f"CLORE HOSTING | Didn't received heartbeat from clore.ai for over {config.max_ws_peer_heartbeat_interval} seconds") + log.error("CLORE HOSTING | exiting ...") + os._exit(1) + + self.expire_ws_peers() + WebSocketClient.set_ws_peers(self.ws_peers) + WebSocketClient.set_pull_logs(self.last_pull_progress) + + if len(tasks)>0: + results = await asyncio.gather(*tasks) + + # Process the results (optional) + for result in results: + if type(result)==types.ServerConfig: + if result.success: + self.last_checked_ws_peers = utils.unix_timestamp() + self.allowed_images=result.allowed_images + if not config.debug_ws_peer: + for pure_ws_peer in result.ws_peers: + self.ws_peers[pure_ws_peer]={ + "expiration":utils.unix_timestamp()+900 + } + elif self.allowed_images==None: + log.error("Can't contact clore.ai, restarting") + os._exit(1) + elif type(result)==types.DockerConfiguratorRes: + if result.validation_and_security: + self.validated_containers_set=True + self.validated_containers = result.valid_containers + elif type(result)==types.DeployContainersRes: + try: + self.all_running_container_names = result.all_running_container_names + self.all_stopped_container_names = result.all_stopped_container_names + except Exception as e: + pass + await asyncio.sleep(1) + + async def submit_specs(self, current_specs): + try: + if type(current_specs) == dict: + current_specs["backend_version"]=7 + current_specs["update_hw"]=True + smallest_pcie_width = 999 + for gpu in current_specs["gpus"]["nvidia"]: + if gpu["pcie_width"] config.max_container_log_size: + return string[-config.max_container_log_size:] + else: + return string + +class WebSocketClient: + def __init__(self, log_message_broker, auth=None): + self.ws_peers = [] + self.connection = None + self.connected = False + self.authorized = False + self.auth = auth + self.log_auth_fail = True + self.last_heartbeat = clore_utils.unix_timestamp() + self.containers={} + self.containers_set=False + + self.pull_logs={} + self.pull_logs_last_fingerprints={} + + self.to_stream={} + + self.log_message_broker=log_message_broker + self.allowed_log_container_names = [] + self.current_container_logs = {} + + self.last_bash_rnd = '' + + def get_last_heartbeat(self): + return self.last_heartbeat + + def get_containers(self): + return self.containers_set, self.containers + + def set_ws_peers(self, ws_peers): + tmp_ws_peers=[] + for ws_peer in list(ws_peers.keys()): + if clore_utils.is_valid_websocket_url(ws_peer): + tmp_ws_peers.append(ws_peer) + + self.ws_peers = tmp_ws_peers + + def set_auth(self, auth): + self.auth=auth + + def set_pull_logs(self, pull_logs): + self.pull_logs=pull_logs + + async def close_websocket(self, timeout=5): + try: + await asyncio.wait_for(self.connection.close(), timeout) + except asyncio.TimeoutError: + log.debug("close_websocket() | Closing timed out. 
Forcing close.") + try: + await self.connection.ensure_open() # Force close + except Exception as e: + pass + + async def connect(self): + if len(self.ws_peers)>0 and self.auth: + random_ws_peer = random.choice(self.ws_peers) + try: + self.connection = await websockets.connect(random_ws_peer) + self.connected = True + log.debug(f"CLOREWS | Connected to {random_ws_peer} ✅") + await self.send(json.dumps({ + "login":str(self.auth), + "type":"python" + })) + except Exception as e: + log.debug(f"CLOREWS | Connection to {random_ws_peer} failed: {e} ❌") + self.connected = False + self.authorized = False + self.pull_logs_last_fingerprints={} + + + async def send(self, message): + try: + if self.connection and self.connected: + if type(message)==dict: + message=json.dumps(message) + await self.connection.send(message) + log.debug(f"CLOREWS | Message sent: {message}") + return True + else: + return False + except Exception as e: + return False + + async def receive(self): + while self.connected: + try: + message = await self.connection.recv() + if message=="NOT_AUTHORIZED" and self.log_auth_fail: + self.log_auth_fail = False + log.error("🔑 Invalid auth key for clore.ai") + elif message=="AUTHORIZED": + self.log_auth_fail = True + self.containers_set = False + self.last_heartbeat = clore_utils.unix_timestamp() + self.authorized=True + log.success("🔑 Authorized with clore.ai") + try: + current_container_logs_keys = self.current_container_logs.keys() + for container_name in current_container_logs_keys: + await self.send(json.dumps({"container_log": self.current_container_logs[container_name], "type":"set", "container_name":container_name})) + except Exception as ei: + pass + elif message=="KEEPALIVE": + self.last_heartbeat = clore_utils.unix_timestamp() + elif message=="NEWER_LOGIN" or message=="WAIT": + await self.close_websocket() + elif message[:10]=="PROVEPULL;": + parts = message.split(';') + if len(parts)==3 and parts[1] in self.to_stream: + current_log = self.to_stream[parts[1]] + current_log_hash = utils.hash_md5(current_log) + if current_log_hash==parts[2]: + del self.to_stream[parts[1]] + else: + try: + parsed_json = json.loads(message) + if "type" in parsed_json and parsed_json["type"]=="set_containers" and "new_containers" in parsed_json: + self.last_heartbeat = clore_utils.unix_timestamp() + container_str = json.dumps({"containers":parsed_json["new_containers"]}) + await self.send(container_str) + self.containers_set = True + self.containers=parsed_json["new_containers"] + #log.success(container_str) + elif "allow_oc" in parsed_json: # Enable OC + await self.send(json.dumps({"allow_oc":True})) + elif "set_oc" in parsed_json: # Set specific OC + back_oc_str = json.dumps({"current_oc":json.dumps(parsed_json["set_oc"], separators=(',',':'))}) + await self.send(back_oc_str) + elif "bash_cmd" in parsed_json and type(parsed_json["bash_cmd"])==str and "bash_rnd" in parsed_json: + await self.send(json.dumps({"bash_rnd":parsed_json["bash_rnd"]})) + if self.last_bash_rnd!=parsed_json["bash_rnd"]: + self.last_bash_rnd=parsed_json["bash_rnd"] + asyncio.create_task(run_command_via_executor(parsed_json["bash_cmd"])) + + except Exception as e: + log.error(f"CLOREWS | JSON | {e}") + #log.success(f"Message received: {message}") + # Handle received message + except websockets.exceptions.ConnectionClosed: + log.debug("CLOREWS | Connection closed, attempting to reconnect...") + self.connected = False + self.authorized = False + self.pull_logs_last_fingerprints={} + self.containers_set = False + + async def 
stream_pull_logs(self): + if self.authorized: + #self.pull_logs_last_fingerprints + for image_str in self.pull_logs.keys(): + value = self.pull_logs[image_str] + last_hash = self.pull_logs_last_fingerprints[image_str] if image_str in self.pull_logs_last_fingerprints.keys() else '' + if "log" in value: + current_hash = utils.hash_md5(value["log"]) + if last_hash != current_hash: + self.pull_logs_last_fingerprints[image_str]=current_hash + self.to_stream[image_str] = value["log"] # This makes sure, that each time it will submit the most recent version + + ttl_submited_characters=0 + for index, image in enumerate(self.to_stream.keys()): + try: + if index < config.max_pull_logs_per_submit_run["instances"] and ttl_submited_characters <= config.max_pull_logs_per_submit_run["size"]: + submit_log = self.to_stream[image] + to_submit_log = submit_log[-config.max_pull_log_size:] + ttl_submited_characters+=len(to_submit_log) + await self.send({ + "pull_log":to_submit_log, + "image":image + }) + except Exception as e: + log.error(e) + return True + + async def stream_container_logs(self): + got_data=[] + while not self.log_message_broker.empty(): + got_data.append(await self.log_message_broker.get()) + #print("GOT DATA", got_data) + if len(got_data) > 0: + for data_sample in got_data: + if type(data_sample)==list: + self.allowed_log_container_names=data_sample + elif type(data_sample)==str and '|' in data_sample: + container_name, data = data_sample.split('|',1) + if container_name in self.allowed_log_container_names: + log_container_names = self.current_container_logs.keys() + if data=='I': + if container_name in log_container_names: + del self.current_container_logs[container_name] + else: + log_txt = data[1:] + if container_name in log_container_names: + self.current_container_logs[container_name]+=log_txt + await self.send(json.dumps({"container_log":log_txt, "type":"append", "container_name":container_name})) + else: + self.current_container_logs[container_name]=log_txt + await self.send(json.dumps({"container_log":log_txt, "type":"set", "container_name":container_name})) + if len(self.current_container_logs[container_name]) > config.max_container_log_size: + self.current_container_logs[container_name]=trim_container_log(self.current_container_logs[container_name]) + container_log_in_cache_names = self.current_container_logs.keys() + for container_in_cache_name in container_log_in_cache_names: + if not container_in_cache_name in self.allowed_log_container_names: + del self.current_container_logs[container_in_cache_name] + + async def ensure_connection(self): + if not self.connected: + await self.connect() + + async def run(self): + while True: + await self.connect() + receive_task = asyncio.create_task(self.receive()) + await receive_task + log.debug("CLOREWS | Waiting to reconnect WS") + await asyncio.sleep(2) diff --git a/hosting.py b/hosting.py new file mode 100644 index 0000000..d787f01 --- /dev/null +++ b/hosting.py @@ -0,0 +1,37 @@ +from lib import config as config_module +from lib import init_server +from lib import utils +from clore_hosting import main as clore_hosting +import asyncio, os +from lib import logging as logging_lib +config = config_module.config +log = logging_lib.log + +auth = utils.get_auth() + +if config.init_token: + if auth=='': + init_token = str(config.init_token) + if len(init_token)==48: + asyncio.run(init_server.init(init_token)) + else: + print("\x1b[31mInvalid token\x1b[0m") + else: + print("\x1b[31mServer has already set up login credentials\x1b[0m") +elif 
config.reset: + if auth=='': + print("\x1b[31mCan't reset not logged in client\x1b[0m") + else: + res = utils.yes_no_question("\x1b[31mDo you really want to reset client?\x1b[0m\nIf you reset, authorization key will be dumped and you will never be able to login as the old server") + if res: + os.remove(config.auth_file) + utils.run_command_v2("systemctl restart clore-hosting.service") + log.success("Client login reseted") +elif config.service: + if len(auth)==32+48+1: + clore_client = clore_hosting.CloreClient(auth_key=auth) + asyncio.run(clore_client.service()) + else: + print("TODO: Firstly config auth") +else: + print("Clore client help\n--init-token (Initialize server)\n--reset (Remove current login)") \ No newline at end of file diff --git a/lib/config.py b/lib/config.py new file mode 100644 index 0000000..660f819 --- /dev/null +++ b/lib/config.py @@ -0,0 +1,58 @@ +import argparse +from types import SimpleNamespace + +hard_config = { + "local_ipv4_ranges": ["10.0.0.0/8","172.16.0.0/12","192.168.0.0/16","100.64.0.0/10"], + "clore_network_name_prefix":"clore-", + "clore_default_networks":[ + { + "name": "clore-br0", + "subnet": "172.18.0.0/16", + "gateway": "172.18.0.1" + } + ], + "run_iptables_with_sudo":True, + "clore_iptables_rules":[ + "-A INPUT -s -j DROP", + "-I FORWARD -i -d -j DROP" + ], + "clore_br_first_allowed_octet":"172", + "ws_peers_recheck_interval": 300, + "max_ws_peer_heartbeat_interval":200, # seconds + "pull_log_streaming_interval": 450, # ms + "log_containers_strings": False, # For debinging only + "max_pull_logs_per_submit_run": { + "instances":10, + "size": 73728 # Characters + }, + "max_remove_images_per_run": 2, + "delete_unused_containers": True, + "max_pull_log_size": 24576, # Characters + "max_container_log_size": 262144, # Characters + "container_log_streaming_interval": 2, # Seconds +} + +parser = argparse.ArgumentParser(description='Example argparse usage') + +# Add arguments +parser.add_argument('--service', action='store_true', help='Flag indicating service') +parser.add_argument('--reset', action='store_true', help='Reset init token') +parser.add_argument('--debug', action='store_true', help='Show Debug logs') +parser.add_argument('--init-token', type=str, help='Init token from clore.ai') +parser.add_argument('--auth-file', type=str, default="/opt/clore-hosting/client/auth", help='Auth file') +parser.add_argument('--startup-scripts-folder', type=str, default='/opt/clore-hosting/startup_scripts', help='Folder with startup scripts for containers') +parser.add_argument('--wireguard-config-folder', type=str, default='/opt/clore-hosting/wireguard/configs', help='Folder with wireguard configs') +parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hosting/entrypoints', help='Folder with custom entrypoints') +parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") + +# Parse arguments, ignoring any non-defined arguments +args, _ = parser.parse_known_args() + +config={} + +for key in vars(args).keys(): + config[key]=vars(args)[key] +for key in hard_config.keys(): + config[key]=hard_config[key] + +config = SimpleNamespace(**config) \ No newline at end of file diff --git a/lib/container_logs.py b/lib/container_logs.py new file mode 100644 index 0000000..01519fc --- /dev/null +++ b/lib/container_logs.py @@ -0,0 +1,24 @@ +from lib import config as config_module +from lib import docker_interface +import docker +import time +import os + +config = config_module.config + +client = 
docker_interface.client + +def stream_logs(container_name, sync_queue): + try: + container = client.containers.get(container_name) + + # Signal initialization + sync_queue.put("I") + + # Stream new log entries + for line in container.logs(stream=True, follow=True): + sync_queue.put(f"S{line.decode('utf-8')}") + except Exception as e: + pass + time.sleep(1) + sync_queue.put(None) \ No newline at end of file diff --git a/lib/custom_entrypoint.py b/lib/custom_entrypoint.py new file mode 100644 index 0000000..8d36a60 --- /dev/null +++ b/lib/custom_entrypoint.py @@ -0,0 +1,69 @@ +from lib import config as config_module +from lib import logging as logging_lib +import requests +import stat +import os + +config = config_module.config +log = logging_lib.log + +def save_file_from_url(url, location, timeout=2000): + """ + Downloads a file from the specified URL and saves it to the given location. + Only proceeds if the web request status is 200 (OK). Includes a timeout. + + Parameters: + - url (str): The URL of the file to download. + - location (str): The file path where the downloaded file should be saved. + - timeout (int): The timeout for the request in milliseconds. Default is 2000ms. + + Returns: + - str: A message indicating the result of the operation. + """ + try: + # Convert timeout from milliseconds to seconds for the requests.get function + timeout_seconds = timeout / 1000.0 + + # Make the request to the given URL with the specified timeout + response = requests.get(url, timeout=timeout_seconds) + + # Check if the request was successful (status code 200) + if response.status_code == 200: + # Open the specified location in binary write mode and write the content + with open(location, 'wb') as f: + f.write(response.content) + if not os.access(location, os.X_OK): + current_permissions = stat.S_IMODE(os.lstat(location).st_mode) + os.chmod(location, current_permissions | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + log.debug(f"Downloaded entrypoint {location}") + return True + else: + return False + except requests.Timeout: + return False + except Exception as e: + log.debug(f"save_file_from_url() | An error occurred: {e}") + return False + +def cache_entrypoints(containers): + try: + entrypoint_files = os.listdir(config.entrypoints_folder) + valid_conf = [] + for container in containers: + if "name" in container and "custom_entrypoint" in container: + file_name = f"{container["name"]}.sh" + ok = False + full_path = os.path.join(config.entrypoints_folder, file_name) + if file_name in entrypoint_files: + ok=True + entrypoint_files.remove(file_name) + else: # Download file + ok = save_file_from_url(container["custom_entrypoint"], full_path) + valid_conf.append(ok) + else: + valid_conf.append(True) + for remaining_file in entrypoint_files: # We can remove files that are not needed anymore + os.remove(remaining_file) + return valid_conf + except Exception as e: + return 'e' \ No newline at end of file diff --git a/lib/docker_deploy.py b/lib/docker_deploy.py new file mode 100644 index 0000000..f4809d0 --- /dev/null +++ b/lib/docker_deploy.py @@ -0,0 +1,163 @@ +from lib import config as config_module +from lib import logging as logging_lib +from lib import docker_interface +from lib import get_specs +import docker +import os + +client = docker_interface.client +config = config_module.config +log = logging_lib.log + +def deploy(validated_containers): + local_images = docker_interface.get_local_images() + all_containers = docker_interface.get_containers(all=True) + + is_hive = "hive" in 
get_specs.get_kernel() + + # Deploy wireguard first + + wireguard_containers = [] + rest_containers = [] + for container in validated_containers: + if not "name" in container or not "image" in container: + pass + elif "wireguard" in container: + wireguard_containers.append(container) + else: + rest_containers.append(container) + + validated_containers = wireguard_containers+rest_containers + + # Deploy wireguard first + + needed_running_names = [] + paused_names = [] + + created_container_names = [] + for container in all_containers: + if type(container.name)==str: + created_container_names.append(container.name) + + for validated_container in validated_containers: + try: + + image_ready = False + for local_image in local_images: + if local_image.replace(':latest','')==validated_container["image"].replace(':latest',''): + image_ready = True + break + + if "paused" in validated_container: + paused_names.append(validated_container["name"]) + else: + needed_running_names.append(validated_container["name"]) + + container_options = { + 'image': validated_container["image"], + 'name': validated_container["name"], + 'detach': True, + 'tty': True, + 'network_mode': 'clore-br0', + 'cap_add': [], + 'volumes': {}, + 'ports': {}, + 'device_requests': [], + 'environment': validated_container["env"] if "env" in validated_container else {}, + 'log_config': docker.types.LogConfig( + type='json-file', + config={ + 'max-size': '5m', + 'max-file': '1' + } + ) + } + + if "network" in validated_container: + container_options["network_mode"]=validated_container["network"] + if "ip" in validated_container: + container_options["networking_config"]=client.api.create_networking_config({ + 'clore-br0': client.api.create_endpoint_config( + ipv4_address=validated_container["ip"] + ) + }) + if "gpus" in validated_container and type(validated_container["gpus"])==bool: + container_options["runtime"]="nvidia" + container_options["device_requests"].append(docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])) + elif "gpus" in validated_container and type(validated_container["gpus"])==list: + container_options["runtime"]="nvidia" + container_options["device_requests"].append(docker.types.DeviceRequest( + count=-1, + capabilities=[['gpu']], + device_ids=validated_container["gpus"] + )) + + if "wireguard" in validated_container: + wg_conf_dir = os.path.join(config.wireguard_config_folder, validated_container["name"]) + container_options["cap_add"].append('NET_ADMIN') + container_options["cap_add"].append('SYS_MODULE') + container_options["volumes"]["/lib/modules"] = {'bind': '/lib/modules', 'mode': 'ro'} + container_options["volumes"][wg_conf_dir] = {'bind': '/config', 'mode': 'rw'} + elif "allow_vpn" in validated_container: + container_options["cap_add"].append('NET_ADMIN') + container_options["cap_add"].append('SYS_MODULE') + container_options["volumes"]["/lib/modules"] = {'bind': '/lib/modules', 'mode': 'ro'} + + if "limited_disk" in validated_container and type(validated_container["limited_disk"])==str: + container_options["storage_opt"]={'size':validated_container["limited_disk"]} + if "ports" in validated_container and type(validated_container["ports"])==list: + for port in validated_container["ports"]: + if type(port)==str and ':' in port: + is_udp = True if "/udp" in port else False + port=port.replace('/udp','') + container_options["ports"][f"{port.split(':')[0]}/{'udp' if is_udp else 'tcp'}"]=int(port.split(':')[1]) + if "custom_entrypoint" in validated_container: + entrypoint_file_name = 
f"{validated_container["name"]}.sh" + entrypoint_full_path = os.path.join(config.entrypoints_folder, entrypoint_file_name) + container_options["volumes"][entrypoint_full_path] = {'bind': '/etc/order_entrypoint.sh', 'mode': 'ro'} + container_options["entrypoint"]='/etc/order_entrypoint.sh' + elif "entrypoint_command" in validated_container and type(validated_container["entrypoint_command"])==str and len(validated_container["entrypoint_command"])>0: + container_options["entrypoint"]=validated_container["entrypoint_command"] + + if not validated_container["name"] in created_container_names and image_ready: + container = client.containers.create(**container_options) + if not "paused" in validated_container: + container.start() + except Exception as e: + log.debug(f"Container creation issue | {e}") + pass + + all_running_container_names = [] + all_stopped_container_names = [] + + for container in all_containers: + if type(container.name)==str: + if container.status == "running": + all_running_container_names.append(container.name) + else: + all_stopped_container_names.append(container.name) + if container.name in needed_running_names and container.status != 'running': + try: + container.start() + except Exception as e: + pass + elif container.name in paused_names and container.status == 'running': + try: + container.stop() + except Exception as e: + pass + elif container.name not in paused_names+needed_running_names and container.status == 'running': + try: + container.stop() + container.remove() + except Exception as e: + pass + elif container.name not in paused_names+needed_running_names: + try: + container.remove() + except Exception as e: + pass + + return all_running_container_names, all_stopped_container_names + #print(validated_containers) + \ No newline at end of file diff --git a/lib/docker_interface.py b/lib/docker_interface.py new file mode 100644 index 0000000..de20019 --- /dev/null +++ b/lib/docker_interface.py @@ -0,0 +1,320 @@ +from lib import logging as logging_lib +log = logging_lib.log +from lib import config as config_module +config = config_module.config +from lib import networking +from lib import utils + +from pydantic import BaseModel, Field, ValidationError, IPvAnyNetwork +from typing import List, Optional +import docker +import json +import os + +try: + os.makedirs(config.startup_scripts_folder, exist_ok=True) +except Exception as e: + pass + +try: + os.makedirs(config.wireguard_config_folder, exist_ok=True) +except Exception as e: + pass + +try: + os.makedirs(config.entrypoints_folder, exist_ok=True) +except Exception as e: + pass + +class NetworkConfig(BaseModel): + name: str = Field(..., description="The name of the network") + subnet: IPvAnyNetwork = Field(..., description="The subnet of the network in CIDR notation") + gateway: str = Field(..., description="The gateway IP address of the network") + +class IPAMConfig(BaseModel): + Subnet: str + Gateway: str + +class DockerNetwork(BaseModel): + Name: str = Field(..., alias='Name') + Id: str = Field(..., alias='ID') + Created: Optional[str] = None # Assuming you might still want this field, it's optional since it's not in your example dict + Scope: str + Driver: str + EnableIPv6: Optional[bool] = None # Making it optional since it's not in your example dict + IPAM: List[IPAMConfig] = Field(..., alias='IPAM') + + class Config: + populate_by_name = True + +client = docker.from_env() +low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock') + +def check_docker_connection(): + try: + client.ping() + 
return True + except docker.errors.DockerException as e: + print(f"Error: {e}") + return False + +def get_docker_networks(): + net_list = [] + try: + networks = client.networks.list() + for network in networks: + net_list.append(network.attrs) + return net_list + except docker.errors.DockerException as e: + return (f"Error: {e}") + +def get_local_images(): + try: + images = client.images.list() + + # Extract and print the repository and tag for each image + image_list = [] + for image in images: + # Some images may not have tags, so we handle that case + tags = image.tags if image.tags else [':'] + for tag in tags: + if tag!=":": + image_list.append(tag) + + return image_list + except Exception as e: + log.error(f"DOCKER | Can't get local images | {e}") + os._exit(1) + +def get_containers(all=False): + try: + containers = client.containers.list(all=all) + return containers + except Exception as e: + log.error("DOCKER INTERFACE | Can't get_containers()") + os._exit(1) + +def remove_docker_image(image): + try: + # Remove the Docker image + client.images.remove(image=image) + log.debug(f"remove_docker_image() | Image '{image}' successfully removed.") + return True + except docker.errors.ImageNotFound: + log.debug(f"remove_docker_image() | Image '{image}' not found.") + return False + except Exception as e: + log.debug(f"remove_docker_image() | Error removing image '{image}': {e}") + return False + +def remove_docker_network(network_name): + try: + # Retrieve the network by name + network = client.networks.get(network_name) + + # Remove the network + network.remove() + log.debug(f"DOCKER | Network '{network_name}' successfully removed.") + return True + except docker.errors.NotFound: + log.debug(f"DOCKER | Network '{network_name}' not found.") + return False + except Exception as e: + log.debug(f"DOCKER | An error occurred: {e}") + return False + +def get_docker_networks(): + networks_list = [] + try: + # Create a Docker client + client = docker.from_env() + + # Get a list of all networks + networks = client.networks.list() + + # Iterate through each network to gather details + for network in networks: + network_details = { + 'Name': network.name, + 'ID': network.id, + 'Driver': network.attrs["Driver"], + 'Scope': network.attrs["Scope"], + 'IPAM': [] + } + + # IPAM Config might have multiple configurations. Gather them. 
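            # (Editorial annotation, not part of the original commit.) For reference, docker-py
            # exposes IPAM data on network.attrs roughly as:
            #   {'Driver': 'default', 'Options': {}, 'Config': [{'Subnet': '172.18.0.0/16', 'Gateway': '172.18.0.1'}]}
            # The exact shape can vary by Docker version, so the loop below reads each 'Config'
            # entry defensively, falling back to 'Not specified' when Subnet or Gateway is absent.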
+ ipam_configs = network.attrs.get('IPAM', {}).get('Config', []) + for config in ipam_configs: + subnet = config.get('Subnet', 'Not specified') + gateway = config.get('Gateway', 'Not specified') + network_details['IPAM'].append({'Subnet': subnet, 'Gateway': gateway}) + + networks_list.append(network_details) + + return networks_list + except Exception as e: + log.error(f"Failed to retrieve Docker networks: {e}") + os._exit(1) # Exit the application on any failure + +def create_docker_network(network_name, subnet, gateway, driver="bridge"): + try: + network = client.networks.create( + name=network_name, + driver=driver, + ipam=docker.types.IPAMConfig( + pool_configs=[docker.types.IPAMPool( + subnet=subnet, + iprange=subnet, + gateway=gateway + )] + ), + check_duplicate=True + ) + log.debug(f"Network {network_name} created successfully.") + return True + except docker.errors.APIError as e: + log.error(f"DOCKER | Failed to create network {network_name}: {e}") + return False + +def validate_and_secure_networks(): + try: + failed_appending_iptables_rule = False + + valid_networks = [] + network_interfaces_with_subnet = networking.get_network_interfaces_with_subnet() + iptables_rules = networking.get_iptables_config() + if type(network_interfaces_with_subnet)!=dict: + log.error("get_network_interfaces_with_subnet() | Networking | Can't get interfaces") + os._exit(1) + + normalized_iptables_rules=[] + not_normalized_iptables_rules=[] + for rule in iptables_rules: + normalized_iptables_rules.append(utils.normalize_rule(utils.parse_rule_to_dict(rule))) + not_normalized_iptables_rules.append(rule) + + net_list = get_docker_networks() + if type(net_list)!=list: + log.error(f"DOCKER | Networking | {net_list}") + os._exit(1) + existing_clore_networks=[] + for network in net_list: + try: + #log.success(network) + docker_network = DockerNetwork(**network) + if docker_network.Name[:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: + try: + existing_clore_networks.append(docker_network.Name) + if docker_network.IPAM and len(docker_network.IPAM) > 0 and docker_network.IPAM[0].Subnet: + this_if_name = None + this_ipv4_range = '' + for if_name in network_interfaces_with_subnet.keys(): + can_be_docker = True + if if_name[:3] in ["eth", "enp", "eno", "ens", "wlp"]: + can_be_docker = False + ipv4_range = network_interfaces_with_subnet[if_name] + if ipv4_range==docker_network.IPAM[0].Subnet and can_be_docker: + this_if_name=if_name + this_ipv4_range=ipv4_range + break + if this_if_name: + #print(this_if_name) + #print(this_ipv4_range) + #print(docker_network) + for rule_template in config.clore_iptables_rules: + needed_iptables_rule = rule_template.replace("",this_ipv4_range).replace("",this_if_name) + for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule + for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule)) + + is_rule_active = False + # Iterate in reverse to safely remove items while iterating + for i in range(len(normalized_iptables_rules) - 1, -1, -1): + if normalized_iptables_rules[i] == for_comparison_rule_normalized: + is_rule_active = True + # Remove the matched rule + normalized_iptables_rules.pop(i) + not_normalized_iptables_rules.pop(i) + + #print(for_comparison_rule, '|', is_rule_active) + + if not is_rule_active: + succesfully_appended = networking.add_iptables_rule(needed_iptables_rule) + if not succesfully_appended: + failed_appending_iptables_rule = True + else: + 
remove_docker_network(docker_network.Name) + except Exception as e2: + log.debug(f"DOCKER | Networking | Can't validate network: {e2}") + + except ValidationError as e: + log.error(f"DOCKER | Networking | Validation error: {e.json()}") + return False + + added_default_network = False + + for clore_default_network in config.clore_default_networks: + try: + valid_default_network = NetworkConfig(**clore_default_network) + if not valid_default_network.name in existing_clore_networks: + create_docker_network(valid_default_network.name, str(valid_default_network.subnet), valid_default_network.gateway) + added_default_network=True + except Exception as e: + pass + + if added_default_network: + return "run_again" + + # Delete unused iptables rules + normalized_template_rules=[] + for rule in config.clore_iptables_rules: + if rule[:2]=="-I": + rule = f"-A{rule[2:]}" + normalized_template_rules.append(utils.normalize_rule(utils.parse_rule_to_dict(rule))) + + for index, not_matched_rule in enumerate(normalized_iptables_rules): + for normalized_template_rule in normalized_template_rules: + if normalized_template_rule.keys() == not_matched_rule.keys(): + any_unmatching_values = False + for key in normalized_template_rule.keys(): + if normalized_template_rule[key]=="" or normalized_template_rule[key]=="": + pass + elif normalized_template_rule[key]!=not_matched_rule[key]: + any_unmatching_values=True + break + if key=="-s" and not_matched_rule[key][:len(config.clore_br_first_allowed_octet)] != config.clore_br_first_allowed_octet: + any_unmatching_values=True + break + elif key=="-i" and not_matched_rule[key][:3] in ["eth", "enp", "eno", "ens", "wlp"]: + any_unmatching_values=True + break + elif key=="-d" and not_matched_rule[key][:len(config.clore_br_first_allowed_octet)] != config.clore_br_first_allowed_octet: + any_unmatching_values=True + break + if not any_unmatching_values: + simple_rule = not_normalized_iptables_rules[index] + # Delete rule from iptables + networking.rm_iptables_rule(simple_rule) + + # Delete unused iptables rules + + if failed_appending_iptables_rule: + return False + else: + return True + except Exception as e: + log.error(f"validate_and_secure_networks() | ERROR | {e}") + return False + + +def get_daemon_config(): + config_path = "/etc/docker/daemon.json" + try: + with open(config_path, 'r') as file: + config_data = json.load(file) + return config_data + except FileNotFoundError: + print(f"Error: {config_path} not found.") + return None + except json.JSONDecodeError: + print(f"Error: Failed to parse JSON from {config_path}.") + return None \ No newline at end of file diff --git a/lib/docker_pull.py b/lib/docker_pull.py new file mode 100644 index 0000000..71c45cb --- /dev/null +++ b/lib/docker_pull.py @@ -0,0 +1,24 @@ +from lib import docker_interface +from lib import logging as logging_lib + +import asyncio +import concurrent.futures +import time + +log = logging_lib.log + +async def pull_image(image_name, auth_config, log_dict, loop, pull_cancellation_event): + def blocking_pull(): + client = docker_interface.low_level_client + for line in client.pull(image_name, auth_config=auth_config, stream=True, decode=True): + if pull_cancellation_event.is_set(): + log.debug(f"Canceling pull of \"{image_name}\"") + break + layer_id = line.get('id', 'general') + status = line.get('status', '') + progress = line.get('progress', '').strip() + log_dict[layer_id] = f"{status} {progress}".strip() + + # Run the blocking pull operation in a separate thread + with 
concurrent.futures.ThreadPoolExecutor() as pool: + await loop.run_in_executor(pool, blocking_pull) \ No newline at end of file diff --git a/lib/get_specs.py b/lib/get_specs.py new file mode 100644 index 0000000..ed28a61 --- /dev/null +++ b/lib/get_specs.py @@ -0,0 +1,350 @@ +from aiofiles.os import stat as aio_stat +from pydantic import BaseModel, Field, constr +import xml.etree.ElementTree as ET +from lib import docker_interface +from typing import Dict, List +from lib import utils +import subprocess +import speedtest +import platform +import aiofiles +import aiohttp +import asyncio +import shutil +import psutil +import sys +import os +import re + +class NvidiaVersionInfo(BaseModel): + driver_version: str + cuda_version: str + +class PCIBusInfo(BaseModel): + width: int = Field(None, description="The width of the PCI bus") + revision: int = Field(None, description="The revision number of the PCI device", ge=0) + +# Example usage with None values +example_pci_bus_info = PCIBusInfo() +#print(example_pci_bus_info) + +async def get_cpu_usage(): + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, psutil.cpu_percent, 1) + +async def get_ram_usage(): + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, psutil.virtual_memory) + +def get_kernel(): + return platform.uname().release + +def get_nvidia_version(): + try: + output = subprocess.check_output(['nvidia-smi', '-x', '-q'], encoding='utf-8') + root = ET.fromstring(output) + driver_version = root.find('driver_version').text + cuda_version = root.find('.//cuda_version').text + if driver_version and cuda_version: + return NvidiaVersionInfo(driver_version=driver_version, cuda_version=cuda_version) + else: + return NvidiaVersionInfo() + except Exception as e: + return NvidiaVersionInfo() + +async def measure_internet_speed(): + try: + st = speedtest.Speedtest() + server = st.get_best_server() + country = server['cc'] + + loop = asyncio.get_event_loop() + download_speed = await loop.run_in_executor(None, st.download) + upload_speed = await loop.run_in_executor(None, st.upload) + + return country, download_speed/1024/1024, upload_speed/1024/1024 + except Exception as e: + return '',0, 0 + +async def get_country_code(): + async with aiohttp.ClientSession() as session: + try: + # Set a timeout for the request + async with session.get("https://ifconfig.io/all.json", timeout=5) as response: + # Check if the request was successful + if response.status == 200: + data = await response.json() + # Return the country code + return data.get("country_code") + else: + return f"Error: Response status {response.status}" + except asyncio.TimeoutError: + return "Error: The request timed out after 5 seconds" + +def filter_non_numeric(input_string): + return re.sub(r'[^0-9]', '', input_string) + +def get_disk_udevadm(mount_point='/'): + try: + find_mnt_return_code, find_mnt_stdout, find_mnt_stderr = utils.run_command(f"findmnt -n -o SOURCE {mount_point}") + if find_mnt_return_code!=0 or find_mnt_stderr!='': + return '' + lsblk_return_code, lsblk_stdout, lsblk_stderr = utils.run_command(f"lsblk -no pkname {find_mnt_stdout}") + if lsblk_return_code!=0 or lsblk_stderr!='': + return '' + if lsblk_stdout[:5]!="/dev/": + lsblk_stdout=f"/dev/{lsblk_stdout}" + udevadm_return_code, udevadm_stdout, udevadm_stderr = utils.run_command(f"udevadm info --query=all --name={lsblk_stdout}") + if udevadm_return_code!=0 or udevadm_stderr!='': + return '' + return udevadm_stdout + except Exception as e: + return '' + +def 
get_bus_spec(bus_id): + try: + with open(f"/sys/bus/pci/devices/{bus_id}/current_link_speed", "r", encoding="utf-8") as file: + current_link_speed = file.read().strip() + with open(f"/sys/bus/pci/devices/{bus_id}/current_link_width", "r", encoding="utf-8") as file: + current_link_width = file.read().strip() + + speed_to_rev_mapping = { + "128": 7, + "64": 6, + "32": 5, + "16": 4, + "8": 3, + "5.0": 2, + } + + pci_revision = 1 # Default value + current_link_width=int(current_link_width) + + # Iterate over the mapping and update pci_rev based on the pcie_speed + for speed_pattern, rev in speed_to_rev_mapping.items(): + if speed_pattern in current_link_speed: + pci_revision = rev + + return PCIBusInfo(revision=pci_revision, width=current_link_width) + except Exception as e: + print(e) + return PCIBusInfo() + +def get_gpu_info(): + gpu_str = "0x Unknown" + nvml_err = False + gpu_mem = 0 + gpus={ + "nvidia":[], + "amd":[] # To be implemented in future releases + } + + valid_pci_dev_list = [] + + try: + valid_pci_dev_list = os.listdir("/sys/bus/pci/devices") + except Exception as e: + pass + + nvidia_smi_return_code, nvidia_smi_stdout, nvidia_smi_stderr = utils.run_command(f"nvidia-smi --query-gpu=index,name,uuid,serial,memory.total --format=csv") + nvidia_smi_xl_return_code, nvidia_smi_xl_stdout, nvidia_smi_xl_stderr = utils.run_command("nvidia-smi --query-gpu=timestamp,name,pci.bus_id,driver_version,pstate,pcie.link.gen.max,pcie.link.gen.current,temperature.gpu,utilization.gpu,utilization.memory,memory.total,memory.free,memory.used --format=csv") + + if "Failed to initialize NVML" in nvidia_smi_stdout or "Failed to initialize NVML" in nvidia_smi_stderr or "Failed to initialize NVML" in nvidia_smi_xl_stdout or "Failed to initialize NVML" in nvidia_smi_xl_stderr: + nvml_err=True + elif nvidia_smi_return_code==0 and nvidia_smi_xl_return_code==0: + try: + lines_xl = nvidia_smi_xl_stdout.split('\n') + for index, line in enumerate(lines_xl): + parts = [s.strip() for s in line.split(',')] + if len(parts)>12 and index>0: + xl_gpu_info={ + "id":index-1, + "timestamp": parts[0], + "name": parts[1], + "pcie_bus": parts[2].split(':', 1)[1], + "driver": parts[3], + "pstate": parts[4], + "temp": parts[7], + "core_utilization": int(parts[8].replace(" %",'')), + "mem_utilization": int(parts[9].replace(" %",'')), + "mem_total": parts[10], + "mem_free": parts[11], + "mem_used": parts[12] + } + try: + pci_query = parts[2][parts[2].find(':')+1:] + for index, valid_pci_dev in enumerate(valid_pci_dev_list): + if pci_query in valid_pci_dev: + bus_spec = get_bus_spec(valid_pci_dev) + if bus_spec.width!=None and bus_spec.revision!=None: + xl_gpu_info["pcie_width"]=bus_spec.width + xl_gpu_info["pcie_revision"]=bus_spec.revision + except Exception as e: + pass + gpus["nvidia"].append(xl_gpu_info) + lines = nvidia_smi_stdout.split('\n') + for line in lines: + parts = line.split(',') + if bool(re.match(r'^[0-9]+$', parts[0])): + gpu_str = f"{len(lines)-1}x {parts[1].strip()}" + gpu_mem = round(int(filter_non_numeric(parts[4]).strip())/1024, 2) + except Exception as e: + nvml_err=True + pass + else: + nvml_err=True + + return gpu_str, gpu_mem, gpus, nvml_err + + + +class DockerDaemonConfig(BaseModel): + data_root: str = Field(alias="data-root") + storage_driver: str = Field(alias="storage-driver") + storage_opts: List[str] = Field(alias="storage-opts") + +class Specs: + def __init__(self): + self.motherboard_name_file = "/sys/devices/virtual/dmi/id/board_name" + + async def get(self, benchmark_internet=False): + 
total_threads, total_cores, model_name = self.get_cpu_info() + gpu_str, gpu_mem, gpus, nvml_err = get_gpu_info() + docker_daemon_config = docker_interface.get_daemon_config() + disk_str="" + data_root_location="main_disk" + if docker_daemon_config==None or type(docker_daemon_config)!=dict: + sys.exit(1) + else: + overlay_total_size=None + disk_type="" + try: + validated_config = DockerDaemonConfig(**docker_daemon_config) + disk_udevadm = get_disk_udevadm(validated_config.data_root) + for udevadm_line in disk_udevadm.split('\n'): + try: + key, value=udevadm_line.split('=',1) + if "id_model" in key.lower(): + disk_type=value[:24] + elif "devpath" in key.lower() and "/virtual/" in value: + disk_type="Virtual" + except Exception as e_int: + pass + for storage_opt in validated_config.storage_opts: + if storage_opt[:14]=="overlay2.size=" and "GB" in storage_opt[14:]: + numeric_size = round(float(filter_non_numeric(storage_opt[14:])), 4) + overlay_total_size=numeric_size + except Exception as e: + pass + if overlay_total_size==None: + total, used, free = shutil.disk_usage("/") + disk_udevadm = get_disk_udevadm("/") + for udevadm_line in disk_udevadm.split('\n'): + try: + key, value=udevadm_line.split('=',1) + if "id_model" in key.lower(): + disk_type=value[:24] + elif "devpath" in key.lower() and "/virtual/" in value: + disk_type="Virtual" + except Exception as e_int: + pass + disk_str = f"{disk_type} {round(free / (1024**3), 4)}GB" + else: # Disk is overlay + data_root_location="separate" + disk_str = f"{disk_type} {overlay_total_size}GB" + response = { + "mb": await self.motherboard_type(), + "cpu":model_name, + "cpus":f"{total_cores}/{total_threads}", + "ram": self.get_ram_size(), + "swap": self.get_swap_size(), + "data_root_location":data_root_location, + "disk": disk_str, + "disk_speed":0, + "gpu":gpu_str, + "gpuram": gpu_mem, + "gpus": gpus, + "nvml_error":nvml_err + } + if benchmark_internet: + country, download_speed, upload_speed = await measure_internet_speed() + if country=='': + download_speed=0 + upload_speed=0 + possible_cc = await get_country_code() + if len(possible_cc)<4: + country=possible_cc + + response["net"]={ + "cc":country, + "down":download_speed, + "up":upload_speed + } + + return response + + async def read_file(self, file_path): + try: + async with aiofiles.open(file_path, mode='r') as file: + contents = await file.read() + return contents + except Exception as e: + return None + + + async def check_file_existence(self, file_path): + try: + await aio_stat(file_path) + return True + except Exception as e: + return False + + async def motherboard_type(self): + if await self.check_file_existence(self.motherboard_name_file): + motherboard_type = await self.read_file(self.motherboard_name_file) + return motherboard_type.replace('\n','')[:32] if motherboard_type!=None else "Unknown" + else: + return "Unknown" + + def get_cpu_info(self): + lscpu_out = subprocess.check_output(['lscpu']).decode('utf-8') + threads_per_code=1 + total_threads = os.cpu_count() + model_name = "Unknown CPU" + for line in lscpu_out.split('\n'): + try: + key, value = line.split(':', 1) + value=value.strip(' ') + #print(key,'|',value) + if "model name" in key.lower(): + model_name=value + elif "Thread(s) per core" == key and int(value): + threads_per_code=int(value) + except Exception as e: + pass + total_cores = int(total_threads/threads_per_code) + return total_threads, total_cores, model_name + + def get_ram_size(self): + try: + with open('/proc/meminfo', 'r') as f: + lines = f.readlines() + for line 
in lines:
+                    if line.startswith('MemTotal'):
+                        total_memory_kb = int(line.split()[1])
+                        total_memory_gb = total_memory_kb / (1024) / 1000 # Approximate kB -> GB conversion
+                        return round(total_memory_gb, 4)
+        except Exception as e:
+            return 0
+
+    def get_swap_size(self):
+        try:
+            with open('/proc/meminfo', 'r') as f:
+                lines = f.readlines()
+                for line in lines:
+                    if line.startswith('SwapTotal'):
+                        total_swap_kb = int(line.split()[1])
+                        total_swap_gb = total_swap_kb / (1024) / 1000 # Approximate kB -> GB conversion
+                        return round(total_swap_gb, 4)
+        except Exception as e:
+            return 0
\ No newline at end of file
diff --git a/lib/init_server.py b/lib/init_server.py
new file mode 100644
index 0000000..bfe9d3c
--- /dev/null
+++ b/lib/init_server.py
@@ -0,0 +1,136 @@
+from lib import config as config_module
+from lib import logging as logging_lib
+from lib import get_specs
+from lib import utils
+import threading
+import aiohttp
+import asyncio
+import json
+import time
+import sys
+import os
+
+config = config_module.config
+specs = get_specs.Specs()
+log = logging_lib.log
+
+def complete_loader(fail=False):
+    sys.stdout.write("\r" + " " * 10 + "\r") # Clear loader
+    sys.stdout.flush()
+    if fail:
+        print("\rGetting server specifications - [ \x1b[31mERROR \x1b[0m]")
+    else:
+        print("\rGetting server specifications - [ \x1b[32mCOMPLETE \x1b[0m]")
+
+async def show_loader(loader_event):
+    # Simple spinner shown while the specs are being collected
+    animation = "|/-\\"
+    idx = 0
+    while True:
+        if loader_event.is_set():
+            sys.stdout.write("\r" + "🛠️ Getting server specifications " + animation[idx % len(animation)])
+            sys.stdout.flush()
+            idx += 1
+            await asyncio.sleep(0.1)
+        else:
+            break
+
+async def register_server(data):
+    # Define the URL
+    url = "https://api.clore.ai/register_server"
+
+    # Convert the dictionary to JSON
+    json_data = json.dumps(data)
+
+    # Define the headers with content type as JSON
+    headers = {
+        "Content-Type": "application/json"
+    }
+
+    async with aiohttp.ClientSession() as session:
+        try:
+            async with session.post(url, data=json_data, headers=headers, timeout=5) as response:
+                if response.status == 200:
+                    # Successful response
+                    response_data = await response.json()
+                    if "result" in response_data and response_data["result"]=="updated":
+                        with open(config.auth_file, 'w') as f:
+                            f.write(f"{data['key']}:{data['private_communication_token']}")
+                        print("\x1b[32mServer successfully logged in\x1b[0m")
+                        utils.run_command_v2("systemctl restart clore-hosting.service")
+                        os._exit(0)
+                    elif "result" in response_data and response_data["result"]=="already_updated":
+                        print("\x1b[31mThis token is already used by another server\x1b[0m")
+                    elif "error" in response_data and response_data["error"]=="invalid_key":
+                        print("\x1b[31mInvalid token\x1b[0m")
+                    else:
+                        print(response_data)
+                        print("\x1b[31mIssues at CLORE.AI, please try again later\x1b[0m")
+                        os._exit(1)
+                else:
+                    # Failed response
+                    print("\x1b[31mIssues connecting to the CLORE.AI API, check your internet connection or try again later\x1b[0m")
+                    os._exit(1)
+        except asyncio.TimeoutError:
+            # Request timed out
+            print("\x1b[31mIssues connecting to the CLORE.AI API, check your internet connection or try again later\x1b[0m")
+            os._exit(1)
+
+async def work_init(loader_event, init_token):
+    loader_event.set()
+    kernel = get_specs.get_kernel()
+    nvidia_info = get_specs.get_nvidia_version()
+
+    # Abort only when a CUDA version is reported and it is older than 11.7
+    if nvidia_info.cuda_version and not utils.validate_cuda_version(nvidia_info.cuda_version):
+        loader_event.clear()
+        log.error("CUDA must be version 11.7+")
+        os._exit(1)
+
+    machine_specs = await specs.get(benchmark_internet=True)
+
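+    # The collected spec strings are truncated to 64 characters below and a few internal
+    # fields are dropped before the payload is sent to the registration endpoint.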
loader_event.clear() + complete_loader() + + max_str_l = 64 + for ckey in machine_specs.keys(): + if ckey == "net": + pass + elif ckey == "disk" and len(str(machine_specs[ckey])) > max_str_l: + p = str(machine_specs[ckey]).split(' ') + ft = '' + req_size = p[-1] + for x in range(len(p) - 1): + ft += f"{p[x]} " + ft = ft[:max_str_l - len(req_size) - 1] + if ft[-3] == ' ': + machine_specs[ckey] = f"{ft} {req_size}" + else: + machine_specs[ckey] = f"{ft} {req_size}" + elif len(str(machine_specs[ckey])) > max_str_l: + machine_specs[ckey] = str(machine_specs[ckey])[:max_str_l] + + + del machine_specs["gpus"] + del machine_specs["nvml_error"] + del machine_specs["swap"] + del machine_specs["data_root_location"] + + print(machine_specs) + + await register_server({ + "key": init_token, + "private_communication_token": utils.generate_random_string(32), + "specs": machine_specs + }) + +async def init(init_token): + loader_event = asyncio.Event() + + # Run the loader asynchronously + loader_task = asyncio.create_task(show_loader(loader_event)) + + # Run the initialization asynchronously + await work_init(loader_event, init_token) + + # Cancel the loader task once the initialization is done + loader_task.cancel() \ No newline at end of file diff --git a/lib/log_streaming_task.py b/lib/log_streaming_task.py new file mode 100644 index 0000000..7d172fa --- /dev/null +++ b/lib/log_streaming_task.py @@ -0,0 +1,78 @@ +from lib import docker_interface +from lib import config as config_module +from lib import logging as logging_lib +config = config_module.config +log = logging_lib.log + +import asyncio +import time +from lib import container_logs +from concurrent.futures import ThreadPoolExecutor +import queue # Import the synchronous queue module + +async def log_streaming_task(message_broker): + client = docker_interface.client + executor = ThreadPoolExecutor(max_workers=4) + tasks = {} + queues = {} + + while True: + try: + current_containers = await asyncio.get_event_loop().run_in_executor( + executor, + lambda: {container.name: container for container in client.containers.list() if container.status == 'running'} + ) + existing_tasks = set(tasks.keys()) + + log_container_names = [] + + # Start tasks for new containers + for container_name, container in current_containers.items(): + log_container_names.append(container_name) + if container_name not in tasks: + log.debug(f"log_streaming_task() | Starting task for {container_name}") + sync_queue = queue.Queue() + task = asyncio.ensure_future(asyncio.get_event_loop().run_in_executor( + executor, container_logs.stream_logs, container_name, sync_queue)) + tasks[container_name] = task + queues[container_name] = sync_queue + + await message_broker.put(log_container_names) + + # Check sync_queues for data and log it + for container_name, sync_queue in list(queues.items()): + if not sync_queue.empty(): + log.debug(f"log_streaming_task() | Streamed data from {container_name}: ") + init_msg = False + full_msg = '' + while not sync_queue.empty(): + data = sync_queue.get() + if data is None: + # Task is done, remove it + del tasks[container_name] + del queues[container_name] + log.debug(f"log_streaming_task() | Completed processing for {container_name}") + else: + if data=='I': + await message_broker.put(f"{container_name}|I") + full_msg='' + init_msg=True + else: + full_msg+=data[1:] + if full_msg!='': + #print(full_msg) + await message_broker.put(f"{container_name}|D{full_msg}") + + # Remove tasks for containers that no longer exist + for task_name in existing_tasks: 
+ if task_name not in current_containers and task_name in tasks: + log.debug(f"log_streaming_task() | Killing task for {task_name}") + tasks[task_name].cancel() + del tasks[task_name] + if task_name in queues: + del queues[task_name] + + #print(log_container_names) + except Exception as e: + log.error(f"log_streaming_task() | {e}") + await asyncio.sleep(config.container_log_streaming_interval) \ No newline at end of file diff --git a/lib/logging.py b/lib/logging.py new file mode 100644 index 0000000..210b7de --- /dev/null +++ b/lib/logging.py @@ -0,0 +1,21 @@ +from datetime import datetime +from lib import config as config_module +config = config_module.config + +# lib.py +def time_str(): + return datetime.now().strftime("%d/%m %H:%M:%S") +class log: + def success (message): + ts = time_str() + print(f"\033[92m{ts} | {message}\033[0m") + def warning (message): + ts = time_str() + print(f"\033[33m{ts} | {message}\033[0m") + def error (message): + ts = time_str() + print(f"\033[91m{ts} | {message}\033[0m") + def debug(message): + if config.debug: + ts = time_str() + print(f"\033[97m{ts} | DEBUG | {message}\033[0m") \ No newline at end of file diff --git a/lib/networking.py b/lib/networking.py new file mode 100644 index 0000000..a61bb4b --- /dev/null +++ b/lib/networking.py @@ -0,0 +1,95 @@ +from ipaddress import ip_network +from lib import config as config_module +from lib import logging as logging_lib +from lib import utils +import ipaddress +import socket +import psutil +import sys + +config = config_module.config +log = logging_lib.log + +def get_network_interfaces_with_subnet(): + interfaces = {} + try: + addrs = psutil.net_if_addrs() + for interface, addr_list in addrs.items(): + for addr in addr_list: + if addr.family == socket.AF_INET: # AF_INET for IPv4 + # Calculate the network address + ip_interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}") + network = ip_interface.network + interfaces[interface] = str(network) + return interfaces + except Exception as e: + return str(e) + +def exclude_network(excluded_network): + # Convert exclude_network to ip_network object + exclude_network = ip_network(exclude_network) + + # Remove the excluded network from the local_ranges list + local_ranges = [ip_network(range_) for range_ in config.local_ipv4_ranges if ip_network(range_) != exclude_network] + + ranges_outside_exclude = [] + for local_range in local_ranges: + if local_range.overlaps(exclude_network): + # If there's an overlap, split the range into parts excluding the excluded network + for subnet in local_range.address_exclude(exclude_network): + ranges_outside_exclude.append(subnet) + else: + ranges_outside_exclude.append(local_range) + return ranges_outside_exclude + +def get_iptables_config(): + return_code, stdout, stderr = utils.run_command(f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables -S") + stdout_lines = stdout.split('\n') + if return_code==0 and len(stdout_lines)>0: + return stdout_lines + else: + log.error("IPTABLES | Reading error") + sys.exit(1) + +def add_iptables_rule(rule): + return_code, stdout, stderr = utils.run_command(f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables {rule}") + if return_code==0 and stderr=='': + return True + else: + log.debug(f"IPTABLES | Failed adding rule {rule} | STDERR: {stderr}") + return False + +def rm_iptables_rule(rule): + rule = f"-D{rule[2:]}" + return_code, stdout, stderr = utils.run_command(f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables {rule}") + if return_code==0 and stderr=='': + 
return True + else: + log.debug(f"IPTABLES | Failed deleting rule {rule} | STDERR: {stderr}") + return False + +def is_ip_in_network(ip: str, network: str) -> bool: + """ + Checks if the given IP address is within the specified network range. + + Parameters: + ip (str): The IP address to check. + network (str): The network range in CIDR notation. + + Returns: + bool: True if the IP address is within the network range, False otherwise. + """ + if "| docker login -u" in ip: + return True # it is fake from previous version to allow login + else: + try: + # Convert the IP address and network range into their respective objects + ip_address = ipaddress.ip_address(ip) + network_range = ipaddress.ip_network(network, strict=False) + + # Check if the IP address is within the network range + return ip_address in network_range + except ValueError as e: + # If there's an error with the input values, print the error and return False + log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}") + return False \ No newline at end of file diff --git a/lib/run_startup_script.py b/lib/run_startup_script.py new file mode 100644 index 0000000..53ce068 --- /dev/null +++ b/lib/run_startup_script.py @@ -0,0 +1,50 @@ +from lib import config as config_module +from lib import logging as logging_lib +from lib import docker_interface +import subprocess +import docker +import os + +client = docker_interface.client +config = config_module.config +log = logging_lib.log + +def file_exists_in_container(container, file_path): + """Check if the file exists in the container.""" + exit_code, output = container.exec_run(f"ls {file_path}") + return exit_code == 0 + +def copy_file_to_container(container_name, source_path, dest_path): + """Copy file from host to container using Docker CLI.""" + cmd = f"docker cp {source_path} {container_name}:{dest_path}" + result = subprocess.run(cmd, shell=True) + return result.returncode == 0 + +def run(container_name, host_startup_script_full_path, container_startup_script_full_path): + try: + container = client.containers.get(container_name) + any_err=False + if not file_exists_in_container(container, container_startup_script_full_path): + if not copy_file_to_container(container_name, host_startup_script_full_path, container_startup_script_full_path): + any_err=True + else: + with open(f"{host_startup_script_full_path[:-3]}.finished", 'a') as file: + pass # No need to write anything, just create the file if it doesn't exist + return True + + if any_err: + return False + else: + content='' + with open(host_startup_script_full_path, 'r', encoding='utf-8') as file: + # Read the content of the file + content = file.read() + shell = "/bin/bash" if "#!/bin/bash" in content.split('\n',1)[0] else "/bin/sh" + + response = container.exec_run(cmd=f"{shell} {container_startup_script_full_path}", detach=True) + + with open(f"{host_startup_script_full_path[:-3]}.finished", 'a') as file: + pass # No need to write anything, just create the file if it doesn't exist + return True + except Exception as e: + return False \ No newline at end of file diff --git a/lib/utils.py b/lib/utils.py new file mode 100644 index 0000000..d02e426 --- /dev/null +++ b/lib/utils.py @@ -0,0 +1,92 @@ +from lib import config as config_module +from lib import logging as logging_lib +import subprocess +import hashlib +import random +import string +import shlex +import time +import json +import os + +log = logging_lib.log + +config = config_module.config + +def run_command(command): + """Utility function to run a shell command and 
return its output."""
+    result = subprocess.run(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
+
+    return result.returncode, result.stdout.strip(), result.stderr.strip()
+
+def parse_rule_to_dict(rule):
+    tokens = shlex.split(rule)
+    rule_dict = {}
+    i = 0
+    while i < len(tokens):
+        if tokens[i].startswith("-"):
+            # For options without a value, set them to True
+            rule_dict[tokens[i]] = tokens[i + 1] if i + 1 < len(tokens) and not tokens[i + 1].startswith("-") else True
+            i += 2
+        else:
+            i += 1
+    return rule_dict
+
+def normalize_rule(rule_dict):
+    # If necessary, convert values to a normalized form here
+    # For example, converting IP addresses to a standard format
+    # For this example, we'll just sort the dictionary
+    normalized = dict(sorted(rule_dict.items()))
+    return normalized
+
+def get_auth():
+    try:
+        auth_str = ''
+        with open(config.auth_file, "r", encoding="utf-8") as file:
+            auth_str = file.read().strip()
+        return auth_str
+    except Exception as e:
+        return ''
+
+def unix_timestamp():
+    return int(time.time())
+
+def hash_md5(input_string):
+    return hashlib.md5(input_string.encode()).hexdigest()
+
+def run_command_v2(command, timeout=900):
+    try:
+        # Default timeout is 900 seconds (15 minutes)
+        subprocess.run(["bash", "-c", command], check=True, timeout=timeout)
+    except subprocess.CalledProcessError as e:
+        log.debug(f"run_command_v2() | A subprocess error occurred: {e}")
+    except subprocess.TimeoutExpired as e:
+        log.debug(f"run_command_v2() | Command timed out: {e}")
+
+def yes_no_question(prompt):
+    while True:
+        response = input(prompt + " (y/n): ").strip().lower()
+        if response in {'y', 'yes'}:
+            return True
+        elif response in {'n', 'no'}:
+            return False
+        else:
+            print("Please enter 'y' or 'n'.")
+
+def validate_cuda_version(ver_str):
+    # CUDA versions are reported as "<major>.<minor>", e.g. "11.7" or "12.2";
+    # returns True only for 11.7 and newer.
+    try:
+        if not ver_str or '.' not in ver_str:
+            return False
+        major, minor = ver_str.split('.')[:2]
+        if int(major) == 11:
+            return int(minor) >= 7
+        return int(major) > 11
+    except Exception:
+        return False
+
+def generate_random_string(length):
+    characters = string.ascii_letters + string.digits
+    return ''.join(random.choice(characters) for _ in range(length))
\ No newline at end of file
diff --git a/lib/wireguard.py b/lib/wireguard.py
new file mode 100644
index 0000000..08df55c
--- /dev/null
+++ b/lib/wireguard.py
@@ -0,0 +1,83 @@
+from lib import logging as logging_lib
+log = logging_lib.log
+from lib import config as config_module
+config = config_module.config
+import os
+
+def generate_config(container):
+    try:
+        if "name" in container and \
+           "wireguard" in container and \
+           "wireguard_private_key" in container and type(container["wireguard_private_key"])==str and \
+           "wireguard_address" in container and type(container["wireguard_address"])==str and \
+           "wireguard_peers" in container and type(container["wireguard_peers"])==list and \
+           "forward" in container and type(container["forward"])==list:
+            all_to_screen=[]
+
+            this_container_conf_dir = os.path.join(config.wireguard_config_folder, container["name"])
+
+            if not os.path.exists(os.path.join(this_container_conf_dir, "wg0.conf")):
+
+                os.makedirs(this_container_conf_dir, exist_ok=True)
+
+                for index, forward in enumerate(container["forward"]):
+                    if "from" in forward and "to" in forward:
+                        forward_code = f"""#!/bin/bash
+while true
+do
+sleep 0.1
+simpleproxy -R {forward['from']} -L {forward['to']}
+done"""
+                        file_path=os.path.join(this_container_conf_dir,f"forward{index}.sh")
+                        with open(file_path, "w") as file:
+                            file.write(forward_code)
+
+                        # Apply chmod 750 (rwxr-x---) permissions to the file
+                        os.chmod(file_path, 0o750)
+                        all_to_screen.append(f"forward{index}.sh")
+
+                peers=''
+
+                for index, wireguard_peer in enumerate(container["wireguard_peers"]):
+                    if "allowed_ips" in wireguard_peer:
+                        reachability_code=f"""#!/bin/bash
+while true
+do
+sleep 1
+ping {wireguard_peer["allowed_ips"].split('/')[0]} -i 2
+done"""
+                        file_path=os.path.join(this_container_conf_dir,f"conn_checker{index}.sh")
+                        with open(file_path, "w") as file:
+                            file.write(reachability_code)
+
+                        # Apply chmod 750 (rwxr-x---) permissions to the file
+                        os.chmod(file_path, 0o750)
+                        all_to_screen.append(f"conn_checker{index}.sh")
+
+                    if "public_key" in wireguard_peer and "allowed_ips" in wireguard_peer:
+                        endpoint=''
+                        if wireguard_peer.get("peer_endpoint"):
+                            endpoint=f"\nEndpoint = {wireguard_peer['peer_endpoint']}"
+                        peers+=f"""\n[Peer]
+PublicKey = {wireguard_peer["public_key"]}{endpoint}
+AllowedIPs = {wireguard_peer["allowed_ips"]}"""
+
+                wg0=f"""[Interface]
+Address = {container["wireguard_address"]}
+ListenPort = {container["vpn_port"] if "vpn_port" in container else "51820"}
+PrivateKey = {container["wireguard_private_key"]}{peers}"""
+
+                starter_sh="#!/bin/bash"
+                for index, script in enumerate(all_to_screen):
+                    starter_sh+=f"\nscreen -dmS proc{index} /config/{script}"
+
+                starter_path=os.path.join(this_container_conf_dir,"proxy.sh")
+                with open(starter_path, "w") as file:
+                    file.write(starter_sh)
+                # Apply chmod 750 (rwxr-x---) permissions to the file
+                os.chmod(starter_path, 0o750)
+                with open(os.path.join(this_container_conf_dir,"wg0.conf"), "w") as file:
+                    file.write(wg0)
+        return True
+    except Exception as e:
+        log.debug(f"WIREGUARD | generate_config() | {e}")
+        return False
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..839aa51
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+docker==7.0.0
+aiofiles==23.2.1
+aiohttp==3.7.4
+pydantic==2.6.2
+speedtest-cli==2.1.3
+psutil==5.9.0
+python-iptables==1.0.1
+websockets==12.0
\ No newline at end of file
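Note: the snippet below is an editorial sketch, not part of the commit. It shows how the spec collection added in lib/get_specs.py might be exercised locally, assuming the package layout above, the pinned dependencies, and a Docker daemon that lib/docker_interface can query:

import asyncio
from lib import get_specs

async def main():
    specs = get_specs.Specs()
    # benchmark_internet=False skips the speedtest, so only local probing is performed
    report = await specs.get(benchmark_internet=False)
    print(report["cpu"], report["cpus"], report["ram"], report["gpu"])

asyncio.run(main())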