11import asyncio
22import contextlib
3+ import datetime
34import logging
45import re
56import socket
2223from aiodocker .containers import DockerContainer
2324from aiodocker .volumes import DockerVolume
2425from dask_task_models_library .container_tasks .docker import DockerBasicAuth
26+ from dask_task_models_library .container_tasks .errors import ServiceTimeoutLoggingError
2527from dask_task_models_library .container_tasks .protocol import (
2628 ContainerCommands ,
2729 ContainerEnvsDict ,
5658)
5759from .task_shared_volume import TaskSharedVolumes
5860
# Private module-level logger, named after this module by convention.
_logger = logging.getLogger(__name__)

# Async callback signature used to publish a log message at a given level.
LogPublishingCB = Callable[[LogMessageStr, LogLevelInt], Coroutine[Any, Any, None]]
6264
@@ -102,7 +104,7 @@ async def create_container_config(
102104 NanoCPUs = nano_cpus_limit ,
103105 ),
104106 )
105- logger .debug ("Container configuration: \n %s" , pformat (config .model_dump ()))
107+ _logger .debug ("Container configuration: \n %s" , pformat (config .model_dump ()))
106108 return config
107109
108110
@@ -113,30 +115,30 @@ async def managed_container(
113115 container = None
114116 try :
115117 with log_context (
116- logger , logging .DEBUG , msg = f"managing container { name } for { config .image } "
118+ _logger , logging .DEBUG , msg = f"managing container { name } for { config .image } "
117119 ):
118120 container = await docker_client .containers .create (
119121 config .model_dump (by_alias = True ), name = name
120122 )
121123 yield container
122124 except asyncio .CancelledError :
123125 if container :
124- logger .warning (
126+ _logger .warning (
125127 "Cancelling run of container %s, for %s" , container .id , config .image
126128 )
127129 raise
128130 finally :
129131 try :
130132 if container :
131133 with log_context (
132- logger ,
134+ _logger ,
133135 logging .DEBUG ,
134136 msg = f"Removing container { name } :{ container .id } for { config .image } " ,
135137 ):
136138 await container .delete (remove = True , v = True , force = True )
137- logger .info ("Completed run of %s" , config .image )
139+ _logger .info ("Completed run of %s" , config .image )
138140 except DockerError :
139- logger .exception (
141+ _logger .exception (
140142 "Unknown error with docker client when removing container '%s'" ,
141143 container or name ,
142144 )
@@ -166,7 +168,7 @@ def _guess_progress_value(progress_match: re.Match[str]) -> float:
166168async def _try_parse_progress (
167169 line : str , * , progress_regexp : re .Pattern [str ]
168170) -> float | None :
169- with log_catch (logger , reraise = False ):
171+ with log_catch (_logger , reraise = False ):
170172 # pattern might be like "timestamp log"
171173 log = line .strip ("\n " )
172174 splitted_log = log .split (" " , maxsplit = 1 )
@@ -215,14 +217,14 @@ async def _parse_container_log_file( # noqa: PLR0913 # pylint: disable=too-many
215217) -> None :
216218 log_file = task_volumes .logs_folder / LEGACY_SERVICE_LOG_FILE_NAME
217219 with log_context (
218- logger ,
220+ _logger ,
219221 logging .DEBUG ,
220222 "started monitoring of pre-1.0 service - using log file in /logs folder" ,
221223 ):
222- async with aiofiles .open (log_file , mode = "rt" ) as file_pointer :
224+ async with aiofiles .open (log_file ) as file_pointer :
223225 while (await container .show ())["State" ]["Running" ]:
224226 if line := await file_pointer .readline ():
225- logger .info (
227+ _logger .info (
226228 "[%s]: %s" ,
227229 f"{ service_key } :{ service_version } - { container .id } { container_name } " ,
228230 line ,
@@ -236,7 +238,7 @@ async def _parse_container_log_file( # noqa: PLR0913 # pylint: disable=too-many
236238
237239 # finish reading the logs if possible
238240 async for line in file_pointer :
239- logger .info (
241+ _logger .info (
240242 "[%s]: %s" ,
241243 f"{ service_key } :{ service_version } - { container .id } { container_name } " ,
242244 line ,
@@ -275,62 +277,76 @@ async def _parse_container_docker_logs(
275277 """
276278
277279 Raises:
278- TimeoutError : raised when no logs are received for longer than _AIODOCKER_LOGS_TIMEOUT_S
280+ ServiceTimeoutLoggingError : raised when no logs are received for longer than _AIODOCKER_LOGS_TIMEOUT_S
279281 """
280- with log_context (
281- logger , logging .DEBUG , "started monitoring of >=1.0 service - using docker logs"
282- ):
283- assert isinstance (container .docker .connector , aiohttp .UnixConnector ) # nosec
284- async with Docker (
285- session = aiohttp .ClientSession (
286- connector = aiohttp .UnixConnector (container .docker .connector .path ),
287- timeout = aiohttp .ClientTimeout (total = _AIODOCKER_LOGS_TIMEOUT_S ),
288- )
289- ) as docker_client_for_logs :
290- # NOTE: this is a workaround for aiodocker not being able to get the container
291- # logs when the container is not running
292- container_for_long_running_logs = (
293- await docker_client_for_logs .containers .get (container .id )
294- )
295- # NOTE: this is a workaround for aiodocker not being able to get the container
296- # logs when the container is not running
297- await container .show ()
298- await container_for_long_running_logs .show ()
299- async with aiofiles .tempfile .TemporaryDirectory () as tmp_dir :
300- log_file_path = (
301- Path (tmp_dir )
302- / f"{ service_key .split (sep = '/' )[- 1 ]} _{ service_version } .logs"
282+ try :
283+ with log_context (
284+ _logger ,
285+ logging .DEBUG ,
286+ "started monitoring of >=1.0 service - using docker logs" ,
287+ ):
288+ assert isinstance (
289+ container .docker .connector , aiohttp .UnixConnector
290+ ) # nosec
291+ async with Docker (
292+ session = aiohttp .ClientSession (
293+ connector = aiohttp .UnixConnector (container .docker .connector .path ),
294+ timeout = aiohttp .ClientTimeout (total = _AIODOCKER_LOGS_TIMEOUT_S ),
303295 )
304- log_file_path .parent .mkdir (parents = True , exist_ok = True )
305- async with aiofiles .open (log_file_path , mode = "wb+" ) as log_fp :
306- async for log_line in cast (
307- AsyncGenerator [str , None ],
308- container_for_long_running_logs .log (
309- stdout = True ,
310- stderr = True ,
311- follow = True ,
312- timestamps = True ,
313- ),
314- ):
315- log_msg_without_timestamp = log_line .split (" " , maxsplit = 1 )[1 ]
316- logger .info (
317- "[%s]: %s" ,
318- f"{ service_key } :{ service_version } - { container_for_long_running_logs .id } { container_name } " ,
319- log_msg_without_timestamp ,
320- )
321- await log_fp .write (log_line .encode ("utf-8" ))
322- # NOTE: here we remove the timestamp, only needed for the file
323- await _parse_and_publish_logs (
324- log_msg_without_timestamp ,
325- task_publishers = task_publishers ,
326- progress_regexp = progress_regexp ,
327- progress_bar = progress_bar ,
328- )
329-
330- # copy the log file to the log_file_url
331- await push_file_to_remote (
332- log_file_path , log_file_url , log_publishing_cb , s3_settings
296+ ) as docker_client_for_logs :
297+ # NOTE: this is a workaround for aiodocker not being able to get the container
298+ # logs when the container is not running
299+ container_for_long_running_logs = (
300+ await docker_client_for_logs .containers .get (container .id )
333301 )
302+ # NOTE: this is a workaround for aiodocker not being able to get the container
303+ # logs when the container is not running
304+ await container .show ()
305+ await container_for_long_running_logs .show ()
306+ async with aiofiles .tempfile .TemporaryDirectory () as tmp_dir :
307+ log_file_path = (
308+ Path (tmp_dir )
309+ / f"{ service_key .split (sep = '/' )[- 1 ]} _{ service_version } .logs"
310+ )
311+ log_file_path .parent .mkdir (parents = True , exist_ok = True )
312+ async with aiofiles .open (log_file_path , mode = "wb+" ) as log_fp :
313+ async for log_line in cast (
314+ AsyncGenerator [str , None ],
315+ container_for_long_running_logs .log (
316+ stdout = True ,
317+ stderr = True ,
318+ follow = True ,
319+ timestamps = True ,
320+ ),
321+ ):
322+ log_msg_without_timestamp = log_line .split (" " , maxsplit = 1 )[
323+ 1
324+ ]
325+ _logger .info (
326+ "[%s]: %s" ,
327+ f"{ service_key } :{ service_version } - { container_for_long_running_logs .id } { container_name } " ,
328+ log_msg_without_timestamp ,
329+ )
330+ await log_fp .write (log_line .encode ("utf-8" ))
331+ # NOTE: here we remove the timestamp, only needed for the file
332+ await _parse_and_publish_logs (
333+ log_msg_without_timestamp ,
334+ task_publishers = task_publishers ,
335+ progress_regexp = progress_regexp ,
336+ progress_bar = progress_bar ,
337+ )
338+
339+ # copy the log file to the log_file_url
340+ await push_file_to_remote (
341+ log_file_path , log_file_url , log_publishing_cb , s3_settings
342+ )
343+ except TimeoutError as e :
344+ raise ServiceTimeoutLoggingError (
345+ service_key = service_key ,
346+ service_version = service_version ,
347+ container_id = container .id ,
348+ timeout_timedelta = datetime .timedelta (seconds = _AIODOCKER_LOGS_TIMEOUT_S ),
349+ ) from e
334350
335351
336352async def _monitor_container_logs ( # noqa: PLR0913 # pylint: disable=too-many-arguments
@@ -351,12 +367,15 @@ async def _monitor_container_logs( # noqa: PLR0913 # pylint: disable=too-many-a
351367 that must be available in task_volumes.log / log.dat
352368 Services above are not creating a file and use the usual docker logging. These logs
353369 are retrieved using the usual cli 'docker logs CONTAINERID'
370+
371+    Raises: ServiceTimeoutLoggingError if no logs are received for longer
372+        than _AIODOCKER_LOGS_TIMEOUT_S; any other error propagates unchanged.
354373 """
355374
356375 container_info = await container .show ()
357376 container_name = container_info .get ("Name" , "undefined" )
358377 with log_context (
359- logger ,
378+ _logger ,
360379 logging .INFO ,
361380 f"parse logs of { service_key } :{ service_version } - { container .id } -{ container_name } " ,
362381 ):
@@ -403,6 +422,10 @@ async def managed_monitor_container_log_task( # noqa: PLR0913 # pylint: disable
403422 s3_settings : S3Settings | None ,
404423 progress_bar : ProgressBarData ,
405424) -> AsyncIterator [Awaitable [None ]]:
425+ """
426+ Raises:
427+ ServiceTimeoutLoggingError -- raised when no logs are received for longer than _AIODOCKER_LOGS_TIMEOUT_S
428+ """
406429 monitoring_task = None
407430 try :
408431 if integration_version == LEGACY_INTEGRATION_VERSION :
@@ -432,7 +455,7 @@ async def managed_monitor_container_log_task( # noqa: PLR0913 # pylint: disable
432455 await monitoring_task
433456 finally :
434457 if monitoring_task :
435- with log_context (logger , logging .DEBUG , "cancel logs monitoring task" ):
458+ with log_context (_logger , logging .DEBUG , "cancel logs monitoring task" ):
436459 monitoring_task .cancel ()
437460 with contextlib .suppress (asyncio .CancelledError ):
438461 await monitoring_task
@@ -479,7 +502,7 @@ async def get_image_labels(
479502 # NOTE: old services did not have the integration-version label
480503 # image labels are set to None when empty
481504 if image_labels := image_cfg ["Config" ].get ("Labels" ):
482- logger .debug ("found following image labels:\n %s" , pformat (image_labels ))
505+ _logger .debug ("found following image labels:\n %s" , pformat (image_labels ))
483506 data = from_labels (
484507 image_labels , prefix_key = OSPARC_LABEL_PREFIXES [0 ], trim_key_head = False
485508 )
@@ -490,20 +513,20 @@ async def get_image_labels(
490513async def get_computational_shared_data_mount_point (docker_client : Docker ) -> Path :
491514 app_settings = ApplicationSettings .create_from_envs ()
492515 try :
493- logger .debug (
516+ _logger .debug (
494517 "getting computational shared data mount point for %s" ,
495518 app_settings .SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME ,
496519 )
497520 volume_attributes = await DockerVolume (
498521 docker_client , app_settings .SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME
499522 ).show ()
500- logger .debug (
523+ _logger .debug (
501524 "found following volume attributes: %s" , pformat (volume_attributes )
502525 )
503526 return Path (volume_attributes ["Mountpoint" ])
504527
505528 except DockerError :
506- logger .exception (
529+ _logger .exception (
507530 "Error while retrieving docker volume %s, returnining default %s instead" ,
508531 app_settings .SIDECAR_COMP_SERVICES_SHARED_VOLUME_NAME ,
509532 app_settings .SIDECAR_COMP_SERVICES_SHARED_FOLDER ,
0 commit comments