Create LakeFS commits RDD directly without using an input format #9657
Conversation
Garbage collection (and every other caller) uses LakeFSContext.newRDD to create
the "ranges RDD". Creating it explicitly with Spark operators lets Spark
parallelize reading all metaranges and ranges.
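For orientation, here is a rough sketch (not the PR's exact code) of what "creating the RDD explicitly with Spark operators" looks like; `rangePaths` and `readRange` are hypothetical stand-ins for the real helpers in LakeFSContext:

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical sketch: build the ranges RDD directly instead of going through a
// Hadoop InputFormat.  `rangePaths` (ranges discovered from the metaranges) and
// `readRange` (opens one range file and yields its entries) are stand-ins.
def rangesRDD[T: ClassTag](sc: SparkContext, rangePaths: Seq[String], parallelism: Int)(
    readRange: String => Iterator[T]
): RDD[T] =
  sc.parallelize(rangePaths, parallelism)             // spread range IDs across many tasks
    .mapPartitions(paths => paths.flatMap(readRange)) // each task reads its own ranges
```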
## How much faster?
I have a small repo with many small commits. I enabled GC for it. Here are
summaries from two sample mark-only runs.
### Direct RDD (this code)
Runtime: 2m33s
```json
{
"run_id": "g4uk6erfnfus73frbnqg",
"success": true,
"first_slice": "g5adr8f5pvec73cpia80",
"start_time": "2025-11-10T10:34:37.245361091Z",
"cutoff_time": "2025-11-10T04:34:37.243Z",
"num_deleted_objects": 147942
}
```
### File format RDD (previous code)
Runtime: 3m52s
```json
{
"run_id": "g4uinaarakss73aoeel0",
"success": true,
"first_slice": "g5adr8f5pvec73cpia80",
"start_time": "2025-11-10T12:15:11.097697745Z",
"cutoff_time": "2025-11-10T06:15:11.096Z",
"num_deleted_objects": 147942
}
```
### Summary
- The same number of objects were marked for deletion.
- The _same_ objects were marked for deletion on both.
- The new code takes about 0.65× the time of the old code (2m33s vs. 3m52s).
Pull Request Overview
This pull request refactors the LakeFSContext.newRDD method to directly process ranges using Spark RDD operations instead of using Hadoop's InputFormat API. The changes aim to simplify the data loading pipeline by bypassing the InputFormat layer.
Key changes:
- Replaces `sc.newAPIHadoopRDD` with direct RDD operations using `mapPartitions`
- Makes the `Range` class public and serializable for use across Spark operations
- Introduces direct file handling and SSTableReader creation in RDD transformations
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| clients/spark/src/main/scala/io/treeverse/clients/LakeFSInputFormat.scala | Makes Range class public and serializable to support cross-partition serialization in Spark RDDs |
| clients/spark/src/main/scala/io/treeverse/clients/LakeFSContext.scala | Refactors newRDD to use direct RDD operations instead of InputFormat, processing ranges and entries through mapPartitions transformations |
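As a side note on why the `Range` change is needed: anything Spark ships to executors inside a closure, or shuffles between partitions, must be serializable, otherwise the job fails at runtime with a "Task not serializable" error. A minimal illustration (the field names are made up, not the actual `Range` definition):

```scala
// Illustration only; the real Range is defined in LakeFSInputFormat.scala.
// A plain class used inside RDD transformations must extend Serializable,
// or Spark throws NotSerializableException when it ships tasks to executors.
class Range(val id: String, val estimatedSize: Long) extends Serializable
```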
```scala
val localFile = File.createTempFile("lakefs.", ".range")
fs.copyToLocalFile(false, path, new Path(localFile.getAbsolutePath), true)
val companion = Entry.messageCompanion
val sstableReader = new SSTableReader(localFile.getAbsolutePath, companion, true)
// TODO(ariels): Do we need to validate that this reader is good? Assume _not_, this is
// not InputFormat code so it should have slightly nicer error reports.
sstableReader.newIterator().map((entry) => (entry.key, new WithIdentifier(entry.id, entry.message, range.id)))
```
Copilot AI commented on Nov 10, 2025
The temporary file and SSTableReader are not properly cleaned up, leading to resource leaks. The TODO comment acknowledges this issue. Similar to the pattern used in EntryRecordReader (lines 95-98 in LakeFSInputFormat.scala), you should register a task completion listener to delete the temporary file:
```scala
val localFile = File.createTempFile("lakefs.", ".range")
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => localFile.delete()))
```

Additionally, the sstableReader should be closed when the task completes. Consider adding:

```scala
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => sstableReader.close()))
```
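For context only, a hedged sketch of how both cleanups might sit together in the per-partition code, reusing the names from the diff above (not necessarily how the fix was ultimately applied):

```scala
// Sketch: register cleanup of both the temporary copy and the reader on task completion.
val localFile = File.createTempFile("lakefs.", ".range")
fs.copyToLocalFile(false, path, new Path(localFile.getAbsolutePath), true)
val sstableReader = new SSTableReader(localFile.getAbsolutePath, Entry.messageCompanion, true)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] { _ =>
  sstableReader.close() // close the reader first...
  localFile.delete()    // ...then remove the local copy it was reading
})
sstableReader.newIterator().map(entry => (entry.key, new WithIdentifier(entry.id, entry.message, range.id)))
```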
Wow, nice one Copilot!
Plan before pulling:
All errors during closing are _logged_ but do not fail the task: these are readonly objects, so bad closes can do no more than leak (on "reasonable" systems). Flagged by **Copilot**, hurrah for verifiable actionable suggestions!
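A minimal sketch of that "log, don't fail" close pattern, assuming an SLF4J logger (the client's actual logging setup may differ):

```scala
import org.slf4j.LoggerFactory

object CloseQuietly {
  private val logger = LoggerFactory.getLogger(getClass)

  // Sketch: close a read-only resource; a failed close can only leak, so log and continue.
  def apply(what: String)(close: => Unit): Unit =
    try close
    catch {
      case e: Exception => logger.warn(s"failed to close $what (ignored)", e)
    }
}

// e.g. in a task completion listener:
//   CloseQuietly("sstableReader") { sstableReader.close() }
```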
Read objects in parallel:
- from directory listing;
- from commits.

Default parallelism is no good for either of these, because it is based on # of CPUs - and we want a _lot_ more. New configuration option `lakefs.job.range_read_parallelism` configures this parallelism.
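A hedged sketch of how such a knob can be read and applied; `spark.hadoop.*` properties landing in the Hadoop configuration is standard Spark behaviour, but the fallback default shown here is only an assumption:

```scala
import org.apache.spark.SparkContext

// Sketch: read the parallelism knob from the Hadoop configuration (Spark copies
// spark.hadoop.* properties there) and use it explicitly, because defaultParallelism
// tracks the number of cores -- far too low for reading many small ranges.
def rangeReadParallelism(sc: SparkContext): Int =
  sc.hadoopConfiguration.getInt("lakefs.job.range_read_parallelism",
                                sc.defaultParallelism) // assumed fallback

// val ranges = sc.parallelize(rangePaths, rangeReadParallelism(sc))
```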
```diff
   .except(committedDF)
   .except(uncommittedDF)
-  .cache()
+  .persist(StorageLevel.MEMORY_AND_DISK)
```
Can you explain the difference between `persist` and `cache`?
Sure. (Linking to PySpark docs for a new version because it's easiest to find these online. But it's been like this for... ever.) Man says:
> Persist this RDD with the default storage level (`MEMORY_ONLY`).
So it only works for small RDDs. But we mostly care about large RDDs.
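A toy sketch of the distinction for RDDs (where `cache()` really is just `persist(StorageLevel.MEMORY_ONLY)`), with `MEMORY_AND_DISK` spilling partitions to local disk instead of dropping and recomputing them:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Toy data only: cache() == persist(MEMORY_ONLY), so partitions that don't fit in
// memory are dropped and recomputed; MEMORY_AND_DISK spills them to disk instead.
def demo(sc: SparkContext): Unit = {
  val small = sc.parallelize(1 to 1000).cache() // persist(StorageLevel.MEMORY_ONLY)
  val large = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_AND_DISK)
  println((small.count(), large.count()))       // materialize both
}
```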
Personally I think that if you have "persist", and you name a shortcut to it "cache", then you have
arielshaqed
left a comment
PTAL. @N-o-Z - you're probably it!
N-o-Z
left a comment
LGTM
Thanks! Pulling. I believe the failing test (which is not required anyway) failed for spurious flakiness - it is on the UI and not on lakeFSFS.
Avoid input format
Parallelize object listing
The never-ending run of #9649 manages to finish listing, but still does not end (ran for 11 hours). That's because it also lists objects - and in practice Spark did not parallelize this. Explicitly parallelize it.
Results
I can finish the mark portion of the run in 2 hours (and a few seconds change) by configuring `--conf spark.hadoop.lakefs.job.range_read_parallelism=256` and running on a smaller but still fairly large EMR serverless cluster (500 vCPUs, memory and disk like they were going out of fashion).

Closes #9649.
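For completeness, the same setting expressed programmatically on the driver (a sketch; the `spark.hadoop.` prefix is what routes the property into the Hadoop configuration that the job reads):

```scala
import org.apache.spark.SparkConf

// Equivalent to the --conf flag above, set on the SparkConf before the session starts.
val conf = new SparkConf()
  .set("spark.hadoop.lakefs.job.range_read_parallelism", "256")
```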