112 lines
3.7 KiB
Python
112 lines
3.7 KiB
Python
from lib import config as config_module
|
|
from lib import logging as logging_lib
|
|
import subprocess
|
|
import hashlib
|
|
import random
|
|
import string
|
|
import shlex
|
|
import time
|
|
import json
|
|
import os
|
|
|
|
# Module-level shorthand for the `log` object exposed by lib.logging.
log = logging_lib.log
|
|
|
|
# Module-level shorthand for the `config` object exposed by lib.config
# (read below for its `auth_file` attribute).
config = config_module.config
|
|
|
|
def run_command(command):
    """Execute *command* through the system shell and collect its results.

    The command string is interpreted by the shell (``shell=True``), so
    pipes, redirections and ``&&`` chains work as written. A non-zero exit
    status does not raise.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple; both output streams are
        decoded as text and stripped of surrounding whitespace.
    """
    completed = subprocess.run(
        command,
        shell=True,
        text=True,
        capture_output=True,
    )
    captured_out = completed.stdout.strip()
    captured_err = completed.stderr.strip()
    return completed.returncode, captured_out, captured_err
|
|
|
|
def parse_rule_to_dict(rule):
    """Parse a command-line style rule string into an option -> value dict.

    The string is tokenized with ``shlex.split`` so quoted values stay in one
    piece. Every token starting with ``-`` becomes a key. Its value is the
    following token, unless that token is missing or is itself an option, in
    which case the key maps to ``True``. Tokens that are neither an option
    nor consumed as an option's value are ignored.

    Args:
        rule: The rule text, e.g. ``"-A INPUT --syn -j ACCEPT"``.

    Returns:
        A dict mapping each option token to its string value or ``True``.

    Raises:
        ValueError: Propagated from ``shlex.split`` on unbalanced quotes.
    """
    tokens = shlex.split(rule)
    rule_dict = {}
    i = 0
    while i < len(tokens):
        token = tokens[i]
        if not token.startswith("-"):
            i += 1
            continue

        has_value = i + 1 < len(tokens) and not tokens[i + 1].startswith("-")
        if has_value:
            rule_dict[token] = tokens[i + 1]
            i += 2
        else:
            # Valueless flag: step over this token only, so that the option
            # immediately following it is still parsed rather than skipped.
            rule_dict[token] = True
            i += 1
    return rule_dict
|
|
|
|
def normalize_rule(rule_dict):
    """Return a copy of *rule_dict* whose entries are ordered by key.

    Values are carried over untouched; only the insertion order of the
    resulting dict changes. This is the hook where value normalization
    (for example, canonical IP address formatting) would be added.
    """
    return {option: rule_dict[option] for option in sorted(rule_dict)}
|
|
|
|
def get_auth():
    """Return the auth token, or an empty string when none can be obtained.

    The ``AUTH_TOKEN`` environment variable wins when it is set (even if
    empty). Otherwise the file at ``config.auth_file`` is read as UTF-8 and
    its stripped content is returned. Lookup is best-effort: any exception
    raised along the way results in ``''``.
    """
    try:
        env_token = os.environ.get('AUTH_TOKEN')
        if env_token is not None:
            return env_token
        with open(config.auth_file, "r", encoding="utf-8") as handle:
            return handle.read().strip()
    except Exception:
        return ''
|
|
|
|
def unix_timestamp():
    """Return the current time as whole seconds since the Unix epoch.

    The fractional part of ``time.time()`` is truncated, not rounded.
    """
    now = time.time()
    return int(now)
|
|
|
|
def hash_md5(input_string):
    """Return the hexadecimal MD5 digest of *input_string*.

    The string is encoded with ``str.encode()``'s default codec (UTF-8)
    before hashing.
    """
    digest = hashlib.md5()
    digest.update(input_string.encode())
    return digest.hexdigest()
|
|
|
|
def run_command_v2(command, timeout=900):
    """Run *command* via ``bash -c`` with a time limit, logging any failure.

    Output is not captured; the child inherits this process's streams.
    A non-zero exit status or an expired timeout is written to the debug
    log instead of being raised, and the function returns ``None`` in
    every case.

    Args:
        command: Shell command line handed to ``bash -c``.
        timeout: Maximum run time in seconds (default 900, i.e. 15 minutes).
    """
    argv = ["bash", "-c", command]
    try:
        subprocess.run(argv, timeout=timeout, check=True)
    except subprocess.TimeoutExpired as err:
        log.debug(f"run_command_v2() | Command timed out: {err}")
    except subprocess.CalledProcessError as err:
        log.debug(f"run_command_v2() | A subprocess error occurred: {err}")
|
|
|
|
def yes_no_question(prompt):
    """Ask a yes/no question on the console until a valid answer is given.

    The prompt is shown with a `` (y/n): `` suffix. Answers are stripped and
    compared case-insensitively: ``y``/``yes`` yields ``True``, ``n``/``no``
    yields ``False``. Anything else prints a hint and asks again.
    """
    verdicts = {'y': True, 'yes': True, 'n': False, 'no': False}
    question = prompt + " (y/n): "
    while True:
        reply = input(question).strip().lower()
        if reply in verdicts:
            return verdicts[reply]
        print("Please enter 'y' or 'n'.")
|
|
|
|
def validate_cuda_version(ver_str):
    """Check whether a ``"major:minor"`` CUDA version string is at least 11.7.

    Args:
        ver_str: Version text with a colon between major and minor,
            e.g. ``"11:7"`` or ``"12:0"``.

    Returns:
        ``True`` when the major version is above 11, or is 11 with a minor
        version of 7 or higher. ``False`` for older versions, for strings
        without a colon, and for strings whose required numeric field
        cannot be parsed as an integer.
    """
    if ':' not in ver_str:
        return False

    parts = ver_str.split(':')
    try:
        major = int(parts[0])
        if major != 11:
            # The minor version only matters for the 11.x series, so it is
            # deliberately not parsed (or validated) for other majors.
            return major > 11
        return int(parts[1]) >= 7
    except ValueError:
        # A validator should reject malformed input, not crash on it.
        return False
|
|
def generate_random_string(length):
    """Build a random string of *length* ASCII letters and digits.

    Characters are drawn one at a time with ``random.choice`` from the
    module-level generator of the ``random`` module, so the output is
    reproducible after ``random.seed``. A length of zero or less yields
    an empty string.
    """
    alphabet = string.ascii_letters + string.digits
    picks = [random.choice(alphabet) for _ in range(length)]
    return ''.join(picks)
|
|
|
|
# Executable search path assigned to PATH inside the shell commands that
# start/stop the miner (see hive_set_miner_status). The /hive directories
# are listed ahead of the standard system locations.
HIVE_PATH="/hive/bin:/hive/sbin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
|
|
|
|
def hive_set_miner_status(enabled=False):
    """Switch the miner ON or OFF so that it matches *enabled*.

    The current state is inferred from ``screen -ls``: the miner counts as
    running when a listed session is named ``miner`` (the part after the
    first dot of ``<pid>.<name>``). A command is issued only when the
    observed state differs from the requested one:

    * running and ``enabled`` is falsy  -> ``miner stop``
    * not running and ``enabled`` is truthy -> ``nvidia-oc`` then ``miner start``

    Args:
        enabled: Desired miner state; truthy means running.
    """
    exit_code, listing, _stderr = run_command("screen -ls")

    miner_active = False
    # Exit codes 0 and 1 are both treated as a usable listing.
    # NOTE(review): presumably `screen -ls` can exit 1 while still printing
    # its output (e.g. no sessions) - confirm against the installed version.
    if exit_code in (0, 1):
        for entry in listing.split('\n'):
            # Session rows are tab-separated with a leading tab; drop that
            # first tab so the session id becomes field 0.
            fields = entry.replace('\t', '', 1).split('\t')
            if len(fields) <= 2 or '.' not in fields[0]:
                continue
            session_name = fields[0].split('.', 1)[1]
            if session_name == "miner":
                miner_active = True

    if miner_active and not enabled:
        action = f"/bin/bash -c \"PATH={HIVE_PATH} && sudo /hive/bin/miner stop\""
    elif enabled and not miner_active:
        action = f"/bin/bash -c \"PATH={HIVE_PATH} && sudo /hive/sbin/nvidia-oc && sudo /hive/bin/miner start\""
    else:
        # Already in the requested state: nothing to do.
        return
    run_command(action)