What
When Spark GC works on committed data, it calls lakeFS prepare GC commits.¹ That gives a CSV file, which the driver reads. It then produces the set of all ranges which appear in any of the commits in that CSV file.
Unfortunately, this appears to happen in driver memory, using Java arrays rather than Scala streaming, with no Spark in sight. That (probably) causes timeouts on a large customer repository, and it also wastes driver memory.
Instead, parallelize this part in Spark as well!
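For illustration only, here is a minimal sketch of how the prepared-commits CSV could be read through Spark so that the commit IDs start out distributed rather than sitting in driver memory; the path, column name, and session setup are assumptions, not the actual layout lakeFS produces:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: load the prepared-commits CSV as a distributed Dataset
// instead of reading it entirely on the driver.
val spark = SparkSession.builder().appName("lakefs-gc-ranges").getOrCreate()
import spark.implicits._

// The path and the "commit_id" column name are placeholders, not the real layout.
val commitIDsDS = spark.read
  .option("header", "true")
  .csv("s3a://storage-namespace/_lakefs/prepared-gc-commits.csv")
  .select($"commit_id")
  .as[String]
```

From there, the per-commit range expansion shown under Details could run as a distributed transformation instead of a driver-side loop.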
Details
val ranges = commitIDs
  .flatMap(commitID => {
    val metaRangeURL = apiClient.getMetaRangeURL(repoName, commitID)
    if (metaRangeURL == "") {
      // a commit with no meta range is an empty commit.
      // this only happens for the first commit in the repository.
      None
    } else {
      val rangesReader = metarangeReaderGetter(job.getConfiguration, metaRangeURL, true)
      read(rangesReader).map(rd => new Range(new String(rd.id), rd.message.estimatedSize))
    }
  })
  .toSet

This code is strange: commitIDs is a Java array, which we get from the Hadoop configuration with the getStrings method. So the result of flatMap is also a Java array, computed in the driver, and flattened to a set only after it has all been generated.
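One possible shape for the fix, keeping the per-commit logic above but letting Spark fan it out over executors. This is only a sketch: it reuses apiClient, repoName, metarangeReaderGetter, read, and Range from the snippet, assumes they can be used (or recreated) on executors and that Range is serializable with sensible equality, and uses hadoopConfValue as a stand-in for a Hadoop configuration shipped to executors (e.g. a broadcast serializable wrapper), since job.getConfiguration itself is not serializable:

```scala
// Sketch: distribute the per-commit work instead of flat-mapping a Java array on the driver.
// commitIDs is still the Array[String] taken from the Hadoop configuration with getStrings.
val ranges = spark.sparkContext
  .parallelize(commitIDs.toSeq)
  .flatMap { commitID =>
    // apiClient and metarangeReaderGetter are assumed to be usable on executors here.
    val metaRangeURL = apiClient.getMetaRangeURL(repoName, commitID)
    if (metaRangeURL == "") {
      // a commit with no meta range is an empty commit
      // (this only happens for the first commit in the repository)
      Seq.empty[Range]
    } else {
      // hadoopConfValue: placeholder for a configuration made available on executors
      val rangesReader = metarangeReaderGetter(hadoopConfValue, metaRangeURL, true)
      read(rangesReader)
        .map(rd => new Range(new String(rd.id), rd.message.estimatedSize))
        .toSeq
    }
  }
  .distinct() // replaces the driver-side .toSet with a distributed deduplication
```

With this shape the driver never materializes the full list of ranges; it only sees whatever is collected or written out at the end.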
Footnotes
1. That itself can be slow, see Synchronous prepare GC commits API causes timeouts for large repositories #9648, but that is a different issue!