from lib import logging as logging_lib
log = logging_lib.log
from lib import config as config_module
config = config_module.config
from packaging import version
from lib import networking
from lib import utils

from pydantic import BaseModel, Field, ValidationError, IPvAnyNetwork
from typing import List, Optional
import docker
import json
import os
import re

# Subnet of the "clore-partner-br0" default network; stays '' when the
# configuration does not list such a network.
partner_bridge_subnet = ''

for clore_network in config.clore_default_networks:
    if clore_network["name"] == "clore-partner-br0":
        partner_bridge_subnet = clore_network["subnet"]

# Best-effort creation of the working folders. A failure here must not stop
# the module from importing, so it is only logged.
for _folder_attr in ("startup_scripts_folder", "wireguard_config_folder", "entrypoints_folder"):
    try:
        os.makedirs(getattr(config, _folder_attr), exist_ok=True)
    except Exception as e:
        log.debug(f"DOCKER | Can't create folder for '{_folder_attr}': {e}")

# Interface-name prefixes that are never accepted as the host interface of a
# docker network (see validate_and_secure_networks()).
_NON_DOCKER_IF_PREFIXES = ("eth", "enp", "eno", "ens", "wlp", "vet")


class NetworkConfig(BaseModel):
    """Shape of one entry of ``config.clore_default_networks``."""

    name: str = Field(..., description="The name of the network")
    subnet: IPvAnyNetwork = Field(..., description="The subnet of the network in CIDR notation")
    gateway: str = Field(..., description="The gateway IP address of the network")


class IPAMConfig(BaseModel):
    """One IPAM entry (subnet + gateway) of a docker network."""

    Subnet: str
    Gateway: str


class DockerNetwork(BaseModel):
    """Shape of the dicts produced by get_docker_networks()."""

    Name: str = Field(..., alias='Name')
    Id: str = Field(..., alias='ID')
    Created: Optional[str] = None  # optional: get_docker_networks() does not emit it
    Scope: str
    Driver: str
    EnableIPv6: Optional[bool] = None  # optional: get_docker_networks() does not emit it
    IPAM: List[IPAMConfig] = Field(..., alias='IPAM')

    class Config:
        populate_by_name = True


client = docker.from_env()
low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock')

daemon_config_path = "/etc/docker/daemon.json"


def get_info():
    """Return ``client.info()``, or an empty dict when the call fails."""
    try:
        return client.info()
    except Exception:
        return {}


def stop_all_containers():
    """Stop every container known to the daemon (running or not).

    Errors are logged, never raised. Always returns True.
    """
    try:
        # all=True also lists containers that are already stopped.
        containers = client.containers.list(all=True)
        for container in containers:
            log.info(f"stop_all_containers() | Stopping container: {container.name} (ID: {container.id})")
            container.stop()
        log.success("stop_all_containers() | All containers have been stopped.")
    except Exception as e:
        log.error(f"stop_all_containers() |An error occurred: {e}")
    return True


def check_docker_connection():
    """Ping the docker daemon; return True on success, False on a DockerException."""
    try:
        client.ping()
        return True
    except docker.errors.DockerException as e:
        print(f"Error: {e}")
        return False


def get_local_images(no_latest_tag=False):
    """Return the tags of all local images as a flat list of strings.

    Args:
        no_latest_tag: when True, every ':latest' occurrence is stripped
            from the returned tags.

    Exits the process (``os._exit(1)``) when the image list can't be read.
    """
    try:
        images = client.images.list()
        image_list = []
        for image in images:
            # Untagged images get a sentinel tag that is filtered out below.
            # NOTE(review): the sentinel is ':' in this copy of the file; it
            # may have been a longer placeholder originally - confirm.
            tags = image.tags if image.tags else [':']
            for tag in tags:
                if tag != ":":
                    if no_latest_tag:
                        image_list.append(tag.replace(':latest', ''))
                    else:
                        image_list.append(tag)
        return image_list
    except Exception as e:
        log.error(f"DOCKER | Can't get local images | {e} | {'y' if no_latest_tag else 'n'}")
        os._exit(1)


def get_containers(all=False):
    """Return ``client.containers.list(all=all)``; exit the process on failure."""
    try:
        return client.containers.list(all=all)
    except Exception:
        log.error("DOCKER INTERFACE | Can't get_containers()")
        os._exit(1)


def remove_docker_image(image):
    """Remove a local image. Returns True when removed, False otherwise."""
    try:
        client.images.remove(image=image)
        log.debug(f"remove_docker_image() | Image '{image}' successfully removed.")
        return True
    except docker.errors.ImageNotFound:
        log.debug(f"remove_docker_image() | Image '{image}' not found.")
        return False
    except Exception as e:
        log.debug(f"remove_docker_image() | Error removing image '{image}': {e}")
        return False


def remove_docker_network(network_name):
    """Remove a docker network by name. Returns True when removed, False otherwise."""
    try:
        network = client.networks.get(network_name)
        network.remove()
        log.debug(f"DOCKER | Network '{network_name}' successfully removed.")
        return True
    except docker.errors.NotFound:
        log.debug(f"DOCKER | Network '{network_name}' not found.")
        return False
    except Exception as e:
        log.debug(f"DOCKER | An error occurred: {e}")
        return False


def get_docker_networks():
    """Return a summary dict for every docker network.

    Each dict has the keys 'Name', 'ID', 'Driver', 'Scope' and 'IPAM', where
    'IPAM' is a list of ``{'Subnet': ..., 'Gateway': ...}`` dicts ('Not
    specified' fills in missing values). A network whose attributes can't be
    read is skipped. Exits the process when the network list itself can't be
    retrieved.
    """
    networks_list = []
    try:
        # A fresh client is created per call, as before; it is named so that
        # it no longer shadows the module-level ``client``.
        docker_client = docker.from_env()
        networks = docker_client.networks.list()

        for network in networks:
            try:
                network_details = {
                    'Name': network.name,
                    'ID': network.id,
                    'Driver': network.attrs["Driver"],
                    'Scope': network.attrs["Scope"],
                    'IPAM': []
                }

                # A network may carry several IPAM configurations.
                ipam_configs = network.attrs.get('IPAM', {}).get('Config', [])
                if isinstance(ipam_configs, list):
                    for ipam_entry in ipam_configs:
                        network_details['IPAM'].append({
                            'Subnet': ipam_entry.get('Subnet', 'Not specified'),
                            'Gateway': ipam_entry.get('Gateway', 'Not specified'),
                        })

                networks_list.append(network_details)
            except Exception as e:
                # Best effort: one unreadable network must not hide the others.
                log.debug(f"DOCKER | Skipping unreadable network: {e}")

        return networks_list
    except Exception as e:
        log.error(f"Failed to retrieve Docker networks: {e}")
        os._exit(1)  # Exit the application on any failure


def create_docker_network(network_name, subnet, gateway, driver="bridge"):
    """Create a docker network whose single IPAM pool spans ``subnet``.

    Returns True on success, False when the docker API reports an error.
    """
    try:
        client.networks.create(
            name=network_name,
            driver=driver,
            ipam=docker.types.IPAMConfig(
                pool_configs=[docker.types.IPAMPool(
                    subnet=subnet,
                    iprange=subnet,
                    gateway=gateway
                )]
            ),
            check_duplicate=True,
            #options={'com.docker.network.bridge.enable_ip_masq': 'false'} if 'clore-partner-' in network_name else {}
        )
        log.debug(f"Network {network_name} created successfully.")
        return True
    except docker.errors.APIError as e:
        log.error(f"DOCKER | Failed to create network {network_name}: {e}")
        return False


def _ensure_iptables_rule(needed_iptables_rule, normalized_iptables_rules, not_normalized_iptables_rules):
    """Make sure one iptables rule is active; return False only if adding it failed.

    Every entry of ``normalized_iptables_rules`` that equals the wanted rule
    is popped (together with its twin in ``not_normalized_iptables_rules``),
    so that whatever remains in both lists afterwards is "not matched".
    """
    # Existing rules are compared in "-A" form, so an "-I" rule is rewritten first.
    for_comparison_rule = "-A" + needed_iptables_rule[2:] if needed_iptables_rule[:2] == "-I" else needed_iptables_rule
    for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule))

    is_rule_active = False
    # Iterate in reverse to safely remove items while iterating
    for i in range(len(normalized_iptables_rules) - 1, -1, -1):
        if normalized_iptables_rules[i] == for_comparison_rule_normalized:
            is_rule_active = True
            normalized_iptables_rules.pop(i)
            not_normalized_iptables_rules.pop(i)

    if is_rule_active:
        return True
    return bool(networking.add_iptables_rule(needed_iptables_rule))


def validate_and_secure_networks(partner_forwarding_ips):
    """Reconcile clore docker networks and their iptables rules.

    For every docker network whose name starts with
    ``config.clore_network_name_prefix`` the matching host interface is looked
    up and the rules from ``config.clore_iptables_rules`` are ensured.
    Missing default networks are created, and leftover iptables rules that
    look like instances of the templates are removed.

    Args:
        partner_forwarding_ips: IPs that are additionally excluded (as /32)
            from the "outside" ranges of the partner bridge network.

    Returns:
        "run_again" when a default network had to be created, True when
        everything succeeded, False when a network failed validation, an
        iptables rule could not be added, or an unexpected error occurred.
    """
    try:
        failed_appending_iptables_rule = False

        network_interfaces_with_subnet = networking.get_network_interfaces_with_subnet()
        iptables_rules = networking.get_iptables_config()
        if not isinstance(network_interfaces_with_subnet, dict):
            log.error("get_network_interfaces_with_subnet() | Networking | Can't get interfaces")
            os._exit(1)

        # Two parallel lists: the normalized form is used for comparison, the
        # raw form for deletion. Indexes must stay aligned.
        normalized_iptables_rules = []
        not_normalized_iptables_rules = []
        for rule in iptables_rules:
            normalized_iptables_rules.append(utils.normalize_rule(utils.parse_rule_to_dict(rule)))
            not_normalized_iptables_rules.append(rule)

        net_list = get_docker_networks()
        if not isinstance(net_list, list):
            log.error(f"DOCKER | Networking | {net_list}")
            os._exit(1)

        existing_clore_networks = []
        for network in net_list:
            try:
                docker_network = DockerNetwork(**network)
                if docker_network.Name.startswith(config.clore_network_name_prefix):
                    try:
                        existing_clore_networks.append(docker_network.Name)
                        if docker_network.IPAM and docker_network.IPAM[0].Subnet:
                            # Find the host interface that carries this network's subnet.
                            this_if_name = None
                            this_ipv4_range = ''
                            for if_name in network_interfaces_with_subnet.keys():
                                can_be_docker = not if_name.startswith(_NON_DOCKER_IF_PREFIXES)
                                ipv4_range = network_interfaces_with_subnet[if_name]
                                if ipv4_range == docker_network.IPAM[0].Subnet and can_be_docker:
                                    this_if_name = if_name
                                    this_ipv4_range = ipv4_range
                                    break

                            if this_if_name:
                                outside_ranges_ip_network = networking.exclude_network(this_ipv4_range)
                                if this_ipv4_range == partner_bridge_subnet:
                                    # Additionally exclude every partner forwarding IP.
                                    for partner_forwarding_ip in partner_forwarding_ips:
                                        outside_ranges = [str(ip_range) for ip_range in outside_ranges_ip_network]
                                        outside_ranges_ip_network = networking.exclude_network(
                                            f"{partner_forwarding_ip}/32", input_ranges=outside_ranges
                                        )
                                outside_ranges = [str(ip_range) for ip_range in outside_ranges_ip_network]

                                # NOTE(review): both placeholder literals passed to
                                # str.replace() below are EMPTY in this copy of the
                                # file. replace("", x) inserts x between every
                                # character, so the placeholder tokens were almost
                                # certainly lost - restore them to match the
                                # templates in config.clore_iptables_rules.
                                for rule_template in config.clore_iptables_rules:
                                    if rule_template[:2] == '-I':
                                        for outside_range in outside_ranges:
                                            needed_iptables_rule = rule_template.replace("", outside_range).replace("", this_if_name)
                                            if not _ensure_iptables_rule(
                                                needed_iptables_rule,
                                                normalized_iptables_rules,
                                                not_normalized_iptables_rules,
                                            ):
                                                failed_appending_iptables_rule = True
                                    elif this_ipv4_range != partner_bridge_subnet:
                                        needed_iptables_rule = rule_template.replace("", this_ipv4_range).replace("", this_if_name)
                                        if not _ensure_iptables_rule(
                                            needed_iptables_rule,
                                            normalized_iptables_rules,
                                            not_normalized_iptables_rules,
                                        ):
                                            failed_appending_iptables_rule = True
                        else:
                            # NOTE(review): this else is attached to the IPAM/subnet
                            # check; the nesting was reconstructed from a source
                            # without indentation - confirm it does not belong to
                            # `if this_if_name:` instead.
                            remove_docker_network(docker_network.Name)
                    except Exception as e2:
                        log.debug(f"DOCKER | Networking | Can't validate network: {e2}")

            except ValidationError as e:
                log.error(f"DOCKER | Networking | Validation error: {e.json()}")
                return False

        added_default_network = False
        for clore_default_network in config.clore_default_networks:
            try:
                valid_default_network = NetworkConfig(**clore_default_network)
                if valid_default_network.name not in existing_clore_networks:
                    create_docker_network(
                        valid_default_network.name,
                        str(valid_default_network.subnet),
                        valid_default_network.gateway,
                    )
                    added_default_network = True
            except Exception as e:
                # Best effort: an invalid default network entry is skipped.
                log.debug(f"DOCKER | Networking | Can't create default network: {e}")
        if added_default_network:
            return "run_again"

        # Delete unused iptables rules: whatever is still left in
        # normalized_iptables_rules was not claimed by any network above.
        normalized_template_rules = []
        for rule in config.clore_iptables_rules:
            if rule[:2] == "-I":
                rule = f"-A{rule[2:]}"
            normalized_template_rules.append(utils.normalize_rule(utils.parse_rule_to_dict(rule)))

        for index, not_matched_rule in enumerate(normalized_iptables_rules):
            for normalized_template_rule in normalized_template_rules:
                if normalized_template_rule.keys() == not_matched_rule.keys():
                    any_unmatching_values = False
                    for key in normalized_template_rule.keys():
                        # NOTE(review): the two compared literals are empty in this
                        # copy of the file (same lost placeholders as above).
                        if normalized_template_rule[key] == "" or normalized_template_rule[key] == "":
                            pass
                        elif normalized_template_rule[key] != not_matched_rule[key]:
                            if key == '-A' and not_matched_rule[key] == "FORWARD":
                                pass
                            else:
                                any_unmatching_values = True
                                break
                        # Never delete rules whose source lies outside the allowed
                        # first octet or whose interface is never treated as docker.
                        if key == "-s" and not_matched_rule[key][:len(config.clore_br_first_allowed_octet)] != config.clore_br_first_allowed_octet:
                            any_unmatching_values = True
                            break
                        elif key == "-i" and not_matched_rule[key][:3] in ["eth", "enp", "eno", "ens", "wlp", "vet"]:
                            any_unmatching_values = True
                            break
                        #elif key=="-d" and not_matched_rule[key][:len(config.clore_br_first_allowed_octet)] != config.clore_br_first_allowed_octet:
                        #    any_unmatching_values=True
                        #    break
                    if not any_unmatching_values:
                        simple_rule = not_normalized_iptables_rules[index]
                        # Delete rule from iptables
                        networking.rm_iptables_rule(simple_rule)

        return not failed_appending_iptables_rule
    except Exception as e:
        log.error(f"validate_and_secure_networks() | ERROR | {e}")
        return False


def get_daemon_config():
    """Load and return the parsed ``daemon_config_path`` JSON.

    Returns None when the file is missing or is not valid JSON.
    """
    try:
        with open(daemon_config_path, 'r') as file:
            config_data = json.load(file)
        return config_data
    except FileNotFoundError:
        print(f"Error: {daemon_config_path} not found.")
        return None
    except json.JSONDecodeError:
        print(f"Error: Failed to parse JSON from {daemon_config_path}.")
        return None


def verify_docker_version(min_version="17.06"):
    """Exit the process unless the daemon version is at least ``min_version``.

    A daemon exactly at ``min_version`` is accepted. Any failure to read or
    parse the version also exits the process.
    """
    try:
        docker_version = client.version()['Version']
        is_new_enough = version.parse(docker_version) >= version.parse(min_version)
        if not is_new_enough:
            log.error(f"Current docker version ({docker_version}) is too old to be used with clore.ai software\nExiting...")
            os._exit(1)
    except Exception as e:
        log.error(f"Failed to verify docker version | {e}")
        os._exit(1)


def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"):
    """Set or remove one ``key=value`` entry of "exec-opts" in the daemon config.

    Args:
        key: the exec option name.
        value: the value to set; None removes an existing ``key`` entry.

    Returns:
        "Same" when "exec-opts" already had the wanted content (nothing is
        written), True after the file was rewritten, False when the daemon
        config could not be loaded (or is empty) or an error occurred.
    """
    deamon_config = get_daemon_config()
    if not deamon_config:
        return False

    try:
        has_exec_opts = "exec-opts" in deamon_config
        if (not has_exec_opts or not isinstance(deamon_config["exec-opts"], list)) and value is not None:
            deamon_config["exec-opts"] = [f"{key}={value}"]
        elif has_exec_opts:
            new_exec_opts = []
            matched_key = False
            for exec_opt in deamon_config["exec-opts"]:
                if '=' in exec_opt:
                    exec_opt_key = exec_opt.split('=', 1)[0]
                    if exec_opt_key == key:
                        matched_key = True
                        if value is not None:
                            new_exec_opts.append(f"{key}={value}")
                    else:
                        new_exec_opts.append(exec_opt)
                else:
                    new_exec_opts.append(exec_opt)
            # With value=None there is nothing to add; appending here would
            # write the literal string "<key>=None" into the daemon config.
            if not matched_key and value is not None:
                new_exec_opts.append(f"{key}={value}")
            if len(new_exec_opts) == 0:
                del deamon_config["exec-opts"]
            else:
                if deamon_config["exec-opts"] == new_exec_opts:
                    return "Same"
                deamon_config["exec-opts"] = new_exec_opts

        json_string = json.dumps(deamon_config, indent=4)
        with open(daemon_config_path, 'w') as file:
            file.write(json_string)
        return True
    except Exception as e:
        log.error(f"Failed 'configure_exec_opts' | {e}")
        return False


def is_docker_default_name_lenient(container_name):
    """Return True when the name has the ``word_word`` lowercase form.

    Not a perfect solution, but it will do the job.
    """
    pattern = r'^[a-z]+_[a-z]+$'
    return re.match(pattern, container_name) is not None