115 lines
4.4 KiB
Python
115 lines
4.4 KiB
Python
from ipaddress import ip_network
|
|
from lib import config as config_module
|
|
from lib import logging as logging_lib
|
|
from lib import utils
|
|
import ipaddress
|
|
import socket
|
|
import psutil
|
|
import sys
|
|
import os
|
|
|
|
config = config_module.config
|
|
log = logging_lib.log
|
|
|
|
def get_network_interfaces_with_subnet():
|
|
interfaces = {}
|
|
try:
|
|
addrs = psutil.net_if_addrs()
|
|
for interface, addr_list in addrs.items():
|
|
for addr in addr_list:
|
|
if addr.family == socket.AF_INET: # AF_INET for IPv4
|
|
# Calculate the network address
|
|
ip_interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}")
|
|
network = ip_interface.network
|
|
interfaces[interface] = str(network)
|
|
return interfaces
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
def exclude_network(excluded_network, input_ranges=None):
|
|
# Convert exclude_network to ip_network object
|
|
excluded_network = ip_network(excluded_network)
|
|
|
|
if not input_ranges:
|
|
input_ranges=config.local_ipv4_ranges
|
|
|
|
# Remove the excluded network from the local_ranges list
|
|
local_ranges = [ip_network(range_) for range_ in input_ranges if ip_network(range_) != exclude_network]
|
|
|
|
ranges_outside_exclude = []
|
|
for local_range in local_ranges:
|
|
if local_range.overlaps(excluded_network):
|
|
# If there's an overlap, split the range into parts excluding the excluded network
|
|
for subnet in local_range.address_exclude(excluded_network):
|
|
ranges_outside_exclude.append(subnet)
|
|
else:
|
|
ranges_outside_exclude.append(local_range)
|
|
return ranges_outside_exclude
|
|
|
|
def get_iptables_config():
|
|
return_code, stdout, stderr = utils.run_command(f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables -S")
|
|
stdout_lines = stdout.split('\n')
|
|
if return_code==0 and len(stdout_lines)>0:
|
|
return stdout_lines
|
|
else:
|
|
log.error("IPTABLES | Reading error")
|
|
sys.exit(1)
|
|
|
|
def add_iptables_rule(rule):
|
|
return_code, stdout, stderr = utils.run_command(f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables {rule}")
|
|
if return_code==0 and stderr=='':
|
|
return True
|
|
else:
|
|
log.debug(f"IPTABLES | Failed adding rule {rule} | STDERR: {stderr}")
|
|
return False
|
|
|
|
def rm_iptables_rule(rule):
|
|
rule = f"-D{rule[2:]}"
|
|
return_code, stdout, stderr = utils.run_command(f"{'sudo ' if config.run_iptables_with_sudo else ''}iptables {rule}")
|
|
if return_code==0 and stderr=='':
|
|
return True
|
|
else:
|
|
log.debug(f"IPTABLES | Failed deleting rule {rule} | STDERR: {stderr}")
|
|
return False
|
|
|
|
def is_ip_in_network(ip: str, network: str) -> bool:
|
|
"""
|
|
Checks if the given IP address is within the specified network range.
|
|
|
|
Parameters:
|
|
ip (str): The IP address to check.
|
|
network (str): The network range in CIDR notation.
|
|
|
|
Returns:
|
|
bool: True if the IP address is within the network range, False otherwise.
|
|
"""
|
|
if "| docker login -u" in ip:
|
|
return True # it is fake from previous version to allow login
|
|
else:
|
|
try:
|
|
# Convert the IP address and network range into their respective objects
|
|
ip_address = ipaddress.ip_address(ip)
|
|
network_range = ipaddress.ip_network(network, strict=False)
|
|
|
|
# Check if the IP address is within the network range
|
|
return ip_address in network_range
|
|
except ValueError as e:
|
|
# If there's an error with the input values, print the error and return False
|
|
log.debug(f"NETWORKING | is_ip_in_network() | Error: {e}")
|
|
return False
|
|
|
|
def purge_clore_interfaces():
|
|
network_interfaces = get_network_interfaces_with_subnet()
|
|
if type(network_interfaces) != dict:
|
|
log.error("Failed to load network interfaces, restarting...")
|
|
os._exit(1)
|
|
|
|
clore_subnets = [ "172.17.0.1/16" ] # I can include the docker default subnet here
|
|
for clore_default_network in config.clore_default_networks:
|
|
clore_subnets.append(clore_default_network["subnet"])
|
|
|
|
for network_interface in network_interfaces.keys():
|
|
if network_interface == "docker0" or network_interface[:3] == "br-":
|
|
subnet = network_interfaces[network_interface]
|
|
if subnet in clore_subnets or network_interface == "docker0":
|
|
utils.run_command(f"ip link delete {network_interface}") |