Skip to content

Commit 059a777

Browse files
authored
✨🎨Computational backend: Automatically stop a running job if no logs are detected for 1h (#8549)
1 parent de246e2 commit 059a777

File tree

6 files changed: 413 additions and 206 deletions.

packages/dask-task-models-library/src/dask_task_models_library/container_tasks/errors.py

Lines changed: 27 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -3,24 +3,42 @@
33
from common_library.errors_classes import OsparcErrorMixin
44

55

6-
class TaskValueError(OsparcErrorMixin, ValueError): ...
7-
8-
9-
class TaskCancelledError(OsparcErrorMixin, RuntimeError):
10-
msg_template = "The task was cancelled"
6+
class ContainerTaskError(OsparcErrorMixin, RuntimeError):
7+
msg_template = (
8+
"The service {service_key}:{service_version}"
9+
" running in container {container_id} encountered an unexpected error: {error_message}."
10+
)
1111

1212

13-
class ServiceRuntimeError(OsparcErrorMixin, RuntimeError):
13+
class ServiceRuntimeError(ContainerTaskError):
1414
msg_template = (
1515
"The service {service_key}:{service_version}"
1616
" running in container {container_id} failed with code"
1717
" {exit_code}. Last logs:\n{service_logs}"
1818
)
1919

2020

21-
class ServiceInputsUseFileToKeyMapButReceivesZipDataError(
22-
OsparcErrorMixin, RuntimeError
23-
):
21+
class ServiceOutOfMemoryError(ServiceRuntimeError):
22+
msg_template = (
23+
"The service {service_key}:{service_version}"
24+
" running in container {container_id} ran out of memory and was terminated. Current limits are {service_resources}."
25+
" Last logs:\n{service_logs}"
26+
)
27+
28+
29+
class ServiceTimeoutLoggingError(ServiceRuntimeError):
30+
msg_template = (
31+
"The service {service_key}:{service_version}"
32+
" running in container {container_id} was silent/hanging for longer than {timeout_timedelta} and was terminated. "
33+
"TIP: The service might have an internal issue or was wrongly setup."
34+
)
35+
36+
37+
class TaskCancelledError(ContainerTaskError):
38+
msg_template = "The task was cancelled"
39+
40+
41+
class ServiceInputsUseFileToKeyMapButReceivesZipDataError(ContainerTaskError):
2442
msg_template = (
2543
"The service {service_key}:{service_version} {input} uses a file-to-key {file_to_key_map} map but receives zip data instead. "
2644
"TIP: either pass a single file or zip file and remove the file-to-key map parameter."

services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py

Lines changed: 53 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -15,15 +15,17 @@
1515
from dask_task_models_library.container_tasks.docker import DockerBasicAuth
1616
from dask_task_models_library.container_tasks.errors import (
1717
ServiceInputsUseFileToKeyMapButReceivesZipDataError,
18+
ServiceOutOfMemoryError,
1819
ServiceRuntimeError,
20+
ServiceTimeoutLoggingError,
1921
)
2022
from dask_task_models_library.container_tasks.io import FileUrl, TaskOutputData
2123
from dask_task_models_library.container_tasks.protocol import ContainerTaskParameters
2224
from models_library.progress_bar import ProgressReport
2325
from packaging import version
24-
from pydantic import ValidationError
26+
from pydantic import ByteSize, TypeAdapter, ValidationError
2527
from pydantic.networks import AnyUrl
26-
from servicelib.logging_utils import LogLevelInt, LogMessageStr
28+
from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch
2729
from servicelib.progress_bar import ProgressBarData
2830
from settings_library.s3 import S3Settings
2931
from yarl import URL
@@ -263,31 +265,55 @@ async def run(self, command: list[str]) -> TaskOutputData:
263265
):
264266
await container.start()
265267
await self._publish_sidecar_log(
266-
f"Container started as '{container.id}' on {socket.gethostname()}..."
268+
f"Service {self.task_parameters.image}:{self.task_parameters.tag} started as '{container.id}' on {socket.gethostname()}..."
267269
)
268270
# wait until the container finished, either success or fail or timeout
269271
while (container_data := await container.show())["State"]["Running"]:
270272
await asyncio.sleep(CONTAINER_WAIT_TIME_SECS)
273+
274+
async def _safe_get_last_logs() -> list[str]:
275+
with log_catch(_logger, reraise=False):
276+
last_logs = await cast(
277+
Coroutine,
278+
container.log(
279+
stdout=True, stderr=True, tail=20, follow=False
280+
),
281+
)
282+
assert isinstance(last_logs, list) # nosec
283+
return last_logs
284+
return ["Unexpected error: Could not retrieve logs."]
285+
286+
# Check for OOMKilled
287+
if container_data["State"].get("OOMKilled", False):
288+
raise ServiceOutOfMemoryError(
289+
service_key=self.task_parameters.image,
290+
service_version=self.task_parameters.tag,
291+
service_resources=TypeAdapter(ByteSize)
292+
.validate_python(self.task_max_resources.get("RAM", 0))
293+
.human_readable(),
294+
container_id=container.id,
295+
service_logs=await _safe_get_last_logs(),
296+
)
297+
271298
if container_data["State"]["ExitCode"] > os.EX_OK:
272299
raise ServiceRuntimeError(
273300
service_key=self.task_parameters.image,
274301
service_version=self.task_parameters.tag,
275302
container_id=container.id,
276303
exit_code=container_data["State"]["ExitCode"],
277-
service_logs=await cast(
278-
Coroutine,
279-
container.log(
280-
stdout=True, stderr=True, tail=20, follow=False
281-
),
282-
),
304+
service_logs=await _safe_get_last_logs(),
283305
)
284-
await self._publish_sidecar_log("Container ran successfully.")
306+
await self._publish_sidecar_log(
307+
f"Service {self.task_parameters.image}:{self.task_parameters.tag} completed successfully."
308+
)
285309

286310
# POST-PROCESSING (1 step weighted 5%)
287311
results = await self._retrieve_output_data(
288312
task_volumes, image_labels.get_integration_version()
289313
)
290-
await self._publish_sidecar_log("Task completed successfully.")
314+
await self._publish_sidecar_log(
315+
f"Uploaded output data of {self.task_parameters.image}:{self.task_parameters.tag} successfully."
316+
)
291317
return results
292318

293319
async def __aenter__(self) -> "ComputationalSidecar":
@@ -302,11 +328,21 @@ async def __aexit__(
302328
tb: TracebackType | None,
303329
) -> None:
304330
if exc:
305-
await self._publish_sidecar_log(
306-
f"Task error:\n{exc}", log_level=logging.ERROR
307-
)
308-
await self._publish_sidecar_log(
309-
"TIP: There might be more information in the service log file in the service outputs",
310-
)
331+
if isinstance(exc, asyncio.CancelledError):
332+
# cancelled errors are not logged as errors
333+
await self._publish_sidecar_log("Service was cancelled.")
334+
elif isinstance(exc, ServiceTimeoutLoggingError | ServiceOutOfMemoryError):
335+
await self._publish_sidecar_log(f"{exc}", log_level=logging.ERROR)
336+
await self._publish_sidecar_log(
337+
"TIP: There might be more information in the service log to help debug the service issue.",
338+
)
339+
340+
else:
341+
await self._publish_sidecar_log(
342+
f"Service error: {exc}", log_level=logging.ERROR
343+
)
344+
await self._publish_sidecar_log(
345+
"TIP: There might be more information in the service log to help debug the service issue.",
346+
)
311347
# ensure we pass the final progress
312348
self.task_publishers.publish_progress(ProgressReport(actual_value=1))

0 commit comments

Comments
 (0)