🐛 Ensure proper Redis client shutdown in Celery #8237
Conversation
Codecov Report
❌ Patch coverage is …
Additional details and impacted files:

@@           Coverage Diff            @@
##           master    #8237    +/-   ##
==========================================
- Coverage   88.03%   86.70%    -1.34%
==========================================
  Files        1919     1449      -470
  Lines       74341    59854    -14487
  Branches     1305      682      -623
==========================================
- Hits        65449    51894    -13555
+ Misses       8499     7733      -766
+ Partials      393      227      -166

Continue to review the full report in Codecov by Sentry.
Pull Request Overview
This PR fixes a critical issue with Celery's Redis client lifecycle management where Redis connections were not properly cleaned up, leading to resource leaks. The fix ensures proper initialization and shutdown of Redis clients throughout the application.
- Centralizes Redis client lifecycle management for Celery operations (see the sketch after this list)
- Refactors worker initialization to remove redundant celery_settings parameter
- Updates method names for consistency (lifespan → start_and_hold)
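A minimal sketch of the centralized lifecycle pattern referenced above, using the plain redis.asyncio client for illustration (the PR's own helper names and setup code may differ):

```python
# Illustrative only: own the Redis client in one place and always shut it down,
# which is the lifecycle pattern this PR centralizes for Celery operations.
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import redis.asyncio as aioredis


@asynccontextmanager
async def redis_client_lifespan(redis_dsn: str) -> AsyncIterator[aioredis.Redis]:
    client = aioredis.Redis.from_url(redis_dsn)
    try:
        yield client
    finally:
        # aclose() is the non-deprecated shutdown in redis-py >= 5;
        # older versions expose close() instead
        await client.aclose()
```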
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| services/storage/src/simcore_service_storage/modules/celery/__init__.py | Complete refactor to manage Redis client lifecycle with proper setup/shutdown |
| services/storage/src/simcore_service_storage/core/application.py | Updated to use new setup_celery function name and reorganized conditional logic |
| packages/service-library/src/servicelib/fastapi/celery/app_server.py | Added task_manager property and renamed lifespan method to start_and_hold |
| packages/service-library/src/servicelib/celery/app_server.py | Made task_manager abstract property and renamed lifespan method |
| packages/celery-library/src/celery_library/signals.py | Simplified worker initialization by removing redundant celery_settings parameter |
| packages/celery-library/src/celery_library/common.py | Removed create_task_manager function that was causing lifecycle issues |
| services/storage/tests/conftest.py | Updated test fixture to match simplified worker initialization |
| packages/celery-library/tests/conftest.py | Updated test fixtures with proper Redis client lifecycle management |
| packages/service-library/src/servicelib/celery/models.py | Fixed parameter name from task_context to task_filter |
Thanks
Just some minor things
Not sure I completely get where the issue was. Was it in the celery package tests?
Why are we not just using the real thing in the celery package tests instead of that complicated fake?
packages/service-library/src/servicelib/fastapi/celery/app_server.py
    task_manager: TaskManager = self.app.state.task_manager
    return task_manager

async def start_and_hold(self, startup_completed_event: threading.Event) -> None:
It is a lifespan, and the one problem I see here is that the return type is wrong. It should be `AsyncIterator[None]`, which would remove the confusion, I think.
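For reference, a lifespan-style signature along the lines suggested here would look roughly like this (sketch only; the setup and teardown bodies are placeholders):

```python
from collections.abc import AsyncIterator


async def lifespan() -> AsyncIterator[None]:
    # setup, e.g. create the Redis-backed task manager (placeholder)
    try:
        yield
    finally:
        # teardown, e.g. shut down the Redis client (placeholder)
        ...
```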
We don't yield anything here. This is the place where the initialized FastAPI instance stays parked, waiting for the shutdown event.
Since this is the primary entrypoint for this service, I would call it run_until_shutdown, which emphasizes the lifecycle clearly (and is reminiscent of the naming in the asyncio library).
Regarding @sanderegg's comment: in other parts of the code our approach is to provide a context-manager-like function that includes the setup and tear-down parts in one place (see https://github.com/ITISFoundation/osparc-simcore/blob/master/packages/service-library/src/servicelib/fastapi/postgres_lifespan.py#L31C11-L31C37).
The approach here is different, since this member function encapsulates the setup and tear-down parts AND runs them. That reduces flexibility, but I guess you do not need it here.
I also understand this function can only be called once. Therefore I would add a protection for that.
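A rough sketch of the suggested run_until_shutdown naming together with a guard against calling it twice (illustrative class and attribute names, not the code in this PR):

```python
import asyncio
import threading


class _AppServerSketch:
    """Illustrative only: a once-only entrypoint that sets up, parks, and tears down."""

    def __init__(self) -> None:
        self._started = False
        self._shutdown_event = asyncio.Event()

    async def run_until_shutdown(self, startup_completed_event: threading.Event) -> None:
        if self._started:
            msg = "run_until_shutdown() may only be called once"
            raise RuntimeError(msg)
        self._started = True

        # setup would go here (placeholder)
        startup_completed_event.set()
        try:
            # park until shutdown is requested
            await self._shutdown_event.wait()
        finally:
            # teardown would go here (placeholder)
            ...
```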
TIP: use log_context(INFO, ...) instead of _logger.info
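For example (assuming servicelib.logging_utils.log_context is a context manager taking a logger, a level, and a message; check the actual signature):

```python
import logging

from servicelib.logging_utils import log_context  # import path assumed

_logger = logging.getLogger(__name__)

with log_context(_logger, logging.INFO, "celery app server startup"):
    ...  # setup work gets entry/exit log lines instead of scattered _logger.info calls
```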
 @abstractmethod
-async def lifespan(
+async def start_and_hold(
check my other comment about renaming this
For interfaces, please add some documentation about what is expected, especially when the name does not reveal all the details.
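For example, the abstract method could carry a short docstring spelling out the contract (the wording below is only a suggestion):

```python
import threading
from abc import ABC, abstractmethod


class BaseAppServer(ABC):  # class name assumed for illustration
    @abstractmethod
    async def run_until_shutdown(self, startup_completed_event: threading.Event) -> None:
        """Set up the app, signal `startup_completed_event` once ready, then block
        until shutdown is requested and tear everything down.

        Expected to be called exactly once per instance.
        """
```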
-async def lifespan(self, startup_completed_event: threading.Event) -> None:
+@property
+def task_manager(self) -> TaskManager:
+    task_manager = self.app.state.task_manager
The order in which the app state is set up is very important, and here I do not see how this is guaranteed. Can you please show me offline how the workflow works?
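One way to make the ordering assumption explicit would be to fail fast when the state is missing (a sketch with an assumed import path, not the PR's code):

```python
from fastapi import FastAPI

from servicelib.celery.task_manager import TaskManager  # import path assumed


class FastAPIAppServerSketch:  # illustrative stand-in for the real app server class
    def __init__(self, app: FastAPI) -> None:
        self.app = app

    @property
    def task_manager(self) -> TaskManager:
        # raise a descriptive error instead of an AttributeError if the startup order was wrong
        task_manager = getattr(self.app.state, "task_manager", None)
        if task_manager is None:
            msg = "task_manager is not set up yet; check the startup order"
            raise RuntimeError(msg)
        return task_manager
```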
What do these changes do?
This PR fixes a critical issue with Celery's Redis client lifecycle management where Redis connections were not properly cleaned up, leading to resource leaks. The fix ensures proper initialization and shutdown of Redis clients throughout the application.
BONUS:
This PR also fixes an issue that caused hanging tests. They now complete in about 3 minutes, and, more importantly, they always complete.
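As an illustration of the test-side pattern (a hypothetical pytest-asyncio fixture, not the actual conftest changes), shutting the client down in the fixture teardown is what keeps connections from outliving the test and hanging the run:

```python
from collections.abc import AsyncIterator

import pytest
import redis.asyncio as aioredis


@pytest.fixture
async def redis_client(redis_service_dsn: str) -> AsyncIterator[aioredis.Redis]:
    # redis_service_dsn is a hypothetical fixture providing the test Redis URL
    client = aioredis.Redis.from_url(redis_service_dsn)
    yield client
    # teardown always runs, so no dangling connections keep the event loop alive
    await client.aclose()
```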
Related issue/s
How to test
Dev-ops