from aiofiles.os import stat as aio_stat from pydantic import BaseModel, Field, constr import xml.etree.ElementTree as ET from lib import docker_interface from typing import Dict, List from lib import utils import subprocess import speedtest import platform import aiofiles import aiohttp import asyncio import shutil import psutil import sys import os import re class NvidiaVersionInfo(BaseModel): driver_version: str cuda_version: str class PCIBusInfo(BaseModel): width: int = Field(None, description="The width of the PCI bus") revision: int = Field(None, description="The revision number of the PCI device", ge=0) # Example usage with None values example_pci_bus_info = PCIBusInfo() #print(example_pci_bus_info) async def get_cpu_usage(): loop = asyncio.get_running_loop() return await loop.run_in_executor(None, psutil.cpu_percent, 1) async def get_ram_usage(): loop = asyncio.get_running_loop() return await loop.run_in_executor(None, psutil.virtual_memory) def get_kernel(): return platform.uname().release def is_hive(): return "hive" in get_kernel() def get_nvidia_version(): try: output = subprocess.check_output(['nvidia-smi', '-x', '-q'], encoding='utf-8') root = ET.fromstring(output) driver_version = root.find('driver_version').text cuda_version = root.find('.//cuda_version').text if driver_version and cuda_version: return NvidiaVersionInfo(driver_version=driver_version, cuda_version=cuda_version) else: return NvidiaVersionInfo() except Exception as e: return NvidiaVersionInfo() async def measure_internet_speed(): try: st = speedtest.Speedtest() server = st.get_best_server() country = server['cc'] loop = asyncio.get_event_loop() download_speed = await loop.run_in_executor(None, st.download) upload_speed = await loop.run_in_executor(None, st.upload) return country, download_speed/1024/1024, upload_speed/1024/1024 except Exception as e: return '',0, 0 async def get_country_code(): async with aiohttp.ClientSession() as session: try: # Set a timeout for the request async with session.get("https://ifconfig.io/all.json", timeout=5) as response: # Check if the request was successful if response.status == 200: data = await response.json() # Return the country code return data.get("country_code") else: return f"Error: Response status {response.status}" except asyncio.TimeoutError: return "Error: The request timed out after 5 seconds" def filter_non_numeric(input_string): return re.sub(r'[^0-9]', '', input_string) def get_disk_udevadm(mount_point='/'): try: find_mnt_return_code, find_mnt_stdout, find_mnt_stderr = utils.run_command(f"findmnt -n -o SOURCE {mount_point}") if find_mnt_return_code!=0 or find_mnt_stderr!='': return '' lsblk_return_code, lsblk_stdout, lsblk_stderr = utils.run_command(f"lsblk -no pkname {find_mnt_stdout}") if lsblk_return_code!=0 or lsblk_stderr!='': return '' if lsblk_stdout[:5]!="/dev/": lsblk_stdout=f"/dev/{lsblk_stdout}" udevadm_return_code, udevadm_stdout, udevadm_stderr = utils.run_command(f"udevadm info --query=all --name={lsblk_stdout}") if udevadm_return_code!=0 or udevadm_stderr!='': return '' return udevadm_stdout except Exception as e: return '' def get_bus_spec(bus_id): try: with open(f"/sys/bus/pci/devices/{bus_id}/current_link_speed", "r", encoding="utf-8") as file: current_link_speed = file.read().strip() with open(f"/sys/bus/pci/devices/{bus_id}/current_link_width", "r", encoding="utf-8") as file: current_link_width = file.read().strip() speed_to_rev_mapping = { "128": 7, "64": 6, "32": 5, "16": 4, "8": 3, "5.0": 2, } pci_revision = 1 # Default value current_link_width=int(current_link_width) # Iterate over the mapping and update pci_rev based on the pcie_speed for speed_pattern, rev in speed_to_rev_mapping.items(): if speed_pattern in current_link_speed: pci_revision = rev return PCIBusInfo(revision=pci_revision, width=current_link_width) except Exception as e: print(e) return PCIBusInfo() def get_gpu_info(): gpu_str = "0x Unknown" nvml_err = False gpu_mem = 0 gpus={ "nvidia":[], "amd":[] # To be implemented in future releases } valid_pci_dev_list = [] try: valid_pci_dev_list = os.listdir("/sys/bus/pci/devices") except Exception as e: pass nvidia_smi_return_code, nvidia_smi_stdout, nvidia_smi_stderr = utils.run_command(f"nvidia-smi --query-gpu=index,name,uuid,serial,memory.total --format=csv") nvidia_smi_xl_return_code, nvidia_smi_xl_stdout, nvidia_smi_xl_stderr = utils.run_command("nvidia-smi --query-gpu=timestamp,name,pci.bus_id,driver_version,pstate,pcie.link.gen.max,pcie.link.gen.current,temperature.gpu,utilization.gpu,utilization.memory,memory.total,memory.free,memory.used --format=csv") if "Failed to initialize NVML" in nvidia_smi_stdout or "Failed to initialize NVML" in nvidia_smi_stderr or "Failed to initialize NVML" in nvidia_smi_xl_stdout or "Failed to initialize NVML" in nvidia_smi_xl_stderr: nvml_err=True elif nvidia_smi_return_code==0 and nvidia_smi_xl_return_code==0: try: lines_xl = nvidia_smi_xl_stdout.split('\n') for index, line in enumerate(lines_xl): parts = [s.strip() for s in line.split(',')] if len(parts)>12 and index>0: xl_gpu_info={ "id":index-1, "timestamp": parts[0], "name": parts[1], "pcie_bus": parts[2].split(':', 1)[1], "driver": parts[3], "pstate": parts[4], "temp": parts[7], "core_utilization": int(parts[8].replace(" %",'')), "mem_utilization": int(parts[9].replace(" %",'')), "mem_total": parts[10], "mem_free": parts[11], "mem_used": parts[12] } try: pci_query = parts[2][parts[2].find(':')+1:] for index, valid_pci_dev in enumerate(valid_pci_dev_list): if pci_query.lower() in valid_pci_dev.lower(): bus_spec = get_bus_spec(valid_pci_dev) if bus_spec.width!=None and bus_spec.revision!=None: xl_gpu_info["pcie_width"]=bus_spec.width xl_gpu_info["pcie_revision"]=bus_spec.revision except Exception as e: pass gpus["nvidia"].append(xl_gpu_info) lines = nvidia_smi_stdout.split('\n') for line in lines: parts = line.split(',') if bool(re.match(r'^[0-9]+$', parts[0])): gpu_str = f"{len(lines)-1}x {parts[1].strip()}" gpu_mem = round(int(filter_non_numeric(parts[4]).strip())/1024, 2) except Exception as e: nvml_err=True pass else: nvml_err=True return gpu_str, gpu_mem, gpus, nvml_err class DockerDaemonConfig(BaseModel): data_root: str = Field(alias="data-root") storage_driver: str = Field(alias="storage-driver") storage_opts: List[str] = Field(alias="storage-opts") class Specs: def __init__(self): self.motherboard_name_file = "/sys/devices/virtual/dmi/id/board_name" async def get(self, benchmark_internet=False): total_threads, total_cores, model_name = self.get_cpu_info() gpu_str, gpu_mem, gpus, nvml_err = get_gpu_info() docker_daemon_config = docker_interface.get_daemon_config() disk_str="" data_root_location="main_disk" if docker_daemon_config==None or type(docker_daemon_config)!=dict: sys.exit(1) else: overlay_total_size=None disk_type="" try: validated_config = DockerDaemonConfig(**docker_daemon_config) disk_udevadm = get_disk_udevadm(validated_config.data_root) for udevadm_line in disk_udevadm.split('\n'): try: key, value=udevadm_line.split('=',1) if "id_model" in key.lower(): disk_type=value[:24] elif "devpath" in key.lower() and "/virtual/" in value: disk_type="Virtual" except Exception as e_int: pass for storage_opt in validated_config.storage_opts: if storage_opt[:14]=="overlay2.size=" and "GB" in storage_opt[14:]: numeric_size = round(float(filter_non_numeric(storage_opt[14:])), 4) overlay_total_size=numeric_size except Exception as e: pass if overlay_total_size==None: total, used, free = shutil.disk_usage("/") disk_udevadm = get_disk_udevadm("/") for udevadm_line in disk_udevadm.split('\n'): try: key, value=udevadm_line.split('=',1) if "id_model" in key.lower(): disk_type=value[:24] elif "devpath" in key.lower() and "/virtual/" in value: disk_type="Virtual" except Exception as e_int: pass disk_str = f"{disk_type} {round(free / (1024**3), 4)}GB" else: # Disk is overlay data_root_location="separate" disk_str = f"{disk_type} {overlay_total_size}GB" response = { "mb": await self.motherboard_type(), "cpu":model_name, "cpus":f"{total_cores}/{total_threads}", "ram": self.get_ram_size(), "swap": self.get_swap_size(), "data_root_location":data_root_location, "disk": disk_str, "disk_speed":0, "gpu":gpu_str, "gpuram": gpu_mem, "gpus": gpus, "nvml_error":nvml_err } if benchmark_internet: country, download_speed, upload_speed = await measure_internet_speed() if country=='': download_speed=0 upload_speed=0 possible_cc = await get_country_code() if len(possible_cc)<4: country=possible_cc response["net"]={ "cc":country, "down":download_speed, "up":upload_speed } return response async def read_file(self, file_path): try: async with aiofiles.open(file_path, mode='r') as file: contents = await file.read() return contents except Exception as e: return None async def check_file_existence(self, file_path): try: await aio_stat(file_path) return True except Exception as e: return False async def motherboard_type(self): if await self.check_file_existence(self.motherboard_name_file): motherboard_type = await self.read_file(self.motherboard_name_file) return motherboard_type.replace('\n','')[:32] if motherboard_type!=None else "Unknown" else: return "Unknown" def get_cpu_info(self): lscpu_out = subprocess.check_output(['lscpu']).decode('utf-8') threads_per_code=1 total_threads = os.cpu_count() model_name = "Unknown CPU" for line in lscpu_out.split('\n'): try: key, value = line.split(':', 1) value=value.strip(' ') #print(key,'|',value) if "model name" in key.lower(): model_name=value elif "Thread(s) per core" == key and int(value): threads_per_code=int(value) except Exception as e: pass total_cores = int(total_threads/threads_per_code) return total_threads, total_cores, model_name def get_ram_size(self): try: with open('/proc/meminfo', 'r') as f: lines = f.readlines() for line in lines: if line.startswith('MemTotal'): total_memory_kb = int(line.split()[1]) total_memory_gb = total_memory_kb / (1024) / 1000 # Convert KB to GB return round(total_memory_gb, 4) except Exception as e: return 0 def get_swap_size(self): try: with open('/proc/meminfo', 'r') as f: lines = f.readlines() for line in lines: if line.startswith('SwapTotal'): total_swap_kb = int(line.split()[1]) total_swap_gb = total_swap_kb / (1024) / 1000 # Convert KB to GB return round(total_swap_gb, 4) except Exception as e: return 0