Skip to content

Commit c352b07

Browse files
committed
tests passing
1 parent 2b7bf6d commit c352b07

File tree

1 file changed

+72
-21
lines changed
  • datafusion/core/src/datasource/listing

1 file changed

+72
-21
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,33 +1152,62 @@ impl ListingTable {
11521152
})
11531153
}
11541154

1155-
/// Resolves the desired file ordering based on query requirements and natural ordering.
1155+
/// Resolves the desired file ordering for file arrangement purposes.
11561156
///
1157-
/// This method prioritizes query-requested ordering if it can be satisfied using statistics,
1158-
/// otherwise falls back to any natural file ordering defined in the table configuration.
1159-
fn resolve_desired_ordering(
1157+
/// This method returns the ordering to use for arranging files optimally,
1158+
/// prioritizing query-requested ordering if it can be satisfied using statistics,
1159+
/// otherwise using any configured file_sort_order.
1160+
///
1161+
/// Note: This is for file arrangement only - output ordering should be determined
1162+
/// separately based on whether file_sort_order is configured.
1163+
fn resolve_desired_file_arrangement_ordering(
11601164
&self,
11611165
requested_ordering: Option<&[SortExpr]>,
11621166
) -> Result<Option<LexOrdering>> {
1163-
// Check if query requested specific ordering that we can use
1167+
// Check if query requested specific ordering that we can use for file arrangement
11641168
if let Some(ordering) = requested_ordering {
11651169
if !ordering.is_empty() && self.can_use_ordering_from_statistics(ordering) {
11661170
return create_ordering(&self.table_schema, &[ordering.to_vec()])
11671171
.map(|orderings| orderings.first().cloned());
11681172
}
11691173
}
11701174

1171-
// Fall back to natural file ordering if any
1175+
// Fall back to file_sort_order for arrangement if configured
11721176
self.try_create_output_ordering()
11731177
.map(|orderings| orderings.first().cloned())
11741178
}
11751179

1180+
/// Gets the output orderings that can be guaranteed by the scan.
1181+
///
1182+
/// Only returns orderings if file_sort_order is explicitly configured,
1183+
/// as this represents a promise about the physical file layout.
1184+
/// Returns all configured equivalent orderings.
1185+
///
1186+
/// TODO: For formats like Parquet that store ordering metadata in the file,
1187+
/// we could read and use that information instead of requiring explicit configuration.
1188+
fn get_guaranteed_output_orderings(&self) -> Result<Option<Vec<LexOrdering>>> {
1189+
if self.options.file_sort_order.is_empty() {
1190+
// No explicit file sort order configured
1191+
Ok(None)
1192+
} else {
1193+
// Return all configured file sort orderings
1194+
self.try_create_output_ordering().map(|orderings| {
1195+
if orderings.is_empty() {
1196+
None
1197+
} else {
1198+
Some(orderings)
1199+
}
1200+
})
1201+
}
1202+
}
1203+
11761204
/// Determines the optimal file grouping and ordering strategy.
11771205
///
11781206
/// This method orchestrates the file grouping process by:
1179-
/// 1. Resolving the desired ordering (query-requested vs natural)
1180-
/// 2. Applying statistics-based splitting if enabled and available
1181-
/// 3. Returning both the file groups and any output ordering that can be guaranteed
1207+
/// 1. Resolving the desired ordering for file arrangement (query-requested vs file_sort_order)
1208+
/// 2. Applying statistics-based splitting if enabled and available
1209+
/// 3. Determining output ordering separately based on file_sort_order configuration
1210+
/// 4. Returning both the optimized file groups and any guaranteed output ordering
11821211
///
11831212
/// # Arguments
11841213
/// * `state` - The session state containing configuration options
@@ -1188,29 +1217,33 @@ impl ListingTable {
11881217
/// # Returns
11891218
/// A tuple of (file_groups, optional_output_ordering) where:
11901219
/// - file_groups: The optimized file group arrangement
1191-
/// - optional_output_ordering: Output ordering that can be guaranteed (if any)
1220+
/// - optional_output_ordering: Output ordering guaranteed by scan (only if file_sort_order configured)
11921221
fn determine_file_groups_and_ordering(
11931222
&self,
11941223
state: &dyn Session,
11951224
partitioned_file_lists: Vec<FileGroup>,
11961225
requested_ordering: Option<&[SortExpr]>,
11971226
) -> Result<(Vec<FileGroup>, Option<Vec<LexOrdering>>)> {
1198-
// 1. Determine desired ordering (query-requested vs natural)
1199-
let desired_ordering = self.resolve_desired_ordering(requested_ordering)?;
1227+
// 1. Determine desired ordering for file arrangement (query-requested vs configured)
1228+
let arrangement_ordering =
1229+
self.resolve_desired_file_arrangement_ordering(requested_ordering)?;
12001230

12011231
// 2. Check if statistics-based splitting is enabled
12021232
if !state
12031233
.config_options()
12041234
.execution
12051235
.split_file_groups_by_statistics
12061236
{
1207-
return Ok((partitioned_file_lists, desired_ordering.map(|o| vec![o])));
1237+
// No statistics-based splitting - return original groups with guaranteed ordering if any
1238+
let guaranteed_orderings = self.get_guaranteed_output_orderings()?;
1239+
return Ok((partitioned_file_lists, guaranteed_orderings));
12081240
}
12091241

1210-
// 3. Apply statistics-based splitting if we have an ordering requirement
1211-
let Some(ordering) = desired_ordering else {
1212-
// No ordering requirement, keep original groups
1213-
return Ok((partitioned_file_lists, None));
1242+
// 3. Apply statistics-based splitting if we have an arrangement ordering requirement
1243+
let Some(ordering) = arrangement_ordering else {
1244+
// No ordering requirement for arrangement, keep original groups
1245+
let guaranteed_orderings = self.get_guaranteed_output_orderings()?;
1246+
return Ok((partitioned_file_lists, guaranteed_orderings));
12141247
};
12151248

12161249
match FileScanConfig::split_groups_by_statistics_with_overlap_handling(
@@ -1220,15 +1253,31 @@ impl ListingTable {
12201253
self.options.target_partitions,
12211254
) {
12221255
Ok(FileGroupPartitioning::TotalOrder(groups)) => {
1223-
// Files don't overlap - can guarantee output ordering
1256+
// Files don't overlap and are arranged in total order
12241257
log::debug!(
12251258
"Files arranged in total order across {} partitions",
12261259
groups.len()
12271260
);
1228-
Ok((groups, Some(vec![ordering])))
1261+
1262+
// Only guarantee output ordering if file_sort_order is configured
1263+
// and the arrangement ordering matches one of the configured orderings
1264+
let guaranteed_orderings = self.get_guaranteed_output_orderings()?;
1265+
let output_orderings =
1266+
if let Some(configured_orderings) = &guaranteed_orderings {
1267+
// Check if the arrangement ordering matches any of the configured file_sort_orders
1268+
if configured_orderings.contains(&ordering) {
1269+
Some(configured_orderings.clone())
1270+
} else {
1271+
None // Arrangement was for query optimization, not scan output
1272+
}
1273+
} else {
1274+
None // No file_sort_order configured
1275+
};
1276+
1277+
Ok((groups, output_orderings))
12291278
}
12301279
Ok(FileGroupPartitioning::PartialOrder(groups)) => {
1231-
// Files overlap but are ordered within partitions
1280+
// Files overlap but are ordered within partitions - cannot guarantee total ordering
12321281
log::debug!(
12331282
"Files arranged in partial order across {} partitions",
12341283
groups.len()
@@ -1245,7 +1294,9 @@ impl ListingTable {
12451294
}
12461295
Err(e) => {
12471296
log::debug!("Failed to split file groups by statistics: {e}");
1248-
Ok((partitioned_file_lists, None))
1297+
// Fallback to original groups, but still check for guaranteed ordering
1298+
let guaranteed_orderings = self.get_guaranteed_output_orderings()?;
1299+
Ok((partitioned_file_lists, guaranteed_orderings))
12491300
}
12501301
}
12511302
}

0 commit comments

Comments
 (0)