Skip to content

Commit f3e41fb

Browse files
committed
fixup! feat(asset): add an Audit Log if the partition_key cannot be mapped by target Dag partition_mapper
1 parent 64199a1 commit f3e41fb

File tree

3 files changed

+119
-12
lines changed

3 files changed

+119
-12
lines changed

airflow-core/src/airflow/assets/manager.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -342,8 +342,7 @@ def _queue_dagruns(
342342
task_instance: TaskInstance | None,
343343
session: Session,
344344
) -> None:
345-
log.debug("dags to queue", dags_to_queue=dags_to_queue)
346-
345+
log.debug("Dags to queue", dags_to_queue=dags_to_queue)
347346
if not dags_to_queue:
348347
return None
349348

@@ -360,7 +359,6 @@ def _queue_dagruns(
360359
)
361360

362361
non_partitioned_dags = dags_to_queue.difference(partition_dags) # don't double process
363-
364362
if not non_partitioned_dags:
365363
return None
366364

@@ -422,20 +420,23 @@ def _queue_partitioned_dags(
422420
raise RuntimeError(f"Could not find asset for asset_id={asset_id}")
423421

424422
try:
423+
# We'll need to catch every possible exception that can happen when mapping partition_key.
425424
target_key = timetable.get_partition_mapper(
426425
name=asset_model.name, uri=asset_model.uri
427426
).to_downstream(partition_key)
428-
except Exception:
427+
except Exception as err:
428+
msg = (
429+
f"Could not map partition_key '{partition_key}' for asset "
430+
f"(name='{asset_model.name}', uri='{asset_model.uri}') in target Dag "
431+
f"'{target_dag.dag_id}'. This likely indicates that the partition "
432+
f"mapper in the target Dag is misconfigured or does not support this "
433+
f"partition key."
434+
)
435+
log.exception(msg)
429436
session.add(
430437
Log(
431438
event="failed to map partition_key",
432-
extra=(
433-
f"Could not map partition_key '{partition_key}' for asset "
434-
f"(name='{asset_model.name}', uri='{asset_model.uri}') in target Dag "
435-
f"'{target_dag.dag_id}'. This likely indicates that the partition "
436-
f"mapper in the target Dag is misconfigured or does not support this "
437-
f"partition key."
438-
),
439+
extra=(f"{msg}\n{type(err).__name__}: {err}"),
439440
task_instance=task_instance,
440441
)
441442
)

airflow-core/tests/unit/assets/test_manager.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@
4444

4545
from unit.listeners import asset_listener
4646

47+
if TYPE_CHECKING:
48+
from tests_common.pytest_plugin import DagMaker
49+
50+
4751
pytestmark = pytest.mark.db_test
4852

4953

@@ -91,6 +95,38 @@ def test_register_asset_change_asset_doesnt_exist(self, mock_task_instance):
9195
mock_session.add.assert_not_called()
9296
mock_session.merge.assert_not_called()
9397

98+
@pytest.mark.need_serialized_dag
def test_register_asset_change_target_partition_mapper_raise_exception(
    self,
    session: Session,
    dag_maker: DagMaker,
    mock_task_instance,  # fix: fixture was used in the body but missing from the signature (NameError)
):
    """Asset registration still queues the non-partitioned dags when a target
    Dag's partition mapper cannot map the partition_key.

    Two dags are scheduled on the same asset: ``dag1`` (non-partitioned) and
    ``dag2`` (``timetable_summary="Partitioned Asset"``). Registering an event
    with an unmappable key must still create the AssetEvent and queue both dags.
    """
    bundle_name = "testing"
    dag1 = DagModel(dag_id="dag1", is_stale=False, bundle_name=bundle_name)
    dag2 = DagModel(
        dag_id="dag2", is_stale=False, bundle_name=bundle_name, timetable_summary="Partitioned Asset"
    )
    session.add_all([dag1, dag2])

    asm = AssetModel(uri="test://asset1/", name="test_asset_uri", group="asset")
    session.add(asm)
    asm.scheduled_dags = [DagScheduleAssetReference(dag_id=dag.dag_id) for dag in (dag1, dag2)]
    # Start from a clean queue so the final count below is deterministic.
    session.execute(delete(AssetDagRunQueue))
    session.flush()

    asset_manager = AssetManager()
    asset_manager.register_asset_change(
        task_instance=mock_task_instance, asset=asm, session=session, partition_key="invalid"
    )
    session.flush()

    # Ensure we've created an asset event despite the unmappable partition_key.
    assert (
        session.scalar(select(func.count()).select_from(AssetEvent).where(AssetEvent.asset_id == asm.id))
        == 1
    )
    # Both scheduled dags are still queued.
    assert session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 2
129+
94130
@pytest.mark.usefixtures("dag_maker", "testing_dag_bundle")
95131
def test_register_asset_change(self, session, mock_task_instance):
96132
asset_manager = AssetManager()

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
from airflow.providers.standard.operators.bash import BashOperator
8585
from airflow.providers.standard.operators.empty import EmptyOperator
8686
from airflow.providers.standard.triggers.file import FileDeleteTrigger
87-
from airflow.sdk import DAG, Asset, AssetAlias, AssetWatcher, IdentityMapper, task
87+
from airflow.sdk import DAG, Asset, AssetAlias, AssetWatcher, HourlyMapper, IdentityMapper, task
8888
from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
8989
from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
9090
from airflow.serialization.definitions.dag import SerializedDAG
@@ -8784,6 +8784,76 @@ def _produce_and_register_asset_event(
87848784
return apdr
87858785

87868786

8787+
@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("clear_asset_partition_rows")
def test_partitioned_dag_run_with_invalid_mapping(
    dag_maker: DagMaker,
    session: Session,
):
    """An unmappable partition_key produces no partitioned dag run, only an audit Log row."""
    upstream_asset = Asset(name="asset-1")

    # Consumer dag: partitioned on the asset with an HourlyMapper, which only
    # accepts '%Y-%m-%dT%H:%M:%S'-formatted keys.
    with dag_maker(
        dag_id="asset-event-consumer",
        schedule=PartitionedAssetTimetable(
            assets=upstream_asset,
            default_partition_mapper=HourlyMapper(),
        ),
        session=session,
    ):
        EmptyOperator(task_id="hi")
    session.commit()

    scheduler_runner = SchedulerJobRunner(
        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
    )

    # Producer dag: emits an event for the asset with a key the mapper rejects.
    with dag_maker(dag_id="asset-event-producer", schedule=None, session=session) as dag:
        EmptyOperator(task_id="hi", outlets=[upstream_asset])

    partition_key = "an invalid key for HourlyMapper"
    dagrun = dag_maker.create_dagrun(partition_key=partition_key, session=session)
    [producer_ti] = dagrun.get_task_instances(session=session)
    session.commit()

    outlets = dag.get_task("hi").outlets
    TaskInstance.register_asset_changes_in_db(
        ti=producer_ti,
        task_outlets=[outlet.asprofile() for outlet in outlets],
        outlet_events=[],
        session=session,
    )
    session.commit()

    # The asset event itself is recorded, carrying the raw (unmappable) key.
    asset_event = session.scalar(
        select(AssetEvent).where(
            AssetEvent.source_dag_id == dag.dag_id,
            AssetEvent.source_run_id == dagrun.run_id,
        )
    )
    assert asset_event is not None
    assert asset_event.partition_key == partition_key

    # But no AssetPartitionDagRun was linked to it.
    partition_dag_run = session.scalar(
        select(AssetPartitionDagRun)
        .join(
            PartitionedAssetKeyLog,
            PartitionedAssetKeyLog.asset_partition_dag_run_id == AssetPartitionDagRun.id,
        )
        .where(PartitionedAssetKeyLog.asset_event_id == asset_event.id)
    )
    assert partition_dag_run is None

    # The scheduler creates no dag runs for the partitioned consumer.
    queued_partition_dags = scheduler_runner._create_dagruns_for_partitioned_asset_dags(session=session)
    assert len(queued_partition_dags) == 0
    assert queued_partition_dags == set()

    # Instead, the failure is surfaced as an audit Log entry with the mapper error appended.
    log_entry = session.scalar(select(Log))
    assert log_entry is not None
    assert log_entry.extra == (
        "Could not map partition_key 'an invalid key for HourlyMapper' "
        "for asset (name='asset-1', uri='asset-1') in target Dag 'asset-event-consumer'. "
        "This likely indicates that the partition mapper in the target Dag is misconfigured or "
        "does not support this partition key.\n"
        "ValueError: time data 'an invalid key for HourlyMapper' does not match format '%Y-%m-%dT%H:%M:%S'"
    )
8855+
8856+
87878857
@pytest.mark.need_serialized_dag
87888858
@pytest.mark.usefixtures("clear_asset_partition_rows")
87898859
def test_partitioned_dag_run_with_customized_mapper(

0 commit comments

Comments
 (0)