import http.client import datetime import argparse import json import specs import base64 import time import math import sys import re import os import socket import asyncio from urllib.parse import urlparse import subprocess from functools import partial class logger: RED = '\033[91m' GREEN = '\033[92m' BLUE = '\033[94m' RESET = '\033[0m' @staticmethod def _get_current_time(): return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") @staticmethod def error(message): print(f"{logger.RED}{logger._get_current_time()} | ERROR | {message}{logger.RESET}") @staticmethod def success(message): print(f"{logger.GREEN}{logger._get_current_time()} | SUCCESS | {message}{logger.RESET}") @staticmethod def info(message): print(f"{logger.BLUE}{logger._get_current_time()} | INFO | {message}{logger.RESET}") if os.geteuid() != 0: logger.error("This script must be run as root!") sys.exit(1) parser = argparse.ArgumentParser(description="Script with --clore-endpoint flag.") parser.add_argument('--clore-endpoint', type=str, default="https://api.clore.ai/machine_onboarding", help='Specify the Clore API endpoint. Default is "https://api.clore.ai/machine_onboarding".') parser.add_argument('--mock', action='store_true', help='Report predefined machine specs (testing only)') parser.add_argument('--mode', type=str, default="linux") parser.add_argument('--write-linux-config', type=str, default="") parser.add_argument('--linux-hostname-override', default="") parser.add_argument('--auth-file', type=str, default="/opt/clore-hosting/client/auth", help='Auth file location') args = parser.parse_args() hive_hive_wallet_conf_path = "mock/wallet.conf" if args.mock else "/hive-config/wallet.conf" hive_rig_conf_path = "mock/rig.conf" if args.mock else "/hive-config/rig.conf" hive_oc_conf_path = "mock/oc.conf" if args.mock else "/hive-config/nvidia-oc.conf" clore_conf_path = "mock/onboarding.json" if args.mock else "/opt/clore-hosting/onboarding.json" async def run_command(command: str): loop = asyncio.get_running_loop() bash_command = ['/bin/bash', '-c', command] run_subprocess = partial(subprocess.run, bash_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,text=True) process = await loop.run_in_executor(None, run_subprocess) return process.stdout, process.stderr def clean_config_value(value): if '#' in value: value = value.split('#', 1)[0] value = value.strip() if (len(value) >= 2 and value[-1] == "'" and value[0] == "'") or (len(value) >= 2 and value[-1] == '"' and value[0] == '"'): value = value[1:-1] return value def filter_name(name): return re.sub(r'[^A-Za-z0-9_-]', '', name) def validate_clore_config(clore_config): def is_valid_hostname_override(value): return isinstance(value, str) and 1 <= len(value) <= 64 and re.match(r'^[A-Za-z0-9_-]+$', value) def is_valid_auth(value): return isinstance(value, str) and len(value) <= 256 def is_valid_multipliers(value): return isinstance(value, dict) and "on_demand_multiplier" in value and "spot_multiplier" in value and \ 1 <= value["on_demand_multiplier"] <= 50 and 1 <= value["spot_multiplier"] <= 50 def is_valid_mrl(value): return isinstance(value, int) and 6 <= value <= 1440 def is_valid_keep_params(value): return isinstance(value, bool) def is_valid_pricing(clore_config): required_keys = {"on_demand_bitcoin", "on_demand_clore", "spot_bitcoin", "spot_clore"} if required_keys.issubset(clore_config): return 0.000001 <= clore_config["on_demand_bitcoin"] <= 0.005 and \ 0.1 <= clore_config["on_demand_clore"] <= 5000 and \ 0.000001 <= clore_config["spot_bitcoin"] <= 0.005 and \ 0.1 <= clore_config["spot_clore"] <= 5000 return False def is_valid_usd_pricing(autoprice): required_keys = {"on_demand", "spot"} if required_keys.issubset(autoprice): return 0.1 <= autoprice["spot"] <= 1000 and \ 0.1 <= autoprice["on_demand"] <= 1000 return False errors = [] if "hostname_override" in clore_config and not is_valid_hostname_override(clore_config["hostname_override"]): errors.append("hostname_override must be a string between 1-64 characters, only A-Za-z0-9_- allowed") if "auth" not in clore_config or not is_valid_auth(clore_config["auth"]): errors.append("auth is mandatory and must be a string of max 256 character") if "autoprice" in clore_config and isinstance(clore_config["autoprice"], dict): if clore_config["autoprice"].get("usd"): if not is_valid_usd_pricing(clore_config["autoprice"]): errors.append("usd pricing input is invalid") elif not is_valid_multipliers(clore_config["autoprice"]): errors.append("multipliers are not following spec") if "mrl" not in clore_config or not is_valid_mrl(clore_config["mrl"]): errors.append("mrl is mandatory and must be an integer in range 6-1440") if "keep_params" in clore_config and not is_valid_keep_params(clore_config["keep_params"]): errors.append("keep_params must be a boolean value") crypto_keys = {"on_demand_bitcoin", "on_demand_clore", "spot_bitcoin", "spot_clore"} if any(key in clore_config for key in crypto_keys): if not is_valid_pricing(clore_config): errors.append("All pricing fields (on_demand_bitcoin, on_demand_clore, spot_bitcoin, spot_clore) must be specified and valid.") return errors if errors else "Validation successful" def base64_string_to_json(base64_string): try: padding_needed = len(base64_string) % 4 if padding_needed: base64_string += '=' * (4 - padding_needed) json_bytes = base64.b64decode(base64_string) json_str = json_bytes.decode('utf-8') json_obj = json.loads(json_str) return json_obj except Exception as e: return None def get_default_power_limits(): try: cmd = "nvidia-smi -q -d POWER | grep \"Default Power Limit\" | awk '{print $5}'" result = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) lines = result.stdout.strip().split('\n') power_limits = [] for line in lines: if line.lower() != 'n/a': try: power_limits.append(int(float(line))) except ValueError: continue return power_limits if power_limits else None except subprocess.CalledProcessError: return None except Exception: return None def validate_and_convert(input_str, min_val, max_val, adjust_bounds=False): try: int_list = [int(x) for x in input_str.split()] if adjust_bounds: int_list = [max(min(num, max_val), min_val) for num in int_list] else: if not all(min_val <= num <= max_val for num in int_list): return None return int_list except Exception: return None def get_number_or_last(numbers, index): if index < len(numbers): return numbers[index] else: return numbers[-1] async def async_read_file(path): try: with open(path, 'r') as file: return file.read() except Exception as e: #print(f"Error reading file {path}: {e}") return None def extract_last_setcore_setmem(input_string): try: setcore_pattern = r'--setcore\s+((?:\d+\s*)+)(?=\D|$)' setmem_pattern = r'--setmem\s+((?:\d+\s*)+)(?=\D|$)' setcore_matches = re.findall(setcore_pattern, input_string) setmem_matches = re.findall(setmem_pattern, input_string) last_setcore = [int(num) for num in setcore_matches[-1].split()] if setcore_matches else [] last_setmem = [int(num) for num in setmem_matches[-1].split()] if setmem_matches else [] return last_setcore, last_setmem except Exception as e: return [[], []] async def hive_parse_config(file_content): conf = {} if file_content: for line in file_content.split('\n'): line = line.strip() if line[:1] != "#" and '=' in line: key, value = [line.split('=', 1)[0], clean_config_value(line.split('=', 1)[1])] conf[key] = value return conf async def hive_load_configs(default_power_limits, static_config): parsed_static_config = None try: parsed_static_config = json.loads(static_config) except Exception: pass try: # Non-blocking file reads wallet_conf_content = await async_read_file(hive_hive_wallet_conf_path) rig_conf_content = await async_read_file(hive_rig_conf_path) # Parse rig config rig_conf = await hive_parse_config(rig_conf_content) if not rig_conf or "WORKER_NAME" not in rig_conf or "RIG_ID" not in rig_conf: print("WORKER_NAME or RIG_ID is missing from rig config") os._exit(1) clore_miner_present = False # Parse wallet config clore_config = None get_oc_config = False fs_mem_lock = [] fs_core_lock = [] if wallet_conf_content: fs_core_lock, fs_mem_lock = extract_last_setcore_setmem(wallet_conf_content) for wallet_conf_line in wallet_conf_content.split('\n'): wallet_conf_line = wallet_conf_line.strip() if wallet_conf_line[:1] != "#" and '=' in wallet_conf_line: key, value = [wallet_conf_line.split('=', 1)[0], clean_config_value(wallet_conf_line.split('=', 1)[1])] if key[-9:] == "_TEMPLATE": possible_clore_config = base64_string_to_json(value) if possible_clore_config: clore_config = possible_clore_config elif key == "CUSTOM_MINER" and value == "clore": clore_miner_present = True if (not clore_miner_present or not clore_config) and parsed_static_config: clore_miner_present = True clore_config = parsed_static_config try: if clore_config and "set_stock_oc" in clore_config: get_oc_config=True except Exception as es: pass if not clore_miner_present: logger.info("CLORE not found in flighsheet, exiting") await run_command("systemctl disable clore-hosting.service ; systemctl stop clore-hosting.service ; systemctl disable docker ; systemctl stop docker ; systemctl disable clore-onboarding.service ; systemctl stop clore-onboarding.service") sys.exit(0) out_oc_config = {} if get_oc_config and default_power_limits: nvidia_oc = await async_read_file(hive_oc_conf_path) gpu_cnt = len(default_power_limits) if nvidia_oc and gpu_cnt > 0: try: core_offset = None mem_offset = None core_lock = None mem_lock = None pl_static = None for nvidia_conf_line in nvidia_oc.split('\n'): nvidia_conf_line = nvidia_conf_line.strip() if nvidia_conf_line[:1] != "#" and '=' in nvidia_conf_line: key, value = [nvidia_conf_line.split('=', 1)[0], clean_config_value(nvidia_conf_line.split('=', 1)[1])] if value == "": pass elif key=="CLOCK": core_offset = [math.floor(num / 2) for num in validate_and_convert(value, -2000, 2000, adjust_bounds=True)] elif key=="MEM": mem_offset = [math.floor(num / 2) for num in validate_and_convert(value, -2000, 6000, adjust_bounds=True)] elif key=="PLIMIT": pl_static = validate_and_convert(value, 1, 1500, adjust_bounds=True) elif key=="LCLOCK": core_lock = validate_and_convert(value, 0, 12000, adjust_bounds=True) elif key=="LMEM": mem_lock = validate_and_convert(value, 0, 32000, adjust_bounds=True) if len(fs_mem_lock)>0: mem_lock = fs_mem_lock if len(fs_core_lock)>0: core_lock = fs_core_lock #print(mem_lock, core_lock) if core_offset or mem_offset or pl_static or mem_lock or core_lock: for gpu_idx, default_pl in enumerate(default_power_limits): out_oc_config[str(gpu_idx)] = { "core": get_number_or_last(core_offset, gpu_idx) if core_offset else 0, "mem": get_number_or_last(mem_offset, gpu_idx) if mem_offset else 0, "pl": get_number_or_last(pl_static, gpu_idx) if pl_static else default_pl } if type(core_lock)==list and len(core_lock)>0: out_oc_config[str(gpu_idx)]["core_lock"] = get_number_or_last(core_lock, gpu_idx) if type(mem_lock)==list and len(mem_lock)>0: out_oc_config[str(gpu_idx)]["mem_lock"] = get_number_or_last(mem_lock, gpu_idx) except Exception as oc_info_e: pass # Construct machine name machine_name = f"{filter_name(rig_conf['WORKER_NAME'][:32])}_HIVE_{rig_conf['RIG_ID']}" return machine_name, clore_config, out_oc_config except Exception as e: logger.error(f"Can't load rig.conf, wallet.conf | {e}") async def post_request(url, body, headers=None, timeout=15): parsed_url = urlparse(url) if parsed_url.scheme == 'https': conn = http.client.HTTPSConnection(parsed_url.hostname, parsed_url.port or 443, timeout=timeout) elif parsed_url.scheme == 'http': conn = http.client.HTTPConnection(parsed_url.hostname, parsed_url.port or 80, timeout=timeout) else: raise ValueError(f"Unsupported URL scheme: {parsed_url.scheme}") json_data = json.dumps(body) if headers is None: headers = {} headers['Content-Type'] = 'application/json' path = parsed_url.path if parsed_url.query: path += '?' + parsed_url.query try: conn.request("POST", path, body=json_data, headers=headers) response = conn.getresponse() response_data = response.read().decode('utf-8') try: parsed_response_data = json.loads(response_data) response_data = parsed_response_data except Exception as ep: pass status_code = response.status #if 200 <= status_code < 300: # print(f"Request was successful: {status_code}") #else: # print(f"Non-standard status code received: {status_code}") return status_code, response_data except (http.client.HTTPException, TimeoutError) as e: print(f"Request failed: {e}") return None, None finally: conn.close() def clean_clore_config(clore_config): clore_config_copy = clore_config.copy() if "auth" in clore_config_copy: if "auth" in clore_config_copy: del clore_config_copy["auth"] if "hostname_override" in clore_config_copy: del clore_config_copy["hostname_override"] if "set_stock_oc" in clore_config_copy: del clore_config_copy["set_stock_oc"] if "save_config" in clore_config_copy: del clore_config_copy["save_config"] return clore_config_copy def verify_or_update_file(file_path: str, expected_content: str) -> bool: try: if os.path.exists(file_path): with open(file_path, 'r', encoding='utf-8') as file: current_content = file.read() if current_content == expected_content: return True with open(file_path, 'w', encoding='utf-8') as file: file.write(expected_content) return False except Exception: return True def get_machine_id(): machine_id_path = "/etc/machine-id" if os.path.isfile(machine_id_path): with open(machine_id_path, "r") as file: return file.read().strip() return None next_retry_reached_server_limit = 0 if args.write_linux_config: linux_config = base64_string_to_json(args.write_linux_config) if linux_config: if args.linux_hostname_override: if 1 <= len(args.linux_hostname_override) <= 64 and re.match(r'^[A-Za-z0-9_-]+$', args.linux_hostname_override): linux_config["hostname_override"] = args.linux_hostname_override else: logger.error("Input hostname not valid") sys.exit(1) verify_or_update_file(clore_conf_path, json.dumps(linux_config)) logger.success("Config written") sys.exit(0) else: logger.error("Invalid config") sys.exit(1) async def main(machine_specs): global next_retry_reached_server_limit last_used_config = None ever_pending_creation = False machine_id = get_machine_id() default_power_limits = get_default_power_limits() if not machine_id: logger.error("Can't load machine ID") sys.exit(1) if not default_power_limits or len(default_power_limits)==0: logger.error("Can't load default power limits of nVidia GPU(s)") sys.exit(1) oc_config = {} while True: try: if args.mode == "linux": clore_config = await async_read_file(clore_conf_path) clore_config = json.loads(clore_config) machine_name = f"LINUX_{machine_id}" elif args.mode == "hive": static_clore_config = await async_read_file(clore_conf_path) machine_name, clore_config, oc_config = await hive_load_configs(default_power_limits, static_clore_config) #print(f"Machine Name: {machine_name}") config_validation = validate_clore_config(clore_config) if config_validation == "Validation successful": if "save_config" in clore_config and args.mode == "hive": verify_or_update_file(clore_conf_path, json.dumps(clore_config)) if "set_stock_oc" in clore_config: if oc_config == {}: clore_config["clear_oc_override"] = True else: clore_config["stock_oc_override"] = oc_config if clore_config != last_used_config or (time.time() > next_retry_reached_server_limit and next_retry_reached_server_limit > 0): last_used_config = clore_config.copy() if type(clore_config) == dict and "hostname_override" in clore_config: machine_name = clore_config["hostname_override"] clore_config["name"]=machine_name clore_config["specs"]=machine_specs status_code, response_data = await post_request( args.clore_endpoint, clean_clore_config(clore_config), {"auth": clore_config["auth"]}, 15 ) next_retry_reached_server_limit = 0 if type(response_data) == dict: # Response data seem to be correct format if response_data.get("status") == "invalid_auth": logger.error("Invalid auth token") elif response_data.get("error") == "exceeded_rate_limit": logger.error("Exceeded API limits, probably a lot of servers on same network") logger.info("Retrying request in 65s") await asyncio.sleep(60) last_used_config = None elif response_data.get("status") == "exceeded_limit": logger.error("Your account already has the maximal server limit, retrying in 12hr") next_retry_reached_server_limit = time.time() + 60*60*12 elif response_data.get("status") == "creation_pending": logger.info("Machine creation is pending on clore.ai side") await asyncio.sleep(60 if ever_pending_creation else 10) ever_pending_creation = True last_used_config = None elif "init_communication_token" in response_data and "private_communication_token": clore_hosting_sw_auth_str = f"{response_data['init_communication_token']}:{response_data['private_communication_token']}" was_ok = verify_or_update_file(args.auth_file, clore_hosting_sw_auth_str) if was_ok: logger.info("Token for hosting software already configured") await run_command("systemctl start clore-hosting.service") else: logger.success("Updated local auth file, restarting clore hosting") await run_command("systemctl restart clore-hosting.service") else: logger.error("Unknown API response, retrying in 65s") await asyncio.sleep(60) last_used_config = None else: logger.error(f"Could not parse config - {' | '.join(config_validation)}") except Exception as e: print(e) await asyncio.sleep(5) if __name__ == "__main__": machine_specs = specs.get(benchmark_disk=True, mock=args.mock) asyncio.run(main(machine_specs))