@@ -666,6 +666,110 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper {
bucketToFilesGrouping.forall(p => p._2.length <= 1)
}
}

protected def filePartitions: Seq[FilePartition] = {
if (bucketedScan) {
getBucketedReadFilePartitions(relation.bucketSpec.get, dynamicallySelectedPartitions)
} else {
getReadFilePartitions(dynamicallySelectedPartitions)
}
}

/**
* Generate file partitions for bucketed reads.
* The non-bucketed variant of this function is [[getReadFilePartitions]].
*
* The algorithm is pretty simple: each file partition returned should include all the files
* with the same bucket id from all the given Hive partitions.
*
* @param bucketSpec the bucketing spec.
* @param selectedPartitions Hive-style partitions that are part of the read.
*/
private def getBucketedReadFilePartitions(
bucketSpec: BucketSpec,
selectedPartitions: ScanFileListing): Seq[FilePartition] = {
logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
val partitionArray = selectedPartitions.toPartitionArray
val filesGroupedToBuckets = partitionArray.groupBy { f =>
BucketingUtils
.getBucketId(f.toPath.getName)
.getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
}

val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
f => bucketSet.get(f._1)
}
} else {
filesGroupedToBuckets
}

optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} buckets")
val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
Seq.tabulate(numCoalescedBuckets) { bucketId =>
val partitionedFiles = coalescedBuckets.get(bucketId).map {
_.values.flatten.toArray
}.getOrElse(Array.empty)
FilePartition(bucketId, partitionedFiles)
}
}.getOrElse {
Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
}
}
}
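
The coalescing branch above groups the pruned bucket ids by `bucketId % numCoalescedBuckets` and flattens each group's files into a single `FilePartition`. A minimal, Spark-free sketch of that grouping (bucket counts, file names, and the object name are illustrative only, not part of the change):

```scala
// Minimal sketch, not part of the change: mirrors the `_._1 % numCoalescedBuckets`
// grouping above using plain collections.
object CoalesceBucketsSketch {
  def main(args: Array[String]): Unit = {
    val numBuckets = 8
    val numCoalescedBuckets = 3
    // Pretend each original bucket holds exactly one file.
    val filesPerBucket: Map[Int, Seq[String]] =
      (0 until numBuckets).map(b => b -> Seq(s"part-bucket${b}.parquet")).toMap

    // Group original bucket ids by their coalesced target and flatten each group's files,
    // as the coalescing branch does before building one FilePartition per target.
    val coalesced: Map[Int, Seq[String]] =
      filesPerBucket
        .groupBy { case (bucketId, _) => bucketId % numCoalescedBuckets }
        .map { case (target, group) => target -> group.values.flatten.toSeq }

    coalesced.toSeq.sortBy(_._1).foreach { case (target, files) =>
      println(s"coalesced bucket $target <- ${files.mkString(", ")}")
    }
  }
}
```

Each original bucket maps to exactly one coalesced bucket, so rows that were co-located by bucket id stay co-located after coalescing.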

/**
* Generate file partitions for non-bucketed reads.
* The bucketed variant of this function is [[getBucketedReadFilePartitions]].
*
* @param selectedPartitions Hive-style partitions that are part of the read.
*/
private def getReadFilePartitions(
selectedPartitions: ScanFileListing): Seq[FilePartition] = {
val openCostInBytes = getSqlConf(relation.sparkSession).filesOpenCostInBytes
val maxSplitBytes =
FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
logInfo(log"Planning scan with bin packing, max size: ${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, openCostInBytes)} " +
log"bytes.")

// Filter files with bucket pruning if possible
val bucketingEnabled = getSqlConf(relation.sparkSession).bucketingEnabled
val shouldProcess: Path => Boolean = optionalBucketSet match {
case Some(bucketSet) if bucketingEnabled =>
// Do not prune the file if bucket file name is invalid
filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
case _ =>
_ => true
}

val splitFiles = selectedPartitions.filePartitionIterator.flatMap { partition =>
val ListingPartition(partitionVals, _, fileStatusIterator) = partition
fileStatusIterator.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
if (shouldProcess(filePath)) {
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partitionVals
)
} else {
Seq.empty
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

FilePartition
.getFilePartitions(relation.sparkSession, splitFiles.toImmutableArraySeq, maxSplitBytes)
}
}
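
The non-bucketed path above splits each file when the format is splittable, sorts the resulting splits by size in descending order, and hands them to `FilePartition.getFilePartitions` for bin packing against `maxSplitBytes`, with `openCostInBytes` treated as extra bytes scanned per split. A simplified, Spark-free sketch of that packing idea (the real logic lives in `FilePartition.getFilePartitions`; sizes and names below are made up):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch, not the actual implementation: approximates the packing by starting
// a new partition whenever the next split would not fit under maxSplitBytes, charging
// each packed split an extra openCostInBytes.
object BinPackSketch {
  def main(args: Array[String]): Unit = {
    val maxSplitBytes = 128L * 1024 * 1024
    val openCostInBytes = 4L * 1024 * 1024
    // Split sizes in bytes, sorted descending as in the scan planning code above.
    val splitSizes = Seq(120L, 90L, 60L, 40L, 10L).map(_ * 1024 * 1024).sortBy(s => -s)

    val partitions = ArrayBuffer(ArrayBuffer.empty[Long])
    var currentBytes = 0L
    splitSizes.foreach { size =>
      if (currentBytes + size > maxSplitBytes && partitions.last.nonEmpty) {
        partitions += ArrayBuffer.empty[Long]
        currentBytes = 0L
      }
      partitions.last += size
      currentBytes += size + openCostInBytes
    }

    partitions.zipWithIndex.foreach { case (sizes, i) =>
      println(s"partition $i packs ${sizes.size} splits totalling ${sizes.sum / (1024 * 1024)} MB")
    }
  }
}
```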

/**
@@ -727,11 +831,10 @@ case class FileSourceScanExec(
options = options,
hadoopConf = getHadoopConf(relation.sparkSession, relation.options))

val readRDD = if (bucketedScan) {
createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions)
} else {
createReadRDD(readFile, dynamicallySelectedPartitions)
}
val readRDD = new FileScanRDD(relation.sparkSession, readFile, filePartitions,
new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
new FileSourceOptions(CaseInsensitiveMap(relation.options)))
sendDriverMetrics()
readRDD
}
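
With partition planning moved into `filePartitions` on the trait, this call site now just wires those partitions into a `FileScanRDD`. The read schema it passes is the required data columns followed by the partition columns. A small, self-contained illustration of that schema concatenation (column names and the object name are made up, not code from the change):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative only: shows how the read schema handed to FileScanRDD is formed by
// appending the partition columns after the required data columns.
object ReadSchemaSketch {
  def main(args: Array[String]): Unit = {
    val requiredSchema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))
    val partitionSchema = StructType(Seq(StructField("dt", StringType)))

    // Same construction as used when building the FileScanRDD above.
    val readSchema = new StructType(requiredSchema.fields ++ partitionSchema.fields)
    println(readSchema.treeString)
  }
}
```
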
@@ -786,115 +889,7 @@

override val nodeNamePrefix: String = "File"

/**
* Create an RDD for bucketed reads.
* The non-bucketed variant of this function is [[createReadRDD]].
*
* The algorithm is pretty simple: each RDD partition being returned should include all the files
* with the same bucket id from all the given Hive partitions.
*
* @param bucketSpec the bucketing spec.
* @param readFile a function to read each (part of a) file.
* @param selectedPartitions Hive-style partition that are part of the read.
*/
private def createBucketedReadRDD(
bucketSpec: BucketSpec,
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: ScanFileListing): RDD[InternalRow] = {
logInfo(log"Planning with ${MDC(COUNT, bucketSpec.numBuckets)} buckets")
val partitionArray = selectedPartitions.toPartitionArray
val filesGroupedToBuckets = partitionArray.groupBy { f =>
BucketingUtils
.getBucketId(f.toPath.getName)
.getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
}

val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
f => bucketSet.get(f._1)
}
} else {
filesGroupedToBuckets
}

val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
logInfo(log"Coalescing to ${MDC(COUNT, numCoalescedBuckets)} buckets")
val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
Seq.tabulate(numCoalescedBuckets) { bucketId =>
val partitionedFiles = coalescedBuckets.get(bucketId).map {
_.values.flatten.toArray
}.getOrElse(Array.empty)
FilePartition(bucketId, partitionedFiles)
}
}.getOrElse {
Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
}
}

new FileScanRDD(relation.sparkSession, readFile, filePartitions,
new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
new FileSourceOptions(CaseInsensitiveMap(relation.options)))
}

/**
* Create an RDD for non-bucketed reads.
* The bucketed variant of this function is [[createBucketedReadRDD]].
*
* @param readFile a function to read each (part of a) file.
* @param selectedPartitions Hive-style partition that are part of the read.
*/
private def createReadRDD(
readFile: PartitionedFile => Iterator[InternalRow],
selectedPartitions: ScanFileListing): RDD[InternalRow] = {
val openCostInBytes = getSqlConf(relation.sparkSession).filesOpenCostInBytes
val maxSplitBytes =
FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
logInfo(log"Planning scan with bin packing, max size: ${MDC(MAX_SPLIT_BYTES, maxSplitBytes)} " +
log"bytes, open cost is considered as scanning ${MDC(OPEN_COST_IN_BYTES, openCostInBytes)} " +
log"bytes.")

// Filter files with bucket pruning if possible
val bucketingEnabled = getSqlConf(relation.sparkSession).bucketingEnabled
val shouldProcess: Path => Boolean = optionalBucketSet match {
case Some(bucketSet) if bucketingEnabled =>
// Do not prune the file if bucket file name is invalid
filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
case _ =>
_ => true
}

val splitFiles = selectedPartitions.filePartitionIterator.flatMap { partition =>
val ListingPartition(partitionVals, _, fileStatusIterator) = partition
fileStatusIterator.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
if (shouldProcess(filePath)) {
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
file = file,
filePath = filePath,
isSplitable = isSplitable,
maxSplitBytes = maxSplitBytes,
partitionValues = partitionVals
)
} else {
Seq.empty
}
}
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions = FilePartition
.getFilePartitions(relation.sparkSession, splitFiles.toImmutableArraySeq, maxSplitBytes)

new FileScanRDD(relation.sparkSession, readFile, partitions,
new StructType(requiredSchema.fields ++ relation.partitionSchema.fields),
fileConstantMetadataColumns, relation.fileFormat.fileConstantMetadataExtractors,
new FileSourceOptions(CaseInsensitiveMap(relation.options)))
}

// Filters unused DynamicPruningExpression expressions - one which has been replaced
// with DynamicPruningExpression(Literal.TrueLiteral) during Physical Planning