FanOutKinesisShardSplitReader.java
@@ -19,19 +19,26 @@
package org.apache.flink.connector.kinesis.source.reader.fanout;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;

@@ -45,17 +52,128 @@ public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
private final String consumerArn;
private final Duration subscriptionTimeout;

/**
* Shared executor service for all shard subscriptions.
*
* <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) to ensure no tasks are ever rejected.
* Although the queue is technically unbounded, the system has natural flow control mechanisms that effectively
* bound the queue size:
*
* <ol>
* <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li>
* <li>New records are only requested after processing an event (via {@code requestRecords()})</li>
* <li>The maximum number of queued tasks is effectively bounded by {@code 2 * number_of_shards}</li>
* </ol>
*
* <p>This design provides natural backpressure while ensuring no records are dropped, making it safe
* to use an unbounded executor queue.
*/
private final ExecutorService sharedShardSubscriptionExecutor;
Contributor: Let's make the name of the event executor service consistent; there are currently two names, sharedShardSubscriptionExecutor and eventProcessingExecutor.


private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>();

/**
* Factory for creating subscriptions. This is primarily used for testing.
*/
@VisibleForTesting
public interface SubscriptionFactory {
FanOutKinesisShardSubscription createSubscription(
AsyncStreamProxy proxy,
String consumerArn,
String shardId,
StartingPosition startingPosition,
Duration timeout,
ExecutorService executor);
}

/**
* Default implementation of the subscription factory.
*/
private static class DefaultSubscriptionFactory implements SubscriptionFactory {
@Override
public FanOutKinesisShardSubscription createSubscription(
AsyncStreamProxy proxy,
String consumerArn,
String shardId,
StartingPosition startingPosition,
Duration timeout,
ExecutorService executor) {
return new FanOutKinesisShardSubscription(
proxy,
consumerArn,
shardId,
startingPosition,
timeout,
executor);
}
}

private SubscriptionFactory subscriptionFactory;

public FanOutKinesisShardSplitReader(
AsyncStreamProxy asyncStreamProxy,
String consumerArn,
Map<String, KinesisShardMetrics> shardMetricGroupMap,
Configuration configuration) {
this(asyncStreamProxy, consumerArn, shardMetricGroupMap, configuration, new DefaultSubscriptionFactory());
}

@VisibleForTesting
FanOutKinesisShardSplitReader(
AsyncStreamProxy asyncStreamProxy,
String consumerArn,
Map<String, KinesisShardMetrics> shardMetricGroupMap,
Configuration configuration,
SubscriptionFactory subscriptionFactory) {
this(
asyncStreamProxy,
consumerArn,
shardMetricGroupMap,
configuration,
subscriptionFactory,
createDefaultExecutor());
}

/**
* Constructor with injected executor service for testing.
*
* @param asyncStreamProxy The proxy for Kinesis API calls
* @param consumerArn The ARN of the consumer
* @param shardMetricGroupMap The metrics map
* @param configuration The configuration
* @param subscriptionFactory The factory for creating subscriptions
* @param executorService The executor service to use for subscription tasks
*/
@VisibleForTesting
FanOutKinesisShardSplitReader(
AsyncStreamProxy asyncStreamProxy,
String consumerArn,
Map<String, KinesisShardMetrics> shardMetricGroupMap,
Configuration configuration,
SubscriptionFactory subscriptionFactory,
ExecutorService executorService) {
super(shardMetricGroupMap, configuration);
this.asyncStreamProxy = asyncStreamProxy;
this.consumerArn = consumerArn;
this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
this.subscriptionFactory = subscriptionFactory;
this.sharedShardSubscriptionExecutor = executorService;
}

/**
* Creates the default executor service for subscription tasks.
*
* @return A new executor service
*/
private static ExecutorService createDefaultExecutor() {
int minThreads = Runtime.getRuntime().availableProcessors();
int maxThreads = minThreads * 2;
return new ThreadPoolExecutor(
minThreads,
maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control
Contributor: Can you elaborate on what will happen when the downstream is severely backpressured?

I believe there is a risk of OutOfMemoryError. Under downstream backpressure, all threads in the pool will block on eventQueue.put(event) in processSubscriptionEvent() once the eventQueue fills up. The ThreadPoolExecutor's LinkedBlockingQueue will then keep growing, eventually causing an OOM.

I can see a scenario where (i) the Flink job reprocesses from trim horizon with hours' worth of data and (ii) the job's initial checkpoint is very slow or large, causing backpressure for a long time, e.g. 30 minutes. Under this scenario, the connector may OOM repeatedly and the job will be unrecoverable.

Do we need to propagate the backpressure further upstream by delaying new subscriptions when the LinkedBlockingQueue grows too large?
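A minimal sketch of one way to do that, assuming a hypothetical bound of 2 * expectedShards and reusing the executor-related imports already present in this file: a bounded task queue plus CallerRunsPolicy pushes work back onto the submitting thread when the queue is full, although in this connector that submitting thread is the Netty event loop, which would then block.

    // Sketch only: bounded queue + CallerRunsPolicy to propagate backpressure upstream.
    private static ExecutorService createBoundedExecutor(int expectedShards) {
        int minThreads = Runtime.getRuntime().availableProcessors();
        int maxThreads = minThreads * 2;
        return new ThreadPoolExecutor(
                minThreads,
                maxThreads,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2 * expectedShards), // hypothetical capacity
                new ExecutorThreadFactory("kinesis-efo-subscription"),
                // When the queue is full, the caller runs the task itself,
                // stalling further submissions until capacity frees up.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }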

Author (@aws-nageshvh, May 20, 2025): Yes, this is covered in the documentation as well, which I refined further in the next iteration with some more detail.

Despite the unbounded queue in the executor, we are very unlikely to run into OOM problems because backpressure is built into this design:

  1. The number of threads is limited (2x the number of cores)
  2. Each shard has a bounded queue with capacity 2
  3. We only request more records after successfully processing and storing an event
  4. If a shard's queue is full, processing blocks, creating backpressure
  5. This effectively bounds the total number of events in memory to 2 * number_of_shards

In the worst case during backpressure, the maximum number of events in memory is Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads).

I have been running a few tests for these scenarios:
a) 10 shards to one JVM with 4 GB of memory, reading from TRIM_HORIZON with 10 days of backfill. The app catches up without a job restart or OOM.
b) 1 shard to one JVM with 4 GB of memory, reading from TRIM_HORIZON with 10 days of backfill. The app catches up without a job restart or OOM.

Both apps write to a 1-shard destination stream, which keeps them constantly backpressured until they catch up to the tip.
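For a concrete sense of that bound, with, say, 10 shards and a 16-thread pool, Max Events = (2 * 10) + min(10, 16) = 30 events resident in memory at once, independent of how far behind the consumer is.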

new ExecutorThreadFactory("kinesis-efo-subscription"));
Contributor: Let's make the thread names consistent with the executor name, e.g. event-processing-executor-service-thread.

}

@Override
@@ -80,19 +198,33 @@ public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) {
super.handleSplitsChanges(splitsChanges);
for (KinesisShardSplit split : splitsChanges.splits()) {
FanOutKinesisShardSubscription subscription =
new FanOutKinesisShardSubscription(
subscriptionFactory.createSubscription(
asyncStreamProxy,
consumerArn,
split.getShardId(),
split.getStartingPosition(),
subscriptionTimeout);
subscriptionTimeout,
sharedShardSubscriptionExecutor);
subscription.activateSubscription();
splitSubscriptions.put(split.splitId(), subscription);
}
}

@Override
public void close() throws Exception {
// Shutdown the executor service
if (sharedShardSubscriptionExecutor != null) {
sharedShardSubscriptionExecutor.shutdown();
try {
if (!sharedShardSubscriptionExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
sharedShardSubscriptionExecutor.shutdownNow();
}
} catch (InterruptedException e) {
sharedShardSubscriptionExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}

asyncStreamProxy.close();
}
}
FanOutKinesisShardSubscription.java
@@ -46,6 +46,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -77,6 +78,21 @@ public class FanOutKinesisShardSubscription {

private final Duration subscriptionTimeout;

/** Executor service to run subscription event processing tasks. */
private final ExecutorService subscriptionEventProcessingExecutor;

/**
* Lock to ensure sequential processing of subscription events for this shard.
* This lock guarantees that for each shard:
* 1. Only one event is processed at a time
* 2. Events are processed in the order they are received
* 3. The critical operations (queue.put, startingPosition update, requestRecords) are executed atomically
*
* <p>This is essential to prevent race conditions that could lead to data loss or incorrect
* continuation sequence numbers being used after failover.
*/
private final Object subscriptionEventProcessingLock = new Object();

// Queue is meant for eager retrieval of records from the Kinesis stream. We will always have 2
// record batches available on next read.
private final BlockingQueue<SubscribeToShardEvent> eventQueue = new LinkedBlockingQueue<>(2);
@@ -86,19 +102,50 @@ public class FanOutKinesisShardSubscription {
// Store the current starting position for this subscription. Will be updated each time new
// batch of records is consumed
private StartingPosition startingPosition;

/**
* Gets the current starting position for this subscription.
*
* @return The current starting position
*/
public StartingPosition getStartingPosition() {
return startingPosition;
}

/**
* Checks if the subscription is active.
*
* @return true if the subscription is active, false otherwise
*/
public boolean isActive() {
return subscriptionActive.get();
}

private FanOutShardSubscriber shardSubscriber;

/**
* Creates a new FanOutKinesisShardSubscription with the specified parameters.
*
* @param kinesis The AsyncStreamProxy to use for Kinesis operations
* @param consumerArn The ARN of the consumer
* @param shardId The ID of the shard to subscribe to
* @param startingPosition The starting position for the subscription
* @param subscriptionTimeout The timeout for the subscription
* @param subscriptionEventProcessingExecutor The executor service to use for processing subscription events
*/
public FanOutKinesisShardSubscription(
AsyncStreamProxy kinesis,
String consumerArn,
String shardId,
StartingPosition startingPosition,
Duration subscriptionTimeout) {
Duration subscriptionTimeout,
ExecutorService subscriptionEventProcessingExecutor) {
this.kinesis = kinesis;
this.consumerArn = consumerArn;
this.shardId = shardId;
this.startingPosition = startingPosition;
this.subscriptionTimeout = subscriptionTimeout;
this.subscriptionEventProcessingExecutor = subscriptionEventProcessingExecutor;
}

/** Method to allow eager activation of the subscription. */
@@ -293,27 +340,14 @@ public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
try {
LOG.debug(
"Received event: {}, {}",
event.getClass().getSimpleName(),
event);
eventQueue.put(event);

// Update the starting position in case we have to recreate the
// subscription
startingPosition =
StartingPosition.continueFromSequenceNumber(
event.continuationSequenceNumber());

// Replace the record just consumed in the Queue
requestRecords();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new KinesisStreamsSourceException(
"Interrupted while adding Kinesis record to internal buffer.",
e);
}
// For critical path operations like processing subscription events, we need to ensure:
// 1. Events are processed in order (sequential processing)
// 2. No events are dropped (reliable processing)
// 3. The Netty event loop thread is not blocked (async processing)
// 4. The starting position is correctly updated for checkpointing

// Submit the event processing to the executor service
submitEventProcessingTask(event);
}
});
}
@@ -334,4 +368,87 @@ public void onComplete() {
activateSubscription();
}
}

/**
* Submits an event processing task to the executor service.
* This method encapsulates the task submission logic and error handling.
*
* @param event The subscription event to process
*/
private void submitEventProcessingTask(SubscribeToShardEvent event) {
try {
subscriptionEventProcessingExecutor.execute(() -> {
synchronized (subscriptionEventProcessingLock) {
Contributor: The lock object can be avoided if the method processSubscriptionEvent is declared synchronized.

Author: Yes. In the initial iteration, while I was testing things out, I made the lock granular but decided against it. We don't need the lock object anymore; I've made the method synchronized.
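A minimal sketch of what the synchronized-method variant could look like (the revised code itself is not part of this diff):

    // The instance monitor now serializes event processing per shard subscription,
    // so the separate subscriptionEventProcessingLock object and the synchronized
    // block in submitEventProcessingTask() can be dropped.
    private synchronized void processSubscriptionEvent(SubscribeToShardEvent event) {
        // body unchanged: eventQueue.put(event), update startingPosition,
        // then shardSubscriber.requestRecords()
    }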

try {
processSubscriptionEvent(event);
} catch (Exception e) {
Contributor: processSubscriptionEvent() can be long-running here. Do we need to catch and handle InterruptedException?

Author: I've updated the code and pushed some more improvements; please check.

// For critical path operations, propagate exceptions to cause a Flink job restart
LOG.error("Error processing subscription event", e);
// Propagate the exception to the subscription exception handler
terminateSubscription(new KinesisStreamsSourceException(
"Error processing subscription event", e));
}
}
});
} catch (Exception e) {
Contributor: Should we be more precise with the exception that we're catching, for example RejectedExecutionException?

Author: I've updated the code and pushed some more improvements; please check.
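A minimal sketch of catching the specific exception instead (the revised code is not part of this diff); with an unbounded queue, ThreadPoolExecutor.execute() can only reject a task after shutdown, and that rejection surfaces as java.util.concurrent.RejectedExecutionException:

    try {
        subscriptionEventProcessingExecutor.execute(() -> processSubscriptionEvent(event));
    } catch (RejectedExecutionException e) {
        // Only reachable once the executor has been shut down; surface the failure
        // so the Flink job restarts instead of silently dropping the event.
        LOG.error("Error submitting subscription event task", e);
        throw new KinesisStreamsSourceException(
                "Error submitting subscription event task", e);
    }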

// This should never happen with an unbounded queue, but if it does,
// we need to propagate the exception to cause a Flink job restart
LOG.error("Error submitting subscription event task", e);
throw new KinesisStreamsSourceException(
"Error submitting subscription event task", e);
}
}

/**
* Processes a subscription event in a separate thread from the shared executor pool.
* This method encapsulates the critical path operations:
* 1. Putting the event in the blocking queue (which has a capacity of 2)
* 2. Updating the starting position for recovery after failover
* 3. Requesting more records
*
* <p>These operations are executed sequentially for each shard to ensure thread safety
* and prevent race conditions. The bounded nature of the event queue (capacity 2) combined
* with only requesting more records after processing an event provides natural flow control,
* effectively limiting the number of tasks in the executor's queue.
*
* <p>This method is made public for testing purposes.
*
* @param event The subscription event to process
*/
public void processSubscriptionEvent(SubscribeToShardEvent event) {
Contributor: What is the motivation for this method to be public?

Author: It's not needed; I've removed it.

try {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Processing event for shard {}: {}, {}",
shardId,
event.getClass().getSimpleName(),
event);
}

// Put event in queue - this is a blocking operation
eventQueue.put(event);

// Update the starting position to ensure we can recover after failover
// Note: We don't need additional synchronization here because this method is already
// called within a synchronized block on subscriptionEventProcessingLock
startingPosition = StartingPosition.continueFromSequenceNumber(
event.continuationSequenceNumber());

// Request more records
shardSubscriber.requestRecords();

if (LOG.isDebugEnabled()) {
LOG.debug(
"Successfully processed event for shard {}, updated position to {}",
shardId,
startingPosition);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Consistent with current implementation - throw KinesisStreamsSourceException
throw new KinesisStreamsSourceException(
"Interrupted while adding Kinesis record to internal buffer.", e);
}
// No catch for other exceptions - let them propagate to be handled by the AWS SDK
}
}