Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Expand Down Expand Up @@ -80,6 +80,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.CLIENT_SHUTDOWN_TIMEOUT;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION;
Expand Down Expand Up @@ -279,7 +280,10 @@ private KinesisAsyncStreamProxy createKinesisAsyncStreamProxy(
KinesisAsyncClient.builder(),
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
return new KinesisAsyncStreamProxy(kinesisAsyncClient, asyncHttpClient);
return new KinesisAsyncStreamProxy(
kinesisAsyncClient,
asyncHttpClient,
consumerConfig.get(CLIENT_SHUTDOWN_TIMEOUT).toMillis());
}

private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
Expand Down Expand Up @@ -319,7 +323,10 @@ private KinesisStreamProxy createKinesisStreamProxy(
overrideBuilder,
KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
return new KinesisStreamProxy(kinesisClient, httpClient);
return new KinesisStreamProxy(
kinesisClient,
httpClient,
consumerConfig.get(CLIENT_SHUTDOWN_TIMEOUT).toMillis());
}

private void setUpDeserializationSchema(SourceReaderContext sourceReaderContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,11 @@ public enum ConsumerLifecycle {
.defaultValue(Duration.ofMillis(10000))
.withDescription(
"Timeout for consumer deregistration. When timeout is reached, code will continue as per normal.");

public static final ConfigOption<Duration> CLIENT_SHUTDOWN_TIMEOUT =
ConfigOptions.key("source.client.shutdown.timeout")
.durationType()
.defaultValue(Duration.ofMillis(5000))
.withDescription(
"Timeout for graceful shutdown of the Kinesis clients.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Expand All @@ -21,26 +21,40 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition;

/** Implementation of async stream proxy for the Kinesis client. */
@Internal
public class KinesisAsyncStreamProxy implements AsyncStreamProxy {
private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncStreamProxy.class);
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = 5000;
private final long shutdownTimeoutMillis;

private final KinesisAsyncClient kinesisAsyncClient;
private final SdkAsyncHttpClient asyncHttpClient;

public KinesisAsyncStreamProxy(
KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient asyncHttpClient) {
this(kinesisAsyncClient, asyncHttpClient, DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
}

public KinesisAsyncStreamProxy(
KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient asyncHttpClient, long shutdownTimeoutMillis) {
this.kinesisAsyncClient = kinesisAsyncClient;
this.asyncHttpClient = asyncHttpClient;
this.shutdownTimeoutMillis = shutdownTimeoutMillis;
}

@Override
Expand All @@ -58,9 +72,49 @@ public CompletableFuture<Void> subscribeToShard(
return kinesisAsyncClient.subscribeToShard(request, responseHandler);
}

/**
* Gracefully closes the Kinesis clients with a timeout.
*
* @param timeoutMillis maximum time to wait for clients to close
*/
public void gracefulClose(long timeoutMillis) {
try {
LOG.debug("Closing Kinesis clients with timeout of {} ms", timeoutMillis);

// Close the Kinesis client first with half the timeout
long kinesisClientTimeout = timeoutMillis / 2;
LOG.debug("Closing KinesisAsyncClient with timeout of {} ms", kinesisClientTimeout);
try {
CompletableFuture<Void> kinesisClientFuture = CompletableFuture.runAsync(() -> kinesisAsyncClient.close());
kinesisClientFuture.get(kinesisClientTimeout, TimeUnit.MILLISECONDS);
LOG.debug("Successfully closed KinesisAsyncClient");
} catch (TimeoutException e) {
LOG.warn("Closing KinesisAsyncClient timed out after {} ms", kinesisClientTimeout);
} catch (Exception e) {
LOG.warn("Error while closing KinesisAsyncClient", e);
}

// Then close the HTTP client with the remaining timeout
long httpClientTimeout = timeoutMillis - kinesisClientTimeout;
LOG.debug("Closing SdkAsyncHttpClient with timeout of {} ms", httpClientTimeout);
try {
CompletableFuture<Void> httpClientFuture = CompletableFuture.runAsync(() -> asyncHttpClient.close());
httpClientFuture.get(httpClientTimeout, TimeUnit.MILLISECONDS);
LOG.debug("Successfully closed SdkAsyncHttpClient");
} catch (TimeoutException e) {
LOG.warn("Closing SdkAsyncHttpClient timed out after {} ms", httpClientTimeout);
} catch (Exception e) {
LOG.warn("Error while closing SdkAsyncHttpClient", e);
}

LOG.debug("Completed graceful shutdown of Kinesis clients");
} catch (Exception e) {
LOG.warn("Error during graceful shutdown of Kinesis clients", e);
}
}

@Override
public void close() throws IOException {
kinesisAsyncClient.close();
asyncHttpClient.close();
gracefulClose(shutdownTimeoutMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Expand Down Expand Up @@ -47,21 +47,31 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Implementation of the {@link StreamProxy} for Kinesis data streams. */
@Internal
public class KinesisStreamProxy implements StreamProxy {
private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamProxy.class);
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = 5000;
private final long shutdownTimeoutMillis;

private final KinesisClient kinesisClient;
private final SdkHttpClient httpClient;
private final Map<String, String> shardIdToIteratorStore;

public KinesisStreamProxy(KinesisClient kinesisClient, SdkHttpClient httpClient) {
this(kinesisClient, httpClient, DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
}

public KinesisStreamProxy(KinesisClient kinesisClient, SdkHttpClient httpClient, long shutdownTimeoutMillis) {
this.kinesisClient = kinesisClient;
this.httpClient = httpClient;
this.shardIdToIteratorStore = new ConcurrentHashMap<>();
this.shutdownTimeoutMillis = shutdownTimeoutMillis;
}

@Override
Expand Down Expand Up @@ -201,9 +211,49 @@ public DescribeStreamConsumerResponse describeStreamConsumer(
.build());
}

/**
* Gracefully closes the Kinesis clients with a timeout.
*
* @param timeoutMillis maximum time to wait for clients to close
*/
public void gracefulClose(long timeoutMillis) {
try {
LOG.debug("Closing Kinesis clients with timeout of {} ms", timeoutMillis);

// Close the Kinesis client first with half the timeout
long kinesisClientTimeout = timeoutMillis / 2;
LOG.debug("Closing KinesisClient with timeout of {} ms", kinesisClientTimeout);
try {
CompletableFuture<Void> kinesisClientFuture = CompletableFuture.runAsync(() -> kinesisClient.close());
kinesisClientFuture.get(kinesisClientTimeout, TimeUnit.MILLISECONDS);
LOG.debug("Successfully closed KinesisClient");
} catch (TimeoutException e) {
LOG.warn("Closing KinesisClient timed out after {} ms", kinesisClientTimeout);
} catch (Exception e) {
LOG.warn("Error while closing KinesisClient", e);
}

// Then close the HTTP client with the remaining timeout
long httpClientTimeout = timeoutMillis - kinesisClientTimeout;
LOG.debug("Closing SdkHttpClient with timeout of {} ms", httpClientTimeout);
try {
CompletableFuture<Void> httpClientFuture = CompletableFuture.runAsync(() -> httpClient.close());
httpClientFuture.get(httpClientTimeout, TimeUnit.MILLISECONDS);
LOG.debug("Successfully closed SdkHttpClient");
} catch (TimeoutException e) {
LOG.warn("Closing SdkHttpClient timed out after {} ms", httpClientTimeout);
} catch (Exception e) {
LOG.warn("Error while closing SdkHttpClient", e);
}

LOG.debug("Completed graceful shutdown of Kinesis clients");
} catch (Exception e) {
LOG.warn("Error during graceful shutdown of Kinesis clients", e);
}
}

@Override
public void close() throws IOException {
kinesisClient.close();
httpClient.close();
gracefulClose(shutdownTimeoutMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Expand All @@ -27,6 +27,8 @@
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;

import java.time.Duration;
Expand All @@ -41,6 +43,8 @@
*/
@Internal
public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class);

private final AsyncStreamProxy asyncStreamProxy;
private final String consumerArn;
private final Duration subscriptionTimeout;
Expand Down Expand Up @@ -93,6 +97,18 @@ public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) {

@Override
public void close() throws Exception {
LOG.debug("Closing FanOutKinesisShardSplitReader");

// Clear all subscriptions - they will be automatically cleaned up
// when the AsyncStreamProxy is closed
if (!splitSubscriptions.isEmpty()) {
LOG.debug("Clearing {} active shard subscriptions", splitSubscriptions.size());
splitSubscriptions.clear();
}

// Close the proxy with a graceful timeout
LOG.debug("Closing AsyncStreamProxy");
asyncStreamProxy.close();
LOG.debug("FanOutKinesisShardSplitReader closed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Expand All @@ -26,6 +26,8 @@
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;

import java.util.Map;
Expand All @@ -36,6 +38,8 @@
*/
@Internal
public class PollingKinesisShardSplitReader extends KinesisShardSplitReaderBase {
private static final Logger LOG = LoggerFactory.getLogger(PollingKinesisShardSplitReader.class);

private final StreamProxy kinesis;
private final Configuration configuration;
private final int maxRecordsToGet;
Expand Down Expand Up @@ -65,6 +69,11 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {

@Override
public void close() throws Exception {
LOG.debug("Closing PollingKinesisShardSplitReader");

// Close the proxy with a graceful timeout
LOG.debug("Closing StreamProxy");
kinesis.close();
LOG.debug("PollingKinesisShardSplitReader closed");
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ under the License.

<properties>
<aws.sdkv2.version>2.26.19</aws.sdkv2.version>
<netty.version>4.1.86.Final</netty.version>
<netty.version>4.1.122.Final</netty.version>
<flink.version>2.0.0</flink.version>
<jackson-bom.version>2.14.3</jackson-bom.version>
<glue.schema.registry.version>1.1.18</glue.schema.registry.version>
Expand Down