from lib import docker_interface
from lib import config as config_module
from lib import logging as logging_lib

config = config_module.config
log = logging_lib.log

import asyncio
import time
from lib import container_logs
from concurrent.futures import ThreadPoolExecutor
import queue  # Synchronous queue: worker threads write, the event loop reads


def _list_running_containers(client):
    """Return a ``{name: container}`` dict of containers whose status is 'running'.

    Calls ``client.containers.list()``; the caller runs this through the
    executor so the listing does not execute on the event-loop thread.
    """
    return {
        container.name: container
        for container in client.containers.list()
        if container.status == 'running'
    }


async def log_streaming_task(message_broker, monitoring):
    """Keep one log-streaming worker per running container and forward output.

    Runs forever. Each iteration:

    1. Puts ``"log_streaming_task"`` on ``monitoring`` (presumably a liveness
       heartbeat -- confirm with the consumer of ``monitoring``).
    2. Lists the running containers in the executor.
    3. For every running container without a worker, submits
       ``container_logs.stream_logs(container_name, sync_queue)`` to the
       executor with a fresh ``queue.Queue``.
    4. Puts the list of running container names on ``message_broker``.
    5. Drains every worker queue:
       * ``None``  -> the worker is finished; its task and queue are forgotten.
       * ``'I'``   -> ``"<name>|I"`` is put on ``message_broker`` and any text
         gathered earlier in this pass is discarded.
       * otherwise -> ``data[1:]`` is appended to the pending text (assumes
         each item is a string with a one-character prefix -- TODO confirm
         against ``container_logs.stream_logs``).
       Non-empty pending text is sent as ``"<name>|D<text>"``.
    6. Cancels and forgets workers whose container is no longer running.
    7. Sleeps ``config.container_log_streaming_interval``.

    Args:
        message_broker: object exposing an awaitable ``put(item)``.
        monitoring: object exposing an awaitable ``put(item)``.

    Any ``Exception`` raised inside an iteration is logged and the loop
    continues with the next iteration after the sleep.
    """
    client = docker_interface.client
    # NOTE(review): this 4-worker pool serves both the container listing and
    # every stream_logs call. If stream_logs is long-running, four or more
    # streamed containers would leave no free worker for the listing --
    # confirm against container_logs.stream_logs.
    executor = ThreadPoolExecutor(max_workers=4)
    loop = asyncio.get_event_loop()
    tasks = {}   # container name -> future of its stream_logs worker
    queues = {}  # container name -> queue.Queue filled by that worker

    while True:
        try:
            await monitoring.put("log_streaming_task")

            current_containers = await loop.run_in_executor(
                executor, _list_running_containers, client
            )
            # Snapshot taken before new workers are started, so the cleanup
            # pass below only considers workers that existed beforehand.
            existing_tasks = set(tasks)

            log_container_names = list(current_containers)

            # Start tasks for new containers
            for container_name in log_container_names:
                if container_name in tasks:
                    continue
                log.debug(f"log_streaming_task() | Starting task for {container_name}")
                sync_queue = queue.Queue()
                # run_in_executor already returns an asyncio future, so no
                # ensure_future wrapper is needed.
                tasks[container_name] = loop.run_in_executor(
                    executor, container_logs.stream_logs, container_name, sync_queue
                )
                queues[container_name] = sync_queue

            await message_broker.put(log_container_names)

            # Drain each worker queue and forward what it produced.
            # Iterate over a copy: entries may be removed while draining.
            for container_name, sync_queue in list(queues.items()):
                if sync_queue.empty():
                    continue
                log.debug(f"log_streaming_task() | Streamed data from {container_name}: ")
                chunks = []
                while True:
                    try:
                        # Never block the event loop waiting on a worker thread.
                        data = sync_queue.get_nowait()
                    except queue.Empty:
                        break
                    if data is None:
                        # Worker is done. pop() tolerates a repeated sentinel,
                        # which previously raised KeyError and aborted the
                        # whole iteration.
                        tasks.pop(container_name, None)
                        queues.pop(container_name, None)
                        log.debug(f"log_streaming_task() | Completed processing for {container_name}")
                    elif data == 'I':
                        await message_broker.put(f"{container_name}|I")
                        # Text gathered before the 'I' marker is dropped.
                        chunks.clear()
                    else:
                        chunks.append(data[1:])
                full_msg = ''.join(chunks)
                if full_msg:
                    await message_broker.put(f"{container_name}|D{full_msg}")

            # Remove tasks for containers that no longer exist
            for task_name in existing_tasks:
                if task_name in current_containers:
                    continue
                stale_task = tasks.pop(task_name, None)
                if stale_task is None:
                    # Already removed while draining (worker sent its sentinel).
                    continue
                log.debug(f"log_streaming_task() | Killing task for {task_name}")
                # NOTE: cancel() cannot interrupt a call that is already
                # running in an executor thread; it only cancels the future.
                stale_task.cancel()
                queues.pop(task_name, None)
        except Exception as e:
            log.error(f"log_streaming_task() | {e}")
        await asyncio.sleep(config.container_log_streaming_interval)