from lib import logging as logging_lib
from lib import config as config_module
import subprocess
import re
import os
import signal
import docker

config = config_module.config
log = logging_lib.log

# Characters stripped from every string option: backtick, dollar sign,
# backslash, single quote and double quote.
_UNSAFE_CHARS = re.compile(r'[`$\\\'\"]')


def create_container(container_options, ip=None, docker_gpus=False, shm_size=64, timeout=30):
    """Start a detached container by invoking ``docker run``.

    The command is assembled as an argument list and executed without a
    shell. ``container_options`` is first passed through
    :func:`sanitize_input`.

    Args:
        container_options: Mapping of container settings. ``"image"`` is
            required. Optional keys read here: ``name``, ``network_mode``,
            ``hostname``, ``runtime``, ``entrypoint`` (single values);
            ``cap_add``, ``devices``, ``security_opt`` (lists);
            ``volumes`` (host path -> dict with ``"bind"`` and optional
            ``"mode"``); ``ports`` (container port -> host port);
            ``environment`` and ``storage_opt`` (name -> value);
            ``log_config`` (dict with ``"Type"`` and optional ``"Config"``).
        ip: Value for ``--ip``; omitted when falsy.
        docker_gpus: A list of GPU ids to expose, any other truthy value
            for ``--gpus all``, or falsy for no GPU flag.
        shm_size: Shared memory size in megabytes. The flag is only passed
            when the value differs from 64.
        timeout: Seconds to wait for ``docker run`` to finish.

    Returns:
        The stripped stdout of ``docker run`` (the container id) on
        success, or ``None`` if the command failed, timed out, or could not
        be launched.

    Raises:
        KeyError: If ``container_options`` has no ``"image"`` entry, or a
            ``log_config`` dict has no ``"Type"`` entry.
    """
    # Sanitize and validate input
    container_options = sanitize_input(container_options)

    command = ["docker", "run", "--detach", "--tty"]

    # Options that map one-to-one onto a single flag.
    for option, flag in (
        ("name", "--name"),
        ("network_mode", "--network"),
        ("hostname", "--hostname"),
    ):
        if option in container_options:
            command.extend([flag, container_options[option]])

    # Options whose list value repeats the same flag once per item.
    for option, flag in (
        ("cap_add", "--cap-add"),
        ("devices", "--device"),
        ("security_opt", "--security-opt"),
    ):
        if option in container_options:
            for item in container_options[option]:
                command.extend([flag, item])

    if "volumes" in container_options:
        for volume_host, volume_container in container_options["volumes"].items():
            bind = f"{volume_host}:{volume_container['bind']}"
            if "mode" in volume_container:
                bind += f":{volume_container['mode']}"
            command.extend(["--volume", bind])

    if "ports" in container_options:
        # The mapping is keyed by container port; docker expects host:container.
        for port_container, port_host in container_options["ports"].items():
            command.extend(["-p", f"{port_host}:{port_container}"])

    if "environment" in container_options:
        for env_var, env_value in container_options["environment"].items():
            command.extend(["-e", f"{env_var}={env_value}"])

    if "log_config" in container_options:
        log_config = container_options["log_config"]
        if isinstance(log_config, dict):
            # "Config" may be absent or None when only a driver is selected.
            for key, value in (log_config.get("Config") or {}).items():
                command.extend(["--log-opt", f"{key}={value}"])
            command.extend(["--log-driver", log_config["Type"]])
        else:
            log.debug("Invalid log_config format. Skipping log configuration.")

    if "runtime" in container_options:
        command.extend(["--runtime", container_options["runtime"]])

    if shm_size != 64:
        command.extend(["--shm-size", f"{shm_size}m"])

    if docker_gpus:
        if isinstance(docker_gpus, list):
            # The double quotes are deliberately part of the argument value:
            # the device list contains commas and --gpus takes a
            # comma-separated option string, so the list must be quoted.
            devices = ','.join(str(gpu_id) for gpu_id in docker_gpus)
            command.extend(['--gpus', '"device=' + devices + '"'])
        else:
            command.extend(["--gpus", "all"])

    if "storage_opt" in container_options:
        for storage_opt, value in container_options["storage_opt"].items():
            command.extend(["--storage-opt", f"{storage_opt}={value}"])

    if "entrypoint" in container_options:
        command.extend(["--entrypoint", container_options["entrypoint"]])

    if ip:
        command.extend(["--ip", ip])

    command.extend(['--stop-timeout', '0'])
    command.append(container_options["image"])

    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError as e:
        # Popen raises OSError (e.g. FileNotFoundError) when the docker
        # binary cannot be executed; report it like any other failure.
        print(f"Error creating container: {e}")
        return None

    try:
        output, error = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.send_signal(signal.SIGTERM)
        process.kill()
        # Reap the killed child and close its pipes; without this the
        # process lingers as a zombie and the pipe descriptors leak.
        process.communicate()
        print("Timeout exceeded while creating container.")
        return None

    if process.returncode == 0:
        return output.strip()

    print(f"Error creating container: {error}")
    return None


def _sanitize_value(value):
    """Return *value* with unsafe characters removed from every string in it."""
    if isinstance(value, str):
        return _UNSAFE_CHARS.sub('', value)
    if isinstance(value, dict):
        return sanitize_input(value)
    if isinstance(value, list):
        return [_sanitize_value(item) for item in value]
    # Numbers, booleans, None, etc. carry no characters to strip.
    return value


def sanitize_input(container_options):
    """Return a sanitized copy of an options mapping.

    Backticks, dollar signs, backslashes and single/double quotes are
    removed from every string value, recursing into nested dicts and lists.
    Keys are left untouched and non-string values are kept as they are,
    including non-string items inside lists.

    NOTE(review): the stripping is lossy (for example an environment value
    containing ``$`` is altered), while create_container runs docker from an
    argument list without a shell. Confirm the stripping is still wanted.

    Args:
        container_options: Mapping of option names to values.

    Returns:
        A new dict; the input mapping is not modified.
    """
    return {
        key: _sanitize_value(value)
        for key, value in container_options.items()
    }