diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index e07add14..4f5d8a21 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -5,7 +5,7 @@ * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -80,6 +80,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.CLIENT_SHUTDOWN_TIMEOUT; import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME; import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION; import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION; @@ -279,7 +280,10 @@ private KinesisAsyncStreamProxy createKinesisAsyncStreamProxy( KinesisAsyncClient.builder(), KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); - return new KinesisAsyncStreamProxy(kinesisAsyncClient, asyncHttpClient); + return new KinesisAsyncStreamProxy( + kinesisAsyncClient, + asyncHttpClient, + 
consumerConfig.get(CLIENT_SHUTDOWN_TIMEOUT).toMillis()); } private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) { @@ -319,7 +323,10 @@ private KinesisStreamProxy createKinesisStreamProxy( overrideBuilder, KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); - return new KinesisStreamProxy(kinesisClient, httpClient); + return new KinesisStreamProxy( + kinesisClient, + httpClient, + consumerConfig.get(CLIENT_SHUTDOWN_TIMEOUT).toMillis()); } private void setUpDeserializationSchema(SourceReaderContext sourceReaderContext) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java index c310d7ce..148aef4f 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java @@ -138,4 +138,11 @@ public enum ConsumerLifecycle { .defaultValue(Duration.ofMillis(10000)) .withDescription( "Timeout for consumer deregistration. 
When timeout is reached, code will continue as per normal."); + + public static final ConfigOption CLIENT_SHUTDOWN_TIMEOUT = + ConfigOptions.key("source.client.shutdown.timeout") + .durationType() + .defaultValue(Duration.ofMillis(5000)) + .withDescription( + "Timeout for graceful shutdown of the Kinesis clients."); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java index 56d097e9..d9de432d 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java @@ -5,7 +5,7 @@ * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -21,6 +21,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; @@ -28,19 +30,31 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition; /** Implementation of async stream proxy for the Kinesis client. */ @Internal public class KinesisAsyncStreamProxy implements AsyncStreamProxy { + private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncStreamProxy.class); + private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = 5000; + private final long shutdownTimeoutMillis; + private final KinesisAsyncClient kinesisAsyncClient; private final SdkAsyncHttpClient asyncHttpClient; public KinesisAsyncStreamProxy( KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient asyncHttpClient) { + this(kinesisAsyncClient, asyncHttpClient, DEFAULT_SHUTDOWN_TIMEOUT_MILLIS); + } + + public KinesisAsyncStreamProxy( + KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient asyncHttpClient, long shutdownTimeoutMillis) { this.kinesisAsyncClient = kinesisAsyncClient; this.asyncHttpClient = asyncHttpClient; + this.shutdownTimeoutMillis = shutdownTimeoutMillis; } @Override @@ -58,9 +72,49 @@ public CompletableFuture subscribeToShard( return kinesisAsyncClient.subscribeToShard(request, responseHandler); } + /** + * Gracefully closes the Kinesis clients with a timeout. 
+     *
+     * <p>Each client is closed on a background thread while the caller waits for roughly half of
+     * the timeout per client. A close that times out is not cancelled and may finish later. If the
+     * caller is interrupted, the interrupt flag is restored instead of being swallowed.
+     *
+     * @param timeoutMillis maximum time to wait for clients to close
+     */
+    public void gracefulClose(long timeoutMillis) {
+        LOG.debug("Closing Kinesis clients with timeout of {} ms", timeoutMillis);
+
+        // Close the Kinesis client first with half the timeout
+        long kinesisClientTimeout = timeoutMillis / 2;
+        closeWithTimeout("KinesisAsyncClient", kinesisAsyncClient::close, kinesisClientTimeout);
+
+        // Then close the HTTP client with the other half of the timeout
+        long httpClientTimeout = timeoutMillis - kinesisClientTimeout;
+        closeWithTimeout("SdkAsyncHttpClient", asyncHttpClient::close, httpClientTimeout);
+
+        LOG.debug("Completed graceful shutdown of Kinesis clients");
+    }
+
+    /** Runs {@code closeAction} asynchronously and waits up to {@code timeoutMillis} for it. */
+    private static void closeWithTimeout(
+            String clientName, Runnable closeAction, long timeoutMillis) {
+        LOG.debug("Closing {} with timeout of {} ms", clientName, timeoutMillis);
+        try {
+            CompletableFuture.runAsync(closeAction).get(timeoutMillis, TimeUnit.MILLISECONDS);
+            LOG.debug("Successfully closed {}", clientName);
+        } catch (TimeoutException e) {
+            LOG.warn("Closing {} timed out after {} ms", clientName, timeoutMillis);
+        } catch (InterruptedException e) {
+            // Restore the flag so the calling thread still observes the interruption.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while closing {}", clientName);
+        } catch (Exception e) {
+            LOG.warn("Error while closing {}", clientName, e);
+        }
+    }
+
     @Override
     public void close() throws IOException {
-        kinesisAsyncClient.close();
-        asyncHttpClient.close();
+        gracefulClose(shutdownTimeoutMillis);
     }
 }
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java index 1a64d133..54d93390 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java @@ -5,7 +5,7 @@ * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -47,21 +47,31 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** Implementation of the {@link StreamProxy} for Kinesis data streams. 
*/ @Internal public class KinesisStreamProxy implements StreamProxy { private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamProxy.class); + private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = 5000; + private final long shutdownTimeoutMillis; private final KinesisClient kinesisClient; private final SdkHttpClient httpClient; private final Map shardIdToIteratorStore; public KinesisStreamProxy(KinesisClient kinesisClient, SdkHttpClient httpClient) { + this(kinesisClient, httpClient, DEFAULT_SHUTDOWN_TIMEOUT_MILLIS); + } + + public KinesisStreamProxy(KinesisClient kinesisClient, SdkHttpClient httpClient, long shutdownTimeoutMillis) { this.kinesisClient = kinesisClient; this.httpClient = httpClient; this.shardIdToIteratorStore = new ConcurrentHashMap<>(); + this.shutdownTimeoutMillis = shutdownTimeoutMillis; } @Override @@ -201,9 +211,49 @@ public DescribeStreamConsumerResponse describeStreamConsumer( .build()); } + /** + * Gracefully closes the Kinesis clients with a timeout. 
+     *
+     * <p>Each client is closed on a background thread while the caller waits for roughly half of
+     * the timeout per client. A close that times out is not cancelled and may finish later. If the
+     * caller is interrupted, the interrupt flag is restored instead of being swallowed.
+     *
+     * @param timeoutMillis maximum time to wait for clients to close
+     */
+    public void gracefulClose(long timeoutMillis) {
+        LOG.debug("Closing Kinesis clients with timeout of {} ms", timeoutMillis);
+
+        // Close the Kinesis client first with half the timeout
+        long kinesisClientTimeout = timeoutMillis / 2;
+        closeWithTimeout("KinesisClient", kinesisClient::close, kinesisClientTimeout);
+
+        // Then close the HTTP client with the other half of the timeout
+        long httpClientTimeout = timeoutMillis - kinesisClientTimeout;
+        closeWithTimeout("SdkHttpClient", httpClient::close, httpClientTimeout);
+
+        LOG.debug("Completed graceful shutdown of Kinesis clients");
+    }
+
+    /** Runs {@code closeAction} asynchronously and waits up to {@code timeoutMillis} for it. */
+    private static void closeWithTimeout(
+            String clientName, Runnable closeAction, long timeoutMillis) {
+        LOG.debug("Closing {} with timeout of {} ms", clientName, timeoutMillis);
+        try {
+            CompletableFuture.runAsync(closeAction).get(timeoutMillis, TimeUnit.MILLISECONDS);
+            LOG.debug("Successfully closed {}", clientName);
+        } catch (TimeoutException e) {
+            LOG.warn("Closing {} timed out after {} ms", clientName, timeoutMillis);
+        } catch (InterruptedException e) {
+            // Restore the flag so the calling thread still observes the interruption.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while closing {}", clientName);
+        } catch (Exception e) {
+            LOG.warn("Error while closing {}", clientName, e);
+        }
+    }
+
     @Override
     public void close() throws IOException {
-        kinesisClient.close();
-        httpClient.close();
+        gracefulClose(shutdownTimeoutMillis);
     }
 }
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java index c0aefee5..d9726d2e 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java @@ -5,7 +5,7 @@ * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -27,6 +27,8 @@ import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import java.time.Duration; @@ -41,6 +43,8 @@ */ @Internal public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase { + private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class); + private final AsyncStreamProxy asyncStreamProxy; private final String consumerArn; private final Duration subscriptionTimeout; @@ -93,6 +97,18 @@ public void handleSplitsChanges(SplitsChange splitsChanges) { @Override public void close() throws Exception { + LOG.debug("Closing FanOutKinesisShardSplitReader"); + + // Clear all subscriptions - they will be automatically cleaned up + // when the AsyncStreamProxy is closed + if (!splitSubscriptions.isEmpty()) { + LOG.debug("Clearing {} active shard subscriptions", splitSubscriptions.size()); + 
splitSubscriptions.clear(); + } + + // Close the proxy with a graceful timeout + LOG.debug("Closing AsyncStreamProxy"); asyncStreamProxy.close(); + LOG.debug("FanOutKinesisShardSplitReader closed"); } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java index c3dca833..0e0f2bc8 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java @@ -5,7 +5,7 @@ * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * @@ -26,6 +26,8 @@ import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import java.util.Map; @@ -36,6 +38,8 @@ */ @Internal public class PollingKinesisShardSplitReader extends KinesisShardSplitReaderBase { + private static final Logger LOG = LoggerFactory.getLogger(PollingKinesisShardSplitReader.class); + private final StreamProxy kinesis; private final Configuration configuration; private final int maxRecordsToGet; @@ -65,6 +69,11 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { @Override public void close() throws Exception { + LOG.debug("Closing PollingKinesisShardSplitReader"); + + // Close the proxy with a graceful timeout + LOG.debug("Closing StreamProxy"); kinesis.close(); + LOG.debug("PollingKinesisShardSplitReader closed"); } } diff --git a/pom.xml b/pom.xml index ccb262e3..92902a41 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,7 @@ under the License. 2.26.19 - 4.1.86.Final + 4.1.122.Final 2.0.0 2.14.3 1.1.18