Skip to content

Commit 722fb25

Browse files
authored
Fix Celery worker crash on macOS due to unpicklable local function (#62655)
1 parent a7b8c7f commit 722fb25

File tree

2 files changed

+22
-11
lines changed

2 files changed

+22
-11
lines changed

providers/celery/src/airflow/providers/celery/cli/celery_command.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,16 @@ def _serve_logs(skip_serve_logs: bool = False):
122122
sub_proc.terminate()
123123

124124

125+
def _bundle_cleanup_main(check_interval):
126+
"""Entry point for the stale bundle cleanup subprocess."""
127+
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
128+
129+
mgr = BundleUsageTrackingManager()
130+
while True:
131+
time.sleep(check_interval)
132+
mgr.remove_stale_bundle_versions()
133+
134+
125135
@contextmanager
126136
def _run_stale_bundle_cleanup():
127137
"""Start stale bundle cleanup sub-process."""
@@ -136,19 +146,11 @@ def _run_stale_bundle_cleanup():
136146
with suppress(BaseException):
137147
yield
138148
return
139-
from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
140149

141150
log.info("starting stale bundle cleanup process")
142151
sub_proc = None
143-
144-
def bundle_cleanup_main():
145-
mgr = BundleUsageTrackingManager()
146-
while True:
147-
time.sleep(check_interval)
148-
mgr.remove_stale_bundle_versions()
149-
150152
try:
151-
sub_proc = Process(target=bundle_cleanup_main)
153+
sub_proc = Process(target=_bundle_cleanup_main, args=(check_interval,))
152154
sub_proc.start()
153155
yield
154156
finally:

providers/celery/tests/unit/celery/cli/test_celery_command.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from airflow.configuration import conf
3434
from airflow.executors import executor_loader
3535
from airflow.providers.celery.cli import celery_command
36-
from airflow.providers.celery.cli.celery_command import _run_stale_bundle_cleanup
36+
from airflow.providers.celery.cli.celery_command import _bundle_cleanup_main, _run_stale_bundle_cleanup
3737

3838
from tests_common.test_utils.config import conf_vars
3939
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
@@ -632,7 +632,16 @@ def test_stale_bundle_cleanup(mock_process):
632632
calls = mock_process.call_args_list
633633
assert len(calls) == 1
634634
actual = [x.kwargs["target"] for x in calls]
635-
assert actual[0].__name__ == "bundle_cleanup_main"
635+
assert actual[0] is _bundle_cleanup_main
636+
637+
638+
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Doesn't apply to pre-3.0")
639+
def test_bundle_cleanup_main_is_picklable():
640+
"""Regression test: _bundle_cleanup_main must be a module-level function so it can be
641+
pickled by multiprocessing on macOS (which uses 'spawn' start method)."""
642+
import pickle
643+
644+
pickle.dumps(_bundle_cleanup_main)
636645

637646

638647
class TestLoggerSetupHandler:

0 commit comments

Comments
 (0)