from lib import config as config_module from lib import logging as logging_lib import subprocess import hashlib import random import string import shlex import time import json import os log = logging_lib.log config = config_module.config def run_command(command): """Utility function to run a shell command and return its output.""" result = subprocess.run(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) return result.returncode, result.stdout.strip(), result.stderr.strip() def parse_rule_to_dict(rule): tokens = shlex.split(rule) rule_dict = {} i = 0 while i < len(tokens): if tokens[i].startswith("-"): # For options without a value, set them to True rule_dict[tokens[i]] = tokens[i + 1] if i + 1 < len(tokens) and not tokens[i + 1].startswith("-") else True i += 2 else: i += 1 return rule_dict def normalize_rule(rule_dict): # If necessary, convert values to a normalized form here # For example, converting IP addresses to a standard format # For this example, we'll just sort the dictionary normalized = dict(sorted(rule_dict.items())) return normalized def get_auth(): try: if 'AUTH_TOKEN' in os.environ: return os.environ['AUTH_TOKEN'] auth_str = '' with open(config.auth_file, "r", encoding="utf-8") as file: auth_str = file.read().strip() return auth_str except Exception as e: return '' def get_allowed_container_names(): allowed_container_names = os.getenv("ALLOWED_CONTAINER_NAMES") if type(allowed_container_names)==str and len(allowed_container_names)>0: return [x for x in allowed_container_names.split(',') if x] return [] def unix_timestamp(): return int(time.time()) def hash_md5(input_string): return hashlib.md5(input_string.encode()).hexdigest() def run_command_v2(command, timeout=900): try: # Set the timeout to 900 seconds (15 minutes) subprocess.run(["bash", "-c", command], check=True, timeout=timeout) except subprocess.CalledProcessError as e: log.debug(f"run_command_v2() | A subprocess error occurred: {e}") except subprocess.TimeoutExpired as e: log.debug(f"run_command_v2() | Command timed out: {e}") def yes_no_question(prompt): while True: response = input(prompt + " (y/n): ").strip().lower() if response in {'y', 'yes'}: return True elif response in {'n', 'no'}: return False else: print("Please enter 'y' or 'n'.") def validate_cuda_version(ver_str): if ':' in ver_str: pc = ver_str.split(':') if pc[0] == "11": if int(pc[1]) >= 7: return True else: return False elif int(pc[0]) > 11: return True else: return False else: return False def generate_random_string(length): characters = string.ascii_letters + string.digits return ''.join(random.choice(characters) for _ in range(length)) HIVE_PATH="/hive/bin:/hive/sbin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:./" def hive_set_miner_status(enabled=False): ### control miner state - OFF/ON screen_out = run_command("screen -ls") miner_screen_running = False miner_screen_session_pids = [] if screen_out[0] == 0 or screen_out[0] == 1: screen_lines=screen_out[1].split('\n') for screen_line in screen_lines: screen_line_parts=screen_line.replace('\t', '', 1).split('\t') if len(screen_line_parts)>2 and '.' in screen_line_parts[0]: if screen_line_parts[0].split('.',1)[1]=="miner": miner_screen_session_pids.append(screen_line_parts[0].split('.',1)[0]) miner_screen_running=True if len(miner_screen_session_pids) > 1: ## Something really bad going on, destroy all instances for idx, miner_screen_session_pid in enumerate(miner_screen_session_pids): run_command(f"kill -9 {miner_screen_session_pid}{' && screen -wipe' if idx==len(miner_screen_session_pids)-1 else ''}") elif miner_screen_running and not enabled: run_command(f"/bin/bash -c \"PATH={HIVE_PATH} && sudo /hive/bin/miner stop\"") elif enabled and not miner_screen_running: run_command(f"/bin/bash -c \"PATH={HIVE_PATH} && sudo /hive/sbin/nvidia-oc && sudo /hive/bin/miner start\"") def get_extra_allowed_images(): if os.path.exists(config.extra_allowed_images_file): try: with open(config.extra_allowed_images_file, 'r') as file: content = file.read() data = json.loads(content) if isinstance(data, list): if all(isinstance(item, dict) and set(item.keys()) == {'repository', 'allowed_tags'} and isinstance(item['repository'], str) and isinstance(item['allowed_tags'], list) and all(isinstance(tag, str) for tag in item['allowed_tags']) for item in data): return data else: return [] else: return [] except Exception as e: log.error(f"get_extra_allowed_images() | error: {e}") return [] else: return []