hosting/lib/docker_interface.py

446 lines
20 KiB
Python
Raw Normal View History

2024-03-21 01:28:02 +00:00
from lib import logging as logging_lib
log = logging_lib.log
from lib import config as config_module
config = config_module.config
from packaging import version
2024-03-21 01:28:02 +00:00
from lib import networking
from lib import utils
from pydantic import BaseModel, Field, ValidationError, IPvAnyNetwork
from typing import List, Optional
import docker
import json
import os
2024-12-02 00:06:53 +00:00
import re
# Subnet of the default network named "clore-partner-br0"; stays '' when the
# configuration declares no network with that name.
partner_bridge_subnet = ''
for clore_network in config.clore_default_networks:
    if clore_network["name"] != "clore-partner-br0":
        continue
    # No break: should the name be listed more than once, the last entry wins.
    partner_bridge_subnet = clore_network["subnet"]
2024-03-21 01:28:02 +00:00
# Make sure the working directories exist before anything writes into them.
# Creation stays best-effort (a failure must not abort the import), but a failure
# is now logged instead of being silently discarded.
for _folder_setting in ("startup_scripts_folder", "wireguard_config_folder", "entrypoints_folder"):
    try:
        # The attribute lookup sits inside the try so a missing setting is tolerated too.
        os.makedirs(getattr(config, _folder_setting), exist_ok=True)
    except Exception as e:
        log.error(f"DOCKER INTERFACE | Can't create folder for '{_folder_setting}' | {e}")
class NetworkConfig(BaseModel):
    """Schema for one entry of ``config.clore_default_networks``.

    ``validate_and_secure_networks`` builds this model from each configured default
    network before creating the network in Docker.
    """
    name: str = Field(..., description="The name of the network")
    # Parsed as an IPv4 or IPv6 network; callers convert it back with str().
    subnet: IPvAnyNetwork = Field(..., description="The subnet of the network in CIDR notation")
    gateway: str = Field(..., description="The gateway IP address of the network")
class IPAMConfig(BaseModel):
    """One IPAM pool of a Docker network: its subnet and gateway, both kept as strings."""
    # get_docker_networks() substitutes the literal 'Not specified' when Docker's
    # IPAM entry has no such key, so these are not guaranteed to be valid addresses.
    Subnet: str
    Gateway: str
class DockerNetwork(BaseModel):
    """Validated view of one network dictionary as produced by ``get_docker_networks()``."""
    Name: str = Field(..., alias='Name')
    # Populated from the 'ID' key of the input dictionary.
    Id: str = Field(..., alias='ID')
    Created: Optional[str] = None # Optional: get_docker_networks() does not provide this key
    Scope: str
    Driver: str
    EnableIPv6: Optional[bool] = None # Optional: get_docker_networks() does not provide this key
    IPAM: List[IPAMConfig] = Field(..., alias='IPAM')

    class Config:
        # pydantic setting: accept a field's own name (e.g. ``Id``) as well as its alias (``ID``).
        populate_by_name = True
# High-level Docker SDK client built from the environment; shared by the helpers in this module.
client = docker.from_env()
# Low-level API client bound to the local Docker socket.
low_level_client = docker.APIClient(base_url='unix://var/run/docker.sock')
# Docker daemon configuration file: read by get_daemon_config(), rewritten by configure_exec_opts().
daemon_config_path = "/etc/docker/daemon.json"
def get_info():
    """Return the Docker daemon's info dictionary, or ``{}`` when it cannot be fetched."""
    try:
        return client.info()
    except Exception:
        # Every failure is reported to the caller as an empty result.
        return {}
2024-03-21 01:28:02 +00:00
2024-12-02 00:06:53 +00:00
def stop_all_containers():
    """Stop every container the Docker daemon knows about.

    Containers are stopped one after another. The first error is logged and ends
    the run early; the function returns ``True`` either way.
    """
    try:
        # all=True also lists containers that are not currently running.
        for ctr in client.containers.list(all=True):
            log.info(f"stop_all_containers() | Stopping container: {ctr.name} (ID: {ctr.id})")
            ctr.stop()
        log.success("stop_all_containers() | All containers have been stopped.")
    except Exception as e:
        log.error(f"stop_all_containers() |An error occurred: {e}")
    return True
2024-03-21 01:28:02 +00:00
def check_docker_connection():
    """Check whether the Docker daemon answers a ping.

    Returns:
        ``True`` when the ping succeeds, ``False`` when it fails for any reason.
    """
    try:
        client.ping()
    except Exception as e:
        # Catch broadly: an unreachable daemon may surface as an HTTP-layer error
        # rather than a docker.errors.DockerException, and this probe is expected
        # to answer False instead of raising. Reported through the module logger
        # like the other helpers in this file, not print().
        log.error(f"Error: {e}")
        return False
    return True
def get_docker_networks():
    """Return the raw ``attrs`` dictionary of every Docker network.

    On a Docker SDK error the return value is an ``"Error: ..."`` string, not a list.

    NOTE: this module defines ``get_docker_networks`` a second time further down;
    that later definition replaces this one when the module is imported.
    """
    try:
        return [net.attrs for net in client.networks.list()]
    except docker.errors.DockerException as e:
        return f"Error: {e}"
2024-03-23 00:50:42 +00:00
def get_local_images(no_latest_tag=False):
    """List the tags of all locally available Docker images.

    Args:
        no_latest_tag: When true, a trailing ``:latest`` is dropped from each tag
            (``"ubuntu:latest"`` becomes ``"ubuntu"``); other tags are untouched.

    Returns:
        A list of tag strings. Images without tags contribute nothing.

    The process is terminated with exit code 1 when the images cannot be listed.
    """
    latest_suffix = ':latest'
    try:
        image_list = []
        for image in client.images.list():
            for tag in image.tags or ():
                if tag == "<none>:<none>":
                    continue
                if no_latest_tag and tag.endswith(latest_suffix):
                    # Strip only a trailing ":latest". A plain str.replace() would
                    # also turn "repo:latest-cuda" into "repo-cuda".
                    tag = tag[:-len(latest_suffix)]
                image_list.append(tag)
        return image_list
    except Exception as e:
        log.error(f"DOCKER | Can't get local images | {e} | {'y' if no_latest_tag else 'n'}")
        os._exit(1)
def get_containers(all=False):
    """Return the Docker SDK container objects.

    Args:
        all: Passed straight to ``client.containers.list``.

    The process is terminated with exit code 1 when the listing fails.
    """
    try:
        return client.containers.list(all=all)
    except Exception:
        log.error("DOCKER INTERFACE | Can't get_containers()")
        os._exit(1)
def remove_docker_image(image):
    """Remove a local Docker image.

    Returns:
        ``True`` when the image was removed, ``False`` when it does not exist or
        the removal failed. Each outcome is logged at debug level.
    """
    removed = False
    try:
        client.images.remove(image=image)
        log.debug(f"remove_docker_image() | Image '{image}' successfully removed.")
        removed = True
    except docker.errors.ImageNotFound:
        log.debug(f"remove_docker_image() | Image '{image}' not found.")
    except Exception as e:
        log.debug(f"remove_docker_image() | Error removing image '{image}': {e}")
    return removed
def remove_docker_network(network_name):
    """Remove the Docker network called ``network_name``.

    Returns:
        ``True`` when the network was removed, ``False`` when it does not exist
        or the removal failed. Each outcome is logged at debug level.
    """
    removed = False
    try:
        # Look the network up by name and delete it in one go.
        client.networks.get(network_name).remove()
        log.debug(f"DOCKER | Network '{network_name}' successfully removed.")
        removed = True
    except docker.errors.NotFound:
        log.debug(f"DOCKER | Network '{network_name}' not found.")
    except Exception as e:
        log.debug(f"DOCKER | An error occurred: {e}")
    return removed
def get_docker_networks():
    """Return a simplified description of every Docker network.

    Returns:
        A list of dicts with the keys ``Name``, ``ID``, ``Driver``, ``Scope`` and
        ``IPAM``. ``IPAM`` is a list of ``{'Subnet': ..., 'Gateway': ...}`` dicts in
        which a missing value is reported as ``'Not specified'``. A network whose
        attributes cannot be read is logged at debug level and left out.

    The process is terminated with exit code 1 when the networks cannot be listed.
    """
    networks_list = []
    try:
        # Use the shared module-level client: building a new docker.from_env()
        # client on every call left one never-closed client behind per call.
        for network in client.networks.list():
            try:
                network_details = {
                    'Name': network.name,
                    'ID': network.id,
                    'Driver': network.attrs["Driver"],
                    'Scope': network.attrs["Scope"],
                    'IPAM': []
                }
                # A network may carry several IPAM pools; anything that is not a
                # list (e.g. a null "Config") leaves the IPAM list empty.
                ipam_configs = network.attrs.get('IPAM', {}).get('Config', [])
                if isinstance(ipam_configs, list):
                    network_details['IPAM'] = [
                        {
                            'Subnet': ipam_entry.get('Subnet', 'Not specified'),
                            'Gateway': ipam_entry.get('Gateway', 'Not specified'),
                        }
                        for ipam_entry in ipam_configs
                    ]
                networks_list.append(network_details)
            except Exception as e:
                # Still best-effort per network, but no longer silent.
                log.debug(f"get_docker_networks() | Skipping a network that could not be inspected: {e}")
        return networks_list
    except Exception as e:
        log.error(f"Failed to retrieve Docker networks: {e}")
        os._exit(1) # Exit the application on any failure
def create_docker_network(network_name, subnet, gateway, driver="bridge"):
    """Create a Docker network with a single IPAM pool.

    Args:
        network_name: Name of the network to create.
        subnet: Used both as the pool's subnet and as its IP range.
        gateway: Gateway address of the pool.
        driver: Docker network driver.

    Returns:
        ``True`` on success, ``False`` when Docker answers with an API error.
    """
    try:
        address_pool = docker.types.IPAMPool(subnet=subnet, iprange=subnet, gateway=gateway)
        ipam_settings = docker.types.IPAMConfig(pool_configs=[address_pool])
        client.networks.create(
            name=network_name,
            driver=driver,
            ipam=ipam_settings,
            check_duplicate=True,
            #options={'com.docker.network.bridge.enable_ip_masq': 'false'} if 'clore-partner-' in network_name else {}
        )
        log.debug(f"Network {network_name} created successfully.")
        return True
    except docker.errors.APIError as e:
        log.error(f"DOCKER | Failed to create network {network_name}: {e}")
        return False
2024-12-02 00:06:53 +00:00
def validate_and_secure_networks(partner_forwarding_ips):
    """Reconcile the clore Docker networks and their iptables rules with the configuration.

    Steps, in order:
      1. Read the host interfaces (name -> subnet) and the current iptables rules.
      2. For every Docker network whose name starts with
         ``config.clore_network_name_prefix``, find the host interface carrying the
         network's first IPAM subnet and make sure every rule derived from
         ``config.clore_iptables_rules`` is present, adding the missing ones.
      3. Create any network from ``config.clore_default_networks`` that does not exist yet.
      4. Remove leftover iptables rules that have the shape of a template but were
         not claimed by any network in step 2.

    Args:
        partner_forwarding_ips: Iterable of IP addresses; each one is excluded (as
            ``<ip>/32``) from the ranges used for the partner bridge subnet.

    Returns:
        ``"run_again"`` when at least one default network was created in step 3,
        ``False`` when a network dictionary fails validation, an iptables rule could
        not be added, or an unexpected exception occurred, and ``True`` otherwise.
        The process is terminated with exit code 1 when the interfaces or the Docker
        networks cannot be obtained.
    """
    try:
        failed_appending_iptables_rule = False
        valid_networks = []  # not used further in this function
        network_interfaces_with_subnet = networking.get_network_interfaces_with_subnet()
        iptables_rules = networking.get_iptables_config()
        if type(network_interfaces_with_subnet)!=dict:
            log.error("get_network_interfaces_with_subnet() | Networking | Can't get interfaces")
            os._exit(1)
        # Two parallel lists, index-aligned: the normalized form is used for comparison,
        # the raw form is what gets handed to rm_iptables_rule() in step 4.
        normalized_iptables_rules=[]
        not_normalized_iptables_rules=[]
        for rule in iptables_rules:
            normalized_iptables_rules.append(utils.normalize_rule(utils.parse_rule_to_dict(rule)))
            not_normalized_iptables_rules.append(rule)
        net_list = get_docker_networks()
        if type(net_list)!=list:
            log.error(f"DOCKER | Networking | {net_list}")
            os._exit(1)
        existing_clore_networks=[]
        for network in net_list:
            try:
                #log.success(network)
                docker_network = DockerNetwork(**network)
                # Only networks carrying the clore name prefix are managed here.
                if docker_network.Name[:len(config.clore_network_name_prefix)]==config.clore_network_name_prefix:
                    try:
                        existing_clore_networks.append(docker_network.Name)
                        if docker_network.IPAM and len(docker_network.IPAM) > 0 and docker_network.IPAM[0].Subnet:
                            this_if_name = None
                            this_ipv4_range = ''
                            # Find the host interface whose subnet equals the network's first IPAM subnet.
                            for if_name in network_interfaces_with_subnet.keys():
                                can_be_docker = True
                                # Interfaces with these name prefixes are never taken as the network's interface
                                # (presumably physical NICs, wireless and veth devices).
                                if if_name[:3] in ["eth", "enp", "eno", "ens", "wlp", "vet"]:
                                    can_be_docker = False
                                ipv4_range = network_interfaces_with_subnet[if_name]
                                if ipv4_range==docker_network.IPAM[0].Subnet and can_be_docker:
                                    this_if_name=if_name
                                    this_ipv4_range=ipv4_range
                                    break
                            if this_if_name:
                                #print(this_if_name)
                                #print(this_ipv4_range)
                                # Ranges computed by networking.exclude_network() for this subnet
                                # (presumably everything outside of it — confirm in lib/networking).
                                outside_ranges_ip_network = networking.exclude_network(this_ipv4_range)
                                if this_ipv4_range == partner_bridge_subnet:
                                    # For the partner bridge, additionally exclude every forwarding IP,
                                    # feeding the previous result back in as the input ranges.
                                    for partner_forwarding_ip in partner_forwarding_ips:
                                        outside_ranges = []
                                        for ip_range in outside_ranges_ip_network:
                                            outside_ranges.append(str(ip_range))
                                        outside_ranges_ip_network = networking.exclude_network(f"{partner_forwarding_ip}/32", input_ranges=outside_ranges)
                                outside_ranges = []
                                for outside_range_ip_network in outside_ranges_ip_network:
                                    outside_ranges.append(str(outside_range_ip_network))
                                #print(docker_network)
                                for rule_template in config.clore_iptables_rules:
                                    if rule_template[:2]=='-I':
                                        # '-I' templates are instantiated once per computed range.
                                        for outside_range in outside_ranges:
                                            needed_iptables_rule = rule_template.replace("<subnet>",outside_range).replace("<interface>",this_if_name)
                                            # Compare in '-A' form, the form the existing rules are matched in.
                                            for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule
                                            for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule))
                                            is_rule_active = False
                                            # Iterate in reverse to safely remove items while iterating
                                            for i in range(len(normalized_iptables_rules) - 1, -1, -1):
                                                if normalized_iptables_rules[i] == for_comparison_rule_normalized:
                                                    is_rule_active = True
                                                    # Remove the matched rule
                                                    # (from both lists, so whatever remains at the end is unclaimed)
                                                    normalized_iptables_rules.pop(i)
                                                    not_normalized_iptables_rules.pop(i)
                                            #print(for_comparison_rule, '|', is_rule_active)
                                            if not is_rule_active:
                                                succesfully_appended = networking.add_iptables_rule(needed_iptables_rule)
                                                if not succesfully_appended:
                                                    failed_appending_iptables_rule = True
                                    elif this_ipv4_range != partner_bridge_subnet:
                                        # Other templates use the network's own subnet and are skipped for the partner bridge.
                                        needed_iptables_rule = rule_template.replace("<subnet>",this_ipv4_range).replace("<interface>",this_if_name)
                                        for_comparison_rule = "-A"+needed_iptables_rule[2:] if needed_iptables_rule[:2]=="-I" else needed_iptables_rule
                                        for_comparison_rule_normalized = utils.normalize_rule(utils.parse_rule_to_dict(for_comparison_rule))
                                        is_rule_active = False
                                        # Iterate in reverse to safely remove items while iterating
                                        for i in range(len(normalized_iptables_rules) - 1, -1, -1):
                                            if normalized_iptables_rules[i] == for_comparison_rule_normalized:
                                                is_rule_active = True
                                                # Remove the matched rule
                                                normalized_iptables_rules.pop(i)
                                                not_normalized_iptables_rules.pop(i)
                                        #print(for_comparison_rule, '|', is_rule_active)
                                        if not is_rule_active:
                                            succesfully_appended = networking.add_iptables_rule(needed_iptables_rule)
                                            if not succesfully_appended:
                                                failed_appending_iptables_rule = True
                        else:
                            # NOTE(review): nesting reconstructed — this else is assumed to pair with the
                            # IPAM/Subnet check above (a prefixed network without a subnet is removed); confirm.
                            remove_docker_network(docker_network.Name)
                    except Exception as e2:
                        log.debug(f"DOCKER | Networking | Can't validate network: {e2}")
            except ValidationError as e:
                # The first network dictionary that fails validation aborts the whole run.
                log.error(f"DOCKER | Networking | Validation error: {e.json()}")
                return False
        added_default_network = False
        for clore_default_network in config.clore_default_networks:
            try:
                valid_default_network = NetworkConfig(**clore_default_network)
                if not valid_default_network.name in existing_clore_networks:
                    create_docker_network(valid_default_network.name, str(valid_default_network.subnet), valid_default_network.gateway)
                    added_default_network=True
            except Exception as e:
                # An invalid default-network entry is skipped without any report.
                pass
        if added_default_network:
            # Returned before the cleanup below, so stale rules are not removed on this pass.
            return "run_again"
        # Delete unused iptables rules
        normalized_template_rules=[]
        for rule in config.clore_iptables_rules:
            if rule[:2]=="-I":
                rule = f"-A{rule[2:]}"
            normalized_template_rules.append(utils.normalize_rule(utils.parse_rule_to_dict(rule)))
        # Everything still in normalized_iptables_rules was not claimed by any network above.
        for index, not_matched_rule in enumerate(normalized_iptables_rules):
            for normalized_template_rule in normalized_template_rules:
                # A leftover rule is a candidate only if it has exactly the template's set of keys.
                if normalized_template_rule.keys() == not_matched_rule.keys():
                    any_unmatching_values = False
                    for key in normalized_template_rule.keys():
                        if normalized_template_rule[key]=="<subnet>" or normalized_template_rule[key]=="<interface>":
                            # Placeholders match any value.
                            pass
                        elif normalized_template_rule[key]!=not_matched_rule[key]:
                            if key=='-A' and not_matched_rule[key]=="FORWARD":
                                pass
                            else:
                                any_unmatching_values=True
                                break
                        # Extra guards: the source must start with the configured first octet and the
                        # input interface must not carry one of the excluded name prefixes.
                        if key=="-s" and not_matched_rule[key][:len(config.clore_br_first_allowed_octet)] != config.clore_br_first_allowed_octet:
                            any_unmatching_values=True
                            break
                        elif key=="-i" and not_matched_rule[key][:3] in ["eth", "enp", "eno", "ens", "wlp", "vet"]:
                            any_unmatching_values=True
                            break
                        #elif key=="-d" and not_matched_rule[key][:len(config.clore_br_first_allowed_octet)] != config.clore_br_first_allowed_octet:
                        #    any_unmatching_values=True
                        #    break
                    if not any_unmatching_values:
                        # Same index in the raw list, since both lists were popped in lockstep.
                        simple_rule = not_normalized_iptables_rules[index]
                        # Delete rule from iptables
                        networking.rm_iptables_rule(simple_rule)
        # Delete unused iptables rules
        if failed_appending_iptables_rule:
            return False
        else:
            return True
    except Exception as e:
        log.error(f"validate_and_secure_networks() | ERROR | {e}")
        return False
def get_daemon_config():
    """Load the Docker daemon configuration from ``daemon_config_path``.

    Returns:
        The parsed JSON content, or ``None`` when the file does not exist or does
        not contain valid JSON. Both failures are reported through the module
        logger, like the rest of this file, rather than with print().
    """
    try:
        with open(daemon_config_path, 'r') as file:
            return json.load(file)
    except FileNotFoundError:
        log.error(f"Error: {daemon_config_path} not found.")
    except json.JSONDecodeError:
        log.error(f"Error: Failed to parse JSON from {daemon_config_path}.")
    return None
def verify_docker_version(min_version="17.06"):
    """Terminate the process unless the Docker daemon is at least ``min_version``.

    Args:
        min_version: Lowest acceptable Docker version (inclusive).

    The process exits with code 1 when the daemon is older than ``min_version`` or
    when the version cannot be read or parsed.
    """
    try:
        docker_version = client.version()['Version']
        # Inclusive bound: a daemon running exactly min_version meets the minimum
        # (the previous strict ">" comparison rejected it).
        if version.parse(docker_version) < version.parse(min_version):
            log.error(f"Current docker version ({docker_version}) is too old to be used with clore.ai software\nExiting...")
            os._exit(1)
    except Exception as e:
        log.error(f"Failed to verify docker version | {e}")
        os._exit(1)
def configure_exec_opts(key="native.cgroupdriver", value="cgroupfs"):
    """Set, replace or remove one ``key=value`` entry of the daemon's ``exec-opts`` list.

    Args:
        key: Name of the exec option to manage.
        value: Desired value; ``None`` removes the option.

    Returns:
        ``True`` after the configuration file was rewritten, ``"Same"`` when the
        existing ``exec-opts`` list already has the requested content (nothing is
        written), and ``False`` when the configuration could not be loaded or
        updating it failed.
    """
    deamon_config = get_daemon_config()
    # Only a missing or unparsable file (None) is a failure; an empty "{}" file is a
    # valid configuration that can still receive the option.
    if deamon_config is None:
        return False
    try:
        has_exec_opts = "exec-opts" in deamon_config
        if value is not None and not (has_exec_opts and isinstance(deamon_config["exec-opts"], list)):
            # No usable list yet: start one holding just the requested option.
            deamon_config["exec-opts"] = [f"{key}={value}"]
        elif has_exec_opts:
            new_exec_opts = []
            matched_key = False
            for exec_opt in deamon_config["exec-opts"]:
                if '=' in exec_opt and exec_opt.split('=', 1)[0] == key:
                    matched_key = True
                    # value None means "remove": the matching entry is simply dropped.
                    if value is not None:
                        new_exec_opts.append(f"{key}={value}")
                else:
                    new_exec_opts.append(exec_opt)
            # Add the option only when there is a value to add; previously a removal
            # of an absent key appended the literal string "<key>=None".
            if not matched_key and value is not None:
                new_exec_opts.append(f"{key}={value}")
            if not new_exec_opts:
                del deamon_config["exec-opts"]
            elif deamon_config["exec-opts"] == new_exec_opts:
                return "Same"
            else:
                deamon_config["exec-opts"] = new_exec_opts
        # Serialize before opening the file so a serialization error cannot leave
        # a truncated daemon.json behind.
        json_string = json.dumps(deamon_config, indent=4)
        with open(daemon_config_path, 'w') as file:
            file.write(json_string)
        return True
    except Exception as e:
        log.error(f"Failed 'configure_exec_opts' | {e}")
        return False
def is_docker_default_name_lenient(container_name):
    """Tell whether ``container_name`` looks like a Docker auto-generated name.

    The check is deliberately loose: any name made of two runs of lowercase ASCII
    letters joined by a single underscore (e.g. ``happy_turing``) is accepted.
    """
    looks_generated = re.match(r'^[a-z]+_[a-z]+$', container_name)
    return bool(looks_generated)