from lib import logging as logging_lib
log = logging_lib.log
from lib import config as config_module
config = config_module.config
from lib import networking
from lib import utils

from pydantic import BaseModel, Field, ValidationError, IPvAnyNetwork
from typing import List, Optional
import docker
import json
import os

# Interface-name prefixes that are never considered Docker-managed interfaces
# when matching a Docker network's subnet to a host interface.
_NON_DOCKER_IF_PREFIXES = ("eth", "enp", "eno", "ens", "wlp")

# Make sure the working folders exist at import time. This is best-effort:
# a failure is logged instead of being silently swallowed, but never raised.
for _folder_attr in ("startup_scripts_folder", "wireguard_config_folder", "entrypoints_folder"):
    try:
        os.makedirs(getattr(config, _folder_attr), exist_ok=True)
    except Exception as e:
        log.debug(f"DOCKER | Can't create folder '{_folder_attr}': {e}")


class NetworkConfig(BaseModel):
    """Schema of one entry of ``config.clore_default_networks``."""

    name: str = Field(..., description="The name of the network")
    subnet: IPvAnyNetwork = Field(..., description="The subnet of the network in CIDR notation")
    gateway: str = Field(..., description="The gateway IP address of the network")


class IPAMConfig(BaseModel):
    """One IPAM pool (subnet + gateway) as produced by get_docker_networks()."""

    Subnet: str
    Gateway: str


class DockerNetwork(BaseModel):
    """Validated view of one network dict produced by get_docker_networks()."""

    Name: str = Field(..., alias='Name')
    Id: str = Field(..., alias='ID')
    Created: Optional[str] = None  # Optional: get_docker_networks() does not provide it
    Scope: str
    Driver: str
    EnableIPv6: Optional[bool] = None  # Optional: get_docker_networks() does not provide it
    IPAM: List[IPAMConfig] = Field(..., alias='IPAM')

    class Config:
        populate_by_name = True


client = docker.from_env()
low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock')


def check_docker_connection():
    """Ping the Docker daemon.

    Returns:
        True when the daemon answers, False when a DockerException is raised.
    """
    try:
        client.ping()
        return True
    except docker.errors.DockerException as e:
        print(f"Error: {e}")
        return False


def get_local_images(no_latest_tag=False):
    """Return the tags of all locally available images.

    Args:
        no_latest_tag: when True, a trailing ``:latest`` is stripped from each
            tag (``repo:latest`` -> ``repo``). Only the suffix is removed, so a
            tag such as ``repo:latest-gpu`` is left untouched.

    Returns:
        A flat list of tag strings; untagged images contribute nothing.

    The process is terminated with exit code 1 if the images can't be listed.
    """
    latest_suffix = ':latest'
    try:
        image_list = []
        for image in client.images.list():
            # Untagged images have an empty tag list and are skipped.
            for tag in image.tags or []:
                if tag == ":":
                    continue
                if no_latest_tag and tag.endswith(latest_suffix):
                    tag = tag[:-len(latest_suffix)]
                image_list.append(tag)
        return image_list
    except Exception as e:
        log.error(f"DOCKER | Can't get local images | {e}")
        os._exit(1)


def get_containers(all=False):
    """Return the container objects known to the Docker daemon.

    Args:
        all: forwarded to ``client.containers.list``; when True, stopped
            containers are included as well.

    The process is terminated with exit code 1 if the listing fails.
    """
    try:
        return client.containers.list(all=all)
    except Exception as e:
        # Include the underlying error so the fatal exit can be diagnosed.
        log.error(f"DOCKER INTERFACE | Can't get_containers() | {e}")
        os._exit(1)


def remove_docker_image(image):
    """Remove a local Docker image.

    Returns:
        True when the image was removed, False when it was not found or the
        removal failed for any other reason.
    """
    try:
        client.images.remove(image=image)
        log.debug(f"remove_docker_image() | Image '{image}' successfully removed.")
        return True
    except docker.errors.ImageNotFound:
        log.debug(f"remove_docker_image() | Image '{image}' not found.")
        return False
    except Exception as e:
        log.debug(f"remove_docker_image() | Error removing image '{image}': {e}")
        return False


def remove_docker_network(network_name):
    """Remove the Docker network with the given name.

    Returns:
        True when the network was removed, False when it does not exist or
        the removal failed.
    """
    try:
        network = client.networks.get(network_name)
        network.remove()
        log.debug(f"DOCKER | Network '{network_name}' successfully removed.")
        return True
    except docker.errors.NotFound:
        log.debug(f"DOCKER | Network '{network_name}' not found.")
        return False
    except Exception as e:
        log.debug(f"DOCKER | An error occurred: {e}")
        return False


def get_docker_networks():
    """Return a summary dict for every Docker network.

    Each dict has the keys ``Name``, ``ID``, ``Driver``, ``Scope`` and
    ``IPAM`` (a list of ``{'Subnet': ..., 'Gateway': ...}`` dicts), which is
    the shape the ``DockerNetwork`` model validates.

    This is the only definition of this function: an earlier, shadowed
    definition that returned the raw ``network.attrs`` (and an error string
    on failure) was dead code and has been removed.

    The process is terminated with exit code 1 on any failure.
    """
    networks_list = []
    try:
        # Use the shared module-level client instead of opening (and never
        # closing) a fresh connection on every call.
        for network in client.networks.list():
            network_details = {
                'Name': network.name,
                'ID': network.id,
                'Driver': network.attrs["Driver"],
                'Scope': network.attrs["Scope"],
                'IPAM': []
            }
            # A network may have several IPAM pools. "IPAM" or its "Config"
            # may be present but null, so treat a falsy value as "no pools"
            # rather than crashing on iteration.
            ipam_configs = (network.attrs.get('IPAM') or {}).get('Config') or []
            for ipam_entry in ipam_configs:
                network_details['IPAM'].append({
                    'Subnet': ipam_entry.get('Subnet', 'Not specified'),
                    'Gateway': ipam_entry.get('Gateway', 'Not specified'),
                })
            networks_list.append(network_details)
        return networks_list
    except Exception as e:
        log.error(f"Failed to retrieve Docker networks: {e}")
        os._exit(1)  # Exit the application on any failure


def create_docker_network(network_name, subnet, gateway, driver="bridge"):
    """Create a Docker network with a single IPAM pool.

    Args:
        network_name: name of the network to create.
        subnet: pool subnet in CIDR notation; also used as the pool's IP range.
        gateway: gateway address of the pool.
        driver: Docker network driver, ``"bridge"`` by default.

    Returns:
        True on success, False when the Docker API rejects the request.
    """
    try:
        client.networks.create(
            name=network_name,
            driver=driver,
            ipam=docker.types.IPAMConfig(
                pool_configs=[docker.types.IPAMPool(
                    subnet=subnet,
                    iprange=subnet,
                    gateway=gateway
                )]
            ),
            check_duplicate=True
        )
        log.debug(f"Network {network_name} created successfully.")
        return True
    except docker.errors.APIError as e:
        log.error(f"DOCKER | Failed to create network {network_name}: {e}")
        return False


def validate_and_secure_networks():
    """Reconcile clore Docker networks with the host's iptables rules.

    Steps:
      1. For every Docker network whose name starts with
         ``config.clore_network_name_prefix``, find the host interface that
         carries the same subnet and make sure every rule from
         ``config.clore_iptables_rules`` is active for it.
      2. Create any network from ``config.clore_default_networks`` that does
         not exist yet.
      3. Remove leftover iptables rules that match a rule template but were
         not claimed by any network in step 1.

    Returns:
        ``"run_again"`` when at least one default network was created (the
        caller is expected to call this function again), ``True`` when
        everything is in place, ``False`` when a rule could not be added, a
        network failed validation, or an unexpected error occurred.
    """
    try:
        failed_appending_iptables_rule = False
        network_interfaces_with_subnet = networking.get_network_interfaces_with_subnet()
        iptables_rules = networking.get_iptables_config()

        if not isinstance(network_interfaces_with_subnet, dict):
            log.error("get_network_interfaces_with_subnet() | Networking | Can't get interfaces")
            os._exit(1)

        # Two parallel lists: index i of one corresponds to index i of the
        # other. Rules claimed by a network are popped from both below, so
        # whatever remains afterwards is a candidate for deletion.
        normalized_iptables_rules = [
            utils.normalize_rule(utils.parse_rule_to_dict(rule)) for rule in iptables_rules
        ]
        not_normalized_iptables_rules = list(iptables_rules)

        net_list = get_docker_networks()
        if not isinstance(net_list, list):
            log.error(f"DOCKER | Networking | {net_list}")
            os._exit(1)

        existing_clore_networks = []
        for network in net_list:
            try:
                docker_network = DockerNetwork(**network)
                if docker_network.Name.startswith(config.clore_network_name_prefix):
                    try:
                        existing_clore_networks.append(docker_network.Name)
                        if docker_network.IPAM and len(docker_network.IPAM) > 0 and docker_network.IPAM[0].Subnet:
                            # Find the first host interface that carries this
                            # network's subnet and may be a Docker interface.
                            this_if_name = None
                            this_ipv4_range = ''
                            for if_name, ipv4_range in network_interfaces_with_subnet.items():
                                can_be_docker = not if_name.startswith(_NON_DOCKER_IF_PREFIXES)
                                if ipv4_range == docker_network.IPAM[0].Subnet and can_be_docker:
                                    this_if_name = if_name
                                    this_ipv4_range = ipv4_range
                                    break

                            if this_if_name:
                                for rule_template in config.clore_iptables_rules:
                                    # NOTE(review): both placeholder literals are empty
                                    # strings in this copy of the file, which looks like
                                    # angle-bracket tokens stripped by an HTML sanitiser.
                                    # Kept byte-for-byte; restore the real placeholders
                                    # from version control.
                                    needed_iptables_rule = rule_template.replace("", this_ipv4_range).replace("", this_if_name)
                                    # The live rule listing is compared in "-A" form,
                                    # so an "-I" (insert) rule is rewritten to "-A"
                                    # for comparison only.
                                    if needed_iptables_rule[:2] == "-I":
                                        for_comparison_rule = "-A" + needed_iptables_rule[2:]
                                    else:
                                        for_comparison_rule = needed_iptables_rule
                                    for_comparison_rule_normalized = utils.normalize_rule(
                                        utils.parse_rule_to_dict(for_comparison_rule)
                                    )

                                    is_rule_active = False
                                    # Iterate in reverse to safely remove items while iterating
                                    for i in range(len(normalized_iptables_rules) - 1, -1, -1):
                                        if normalized_iptables_rules[i] == for_comparison_rule_normalized:
                                            is_rule_active = True
                                            # Claim the matched rule so it is not deleted later
                                            normalized_iptables_rules.pop(i)
                                            not_normalized_iptables_rules.pop(i)

                                    if not is_rule_active:
                                        succesfully_appended = networking.add_iptables_rule(needed_iptables_rule)
                                        if not succesfully_appended:
                                            failed_appending_iptables_rule = True
                        else:
                            # NOTE(review): the file's indentation was lost; this
                            # `else` is assumed to belong to the IPAM check above
                            # (a clore network without a usable subnet is removed).
                            # Confirm against version control.
                            remove_docker_network(docker_network.Name)
                    except Exception as e2:
                        log.debug(f"DOCKER | Networking | Can't validate network: {e2}")

            except ValidationError as e:
                log.error(f"DOCKER | Networking | Validation error: {e.json()}")
                return False

        added_default_network = False
        for clore_default_network in config.clore_default_networks:
            try:
                valid_default_network = NetworkConfig(**clore_default_network)
                if valid_default_network.name not in existing_clore_networks:
                    create_docker_network(
                        valid_default_network.name,
                        str(valid_default_network.subnet),
                        valid_default_network.gateway,
                    )
                    added_default_network = True
            except Exception as e:
                # Best-effort: an invalid entry must not abort the others,
                # but leave a trace instead of swallowing it silently.
                log.debug(f"DOCKER | Networking | Can't add default network: {e}")

        if added_default_network:
            return "run_again"

        # Delete unused iptables rules
        normalized_template_rules = []
        for rule in config.clore_iptables_rules:
            if rule[:2] == "-I":
                rule = f"-A{rule[2:]}"
            normalized_template_rules.append(utils.normalize_rule(utils.parse_rule_to_dict(rule)))

        for index, not_matched_rule in enumerate(normalized_iptables_rules):
            for normalized_template_rule in normalized_template_rules:
                if normalized_template_rule.keys() == not_matched_rule.keys():
                    any_unmatching_values = False
                    for key in normalized_template_rule.keys():
                        # NOTE(review): both compared literals are empty strings in
                        # this copy of the file (see the placeholder note above);
                        # kept byte-for-byte.
                        if normalized_template_rule[key] == "" or normalized_template_rule[key] == "":
                            pass
                        elif normalized_template_rule[key] != not_matched_rule[key]:
                            any_unmatching_values = True
                            break
                        # Only touch rules whose addresses start with the
                        # configured clore prefix and whose interface is not in
                        # the non-Docker prefix list.
                        if key == "-s" and not not_matched_rule[key].startswith(config.clore_br_first_allowed_octet):
                            any_unmatching_values = True
                            break
                        elif key == "-i" and not_matched_rule[key].startswith(_NON_DOCKER_IF_PREFIXES):
                            any_unmatching_values = True
                            break
                        elif key == "-d" and not not_matched_rule[key].startswith(config.clore_br_first_allowed_octet):
                            any_unmatching_values = True
                            break
                    if not any_unmatching_values:
                        simple_rule = not_normalized_iptables_rules[index]
                        # Delete rule from iptables
                        networking.rm_iptables_rule(simple_rule)

        return not failed_appending_iptables_rule
    except Exception as e:
        log.error(f"validate_and_secure_networks() | ERROR | {e}")
        return False


def get_daemon_config():
    """Load ``/etc/docker/daemon.json``.

    Returns:
        The parsed JSON content, or None when the file is missing or is not
        valid JSON.
    """
    config_path = "/etc/docker/daemon.json"
    try:
        with open(config_path, 'r') as file:
            config_data = json.load(file)
            return config_data
    except FileNotFoundError:
        print(f"Error: {config_path} not found.")
        return None
    except json.JSONDecodeError:
        print(f"Error: Failed to parse JSON from {config_path}.")
        return None