@@ -934,9 +934,6 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
934
934
stream. next ( ) . await . unwrap ( ) . unwrap ( ) ;
935
935
936
936
// Now check what our filter looks like
937
- // `probe_keys=2` indicates the dynamic filter was derived from two probe-side
938
- // join key columns (here `a` and `b`). It verifies the optimizer generated a
939
- // probe-side predicate that constrains both join keys from the small build side.
940
937
insta:: assert_snapshot!(
941
938
format!( "{}" , format_plan_for_test( & plan) ) ,
942
939
@r"
@@ -1245,8 +1242,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_high_cardinality() {
1245
1242
. expect ( "optimizer should finish in time" )
1246
1243
. unwrap ( ) ;
1247
1244
let formatted = format_plan_for_test ( & plan) ;
1248
- // Programmatic check: ensure some child received a dynamic filter for Inner join
1249
- assert_dynamic_filter_location_plan ( & plan, & JoinType :: Inner ) ;
1245
+ assert_contains ! ( & formatted, "DynamicFilterPhysicalExpr" ) ;
1250
1246
1251
1247
assert_contains ! ( & formatted, "probe_keys=0" ) ;
1252
1248
@@ -1348,63 +1344,6 @@ fn assert_dynamic_filter_location(formatted: &str, join_type: &JoinType) {
1348
1344
}
1349
1345
}
1350
1346
1351
- /// Programmatic check: inspect the `HashJoinExec` children and verify which
1352
- /// side received a pushed predicate in the underlying `TestSource`.
1353
- fn assert_dynamic_filter_location_plan (
1354
- plan : & Arc < dyn ExecutionPlan > ,
1355
- join_type : & JoinType ,
1356
- ) {
1357
- // Find top-level HashJoinExec
1358
- let join = plan
1359
- . as_any ( )
1360
- . downcast_ref :: < HashJoinExec > ( )
1361
- . expect ( "expected HashJoinExec" ) ;
1362
-
1363
- // Helper to check whether a child has a pushed predicate
1364
- let child_has_predicate = |child : & Arc < dyn ExecutionPlan > | -> bool {
1365
- if let Some ( data_src) = child
1366
- . as_any ( )
1367
- . downcast_ref :: < datafusion:: datasource:: source:: DataSourceExec > (
1368
- ) {
1369
- if let Some ( ( _, test_source) ) =
1370
- data_src. downcast_to_file_source :: < util:: TestSource > ( )
1371
- {
1372
- return test_source. predicate ( ) . is_some ( ) ;
1373
- }
1374
- }
1375
- false
1376
- } ;
1377
-
1378
- match join_type {
1379
- JoinType :: Left | JoinType :: LeftSemi | JoinType :: LeftAnti | JoinType :: LeftMark => {
1380
- // For left-type joins the right side (probe) should receive the dynamic filter
1381
- assert ! (
1382
- child_has_predicate( & join. right) ,
1383
- "expected predicate on right child"
1384
- ) ;
1385
- assert ! (
1386
- !child_has_predicate( & join. left) ,
1387
- "expected no predicate on left child"
1388
- ) ;
1389
- }
1390
- JoinType :: Right
1391
- | JoinType :: RightSemi
1392
- | JoinType :: RightAnti
1393
- | JoinType :: RightMark => {
1394
- // For right-type joins the left side should receive the dynamic filter
1395
- assert ! (
1396
- child_has_predicate( & join. left) ,
1397
- "expected predicate on left child"
1398
- ) ;
1399
- assert ! (
1400
- !child_has_predicate( & join. right) ,
1401
- "expected no predicate on right child"
1402
- ) ;
1403
- }
1404
- _ => unreachable ! ( ) ,
1405
- }
1406
- }
1407
-
1408
1347
#[ rstest(
1409
1348
join_type,
1410
1349
case:: inner( JoinType :: Inner ) ,
@@ -1469,7 +1408,7 @@ async fn test_hashjoin_outer_dynamic_filter_pushdown(join_type: JoinType) {
1469
1408
. optimize ( plan, & config)
1470
1409
. unwrap ( ) ;
1471
1410
let formatted = format_plan_for_test ( & plan) ;
1472
- assert_dynamic_filter_location_plan ( & plan , & join_type) ;
1411
+ assert_dynamic_filter_location ( & formatted , & join_type) ;
1473
1412
assert_contains ! ( & formatted, "probe_keys=0" ) ;
1474
1413
}
1475
1414
@@ -1503,8 +1442,14 @@ async fn test_hashjoin_left_dynamic_filter_pushdown_collect_left() {
1503
1442
. optimize ( plan, & config)
1504
1443
. unwrap ( ) ;
1505
1444
let formatted = format_plan_for_test ( & plan) ;
1506
- // Programmatic check: verify the dynamic filter was pushed to the expected side
1507
- assert_dynamic_filter_location_plan ( & plan, & JoinType :: Left ) ;
1445
+ assert_contains ! (
1446
+ & formatted,
1447
+ "DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr"
1448
+ ) ;
1449
+ assert_contains ! (
1450
+ & formatted,
1451
+ "DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=<none>"
1452
+ ) ;
1508
1453
assert_contains ! ( & formatted, "probe_keys=0" ) ;
1509
1454
}
1510
1455
@@ -1524,7 +1469,7 @@ async fn test_hashjoin_semi_dynamic_filter_pushdown(join_type: JoinType) {
1524
1469
. optimize ( plan, & config)
1525
1470
. unwrap ( ) ;
1526
1471
let formatted = format_plan_for_test ( & plan) ;
1527
- assert_dynamic_filter_location_plan ( & plan , & join_type) ;
1472
+ assert_dynamic_filter_location ( & formatted , & join_type) ;
1528
1473
assert_contains ! ( & formatted, "probe_keys=0" ) ;
1529
1474
}
1530
1475
@@ -1544,7 +1489,7 @@ async fn test_hashjoin_anti_dynamic_filter_pushdown(join_type: JoinType) {
1544
1489
. optimize ( plan, & config)
1545
1490
. unwrap ( ) ;
1546
1491
let formatted = format_plan_for_test ( & plan) ;
1547
- assert_dynamic_filter_location_plan ( & plan , & join_type) ;
1492
+ assert_dynamic_filter_location ( & formatted , & join_type) ;
1548
1493
assert_contains ! ( & formatted, "probe_keys=0" ) ;
1549
1494
}
1550
1495
@@ -1564,7 +1509,7 @@ async fn test_hashjoin_mark_dynamic_filter_pushdown(join_type: JoinType) {
1564
1509
. optimize ( plan, & config)
1565
1510
. unwrap ( ) ;
1566
1511
let formatted = format_plan_for_test ( & plan) ;
1567
- assert_dynamic_filter_location_plan ( & plan , & join_type) ;
1512
+ assert_dynamic_filter_location ( & formatted , & join_type) ;
1568
1513
assert_contains ! ( & formatted, "probe_keys=0" ) ;
1569
1514
}
1570
1515
0 commit comments