# hosting/lib/log_streaming_task.py
#
# 81 lines
# 3.6 KiB
# Python

from lib import docker_interface
from lib import config as config_module
from lib import logging as logging_lib
from lib import clore_partner
config = config_module.config
log = logging_lib.log
import asyncio
import time
from lib import container_logs
from concurrent.futures import ThreadPoolExecutor
import queue # Import the synchronous queue module
def _list_running_containers(client):
    """Return ``{name: container}`` for containers whose status is 'running'."""
    return {
        container.name: container
        for container in client.containers.list()
        if container.status == 'running'
    }


def _drain_log_queue(sync_queue):
    """Empty *sync_queue* without blocking and summarise what it held.

    Returns a tuple ``(init_count, payload, finished)``:

    * ``init_count`` - how many ``'I'`` items were seen.
    * ``payload`` - every other non-``None`` item received after the last
      ``'I'``, each with its first character removed, concatenated in order.
    * ``finished`` - True if a ``None`` sentinel was seen.
    """
    init_count = 0
    chunks = []
    finished = False
    while True:
        try:
            data = sync_queue.get_nowait()
        except queue.Empty:
            break
        if data is None:
            finished = True
        elif data == 'I':
            init_count += 1
            # An 'I' item discards whatever was accumulated before it.
            chunks.clear()
        else:
            # Presumably the first character is a one-letter type tag written
            # by container_logs.stream_logs -- TODO confirm against that module.
            chunks.append(data[1:])
    return init_count, ''.join(chunks), finished


async def log_streaming_task(message_broker, monitoring, do_not_stream_containers):
    """Periodically forward container log output to *message_broker*.

    Runs forever. Every cycle it:

    1. Puts ``"log_streaming_task"`` on *monitoring*.
    2. Lists running containers and submits ``container_logs.stream_logs`` to
       a thread pool for each eligible container that has no task yet.
    3. Puts the list of eligible container names on *message_broker*.
    4. Drains each per-container queue, putting ``"<name>|I"`` for every
       ``'I'`` item and ``"<name>|D<payload>"`` for the accumulated data.
    5. Drops tasks whose container is no longer running.
    6. Sleeps ``config.container_log_streaming_interval``.

    A container is eligible when its name is not in *do_not_stream_containers*
    and ``clore_partner.validate_partner_container_name`` is falsy for it.

    Args:
        message_broker: object with an awaitable ``put(item)``.
        monitoring: object with an awaitable ``put(item)``.
        do_not_stream_containers: collection of container names to skip
            (only ``in`` is used on it).

    Any ``Exception`` raised inside a cycle is logged and the loop continues.
    """
    client = docker_interface.client
    loop = asyncio.get_event_loop()
    # Pool for the stream_logs workers.
    executor = ThreadPoolExecutor(max_workers=4)
    # Listing gets its own worker. NOTE(review): stream_logs presumably holds
    # a worker for as long as a container keeps logging; if so, sharing the
    # 4-worker pool would let four active streams block the listing call and
    # stall this loop. Confirm against container_logs.stream_logs.
    list_executor = ThreadPoolExecutor(max_workers=1)
    tasks = {}
    queues = {}
    while True:
        try:
            await monitoring.put("log_streaming_task")
            current_containers = await loop.run_in_executor(
                list_executor, _list_running_containers, client
            )
            existing_tasks = set(tasks)
            log_container_names = []

            # Start tasks for new containers
            for container_name in current_containers:
                if container_name in do_not_stream_containers:
                    continue
                if clore_partner.validate_partner_container_name(container_name):
                    continue
                log_container_names.append(container_name)
                if container_name not in tasks:
                    log.debug(f"log_streaming_task() | Starting task for {container_name}")
                    sync_queue = queue.Queue()
                    tasks[container_name] = asyncio.ensure_future(
                        loop.run_in_executor(
                            executor, container_logs.stream_logs, container_name, sync_queue
                        )
                    )
                    queues[container_name] = sync_queue
            await message_broker.put(log_container_names)

            # Check sync_queues for data and forward it
            for container_name, sync_queue in list(queues.items()):
                if sync_queue.empty():
                    continue
                log.debug(f"log_streaming_task() | Streamed data from {container_name}: ")
                init_count, full_msg, finished = _drain_log_queue(sync_queue)
                for _ in range(init_count):
                    await message_broker.put(f"{container_name}|I")
                if finished:
                    # Task is done, remove it. pop() tolerates a repeated
                    # sentinel or an entry that was already removed.
                    tasks.pop(container_name, None)
                    queues.pop(container_name, None)
                    log.debug(f"log_streaming_task() | Completed processing for {container_name}")
                if full_msg != '':
                    await message_broker.put(f"{container_name}|D{full_msg}")

            # Remove tasks for containers that no longer exist
            for task_name in existing_tasks:
                if task_name not in current_containers and task_name in tasks:
                    log.debug(f"log_streaming_task() | Killing task for {task_name}")
                    # NOTE(review): cancel() does not interrupt an executor
                    # call that is already running; the worker thread is
                    # presumably expected to end on its own -- confirm.
                    tasks[task_name].cancel()
                    del tasks[task_name]
                    queues.pop(task_name, None)
        except Exception as e:
            log.error(f"log_streaming_task() | {e}")
        await asyncio.sleep(config.container_log_streaming_interval)