"""ICMP latency measurement built on top of the system ``ping`` utility."""

from lib import ensure_packages_installed
from lib import utils
import asyncio
import socket
import re

MANDATORY_PACKEGES = ['iputils-ping']

# One DNS label: 1-63 letters, digits or hyphens, with no hyphen at either end.
# Applied with ``fullmatch`` so a trailing newline cannot slip through the way
# it can with ``match`` plus ``$``.
# NOTE(review): the trailing-hyphen guard ``(?<!-)`` was reconstructed from a
# truncated pattern; confirm against the upstream file.
_HOSTNAME_LABEL_RE = re.compile(r"(?!-)[A-Za-z0-9-]{1,63}(?<!-)")

# Extracts the round-trip time from a ping reply line, e.g. "time=12.3 ms".
_PING_TIME_RE = re.compile(r"time=(\d+\.?\d*) ms")


def _is_valid_ipv4(ip):
    """Return True if *ip* is a strict dotted-quad IPv4 address."""
    try:
        # inet_pton is strict. inet_aton also accepts short forms and, on some
        # libc versions, trailing garbage after whitespace, which is unsafe
        # for a value that ends up inside a command string.
        socket.inet_pton(socket.AF_INET, ip)
    except (OSError, ValueError):
        return False
    return True


def _is_valid_hostname(hostname):
    """Return True if *hostname* is at most 255 chars of valid dot-separated labels."""
    if len(hostname) > 255:
        return False
    return all(_HOSTNAME_LABEL_RE.fullmatch(part) for part in hostname.split("."))


def is_valid_ipv4_or_hostname(string):
    """Return True if *string* is an IPv4 address or a syntactically valid hostname.

    Non-string values are rejected. Labels may not start with a hyphen, so
    option-like values such as ``-c`` never validate.
    """
    if not isinstance(string, str):
        return False
    return _is_valid_ipv4(string) or _is_valid_hostname(string)


def _summarize_ping(host, output):
    """Turn one ``(code, stdout, stderr)`` command result into a latency summary.

    A non-zero exit code or an unparsable result yields ``received == 0`` and
    ``avg_latency == 0`` rather than dropping the host from the report.
    """
    latencies = []
    try:
        code, stdout, _stderr = output
        if code == 0:
            for line in stdout.split('\n'):
                match = _PING_TIME_RE.search(line)
                if match:
                    latencies.append(float(match.group(1)))
    except (TypeError, ValueError, AttributeError) as e:
        # Best effort: report the problem but keep the remaining hosts.
        print(e)
        latencies = []
    return {
        "host": host,
        "received": len(latencies),
        "avg_latency": sum(latencies) / len(latencies) if latencies else 0
    }


async def measure_latency_icmp(hosts, max_concurent_measurments=2, count=4):
    """Ping *hosts* in batches and report the average round-trip time per host.

    Args:
        hosts: Sequence of IPv4 addresses or hostnames. Entries that fail
            ``is_valid_ipv4_or_hostname`` are skipped and get no result entry.
        max_concurent_measurments: Number of hosts pinged concurrently per batch.
        count: Number of echo requests sent to each host (``ping -c``).

    Returns:
        ``False`` when the ``ensure_packages_installed`` call returns a falsy
        value; otherwise a list of dicts with keys ``host``, ``received`` and
        ``avg_latency`` (milliseconds as printed by ping, 0 when no reply was
        parsed), one per valid host in input order.
    """
    success = await ensure_packages_installed.ensure_packages_installed(MANDATORY_PACKEGES, None)
    if not success:
        return False

    # The value is interpolated into the command string; force it to an integer.
    count = int(count)

    outcome = []
    for start in range(0, len(hosts), max_concurent_measurments):
        batch = [
            host
            for host in hosts[start:start + max_concurent_measurments]
            if is_valid_ipv4_or_hostname(host)
        ]
        if not batch:
            continue

        # Second argument scales with the ping count and the third pins the C
        # locale; presumably a timeout and an environment mapping (confirm
        # against utils.async_run_command).
        outputs = await asyncio.gather(*(
            utils.async_run_command(
                f"ping -c {count} -W 2 -i 0.3 {host}",
                20 * count,
                {"LANG": "C", "LC_ALL": "C"}
            )
            for host in batch
        ))

        # gather() preserves argument order, so pairing with the validated
        # batch attributes each result to the host that was actually pinged.
        for host, output in zip(batch, outputs):
            outcome.append(_summarize_ping(host, output))

    return outcome