xuyangzhong
Contributor

What is the purpose of the change

Introduce cache in delta join operator

Brief change log

  • Introduce related configs in ExecutionConfigOptions. These configs have been discussed in the FLIP.
  • Introduce cache in delta join operator and runner
  • Build and update cache when records arrive

Verifying this change

New tests have been added to verify this PR.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? doc has been updated

@flinkbot
Collaborator

flinkbot commented Oct 10, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

<td><h5>table.exec.delta-join.cache-enabled</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether enable the cache of delta join. If enabled, the delta join would cache the records from remote dim table.</td>
Contributor

nit: would cache -> caches

Contributor

@davidradl davidradl Oct 10, 2025

It would be useful to give some guidance in the docs on when to switch on caching and how to tune the left and right time values. I assume using the caches introduces some level of staleness; we should discuss this consideration as well. I am interested in what happens to the join results while the cache is in use and after the cache invalidates one side following the timeout.

The Jira gives no details. I would like to see details on the motivation behind this change, and on the circumstances in which it is most and least useful, so we can easily see how and when this adds value.

Contributor Author

@xuyangzhong xuyangzhong Oct 10, 2025

Aging of the cached data in delta joins does not affect the correctness of the final results; it only impacts the intermediate results.
In practice, performing remote I/O for every individual record without caching can be quite costly. Therefore, I believe that enabling the cache by default, as long as it doesn’t compromise data correctness, is the better approach. This is also why FLIP-486 states that this parameter defaults to true.
WDYT?
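
To illustrate the cost argument above, here is a minimal, self-contained sketch of a bounded LRU cache placed in front of a simulated remote lookup. It is not the delta join operator's implementation: the class `CachedLookup`, its `capacity` parameter, and the `remoteCalls` counter are hypothetical names chosen for illustration, and the "remote" table is just a function.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: a bounded LRU cache in front of a remote lookup. */
public class CachedLookup {
    private final Function<String, String> remote; // stands in for the remote dim table
    private final Map<String, String> cache;
    private int remoteCalls = 0;

    public CachedLookup(Function<String, String> remote, int capacity) {
        this.remote = remote;
        // accessOrder = true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                // Evict the least recently used entry once capacity is exceeded.
                return size() > capacity;
            }
        };
    }

    public String lookup(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: no remote I/O
        }
        remoteCalls++; // cache miss: pay for one remote access
        String value = remote.apply(key);
        cache.put(key, value);
        return value;
    }

    public int remoteCalls() {
        return remoteCalls;
    }

    public static void main(String[] args) {
        CachedLookup lookup = new CachedLookup(k -> "dim-" + k, 2);
        for (String key : new String[] {"a", "a", "b", "a"}) {
            lookup.lookup(key);
        }
        // Four lookups, but only two distinct keys reached the "remote" side.
        System.out.println(lookup.remoteCalls()); // prints 2
    }
}
```

In this sketch the saving grows with key repetition; with mostly unique keys the cache would add memory overhead without reducing remote accesses.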

public static final ConfigOption<Boolean> TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED =
        key("table.exec.delta-join.cache-enabled")
                .booleanType()
                .defaultValue(true)
Contributor

@davidradl davidradl Oct 10, 2025

What is your thinking behind having the default as true? I assume this changes the existing non-caching behaviour. It would make more sense to me to have users opt into caching, with the default as false.

Contributor Author

Explained earlier.

.booleanType()
.defaultValue(true)
.withDescription(
"Whether enable the cache of delta join. If enabled, the delta join would cache the records from remote dim table.");
Contributor

nit: "Whether enable" -> "Flag to enable"
Suggest not using "would", as per the previous comment.

Contributor Author

How about updating this description to "Whether to enable the cache of delta join. If enabled, the delta join caches the records from remote dim table. Default is true." to align with other configs like TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED, TABLE_EXEC_SPILL_COMPRESSION_ENABLED, ...?

@github-actions github-actions bot added the community-reviewed label (PR has been reviewed by the community) Oct 10, 2025

class DeltaJoinITCase extends StreamingTestBase {
@ExtendWith(Array(classOf[ParameterizedTestExtension]))
class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase {
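
The diff above parameterizes the integration test over `enableCache`. As a rough, framework-free illustration of that idea (not the actual `DeltaJoinITCase`, and not using JUnit's `ParameterizedTestExtension`), the sketch below runs the same toy lookup join with a cache on and off and compares the outputs. The class `CacheToggleCheck`, the `join` helper, and the in-memory dim table are hypothetical; because the table never changes here, the sketch does not model staleness or differences in intermediate results.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: run the same lookup join with the cache on and off. */
public class CacheToggleCheck {

    /** Joins each key against dimTable, optionally going through a local cache. */
    static List<String> join(List<String> keys, Map<String, String> dimTable, boolean enableCache) {
        Map<String, String> cache = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            String value = enableCache ? cache.get(key) : null;
            if (value == null) {
                value = dimTable.get(key); // stands in for the remote lookup
                if (enableCache && value != null) {
                    cache.put(key, value);
                }
            }
            if (value != null) {
                out.add(key + "=" + value); // inner-join semantics: drop unmatched keys
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> dim = Map.of("k1", "v1", "k2", "v2");
        List<String> keys = List.of("k1", "k2", "k1", "k3");
        // Same input, both cache settings: the joined output should be identical.
        for (boolean enableCache : new boolean[] {true, false}) {
            System.out.println(enableCache + " -> " + join(keys, dim, enableCache));
        }
    }
}
```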

Labels

community-reviewed PR has been reviewed by the community.

4 participants