hosting/lib/utils.py

92 lines
2.7 KiB
Python

from lib import config as config_module
from lib import logging as logging_lib
import subprocess
import hashlib
import random
import string
import shlex
import time
import json
import os
# Module-level shortcuts to the project's logger and configuration objects,
# so the helpers in this file can refer to them simply as `log` and `config`.
log = logging_lib.log
config = config_module.config
def run_command(command):
    """Run *command* through the shell and capture its result.

    Args:
        command: Command line handed to ``subprocess.run`` with ``shell=True``.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple. Both streams are captured
        as text and stripped of leading/trailing whitespace.
    """
    completed = subprocess.run(
        command,
        shell=True,
        text=True,
        capture_output=True,
    )
    exit_code = completed.returncode
    captured_out = completed.stdout.strip()
    captured_err = completed.stderr.strip()
    return exit_code, captured_out, captured_err
def parse_rule_to_dict(rule):
    """Parse a command-line style rule string into an option dictionary.

    The rule is tokenized with ``shlex.split`` so quoted values stay intact.
    Every token starting with ``-`` becomes a key. Its value is the token
    that follows it, unless that token is missing or is itself an option,
    in which case the value is ``True``. Tokens that are neither an option
    nor an option's value are ignored.

    Args:
        rule: Rule text, e.g. ``"-A INPUT -p tcp --dport 22 -j ACCEPT"``.

    Returns:
        dict mapping each option to its string value, or to ``True`` for
        options given without a value.
    """
    tokens = shlex.split(rule)
    rule_dict = {}
    i = 0
    while i < len(tokens):
        token = tokens[i]
        if not token.startswith("-"):
            # Stray positional token: not an option, nothing to record.
            i += 1
            continue

        has_value = i + 1 < len(tokens) and not tokens[i + 1].startswith("-")
        if has_value:
            rule_dict[token] = tokens[i + 1]
            i += 2
        else:
            # Valueless flag: step over this token only. Skipping two here
            # would swallow the following option without ever parsing it.
            rule_dict[token] = True
            i += 1
    return rule_dict
def normalize_rule(rule_dict):
    """Return a new dict with the entries of *rule_dict* ordered by key.

    Values are carried over untouched; the only normalization applied is
    the key ordering, so two rules with the same options listed in a
    different order produce dicts with identical iteration order.
    """
    ordered_keys = sorted(rule_dict)
    return {key: rule_dict[key] for key in ordered_keys}
def get_auth():
    """Return the contents of the configured auth file, stripped of whitespace.

    The file at ``config.auth_file`` is read as UTF-8. This is a best-effort
    lookup: any failure (missing file, permission problem, decoding error,
    missing config attribute) yields an empty string instead of raising.
    The failure is logged at debug level so it is not completely invisible.

    Returns:
        The stripped file contents, or ``''`` when the file cannot be read.
    """
    try:
        with open(config.auth_file, "r", encoding="utf-8") as file:
            return file.read().strip()
    except Exception as e:
        # Deliberately broad: callers rely on getting '' rather than an error.
        log.debug(f"get_auth() | Could not read auth file: {e}")
        return ''
def unix_timestamp():
    """Return the current time as whole seconds since the Unix epoch."""
    now = time.time()
    return int(now)
def hash_md5(input_string):
    """Return the hexadecimal MD5 digest of *input_string*.

    The string is encoded with the default ``str.encode()`` encoding
    (UTF-8) before hashing.
    """
    digest = hashlib.md5()
    digest.update(input_string.encode())
    return digest.hexdigest()
def run_command_v2(command, timeout=900):
    """Run *command* with ``bash -c`` and log failures instead of raising.

    The command's output is not captured. A non-zero exit status or an
    expired timeout is reported through ``log.debug`` and then swallowed,
    so the function always returns ``None``.

    Args:
        command: Command line executed by bash.
        timeout: Maximum run time in seconds (default 900, i.e. 15 minutes).
    """
    bash_argv = ["bash", "-c", command]
    try:
        subprocess.run(bash_argv, timeout=timeout, check=True)
    except subprocess.TimeoutExpired as e:
        log.debug(f"run_command_v2() | Command timed out: {e}")
    except subprocess.CalledProcessError as e:
        log.debug(f"run_command_v2() | A subprocess error occurred: {e}")
def yes_no_question(prompt):
    """Ask a yes/no question on the console until a valid answer is given.

    Args:
        prompt: Question text; ``" (y/n): "`` is appended before display.

    Returns:
        ``True`` for ``y``/``yes`` and ``False`` for ``n``/``no``. Answers
        are matched case-insensitively after stripping whitespace; anything
        else prints a hint and asks again.
    """
    affirmative = ('y', 'yes')
    negative = ('n', 'no')
    full_prompt = prompt + " (y/n): "
    while True:
        answer = input(full_prompt).strip().lower()
        if answer in affirmative:
            return True
        if answer in negative:
            return False
        print("Please enter 'y' or 'n'.")
def validate_cuda_version(ver_str):
    """Check that a ``major:minor`` version string is at least ``11:7``.

    Args:
        ver_str: Version text with a colon between major and minor parts,
            e.g. ``"12:2"``. Anything after a second colon is ignored.

    Returns:
        ``True`` when the major part is greater than 11, or equals 11 with
        a minor part of 7 or more. ``False`` otherwise, including when the
        string has no colon or the parts needed for the comparison are not
        integers (malformed input is rejected rather than raising).
    """
    if ':' not in ver_str:
        return False

    major_text, minor_text = ver_str.split(':')[:2]
    try:
        major = int(major_text)
        if major != 11:
            # The minor part only matters on the 11.x boundary.
            return major > 11
        return int(minor_text) >= 7
    except ValueError:
        # Non-numeric component: not a version we can accept.
        return False
def generate_random_string(length):
    """Return a random string of *length* ASCII letters and digits.

    Characters are drawn one at a time with ``random.choice`` from
    ``string.ascii_letters + string.digits``. A length of zero or less
    gives an empty string.
    """
    alphabet = string.ascii_letters + string.digits
    picked = [random.choice(alphabet) for _ in range(length)]
    return ''.join(picked)