# hosting/lib/docker_cli_wrapper.py
#
# 112 lines
# 4.3 KiB
# Python

from lib import logging as logging_lib
from lib import config as config_module
import subprocess
import re, os
import signal
import docker
# Module-level alias for the `config` attribute of lib.config.
config = config_module.config
# Module-level alias for the `log` attribute of lib.logging; used below for debug output.
log = logging_lib.log
def create_container(container_options, ip=None, docker_gpus=False, timeout=30):
    """Start a detached container by invoking the ``docker run`` CLI.

    Args:
        container_options: Mapping describing the container. ``"image"`` is
            required; the optional keys read here are ``name``,
            ``network_mode``, ``cap_add``, ``volumes``, ``ports``,
            ``environment``, ``log_config``, ``runtime``, ``storage_opt``
            and ``entrypoint``. The mapping is passed through
            ``sanitize_input`` before use.
        ip: Optional address passed as ``--ip``.
        docker_gpus: A non-empty list of GPU ids selects exactly those
            devices; any other truthy value requests ``--gpus all``; a falsy
            value adds no GPU flag.
        timeout: Seconds to wait for the ``docker run`` process to finish.

    Returns:
        The stripped stdout of ``docker run`` (the container id) when the
        process exits with status 0, otherwise ``None``. ``None`` is also
        returned when the process times out or cannot be started at all.

    Raises:
        KeyError: If ``container_options`` has no ``"image"`` entry, or a
            dict ``log_config`` lacks ``"Config"`` / ``"Type"``.
    """
    # Sanitize and validate input
    container_options = sanitize_input(container_options)
    command = _build_run_command(container_options, ip, docker_gpus)

    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError as e:
        # Popen raises OSError (e.g. FileNotFoundError) when the docker
        # binary cannot be executed; it never raises CalledProcessError.
        print(f"Error creating container: {e}")
        return None

    try:
        output, error = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Kill the child and then drain its pipes so it is reaped and the
        # file descriptors are closed; otherwise a zombie process is left.
        process.kill()
        process.communicate()
        print("Timeout exceeded while creating container.")
        return None

    if process.returncode == 0:
        return output.strip()

    print(f"Error creating container: {error}")
    return None


def _build_run_command(container_options, ip, docker_gpus):
    """Translate already-sanitized options into a ``docker run`` argv list."""
    command = ["docker", "run", "--detach", "--tty"]

    if "name" in container_options:
        command.extend(["--name", container_options["name"]])
    if "network_mode" in container_options:
        command.extend(["--network", container_options["network_mode"]])
    if "cap_add" in container_options:
        for cap in container_options["cap_add"]:
            command.extend(["--cap-add", cap])
    if "volumes" in container_options:
        for volume_host, volume_container in container_options["volumes"].items():
            bind = f"{volume_host}:{volume_container['bind']}"
            if "mode" in volume_container:
                bind += f":{volume_container['mode']}"
            command.extend(["--volume", bind])
    if "ports" in container_options:
        # The mapping is keyed by container port; the CLI wants host:container.
        for port_container, port_host in container_options["ports"].items():
            command.extend(["-p", f"{port_host}:{port_container}"])
    if "environment" in container_options:
        for env_var, env_value in container_options["environment"].items():
            command.extend(["-e", f"{env_var}={env_value}"])
    if "log_config" in container_options:
        log_config = container_options["log_config"]
        if isinstance(log_config, dict):
            for key, value in log_config["Config"].items():
                command.extend(["--log-opt", f"{key}={value}"])
            command.extend(["--log-driver", log_config["Type"]])
        else:
            log.debug("Invalid log_config format. Skipping log configuration.")
    if "runtime" in container_options:
        command.extend(["--runtime", container_options["runtime"]])
    if docker_gpus:
        if isinstance(docker_gpus, list):
            device_ids = ",".join(str(gpu_id) for gpu_id in docker_gpus)
            # The literal double quotes are intentional: no shell is involved,
            # and docker's --gpus value is itself comma-separated, so the
            # quotes keep the device list together as a single field.
            command.extend(["--gpus", f'"device={device_ids}"'])
        else:
            command.extend(["--gpus", "all"])
    if "storage_opt" in container_options:
        for storage_opt, value in container_options["storage_opt"].items():
            command.extend(["--storage-opt", f"{storage_opt}={value}"])
    if "entrypoint" in container_options:
        command.extend(["--entrypoint", container_options["entrypoint"]])
    if ip:
        command.extend(["--ip", ip])

    # The image must come last: anything after it is treated as the command.
    command.append(container_options["image"])
    return command
def sanitize_input(container_options):
    """Return a copy of *container_options* with risky characters stripped.

    Backticks, dollar signs, backslashes and single/double quotes are removed
    from every string value. Nested dicts and lists are processed
    recursively. Dictionary keys are left untouched, and values of any other
    type (ints, bools, ``None``, ...) are kept as they are, including when
    they appear inside a list.

    Args:
        container_options: Mapping of option names to values.

    Returns:
        A new dict with the same keys and sanitized values; the input is not
        modified.
    """
    return {
        key: _sanitize_value(value)
        for key, value in container_options.items()
    }


def _sanitize_value(value):
    """Sanitize one value: strip strings, recurse into dicts and lists."""
    if isinstance(value, str):
        # Remove any potential shell injection or escapes
        return re.sub(r'[`$\\\'\"]', '', value)
    if isinstance(value, dict):
        return sanitize_input(value)
    if isinstance(value, list):
        # Keep every element; non-string scalars must not be dropped.
        return [_sanitize_value(item) for item in value]
    return value