Commit 85b6df0
[SPARK-54566][SQL] Enhance SparkSql FileSourceScanExec for easy integration
1 parent 7251e95

1 file changed, 108 insertions(+), 113 deletions(-)

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

@@ -666,6 +666,110 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper {
       bucketToFilesGrouping.forall(p => p._2.length <= 1)
     }
   }
+
+  protected def filePartitions: Seq[FilePartition] = {
+    if (bucketedScan) {
+      getBucketedReadFilePartitions(relation.bucketSpec.get, dynamicallySelectedPartitions)
+    } else {
+      getReadFilePartitions(dynamicallySelectedPartitions)
+    }
+  }
+
+  /**
+   * Generate file partitions for bucketed reads.
+   * The non-bucketed variant of this function is [[getReadFilePartitions]].
+   *
+   * The algorithm is pretty simple: file partitions being returned should include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec the bucketing spec.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   */
+  private def getBucketedReadFilePartitions(
+      bucketSpec: BucketSpec,
+      selectedPartitions: ScanFileListing): Seq[FilePartition] = {
+    logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
+    val partitionArray = selectedPartitions.toPartitionArray
+    val filesGroupedToBuckets = partitionArray.groupBy { f =>
+      BucketingUtils
+        .getBucketId(f.toPath.getName)
+        .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
+    }
+
+    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
+      val bucketSet = optionalBucketSet.get
+      filesGroupedToBuckets.filter {
+        f => bucketSet.get(f._1)
+      }
+    } else {
+      filesGroupedToBuckets
+    }
+
+    optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
+      logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} buckets")
+      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
+      Seq.tabulate(numCoalescedBuckets) { bucketId =>
+        val partitionedFiles = coalescedBuckets.get(bucketId).map {
+          _.values.flatten.toArray
+        }.getOrElse(Array.empty)
+        FilePartition(bucketId, partitionedFiles)
+      }
+    }.getOrElse {
+      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
+      }
+    }
+  }
+
+  /**
+   * Generate file partitions for non-bucketed reads.
+   * The bucketed variant of this function is [[getBucketedReadFilePartitions]].
+   *
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   */
+  private def getReadFilePartitions(
+      selectedPartitions: ScanFileListing): Seq[FilePartition] = {
+    val openCostInBytes = getSqlConf(relation.sparkSession).filesOpenCostInBytes
+    val maxSplitBytes =
+      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
+    logInfo(log"Planning scan with bin packing, max size: ${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
+      log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, openCostInBytes)} " +
+      log"bytes.")
+
+    // Filter files with bucket pruning if possible
+    val bucketingEnabled = getSqlConf(relation.sparkSession).bucketingEnabled
+    val shouldProcess: Path => Boolean = optionalBucketSet match {
+      case Some(bucketSet) if bucketingEnabled =>
+        // Do not prune the file if bucket file name is invalid
+        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+      case _ =>
+        _ => true
+    }
+
+    val splitFiles = selectedPartitions.filePartitionIterator.flatMap { partition =>
+      val ListingPartition(partitionVals, _, fileStatusIterator) = partition
+      fileStatusIterator.flatMap { file =>
+        // getPath() is very expensive so we only want to call it once in this block:
+        val filePath = file.getPath
+        if (shouldProcess(filePath)) {
+          val isSplitable = relation.fileFormat.isSplitable(
+            relation.sparkSession, relation.options, filePath)
+          PartitionedFileUtil.splitFiles(
+            file = file,
+            filePath = filePath,
+            isSplitable = isSplitable,
+            maxSplitBytes = maxSplitBytes,
+            partitionValues = partitionVals
+          )
+        } else {
+          Seq.empty
+        }
+      }
+    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    FilePartition
+      .getFilePartitions(relation.sparkSession, splitFiles.toImmutableArraySeq, maxSplitBytes)
+  }
 }
 
 /**
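
The new getBucketedReadFilePartitions above groups every selected file by the bucket id parsed from its name, optionally drops buckets pruned via optionalBucketSet, and, when optionalNumCoalescedBuckets is set, folds bucket ids together by modulo. The following standalone Scala sketch models only that grouping/pruning/coalescing arithmetic; the file names, the kept-bucket set, and the simplified bucketId parser are hypothetical and are not part of the commit.

object BucketCoalescingSketch {
  // Simplified stand-in for BucketingUtils.getBucketId: parse "_00003." as bucket 3.
  private def bucketId(fileName: String): Option[Int] =
    "_(\\d{5})\\.".r.findFirstMatchIn(fileName).map(_.group(1).toInt)

  def main(args: Array[String]): Unit = {
    val files = Seq(
      "part-00000_00000.c000.parquet",
      "part-00001_00001.c000.parquet",
      "part-00002_00002.c000.parquet",
      "part-00003_00003.c000.parquet")

    // Group every selected file by its bucket id (an unparsable name is an error in Spark).
    val filesGroupedToBuckets: Map[Int, Seq[String]] =
      files.groupBy(f => bucketId(f).getOrElse(sys.error(s"invalid bucket file: $f")))

    // Optional bucket pruning: keep only buckets that survive filter pushdown.
    val optionalBucketSet: Option[Set[Int]] = Some(Set(0, 1, 3))
    val pruned = optionalBucketSet
      .map(keep => filesGroupedToBuckets.filter { case (id, _) => keep(id) })
      .getOrElse(filesGroupedToBuckets)

    // Coalesce 4 buckets down to 2: bucket id modulo numCoalescedBuckets picks the target,
    // and missing coalesced buckets fall back to an empty file list.
    val numCoalescedBuckets = 2
    val coalesced = pruned.groupBy { case (id, _) => id % numCoalescedBuckets }
    val partitions = Seq.tabulate(numCoalescedBuckets) { bucketId =>
      bucketId -> coalesced.get(bucketId).map(_.values.flatten.toSeq).getOrElse(Seq.empty)
    }
    partitions.foreach { case (id, fs) => println(s"coalesced bucket $id -> $fs") }
  }
}
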
@@ -727,11 +831,10 @@ case class FileSourceScanExec(
         options = options,
         hadoopConf = getHadoopConf(relation.sparkSession, relation.options))
 
-    val readRDD = if (bucketedScan) {
-      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions)
-    } else {
-      createReadRDD(readFile, dynamicallySelectedPartitions)
-    }
+    val readRDD = new FileScanRDD(relation.sparkSession, readFile, filePartitions,
+      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
+      fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
     sendDriverMetrics()
     readRDD
   }
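
With filePartitions factored out, inputRDD now builds the FileScanRDD directly from those partitions. For the non-bucketed path the partitions come from splitting files at maxSplitBytes and bin packing the splits (FilePartition.getFilePartitions), with each split also charged an open cost. The sketch below is a simplified, standalone model of that split-then-sort-then-pack shape only; the sizes, thresholds, and the Split case class are made up for illustration and do not reproduce Spark's exact packing rules.

object BinPackingSketch {
  final case class Split(file: String, start: Long, length: Long)

  def main(args: Array[String]): Unit = {
    val maxSplitBytes = 128L * 1024 * 1024   // hypothetical split target
    val openCostInBytes = 4L * 1024 * 1024   // hypothetical per-split open cost
    val files = Seq(
      "a.parquet" -> 300L * 1024 * 1024,
      "b.parquet" -> 40L * 1024 * 1024,
      "c.parquet" -> 90L * 1024 * 1024)

    // Split each file into chunks of at most maxSplitBytes (all files treated as splittable here).
    val splits = files.flatMap { case (name, size) =>
      (0L until size by maxSplitBytes).map(off => Split(name, off, math.min(maxSplitBytes, size - off)))
    }.sortBy(-_.length)  // largest splits first, as in the planner

    // Greedily pack splits into partitions, charging openCostInBytes per split on top of its length.
    val partitions = scala.collection.mutable.ArrayBuffer(scala.collection.mutable.ArrayBuffer.empty[Split])
    var currentSize = 0L
    splits.foreach { split =>
      if (partitions.last.nonEmpty && currentSize + split.length > maxSplitBytes) {
        partitions += scala.collection.mutable.ArrayBuffer.empty[Split]
        currentSize = 0L
      }
      partitions.last += split
      currentSize += split.length + openCostInBytes
    }

    partitions.zipWithIndex.foreach { case (p, i) =>
      println(s"partition $i: " + p.map(s => s"${s.file}@${s.start}+${s.length}").mkString(", "))
    }
  }
}
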
@@ -786,115 +889,7 @@ case class FileSourceScanExec(
 
   override val nodeNamePrefix: String = "File"
 
-  /**
-   * Create an RDD for bucketed reads.
-   * The non-bucketed variant of this function is [[createReadRDD]].
-   *
-   * The algorithm is pretty simple: each RDD partition being returned should include all the files
-   * with the same bucket id from all the given Hive partitions.
-   *
-   * @param bucketSpec the bucketing spec.
-   * @param readFile a function to read each (part of a) file.
-   * @param selectedPartitions Hive-style partition that are part of the read.
-   */
-  private def createBucketedReadRDD(
-      bucketSpec: BucketSpec,
-      readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: ScanFileListing): RDD[InternalRow] = {
-    logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
-    val partitionArray = selectedPartitions.toPartitionArray
-    val filesGroupedToBuckets = partitionArray.groupBy { f =>
-      BucketingUtils
-        .getBucketId(f.toPath.getName)
-        .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
-    }
-
-    val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
-      val bucketSet = optionalBucketSet.get
-      filesGroupedToBuckets.filter {
-        f => bucketSet.get(f._1)
-      }
-    } else {
-      filesGroupedToBuckets
-    }
-
-    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
-      logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} buckets")
-      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
-      Seq.tabulate(numCoalescedBuckets) { bucketId =>
-        val partitionedFiles = coalescedBuckets.get(bucketId).map {
-          _.values.flatten.toArray
-        }.getOrElse(Array.empty)
-        FilePartition(bucketId, partitionedFiles)
-      }
-    }.getOrElse {
-      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
-        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
-      }
-    }
 
-    new FileScanRDD(relation.sparkSession, readFile, filePartitions,
-      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
-      fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
-      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
-  }
-
-  /**
-   * Create an RDD for non-bucketed reads.
-   * The bucketed variant of this function is [[createBucketedReadRDD]].
-   *
-   * @param readFile a function to read each (part of a) file.
-   * @param selectedPartitions Hive-style partition that are part of the read.
-   */
-  private def createReadRDD(
-      readFile: PartitionedFile => Iterator[InternalRow],
-      selectedPartitions: ScanFileListing): RDD[InternalRow] = {
-    val openCostInBytes = getSqlConf(relation.sparkSession).filesOpenCostInBytes
-    val maxSplitBytes =
-      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
-    logInfo(log"Planning scan with bin packing, max size: ${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
-      log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, openCostInBytes)} " +
-      log"bytes.")
-
-    // Filter files with bucket pruning if possible
-    val bucketingEnabled = getSqlConf(relation.sparkSession).bucketingEnabled
-    val shouldProcess: Path => Boolean = optionalBucketSet match {
-      case Some(bucketSet) if bucketingEnabled =>
-        // Do not prune the file if bucket file name is invalid
-        filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
-      case _ =>
-        _ => true
-    }
-
-    val splitFiles = selectedPartitions.filePartitionIterator.flatMap { partition =>
-      val ListingPartition(partitionVals, _, fileStatusIterator) = partition
-      fileStatusIterator.flatMap { file =>
-        // getPath() is very expensive so we only want to call it once in this block:
-        val filePath = file.getPath
-        if (shouldProcess(filePath)) {
-          val isSplitable = relation.fileFormat.isSplitable(
-            relation.sparkSession, relation.options, filePath)
-          PartitionedFileUtil.splitFiles(
-            file = file,
-            filePath = filePath,
-            isSplitable = isSplitable,
-            maxSplitBytes = maxSplitBytes,
-            partitionValues = partitionVals
-          )
-        } else {
-          Seq.empty
-        }
-      }
-    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
-    val partitions = FilePartition
-      .getFilePartitions(relation.sparkSession, splitFiles.toImmutableArraySeq, maxSplitBytes)
-
-    new FileScanRDD(relation.sparkSession, readFile, partitions,
-      new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
-      fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
-      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
-  }
 
   // Filters unused DynamicPruningExpression expressions - one which has been replaced
   // with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning
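
The deleted createReadRDD (and its replacement getReadFilePartitions) prunes non-bucketed reads with a shouldProcess predicate: a file is skipped only when bucketing is enabled, its name parses to a bucket id, and that id is not in the surviving bucket set; files whose names do not parse are never pruned. Below is a standalone sketch of that predicate only, with a hypothetical file-name parser and kept-bucket set rather than Spark's BucketingUtils and bit set.

object BucketPruningSketch {
  // Simplified stand-in for BucketingUtils.getBucketId: parse "_00003." as bucket 3.
  private def bucketId(fileName: String): Option[Int] =
    "_(\\d{5})\\.".r.findFirstMatchIn(fileName).map(_.group(1).toInt)

  def shouldProcess(keptBuckets: Option[Set[Int]], bucketingEnabled: Boolean)(fileName: String): Boolean =
    keptBuckets match {
      case Some(keep) if bucketingEnabled =>
        // Do not prune the file if the bucket file name is invalid (bucketId returns None).
        bucketId(fileName).forall(keep)
      case _ => true
    }

  def main(args: Array[String]): Unit = {
    val keep: String => Boolean = shouldProcess(Some(Set(0, 2)), bucketingEnabled = true)
    println(keep("part-00000_00000.c000.parquet"))  // true: bucket 0 is kept
    println(keep("part-00001_00001.c000.parquet"))  // false: bucket 1 is pruned
    println(keep("not-a-bucketed-file.parquet"))    // true: unparsable names are never pruned
  }
}
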
