Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,42 @@
from common_library.errors_classes import OsparcErrorMixin


# Error type combining OsparcErrorMixin with the built-in ValueError.
# It declares no msg_template of its own (the body is just `...`).
# NOTE(review): presumably raised for invalid task values -- confirm at call sites.
class TaskValueError(OsparcErrorMixin, ValueError): ...


# Cancellation error built directly on OsparcErrorMixin + RuntimeError.
# NOTE(review): the same class name is defined again further down, there
# deriving from ContainerTaskError with the same message; these two lines look
# like the pre-change side of the diff -- confirm which definition is current.
class TaskCancelledError(OsparcErrorMixin, RuntimeError):
# Fixed message; the template contains no placeholders.
msg_template = "The task was cancelled"
# Common base for the container-task errors declared below
# (ServiceRuntimeError, TaskCancelledError and
# ServiceInputsUseFileToKeyMapButReceivesZipDataError derive from it).
# Combines OsparcErrorMixin with RuntimeError.
class ContainerTaskError(OsparcErrorMixin, RuntimeError):
# Placeholders used by this template: service_key, service_version,
# container_id, error_message.
msg_template = (
"The service {service_key}:{service_version}"
" running in container {container_id} encountered an unexpected error: {error_message}."
)


class ServiceRuntimeError(OsparcErrorMixin, RuntimeError):
# Error for a service container that exited with a failure code.
# Derives from ContainerTaskError and replaces its message template.
class ServiceRuntimeError(ContainerTaskError):
# Placeholders used by this template: service_key, service_version,
# container_id, exit_code, service_logs (the logs are placed on a new line).
msg_template = (
"The service {service_key}:{service_version}"
" running in container {container_id} failed with code"
" {exit_code}. Last logs:\n{service_logs}"
)


class ServiceInputsUseFileToKeyMapButReceivesZipDataError(
OsparcErrorMixin, RuntimeError
):
# Error for a service container that was terminated for running out of memory.
# Subclass of ServiceRuntimeError, so handlers catching ServiceRuntimeError
# also catch this one.
class ServiceOutOfMemoryError(ServiceRuntimeError):
# Placeholders used by this template: service_key, service_version,
# container_id, service_resources (the current limits), service_logs.
# Unlike the parent template, exit_code is not referenced here.
msg_template = (
"The service {service_key}:{service_version}"
" running in container {container_id} ran out of memory and was terminated. Current limits are {service_resources}."
" Last logs:\n{service_logs}"
)


# Error for a service that produced no output for too long and was terminated.
# Subclass of ServiceRuntimeError, so handlers catching ServiceRuntimeError
# also catch this one.
class ServiceTimeoutLoggingError(ServiceRuntimeError):
# Placeholders used by this template: service_key, service_version,
# container_id, timeout_timedelta.
# Unlike the parent template, neither exit_code nor service_logs is referenced.
msg_template = (
"The service {service_key}:{service_version}"
" running in container {container_id} was silent/hanging for longer than {timeout_timedelta} and was terminated. "
"TIP: The service might have an internal issue or was wrongly setup."
)


# Cancellation error, part of the ContainerTaskError hierarchy.
class TaskCancelledError(ContainerTaskError):
# Fixed message; overrides the parent template and uses no placeholders.
msg_template = "The task was cancelled"


class ServiceInputsUseFileToKeyMapButReceivesZipDataError(ContainerTaskError):
msg_template = (
"The service {service_key}:{service_version} {input} uses a file-to-key {file_to_key_map} map but receives zip data instead. "
"TIP: either pass a single file or zip file and remove the file-to-key map parameter."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
from dask_task_models_library.container_tasks.docker import DockerBasicAuth
from dask_task_models_library.container_tasks.errors import (
ServiceInputsUseFileToKeyMapButReceivesZipDataError,
ServiceOutOfMemoryError,
ServiceRuntimeError,
ServiceTimeoutLoggingError,
)
from dask_task_models_library.container_tasks.io import FileUrl, TaskOutputData
from dask_task_models_library.container_tasks.protocol import ContainerTaskParameters
from models_library.progress_bar import ProgressReport
from packaging import version
from pydantic import ValidationError
from pydantic import ByteSize, TypeAdapter, ValidationError
from pydantic.networks import AnyUrl
from servicelib.logging_utils import LogLevelInt, LogMessageStr
from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch
from servicelib.progress_bar import ProgressBarData
from settings_library.s3 import S3Settings
from yarl import URL
Expand Down Expand Up @@ -263,31 +265,55 @@ async def run(self, command: list[str]) -> TaskOutputData:
):
await container.start()
await self._publish_sidecar_log(
f"Container started as '{container.id}' on {socket.gethostname()}..."
f"Service {self.task_parameters.image}:{self.task_parameters.tag} started as '{container.id}' on {socket.gethostname()}..."
)
# wait until the container finished, either success or fail or timeout
while (container_data := await container.show())["State"]["Running"]:
await asyncio.sleep(CONTAINER_WAIT_TIME_SECS)

# Best-effort helper: return the tail of the container's output for use in
# error messages, or a one-element placeholder list if that is not possible.
# Takes no arguments; it reads `container` and `_logger` from the enclosing scope.
async def _safe_get_last_logs() -> list[str]:
# NOTE(review): log_catch(..., reraise=False) presumably logs and suppresses
# exceptions raised inside the block -- confirm in servicelib.logging_utils.
with log_catch(_logger, reraise=False):
# Request the last 20 lines of stdout and stderr without following the stream.
# The cast tells the type checker that this call returns an awaitable.
last_logs = await cast(
Coroutine,
container.log(
stdout=True, stderr=True, tail=20, follow=False
),
)
# Narrow the type for the checker; `# nosec` silences the security linter's
# warning about using assert.
assert isinstance(last_logs, list) # nosec
return last_logs
# Fallback, reached only when the block above did not return
# (presumably because an exception was suppressed by log_catch).
return ["Unexpected error: Could not retrieve logs."]

# Check for OOMKilled
if container_data["State"].get("OOMKilled", False):
raise ServiceOutOfMemoryError(
service_key=self.task_parameters.image,
service_version=self.task_parameters.tag,
service_resources=TypeAdapter(ByteSize)
.validate_python(self.task_max_resources.get("RAM", 0))
.human_readable(),
container_id=container.id,
service_logs=await _safe_get_last_logs(),
)

if container_data["State"]["ExitCode"] > os.EX_OK:
raise ServiceRuntimeError(
service_key=self.task_parameters.image,
service_version=self.task_parameters.tag,
container_id=container.id,
exit_code=container_data["State"]["ExitCode"],
service_logs=await cast(
Coroutine,
container.log(
stdout=True, stderr=True, tail=20, follow=False
),
),
service_logs=await _safe_get_last_logs(),
)
await self._publish_sidecar_log("Container ran successfully.")
await self._publish_sidecar_log(
f"Service {self.task_parameters.image}:{self.task_parameters.tag} completed successfully."
)

# POST-PROCESSING (1 step weighted 5%)
results = await self._retrieve_output_data(
task_volumes, image_labels.get_integration_version()
)
await self._publish_sidecar_log("Task completed successfully.")
await self._publish_sidecar_log(
f"Uploaded output data of {self.task_parameters.image}:{self.task_parameters.tag} successfully."
)
return results

async def __aenter__(self) -> "ComputationalSidecar":
Expand All @@ -302,11 +328,21 @@ async def __aexit__(
tb: TracebackType | None,
) -> None:
if exc:
await self._publish_sidecar_log(
f"Task error:\n{exc}", log_level=logging.ERROR
)
await self._publish_sidecar_log(
"TIP: There might be more information in the service log file in the service outputs",
)
if isinstance(exc, asyncio.CancelledError):
# cancelled errors are not logged as errors
await self._publish_sidecar_log("Service was cancelled.")
elif isinstance(exc, ServiceTimeoutLoggingError | ServiceOutOfMemoryError):
await self._publish_sidecar_log(f"{exc}", log_level=logging.ERROR)
await self._publish_sidecar_log(
"TIP: There might be more information in the service log to help debug the service issue.",
)

else:
await self._publish_sidecar_log(
f"Service error: {exc}", log_level=logging.ERROR
)
await self._publish_sidecar_log(
"TIP: There might be more information in the service log to help debug the service issue.",
)
# ensure we pass the final progress
self.task_publishers.publish_progress(ProgressReport(actual_value=1))
Loading
Loading