Compare commits

..

No commits in common. "main" and "nvml_integration" have entirely different histories.

15 changed files with 69 additions and 662 deletions

View File

@ -4,7 +4,6 @@ from lib import custom_entrypoint
from lib import networking from lib import networking
from lib import wireguard from lib import wireguard
from lib import logging as logging_lib from lib import logging as logging_lib
from clore_hosting import utils as hosting_utils
import shutil import shutil
import os import os
import re import re
@ -54,12 +53,9 @@ def configure(containers):
for index, container in enumerate(containers): for index, container in enumerate(containers):
ok_custom_entrypoint = False ok_custom_entrypoint = False
invalid_hostname = False
if index < len(custom_entrypoint_state): if index < len(custom_entrypoint_state):
ok_custom_entrypoint = custom_entrypoint_state[index] ok_custom_entrypoint = custom_entrypoint_state[index]
startup_script_name = f"{container['name']}.sh" startup_script_name = f"{container['name']}.sh"
if "hostname" in container and not hosting_utils.validate_hostname(container["hostname"]):
invalid_hostname = True
if "ip" in container and len(container["ip"])>6 and type(container["ip"])==str: if "ip" in container and len(container["ip"])>6 and type(container["ip"])==str:
if container["ip"][:8] == "; echo '": if container["ip"][:8] == "; echo '":
last_occurrence, text_after_last_ip = get_last_ip_occurrence_and_text(container["ip"]) last_occurrence, text_after_last_ip = get_last_ip_occurrence_and_text(container["ip"])
@ -99,14 +95,14 @@ def configure(containers):
newly_created_networks.append(container["network"]) newly_created_networks.append(container["network"])
else: else:
any_fail=True any_fail=True
if not any_fail and ok_custom_entrypoint and not invalid_hostname: if not any_fail and ok_custom_entrypoint:
valid_containers.append(container) valid_containers.append(container)
elif "network" in container and container["network"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: # Subnet & gateway not defined, must be some of default networks, otherwise dump it elif "network" in container and container["network"][:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix: # Subnet & gateway not defined, must be some of default networks, otherwise dump it
if container["network"] in default_network_names: if container["network"] in default_network_names:
for docker_network in docker_networks: for docker_network in docker_networks:
if docker_network["Name"]==container["network"]: if docker_network["Name"]==container["network"]:
for ipam in docker_network["IPAM"]: for ipam in docker_network["IPAM"]:
if not ok_custom_entrypoint or invalid_hostname: if not ok_custom_entrypoint:
break break
elif not "ip" in container: elif not "ip" in container:
valid_containers.append(container) valid_containers.append(container)

View File

@ -2,7 +2,6 @@ from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import log_streaming_task from lib import log_streaming_task
from lib import run_startup_script from lib import run_startup_script
from lib import hive_miner_interface
from lib import docker_interface from lib import docker_interface
from lib import docker_deploy from lib import docker_deploy
from lib import docker_pull from lib import docker_pull
@ -42,9 +41,9 @@ async def configure_networks(containers):
except Exception as e: except Exception as e:
return False return False
async def deploy_containers(validated_containers, allowed_running_containers): async def deploy_containers(validated_containers):
try: try:
all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers, allowed_running_containers) all_running_container_names, all_stopped_container_names = await asyncio.to_thread(docker_deploy.deploy, validated_containers)
return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names) return types.DeployContainersRes(all_running_container_names=all_running_container_names, all_stopped_container_names=all_stopped_container_names)
except Exception as e: except Exception as e:
return False return False
@ -98,8 +97,7 @@ class CloreClient:
"log_streaming_task": utils.unix_timestamp(), "log_streaming_task": utils.unix_timestamp(),
"container_log_streaming_service": utils.unix_timestamp(), "container_log_streaming_service": utils.unix_timestamp(),
"specs_service": utils.unix_timestamp(), "specs_service": utils.unix_timestamp(),
"oc_service": utils.unix_timestamp(), "oc_service": utils.unix_timestamp()
"background_pow_data_collection": utils.unix_timestamp()
} }
self.max_service_inactivity = 600 # seconds self.max_service_inactivity = 600 # seconds
@ -107,24 +105,9 @@ class CloreClient:
self.ws_peers[str(config.debug_ws_peer)]={ self.ws_peers[str(config.debug_ws_peer)]={
"expiration":"immune" "expiration":"immune"
} }
self.os_release = get_specs.get_os_release()
self.restart_docker = False
if "use_cgroupfs" in self.os_release:
self.updated_exec_opts = True if docker_interface.configure_exec_opts("native.cgroupdriver","cgroupfs") else False
if self.updated_exec_opts:
docker_info = docker_interface.get_info()
if "CgroupDriver" in docker_info and docker_info["CgroupDriver"]=="systemd":
self.restart_docker = True # Restart docker when it's loaded under systemd (accual restart will happen only if no orders running to not disrupt workload)
docker_interface.verify_docker_version() docker_interface.verify_docker_version()
nvml.init()
self.dont_use_hive_binaries = True if 'DONT_USE_HIVE_BINARIES' in os.environ else False
nvml.init(allow_hive_binaries=not self.dont_use_hive_binaries)
self.extra_allowed_images = utils.get_extra_allowed_images()
self.allowed_running_containers = utils.get_allowed_container_names()
self.gpu_oc_specs = nvml.get_gpu_oc_specs() self.gpu_oc_specs = nvml.get_gpu_oc_specs()
self.last_oc_service_submit = 0 self.last_oc_service_submit = 0
@ -134,9 +117,6 @@ class CloreClient:
self.is_hive = get_specs.is_hive() self.is_hive = get_specs.is_hive()
self.use_hive_flightsheet = False self.use_hive_flightsheet = False
self.hive_miner_interface = hive_miner_interface.hive_interface()
self.next_pow_background_job_send_update = 0
async def service(self): async def service(self):
global container_log_broken global container_log_broken
@ -146,15 +126,14 @@ class CloreClient:
task1 = asyncio.create_task(self.main(pull_list, monitoring)) task1 = asyncio.create_task(self.main(pull_list, monitoring))
task2 = asyncio.create_task(self.handle_container_cache(pull_list, monitoring)) task2 = asyncio.create_task(self.handle_container_cache(pull_list, monitoring))
task3 = asyncio.create_task(self.startup_script_runner(monitoring)) task3 = asyncio.create_task(self.startup_script_runner(monitoring))
task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken, monitoring, self.allowed_running_containers)) task4 = asyncio.create_task(log_streaming_task.log_streaming_task(container_log_broken, monitoring))
task5 = asyncio.create_task(self.container_log_streaming_service(monitoring)) task5 = asyncio.create_task(self.container_log_streaming_service(monitoring))
task6 = asyncio.create_task(self.specs_service(monitoring)) task6 = asyncio.create_task(self.specs_service(monitoring))
task7 = asyncio.create_task(self.oc_service(monitoring)) task7 = asyncio.create_task(self.oc_service(monitoring))
task8 = asyncio.create_task(self.background_pow_data_collection(monitoring))
monitoring_task = asyncio.create_task(self.monitoring_service(monitoring)) monitoring_task = asyncio.create_task(self.monitoring_service(monitoring))
# Wait for both tasks to complete (they won't in this case) # Wait for both tasks to complete (they won't in this case)
await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, task8, monitoring_task) await asyncio.gather(task1, task2, task3, task4, task5, task6, task7, monitoring_task)
async def monitoring_service(self, monitoring): async def monitoring_service(self, monitoring):
while True: while True:
@ -354,7 +333,6 @@ class CloreClient:
print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '') print("STEP",step,'|',self.containers_set, self.containers if config.log_containers_strings else '')
tasks = [] tasks = []
running_order = False
container_conf = WebSocketClient.get_containers() container_conf = WebSocketClient.get_containers()
@ -363,11 +341,10 @@ class CloreClient:
self.containers=container_conf[1] self.containers=container_conf[1]
tmp_images = [] tmp_images = []
for container in self.containers: for container in self.containers:
if "image" in container and "image" in container and container["image"]!="cloreai/hive-use-flightsheet": if "image" in container:
log_pull = False log_pull = False
if "name" in container: if "name" in container:
if "-order-" in container["name"]: if "-order-" in container["name"]:
running_order=True
log_pull=True log_pull=True
image_config = { image_config = {
"image":container["image"], "image":container["image"],
@ -385,12 +362,6 @@ class CloreClient:
if not image_config in tmp_images: if not image_config in tmp_images:
tmp_images.append(image_config) tmp_images.append(image_config)
if self.restart_docker and not running_order and len(self.containers)>0:
log.debug("Sending docker restart command")
utils.run_command_v2("systemctl restart docker")
self.restart_docker=False
if tmp_images!=self.needed_images: if tmp_images!=self.needed_images:
self.needed_images=tmp_images self.needed_images=tmp_images
await pull_list.put(self.needed_images) await pull_list.put(self.needed_images)
@ -404,7 +375,7 @@ class CloreClient:
tasks.append(WebSocketClient.stream_pull_logs()) tasks.append(WebSocketClient.stream_pull_logs())
if self.validated_containers_set: if self.validated_containers_set:
tasks.append(deploy_containers(self.validated_containers, self.allowed_running_containers)) tasks.append(deploy_containers(self.validated_containers))
if step==1: if step==1:
WebSocketClient.set_auth(self.auth_key) WebSocketClient.set_auth(self.auth_key)
@ -426,7 +397,7 @@ class CloreClient:
if type(result)==types.ServerConfig: if type(result)==types.ServerConfig:
if result.success: if result.success:
self.last_checked_ws_peers = utils.unix_timestamp() self.last_checked_ws_peers = utils.unix_timestamp()
self.allowed_images=result.allowed_images+self.extra_allowed_images self.allowed_images=result.allowed_images
if not config.debug_ws_peer: if not config.debug_ws_peer:
for pure_ws_peer in result.ws_peers: for pure_ws_peer in result.ws_peers:
self.ws_peers[pure_ws_peer]={ self.ws_peers[pure_ws_peer]={
@ -440,7 +411,6 @@ class CloreClient:
self.validated_containers_set=True self.validated_containers_set=True
self.validated_containers = result.valid_containers self.validated_containers = result.valid_containers
self.use_hive_flightsheet = result.use_hive_flightsheet self.use_hive_flightsheet = result.use_hive_flightsheet
log.debug(f"Use Hive flightsheet: {result.use_hive_flightsheet}")
elif type(result)==types.DeployContainersRes: elif type(result)==types.DeployContainersRes:
try: try:
self.all_running_container_names = result.all_running_container_names self.all_running_container_names = result.all_running_container_names
@ -455,7 +425,7 @@ class CloreClient:
async def submit_specs(self, current_specs): async def submit_specs(self, current_specs):
try: try:
if type(current_specs) == dict: if type(current_specs) == dict:
current_specs["backend_version"]=18 current_specs["backend_version"]=9
current_specs["update_hw"]=True current_specs["update_hw"]=True
smallest_pcie_width = 999 smallest_pcie_width = 999
for gpu in current_specs["gpus"]["nvidia"]: for gpu in current_specs["gpus"]["nvidia"]:
@ -477,7 +447,7 @@ class CloreClient:
"update_realtime_data":True, "update_realtime_data":True,
"gpus": gpu_list, "gpus": gpu_list,
"cpu": cpu_usage, "cpu": cpu_usage,
"ram": ram_usage.percent, "ram": ram_usage,
"all_running_container_names": self.all_running_container_names, "all_running_container_names": self.all_running_container_names,
"all_stopped_container_names": self.all_stopped_container_names "all_stopped_container_names": self.all_stopped_container_names
} }
@ -504,10 +474,10 @@ class CloreClient:
await monitoring.put("oc_service") await monitoring.put("oc_service")
oc_apply_allowed = True oc_apply_allowed = True
### OC Service should also hande Hive stuff ### OC Service should also hande Hive stuff
if self.use_hive_flightsheet and self.is_hive and not self.dont_use_hive_binaries: if self.use_hive_flightsheet and self.is_hive:
await set_hive_miner_status(True) await set_hive_miner_status(True)
oc_apply_allowed = False # Don't apply any OC when running HiveOS miner oc_apply_allowed = False # Don't apply any OC when running HiveOS miner
elif self.is_hive and not self.dont_use_hive_binaries: elif self.is_hive:
await set_hive_miner_status(False) await set_hive_miner_status(False)
### Run OC tasks ### Run OC tasks
oc_conf = WebSocketClient.get_oc() oc_conf = WebSocketClient.get_oc()
@ -528,22 +498,6 @@ class CloreClient:
log.debug(f"FAIL | oc_service() | {e}") log.debug(f"FAIL | oc_service() | {e}")
await asyncio.sleep(2) await asyncio.sleep(2)
async def background_pow_data_collection(self, monitoring):
while True:
try:
await monitoring.put("background_pow_data_collection")
if not self.dont_use_hive_binaries and self.is_hive:
miner_config = await self.hive_miner_interface.export_miner_stats(get_hashrates=False)
if (miner_config["miner_uptime"]>0 and miner_config["miner_uptime"]<60) or self.next_pow_background_job_send_update < time.time():
self.next_pow_background_job_send_update = time.time()+(5*60)
current_statistics = await self.hive_miner_interface.export_miner_stats(get_hashrates=True)
submit_result = await WebSocketClient.send({"submit_hashrates": current_statistics})
if not submit_result:
self.next_pow_background_job_send_update = time.time()+40
except Exception as e:
log.debug(f"FAIL | background_pow_data_collection() | {e}")
await asyncio.sleep(6)
def expire_ws_peers(self): def expire_ws_peers(self):
for ws_peer_address in list(self.ws_peers.keys()): for ws_peer_address in list(self.ws_peers.keys()):
ws_peer_info = self.ws_peers[ws_peer_address] ws_peer_info = self.ws_peers[ws_peer_address]

View File

@ -10,13 +10,5 @@ def is_valid_websocket_url(url):
return True return True
return False return False
def validate_hostname(hostname):
# Define the regular expression pattern for a valid hostname
pattern = re.compile(r'^[a-zA-Z0-9._-]{1,63}$')
if pattern.match(hostname):
return True
else:
return False
def unix_timestamp(): def unix_timestamp():
return int(time.time()) return int(time.time())

View File

@ -33,7 +33,7 @@ hard_config = {
"maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app "maximum_service_loop_time": 900, # Seconds, failsafe variable - if service is stuck processing longer than this timeframe it will lead into restarting the app
"maximum_pull_service_loop_time": 14400, # Exception for image pulling "maximum_pull_service_loop_time": 14400, # Exception for image pulling
"creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk "creation_engine": "wrapper", # "wrapper" or "sdk" | Wrapper - wrapped docker cli, SDK - docker sdk
"allow_mixed_gpus": True "allow_mixed_gpus": False
} }
parser = argparse.ArgumentParser(description='Example argparse usage') parser = argparse.ArgumentParser(description='Example argparse usage')
@ -48,8 +48,7 @@ parser.add_argument('--startup-scripts-folder', type=str, default='/opt/clore-ho
parser.add_argument('--wireguard-config-folder', type=str, default='/opt/clore-hosting/wireguard/configs', help='Folder with wireguard configs') parser.add_argument('--wireguard-config-folder', type=str, default='/opt/clore-hosting/wireguard/configs', help='Folder with wireguard configs')
parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hosting/entrypoints', help='Folder with custom entrypoints') parser.add_argument('--entrypoints-folder', type=str, default='/opt/clore-hosting/entrypoints', help='Folder with custom entrypoints')
parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)") parser.add_argument('--debug-ws-peer', type=str, help="Specific ws peer to connect to (for debugging only)")
parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json', help="Cache with specs of GPU possible OC/Power limit changes") parser.add_argument('--gpu-specs-file', type=str, default='/opt/clore-hosting/client/gpu_specs.json' ,help="Cache with specs of GPU possible OC/Power limit changes")
parser.add_argument('--extra-allowed-images-file', type=str, default="/opt/clore-hosting/extra_allowed_images.json", help="Docker image whitelist, that are allowed by clore.ai hosting software")
# Parse arguments, ignoring any non-defined arguments # Parse arguments, ignoring any non-defined arguments
args, _ = parser.parse_known_args() args, _ = parser.parse_known_args()

View File

@ -63,7 +63,7 @@ def cache_entrypoints(containers):
else: else:
valid_conf.append(True) valid_conf.append(True)
for remaining_file in entrypoint_files: # We can remove files that are not needed anymore for remaining_file in entrypoint_files: # We can remove files that are not needed anymore
os.remove(os.path.join(config.entrypoints_folder,remaining_file)) os.remove(remaining_file)
return valid_conf return valid_conf
except Exception as e: except Exception as e:
return 'e' return 'e'

View File

@ -9,7 +9,7 @@ import docker
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30): def create_container(container_options, ip=None, docker_gpus=False, timeout=30):
# Sanitize and validate input # Sanitize and validate input
container_options = sanitize_input(container_options) container_options = sanitize_input(container_options)
@ -21,9 +21,6 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
if "network_mode" in container_options: if "network_mode" in container_options:
command.extend(["--network", container_options["network_mode"]]) command.extend(["--network", container_options["network_mode"]])
if "hostname" in container_options:
command.extend(["--hostname", container_options["hostname"]])
if "cap_add" in container_options: if "cap_add" in container_options:
for cap in container_options["cap_add"]: for cap in container_options["cap_add"]:
command.extend(["--cap-add", cap]) command.extend(["--cap-add", cap])
@ -55,10 +52,6 @@ def create_container(container_options, ip=None, docker_gpus=False, shm_size=64,
if "runtime" in container_options: if "runtime" in container_options:
command.extend(["--runtime", container_options["runtime"]]) command.extend(["--runtime", container_options["runtime"]])
if shm_size != 64:
command.extend(["--shm-size", f"{shm_size}m"])
if docker_gpus: if docker_gpus:
if type(docker_gpus)==list: if type(docker_gpus)==list:
command.extend(['--gpus', '"device=' + ','.join(str(gpu_id) for gpu_id in docker_gpus) + '"']) command.extend(['--gpus', '"device=' + ','.join(str(gpu_id) for gpu_id in docker_gpus) + '"'])

View File

@ -3,18 +3,15 @@ from lib import logging as logging_lib
from lib import docker_cli_wrapper from lib import docker_cli_wrapper
from lib import docker_interface from lib import docker_interface
from lib import get_specs from lib import get_specs
from lib import utils
import docker import docker
from docker.types import EndpointConfig, NetworkingConfig from docker.types import EndpointConfig, NetworkingConfig
import os import os
shm_calculator = utils.shm_calculator(get_specs.get_total_ram_mb())
client = docker_interface.client client = docker_interface.client
config = config_module.config config = config_module.config
log = logging_lib.log log = logging_lib.log
def deploy(validated_containers, allowed_running_containers=[]): def deploy(validated_containers):
local_images = docker_interface.get_local_images() local_images = docker_interface.get_local_images()
all_containers = docker_interface.get_containers(all=True) all_containers = docker_interface.get_containers(all=True)
@ -46,7 +43,6 @@ def deploy(validated_containers, allowed_running_containers=[]):
for validated_container in validated_containers: for validated_container in validated_containers:
try: try:
SHM_SIZE = 64 # MB - default
image_ready = False image_ready = False
docker_gpus = None docker_gpus = None
@ -80,21 +76,12 @@ def deploy(validated_containers, allowed_running_containers=[]):
) )
} }
if "hostname" in validated_container:
container_options["hostname"]=validated_container["hostname"]
elif "clore-order-" in validated_container["name"]:
try:
container_options["hostname"] = f"O-{int(validated_container["name"][12:])}"
except Exception as eon:
pass
if "network" in validated_container: if "network" in validated_container:
container_options["network_mode"]=validated_container["network"] container_options["network_mode"]=validated_container["network"]
if "ip" in validated_container and config.creation_engine=="sdk": if "ip" in validated_container and config.creation_engine=="sdk":
del container_options["network_mode"] del container_options["network_mode"]
if "gpus" in validated_container and type(validated_container["gpus"])==bool: if "gpus" in validated_container and type(validated_container["gpus"])==bool:
if "clore-order-" in validated_container["name"]:
SHM_SIZE = shm_calculator.calculate('*')
container_options["runtime"]="nvidia" container_options["runtime"]="nvidia"
docker_gpus=True docker_gpus=True
container_options["device_requests"].append(docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])) container_options["device_requests"].append(docker.types.DeviceRequest(count=-1, capabilities=[['gpu']]))
@ -134,11 +121,9 @@ def deploy(validated_containers, allowed_running_containers=[]):
elif "entrypoint_command" in validated_container and type(validated_container["entrypoint_command"])==str and len(validated_container["entrypoint_command"])>0: elif "entrypoint_command" in validated_container and type(validated_container["entrypoint_command"])==str and len(validated_container["entrypoint_command"])>0:
container_options["entrypoint"]=validated_container["entrypoint_command"] container_options["entrypoint"]=validated_container["entrypoint_command"]
container_options["shm_size"] = f"{SHM_SIZE}m"
if not validated_container["name"] in created_container_names and image_ready: if not validated_container["name"] in created_container_names and image_ready:
if config.creation_engine == "wrapper": if config.creation_engine == "wrapper":
docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), shm_size=SHM_SIZE, docker_gpus=docker_gpus) docker_cli_wrapper.create_container(container_options, ip=(validated_container["ip"] if "ip" in validated_container else None), docker_gpus=docker_gpus)
else: else:
container = client.containers.create(**container_options) container = client.containers.create(**container_options)
if "ip" in validated_container: if "ip" in validated_container:
@ -174,13 +159,13 @@ def deploy(validated_containers, allowed_running_containers=[]):
container.stop() container.stop()
except Exception as e: except Exception as e:
pass pass
elif container.name not in paused_names+needed_running_names+allowed_running_containers and container.status == 'running': elif container.name not in paused_names+needed_running_names and container.status == 'running':
try: try:
container.stop() container.stop()
container.remove() container.remove()
except Exception as e: except Exception as e:
pass pass
elif container.name not in paused_names+needed_running_names+allowed_running_containers: elif container.name not in paused_names+needed_running_names:
try: try:
container.remove() container.remove()
except Exception as e: except Exception as e:

View File

@ -50,14 +50,6 @@ class DockerNetwork(BaseModel):
client = docker.from_env() client = docker.from_env()
low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock') low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock')
daemon_config_path = "/etc/docker/daemon.json"
def get_info():
try:
client_info = client.info()
return client_info
except Exception as e:
return {}
def check_docker_connection(): def check_docker_connection():
try: try:
@ -354,15 +346,16 @@ def validate_and_secure_networks():
def get_daemon_config(): def get_daemon_config():
config_path = "/etc/docker/daemon.json"
try: try:
with open(daemon_config_path, 'r') as file: with open(config_path, 'r') as file:
config_data = json.load(file) config_data = json.load(file)
return config_data return config_data
except FileNotFoundError: except FileNotFoundError:
print(f"Error: {daemon_config_path} not found.") print(f"Error: {config_path} not found.")
return None return None
except json.JSONDecodeError: except json.JSONDecodeError:
print(f"Error: Failed to parse JSON from {daemon_config_path}.") print(f"Error: Failed to parse JSON from {config_path}.")
return None return None
def verify_docker_version(min_version="17.06"): def verify_docker_version(min_version="17.06"):
@ -374,42 +367,4 @@ def verify_docker_version(min_version="17.06"):
os._exit(1) os._exit(1)
except Exception as e: except Exception as e:
log.error(f"Failed to verify docker version | {e}") log.error(f"Failed to verify docker version | {e}")
os._exit(1) os._exit(1)
def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"):
deamon_config = get_daemon_config()
if deamon_config:
try:
if (not "exec-opts" in deamon_config or type(deamon_config["exec-opts"])!=list) and value!=None:
deamon_config["exec-opts"]=[f"{key}={value}"]
elif "exec-opts" in deamon_config:
new_exec_opts=[]
matched_key=False
for exec_opt in deamon_config["exec-opts"]:
if '=' in exec_opt:
exec_opt_key, exec_opt_value = exec_opt.split('=',1)
if exec_opt_key==key:
matched_key=True
if value!=None:
new_exec_opts.append(f"{key}={value}")
else:
new_exec_opts.append(exec_opt)
else:
new_exec_opts.append(exec_opt)
if not matched_key:
new_exec_opts.append(f"{key}={value}")
if len(new_exec_opts)==0:
del deamon_config["exec-opts"]
else:
if deamon_config["exec-opts"] == new_exec_opts:
return "Same"
deamon_config["exec-opts"]=new_exec_opts
json_string = json.dumps(deamon_config, indent=4)
with open(daemon_config_path, 'w') as file:
file.write(json_string)
return True
except Exception as e:
log.error(f"Failed 'configure_exec_opts' | {e}")
return False
else:
return False

View File

@ -43,32 +43,6 @@ def get_kernel():
def is_hive(): def is_hive():
return "hive" in get_kernel() return "hive" in get_kernel()
def get_total_ram_mb():
total_ram = psutil.virtual_memory().total
return total_ram / (1024 ** 2)
def get_os_release():
try:
with open("/etc/os-release") as f:
os_info = f.read()
os_release = {}
for line in os_info.split('\n'):
if '=' in line:
key, value = line.split('=', 1)
if value[:1]=='"' and value.endswith('"'):
value = value[1:len(value)-1]
os_release[key]=value
needed_cgroupfs_versions = ["22.04", "22.10"] # Mitigate issue https://github.com/NVIDIA/nvidia-docker/issues/1730
if "NAME" in os_release and "VERSION_ID" in os_release:
if os_release["NAME"].lower() == "ubuntu" and os_release["VERSION_ID"] in needed_cgroupfs_versions:
os_release["use_cgroupfs"]=True
return os_release
except Exception as e:
return {}
def drop_caches(): def drop_caches():
try: try:
with open('/proc/sys/vm/drop_caches', 'w') as f: with open('/proc/sys/vm/drop_caches', 'w') as f:
@ -322,7 +296,7 @@ class Specs:
gpu_str, gpu_mem, gpus, nvml_err = get_gpu_info() gpu_str, gpu_mem, gpus, nvml_err = get_gpu_info()
if require_same_gpus: if require_same_gpus:
last_gpu_name='' last_gpu_name=''
for gpu in gpus["nvidia"]: for gpu in gpus:
if not last_gpu_name: if not last_gpu_name:
last_gpu_name=gpu["name"] last_gpu_name=gpu["name"]
elif last_gpu_name!=gpu["name"]: elif last_gpu_name!=gpu["name"]:

View File

@ -1,205 +0,0 @@
import aiofiles
import asyncio
import json
import time
import os
import re
def extract_json_with_key(text, key):
json_pattern = r'({.*?})'
json_strings = re.findall(json_pattern, text, re.DOTALL)
json_objects_with_key = []
for json_str in json_strings:
try:
json_obj = json.loads(json_str)
if key in json.dumps(json_obj):
json_objects_with_key.append(json_obj)
except json.JSONDecodeError:
continue
return json_objects_with_key
async def async_run_bash_command(command):
process = await asyncio.create_subprocess_exec(
'/bin/bash', '-c', command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
return stdout.decode().strip(), stderr.decode().strip(), process.returncode
async def check_and_read_file(file_path):
try:
if os.path.exists(file_path):
async with aiofiles.open(file_path, mode='r') as file:
contents = await file.read()
return contents
else:
return "fail"
except Exception as e:
return "fail"
async def get_session_start_time(pid):
try:
async with aiofiles.open(f'/proc/{pid}/stat', 'r') as file:
stat_info = (await file.read()).split()
start_time_ticks = int(stat_info[21])
clock_ticks_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
start_time_seconds = start_time_ticks / clock_ticks_per_sec
boot_time = None
async with aiofiles.open('/proc/stat', 'r') as file:
async for line in file:
if line.startswith('btime'):
boot_time = int(line.split()[1])
break
if boot_time is None:
raise ValueError("Failed to find boot time in /proc/stat")
start_time = (boot_time + start_time_seconds)
return start_time
except FileNotFoundError:
return None
except Exception as e:
print(f"Error retrieving session start time: {e}")
return None
async def get_miner_stats(miner_dir, api_timeout=15):
stdout, stderr, code = await async_run_bash_command(f'export PATH=$PATH:/hive/sbin:/hive/bin && export API_TIMEOUT={api_timeout}'+''' && read -t $((API_TIMEOUT + 5)) -d "" pid khs stats < <(function run_miner_scripts { echo "$BASHPID"; cd '''+miner_dir+''' || exit; cd '''+miner_dir+'''; source h-manifest.conf; cd '''+miner_dir+'''; source h-stats.sh; printf "%q\n" "$khs"; echo "$stats"; }; run_miner_scripts) && [[ $? -ge 128 ]] && [[ ! -z "$pid" && "$pid" -ne $$ ]] && kill -9 "$pid" 2>/dev/null ; echo $stats''')
try:
if stdout and not stderr and code == 0:
stats = json.loads(stdout)
return stats
else:
return 'fail'
except Exception as e:
try:
miner_stat_jsons = extract_json_with_key(stdout, "hs_units")
return miner_stat_jsons[0] if len(miner_stat_jsons) > 0 else 'fail'
except Exception :
return 'fail'
async def get_miner_uptime_stats():
stdout, stderr, code = await async_run_bash_command("screen -ls")
if not stderr and code == 0:
for line in stdout.split('\n'):
if line[:1]=='\t':
if line.split('\t')[1].endswith(".miner"):
miner_screen_pid = line.split('\t')[1].split('.')[0]
if miner_screen_pid.isdigit():
miner_start_time = await get_session_start_time(miner_screen_pid)
return miner_start_time
return None
def extract_miner_names(rig_conf):
miner_names=[]
for line in rig_conf.split('\n'):
if '=' in line:
key, value = line.split('=', 1)
if key[:5]=="MINER" and (len(key)==5 or str(key[5:]).isdigit()):
if value.startswith('"') and value.endswith('"'):
value = value.strip('"')
if len(value)>0:
miner_names.append(value)
return miner_names
def extract_miner_config(miner_names, wallet_conf):
lines = wallet_conf.split('\n')
meta_config = {}
miners = {}
for miner_idx, miner_name in enumerate(miner_names):
remaining_lines = []
algos = []
for line in lines:
if not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
if value.startswith('"') and value.endswith('"'):
value = value.strip('"')
if value.startswith("'") and value.endswith("'"):
value = value.strip("'")
if key[:len(miner_name.replace('-','_'))+1].lower() == f"{miner_name.replace('-','_')}_".lower():
if key.split('_')[-1][:4].lower()=="algo":
algos.append(value)
elif miner_name.lower()=="custom" and key.lower()=="custom_miner":
miner_names[miner_idx] = os.path.join(miner_names[miner_idx],value)
elif key.lower()=="meta":
try:
meta_config=json.loads(value)
except Exception as e:
pass
else:
remaining_lines.append(line)
lines = remaining_lines
miners[miner_name]={
"algos":algos,
"coins":[]
}
for miner_name in miner_names:
if "custom/" in miner_name.lower():
miner_name = "custom"
if miner_name in meta_config and type(meta_config[miner_name]) == dict:
for key in meta_config[miner_name].keys():
if "coin" in key:
miners[miner_name]["coins"].append(meta_config[miner_name][key])
return miner_names, miners
def sum_numbers_in_list(lst):
if all(isinstance(i, (int, float)) for i in lst):
return sum(lst)
else:
return "The list contains non-numeric elements."
class hive_interface:
def __init__(self):
self.hive_miners_dir = "/hive/miners"
self.hive_rig_config = "/hive-config/rig.conf"
self.hive_wallet_config = "/hive-config/wallet.conf"
async def get_miners_stats(self, miner_names):
scrape_tasks = []
for miner_name in miner_names:
scrape_tasks.append(get_miner_stats(os.path.join(self.hive_miners_dir, miner_name)))
results = await asyncio.gather(*scrape_tasks)
return results
async def get_configured_miners(self):
rig_conf, wallet_conf = await asyncio.gather(*[check_and_read_file(self.hive_rig_config), check_and_read_file(self.hive_wallet_config)])
miner_names = extract_miner_names(rig_conf)
miner_names, miner_config = extract_miner_config(miner_names, wallet_conf)
return miner_names, miner_config
async def export_miner_stats(self, get_hashrates=False):
output = {
"miner_uptime": 0
}
miner_start_ts = await get_miner_uptime_stats()
if miner_start_ts:
output["miner_uptime"] = int(time.time()-miner_start_ts)
miner_names, miner_config = await self.get_configured_miners()
output["miners"]=miner_config
if get_hashrates:
miners_stats = await self.get_miners_stats(miner_names)
for idx, miner_name in enumerate(miner_names):
miner_names[idx] = "custom" if "custom/" in miner_name.lower() else miner_name
for idx, miner_stats in enumerate(miners_stats):
if type(miner_stats) == dict:
for key in miner_stats.keys():
if key[:2]=="hs" and (key=="hs" or key[2:].isdigit()):
all_hs = sum_numbers_in_list(miner_stats[key])
try:
if "hs_units" in miner_stats:
if miner_stats["hs_units"]=="hs":
all_hs = all_hs/1000
if not "hashrates" in output['miners'][miner_names[idx]]:
output['miners'][miner_names[idx]]["hashrates"]=[]
if isinstance(all_hs, (float, int)):
output['miners'][miner_names[idx]]["hashrates"].append(all_hs)
else:
output['miners'][miner_names[idx]]["hashrates"].append(0)
except Exception as e:
pass
return output

View File

@ -3,7 +3,6 @@ from lib import logging as logging_lib
from lib import get_specs from lib import get_specs
from lib import utils from lib import utils
import threading import threading
import socket
import aiohttp import aiohttp
import asyncio import asyncio
import json import json
@ -48,11 +47,9 @@ async def register_server(data):
"Content-Type": "application/json" "Content-Type": "application/json"
} }
connector = aiohttp.TCPConnector(family=socket.AF_INET) async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=connector) as session:
try: try:
async with session.post(url, data=json_data, headers=headers, timeout=15) as response: async with session.post(url, data=json_data, headers=headers, timeout=5) as response:
if response.status == 200: if response.status == 200:
# Successful response # Successful response
response_data = await response.json() response_data = await response.json()

View File

@ -10,7 +10,7 @@ from lib import container_logs
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import queue # Import the synchronous queue module import queue # Import the synchronous queue module
async def log_streaming_task(message_broker, monitoring, do_not_stream_containers): async def log_streaming_task(message_broker, monitoring):
client = docker_interface.client client = docker_interface.client
executor = ThreadPoolExecutor(max_workers=4) executor = ThreadPoolExecutor(max_workers=4)
tasks = {} tasks = {}
@ -29,15 +29,14 @@ async def log_streaming_task(message_broker, monitoring, do_not_stream_container
# Start tasks for new containers # Start tasks for new containers
for container_name, container in current_containers.items(): for container_name, container in current_containers.items():
if not container_name in do_not_stream_containers: log_container_names.append(container_name)
log_container_names.append(container_name) if container_name not in tasks:
if container_name not in tasks: log.debug(f"log_streaming_task() | Starting task for {container_name}")
log.debug(f"log_streaming_task() | Starting task for {container_name}") sync_queue = queue.Queue()
sync_queue = queue.Queue() task = asyncio.ensure_future(asyncio.get_event_loop().run_in_executor(
task = asyncio.ensure_future(asyncio.get_event_loop().run_in_executor( executor, container_logs.stream_logs, container_name, sync_queue))
executor, container_logs.stream_logs, container_name, sync_queue)) tasks[container_name] = task
tasks[container_name] = task queues[container_name] = sync_queue
queues[container_name] = sync_queue
await message_broker.put(log_container_names) await message_broker.put(log_container_names)

View File

@ -6,77 +6,20 @@ config = config_module.config
log = logging_lib.log log = logging_lib.log
import subprocess import subprocess
import clore_pynvml as pynvml import pynvml
import json import json
import math
HIVE_PATH="/hive/bin:/hive/sbin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:./"
GPU_MEM_ALLOWED_OC_RANGES = { # Known to be problematic GPUs
"NVIDIA P102-100": [-2000, 2000],
"NVIDIA P104-100": [-2000, 2000],
"NVIDIA P106-090": [-2000, 2000],
"NVIDIA P106-100": [-2000, 2000],
"NVIDIA GeForce GTX 1050 Ti": [-2000, 2000],
"NVIDIA GeForce GTX 1060 3GB": [-2000, 2000],
"NVIDIA GeForce GTX 1060 6GB": [-2000, 2000],
"NVIDIA GeForce GTX 1070": [-2000, 2000],
"NVIDIA GeForce GTX 1070 Ti": [-2000, 2000],
"NVIDIA GeForce GTX 1080": [-2000, 2000],
"NVIDIA GeForce GTX 1080 Ti": [-2000, 2000],
"NVIDIA CMP 30HX": [-2000, 6000],
"NVIDIA CMP 40HX": [-2000, 6000],
"NVIDIA CMP 50HX": [-2000, 6000],
"NVIDIA CMP 90HX": [-2000, 6000],
"NVIDIA GeForce GTX 1650": [-2000, 6000],
"NVIDIA GeForce GTX 1660 SUPER": [-2000, 6000],
"NVIDIA GeForce GTX 1660 Ti": [-2000, 6000],
"NVIDIA GeForce RTX 2060": [-2000, 6000],
"NVIDIA GeForce RTX 2060 SUPER": [-2000, 6000],
"NVIDIA GeForce RTX 2070": [-2000, 6000],
"NVIDIA GeForce RTX 2070 SUPER": [-2000, 6000],
"NVIDIA GeForce RTX 2080": [-2000, 6000],
"NVIDIA GeForce RTX 2080 Ti": [-2000, 6000]
}
GPU_CORE_ALLOWED_OC_RANGES = { # Known to be problematic GPUs
"NVIDIA P102-100": [-200, 1200],
"NVIDIA P104-100": [-200, 1200],
"NVIDIA P106-090": [-200, 1200],
"NVIDIA P106-100": [-200, 1200],
"NVIDIA GeForce GTX 1050 Ti": [-200, 1200],
"NVIDIA GeForce GTX 1060 3GB": [-200, 1200],
"NVIDIA GeForce GTX 1060 6GB": [-200, 1200],
"NVIDIA GeForce GTX 1070": [-200, 1200],
"NVIDIA GeForce GTX 1070 Ti": [-200, 1200],
"NVIDIA GeForce GTX 1080": [-200, 1200],
"NVIDIA GeForce GTX 1080 Ti": [-200, 1200],
"NVIDIA CMP 30HX": [-1000, 1000],
"NVIDIA CMP 40HX": [-1000, 1000],
"NVIDIA CMP 50HX": [-1000, 1000],
"NVIDIA CMP 90HX": [-1000, 1000],
"NVIDIA GeForce GTX 1650": [-1000, 1000],
"NVIDIA GeForce GTX 1660 SUPER": [-1000, 1000],
"NVIDIA GeForce GTX 1660 Ti": [-1000, 1000],
"NVIDIA GeForce RTX 2060": [-1000, 1000],
"NVIDIA GeForce RTX 2060 SUPER": [-1000, 1000],
"NVIDIA GeForce RTX 2070": [-1000, 1000],
"NVIDIA GeForce RTX 2070 SUPER": [-1000, 1000],
"NVIDIA GeForce RTX 2080": [-1000, 1000],
"NVIDIA GeForce RTX 2080 Ti": [-1000, 1000]
}
is_hive = False is_hive = False
all_gpus_data_list=[] all_gpus_data_list=[]
get_data_fail=False get_data_fail=False
def init(gpu_specs_file=None, allow_hive_binaries=True): def init(gpu_specs_file=None):
global is_hive, all_gpus_data_list, get_data_fail global is_hive, all_gpus_data_list, get_data_fail
log.info("Loading GPU OC specs [ working ]") log.info("Loading GPU OC specs [ working ]")
try: try:
pynvml.nvmlInit() pynvml.nvmlInit()
kernel = get_specs.get_kernel() kernel = get_specs.get_kernel()
if "hive" in kernel and allow_hive_binaries: if "hive" in kernel:
is_hive=True is_hive=True
specs_file_loc = gpu_specs_file if gpu_specs_file else config.gpu_specs_file specs_file_loc = gpu_specs_file if gpu_specs_file else config.gpu_specs_file
@ -100,15 +43,10 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
parsed_specs={} parsed_specs={}
regenerate_specs=True regenerate_specs=True
break break
elif not "locks" in parsed_specs[f"{i}-{gpu_uuid}"]:
parsed_specs={}
regenerate_specs=True
break
if regenerate_specs: if regenerate_specs:
for i in range(0,gpu_count): for i in range(0,gpu_count):
gpu_spec={} gpu_spec={}
mem_to_core_allowed_locks = get_gpu_locked_clocks(i)
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i) gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i)
gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle) gpu_uuid = pynvml.nvmlDeviceGetUUID(gpu_handle)
power_limits = pynvml.nvmlDeviceGetPowerManagementLimitConstraints(gpu_handle) power_limits = pynvml.nvmlDeviceGetPowerManagementLimitConstraints(gpu_handle)
@ -117,7 +55,6 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
gpu_spec["default_power_limit"] = int(pynvml.nvmlDeviceGetPowerManagementDefaultLimit(gpu_handle) / 1000.0) gpu_spec["default_power_limit"] = int(pynvml.nvmlDeviceGetPowerManagementDefaultLimit(gpu_handle) / 1000.0)
gpu_spec["power_limits"] = [min_power_limit, max_power_limit] gpu_spec["power_limits"] = [min_power_limit, max_power_limit]
gpu_spec["name"] = pynvml.nvmlDeviceGetName(gpu_handle) gpu_spec["name"] = pynvml.nvmlDeviceGetName(gpu_handle)
gpu_spec["locks"] = mem_to_core_allowed_locks
pci_info = pynvml.nvmlDeviceGetPciInfo(gpu_handle) pci_info = pynvml.nvmlDeviceGetPciInfo(gpu_handle)
pci_bus_id = pci_info.bus pci_bus_id = pci_info.bus
@ -127,57 +64,22 @@ def init(gpu_specs_file=None, allow_hive_binaries=True):
mem_range = get_hive_clock_range(is_hive, i, "mem") mem_range = get_hive_clock_range(is_hive, i, "mem")
core_range = get_hive_clock_range(is_hive, i, "core") core_range = get_hive_clock_range(is_hive, i, "core")
try: if type(mem_range) != list:
if type(mem_range) != list: pynvml.nvmlDeviceSetMemoryLockedClocks(gpu_handle, 200, 300) # Force low clocks, so the GPU can't crash when testing if under load
pynvml.nvmlDeviceSetMemoryLockedClocks(gpu_handle, 200, 300) # Force low clocks, so the GPU can't crash when testing if under load failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle)
failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle) failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle)
failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle) if (not failure_min) and (not failure_max):
if (not failure_min) and (not failure_max): mem_range=[min_oc_solution, max_oc_solution]
mem_range=[min_oc_solution, max_oc_solution] pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, 0)
pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, 0) pynvml.nvmlDeviceResetMemoryLockedClocks(gpu_handle)
pynvml.nvmlDeviceResetMemoryLockedClocks(gpu_handle) if type(core_range) != list:
if type(core_range) != list: pynvml.nvmlDeviceSetGpuLockedClocks(gpu_handle, 300, 350) # Force low clocks, so the GPU can't crash when testing if under load
pynvml.nvmlDeviceSetGpuLockedClocks(gpu_handle, 300, 350) # Force low clocks, so the GPU can't crash when testing if under load failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle, True)
failure_min, min_oc_solution = pinpoint_oc_limits_negative(gpu_handle, True) failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle, True)
failure_max, max_oc_solution = pinpoint_oc_limits_positive(gpu_handle, True) if (not failure_min) and (not failure_max):
if (not failure_min) and (not failure_max): core_range=[min_oc_solution, max_oc_solution]
core_range=[min_oc_solution, max_oc_solution] pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, 0)
pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, 0) pynvml.nvmlDeviceResetGpuLockedClocks(gpu_handle)
pynvml.nvmlDeviceResetGpuLockedClocks(gpu_handle)
except Exception as e_pinpointing:
if "not supported" in str(e_pinpointing).lower():
try:
min_core_offset, max_core_offset = pynvml.nvmlDeviceGetGpcClkMinMaxVfOffset(gpu_handle)
if min_core_offset>0:
min_core_offset = min_core_offset - math.floor((2**32)/1000)
if min_core_offset > -20000 and min_core_offset <= 0 and max_core_offset>=0 and min_core_offset < 20000:
core_range=[min_core_offset, max_core_offset]
else:
core_range=[0,0]
min_mem_offset, max_mem_offset = pynvml.nvmlDeviceGetMemClkMinMaxVfOffset(gpu_handle)
if min_mem_offset>0:
min_mem_offset = min_mem_offset - math.floor((2**32)/1000)
if min_mem_offset==0 and max_mem_offset==0:
if gpu_spec["name"] in GPU_MEM_ALLOWED_OC_RANGES:
mem_range = GPU_MEM_ALLOWED_OC_RANGES[gpu_spec["name"]]
else:
mem_range = [0,0]
elif min_mem_offset > -20000 and min_mem_offset <= 0 and max_mem_offset>=0 and max_mem_offset < 20000:
mem_range=[min_mem_offset, max_mem_offset]
else:
mem_range=[0,0]
except Exception as e2:
if "function not found" in str(e2).lower():
if gpu_spec["name"] in GPU_MEM_ALLOWED_OC_RANGES:
mem_range = GPU_MEM_ALLOWED_OC_RANGES[gpu_spec["name"]]
else:
mem_range = [0,0]
if gpu_spec["name"] in GPU_CORE_ALLOWED_OC_RANGES:
core_range = GPU_CORE_ALLOWED_OC_RANGES[gpu_spec["name"]]
else:
core_range = [0,0]
else:
get_data_fail=True
if type(mem_range) == list and type(core_range) == list and len(mem_range)==2 and len(core_range)==2: if type(mem_range) == list and type(core_range) == list and len(mem_range)==2 and len(core_range)==2:
gpu_spec["mem"]=mem_range gpu_spec["mem"]=mem_range
gpu_spec["core"]=core_range gpu_spec["core"]=core_range
@ -211,19 +113,6 @@ def get_gpu_oc_specs():
def shutdown(): def shutdown():
pynvml.nvmlShutdown() pynvml.nvmlShutdown()
def get_gpu_locked_clocks(gpu_index):
try:
handle = pynvml.nvmlDeviceGetHandleByIndex(gpu_index)
mem_clocks = pynvml.nvmlDeviceGetSupportedMemoryClocks(handle)
mem_to_core = {}
for idx, mem_clock in enumerate(mem_clocks):
if idx < 12 or idx == len(mem_clocks)-1:
graphics_clocks = pynvml.nvmlDeviceGetSupportedGraphicsClocks(handle, mem_clock)
mem_to_core[str(mem_clock)] = [min(graphics_clocks), max(graphics_clocks)]
return mem_to_core
except Exception as e:
return {}
def handle_nn(input_int): def handle_nn(input_int):
if abs(4293967-input_int) < 10000: if abs(4293967-input_int) < 10000:
return input_int-4293967 return input_int-4293967
@ -329,7 +218,6 @@ def pinpoint_oc_limits_positive(gpu_handle, core=False):
return failure, found_solution return failure, found_solution
def set_oc(settings): def set_oc(settings):
global is_hive
try: try:
gpu_count = pynvml.nvmlDeviceGetCount() gpu_count = pynvml.nvmlDeviceGetCount()
settings_keys = settings.keys() settings_keys = settings.keys()
@ -343,10 +231,6 @@ def set_oc(settings):
} }
settings_keys = settings.keys() settings_keys = settings.keys()
log.debug(f"Rewriting settings with: {json.dumps(settings)}") log.debug(f"Rewriting settings with: {json.dumps(settings)}")
core_locks = []
mem_locks = []
any_lock_failure = False
for oc_gpu_index in settings_keys: for oc_gpu_index in settings_keys:
if oc_gpu_index.isdigit(): if oc_gpu_index.isdigit():
oc_gpu_index=int(oc_gpu_index) oc_gpu_index=int(oc_gpu_index)
@ -354,42 +238,13 @@ def set_oc(settings):
gpu_oc_config = settings[str(oc_gpu_index)] gpu_oc_config = settings[str(oc_gpu_index)]
gpu_possible_ranges = all_gpus_data_list[oc_gpu_index] gpu_possible_ranges = all_gpus_data_list[oc_gpu_index]
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(oc_gpu_index) gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(oc_gpu_index)
if "core" in gpu_oc_config:
if "core_lock" in gpu_oc_config:
core_lock = int(gpu_oc_config["core_lock"])
core_locks.append(str(core_lock))
try:
pynvml.nvmlDeviceSetGpuLockedClocks(gpu_handle, core_lock, core_lock)
except Exception as core_lock_exception:
any_lock_failure=True
else:
core_locks.append('0')
try:
pynvml.nvmlDeviceResetGpuLockedClocks(gpu_handle)
except Exception as core_lock_exception:
any_lock_failure=True
if "mem_lock" in gpu_oc_config:
mem_lock = int(gpu_oc_config["mem_lock"])
mem_locks.append(str(mem_lock))
try:
pynvml.nvmlDeviceSetMemoryLockedClocks(gpu_handle, mem_lock, mem_lock)
except Exception as mem_lock_exception:
any_lock_failure=True
else:
mem_locks.append('0')
try:
pynvml.nvmlDeviceResetMemoryLockedClocks(gpu_handle)
except Exception as mem_lock_exception:
any_lock_failure=True
if "core" in gpu_oc_config: # Core offset
wanted_core_clock = int(round(gpu_oc_config["core"]*2)) wanted_core_clock = int(round(gpu_oc_config["core"]*2))
if gpu_possible_ranges["core"][0] <= wanted_core_clock and wanted_core_clock <= gpu_possible_ranges["core"][1]: if gpu_possible_ranges["core"][0] <= wanted_core_clock and wanted_core_clock <= gpu_possible_ranges["core"][1]:
pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, wanted_core_clock) pynvml.nvmlDeviceSetGpcClkVfOffset(gpu_handle, wanted_core_clock)
else: else:
log.error(f"Requested OC for GPU:{oc_gpu_index} (CORE) out of bound | {wanted_core_clock} | [{gpu_possible_ranges["core"][0]}, {gpu_possible_ranges["core"][1]}]") log.error(f"Requested OC for GPU:{oc_gpu_index} (CORE) out of bound | {wanted_core_clock} | [{gpu_possible_ranges["core"][0]}, {gpu_possible_ranges["core"][1]}]")
if "mem" in gpu_oc_config: # Memory offset if "mem" in gpu_oc_config:
wanted_mem_clock = int(round(gpu_oc_config["mem"]*2)) wanted_mem_clock = int(round(gpu_oc_config["mem"]*2))
if gpu_possible_ranges["mem"][0] <= wanted_mem_clock and wanted_mem_clock <= gpu_possible_ranges["mem"][1]: if gpu_possible_ranges["mem"][0] <= wanted_mem_clock and wanted_mem_clock <= gpu_possible_ranges["mem"][1]:
pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, wanted_mem_clock) pynvml.nvmlDeviceSetMemClkVfOffset(gpu_handle, wanted_mem_clock)
@ -401,17 +256,6 @@ def set_oc(settings):
pynvml.nvmlDeviceSetPowerManagementLimit(gpu_handle, wanted_power_limit_milliwatts) pynvml.nvmlDeviceSetPowerManagementLimit(gpu_handle, wanted_power_limit_milliwatts)
else: else:
log.error(f"Requested OC for GPU:{oc_gpu_index} (POWER LIMIT) out of bound | {gpu_oc_config["pl"]} | [{gpu_possible_ranges["power_limits"][0]}, {gpu_possible_ranges["power_limits"][1]}]") log.error(f"Requested OC for GPU:{oc_gpu_index} (POWER LIMIT) out of bound | {gpu_oc_config["pl"]} | [{gpu_possible_ranges["power_limits"][0]}, {gpu_possible_ranges["power_limits"][1]}]")
if is_hive and any_lock_failure and len(mem_locks)==len(core_locks):
try:
nvtool_commands = []
for idx, mem_lock in enumerate(mem_locks):
core_lock = core_locks[idx]
nvtool_commands.append(f"nvtool -i {str(idx)} --setmem {mem_lock} --setcore {core_lock}")
cmd = ["bash",'-c',f"PATH={HIVE_PATH} && sudo {' && '.join(nvtool_commands)}"]
#print(cmd)
subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as hive_oc_settings:
pass
return True return True
except Exception as e: except Exception as e:
log.error(f"set_oc | ERROR | {e}") log.error(f"set_oc | ERROR | {e}")
@ -423,7 +267,7 @@ def get_hive_clock_range(is_hive, gpu_index, part):
if is_hive: if is_hive:
try: try:
flag = "--setmemoffset" if part=="mem" else "--setcoreoffset" flag = "--setmemoffset" if part=="mem" else "--setcoreoffset"
cmd = ["bash",'-c',f"PATH={HIVE_PATH} && sudo nvtool -i {gpu_index} {flag} -100000"] cmd = ["bash",'-c',f"nvtool -i 0 {flag} -100000"]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
lines = result.stdout.decode().splitlines() lines = result.stdout.decode().splitlines()
@ -447,17 +291,4 @@ def get_hive_clock_range(is_hive, gpu_index, part):
except Exception as e: except Exception as e:
return False return False
else: else:
return False return False
def get_vram_per_gpu():
vram_per_gpu = []
try:
gpu_count = pynvml.nvmlDeviceGetCount()
for i in range(0,gpu_count):
gpu_handle = pynvml.nvmlDeviceGetHandleByIndex(i)
mem_info = pynvml.nvmlDeviceGetMemoryInfo(gpu_handle)
vram_per_gpu.append(mem_info.total / 1024 ** 2)
except Exception as e:
log.error(f"Failed loading get_vram_per_gpu() | {e}")
pass
return vram_per_gpu

View File

@ -1,13 +1,11 @@
from lib import config as config_module from lib import config as config_module
from lib import logging as logging_lib from lib import logging as logging_lib
from lib import nvml
import subprocess import subprocess
import hashlib import hashlib
import random import random
import string import string
import shlex import shlex
import time import time
import math
import json import json
import os import os
@ -43,20 +41,12 @@ def normalize_rule(rule_dict):
def get_auth(): def get_auth():
try: try:
if 'AUTH_TOKEN' in os.environ:
return os.environ['AUTH_TOKEN']
auth_str = '' auth_str = ''
with open(config.auth_file, "r", encoding="utf-8") as file: with open(config.auth_file, "r", encoding="utf-8") as file:
auth_str = file.read().strip() auth_str = file.read().strip()
return auth_str return auth_str
except Exception as e: except Exception as e:
return '' return ''
def get_allowed_container_names():
allowed_container_names = os.getenv("ALLOWED_CONTAINER_NAMES")
if type(allowed_container_names)==str and len(allowed_container_names)>0:
return [x for x in allowed_container_names.split(',') if x]
return []
def unix_timestamp(): def unix_timestamp():
return int(time.time()) return int(time.time())
@ -101,70 +91,18 @@ def generate_random_string(length):
characters = string.ascii_letters + string.digits characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for _ in range(length)) return ''.join(random.choice(characters) for _ in range(length))
HIVE_PATH="/hive/bin:/hive/sbin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:./"
def hive_set_miner_status(enabled=False): def hive_set_miner_status(enabled=False):
### control miner state - OFF/ON ### control miner state - OFF/ON
screen_out = run_command("screen -ls") screen_out = run_command("screen -ls")
miner_screen_running = False miner_screen_running = False
miner_screen_session_pids = []
if screen_out[0] == 0 or screen_out[0] == 1: if screen_out[0] == 0 or screen_out[0] == 1:
screen_lines=screen_out[1].split('\n') screen_lines=screen_out[1].split('\n')
for screen_line in screen_lines: for screen_line in screen_lines:
screen_line_parts=screen_line.replace('\t', '', 1).split('\t') screen_line_parts=screen_line.replace('\t', '', 1).split('\t')
if len(screen_line_parts)>2 and '.' in screen_line_parts[0]: if len(screen_line_parts)>2 and '.' in screen_line_parts[0]:
if screen_line_parts[0].split('.',1)[1]=="miner": if screen_line_parts[0].split('.',1)[1]=="miner":
miner_screen_session_pids.append(screen_line_parts[0].split('.',1)[0])
miner_screen_running=True miner_screen_running=True
if len(miner_screen_session_pids) > 1: ## Something really bad going on, destroy all instances if miner_screen_running and not enabled:
for idx, miner_screen_session_pid in enumerate(miner_screen_session_pids): run_command("miner stop")
run_command(f"kill -9 {miner_screen_session_pid}{' && screen -wipe' if idx==len(miner_screen_session_pids)-1 else ''}")
elif miner_screen_running and not enabled:
run_command(f"/bin/bash -c \"PATH={HIVE_PATH} && sudo /hive/bin/miner stop\"")
elif enabled and not miner_screen_running: elif enabled and not miner_screen_running:
run_command(f"/bin/bash -c \"export PATH={HIVE_PATH} && sudo /hive/sbin/nvidia-oc && source ~/.bashrc ; sudo /hive/bin/miner start\"") run_command("nvidia-oc && miner start")
def get_extra_allowed_images():
if os.path.exists(config.extra_allowed_images_file):
try:
with open(config.extra_allowed_images_file, 'r') as file:
content = file.read()
data = json.loads(content)
if isinstance(data, list):
if all(isinstance(item, dict) and set(item.keys()) == {'repository', 'allowed_tags'} and isinstance(item['repository'], str) and isinstance(item['allowed_tags'], list) and all(isinstance(tag, str) for tag in item['allowed_tags']) for item in data):
return data
else:
return []
else:
return []
except Exception as e:
log.error(f"get_extra_allowed_images() | error: {e}")
return []
else:
return []
class shm_calculator:
def __init__(self, total_ram):
self.total_ram = total_ram
self.gpu_vram_sizes = []
def calculate(self, used_gpu_ids):
assume_ram_utilised = 2500 #MB
default_shm_size = 64 #MB
if len(self.gpu_vram_sizes) == 0:
self.gpu_vram_sizes = nvml.get_vram_per_gpu()
instance_vram_total = 0
total_vram_size = sum(self.gpu_vram_sizes)
for idx, value in enumerate(self.gpu_vram_sizes):
if used_gpu_ids == '*' or idx in used_gpu_ids:
instance_vram_total += value
if instance_vram_total == 0 or total_vram_size == 0:
return default_shm_size
shm_size = instance_vram_total * 1.5 if instance_vram_total * 1.5 < self.total_ram - assume_ram_utilised else (
instance_vram_total/total_vram_size * (self.total_ram - assume_ram_utilised)
)
return math.floor(shm_size if shm_size > default_shm_size else default_shm_size)

View File

@ -7,5 +7,4 @@ psutil==5.9.0
python-iptables==1.0.1 python-iptables==1.0.1
websockets==12.0 websockets==12.0
packaging==23.2 packaging==23.2
clore-pynvml==11.5.4 git+https://git.clore.ai/clore/pynvml.git@main
requests==2.31.0