
[FLINK-34071][Connectors/Kinesis] Decouple NettyEventLoop thread's onNext() by handing over blocking queue put to a separate executor. Using a shared executor across shards to execute processing of event received by NettyEventLoop. #208


Open
wants to merge 4 commits into main

Conversation

aws-nageshvh

Purpose of the change

This PR decouples the NettyEventLoop thread from the blockingQueue.put operation in the Kinesis Enhanced Fan-Out (EFO) connector. Previously, the critical path operations (queue.put, startingPosition recording, and requestRecords) were executed directly in the NettyEventLoop thread, which could lead to blocking and potential performance issues.

In this implementation, we extract the blocking operations into a separate method that is executed by a shared executor service, allowing the NettyEventLoop thread to return quickly while the potentially blocking operations are handled asynchronously. This improves the overall performance and stability of the Kinesis EFO connector, especially under high load. This pattern also follows the reactive streams specification's guidance for implementing onNext(): https://github.com/reactive-streams/reactive-streams-jvm/blob/master/README.md#2-subscriber-code
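To illustrate the hand-off pattern (the class, field, and method names below are hypothetical, not the connector's actual code), onNext() only schedules the work and returns, while the blocking part runs on the shared executor and requests the next event once it is done:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;

// Illustrative sketch only; names do not match the connector's classes.
final class NonBlockingShardSubscriber implements Flow.Subscriber<Object> {
    private final ExecutorService eventProcessingExecutor; // shared across shards
    private Flow.Subscription subscription;

    NonBlockingShardSubscriber(ExecutorService eventProcessingExecutor) {
        this.eventProcessingExecutor = eventProcessingExecutor;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Object event) {
        // Return immediately from the (NettyEventLoop-style) calling thread;
        // the potentially blocking work runs on the shared executor.
        eventProcessingExecutor.execute(() -> process(event));
    }

    private void process(Object event) {
        // queue.put(event), record the starting position, then ask for more data.
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) { /* surface the failure to the reader */ }

    @Override
    public void onComplete() { /* no-op in this sketch */ }
}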

It also resolves the deadlock reported in FLINK-34071, which occurred because the sink's NettyEventLoop thread was blocked by the source's NettyEventLoop thread, while the source's NettyEventLoop thread could not be released because it depended on the sink to relieve the backpressure.

Verifying this change

This change added unit tests and can be verified as follows:

  • Added and improved unit tests that cover various failure scenarios
  • Manually verified by running the Kinesis connector on a production Flink cluster with a deterministic reproducible deadlock scenario under backpressure

Significant changes

  • This change is an internal implementation improvement that doesn't affect the public API or serialization. It enhances the performance and stability of the existing Kinesis EFO connector by optimizing how the NettyEventLoop thread interacts with the blocking queue operations.

…Next() by handing over blocking queue put to a separate executor. Using a shared executor across shards to execute processing of event received by NettyEventLoop. Unit tests for shard subscription thread safety, recording starting position correctly, restart processing, record ordering and happy path.
synchronized (subscriptionEventProcessingLock) {
    try {
        processSubscriptionEvent(event);
    } catch (Exception e) {
Contributor

processSubscriptionEvent() can be long running here. Do we need to catch and handle InterruptedException?

Author

Have updated the code and pushed some more improvements. Please check.
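(For reference, a minimal sketch of how such a task could handle an interrupt, assuming the blocking call is a BlockingQueue.put; the names below are illustrative, not the pushed code:)

import java.util.concurrent.BlockingQueue;

final class InterruptHandlingSketch {
    private final BlockingQueue<Object> eventQueue;

    InterruptHandlingSketch(BlockingQueue<Object> eventQueue) {
        this.eventQueue = eventQueue;
    }

    void processSafely(Object event) {
        try {
            eventQueue.put(event); // the blocking call inside the event processing
        } catch (InterruptedException e) {
            // Restore the interrupt status so the executor's shutdown logic can observe it.
            Thread.currentThread().interrupt();
        }
    }
}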

*
* @param event The subscription event to process
*/
public void processSubscriptionEvent(SubscribeToShardEvent event) {
Contributor

What is the motivation for this method to be public?

Author

Not needed, removed it

}
}
});
} catch (Exception e) {
Contributor

Should we be more precise with the exception that we're catching, for example RejectedExecutionException?
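(As an illustration of the narrower catch being suggested, here is a minimal sketch; the helper class and the rethrown exception are assumptions, not the connector's actual code:)

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

final class SubmitSketch {
    // Only RejectedExecutionException is expected from execute() here.
    static void submit(ExecutorService executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            // The executor was shut down (or rejected the task); surface this
            // explicitly instead of catching Exception broadly.
            throw new IllegalStateException("Subscription event executor rejected task", e);
        }
    }
}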

Author

Have updated the code and pushed some more improvements. Please check.

private void submitEventProcessingTask(SubscribeToShardEvent event) {
    try {
        subscriptionEventProcessingExecutor.execute(() -> {
            synchronized (subscriptionEventProcessingLock) {
Contributor

The use of a lock object can be avoided if the method processSubscriptionEvent is declared synchronized.

Author

Yes, in the initial iteration when I was testing things out I had made the lock granular, but decided against it. We don't need the lock object anymore; I've made the method synchronized.
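(A before/after sketch of that change, with hypothetical field and method names rather than the actual connector code:)

final class SynchronizedMethodSketch {
    // Before: explicit lock object guarding the critical section.
    private final Object subscriptionEventProcessingLock = new Object();

    void handleWithLockObject(Object event) {
        synchronized (subscriptionEventProcessingLock) {
            processSubscriptionEvent(event);
        }
    }

    // After: the method itself is synchronized, so the lock object is unnecessary.
    private synchronized void processSubscriptionEvent(Object event) {
        // eventQueue.put(event), record starting position, requestRecords()
    }
}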

minThreads,
maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control
Contributor

Can you elaborate what will happen in the case where downstream is severely backpressured?

I believe there is a risk of OutOfMemoryError. When there is backpressure downstream, all threads in the thread pool will be blocked during eventQueue.put(event) in processSubscriptionEvent() as the eventQueue fills up. The ThreadPoolExecutor's LinkedBlockingQueue will then keep growing, eventually causing an OOM.

I can see a scenario where i) the Flink job reprocesses from trim horizon with hours' worth of data, and ii) the Flink job's initial checkpoint is very slow or large, causing backpressure for a long time, e.g. 30 minutes. Under this scenario, the connector may OOM repeatedly and the job will be unrecoverable.

Do we need to propagate the backpressure further upstream by delaying new subscription when LinkedBlockingQueue grows too large?

Author
aws-nageshvh commented May 20, 2025

Yes, this is covered in the documentation too, which I further refined in the next iteration by adding more details:

Despite using an unbounded queue in the executor, we are very unlikely to run into OOM problems because backpressure is built into this design:

  1. The number of threads is limited (2x the number of cores)
  2. Each shard has a bounded queue with capacity 2
  3. We only request more records after successfully processing and storing an event
  4. If a shard's queue is full, the processing blocks, creating back-pressure
  5. This effectively bounds the total number of events in memory to 2 * number_of_shards

In the worst-case scenario during backpressure, the maximum number of events in memory is Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads).
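In compressed form, the flow control described in points 1-5 looks roughly like the sketch below; the class, field, and method names are illustrative rather than the connector's actual code.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

final class ShardBackpressureSketch {
    // Per-shard queue bounded at 2, as in point 2 above.
    private final BlockingQueue<Object> eventQueue = new ArrayBlockingQueue<>(2);

    void handleEvent(ExecutorService sharedExecutor, Object event) {
        sharedExecutor.execute(() -> {
            try {
                // Blocks when this shard's queue is full, tying up one executor
                // thread and pausing further reads for this shard (point 4).
                eventQueue.put(event);
                requestMoreRecords(); // only after the event is stored (point 3)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    private void requestMoreRecords() {
        // In the real connector this corresponds to requesting the next event
        // from the shard subscription.
    }
}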

I have been running a few tests to cover these scenarios:
a) 10 shards to one JVM with 4 GB of memory, running from TRIM_HORIZON for a 10-day backfill. The app catches up without a job restart or OOM.
b) 1 shard to one JVM with 4 GB of memory, running from TRIM_HORIZON for a 10-day backfill. The app catches up without a job restart or OOM.

Both apps write to a single-shard destination stream, which keeps them constantly backpressured until they catch up to the tip of the stream.

… a separate executor so it doesn't block the NettyEventLoop threads, adding null checks, improving logging and documentation.
…and implementing the right ordering, improving error handling, logging and documentation
elrob commented May 26, 2025

Thanks for working on this change.
I have a suspicion that this issue or something similar is the cause of this:
https://issues.apache.org/jira/browse/FLINK-37648

… separate shared executor and update to tests
@leekeiabstraction
Contributor

Ran into the following while running mvn clean install

[ERROR]     src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardRestartTest.java
...
[ERROR]     ... (218 more lines that didn't fit)
[ERROR] Violations also present in:
[ERROR]     src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisTestUtils.java
[ERROR]     src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardHappyPathTest.java
[ERROR]     src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionThreadSafetyTest.java
[ERROR]     src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderThreadPoolTest.java
[ERROR]     src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardRecordOrderingTest.java
[ERROR]     src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardStartingPositionTest.java
[ERROR]     src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardTestBase.java
[ERROR]     src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java
[ERROR]     src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
[ERROR] Run 'mvn spotless:apply' to fix these violations.
[ERROR] -> [Help 1]

Comment on lines +73 to +74
* <li>New records are only requested after processing an event (via {@code requestRecords()})</li>
* <li>When a shard's queue is full, the processing thread blocks at the {@code put()} operation</li>
Contributor

I think I'm missing context to interpret how the backpressure works.

Can you clarify whether put() and requestRecords() are done in order, and requestRecords() is done only after put()? If so, that was the information I was missing.

Comment on lines +120 to +130
@VisibleForTesting
public interface SubscriptionFactory {
    FanOutKinesisShardSubscription createSubscription(
            AsyncStreamProxy proxy,
            String consumerArn,
            String shardId,
            StartingPosition startingPosition,
            Duration timeout,
            ExecutorService eventProcessingExecutor,
            ExecutorService subscriptionCallExecutor);
}
Contributor

Should SubscriptionFactory be an interface in FanOutKinesisShardSubscription or in its own file instead?

* <p>This ensures that memory usage scales linearly with the number of shards, not exponentially,
* making it safe to use an unbounded executor queue even with a large number of shards.
*/
private final ExecutorService sharedShardSubscriptionExecutor;
Contributor

Let's make the name of the event executor service consistent; there are currently two names: sharedShardSubscriptionExecutor and eventProcessingExecutor.

maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control
new ExecutorThreadFactory("kinesis-efo-subscription"));
Contributor

Let's make the threads' names consistent with the executor name e.g. event-processing-executor-service-thread

maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control
new ExecutorThreadFactory("kinesis-subscription-caller"));
Contributor

Let's make the threads' names consistent with the executor name e.g. subscription-call-executor-service-thread

1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ExecutorThreadFactory("kinesis-client-close"));
Contributor

Let's use more elaborate thread names e.g. close-async-stream-proxy-executor-service

*/
private void cancelActiveSubscriptions() {
    for (FanOutKinesisShardSubscription subscription : splitSubscriptions.values()) {
        if (subscription.isActive()) {
Contributor

Is this if necessary? cancelSubscription() already checks whether the subscription is active.

Comment on lines +229 to +230
LOG.debug("Making subscribeToShard API call for shard {} on thread {}",
shardId, Thread.currentThread().getName());
Contributor

Is logging out the thread necessary? The thread information should normally be logged already as a separate field.

subscriptionCallExecutor
).thenCompose(future -> future); // Flatten the CompletableFuture<CompletableFuture<Void>> to CompletableFuture<Void>

subscriptionFuture.exceptionally(
Contributor

Should we retain the previous comment?

        // We don't need to keep track of the future here because we monitor subscription success
        // using our own CountDownLatch

if (subscription != null) {
subscription.request(1);
} else {
LOG.warn("Cannot request records - subscription is null for shard {}", shardId);
Contributor

Under what condition does this happen? If not handled correctly, wouldn't this cause a subscription to stop getting further data? Should we just let it throw NPE here instead?

Comment on lines +209 to +210
this.sharedShardSubscriptionExecutor = eventProcessingExecutor;
this.sharedSubscriptionCallExecutor = subscriptionCallExecutor;
Contributor

Been thinking about this approach where FanOutKinesisShardSplitReader instantiates the ExecutorServices and passes them to FanOutKinesisShardSubscription. I have the following questions:

  1. Would a backpressured shard drown out other shards? Specifically, if there is a shard that has a large backlog and is severely backpressured by downstream, we would expect the subscriptionEventProcessingExecutor's input queue to have a large number of callables queued for the backpressured shard. This would mean that the quieter shards' callables will be backpressured as well, since they are placed in the same queue. This was not the case in the current design, in which a shard is never backpressured by its neighbours' downstream. This may cause something akin to data loss where an idle watermarking strategy would drop the now late-arriving records from non-backpressured, quieter shards.

  2. Are we breaking abstraction here by defining and passing around executor services?


I think instead of letting the SplitReader instantiate the ExecutorServices, an alternative approach we can consider is:

  1. FanOutKinesisShardSubscription instantiates its own executor service, e.g. Executors.newSingleThreadExecutor(), and
  2. FanOutKinesisShardSubscription manages the lifecycle of that executor service.
  3. The executor service is used both for handling subscription events and for re-subscribing.

This will ensure that a single backpressured shard does not impact the throughput of other non-backpressured shards, and that the use of ExecutorService is fully encapsulated within FanOutKinesisShardSubscription. There is a change in the scaling here, as we'd scale threads with 1x the number of shards instead of 1 to 2x the number of processors; however, I do not see a big drawback here, as we usually match or recommend matching the number of cores (KPUs) to the number of shards.
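(A rough sketch of that alternative, with illustrative names rather than a proposed patch: each subscription owns a single-thread executor and manages its lifecycle, routing both event processing and re-subscription calls through it.)

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Each subscription owns its own single-thread executor, so a backpressured
// shard never stalls its neighbours' event processing.
final class PerShardSubscriptionSketch implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void onEvent(Runnable processEvent) {
        // Both event processing and re-subscription calls can be routed here.
        executor.execute(processEvent);
    }

    @Override
    public void close() {
        executor.shutdownNow(); // lifecycle is managed by the subscription itself
    }
}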
