81 lines
3.6 KiB
Python
81 lines
3.6 KiB
Python
from lib import docker_interface
|
|
from lib import config as config_module
|
|
from lib import logging as logging_lib
|
|
from lib import clore_partner
|
|
# Module-level alias for the configuration object exposed by lib.config;
# log_streaming_task() reads container_log_streaming_interval from it.
config = config_module.config
|
|
# Module-level alias for the logger object exposed by lib.logging; used
# below through its debug() and error() methods.
log = logging_lib.log
|
|
|
|
import asyncio
|
|
import time
|
|
from lib import container_logs
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import queue # Import the synchronous queue module
|
|
|
|
async def log_streaming_task(message_broker, monitoring, do_not_stream_containers):
    """Poll running Docker containers and forward their log output forever.

    Each cycle of the endless loop performs these steps:

    1. Put the heartbeat string ``"log_streaming_task"`` on ``monitoring``.
    2. List the running containers in a worker thread and start one
       ``container_logs.stream_logs`` worker for every newly seen container
       that is neither listed in ``do_not_stream_containers`` nor accepted by
       ``clore_partner.validate_partner_container_name``.
    3. Put the list of container names eligible for streaming on
       ``message_broker``.
    4. Drain every worker queue. As consumed here, a queue item is either
       ``None`` (the worker finished), the string ``'I'`` (forwarded as
       ``"<name>|I"``; text collected so far in this cycle is discarded), or
       a payload whose first character is dropped and whose remainder is
       accumulated. Accumulated text is forwarded once per cycle as
       ``"<name>|D<text>"``.
    5. Forget workers whose container is no longer running.

    Any exception raised during a cycle is logged and the loop continues
    after sleeping ``config.container_log_streaming_interval`` seconds.

    Args:
        message_broker: Object with an awaitable ``put()``; receives the
            container-name list and the ``|I`` / ``|D`` strings
            (presumably an ``asyncio.Queue`` -- confirm with the caller).
        monitoring: Object with an awaitable ``put()``; receives the
            heartbeat string once per cycle.
        do_not_stream_containers: Container of names that must never be
            streamed (only membership tests are performed on it).

    Returns:
        Never returns normally.
    """
    client = docker_interface.client
    # NOTE(review): the same 4-worker pool serves both the container listing
    # and every stream_logs worker. If stream_logs blocks for the lifetime of
    # a container, four streamed containers would starve the listing call --
    # confirm against container_logs.stream_logs.
    executor = ThreadPoolExecutor(max_workers=4)
    loop = asyncio.get_event_loop()
    tasks = {}   # container name -> future wrapping its stream_logs worker
    queues = {}  # container name -> queue.Queue filled by that worker

    while True:
        try:
            await monitoring.put("log_streaming_task")

            # The Docker client call is blocking, so run it off the loop.
            current_containers = await loop.run_in_executor(
                executor,
                lambda: {
                    container.name: container
                    for container in client.containers.list()
                    if container.status == 'running'
                },
            )

            log_container_names = []

            # Start tasks for new containers
            for container_name in current_containers:
                if container_name in do_not_stream_containers:
                    continue
                if clore_partner.validate_partner_container_name(container_name):
                    continue
                log_container_names.append(container_name)
                if container_name in tasks:
                    continue
                log.debug(f"log_streaming_task() | Starting task for {container_name}")
                sync_queue = queue.Queue()
                tasks[container_name] = asyncio.ensure_future(
                    loop.run_in_executor(
                        executor, container_logs.stream_logs, container_name, sync_queue
                    )
                )
                queues[container_name] = sync_queue

            await message_broker.put(log_container_names)

            # Check sync_queues for data and forward it. Iterate over a copy
            # because finished workers are removed while draining.
            for container_name, sync_queue in list(queues.items()):
                if sync_queue.empty():
                    continue
                log.debug(f"log_streaming_task() | Streamed data from {container_name}: ")
                chunks = []
                while True:
                    try:
                        # Never block the event loop waiting on a worker.
                        data = sync_queue.get_nowait()
                    except queue.Empty:
                        break
                    if data is None:
                        # Task is done, remove it. pop() rather than del so a
                        # repeated sentinel cannot raise KeyError and discard
                        # the text collected in this cycle.
                        tasks.pop(container_name, None)
                        queues.pop(container_name, None)
                        log.debug(f"log_streaming_task() | Completed processing for {container_name}")
                    elif data == 'I':
                        await message_broker.put(f"{container_name}|I")
                        # Text collected before the 'I' marker is dropped.
                        chunks = []
                    else:
                        # The first character of a payload item is skipped.
                        chunks.append(data[1:])
                full_msg = ''.join(chunks)
                if full_msg != '':
                    await message_broker.put(f"{container_name}|D{full_msg}")

            # Remove tasks for containers that no longer exist. Tasks started
            # in this cycle are always present in current_containers, so
            # walking the live dict matches a pre-start snapshot.
            for task_name in list(tasks):
                if task_name in current_containers:
                    continue
                log.debug(f"log_streaming_task() | Killing task for {task_name}")
                # NOTE(review): cancel() on an executor-backed future does not
                # interrupt a worker thread that is already running.
                tasks.pop(task_name).cancel()
                queues.pop(task_name, None)
        except Exception as e:
            log.error(f"log_streaming_task() | {e}")
        await asyncio.sleep(config.container_log_streaming_interval)