fix: add null checks for dag_version access in scheduler #62225
SakshamSinghal20 wants to merge 5 commits into apache:main
SameerMesiah97
left a comment
I think we may be introducing implicit semantics here by skipping TaskCallbackRequest and EmailCallbackRequest when ti.dag_version is not present. This change is not only fixing the crash, but also effectively establishing that these two behaviors will not trigger for legacy DAGs.
Is that the intended long-term behavior? If possible, would it make sense to pass some kind of “legacy” dag_version instead, rather than silently changing existing callback/email behavior for migrated tasks?
If we stick with the current implementation, I think we should include unit tests that verify the executor does not send callback requests when ti.dag_version is None, and does send them when it is present. If this can be covered in a single parameterized test, that would be ideal.
Replace silent skip-on-None guards with a proper fallback strategy
for all four TaskCallbackRequest / EmailRequest creation sites in
scheduler_job_runner.py.
Problem:
When ti.dag_version is None (tasks migrated from Airflow 2 to 3),
the scheduler crashed with AttributeError when accessing
ti.dag_version.bundle_name / bundle_version, and the previous fix
silently suppressed callbacks entirely for legacy tasks.
Fix:
Use an inline ternary fallback, mirroring the existing pattern in
dagrun.py (DagCallbackRequest):
bundle_name <- dag_version.bundle_name OR dag_model.bundle_name
bundle_version <- dag_version.bundle_version OR dag_run.bundle_version
DagModel.bundle_name is NOT NULL so the fallback is always safe.
DagRun.bundle_version is nullable (str | None), matching the type
expected by BaseCallbackRequest.
Affected sites:
1. process_executor_events - TaskCallbackRequest (externally killed)
2. process_executor_events - EmailRequest (email on failure/retry)
3. _maybe_requeue_stuck_ti - TaskCallbackRequest (stuck-in-queued)
4. _purge_task_instances_without_heartbeats - TaskCallbackRequest
Tests:
Added TestSchedulerCallbackBundleInfoDagVersionNullable to
test_scheduler_job.py with parameterized cases covering:
- dag_version present -> bundle info from dag_version
- dag_version None -> bundle info from dag_model / dag_run
- no AttributeError crash in either case
- correct precedence of dag_version over fallback values
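The fallback described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the dataclasses are hypothetical stand-ins for the ORM models and carry only the fields the fallback reads, and `resolve_bundle_info` is an invented helper name (the commit describes an inline ternary at each call site).

```python
from __future__ import annotations

from dataclasses import dataclass


# Hypothetical stand-ins for the ORM models; only the fields the fallback reads.
@dataclass
class DagVersion:
    bundle_name: str
    bundle_version: str | None


@dataclass
class DagModel:
    bundle_name: str  # described as NOT NULL in the commit message


@dataclass
class DagRun:
    bundle_version: str | None  # nullable, like the callback request field


@dataclass
class TaskInstance:
    dag_version: DagVersion | None  # None for tasks migrated from Airflow 2
    dag_model: DagModel
    dag_run: DagRun


def resolve_bundle_info(ti: TaskInstance) -> tuple[str, str | None]:
    """Prefer dag_version; fall back to dag_model / dag_run when it is None."""
    bundle_name = ti.dag_version.bundle_name if ti.dag_version else ti.dag_model.bundle_name
    bundle_version = ti.dag_version.bundle_version if ti.dag_version else ti.dag_run.bundle_version
    return bundle_name, bundle_version


# Legacy task: no dag_version, so values come from dag_model / dag_run.
legacy_ti = TaskInstance(None, DagModel("dags-folder"), DagRun(None))
# Current task: dag_version takes precedence over the fallback values.
current_ti = TaskInstance(DagVersion("git", "abc123"), DagModel("dags-folder"), DagRun("old"))
```

Testing `ti.dag_version` once per field keeps both values sourced from the same object, so a legacy task never mixes a bundle name from one source with a version from another.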
Great catch, @SameerMesiah97! You were spot on about the implicit semantics: dropping those alerts for legacy DAGs would have been a bad side effect. I've just pushed a new commit that fixes this. Instead of skipping the requests, the logic now safely falls back so that TaskCallbackRequest and EmailCallbackRequest are still triggered even when ti.dag_version is None. Let me know if the updated approach looks good to you!
…ationError
The Pydantic TaskInstance datamodel requires dag_version_id to be a
strict uuid.UUID (not None). After removing the old 'skip when
dag_version is None' guard, legacy tasks migrated from Airflow 2
triggered a ValidationError when constructing TaskCallbackRequest.
Fix:
Add _ensure_ti_has_dag_version_id() helper that:
1. Returns True immediately if dag_version_id is already set
2. Backfills from DagVersion.get_latest_version() when missing
3. Returns False (skip with warning) only when no DagVersion
exists at all for the dag_id
This helper is called at all 4 callback creation sites, right before
constructing TaskCallbackRequest / EmailRequest. It ensures:
- Callbacks are sent whenever possible (backfill succeeds)
- Clean skip with warning only in edge case (no DagVersion at all)
- Pydantic validation never sees None for dag_version_id
Updated test_purge_without_heartbeat_skips_when_missing_dag_version
to verify the new backfill behavior: since dag_maker creates a
DagVersion, the backfill succeeds and the callback IS sent.
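The three-step behaviour of the helper described above could look something like this. It is a sketch under stated assumptions: `TaskInstance` is a hypothetical stand-in, and `get_latest_version_id` is an invented callable that stands in for the lookup the commit attributes to DagVersion.get_latest_version(); the real helper's signature is not shown in this conversation.

```python
from __future__ import annotations

import logging
import uuid
from dataclasses import dataclass
from typing import Callable, Optional

log = logging.getLogger(__name__)


@dataclass
class TaskInstance:  # hypothetical stand-in for the ORM model
    dag_id: str
    dag_version_id: Optional[uuid.UUID] = None


def ensure_ti_has_dag_version_id(
    ti: TaskInstance,
    get_latest_version_id: Callable[[str], Optional[uuid.UUID]],  # assumed lookup
) -> bool:
    """Return True if ti ends up with a dag_version_id, backfilling if needed."""
    # 1. Already set: nothing to do.
    if ti.dag_version_id is not None:
        return True
    # 2. Missing: try to backfill from the latest known version for this dag_id.
    latest = get_latest_version_id(ti.dag_id)
    if latest is None:
        # 3. No version exists at all: tell the caller to skip the callback.
        log.warning("No DagVersion found for dag_id=%s; skipping callback", ti.dag_id)
        return False
    ti.dag_version_id = latest
    return True


# Usage: a dict plays the role of the version table in this sketch.
known_versions = {"migrated_dag": uuid.uuid4()}
backfilled_ti = TaskInstance("migrated_dag")
orphan_ti = TaskInstance("unknown_dag")
backfilled_ok = ensure_ti_has_dag_version_id(backfilled_ti, known_versions.get)
orphan_ok = ensure_ti_has_dag_version_id(orphan_ti, known_versions.get)
```

Returning a boolean lets each call site decide how to skip, which matters where the surrounding code is not a loop.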
…ck_ti
The 'continue' statement was inside the _maybe_requeue_stuck_ti() method, which is not a loop, causing a SyntaxError at import time. This broke all CI jobs that import scheduler_job_runner.
Restructured the logic to use an if-guard: when dag_version_id cannot be backfilled, we skip the callback but still let the finally block mark the task as FAILED.
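To illustrate why this failed at import time rather than at run time, the sketch below compiles a function that uses `continue` outside a loop and then shows an if-guard shape with a `finally` block. All names here (`handle`, `requeue_or_fail`, the dict-based task) are hypothetical and only mirror the structure the commit describes.

```python
# 'continue' outside a loop is rejected when the module is compiled,
# so merely importing such a module fails before any code runs.
bad_source = """
def handle(ti):
    if ti is None:
        continue
"""
try:
    compile(bad_source, "<sketch>", "exec")
    compiled_ok = True
except SyntaxError:
    compiled_ok = False


def requeue_or_fail(ti: dict, can_send_callback: bool, sent: list) -> None:
    """If-guard version: skip the callback, but always mark the task failed."""
    try:
        if can_send_callback:
            sent.append(ti["id"])  # stands in for sending a callback request
    finally:
        ti["state"] = "failed"  # runs whether or not the callback was sent


sent_callbacks: list = []
ti_with_version = {"id": "a", "state": "queued"}
ti_without_version = {"id": "b", "state": "queued"}
requeue_or_fail(ti_with_version, True, sent_callbacks)
requeue_or_fail(ti_without_version, False, sent_callbacks)
```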
@SameerMesiah97 could you please review it again whenever you get time?
May I know the status of my PR? Do I need to change anything?
uranusjr
left a comment
The idea looks good to me. Need to fix static checks.
Fix Scheduler Crash on Nullable dag_version Access
Fixes #62198
The TaskInstance.dag_version field is nullable in the data model (specifically for tasks migrated from Airflow 2 to Airflow 3). However, several locations in the SchedulerJobRunner accessed ti.dag_version.bundle_name and ti.dag_version.bundle_version without null checks, leading to AttributeError: 'NoneType' object has no attribute 'bundle_name' and crashing the scheduler.
This PR adds null checks to three critical locations in scheduler_job_runner.py.
Approach:
When dag_version is None, the scheduler now logs a warning and skips the callback/email request, rather than crashing. This mirrors the existing defensive pattern already implemented in the heartbeat timeout handler.
Was generative AI tooling used to co-author this PR?