From 31740450bf34eacb95da40fb82957a55a44e7d55 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 10:51:32 +0200 Subject: [PATCH 01/36] feat: move redis client lifecycle to app server's one --- .../src/celery_library/signals.py | 10 +---- .../src/servicelib/celery/models.py | 2 +- .../core/application.py | 11 ++--- .../modules/celery/__init__.py | 43 ++++++++++++++++--- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/packages/celery-library/src/celery_library/signals.py b/packages/celery-library/src/celery_library/signals.py index dd5bf047e65..bcaf1605527 100644 --- a/packages/celery-library/src/celery_library/signals.py +++ b/packages/celery-library/src/celery_library/signals.py @@ -8,7 +8,6 @@ from servicelib.logging_utils import log_context from settings_library.celery import CelerySettings -from .common import create_task_manager from .utils import get_app_server, set_app_server _logger = logging.getLogger(__name__) @@ -26,20 +25,15 @@ def _init(startup_complete_event: threading.Event) -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - async def _setup_task_manager(): + async def _setup(): assert sender.app # nosec assert isinstance(sender.app, Celery) # nosec - app_server.task_manager = await create_task_manager( - sender.app, - celery_settings, - ) - set_app_server(sender.app, app_server) app_server.event_loop = loop - loop.run_until_complete(_setup_task_manager()) + loop.run_until_complete(_setup()) loop.run_until_complete(app_server.lifespan(startup_complete_event)) thread = threading.Thread( diff --git a/packages/service-library/src/servicelib/celery/models.py b/packages/service-library/src/servicelib/celery/models.py index 40756553377..3c39de96a86 100644 --- a/packages/service-library/src/servicelib/celery/models.py +++ b/packages/service-library/src/servicelib/celery/models.py @@ -58,7 +58,7 @@ async def get_task_metadata(self, task_id: TaskID) -> TaskMetadata | None: ... async def get_task_progress(self, task_id: TaskID) -> ProgressReport | None: ... - async def list_tasks(self, task_context: TaskFilter) -> list[Task]: ... + async def list_tasks(self, task_filter: TaskFilter) -> list[Task]: ... async def remove_task(self, task_id: TaskID) -> None: ... diff --git a/services/storage/src/simcore_service_storage/core/application.py b/services/storage/src/simcore_service_storage/core/application.py index ebe84c5643f..fc00a511c0f 100644 --- a/services/storage/src/simcore_service_storage/core/application.py +++ b/services/storage/src/simcore_service_storage/core/application.py @@ -36,7 +36,7 @@ from ..dsm import setup_dsm from ..dsm_cleaner import setup_dsm_cleaner from ..exceptions.handlers import set_exception_handlers -from ..modules.celery import setup_task_manager +from ..modules.celery import setup_celery from ..modules.db import setup_db from ..modules.long_running_tasks import setup_rest_api_long_running_tasks_for_uploads from ..modules.rabbitmq import setup as setup_rabbitmq @@ -71,12 +71,13 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901 setup_s3(app) setup_client_session(app, tracing_settings=settings.STORAGE_TRACING) - if settings.STORAGE_CELERY and not settings.STORAGE_WORKER_MODE: - setup_rabbitmq(app) + if settings.STORAGE_CELERY: + setup_celery(app, settings=settings.STORAGE_CELERY) - setup_task_manager(app, celery_settings=settings.STORAGE_CELERY) + if not settings.STORAGE_WORKER_MODE: + setup_rabbitmq(app) + setup_rpc_routes(app) - setup_rpc_routes(app) setup_rest_api_long_running_tasks_for_uploads(app) setup_rest_api_routes(app, API_VTAG) set_exception_handlers(app) diff --git a/services/storage/src/simcore_service_storage/modules/celery/__init__.py b/services/storage/src/simcore_service_storage/modules/celery/__init__.py index 684262d3f9b..f33eb18b622 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/__init__.py +++ b/services/storage/src/simcore_service_storage/modules/celery/__init__.py @@ -1,4 +1,7 @@ -from celery_library.common import create_app, create_task_manager +import logging + +from celery_library.backends._redis import RedisTaskInfoStore +from celery_library.common import create_app from celery_library.task_manager import CeleryTaskManager from celery_library.types import register_celery_types, register_pydantic_types from fastapi import FastAPI @@ -6,21 +9,47 @@ FileUploadCompletionBody, FoldersBody, ) +from servicelib.logging_utils import log_context +from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings +from settings_library.redis import RedisDatabase from ...models import FileMetaData +_logger = logging.getLogger(__name__) + -def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None: +def setup_celery(app: FastAPI, settings: CelerySettings) -> None: async def on_startup() -> None: - app.state.task_manager = await create_task_manager( - create_app(celery_settings), celery_settings - ) + with log_context(_logger, logging.INFO, "Setting up Celery"): + redis_client_sdk = RedisClientSDK( + settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( + RedisDatabase.CELERY_TASKS + ), + client_name="celery_tasks", + ) + app.state.celery_tasks_redis_client_sdk = redis_client_sdk + await redis_client_sdk.setup() + + app.state.task_manager = CeleryTaskManager( + create_app(settings), + settings, + RedisTaskInfoStore(redis_client_sdk), + ) + + register_celery_types() + register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody) - register_celery_types() - register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody) + async def on_shutdown() -> None: + with log_context(_logger, logging.INFO, "Shutting down Celery"): + redis_client_sdk: RedisClientSDK | None = ( + app.state.celery_tasks_redis_client_sdk + ) + if redis_client_sdk: + await redis_client_sdk.shutdown() app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) def get_task_manager_from_app(app: FastAPI) -> CeleryTaskManager: From ab99e13ddf1c33012c2c5d9cf71ecc70519d8835 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 11:01:23 +0200 Subject: [PATCH 02/36] fix: remove unused param --- packages/celery-library/src/celery_library/signals.py | 2 -- packages/celery-library/tests/conftest.py | 2 +- .../src/simcore_service_storage/modules/celery/worker_main.py | 4 +--- services/storage/tests/conftest.py | 4 +--- 4 files changed, 3 insertions(+), 9 deletions(-) diff --git a/packages/celery-library/src/celery_library/signals.py b/packages/celery-library/src/celery_library/signals.py index bcaf1605527..4673355efd1 100644 --- a/packages/celery-library/src/celery_library/signals.py +++ b/packages/celery-library/src/celery_library/signals.py @@ -6,7 +6,6 @@ from celery.worker.worker import WorkController # type: ignore[import-untyped] from servicelib.celery.app_server import BaseAppServer from servicelib.logging_utils import log_context -from settings_library.celery import CelerySettings from .utils import get_app_server, set_app_server @@ -15,7 +14,6 @@ def on_worker_init( app_server: BaseAppServer, - celery_settings: CelerySettings, sender: WorkController, **_kwargs, ) -> None: diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index e9fc599136a..ec7923b8229 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -102,7 +102,7 @@ async def with_celery_worker( register_celery_tasks: Callable[[Celery], None], ) -> AsyncIterator[TestWorkController]: def _on_worker_init_wrapper(sender: WorkController, **_kwargs): - return partial(on_worker_init, app_server, celery_settings)(sender, **_kwargs) + return partial(on_worker_init, app_server)(sender, **_kwargs) worker_init.connect(_on_worker_init_wrapper) worker_shutdown.connect(on_worker_shutdown) diff --git a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py index 396ed37accf..62074057eab 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py +++ b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py @@ -34,9 +34,7 @@ def worker_init_wrapper(sender, **_kwargs): assert _settings.STORAGE_CELERY # nosec - return partial(on_worker_init, app_server, _settings.STORAGE_CELERY)( - sender, **_kwargs - ) + return partial(on_worker_init, app_server)(sender, **_kwargs) worker_init.connect(worker_init_wrapper) diff --git a/services/storage/tests/conftest.py b/services/storage/tests/conftest.py index 32813640197..66bf78c04ae 100644 --- a/services/storage/tests/conftest.py +++ b/services/storage/tests/conftest.py @@ -1019,9 +1019,7 @@ async def with_storage_celery_worker( def _on_worker_init_wrapper(sender: WorkController, **_kwargs): assert app_settings.STORAGE_CELERY # nosec - return partial(on_worker_init, app_server, app_settings.STORAGE_CELERY)( - sender, **_kwargs - ) + return partial(on_worker_init, app_server)(sender, **_kwargs) worker_init.connect(_on_worker_init_wrapper) worker_shutdown.connect(on_worker_shutdown) From 0d26e0c40a619152e3e33dea6f9217f60dca3a62 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 13:43:14 +0200 Subject: [PATCH 03/36] fix: fake server Redis client's lifecycle --- .../src/celery_library/common.py | 23 ---------- packages/celery-library/tests/conftest.py | 44 ++++++++++++++++--- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/packages/celery-library/src/celery_library/common.py b/packages/celery-library/src/celery_library/common.py index d50e75597c6..ef45ef4c8b9 100644 --- a/packages/celery-library/src/celery_library/common.py +++ b/packages/celery-library/src/celery_library/common.py @@ -2,13 +2,9 @@ from typing import Any from celery import Celery # type: ignore[import-untyped] -from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings from settings_library.redis import RedisDatabase -from .backends._redis import RedisTaskInfoStore -from .task_manager import CeleryTaskManager - def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]: base_config = { @@ -36,22 +32,3 @@ def create_app(settings: CelerySettings) -> Celery: ), **_celery_configure(settings), ) - - -async def create_task_manager( - app: Celery, settings: CelerySettings -) -> CeleryTaskManager: - redis_client_sdk = RedisClientSDK( - settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( - RedisDatabase.CELERY_TASKS - ), - client_name="celery_tasks", - ) - await redis_client_sdk.setup() - # GCR please address https://github.com/ITISFoundation/osparc-simcore/issues/8159 - - return CeleryTaskManager( - app, - settings, - RedisTaskInfoStore(redis_client_sdk), - ) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index ec7923b8229..46189091dfa 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -12,15 +12,16 @@ from celery.contrib.testing.worker import TestWorkController, start_worker from celery.signals import worker_init, worker_shutdown from celery.worker.worker import WorkController -from celery_library.common import create_task_manager +from celery_library.backends._redis import RedisTaskInfoStore from celery_library.signals import on_worker_init, on_worker_shutdown from celery_library.task_manager import CeleryTaskManager from celery_library.types import register_celery_types from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.celery.app_server import BaseAppServer +from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings -from settings_library.redis import RedisSettings +from settings_library.redis import RedisDatabase, RedisSettings pytest_plugins = [ "pytest_simcore.docker_compose", @@ -34,10 +35,30 @@ class FakeAppServer(BaseAppServer): + def __init__(self, app: Celery, settings: CelerySettings): + super().__init__(app) + self._settings = settings + async def lifespan(self, startup_completed_event: threading.Event) -> None: + redis_client_sdk = RedisClientSDK( + self._settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( + RedisDatabase.CELERY_TASKS + ), + client_name="pytest_celery_tasks", + ) + await redis_client_sdk.setup() + + self.task_manager = CeleryTaskManager( + self._app, + self._settings, + RedisTaskInfoStore(redis_client_sdk), + ) + startup_completed_event.set() await self.shutdown_event.wait() # wait for shutdown + await redis_client_sdk.shutdown() + @pytest.fixture def register_celery_tasks() -> Callable[[Celery], None]: @@ -74,8 +95,8 @@ def celery_settings( @pytest.fixture -def app_server() -> BaseAppServer: - return FakeAppServer(app=None) +def app_server(celery_app: Celery, celery_settings: CelerySettings) -> BaseAppServer: + return FakeAppServer(app=celery_app, settings=celery_settings) @pytest.fixture(scope="session") @@ -125,10 +146,21 @@ async def celery_task_manager( celery_app: Celery, celery_settings: CelerySettings, with_celery_worker: TestWorkController, -) -> CeleryTaskManager: +) -> AsyncIterator[CeleryTaskManager]: register_celery_types() - return await create_task_manager( + redis_client_sdk = RedisClientSDK( + celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( + RedisDatabase.CELERY_TASKS + ), + client_name="pytest_celery_tasks", + ) + await redis_client_sdk.setup() + + yield CeleryTaskManager( celery_app, celery_settings, + RedisTaskInfoStore(redis_client_sdk), ) + + await redis_client_sdk.shutdown() From f2b84eb2527a324aa134d0b8a91464609b6b3d96 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 13:52:32 +0200 Subject: [PATCH 04/36] fix: typecheck --- packages/celery-library/tests/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 46189091dfa..63923f749e7 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -38,6 +38,7 @@ class FakeAppServer(BaseAppServer): def __init__(self, app: Celery, settings: CelerySettings): super().__init__(app) self._settings = settings + self.task_manager: CeleryTaskManager | None = None async def lifespan(self, startup_completed_event: threading.Event) -> None: redis_client_sdk = RedisClientSDK( From 064f8ac0a9cafed14f868b570f4dea4250b32950 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 14:03:13 +0200 Subject: [PATCH 05/36] fix: remove unuseful setup --- packages/celery-library/src/celery_library/signals.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/celery-library/src/celery_library/signals.py b/packages/celery-library/src/celery_library/signals.py index 4673355efd1..7d1e16fa427 100644 --- a/packages/celery-library/src/celery_library/signals.py +++ b/packages/celery-library/src/celery_library/signals.py @@ -23,15 +23,13 @@ def _init(startup_complete_event: threading.Event) -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - async def _setup(): - assert sender.app # nosec - assert isinstance(sender.app, Celery) # nosec + assert sender.app # nosec + assert isinstance(sender.app, Celery) # nosec - set_app_server(sender.app, app_server) + set_app_server(sender.app, app_server) app_server.event_loop = loop - loop.run_until_complete(_setup()) loop.run_until_complete(app_server.lifespan(startup_complete_event)) thread = threading.Thread( From f8290e8e3484aa5fbbf59c90314052f36a9bd355 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 14:10:16 +0200 Subject: [PATCH 06/36] fix: celery task manager fixture --- packages/celery-library/tests/conftest.py | 29 ++++++++++++----------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 63923f749e7..188ebfb81b3 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -150,18 +150,19 @@ async def celery_task_manager( ) -> AsyncIterator[CeleryTaskManager]: register_celery_types() - redis_client_sdk = RedisClientSDK( - celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( - RedisDatabase.CELERY_TASKS - ), - client_name="pytest_celery_tasks", - ) - await redis_client_sdk.setup() - - yield CeleryTaskManager( - celery_app, - celery_settings, - RedisTaskInfoStore(redis_client_sdk), - ) + try: + redis_client_sdk = RedisClientSDK( + celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( + RedisDatabase.CELERY_TASKS + ), + client_name="pytest_celery_tasks", + ) + await redis_client_sdk.setup() - await redis_client_sdk.shutdown() + yield CeleryTaskManager( + celery_app, + celery_settings, + RedisTaskInfoStore(redis_client_sdk), + ) + finally: + await redis_client_sdk.shutdown() From 8999602f1b450e60f9687b8c13a28be12855de38 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 14:51:30 +0200 Subject: [PATCH 07/36] fix: task manager property --- packages/celery-library/tests/conftest.py | 12 +++++++++--- .../src/servicelib/celery/app_server.py | 7 ++----- .../src/servicelib/fastapi/celery/app_server.py | 5 +++++ 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 188ebfb81b3..3fa9daa1180 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -19,6 +19,7 @@ from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.celery.app_server import BaseAppServer +from servicelib.celery.task_manager import TaskManager from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings from settings_library.redis import RedisDatabase, RedisSettings @@ -38,7 +39,12 @@ class FakeAppServer(BaseAppServer): def __init__(self, app: Celery, settings: CelerySettings): super().__init__(app) self._settings = settings - self.task_manager: CeleryTaskManager | None = None + self._task_manager: CeleryTaskManager | None = None + + @property + def task_manager(self) -> TaskManager: + assert self._task_manager, "Task manager is not initialized" + return self._task_manager async def lifespan(self, startup_completed_event: threading.Event) -> None: redis_client_sdk = RedisClientSDK( @@ -49,7 +55,7 @@ async def lifespan(self, startup_completed_event: threading.Event) -> None: ) await redis_client_sdk.setup() - self.task_manager = CeleryTaskManager( + self._task_manager = CeleryTaskManager( self._app, self._settings, RedisTaskInfoStore(redis_client_sdk), @@ -120,7 +126,6 @@ def celery_config() -> dict[str, Any]: async def with_celery_worker( celery_app: Celery, app_server: BaseAppServer, - celery_settings: CelerySettings, register_celery_tasks: Callable[[Celery], None], ) -> AsyncIterator[TestWorkController]: def _on_worker_init_wrapper(sender: WorkController, **_kwargs): @@ -146,6 +151,7 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): async def celery_task_manager( celery_app: Celery, celery_settings: CelerySettings, + mock_redis_socket_timeout: None, with_celery_worker: TestWorkController, ) -> AsyncIterator[CeleryTaskManager]: register_celery_types() diff --git a/packages/service-library/src/servicelib/celery/app_server.py b/packages/service-library/src/servicelib/celery/app_server.py index 9312497aa31..46b40d7a6c8 100644 --- a/packages/service-library/src/servicelib/celery/app_server.py +++ b/packages/service-library/src/servicelib/celery/app_server.py @@ -31,12 +31,9 @@ def shutdown_event(self) -> asyncio.Event: return self._shutdown_event @property + @abstractmethod def task_manager(self) -> TaskManager: - return self._task_manager - - @task_manager.setter - def task_manager(self, manager: TaskManager) -> None: - self._task_manager = manager + raise NotImplementedError @abstractmethod async def lifespan( diff --git a/packages/service-library/src/servicelib/fastapi/celery/app_server.py b/packages/service-library/src/servicelib/fastapi/celery/app_server.py index e1a1d3255ac..bfdbf67ad75 100644 --- a/packages/service-library/src/servicelib/fastapi/celery/app_server.py +++ b/packages/service-library/src/servicelib/fastapi/celery/app_server.py @@ -5,6 +5,7 @@ from asgi_lifespan import LifespanManager from fastapi import FastAPI +from servicelib.celery.task_manager import TaskManager from ...celery.app_server import BaseAppServer @@ -18,6 +19,10 @@ def __init__(self, app: FastAPI): super().__init__(app) self._lifespan_manager: LifespanManager | None = None + @property + def task_manager(self) -> TaskManager: + return self.app.state.task_manager + async def lifespan(self, startup_completed_event: threading.Event) -> None: async with LifespanManager( self.app, From dd0a684662e45ce188329ece363c0f616c4b2407 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 14:58:31 +0200 Subject: [PATCH 08/36] fix: typecheck --- .../src/servicelib/fastapi/celery/app_server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/service-library/src/servicelib/fastapi/celery/app_server.py b/packages/service-library/src/servicelib/fastapi/celery/app_server.py index bfdbf67ad75..5f80ca60895 100644 --- a/packages/service-library/src/servicelib/fastapi/celery/app_server.py +++ b/packages/service-library/src/servicelib/fastapi/celery/app_server.py @@ -21,7 +21,9 @@ def __init__(self, app: FastAPI): @property def task_manager(self) -> TaskManager: - return self.app.state.task_manager + assert self.app.state.task_manager, "Task manager is not initialized" # nosec + task_manager: TaskManager = self.app.state.task_manager + return task_manager async def lifespan(self, startup_completed_event: threading.Event) -> None: async with LifespanManager( From a81aafc79865a008ea63107e4412e3b3a1fb2400 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 15:08:08 +0200 Subject: [PATCH 09/36] fix: absolute import --- .../service-library/src/servicelib/fastapi/celery/app_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-library/src/servicelib/fastapi/celery/app_server.py b/packages/service-library/src/servicelib/fastapi/celery/app_server.py index 5f80ca60895..b7824d52a33 100644 --- a/packages/service-library/src/servicelib/fastapi/celery/app_server.py +++ b/packages/service-library/src/servicelib/fastapi/celery/app_server.py @@ -5,9 +5,9 @@ from asgi_lifespan import LifespanManager from fastapi import FastAPI -from servicelib.celery.task_manager import TaskManager from ...celery.app_server import BaseAppServer +from ...celery.task_manager import TaskManager _SHUTDOWN_TIMEOUT: Final[float] = datetime.timedelta(seconds=10).total_seconds() From d50dbb46db0b1c0252eb1e1d62894552ff660f59 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 16:34:22 +0200 Subject: [PATCH 10/36] tests: use in-memory Redis --- packages/celery-library/tests/conftest.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 3fa9daa1180..c7bd82a8f93 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -79,17 +79,13 @@ def _(celery_app: Celery) -> None: ... @pytest.fixture def app_environment( monkeypatch: pytest.MonkeyPatch, - redis_service: RedisSettings, + use_in_memory_redis: RedisSettings, env_devel_dict: EnvVarsDict, ) -> EnvVarsDict: return setenvs_from_dict( monkeypatch, { **env_devel_dict, - "REDIS_SECURE": redis_service.REDIS_SECURE, - "REDIS_HOST": redis_service.REDIS_HOST, - "REDIS_PORT": f"{redis_service.REDIS_PORT}", - "REDIS_PASSWORD": redis_service.REDIS_PASSWORD.get_secret_value(), }, ) @@ -151,16 +147,14 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): async def celery_task_manager( celery_app: Celery, celery_settings: CelerySettings, - mock_redis_socket_timeout: None, + use_in_memory_redis: RedisSettings, with_celery_worker: TestWorkController, ) -> AsyncIterator[CeleryTaskManager]: register_celery_types() try: redis_client_sdk = RedisClientSDK( - celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( - RedisDatabase.CELERY_TASKS - ), + use_in_memory_redis.build_redis_dsn(RedisDatabase.CELERY_TASKS), client_name="pytest_celery_tasks", ) await redis_client_sdk.setup() From 86bbe39f43b1a1e356618e176b8e060c0982757e Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Wed, 20 Aug 2025 16:37:10 +0200 Subject: [PATCH 11/36] fix: rename --- packages/celery-library/src/celery_library/signals.py | 2 +- packages/celery-library/tests/conftest.py | 2 +- packages/service-library/src/servicelib/celery/app_server.py | 2 +- .../service-library/src/servicelib/fastapi/celery/app_server.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/celery-library/src/celery_library/signals.py b/packages/celery-library/src/celery_library/signals.py index 7d1e16fa427..9c3de8225ff 100644 --- a/packages/celery-library/src/celery_library/signals.py +++ b/packages/celery-library/src/celery_library/signals.py @@ -30,7 +30,7 @@ def _init(startup_complete_event: threading.Event) -> None: app_server.event_loop = loop - loop.run_until_complete(app_server.lifespan(startup_complete_event)) + loop.run_until_complete(app_server.start_and_hold(startup_complete_event)) thread = threading.Thread( group=None, diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index c7bd82a8f93..78f46a63a61 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -46,7 +46,7 @@ def task_manager(self) -> TaskManager: assert self._task_manager, "Task manager is not initialized" return self._task_manager - async def lifespan(self, startup_completed_event: threading.Event) -> None: + async def start_and_hold(self, startup_completed_event: threading.Event) -> None: redis_client_sdk = RedisClientSDK( self._settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( RedisDatabase.CELERY_TASKS diff --git a/packages/service-library/src/servicelib/celery/app_server.py b/packages/service-library/src/servicelib/celery/app_server.py index 46b40d7a6c8..497a1657cac 100644 --- a/packages/service-library/src/servicelib/celery/app_server.py +++ b/packages/service-library/src/servicelib/celery/app_server.py @@ -36,7 +36,7 @@ def task_manager(self) -> TaskManager: raise NotImplementedError @abstractmethod - async def lifespan( + async def start_and_hold( self, startup_completed_event: threading.Event, ) -> None: diff --git a/packages/service-library/src/servicelib/fastapi/celery/app_server.py b/packages/service-library/src/servicelib/fastapi/celery/app_server.py index b7824d52a33..5981df3ade1 100644 --- a/packages/service-library/src/servicelib/fastapi/celery/app_server.py +++ b/packages/service-library/src/servicelib/fastapi/celery/app_server.py @@ -25,7 +25,7 @@ def task_manager(self) -> TaskManager: task_manager: TaskManager = self.app.state.task_manager return task_manager - async def lifespan(self, startup_completed_event: threading.Event) -> None: + async def start_and_hold(self, startup_completed_event: threading.Event) -> None: async with LifespanManager( self.app, startup_timeout=None, # waits for full app initialization (DB migrations, etc.) From 70bf868fe1d588805ba20dc9d5b77064a389f4f6 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 10:31:08 +0200 Subject: [PATCH 12/36] fix: shutdown --- packages/celery-library/tests/conftest.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 78f46a63a61..c28ea1c04e1 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -10,7 +10,7 @@ import pytest from celery import Celery # type: ignore[import-untyped] from celery.contrib.testing.worker import TestWorkController, start_worker -from celery.signals import worker_init, worker_shutdown +from celery.signals import worker_init from celery.worker.worker import WorkController from celery_library.backends._redis import RedisTaskInfoStore from celery_library.signals import on_worker_init, on_worker_shutdown @@ -128,7 +128,6 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): return partial(on_worker_init, app_server)(sender, **_kwargs) worker_init.connect(_on_worker_init_wrapper) - worker_shutdown.connect(on_worker_shutdown) register_celery_tasks(celery_app) @@ -140,7 +139,14 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): perform_ping_check=False, queues="default", ) as worker: - yield worker + # Ensure worker is fully up before test continues + worker.ensure_started() + + try: + yield worker + finally: + worker_init.disconnect(_on_worker_init_wrapper) + on_worker_shutdown(worker) @pytest.fixture From 1f626a2150c4cf2f68ba176ffb18ab970d3bc6a6 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 11:11:36 +0200 Subject: [PATCH 13/36] fix: worker shutdown --- packages/celery-library/tests/conftest.py | 27 ++++++++++++++--------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index c28ea1c04e1..0e48ca99579 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -2,7 +2,9 @@ # pylint: disable=unused-argument import datetime +import logging import threading +from asyncio import wait_for from collections.abc import AsyncIterator, Callable from functools import partial from typing import Any @@ -20,6 +22,7 @@ from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.celery.app_server import BaseAppServer from servicelib.celery.task_manager import TaskManager +from servicelib.logging_utils import log_catch from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings from settings_library.redis import RedisDatabase, RedisSettings @@ -35,6 +38,9 @@ ] +_logger = logging.getLogger(__name__) + + class FakeAppServer(BaseAppServer): def __init__(self, app: Celery, settings: CelerySettings): super().__init__(app) @@ -64,7 +70,8 @@ async def start_and_hold(self, startup_completed_event: threading.Event) -> None startup_completed_event.set() await self.shutdown_event.wait() # wait for shutdown - await redis_client_sdk.shutdown() + with log_catch(_logger, reraise=False): + await wait_for(redis_client_sdk.shutdown(), timeout=5.0) @pytest.fixture @@ -79,7 +86,6 @@ def _(celery_app: Celery) -> None: ... @pytest.fixture def app_environment( monkeypatch: pytest.MonkeyPatch, - use_in_memory_redis: RedisSettings, env_devel_dict: EnvVarsDict, ) -> EnvVarsDict: return setenvs_from_dict( @@ -98,8 +104,10 @@ def celery_settings( @pytest.fixture -def app_server(celery_app: Celery, celery_settings: CelerySettings) -> BaseAppServer: - return FakeAppServer(app=celery_app, settings=celery_settings) +def app_server( + celery_session_app: Celery, celery_settings: CelerySettings +) -> BaseAppServer: + return FakeAppServer(app=celery_session_app, settings=celery_settings) @pytest.fixture(scope="session") @@ -133,20 +141,16 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): with start_worker( celery_app, - pool="threads", - concurrency=1, loglevel="info", perform_ping_check=False, queues="default", + shutdown_timeout=10.0, ) as worker: - # Ensure worker is fully up before test continues - worker.ensure_started() - try: yield worker finally: - worker_init.disconnect(_on_worker_init_wrapper) on_worker_shutdown(worker) + worker_init.disconnect(_on_worker_init_wrapper) @pytest.fixture @@ -171,4 +175,5 @@ async def celery_task_manager( RedisTaskInfoStore(redis_client_sdk), ) finally: - await redis_client_sdk.shutdown() + with log_catch(_logger, reraise=False): + await wait_for(redis_client_sdk.shutdown(), timeout=5.0) From 246d695ac3b22982cc1b7f5f0ce84954a3044d33 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 11:23:00 +0200 Subject: [PATCH 14/36] fix: use threads --- packages/celery-library/tests/conftest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 0e48ca99579..46422b0c0c4 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -141,10 +141,11 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): with start_worker( celery_app, + concurrency=1, + pool="threads", loglevel="info", perform_ping_check=False, queues="default", - shutdown_timeout=10.0, ) as worker: try: yield worker From 41446dcac4c2555932270d716e4cde74459cf533 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 15:23:26 +0200 Subject: [PATCH 15/36] fix: explicity stop worker --- packages/celery-library/tests/conftest.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 46422b0c0c4..5e8fbfcf97e 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -104,10 +104,8 @@ def celery_settings( @pytest.fixture -def app_server( - celery_session_app: Celery, celery_settings: CelerySettings -) -> BaseAppServer: - return FakeAppServer(app=celery_session_app, settings=celery_settings) +def app_server(celery_app: Celery, celery_settings: CelerySettings) -> BaseAppServer: + return FakeAppServer(app=celery_app, settings=celery_settings) @pytest.fixture(scope="session") @@ -150,8 +148,8 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): try: yield worker finally: + worker.stop() # explicitly stop the worker on_worker_shutdown(worker) - worker_init.disconnect(_on_worker_init_wrapper) @pytest.fixture From ee523172330c3972417be578b36302c01f55804e Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 15:57:30 +0200 Subject: [PATCH 16/36] fix: shutdown --- packages/celery-library/tests/conftest.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 5e8fbfcf97e..412ae791316 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -4,7 +4,6 @@ import datetime import logging import threading -from asyncio import wait_for from collections.abc import AsyncIterator, Callable from functools import partial from typing import Any @@ -15,14 +14,13 @@ from celery.signals import worker_init from celery.worker.worker import WorkController from celery_library.backends._redis import RedisTaskInfoStore -from celery_library.signals import on_worker_init, on_worker_shutdown +from celery_library.signals import on_worker_init from celery_library.task_manager import CeleryTaskManager from celery_library.types import register_celery_types from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.celery.app_server import BaseAppServer from servicelib.celery.task_manager import TaskManager -from servicelib.logging_utils import log_catch from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings from settings_library.redis import RedisDatabase, RedisSettings @@ -70,8 +68,7 @@ async def start_and_hold(self, startup_completed_event: threading.Event) -> None startup_completed_event.set() await self.shutdown_event.wait() # wait for shutdown - with log_catch(_logger, reraise=False): - await wait_for(redis_client_sdk.shutdown(), timeout=5.0) + await redis_client_sdk.shutdown() @pytest.fixture @@ -145,11 +142,7 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): perform_ping_check=False, queues="default", ) as worker: - try: - yield worker - finally: - worker.stop() # explicitly stop the worker - on_worker_shutdown(worker) + yield worker @pytest.fixture @@ -174,5 +167,4 @@ async def celery_task_manager( RedisTaskInfoStore(redis_client_sdk), ) finally: - with log_catch(_logger, reraise=False): - await wait_for(redis_client_sdk.shutdown(), timeout=5.0) + await redis_client_sdk.shutdown() From 97ded5f9e64e90c7c3810d7c0f419332e78eb377 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 20:43:24 +0200 Subject: [PATCH 17/36] fix: raise timeout --- packages/celery-library/tests/conftest.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 412ae791316..078b39cbebf 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -10,11 +10,15 @@ import pytest from celery import Celery # type: ignore[import-untyped] -from celery.contrib.testing.worker import TestWorkController, start_worker +from celery.contrib.testing.worker import ( + TestWorkController, + start_worker, + test_worker_stopped, +) from celery.signals import worker_init from celery.worker.worker import WorkController from celery_library.backends._redis import RedisTaskInfoStore -from celery_library.signals import on_worker_init +from celery_library.signals import on_worker_init, on_worker_shutdown from celery_library.task_manager import CeleryTaskManager from celery_library.types import register_celery_types from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict @@ -132,15 +136,21 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): worker_init.connect(_on_worker_init_wrapper) + def _on_worker_stopped_wrapper(_: Celery, worker: WorkController, **_kwargs): + return on_worker_shutdown(sender=worker, **_kwargs) + + test_worker_stopped.connect(_on_worker_stopped_wrapper) + register_celery_tasks(celery_app) with start_worker( celery_app, concurrency=1, pool="threads", - loglevel="info", + loglevel="debug", perform_ping_check=False, queues="default", + shutdown_timeout=15.0, ) as worker: yield worker From abeab74037099cc2c952873c2d695d4ba37dac65 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 21:21:37 +0200 Subject: [PATCH 18/36] fix: force worker stop --- packages/celery-library/tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 078b39cbebf..f2197eb56eb 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -150,9 +150,9 @@ def _on_worker_stopped_wrapper(_: Celery, worker: WorkController, **_kwargs): loglevel="debug", perform_ping_check=False, queues="default", - shutdown_timeout=15.0, ) as worker: yield worker + worker.stop() @pytest.fixture From 8d2c42314bfdffbd3be6541f54ff789bc2159b0a Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 21:32:39 +0200 Subject: [PATCH 19/36] fix: remove rabbit --- packages/celery-library/tests/conftest.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index f2197eb56eb..83561f2ea02 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -34,7 +34,6 @@ "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", "pytest_simcore.logging", - "pytest_simcore.rabbit_service", "pytest_simcore.redis_service", "pytest_simcore.repository_paths", ] @@ -152,7 +151,6 @@ def _on_worker_stopped_wrapper(_: Celery, worker: WorkController, **_kwargs): queues="default", ) as worker: yield worker - worker.stop() @pytest.fixture From 1bb6dad7a610bb9dbad823dcc5024555a5165f4a Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Thu, 21 Aug 2025 21:38:42 +0200 Subject: [PATCH 20/36] fix: rabbit --- packages/celery-library/tests/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 83561f2ea02..bbddced84fb 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -34,6 +34,7 @@ "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", "pytest_simcore.logging", + "pytest_simcore.rabbit_service", "pytest_simcore.redis_service", "pytest_simcore.repository_paths", ] From 9327579450634ea1c2c484f3dd05704d33dd0c2e Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 09:21:32 +0200 Subject: [PATCH 21/36] fix: use separate Celery app --- packages/celery-library/tests/conftest.py | 19 +++++++++---------- .../celery-library/tests/unit/test_tasks.py | 5 +++++ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index bbddced84fb..ab77d7074f5 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -13,9 +13,8 @@ from celery.contrib.testing.worker import ( TestWorkController, start_worker, - test_worker_stopped, ) -from celery.signals import worker_init +from celery.signals import worker_init, worker_shutdown from celery.worker.worker import WorkController from celery_library.backends._redis import RedisTaskInfoStore from celery_library.signals import on_worker_init, on_worker_shutdown @@ -135,11 +134,7 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): return partial(on_worker_init, app_server)(sender, **_kwargs) worker_init.connect(_on_worker_init_wrapper) - - def _on_worker_stopped_wrapper(_: Celery, worker: WorkController, **_kwargs): - return on_worker_shutdown(sender=worker, **_kwargs) - - test_worker_stopped.connect(_on_worker_stopped_wrapper) + worker_shutdown.connect(on_worker_shutdown) register_celery_tasks(celery_app) @@ -154,12 +149,16 @@ def _on_worker_stopped_wrapper(_: Celery, worker: WorkController, **_kwargs): yield worker +@pytest.fixture +async def mock_celery_app(celery_config: dict[str, Any]) -> Celery: + return Celery(**celery_config) + + @pytest.fixture async def celery_task_manager( - celery_app: Celery, + mock_celery_app: Celery, celery_settings: CelerySettings, use_in_memory_redis: RedisSettings, - with_celery_worker: TestWorkController, ) -> AsyncIterator[CeleryTaskManager]: register_celery_types() @@ -171,7 +170,7 @@ async def celery_task_manager( await redis_client_sdk.setup() yield CeleryTaskManager( - celery_app, + mock_celery_app, celery_settings, RedisTaskInfoStore(redis_client_sdk), ) diff --git a/packages/celery-library/tests/unit/test_tasks.py b/packages/celery-library/tests/unit/test_tasks.py index a4edfb7540a..44c150354db 100644 --- a/packages/celery-library/tests/unit/test_tasks.py +++ b/packages/celery-library/tests/unit/test_tasks.py @@ -13,6 +13,7 @@ import pytest from celery import Celery, Task from celery.contrib.abortable import AbortableTask +from celery.worker.worker import WorkController from celery_library.errors import TransferrableCeleryError from celery_library.task import register_task from celery_library.task_manager import CeleryTaskManager @@ -92,6 +93,7 @@ def _(celery_app: Celery) -> None: async def test_submitting_task_calling_async_function_results_with_success_state( celery_task_manager: CeleryTaskManager, + with_celery_worker: WorkController, ): task_filter = TaskFilter(user_id=42) @@ -122,6 +124,7 @@ async def test_submitting_task_calling_async_function_results_with_success_state async def test_submitting_task_with_failure_results_with_error( celery_task_manager: CeleryTaskManager, + with_celery_worker: WorkController, ): task_filter = TaskFilter(user_id=42) @@ -150,6 +153,7 @@ async def test_submitting_task_with_failure_results_with_error( async def test_cancelling_a_running_task_aborts_and_deletes( celery_task_manager: CeleryTaskManager, + with_celery_worker: WorkController, ): task_filter = TaskFilter(user_id=42) @@ -182,6 +186,7 @@ async def test_cancelling_a_running_task_aborts_and_deletes( async def test_listing_task_uuids_contains_submitted_task( celery_task_manager: CeleryTaskManager, + with_celery_worker: WorkController, ): task_filter = TaskFilter(user_id=42) From 5198a3754f203ea8dffb4a36d47444a8b09d5d58 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 12:30:50 +0200 Subject: [PATCH 22/36] fix: add wait after submit --- packages/celery-library/tests/unit/test_async_jobs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/celery-library/tests/unit/test_async_jobs.py b/packages/celery-library/tests/unit/test_async_jobs.py index 4a646a1fdb4..47b4d37cfa1 100644 --- a/packages/celery-library/tests/unit/test_async_jobs.py +++ b/packages/celery-library/tests/unit/test_async_jobs.py @@ -301,6 +301,8 @@ async def test_async_jobs_cancel( payload=60 * 10, # test hangs if not cancelled properly ) + await asyncio.sleep(3) + await async_jobs.cancel( async_jobs_rabbitmq_rpc_client, rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, From f223f79f86eb1c63e5930e10ae0a8cb1f24ecaba Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 12:54:03 +0200 Subject: [PATCH 23/36] fix: test indent --- packages/celery-library/tests/unit/test_tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/celery-library/tests/unit/test_tasks.py b/packages/celery-library/tests/unit/test_tasks.py index 44c150354db..168626745b7 100644 --- a/packages/celery-library/tests/unit/test_tasks.py +++ b/packages/celery-library/tests/unit/test_tasks.py @@ -206,5 +206,5 @@ async def test_listing_task_uuids_contains_submitted_task( tasks = await celery_task_manager.list_tasks(task_filter) assert any(task.uuid == task_uuid for task in tasks) - tasks = await celery_task_manager.list_tasks(task_filter) - assert any(task.uuid == task_uuid for task in tasks) + tasks = await celery_task_manager.list_tasks(task_filter) + assert any(task.uuid == task_uuid for task in tasks) From f3476988c0af08ca714e56a9a2e6a3b52a0b04cc Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 13:02:49 +0200 Subject: [PATCH 24/36] fix: loglevel --- packages/celery-library/tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index ab77d7074f5..8e50ed8c4de 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -142,7 +142,7 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs): celery_app, concurrency=1, pool="threads", - loglevel="debug", + loglevel="info", perform_ping_check=False, queues="default", ) as worker: From 585dc9f95c9869090a5af4c19f563c1b02e54205 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 13:07:20 +0200 Subject: [PATCH 25/36] fix: wait --- packages/celery-library/tests/unit/test_async_jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/celery-library/tests/unit/test_async_jobs.py b/packages/celery-library/tests/unit/test_async_jobs.py index 47b4d37cfa1..7ddf3df79ec 100644 --- a/packages/celery-library/tests/unit/test_async_jobs.py +++ b/packages/celery-library/tests/unit/test_async_jobs.py @@ -301,7 +301,7 @@ async def test_async_jobs_cancel( payload=60 * 10, # test hangs if not cancelled properly ) - await asyncio.sleep(3) + await asyncio.sleep(3.0) # wait a bit before cancelling await async_jobs.cancel( async_jobs_rabbitmq_rpc_client, From ceae3c31d9fefc9b33134d09d90598af4216006d Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 13:37:08 +0200 Subject: [PATCH 26/36] fix: remove partials --- .../simcore_service_storage/modules/celery/worker_main.py | 5 +---- services/storage/tests/conftest.py | 3 +-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py index 62074057eab..da00bbdf2c2 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py +++ b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py @@ -1,7 +1,4 @@ """Main application to be deployed in for example uvicorn.""" - -from functools import partial - from celery.signals import worker_init, worker_shutdown # type: ignore[import-untyped] from celery_library.common import create_app as create_celery_app from celery_library.signals import ( @@ -34,7 +31,7 @@ def worker_init_wrapper(sender, **_kwargs): assert _settings.STORAGE_CELERY # nosec - return partial(on_worker_init, app_server)(sender, **_kwargs) + return on_worker_init(sender, app_server, **_kwargs) worker_init.connect(worker_init_wrapper) diff --git a/services/storage/tests/conftest.py b/services/storage/tests/conftest.py index 66bf78c04ae..f90299efcd5 100644 --- a/services/storage/tests/conftest.py +++ b/services/storage/tests/conftest.py @@ -12,7 +12,6 @@ import random import sys from collections.abc import AsyncIterator, Awaitable, Callable -from functools import partial from pathlib import Path from typing import Any, Final, cast @@ -1019,7 +1018,7 @@ async def with_storage_celery_worker( def _on_worker_init_wrapper(sender: WorkController, **_kwargs): assert app_settings.STORAGE_CELERY # nosec - return partial(on_worker_init, app_server)(sender, **_kwargs) + return on_worker_init(sender, app_server, **_kwargs) worker_init.connect(_on_worker_init_wrapper) worker_shutdown.connect(on_worker_shutdown) From 81161e906c247eed6680af819dff43651042e4ed Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 13:44:02 +0200 Subject: [PATCH 27/36] fix: remove unused asserts --- .../src/simcore_service_storage/modules/celery/worker_main.py | 1 - services/storage/tests/conftest.py | 1 - 2 files changed, 2 deletions(-) diff --git a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py index da00bbdf2c2..0cfde382889 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py +++ b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py @@ -30,7 +30,6 @@ def worker_init_wrapper(sender, **_kwargs): - assert _settings.STORAGE_CELERY # nosec return on_worker_init(sender, app_server, **_kwargs) diff --git a/services/storage/tests/conftest.py b/services/storage/tests/conftest.py index f90299efcd5..802c3fab387 100644 --- a/services/storage/tests/conftest.py +++ b/services/storage/tests/conftest.py @@ -1017,7 +1017,6 @@ async def with_storage_celery_worker( app_server = FastAPIAppServer(app=create_app(app_settings)) def _on_worker_init_wrapper(sender: WorkController, **_kwargs): - assert app_settings.STORAGE_CELERY # nosec return on_worker_init(sender, app_server, **_kwargs) worker_init.connect(_on_worker_init_wrapper) From 1e02fe7c3e46adeaa7fa91aff8d1fb88a77f6463 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 14:02:48 +0200 Subject: [PATCH 28/36] fix: assert --- .../service-library/src/servicelib/celery/task_manager.py | 3 ++- .../src/servicelib/fastapi/celery/app_server.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/service-library/src/servicelib/celery/task_manager.py b/packages/service-library/src/servicelib/celery/task_manager.py index 93612e6845f..de69a77af10 100644 --- a/packages/service-library/src/servicelib/celery/task_manager.py +++ b/packages/service-library/src/servicelib/celery/task_manager.py @@ -1,4 +1,4 @@ -from typing import Any, Protocol +from typing import Any, Protocol, runtime_checkable from models_library.progress_bar import ProgressReport @@ -12,6 +12,7 @@ ) +@runtime_checkable class TaskManager(Protocol): async def submit_task( self, task_metadata: TaskMetadata, *, task_filter: TaskFilter, **task_param diff --git a/packages/service-library/src/servicelib/fastapi/celery/app_server.py b/packages/service-library/src/servicelib/fastapi/celery/app_server.py index 5981df3ade1..2ef3c4da986 100644 --- a/packages/service-library/src/servicelib/fastapi/celery/app_server.py +++ b/packages/service-library/src/servicelib/fastapi/celery/app_server.py @@ -21,8 +21,9 @@ def __init__(self, app: FastAPI): @property def task_manager(self) -> TaskManager: - assert self.app.state.task_manager, "Task manager is not initialized" # nosec - task_manager: TaskManager = self.app.state.task_manager + task_manager = self.app.state.task_manager + assert task_manager, "Task manager is not initialized" # nosec + assert isinstance(task_manager, TaskManager) return task_manager async def start_and_hold(self, startup_completed_event: threading.Event) -> None: From ad4cc8c2c838013bc2f342828d79eed59ad0f07d Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Fri, 22 Aug 2025 14:06:42 +0200 Subject: [PATCH 29/36] fix: remove partial --- packages/celery-library/tests/conftest.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 8e50ed8c4de..5bbcfce7b0d 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -5,7 +5,6 @@ import logging import threading from collections.abc import AsyncIterator, Callable -from functools import partial from typing import Any import pytest @@ -131,7 +130,7 @@ async def with_celery_worker( register_celery_tasks: Callable[[Celery], None], ) -> AsyncIterator[TestWorkController]: def _on_worker_init_wrapper(sender: WorkController, **_kwargs): - return partial(on_worker_init, app_server)(sender, **_kwargs) + return on_worker_init(app_server, sender, **_kwargs) worker_init.connect(_on_worker_init_wrapper) worker_shutdown.connect(on_worker_shutdown) From 46f50346ecfb990cc9a5dca201e1f7d1d2adb864 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 26 Aug 2025 09:10:27 +0200 Subject: [PATCH 30/36] remove unused --- .../src/servicelib/fastapi/celery/app_server.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/service-library/src/servicelib/fastapi/celery/app_server.py b/packages/service-library/src/servicelib/fastapi/celery/app_server.py index 2ef3c4da986..eeb8a148150 100644 --- a/packages/service-library/src/servicelib/fastapi/celery/app_server.py +++ b/packages/service-library/src/servicelib/fastapi/celery/app_server.py @@ -15,10 +15,6 @@ class FastAPIAppServer(BaseAppServer[FastAPI]): - def __init__(self, app: FastAPI): - super().__init__(app) - self._lifespan_manager: LifespanManager | None = None - @property def task_manager(self) -> TaskManager: task_manager = self.app.state.task_manager From fec6e98053916f46a710f87e9d6a72d710765121 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 26 Aug 2025 09:28:18 +0200 Subject: [PATCH 31/36] rename --- packages/celery-library/src/celery_library/signals.py | 2 +- packages/celery-library/tests/conftest.py | 4 +++- packages/service-library/src/servicelib/celery/app_server.py | 2 +- .../src/servicelib/fastapi/celery/app_server.py | 4 +++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/celery-library/src/celery_library/signals.py b/packages/celery-library/src/celery_library/signals.py index 9c3de8225ff..071e66c84fb 100644 --- a/packages/celery-library/src/celery_library/signals.py +++ b/packages/celery-library/src/celery_library/signals.py @@ -30,7 +30,7 @@ def _init(startup_complete_event: threading.Event) -> None: app_server.event_loop = loop - loop.run_until_complete(app_server.start_and_hold(startup_complete_event)) + loop.run_until_complete(app_server.run_until_shutdown(startup_complete_event)) thread = threading.Thread( group=None, diff --git a/packages/celery-library/tests/conftest.py b/packages/celery-library/tests/conftest.py index 5bbcfce7b0d..a3ce410f90f 100644 --- a/packages/celery-library/tests/conftest.py +++ b/packages/celery-library/tests/conftest.py @@ -52,7 +52,9 @@ def task_manager(self) -> TaskManager: assert self._task_manager, "Task manager is not initialized" return self._task_manager - async def start_and_hold(self, startup_completed_event: threading.Event) -> None: + async def run_until_shutdown( + self, startup_completed_event: threading.Event + ) -> None: redis_client_sdk = RedisClientSDK( self._settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( RedisDatabase.CELERY_TASKS diff --git a/packages/service-library/src/servicelib/celery/app_server.py b/packages/service-library/src/servicelib/celery/app_server.py index 497a1657cac..5789d60c4c6 100644 --- a/packages/service-library/src/servicelib/celery/app_server.py +++ b/packages/service-library/src/servicelib/celery/app_server.py @@ -36,7 +36,7 @@ def task_manager(self) -> TaskManager: raise NotImplementedError @abstractmethod - async def start_and_hold( + async def run_until_shutdown( self, startup_completed_event: threading.Event, ) -> None: diff --git a/packages/service-library/src/servicelib/fastapi/celery/app_server.py b/packages/service-library/src/servicelib/fastapi/celery/app_server.py index eeb8a148150..3c42aa9144d 100644 --- a/packages/service-library/src/servicelib/fastapi/celery/app_server.py +++ b/packages/service-library/src/servicelib/fastapi/celery/app_server.py @@ -22,7 +22,9 @@ def task_manager(self) -> TaskManager: assert isinstance(task_manager, TaskManager) return task_manager - async def start_and_hold(self, startup_completed_event: threading.Event) -> None: + async def run_until_shutdown( + self, startup_completed_event: threading.Event + ) -> None: async with LifespanManager( self.app, startup_timeout=None, # waits for full app initialization (DB migrations, etc.) From 8125b5d9662ab048c25c2b264afc6d70c981827b Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 26 Aug 2025 13:02:54 +0200 Subject: [PATCH 32/36] fix: remove unused --- services/storage/src/simcore_service_storage/core/application.py | 1 - 1 file changed, 1 deletion(-) diff --git a/services/storage/src/simcore_service_storage/core/application.py b/services/storage/src/simcore_service_storage/core/application.py index b0bbb8f2bcf..e5e213fe775 100644 --- a/services/storage/src/simcore_service_storage/core/application.py +++ b/services/storage/src/simcore_service_storage/core/application.py @@ -77,7 +77,6 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901 setup_rabbitmq(app) setup_rpc_routes(app) - setup_rest_api_long_running_tasks_for_uploads(app) setup_rest_api_routes(app, API_VTAG) set_exception_handlers(app) From 7618c5e3ebc5816647db1a1109d708732713f4ac Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Mon, 8 Sep 2025 14:42:08 +0200 Subject: [PATCH 33/36] fix: remove sleep --- packages/celery-library/tests/unit/test_async_jobs.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/celery-library/tests/unit/test_async_jobs.py b/packages/celery-library/tests/unit/test_async_jobs.py index 7ddf3df79ec..4a646a1fdb4 100644 --- a/packages/celery-library/tests/unit/test_async_jobs.py +++ b/packages/celery-library/tests/unit/test_async_jobs.py @@ -301,8 +301,6 @@ async def test_async_jobs_cancel( payload=60 * 10, # test hangs if not cancelled properly ) - await asyncio.sleep(3.0) # wait a bit before cancelling - await async_jobs.cancel( async_jobs_rabbitmq_rpc_client, rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, From 6d68a6a3c877b2a42be729fbb5c1e7924ae2c446 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 9 Sep 2025 13:46:20 +0200 Subject: [PATCH 34/36] fix: api-server worker init --- packages/celery-library/src/celery_library/signals.py | 2 +- .../simcore_service_api_server/celery_worker/worker_main.py | 4 +--- .../simcore_service_storage/modules/celery/worker_main.py | 5 +++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/celery-library/src/celery_library/signals.py b/packages/celery-library/src/celery_library/signals.py index 071e66c84fb..02f1a56f0ec 100644 --- a/packages/celery-library/src/celery_library/signals.py +++ b/packages/celery-library/src/celery_library/signals.py @@ -13,8 +13,8 @@ def on_worker_init( - app_server: BaseAppServer, sender: WorkController, + app_server: BaseAppServer, **_kwargs, ) -> None: startup_complete_event = threading.Event() diff --git a/services/api-server/src/simcore_service_api_server/celery_worker/worker_main.py b/services/api-server/src/simcore_service_api_server/celery_worker/worker_main.py index e70b7f79112..82881b6af69 100644 --- a/services/api-server/src/simcore_service_api_server/celery_worker/worker_main.py +++ b/services/api-server/src/simcore_service_api_server/celery_worker/worker_main.py @@ -37,6 +37,4 @@ def worker_init_wrapper(sender, **_kwargs): assert _settings.API_SERVER_CELERY # nosec app_server = FastAPIAppServer(app=create_app(_settings)) - return partial(on_worker_init, app_server, _settings.API_SERVER_CELERY)( - sender, **_kwargs - ) + return partial(on_worker_init, app_server=app_server)(sender, **_kwargs) diff --git a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py index 0cfde382889..f2e90e90024 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/worker_main.py +++ b/services/storage/src/simcore_service_storage/modules/celery/worker_main.py @@ -1,4 +1,5 @@ """Main application to be deployed in for example uvicorn.""" + from celery.signals import worker_init, worker_shutdown # type: ignore[import-untyped] from celery_library.common import create_app as create_celery_app from celery_library.signals import ( @@ -29,8 +30,8 @@ app_server = FastAPIAppServer(app=create_app(_settings)) -def worker_init_wrapper(sender, **_kwargs): - return on_worker_init(sender, app_server, **_kwargs) +def worker_init_wrapper(sender, **kwargs): + return on_worker_init(sender, app_server, **kwargs) worker_init.connect(worker_init_wrapper) From 870d96cf75ab0e48d71da2fa9a6f8d0ea772d185 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 9 Sep 2025 14:02:52 +0200 Subject: [PATCH 35/36] fix: celery task manager setup --- .../clients/celery_task_manager.py | 44 ++++++++++++++++--- .../core/application.py | 4 +- .../modules/celery/__init__.py | 4 +- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/clients/celery_task_manager.py b/services/api-server/src/simcore_service_api_server/clients/celery_task_manager.py index 0b4ac4c2f4e..cda19b46963 100644 --- a/services/api-server/src/simcore_service_api_server/clients/celery_task_manager.py +++ b/services/api-server/src/simcore_service_api_server/clients/celery_task_manager.py @@ -1,18 +1,48 @@ -from celery_library.common import create_app, create_task_manager +import logging + +from celery_library.backends._redis import RedisTaskInfoStore +from celery_library.common import create_app +from celery_library.task_manager import CeleryTaskManager from celery_library.types import register_celery_types, register_pydantic_types from fastapi import FastAPI +from servicelib.logging_utils import log_context +from servicelib.redis import RedisClientSDK from settings_library.celery import CelerySettings +from settings_library.redis import RedisDatabase from ..celery_worker.worker_tasks.tasks import pydantic_types_to_register +_logger = logging.getLogger(__name__) + -def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None: +def setup_task_manager(app: FastAPI, settings: CelerySettings) -> None: async def on_startup() -> None: - app.state.task_manager = await create_task_manager( - create_app(celery_settings), celery_settings - ) + with log_context(_logger, logging.INFO, "Setting up Celery"): + redis_client_sdk = RedisClientSDK( + settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( + RedisDatabase.CELERY_TASKS + ), + client_name="api_server_celery_tasks", + ) + app.state.celery_tasks_redis_client_sdk = redis_client_sdk + await redis_client_sdk.setup() + + app.state.task_manager = CeleryTaskManager( + create_app(settings), + settings, + RedisTaskInfoStore(redis_client_sdk), + ) + + register_celery_types() + register_pydantic_types(*pydantic_types_to_register) - register_celery_types() - register_pydantic_types(*pydantic_types_to_register) + async def on_shutdown() -> None: + with log_context(_logger, logging.INFO, "Shutting down Celery"): + redis_client_sdk: RedisClientSDK | None = ( + app.state.celery_tasks_redis_client_sdk + ) + if redis_client_sdk: + await redis_client_sdk.shutdown() app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) diff --git a/services/storage/src/simcore_service_storage/core/application.py b/services/storage/src/simcore_service_storage/core/application.py index e5e213fe775..430ebd3018d 100644 --- a/services/storage/src/simcore_service_storage/core/application.py +++ b/services/storage/src/simcore_service_storage/core/application.py @@ -36,7 +36,7 @@ from ..dsm import setup_dsm from ..dsm_cleaner import setup_dsm_cleaner from ..exceptions.handlers import set_exception_handlers -from ..modules.celery import setup_celery +from ..modules.celery import setup_task_manager from ..modules.db import setup_db from ..modules.rabbitmq import setup as setup_rabbitmq from ..modules.redis import setup as setup_redis @@ -71,7 +71,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901 setup_client_session(app, tracing_settings=settings.STORAGE_TRACING) if settings.STORAGE_CELERY: - setup_celery(app, settings=settings.STORAGE_CELERY) + setup_task_manager(app, settings=settings.STORAGE_CELERY) if not settings.STORAGE_WORKER_MODE: setup_rabbitmq(app) diff --git a/services/storage/src/simcore_service_storage/modules/celery/__init__.py b/services/storage/src/simcore_service_storage/modules/celery/__init__.py index f33eb18b622..e56b698e6c6 100644 --- a/services/storage/src/simcore_service_storage/modules/celery/__init__.py +++ b/services/storage/src/simcore_service_storage/modules/celery/__init__.py @@ -19,14 +19,14 @@ _logger = logging.getLogger(__name__) -def setup_celery(app: FastAPI, settings: CelerySettings) -> None: +def setup_task_manager(app: FastAPI, settings: CelerySettings) -> None: async def on_startup() -> None: with log_context(_logger, logging.INFO, "Setting up Celery"): redis_client_sdk = RedisClientSDK( settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( RedisDatabase.CELERY_TASKS ), - client_name="celery_tasks", + client_name="storage_celery_tasks", ) app.state.celery_tasks_redis_client_sdk = redis_client_sdk await redis_client_sdk.setup() From ad768bef2cad1523268f978dc3cdca935e290d33 Mon Sep 17 00:00:00 2001 From: Giancarlo Romeo Date: Tue, 9 Sep 2025 14:32:52 +0200 Subject: [PATCH 36/36] fix: tests --- .../tests/unit/api_functions/celery/conftest.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/services/api-server/tests/unit/api_functions/celery/conftest.py b/services/api-server/tests/unit/api_functions/celery/conftest.py index 993ba4b73ab..0a5c933a728 100644 --- a/services/api-server/tests/unit/api_functions/celery/conftest.py +++ b/services/api-server/tests/unit/api_functions/celery/conftest.py @@ -6,7 +6,6 @@ import datetime from collections.abc import AsyncIterator, Callable -from functools import partial from typing import Any import pytest @@ -126,11 +125,8 @@ async def with_api_server_celery_worker( app_server = FastAPIAppServer(app=create_app(app_settings)) - def _on_worker_init_wrapper(sender: WorkController, **_kwargs): - assert app_settings.API_SERVER_CELERY # nosec - return partial(on_worker_init, app_server, app_settings.API_SERVER_CELERY)( - sender, **_kwargs - ) + def _on_worker_init_wrapper(sender: WorkController, **kwargs): + return on_worker_init(sender, app_server=app_server, **kwargs) worker_init.connect(_on_worker_init_wrapper) worker_shutdown.connect(on_worker_shutdown)