onboarding/specs.py

283 lines
9.3 KiB
Python

# Library to load specs of machine without the need for any 3rd party libs
import subprocess
import xml.etree.ElementTree as ET
import requests
import shutil
import time
import json
import os
def drop_caches():
try:
with open('/proc/sys/vm/drop_caches', 'w') as f:
f.write('3\n')
except Exception as e:
pass
def write_test(file_path, block_size, num_blocks):
data = os.urandom(block_size)
total_bytes = block_size * num_blocks
start_time = time.time()
with open(file_path, 'wb') as f:
for _ in range(num_blocks):
f.write(data)
f.flush()
os.fsync(f.fileno())
elapsed_time = time.time() - start_time
write_speed = total_bytes / elapsed_time / (1024 * 1024)
return write_speed, elapsed_time
def read_test(file_path, block_size, num_blocks):
total_bytes = block_size * num_blocks
# Drop caches to avoid OS-level caching effects
drop_caches()
start_time = time.time()
with open(file_path, 'rb') as f:
for _ in range(num_blocks):
data = f.read(block_size)
if not data:
break
elapsed_time = time.time() - start_time
read_speed = total_bytes / elapsed_time / (1024 * 1024)
return read_speed, elapsed_time
def disk_benchmark():
total, used, free = shutil.disk_usage("/")
free_gb = free/1024/1024/1024
if free_gb<1:
return 0,0
block_size = 1024*1024
num_blocks = 250 if free_gb < 3 else 1500
file_path="/tmp/output"
print("Running disk benchmark...")
print(f"Block Size: {block_size} bytes, Number of Blocks: {num_blocks}")
# Run write test
write_speed, write_time = write_test(file_path, block_size, num_blocks)
print(f"Write Speed: {write_speed:.2f} MB/s, Time: {write_time:.2f} seconds")
# Run read test
read_speed, read_time = read_test(file_path, block_size, num_blocks)
print(f"Read Speed: {read_speed:.2f} MB/s, Time: {read_time:.2f} seconds")
# Cleanup
os.remove(file_path)
return float(round(write_speed,2)), float(round(read_speed,2))
def get_root_disk_model():
try:
# Get the root filesystem mount point
root_mount = '/'
# Find the device associated with the root mount
cmd_find_device = ['df', root_mount]
output = subprocess.check_output(cmd_find_device).decode()
lines = output.splitlines()
device_line = lines[1] # Second line contains the device info
device_path = device_line.split()[0]
# Strip partition number to get the main disk device
if device_path.startswith('/dev/'):
device_path = device_path.rstrip('0123456789')
# Get disk model information using lsblk command
cmd_lsblk = ['lsblk', '-no', 'MODEL', device_path]
model = subprocess.check_output(cmd_lsblk).decode().strip()
# Check if the disk is virtual
virtual_identifiers = ['virtual', 'qemu', 'vmware', 'hyper-v', 'vbox']
if any(virtual_id.lower() in model.lower() for virtual_id in virtual_identifiers):
return "Virtual"
return model
except Exception:
return ""
def get_free_space_gb():
try:
statvfs = os.statvfs('/')
free_space_gb = (statvfs.f_frsize * statvfs.f_bfree) / (1024 ** 3)
return round(free_space_gb, 2)
except Exception:
print("Can't measure the available disk space")
os._exit(1)
return ""
def all_items_same(lst):
if len(lst) <= 1:
return True
return all(item == lst[0] for item in lst)
def get_motherboard_name():
filepath = "/sys/devices/virtual/dmi/id/board_name"
if os.path.exists(filepath):
try:
with open(filepath, 'r') as file:
return file.read().strip().replace('\n','')[:32]
except Exception as e:
print(f"Error reading Motherboard name: {e}")
return "Unknown"
else:
return "Unknown"
def get_cpu_info():
lscpu_out = subprocess.check_output(['lscpu']).decode('utf-8')
threads_per_code=1
total_threads = os.cpu_count()
model_name = "Unknown CPU"
for line in lscpu_out.split('\n'):
try:
key, value = line.split(':', 1)
value=value.strip(' ')
if "model name" in key.lower():
model_name=value
elif "Thread(s) per core" == key and int(value):
threads_per_code=int(value)
except Exception as e:
pass
total_cores = int(total_threads/threads_per_code)
return total_threads, total_cores, model_name
def get_ram_size():
try:
with open('/proc/meminfo', 'r') as f:
lines = f.readlines()
for line in lines:
if line.startswith('MemTotal'):
total_memory_kb = int(line.split()[1])
total_memory_gb = total_memory_kb / (1024) / 1000
return round(total_memory_gb, 4)
except Exception as e:
return 0
def get_gpu_info_from_xml():
result = subprocess.run(['nvidia-smi', '-x', '-q'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
print(f"Error running nvidia-smi: {result.stderr}")
os._exit(1)
xml_output = result.stdout
gpu_names = []
gpu_vram_sizes = []
try:
root = ET.fromstring(xml_output)
for gpu in root.findall('.//gpu'):
product_name = gpu.find('product_name')
fb_memory_usage = gpu.find('.//fb_memory_usage')
total_memory = fb_memory_usage.find('total') if fb_memory_usage is not None else None
if product_name is not None and total_memory is not None:
memory_size_mb = total_memory.text.split()[0]
memory_size_gb = int(round(float(memory_size_mb) / 1024, 0))
gpu_names.append(product_name.text)
gpu_vram_sizes.append(memory_size_gb)
except ET.ParseError as e:
print(f"Error parsing XML: {e}")
return gpu_names, gpu_vram_sizes
def get_country():
# Define the directory and file path
directory_path = "/opt/clore-hosting"
file_path = os.path.join(directory_path, "CC")
# Check if the file exists; if so, read from the file
if os.path.exists(file_path):
try:
with open(file_path, "r") as file:
country = file.read().strip()
if country: # Ensure the file is not empty
return country
except IOError as e:
print(f"Error reading the country from file: {e}")
apis = [
"https://ipinfo.io/country"
]
headers = {
'User-Agent': 'CloreAutoOnboard/1.0'
}
for api in apis:
try:
response = requests.get(api, headers=headers, timeout=5)
response.raise_for_status() # Raise an exception for HTTP errors
country = response.text.strip() # Remove any extra whitespace/newlines
if country: # Check if the response is not empty
save_data_to_file(country, directory_path, file_path)
return country
except (requests.RequestException, ValueError) as e:
print(f"Error with {api}: {e}")
continue
return "Unable to determine country"
def save_data_to_file(data, directory_path, file_path):
if not os.path.exists(directory_path):
os.makedirs(directory_path)
try:
with open(file_path, "w") as file:
file.write(data)
except IOError as e:
print(f"Error saving the data to file: {e}")
def get(benchmark_disk=False, mock=False):
if mock:
return {'mb': 'X99-6PLUS', 'cpu': 'Intel(R) Xeon(R) CPU E5-2698 v3 @ 2.30GHz', 'cpus': '32/64', 'ram': 128.8009, 'gpu': '1x NVIDIA GeForce RTX 4090', 'gpuram': 24, 'disk': 'MSATA SSD 1TB 878.78GB', 'disk_speed': 0, 'net': {'up': 0, 'down': 0, 'cc': 'UA'}}
threads, cores, model = get_cpu_info()
gpu_names, gpu_vram_sizes = get_gpu_info_from_xml()
disk_speeds = None
if len(gpu_names)==0:
print("No nvidia GPUs found")
os._exit(1)
if benchmark_disk:
benchmark_file = "/opt/clore-hosting/disk_benchmark.json"
if os.path.exists(benchmark_file):
try:
with open(benchmark_file, "r") as file:
benchmark_result = file.read().strip()
benchmark_result = json.loads(benchmark_result)
disk_speeds = benchmark_result
except Exception as load_fail:
pass
if not disk_speeds:
disk_speeds = disk_benchmark()
save_data_to_file(json.dumps(disk_speeds), "/opt/clore-hosting", "/opt/clore-hosting/disk_benchmark.json")
else:
disk_speeds = [0,0]
specs = {
"mb": get_motherboard_name(),
"cpu": model,
"cpus": f"{cores}/{threads}",
"ram": get_ram_size(),
"gpu": f"{len(gpu_names)}x "+(gpu_names[0] if all_items_same(gpu_names) else "Mixed GPUs"),
"gpuram": min(gpu_vram_sizes),
"disk": get_root_disk_model()[:32]+f' {get_free_space_gb()}GB',
"disk_speed": disk_speeds[1],
"net":{
"up": 0,
"down": 0,
"cc": get_country()
}
}
return specs