# Library to load specs of machine without the need for any 3rd party libs
import json
import os
import re
import shutil
import subprocess
import time
import urllib.request
import xml.etree.ElementTree as ET

try:
    import requests
except ImportError:
    # Keep the module importable on hosts without `requests`;
    # get_country() falls back to urllib in that case.
    requests = None

# Directory used to cache the country code and the disk benchmark result.
_CLORE_DIR = "/opt/clore-hosting"
_BENCHMARK_FILE = "/opt/clore-hosting/disk_benchmark.json"

# Devices whose partitions are named "<disk>p<N>" (the disk name itself ends
# in a digit), e.g. /dev/nvme0n1p2, /dev/mmcblk0p1, /dev/md0p1.
_P_SUFFIX_DISK_RE = re.compile(
    r'^(/dev/(?:nvme\d+n\d+|mmcblk\d+|loop\d+|md\d+|nbd\d+))(?:p\d+)?$'
)
# Devices whose partitions are named "<disk><N>", e.g. /dev/sda1, /dev/vdb2.
_LETTER_DISK_RE = re.compile(r'^(/dev/(?:sd|hd|vd|xvd)[a-z]+)\d*$')


def _fatal(message):
    """Print *message* and terminate the process with exit status 1.

    The message is flushed explicitly because os._exit() does not flush
    stdio buffers, so a buffered message would otherwise be lost.
    """
    print(message, flush=True)
    os._exit(1)


def _mb_per_second(total_bytes, elapsed_time):
    """Convert a byte count and a duration in seconds to MiB per second."""
    # Guard against a zero duration on very small/fast transfers.
    return total_bytes / max(elapsed_time, 1e-9) / (1024 * 1024)


def drop_caches():
    """Write '3' to /proc/sys/vm/drop_caches (best effort).

    Failures (not root, no procfs) are ignored on purpose: the benchmark
    still runs, the read figure may just be served from the page cache.
    """
    try:
        with open('/proc/sys/vm/drop_caches', 'w') as f:
            f.write('3\n')
    except OSError:
        pass


def write_test(file_path, block_size, num_blocks):
    """Write num_blocks blocks of block_size random bytes to file_path.

    The data is flushed and fsync'ed before the clock is stopped.

    Returns:
        (write speed in MiB/s, elapsed seconds)
    """
    data = os.urandom(block_size)
    total_bytes = block_size * num_blocks

    start_time = time.perf_counter()
    with open(file_path, 'wb') as f:
        for _ in range(num_blocks):
            f.write(data)
        f.flush()
        os.fsync(f.fileno())
    elapsed_time = time.perf_counter() - start_time

    return _mb_per_second(total_bytes, elapsed_time), elapsed_time


def read_test(file_path, block_size, num_blocks):
    """Read up to num_blocks blocks of block_size bytes from file_path.

    Caches are dropped first (best effort) to avoid OS-level caching effects.
    The speed is based on the number of bytes actually read, so a file that
    is shorter than expected does not inflate the result.

    Returns:
        (read speed in MiB/s, elapsed seconds)
    """
    drop_caches()

    bytes_read = 0
    start_time = time.perf_counter()
    with open(file_path, 'rb') as f:
        for _ in range(num_blocks):
            data = f.read(block_size)
            if not data:
                break
            bytes_read += len(data)
    elapsed_time = time.perf_counter() - start_time

    return _mb_per_second(bytes_read, elapsed_time), elapsed_time


def disk_benchmark(file_path="/tmp/output"):
    """Run a sequential write/read benchmark using a temporary file.

    Args:
        file_path: Location of the scratch file (removed afterwards).

    Returns:
        (write MiB/s, read MiB/s) rounded to 2 decimals, or (0, 0) when the
        filesystem holding file_path has less than 1 GiB free.
    """
    # Check the filesystem the scratch file is actually written to.
    target_dir = os.path.dirname(file_path) or "."
    free_gb = shutil.disk_usage(target_dir).free / 1024 / 1024 / 1024
    if free_gb < 1:
        return 0, 0

    block_size = 1024 * 1024
    num_blocks = 250 if free_gb < 3 else 1500

    print("Running disk benchmark...")
    print(f"Block Size: {block_size} bytes, Number of Blocks: {num_blocks}")

    try:
        write_speed, write_time = write_test(file_path, block_size, num_blocks)
        print(f"Write Speed: {write_speed:.2f} MB/s, Time: {write_time:.2f} seconds")

        read_speed, read_time = read_test(file_path, block_size, num_blocks)
        print(f"Read Speed: {read_speed:.2f} MB/s, Time: {read_time:.2f} seconds")
    finally:
        # Never leave the (up to 1.5 GiB) scratch file behind, even on error.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass

    return float(round(write_speed, 2)), float(round(read_speed, 2))


def _strip_partition_suffix(device_path):
    """Return the whole-disk device for a partition device path.

    Handles both "<disk><N>" (sda1) and "<disk>p<N>" (nvme0n1p2) naming.
    Paths that match neither scheme (e.g. /dev/mapper/*, overlay) are
    returned unchanged.
    """
    for pattern in (_P_SUFFIX_DISK_RE, _LETTER_DISK_RE):
        match = pattern.match(device_path)
        if match:
            return match.group(1)
    return device_path


def get_root_disk_model():
    """Return the model string of the disk backing '/'.

    Returns "Virtual" when the model looks like a hypervisor disk and ""
    when the device or model cannot be determined.
    """
    try:
        root_mount = '/'

        # -P keeps each filesystem on a single line even for long device names.
        output = subprocess.check_output(['df', '-P', root_mount]).decode()
        device_line = output.splitlines()[1]  # Second line contains the device info
        device_path = _strip_partition_suffix(device_line.split()[0])

        model = subprocess.check_output(
            ['lsblk', '-no', 'MODEL', device_path]
        ).decode().strip()

        virtual_identifiers = ['virtual', 'qemu', 'vmware', 'hyper-v', 'vbox']
        model_lower = model.lower()
        if any(identifier in model_lower for identifier in virtual_identifiers):
            return "Virtual"
        return model
    except (subprocess.SubprocessError, OSError, IndexError, ValueError):
        return ""


def get_free_space_gb():
    """Return free space on '/' in GiB (f_frsize * f_bfree), rounded to 2 decimals.

    Terminates the process with status 1 if the filesystem cannot be queried.
    """
    try:
        statvfs = os.statvfs('/')
    except OSError:
        _fatal("Can't measure the available disk space")
    free_space_gb = (statvfs.f_frsize * statvfs.f_bfree) / (1024 ** 3)
    return round(free_space_gb, 2)


def all_items_same(lst):
    """Return True if every item of lst equals the first one (or lst has < 2 items)."""
    if len(lst) <= 1:
        return True
    first = lst[0]
    return all(item == first for item in lst)


def get_motherboard_name():
    """Return the DMI board name (at most 32 chars), or "Unknown" if unreadable."""
    filepath = "/sys/devices/virtual/dmi/id/board_name"
    try:
        with open(filepath, 'r') as file:
            return file.read().strip().replace('\n', '')[:32]
    except FileNotFoundError:
        return "Unknown"
    except (OSError, ValueError) as e:
        print(f"Error reading Motherboard name: {e}")
        return "Unknown"


def get_cpu_info(lscpu_out=None):
    """Return (total_threads, total_cores, model_name) for this machine.

    Args:
        lscpu_out: Optional text output of `lscpu`. When omitted, `lscpu` is
            executed (with LC_ALL=C so the field names are not localized).

    total_threads comes from os.cpu_count(); total_cores is derived from it
    using lscpu's "Thread(s) per core" value.

    Raises:
        OSError / subprocess.CalledProcessError: if `lscpu` cannot be run.
    """
    if lscpu_out is None:
        lscpu_out = subprocess.check_output(
            ['lscpu'], env=dict(os.environ, LC_ALL='C')
        ).decode('utf-8')

    threads_per_core = 1
    model_name = "Unknown CPU"
    for line in lscpu_out.splitlines():
        key, separator, value = line.partition(':')
        if not separator:
            continue
        # Newer lscpu versions indent the keys, so normalise before comparing.
        key = key.strip().lower()
        value = value.strip()
        if key == "model name":
            # Exact match: "BIOS Model name" must not overwrite the real model.
            model_name = value
        elif key == "thread(s) per core":
            try:
                parsed = int(value)
            except ValueError:
                continue
            if parsed > 0:
                threads_per_core = parsed

    total_threads = os.cpu_count() or 1  # cpu_count() may return None
    total_cores = max(1, total_threads // threads_per_core)
    return total_threads, total_cores, model_name


def get_ram_size():
    """Return MemTotal from /proc/meminfo scaled as kB / 1024 / 1000.

    The result is rounded to 4 decimals. The mixed 1024/1000 divisor is kept
    as-is so previously reported values do not shift. Returns 0 when the
    value cannot be read or parsed.
    """
    try:
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                if line.startswith('MemTotal'):
                    total_memory_kb = int(line.split()[1])
                    return round(total_memory_kb / 1024 / 1000, 4)
    except (OSError, ValueError, IndexError):
        pass
    return 0


def _run_nvidia_smi():
    """Run `nvidia-smi -x -q` and return its raw stdout; exit(1) on a non-zero status."""
    result = subprocess.run(
        ['nvidia-smi', '-x', '-q'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        stderr_text = result.stderr.decode('utf-8', errors='replace').strip()
        _fatal(f"Error running nvidia-smi: {stderr_text}")
    return result.stdout


def get_gpu_info_from_xml(xml_output=None):
    """Return (gpu_names, gpu_vram_sizes) parsed from nvidia-smi XML.

    Args:
        xml_output: Optional XML document (str or bytes) as produced by
            `nvidia-smi -x -q`. When omitted, nvidia-smi is executed and the
            process exits with status 1 if it reports a failure.

    VRAM sizes are the first token of fb_memory_usage/total divided by 1024
    and rounded to an integer. GPUs with a missing or non-numeric memory
    value are skipped. An unparsable document yields two empty lists.

    Raises:
        OSError: if the nvidia-smi executable cannot be started.
    """
    if xml_output is None:
        xml_output = _run_nvidia_smi()

    gpu_names = []
    gpu_vram_sizes = []
    try:
        root = ET.fromstring(xml_output)
    except ET.ParseError as e:
        print(f"Error parsing XML: {e}")
        return gpu_names, gpu_vram_sizes

    for gpu in root.findall('.//gpu'):
        product_name = gpu.find('product_name')
        fb_memory_usage = gpu.find('.//fb_memory_usage')
        total_memory = fb_memory_usage.find('total') if fb_memory_usage is not None else None
        if product_name is None or total_memory is None:
            continue
        try:
            memory_size_mb = float(total_memory.text.split()[0])
        except (AttributeError, IndexError, ValueError):
            # Empty element or a placeholder such as "N/A".
            continue
        gpu_names.append(product_name.text)
        gpu_vram_sizes.append(int(round(memory_size_mb / 1024, 0)))

    return gpu_names, gpu_vram_sizes


def _http_get_text(url, headers, timeout):
    """GET *url* and return the response body as text.

    Uses `requests` when it is installed, otherwise urllib from the standard
    library. Both raise OSError subclasses on network and HTTP errors.
    """
    if requests is not None:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()  # Raise an exception for HTTP errors
        return response.text
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode('utf-8', errors='replace')


def get_country():
    """Return the host's country code.

    The value cached in /opt/clore-hosting/CC is used when present and
    non-empty; otherwise it is fetched from ipinfo.io and cached. Returns
    "Unable to determine country" when every source fails.
    """
    directory_path = _CLORE_DIR
    file_path = os.path.join(directory_path, "CC")

    try:
        with open(file_path, "r") as file:
            country = file.read().strip()
        if country:  # Ensure the file is not empty
            return country
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"Error reading the country from file: {e}")

    apis = [
        "https://ipinfo.io/country"
    ]
    headers = {
        'User-Agent': 'CloreAutoOnboard/1.0'
    }

    for api in apis:
        try:
            country = _http_get_text(api, headers, timeout=5).strip()
        except (OSError, ValueError) as e:
            print(f"Error with {api}: {e}")
            continue
        if country:
            # Caching is best effort and must not affect the returned value.
            save_data_to_file(country, directory_path, file_path)
            return country

    return "Unable to determine country"


def save_data_to_file(data, directory_path, file_path):
    """Write *data* to file_path, creating directory_path if needed.

    Errors are reported on stdout and otherwise ignored (best-effort cache).
    """
    try:
        os.makedirs(directory_path, exist_ok=True)
        with open(file_path, "w") as file:
            file.write(data)
    except OSError as e:
        print(f"Error saving the data to file: {e}")


def _load_cached_disk_speeds(benchmark_file):
    """Return the cached [write, read] speeds from benchmark_file, or None.

    None is returned when the file is missing, unreadable, not valid JSON or
    not a two-element list of numbers.
    """
    try:
        with open(benchmark_file, "r") as file:
            cached = json.loads(file.read().strip())
    except (OSError, ValueError):
        return None
    if (
        isinstance(cached, list)
        and len(cached) == 2
        and all(
            isinstance(value, (int, float)) and not isinstance(value, bool)
            for value in cached
        )
    ):
        return cached
    return None


def get(benchmark_disk=False, mock=False):
    """Collect the machine specs as a dictionary.

    Args:
        benchmark_disk: When True, report the disk read speed, using the
            result cached in /opt/clore-hosting/disk_benchmark.json or
            running (and caching) a new benchmark. Otherwise the speed is 0.
        mock: When True, return a fixed sample dictionary without touching
            the system.

    Returns:
        dict with keys mb, cpu, cpus, ram, gpu, gpuram, disk, disk_speed, net.

    Terminates the process with status 1 when no NVIDIA GPU is found.
    """
    if mock:
        return {'mb': 'X99-6PLUS', 'cpu': 'Intel(R) Xeon(R) CPU E5-2698 v3 @ 2.30GHz', 'cpus': '32/64', 'ram': 128.8009, 'gpu': '1x NVIDIA GeForce RTX 4090', 'gpuram': 24, 'disk': 'MSATA SSD 1TB 878.78GB', 'disk_speed': 0, 'net': {'up': 0, 'down': 0, 'cc': 'UA'}}

    threads, cores, model = get_cpu_info()
    gpu_names, gpu_vram_sizes = get_gpu_info_from_xml()
    if not gpu_names:
        _fatal("No nvidia GPUs found")

    if benchmark_disk:
        disk_speeds = _load_cached_disk_speeds(_BENCHMARK_FILE)
        if disk_speeds is None:
            disk_speeds = disk_benchmark()
            save_data_to_file(json.dumps(disk_speeds), _CLORE_DIR, _BENCHMARK_FILE)
    else:
        disk_speeds = [0, 0]

    gpu_label = gpu_names[0] if all_items_same(gpu_names) else "Mixed GPUs"
    specs = {
        "mb": get_motherboard_name(),
        "cpu": model,
        "cpus": f"{cores}/{threads}",
        "ram": get_ram_size(),
        "gpu": f"{len(gpu_names)}x " + gpu_label,
        "gpuram": min(gpu_vram_sizes),
        "disk": get_root_disk_model()[:32] + f' {get_free_space_gb()}GB',
        "disk_speed": disk_speeds[1],  # index 1 is the read speed
        "net": {
            "up": 0,
            "down": 0,
            "cc": get_country()
        }
    }
    return specs