From 85b6df0316ff5d594ab90cae807a949bb006b14a Mon Sep 17 00:00:00 2001
From: Chengcheng Jin
Date: Tue, 2 Dec 2025 12:04:48 +0000
Subject: [PATCH] [SPARK-54566][SQL] Enhance Spark SQL FileSourceScanExec for
 easier integration

---
 .../sql/execution/DataSourceScanExec.scala    | 221 +++++++++---------
 1 file changed, 108 insertions(+), 113 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 2488b6aa5115..a37ba4d1ecb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -666,6 +666,110 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper {
       bucketToFilesGrouping.forall(p => p._2.length <= 1)
     }
   }
+
+  protected def filePartitions: Seq[FilePartition] = {
+    if (bucketedScan) {
+      getBucketedReadFilePartitions(relation.bucketSpec.get, dynamicallySelectedPartitions)
+    } else {
+      getReadFilePartitions(dynamicallySelectedPartitions)
+    }
+  }
+
+  /**
+   * Generate file partitions for bucketed reads.
+   * The non-bucketed variant of this function is [[getReadFilePartitions]].
+   *
+   * The algorithm is pretty simple: each file partition returned should include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec the bucketing spec.
+   * @param selectedPartitions Hive-style partitions that are part of the read.
+   */
+  private def getBucketedReadFilePartitions(
+      bucketSpec: BucketSpec,
+      selectedPartitions: ScanFileListing): Seq[FilePartition] = {
+    logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
+    val partitionArray = selectedPartitions.toPartitionArray
+    val filesGroupedToBuckets = partitionArray.groupBy { f =>
+      BucketingUtils
+        .getBucketId(f.toPath.getName)
+        .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
+    }
+
+    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
+      val bucketSet = optionalBucketSet.get
+      filesGroupedToBuckets.filter {
+        f => bucketSet.get(f._1)
+      }
+    } else {
+      filesGroupedToBuckets
+    }
+
+    optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
+      logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} buckets")
+      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
+      Seq.tabulate(numCoalescedBuckets) { bucketId =>
+        val partitionedFiles = coalescedBuckets.get(bucketId).map {
+          _.values.flatten.toArray
+        }.getOrElse(Array.empty)
+        FilePartition(bucketId, partitionedFiles)
+      }
+    }.getOrElse {
+      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+      }
+    }
+  }
+
+  /**
+   * Generate file partitions for non-bucketed reads.
+   * The bucketed variant of this function is [[getBucketedReadFilePartitions]].
+   *
+   * @param selectedPartitions Hive-style partitions that are part of the read.
+   */
+  private def getReadFilePartitions(
+      selectedPartitions: ScanFileListing): Seq[FilePartition] = {
+    val openCostInBytes = getSqlConf(relation.sparkSession).filesOpenCostInBytes
+    val maxSplitBytes =
+      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
+    logInfo(log"Planning scan with bin packing, max size: ${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
+      log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, openCostInBytes)} " +
+      log"bytes.")
+
+    // Filter files with bucket pruning if possible
+    val bucketingEnabled = getSqlConf(relation.sparkSession).bucketingEnabled
+    val shouldProcess: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) if bucketingEnabled =>
+        // Do not prune the file if bucket file name is invalid
+        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+      case _ =>
+        _ => true
+    }
+
+    val splitFiles = selectedPartitions.filePartitionIterator.flatMap { partition =>
+      val ListingPartition(partitionVals, _, fileStatusIterator) = partition
+      fileStatusIterator.flatMap { file =>
+        // getPath() is very expensive so we only want to call it once in this block:
+        val filePath = file.getPath
+        if (shouldProcess(filePath)) {
+          val isSplitable = relation.fileFormat.isSplitable(
+            relation.sparkSession, relation.options, filePath)
+          PartitionedFileUtil.splitFiles(
+            file = file,
+            filePath = filePath,
+            isSplitable = isSplitable,
+            maxSplitBytes = maxSplitBytes,
+            partitionValues = partitionVals
+          )
+        } else {
+          Seq.empty
+        }
+      }
+    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    FilePartition
+      .getFilePartitions(relation.sparkSession, splitFiles.toImmutableArraySeq, maxSplitBytes)
+  }
 }
 
 /**
@@ -727,11 +831,10 @@ case class FileSourceScanExec(
         options = options,
         hadoopConf = getHadoopConf(relation.sparkSession, relation.options))
 
-    val readRDD = if (bucketedScan) {
-      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions)
-    } else {
-      createReadRDD(readFile, dynamicallySelectedPartitions)
-    }
+    val readRDD = new FileScanRDD(relation.sparkSession, readFile, filePartitions,
+      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
+      fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
     sendDriverMetrics()
     readRDD
   }
@@ -786,115 +889,7 @@ case class FileSourceScanExec(
 
   override val nodeNamePrefix: String = "File"
 
-  /**
-   * Create an RDD for bucketed reads.
-   * The non-bucketed variant of this function is [[createReadRDD]].
-   *
-   * The algorithm is pretty simple: each RDD partition being returned should include all the files
-   * with the same bucket id from all the given Hive partitions.
-   *
-   * @param bucketSpec the bucketing spec.
-   * @param readFile a function to read each (part of a) file.
-   * @param selectedPartitions Hive-style partition that are part of the read.
-   */
-  private def createBucketedReadRDD(
-      bucketSpec: BucketSpec,
-      readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: ScanFileListing): RDD[InternalRow] = {
-    logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
-    val partitionArray = selectedPartitions.toPartitionArray
-    val filesGroupedToBuckets = partitionArray.groupBy { f =>
-      BucketingUtils
-        .getBucketId(f.toPath.getName)
-        .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
-    }
-
-    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
-      val bucketSet = optionalBucketSet.get
-      filesGroupedToBuckets.filter {
-        f => bucketSet.get(f._1)
-      }
-    } else {
-      filesGroupedToBuckets
-    }
-
-    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
-      logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} buckets")
-      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
-      Seq.tabulate(numCoalescedBuckets) { bucketId =>
-        val partitionedFiles = coalescedBuckets.get(bucketId).map {
-          _.values.flatten.toArray
-        }.getOrElse(Array.empty)
-        FilePartition(bucketId, partitionedFiles)
-      }
-    }.getOrElse {
-      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
-        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
-      }
-    }
-    new FileScanRDD(relation.sparkSession, readFile, filePartitions,
-      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
-      fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
-      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
-  }
-
-  /**
-   * Create an RDD for non-bucketed reads.
-   * The bucketed variant of this function is [[createBucketedReadRDD]].
-   *
-   * @param readFile a function to read each (part of a) file.
-   * @param selectedPartitions Hive-style partition that are part of the read.
-   */
-  private def createReadRDD(
-      readFile: PartitionedFile => Iterator[InternalRow],
-      selectedPartitions: ScanFileListing): RDD[InternalRow] = {
-    val openCostInBytes = getSqlConf(relation.sparkSession).filesOpenCostInBytes
-    val maxSplitBytes =
-      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
-    logInfo(log"Planning scan with bin packing, max size: ${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
-      log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, openCostInBytes)} " +
-      log"bytes.")
-
-    // Filter files with bucket pruning if possible
-    val bucketingEnabled = getSqlConf(relation.sparkSession).bucketingEnabled
-    val shouldProcess: Path => Boolean = optionalBucketSet match {
-      case Some(bucketSet) if bucketingEnabled =>
-        // Do not prune the file if bucket file name is invalid
-        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
-      case _ =>
-        _ => true
-    }
-
-    val splitFiles = selectedPartitions.filePartitionIterator.flatMap { partition =>
-      val ListingPartition(partitionVals, _, fileStatusIterator) = partition
-      fileStatusIterator.flatMap { file =>
-        // getPath() is very expensive so we only want to call it once in this block:
-        val filePath = file.getPath
-        if (shouldProcess(filePath)) {
-          val isSplitable = relation.fileFormat.isSplitable(
-            relation.sparkSession, relation.options, filePath)
-          PartitionedFileUtil.splitFiles(
-            file = file,
-            filePath = filePath,
-            isSplitable = isSplitable,
-            maxSplitBytes = maxSplitBytes,
-            partitionValues = partitionVals
-          )
-        } else {
-          Seq.empty
-        }
-      }
-    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
-    val partitions = FilePartition
-      .getFilePartitions(relation.sparkSession, splitFiles.toImmutableArraySeq, maxSplitBytes)
-
-    new FileScanRDD(relation.sparkSession, readFile, partitions,
-      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
-      fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
-      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
-  }
 
   // Filters unused DynamicPruningExpression expressions - one which has been replaced
   // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
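
Note (illustrative, not part of the patch): the new getReadFilePartitions delegates its
final step to FilePartition.getFilePartitions, which bin-packs the size-sorted splits into
FilePartitions. The self-contained Scala sketch below models that step under the behavior
described in the patch's log message: splits sorted by length descending, a fixed
openCostInBytes charged per opened file, and a partition closed once maxSplitBytes would be
exceeded. BinPackingSketch, SplitFile, and pack are hypothetical stand-ins for illustration,
not Spark APIs.

object BinPackingSketch {
  // Stand-in for Spark's PartitionedFile; only the byte length matters here.
  final case class SplitFile(path: String, length: Long)

  // Pack size-sorted splits into partitions, padding each file with a fixed
  // open cost so that many tiny files do not collapse into one huge partition.
  def pack(
      splits: Seq[SplitFile],
      maxSplitBytes: Long,
      openCostInBytes: Long): Seq[Seq[SplitFile]] = {
    val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[SplitFile]]
    var current = Vector.empty[SplitFile]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current
      current = Vector.empty
      currentSize = 0L
    }

    // Largest splits first: smaller files then fill the gaps left by big ones.
    splits.sortBy(_.length)(Ordering[Long].reverse).foreach { split =>
      if (currentSize + split.length > maxSplitBytes) closePartition()
      current :+= split
      currentSize += split.length + openCostInBytes
    }
    closePartition()
    partitions.toSeq
  }

  def main(args: Array[String]): Unit = {
    val splits = Seq(
      SplitFile("f1", 90L), SplitFile("f2", 60L),
      SplitFile("f3", 30L), SplitFile("f4", 10L))
    // With maxSplitBytes = 128 and openCostInBytes = 4 this prints two
    // partitions: Vector(SplitFile(f1,90)) and Vector(f2, f3, f4).
    pack(splits, maxSplitBytes = 128L, openCostInBytes = 4L).foreach(println)
  }
}

Because filePartitions is now a protected member of FileSourceScanLike rather than logic
embedded in FileSourceScanExec.createReadRDD and createBucketedReadRDD, an external
integration can reuse this planning (both the bin packing above and the bucketed grouping)
instead of re-deriving it.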