
Conversation

MrGranday

Which issue does this PR close?

Closes #17297 (DataFrame.cache() does not work in distributed environments).

Rationale for this change

Currently, DataFrame.cache() always performs eager caching using an in-memory MemTable.
This works fine in local mode but causes problems in distributed environments (e.g., Ballista),
where caching should be deferred until distributed execution.

This PR introduces a configuration flag (local_cache) to support both eager and lazy caching.

What changes are included in this PR?

  • Added LogicalPlan::Cache { id, lineage } for lazy caching.
  • Updated DataFrame.cache() (see the sketch after this list):
    • If local_cache = true → uses the current eager caching with MemTable.
    • If local_cache = false → returns a LogicalPlan::Cache node for lazy evaluation.
  • Extended physical planner to handle LogicalPlan::Cache.
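
A minimal sketch of the proposed dispatch, assuming the local_cache flag lands as a typed field on ExecutionOptions and the LogicalPlan::Cache variant exists as proposed; the uuid-based id is illustrative, and the sketch uses self.into_parts() as suggested in the review below:

use std::sync::Arc;

use datafusion::dataframe::DataFrame;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;

// Inside impl DataFrame:
pub async fn cache(self) -> Result<DataFrame> {
    let (state, plan) = self.into_parts();
    // `local_cache` is the ExecutionOptions field this PR proposes.
    if state.config().options().execution.local_cache {
        // Eager path: execute now and materialize into an in-memory table,
        // matching today's behavior.
        let ctx = SessionContext::new_with_state(state);
        let df = ctx.execute_logical_plan(plan).await?;
        let schema = Arc::new(df.schema().as_arrow().clone());
        let batches = df.collect().await?;
        let mem_table = MemTable::try_new(schema, vec![batches])?;
        ctx.read_table(Arc::new(mem_table))
    } else {
        // Lazy path: wrap the plan so a distributed planner decides how
        // and where to cache.
        Ok(DataFrame::new(
            state,
            LogicalPlan::Cache {
                id: uuid::Uuid::new_v4().to_string(),
                lineage: Arc::new(plan),
            },
        ))
    }
}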

Are these changes tested?

  • Builds on existing logical/physical plan tests.
  • No new dedicated distributed caching tests are included in this PR.

Are there any user-facing changes?

  • Yes, a new configuration option:
    • datafusion.execution.local_cache = true (default) → eager caching.
    • datafusion.execution.local_cache = false → lazy caching with LogicalPlan::Cache.

This is a non-breaking change because the default behavior remains unchanged.
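
For illustration, the option could be toggled like any other boolean setting; the key is this PR's proposal, while set_bool is DataFusion's existing typed setter:

use datafusion::prelude::*;

// The key `datafusion.execution.local_cache` is this PR's proposal and does
// not exist upstream yet.
let config = SessionConfig::new().set_bool("datafusion.execution.local_cache", false);
let ctx = SessionContext::new_with_config(config);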

github-actions bot added the core (Core DataFusion crate) label on Aug 25, 2025
@milenkovicm
Contributor

Is this PR a work in progress, @MrGranday?

    let mem_table = MemTable::try_new(schema, partitions)?;
    context.read_table(Arc::new(mem_table))
} else {
    // Lazy caching: return LogicalPlan::Cache
Contributor

self.into_parts() can split the df into state and plan
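
For reference, a one-line sketch of the suggestion (DataFrame::into_parts is an existing API):

// Consumes the DataFrame and returns its pieces by value, avoiding the
// manual clone of self.session_state.
let (state, plan) = df.into_parts();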

Author

Thanks, I'll use self.into_parts() to split the state and plan instead of manually cloning.

@@ -946,6 +947,18 @@ impl DefaultPhysicalPlanner {
))
}

Contributor

I don't think the default planner should support LogicalPlan::Cache, as it is implementation specific.

Author

Makes sense. I'll remove the cache handling from DefaultPhysicalPlanner and keep it implementation-specific.
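
As one illustration of keeping this implementation-specific, a distributed engine could plan a user-defined cache node through DataFusion's ExtensionPlanner, registered only in that engine; the CacheNode type below is hypothetical (it echoes the node mentioned later in this thread):

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};

/// Hypothetical user-defined node marking a cache boundary
/// (UserDefinedLogicalNode impl omitted for brevity).
#[derive(Debug)]
struct CacheNode {
    id: String,
    lineage: LogicalPlan,
}

struct CachePlanner;

#[async_trait]
impl ExtensionPlanner for CachePlanner {
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        if node.as_any().downcast_ref::<CacheNode>().is_some() {
            // A distributed engine would substitute its cache-aware operator
            // here; passing the input through is only a placeholder.
            Ok(Some(physical_inputs[0].clone()))
        } else {
            Ok(None) // not our node; let other planners handle it
        }
    }
}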

    projection_requires_validation: true,
}

pub async fn cache(self) -> Result<DataFrame> {
    if self.session_state.config.local_cache {
Contributor

This should come from a configuration option.
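
A sketch of reading it through DataFusion's typed options tree, assuming this PR adds a local_cache field to ExecutionOptions (the field is hypothetical until the PR lands):

// Reads the proposed flag through ConfigOptions instead of a hand-rolled
// field on SessionConfig; `execution.local_cache` is the PR's proposal.
let local_cache = self.session_state.config().options().execution.local_cache;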

Author

Got it, I'll make sure this comes directly from a configuration option instead of hardcoding it here.

let lineage = self.to_logical_plan(); // get the logical plan so far
Ok(DataFrame::new(
    (*self.session_state).clone(),
    LogicalPlan::Cache {
@milenkovicm (Contributor) Aug 26, 2025

LogicalPlan::Cache does not exist, so it needs to be created, and the corresponding proto definition as well.
I believe that apart from id and lineage we may also need session_id as a parameter, since caches are tied to a session.
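
A hedged sketch of the variant under discussion; the field names follow this thread's proposal (id, lineage) plus the suggested session_id, and none of this exists in DataFusion today:

use std::sync::Arc;

// Sketch only: shown as a standalone enum so the example is self-contained;
// in the PR this would be a new variant on DataFusion's LogicalPlan, plus a
// matching protobuf message for serialization.
#[derive(Debug, Clone)]
pub enum LogicalPlan {
    // ... existing variants elided ...
    Cache {
        /// Unique identifier for the cached result.
        id: String,
        /// The plan whose output is to be cached.
        lineage: Arc<LogicalPlan>,
        /// Session the cache entry is tied to, per the review suggestion.
        session_id: String,
    },
}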

Author

Understood, I'll introduce a proper LogicalPlan::Cache variant with id and lineage, and also include session_id as you suggested. I'll add the corresponding proto support too.

@MrGranday
Author

Updates in this PR:

  • Added DataFrame.cache() method (eager materialization into MemTable)
  • Reverted physical_planner.rs back to its default (no unnecessary changes)
  • Fixed type annotation issue in tests/parquet/mod.rs (make_uint_batches range casts)

This should fully resolve issue #17297
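
For context, the eager behavior kept by this update is the existing one: cache() executes the plan and returns a new DataFrame backed by a MemTable. A usage sketch (the table name t is illustrative):

use datafusion::error::Result;
use datafusion::prelude::*;

async fn example(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("SELECT * FROM t").await?;
    // Executes the plan once and stores the result in an in-memory table;
    // subsequent operations on `cached` read from memory.
    let cached = df.cache().await?;
    cached.show().await?;
    Ok(())
}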

MrGranday requested a review from milenkovicm on August 27, 2025, 08:15
@milenkovicm
Contributor

Sorry @MrGranday, I see no changes in the cache logic, just a comment update.

@MrGranday
Author

@milenkovicm can you review it again? I have edited the cache fn and added the CacheNode.

@milenkovicm
Contributor

@MrGranday one question regarding your code: did you use any AI tools to generate this PR?

Successfully merging this pull request may close these issues:

  • DataFrame.cache() does not work in distributed environments