From 4d2510eb2be360e16cbfbd370f1b5aaae8ee68c5 Mon Sep 17 00:00:00 2001 From: clore Date: Thu, 9 May 2024 23:32:41 +0000 Subject: [PATCH] initial nvml integration (OC support), HiveOS flightsheet support --- .gitignore | 3 +- clore_hosting/docker_configurator.py | 9 +- clore_hosting/main.py | 68 ++++++- clore_hosting/types.py | 1 + clore_hosting/ws_interface.py | 11 ++ lib/config.py | 1 + lib/get_specs.py | 3 + lib/logging.py | 3 + lib/nvml.py | 284 +++++++++++++++++++++++++++ lib/utils.py | 18 +- requirements.txt | 3 +- 11 files changed, 394 insertions(+), 10 deletions(-) create mode 100644 lib/nvml.py diff --git a/.gitignore b/.gitignore index 2fda43b..b1dc8d3 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ wireguard/ log-mon.ipynb t.py tests.ipynb -network-tests.ipynb \ No newline at end of file +network-tests.ipynb +gpu_specs.json \ No newline at end of file diff --git a/clore_hosting/docker_configurator.py b/clore_hosting/docker_configurator.py index a8ff153..e82c848 100644 --- a/clore_hosting/docker_configurator.py +++ b/clore_hosting/docker_configurator.py @@ -43,12 +43,13 @@ def configure(containers): used_startup_files=[] used_wireguard_configs=[] startup_sctipt_creation_fail = False + use_hive_flightsheet = False if type(containers) == list: custom_entrypoint_state = custom_entrypoint.cache_entrypoints(containers) if type(custom_entrypoint_state)!=list: - return False, valid_containers + return False, valid_containers, use_hive_flightsheet for index, container in enumerate(containers): ok_custom_entrypoint = False @@ -76,7 +77,9 @@ def configure(containers): used_startup_files.append(startup_script_name) used_startup_files.append(f"{container['name']}.finished") - if "network" in container and "network_subnet" in container and "network_gateway" in container and container["network"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: + if "image" in container and container["image"]=="cloreai/hive-use-flightsheet": + use_hive_flightsheet=True + elif "network" in container and "network_subnet" in container and "network_gateway" in container and container["network"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: if not container["network"] in containers_required_networks: containers_required_networks.append(container["network"]) if not container["network"] in default_network_names: @@ -137,4 +140,4 @@ def configure(containers): validation_and_security = docker_interface.validate_and_secure_networks() if startup_sctipt_creation_fail: validation_and_security=False - return validation_and_security, valid_containers \ No newline at end of file + return validation_and_security, valid_containers, use_hive_flightsheet \ No newline at end of file diff --git a/clore_hosting/main.py b/clore_hosting/main.py index 43ffc8f..6b7f08a 100644 --- a/clore_hosting/main.py +++ b/clore_hosting/main.py @@ -7,6 +7,7 @@ from lib import docker_deploy from lib import docker_pull from lib import get_specs from lib import utils +from lib import nvml log = logging_lib.log from clore_hosting import docker_configurator @@ -35,7 +36,7 @@ WebSocketClient = ws_interface.WebSocketClient(log_message_broker=container_log_ async def configure_networks(containers): res = await asyncio.to_thread(docker_configurator.configure, containers) try: - fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1]) + fin_res = types.DockerConfiguratorRes(validation_and_security=res[0], valid_containers=res[1], use_hive_flightsheet=res[2]) return fin_res except Exception as e: return False @@ -51,6 +52,22 @@ async def get_local_images(no_latest_tag = False): res = await asyncio.to_thread(docker_interface.get_local_images, no_latest_tag) return res +async def set_oc(settings): + try: + result = await asyncio.to_thread(nvml.set_oc, settings) + return result + except Exception as e: + log.error(f"set_oc() | error | {e}") + return False + +async def set_hive_miner_status(enabled=False): + try: + result = await asyncio.to_thread(utils.hive_set_miner_status, enabled) + return True + except Exception as e: + log.error(f"set_hive_miner_status() | error | {e}") + return False + class CloreClient: def __init__(self, auth_key): self.auth_key=auth_key @@ -79,7 +96,8 @@ class CloreClient: "startup_script_runner": utils.unix_timestamp(), "log_streaming_task": utils.unix_timestamp(), "container_log_streaming_service": utils.unix_timestamp(), - "specs_service": utils.unix_timestamp() + "specs_service": utils.unix_timestamp(), + "oc_service": utils.unix_timestamp() } self.max_service_inactivity = 600 # seconds @@ -87,7 +105,17 @@ class CloreClient: self.ws_peers[str(config.debug_ws_peer)]={ "expiration":"immune" } + docker_interface.verify_docker_version() + nvml.init() + + self.gpu_oc_specs = nvml.get_gpu_oc_specs() + self.last_oc_service_submit = 0 + self.last_applied_oc = {} + self.last_oc_apply_time = 0 + + self.is_hive = get_specs.is_hive() + self.use_hive_flightsheet = False async def service(self): global container_log_broken @@ -101,10 +129,11 @@ class CloreClient: task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken, monitoring)) task5 = asyncio.create_task(self.container_log_streaming_service(monitoring)) task6 = asyncio.create_task(self.specs_service(monitoring)) + task7 = asyncio.create_task(self.oc_service(monitoring)) monitoring_task = asyncio.create_task(self.monitoring_service(monitoring)) # Wait for both tasks to complete (they won't in this case) - await asyncio.gather(task1, task2, task3, task4, task5, task6, monitoring_task) + await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, monitoring_task) async def monitoring_service(self, monitoring): while True: @@ -381,6 +410,7 @@ class CloreClient: if result.validation_and_security: self.validated_containers_set=True self.validated_containers = result.valid_containers + self.use_hive_flightsheet = result.use_hive_flightsheet elif type(result)==types.DeployContainersRes: try: self.all_running_container_names = result.all_running_container_names @@ -395,7 +425,7 @@ class CloreClient: async def submit_specs(self, current_specs): try: if type(current_specs) == dict: - current_specs["backend_version"]=8 + current_specs["backend_version"]=9 current_specs["update_hw"]=True smallest_pcie_width = 999 for gpu in current_specs["gpus"]["nvidia"]: @@ -438,6 +468,36 @@ class CloreClient: log.debug(f"FAIL | specs_service() | {e}") await asyncio.sleep(7) + async def oc_service(self, monitoring): + while True: + try: + await monitoring.put("oc_service") + oc_apply_allowed = True + ### OC Service should also hande Hive stuff + if self.use_hive_flightsheet and self.is_hive: + await set_hive_miner_status(True) + oc_apply_allowed = False # Don't apply any OC when running HiveOS miner + elif self.is_hive: + await set_hive_miner_status(False) + ### Run OC tasks + oc_conf = WebSocketClient.get_oc() + if oc_conf[0] and type(self.gpu_oc_specs)==list and oc_conf[1]!=self.gpu_oc_specs and self.last_oc_service_submit+240 < utils.unix_timestamp(): + log.debug("Submitting \"gpu_oc_specs\"") + self.last_oc_service_submit = utils.unix_timestamp() + await WebSocketClient.send({ + "set_gpu_info":oc_conf[1], + "xorg_valid": True + }) + if oc_conf[0] and len(oc_conf[2].keys())>0 and oc_apply_allowed: + if utils.normalize_rule(self.last_applied_oc)!=utils.normalize_rule(oc_conf[2]) or (self.last_oc_apply_time < utils.unix_timestamp()-300): + self.last_oc_apply_time = utils.unix_timestamp() + log.debug(f"Applying OC | {json.dumps(oc_conf[2], separators=(',',':'))}") + await set_oc(oc_conf[2]) + self.last_applied_oc=oc_conf[2] + except Exception as e: + log.debug(f"FAIL | oc_service() | {e}") + await asyncio.sleep(2) + def expire_ws_peers(self): for ws_peer_address in list(self.ws_peers.keys()): ws_peer_info = self.ws_peers[ws_peer_address] diff --git a/clore_hosting/types.py b/clore_hosting/types.py index 273abc4..40e3fdf 100644 --- a/clore_hosting/types.py +++ b/clore_hosting/types.py @@ -9,6 +9,7 @@ class ServerConfig(BaseModel): class DockerConfiguratorRes(BaseModel): validation_and_security: bool valid_containers: List[Any] + use_hive_flightsheet: bool class DeployContainersRes(BaseModel): all_running_container_names: List[str] diff --git a/clore_hosting/ws_interface.py b/clore_hosting/ws_interface.py index e9ec821..cf54793 100644 --- a/clore_hosting/ws_interface.py +++ b/clore_hosting/ws_interface.py @@ -46,12 +46,19 @@ class WebSocketClient: self.current_container_logs = {} self.last_bash_rnd = '' + + self.oc_enabled = False + self.last_gpu_oc_specs = [] + self.last_set_oc = {} def get_last_heartbeat(self): return self.last_heartbeat def get_containers(self): return self.containers_set, self.containers + + def get_oc(self): + return self.oc_enabled, self.last_gpu_oc_specs, self.last_set_oc def set_ws_peers(self, ws_peers): tmp_ws_peers=[] @@ -150,8 +157,12 @@ class WebSocketClient: self.containers=parsed_json["new_containers"] #log.success(container_str) elif "allow_oc" in parsed_json: # Enable OC + self.oc_enabled=True await self.send(json.dumps({"allow_oc":True})) + elif "gpu_oc_info" in parsed_json: + self.last_gpu_oc_specs = parsed_json["gpu_oc_info"] elif "set_oc" in parsed_json: # Set specific OC + self.last_set_oc=parsed_json["set_oc"] back_oc_str = json.dumps({"current_oc":json.dumps(parsed_json["set_oc"], separators=(',',':'))}) await self.send(back_oc_str) elif "bash_cmd" in parsed_json and type(parsed_json["bash_cmd"])==str and "bash_rnd" in parsed_json: diff --git a/lib/config.py b/lib/config.py index 7a67080..101c932 100644 --- a/lib/config.py +++ b/lib/config.py @@ -47,6 +47,7 @@ parser.add_argument('--startup-scripts-folder', type=str, default='/opt/clore-ho parser.add_argument('--wireguard-config-folder', type=str, default='/opt/clore-hosting/wireguard/configs', help='Folder with wireguard configs') parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hosting/entrypoints', help='Folder with custom entrypoints') parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") +parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json' ,help="Cache with specs of GPU possible OC/Power limit changes") # Parse arguments, ignoring any non-defined arguments args, _ = parser.parse_known_args() diff --git a/lib/get_specs.py b/lib/get_specs.py index e71a478..5add793 100644 --- a/lib/get_specs.py +++ b/lib/get_specs.py @@ -39,6 +39,9 @@ async def get_ram_usage(): def get_kernel(): return platform.uname().release +def is_hive(): + return "hive" in get_kernel() + def get_nvidia_version(): try: output = subprocess.check_output(['nvidia-smi', '-x', '-q'], encoding='utf-8') diff --git a/lib/logging.py b/lib/logging.py index 210b7de..2e696b0 100644 --- a/lib/logging.py +++ b/lib/logging.py @@ -15,6 +15,9 @@ class log: def error (message): ts = time_str() print(f"\033[91m{ts} | {message}\033[0m") + def info (message): + ts = time_str() + print(f"\033[94m{ts} | {message}\033[0m") def debug(message): if config.debug: ts = time_str() diff --git a/lib/nvml.py b/lib/nvml.py new file mode 100644 index 0000000..51efdda --- /dev/null +++ b/lib/nvml.py @@ -0,0 +1,284 @@ +from lib import config as config_module +from lib import logging as logging_lib +from lib import get_specs + +config = config_module.config +log = logging_lib.log + +import subprocess +import pynvml +import json + +is_hive = False +all_gpus_data_list=[] +get_data_fail=False + +def init(gpu_specs_file=None): + global is_hive, all_gpus_data_list, get_data_fail + log.info("Loading GPU OC specs [ working ]") + try: + pynvml.nvmlInit() + kernel = get_specs.get_kernel() + if "hive" in kernel: + is_hive=True + + specs_file_loc = gpu_specs_file if gpu_specs_file else config.gpu_specs_file + regenerate_specs = False + parsed_specs={} + try: + with open(specs_file_loc, "r") as specs_file: + parsed_specs = json.loads(specs_file.read()) + except Exception as specs_load_fail: + log.error(f"Failed loading gpu_specs_file ({specs_load_fail}) | regenerating...") + regenerate_specs=True + + parsed_specs_keys = parsed_specs.keys() + gpu_count = pynvml.nvmlDeviceGetCount() + for i in range(0,gpu_count): + if regenerate_specs: + break + gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i) + gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle) + if not f"{i}-{gpu_uuid}" in parsed_specs_keys: + parsed_specs={} + regenerate_specs=True + break + + if regenerate_specs: + for i in range(0,gpu_count): + gpu_spec={} + gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i) + gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle) + power_limits = pynvml.nvmlDeviceGetPowerManagementLimitConstraints(gpu_handle) + min_power_limit = int(power_limits[0] / 1000.0) + max_power_limit = int(power_limits[1] / 1000.0) + gpu_spec["default_power_limit"] = int(pynvml.nvmlDeviceGetPowerManagementDefaultLimit(gpu_handle) / 1000.0) + gpu_spec["power_limits"] = [min_power_limit, max_power_limit] + gpu_spec["name"] = pynvml.nvmlDeviceGetName(gpu_handle) + + pci_info = pynvml.nvmlDeviceGetPciInfo(gpu_handle) + pci_bus_id = pci_info.bus + pci_device_id = pci_info.device + pci_domain_id = pci_info.domain + gpu_spec["pci_core"] = f"{pci_domain_id}:{pci_bus_id:02d}:{pci_device_id:02d}.0" + + mem_range = get_hive_clock_range(is_hive, i, "mem") + core_range = get_hive_clock_range(is_hive, i, "core") + if type(mem_range) != list: + pynvml.nvmlDeviceSetMemoryLockedClocks(gpu_handle, 200, 300) # Force low clocks, so the GPU can't crash when testing if under load + failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle) + failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle) + if (not failure_min) and (not failure_max): + mem_range=[min_oc_solution, max_oc_solution] + pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, 0) + pynvml.nvmlDeviceResetMemoryLockedClocks(gpu_handle) + if type(core_range) != list: + pynvml.nvmlDeviceSetGpuLockedClocks(gpu_handle, 300, 350) # Force low clocks, so the GPU can't crash when testing if under load + failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle, True) + failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle, True) + if (not failure_min) and (not failure_max): + core_range=[min_oc_solution, max_oc_solution] + pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, 0) + pynvml.nvmlDeviceResetGpuLockedClocks(gpu_handle) + if type(mem_range) == list and type(core_range) == list and len(mem_range)==2 and len(core_range)==2: + gpu_spec["mem"]=mem_range + gpu_spec["core"]=core_range + else: + get_data_fail=True + + parsed_specs[f"{i}-{gpu_uuid}"]=gpu_spec + with open(specs_file_loc, "w") as specs_file: + json.dump(parsed_specs, specs_file) + + if not get_data_fail: + parsed_specs_keys=parsed_specs.keys() + for key in parsed_specs_keys: + all_gpus_data_list.append(parsed_specs[key]) + except Exception as e: + get_data_fail=True + log.error("Loading GPU OC specs [ fail ]") + if not get_data_fail: + log.success("Loading GPU OC specs [ success ]") + + print(all_gpus_data_list) + # Load GPU specs + +def get_gpu_oc_specs(): + global get_data_fail + if get_data_fail: + return False + else: + return all_gpus_data_list + +def shutdown(): + pynvml.nvmlShutdown() + +def handle_nn(input_int): + if abs(4293967-input_int) < 10000: + return input_int-4293967 + elif abs(8589934-input_int) < 10000: + return input_int-8589934 + else: + return input_int + +def pinpoint_find_dicts_negative(data): + false_success_items = [d for d in data if not d['success']] + true_success_items = [d for d in data if d['success']] + highest_false_success = max(false_success_items, key=lambda x: x['offset'], default=None) + lowest_true_success = min(true_success_items, key=lambda x: x['offset'], default=None) + return highest_false_success, lowest_true_success + +def pinpoint_find_dicts_positive(data): + false_success_items = [d for d in data if not d['success']] + true_success_items = [d for d in data if d['success']] + lowest_false_success = min(false_success_items, key=lambda x: x['offset'], default=None) + highest_true_success = max(true_success_items, key=lambda x: x['offset'], default=None) + return highest_true_success, lowest_false_success + +def pinpoint_oc_limits_negative(gpu_handle, core=False): + step_cnt = 0 + found_solution = None + init_negative_max = -19855 # Probably + history_info = [{"offset": init_negative_max*2, "success":False}] + failure = False + max_step_cnt = 20 + try: + while found_solution == None and step_cnt2 and '.' in screen_line_parts[0]: + if screen_line_parts[0].split('.',1)[1]=="miner": + miner_screen_running=True + if miner_screen_running and not enabled: + run_command("miner stop") + elif enabled and not miner_screen_running: + run_command("nvidia-oc && miner start") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index cdb4674..6f8c3db 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ speedtest-cli==2.1.3 psutil==5.9.0 python-iptables==1.0.1 websockets==12.0 -packaging==23.2 \ No newline at end of file +packaging==23.2 +git+https://git.clore.ai/clore/pynvml.git@main \ No newline at end of file