Skip to content

Commit 94b7a8e

Browse files
committed
[FLINK-37949][Connectors/Kinesis] Fix client shutdown order to prevent Netty deadlocks
1 parent 373be6f commit 94b7a8e

File tree

5 files changed

+122
-5
lines changed

5 files changed

+122
-5
lines changed

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,27 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
2323

24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2426
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
2527
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
2628
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
2729
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
2830

2931
import java.io.IOException;
32+
import java.time.Duration;
3033
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
3136

3237
import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition;
3338

3439
/** Implementation of async stream proxy for the Kinesis client. */
3540
@Internal
3641
public class KinesisAsyncStreamProxy implements AsyncStreamProxy {
42+
private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncStreamProxy.class);
43+
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = 5000;
44+
3745
private final KinesisAsyncClient kinesisAsyncClient;
3846
private final SdkAsyncHttpClient asyncHttpClient;
3947

@@ -58,9 +66,49 @@ public CompletableFuture<Void> subscribeToShard(
5866
return kinesisAsyncClient.subscribeToShard(request, responseHandler);
5967
}
6068

69+
/**
70+
* Gracefully closes the Kinesis clients with a timeout.
71+
*
72+
* @param timeoutMillis maximum time to wait for clients to close
73+
*/
74+
public void gracefulClose(long timeoutMillis) {
75+
try {
76+
LOG.debug("Closing Kinesis clients with timeout of {} ms", timeoutMillis);
77+
78+
// Close the Kinesis client first with half the timeout
79+
long kinesisClientTimeout = timeoutMillis / 2;
80+
LOG.debug("Closing KinesisAsyncClient with timeout of {} ms", kinesisClientTimeout);
81+
try {
82+
CompletableFuture<Void> kinesisClientFuture = CompletableFuture.runAsync(() -> kinesisAsyncClient.close());
83+
kinesisClientFuture.get(kinesisClientTimeout, TimeUnit.MILLISECONDS);
84+
LOG.debug("Successfully closed KinesisAsyncClient");
85+
} catch (TimeoutException e) {
86+
LOG.warn("Closing KinesisAsyncClient timed out after {} ms", kinesisClientTimeout);
87+
} catch (Exception e) {
88+
LOG.warn("Error while closing KinesisAsyncClient", e);
89+
}
90+
91+
// Then close the HTTP client with the remaining timeout
92+
long httpClientTimeout = timeoutMillis - kinesisClientTimeout;
93+
LOG.debug("Closing SdkAsyncHttpClient with timeout of {} ms", httpClientTimeout);
94+
try {
95+
CompletableFuture<Void> httpClientFuture = CompletableFuture.runAsync(() -> asyncHttpClient.close());
96+
httpClientFuture.get(httpClientTimeout, TimeUnit.MILLISECONDS);
97+
LOG.debug("Successfully closed SdkAsyncHttpClient");
98+
} catch (TimeoutException e) {
99+
LOG.warn("Closing SdkAsyncHttpClient timed out after {} ms", httpClientTimeout);
100+
} catch (Exception e) {
101+
LOG.warn("Error while closing SdkAsyncHttpClient", e);
102+
}
103+
104+
LOG.debug("Completed graceful shutdown of Kinesis clients");
105+
} catch (Exception e) {
106+
LOG.warn("Error during graceful shutdown of Kinesis clients", e);
107+
}
108+
}
109+
61110
/**
 * Closes the Kinesis async client and its HTTP client by delegating to
 * {@link #gracefulClose(long)} with {@code DEFAULT_SHUTDOWN_TIMEOUT_MILLIS}.
 *
 * <p>In this diff view the two direct {@code close()} calls (gutter marked {@code -}) are the
 * removed implementation; the {@code gracefulClose} call (gutter marked {@code +}) replaces
 * them.
 *
 * @throws IOException declared by the method signature
 */
@Override
62111
public void close() throws IOException {
63-
kinesisAsyncClient.close();
64-
asyncHttpClient.close();
112+
gracefulClose(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
65113
}
66114
}

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,16 @@
4747
import java.util.ArrayList;
4848
import java.util.List;
4949
import java.util.Map;
50+
import java.util.concurrent.CompletableFuture;
5051
import java.util.concurrent.ConcurrentHashMap;
52+
import java.util.concurrent.TimeUnit;
53+
import java.util.concurrent.TimeoutException;
5154

5255
/** Implementation of the {@link StreamProxy} for Kinesis data streams. */
5356
@Internal
5457
public class KinesisStreamProxy implements StreamProxy {
5558
private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamProxy.class);
59+
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = 5000;
5660

5761
private final KinesisClient kinesisClient;
5862
private final SdkHttpClient httpClient;
@@ -201,9 +205,49 @@ public DescribeStreamConsumerResponse describeStreamConsumer(
201205
.build());
202206
}
203207

208+
/**
209+
* Gracefully closes the Kinesis clients with a timeout.
210+
*
211+
* @param timeoutMillis maximum time to wait for clients to close
212+
*/
213+
public void gracefulClose(long timeoutMillis) {
214+
try {
215+
LOG.debug("Closing Kinesis clients with timeout of {} ms", timeoutMillis);
216+
217+
// Close the Kinesis client first with half the timeout
218+
long kinesisClientTimeout = timeoutMillis / 2;
219+
LOG.debug("Closing KinesisClient with timeout of {} ms", kinesisClientTimeout);
220+
try {
221+
CompletableFuture<Void> kinesisClientFuture = CompletableFuture.runAsync(() -> kinesisClient.close());
222+
kinesisClientFuture.get(kinesisClientTimeout, TimeUnit.MILLISECONDS);
223+
LOG.debug("Successfully closed KinesisClient");
224+
} catch (TimeoutException e) {
225+
LOG.warn("Closing KinesisClient timed out after {} ms", kinesisClientTimeout);
226+
} catch (Exception e) {
227+
LOG.warn("Error while closing KinesisClient", e);
228+
}
229+
230+
// Then close the HTTP client with the remaining timeout
231+
long httpClientTimeout = timeoutMillis - kinesisClientTimeout;
232+
LOG.debug("Closing SdkHttpClient with timeout of {} ms", httpClientTimeout);
233+
try {
234+
CompletableFuture<Void> httpClientFuture = CompletableFuture.runAsync(() -> httpClient.close());
235+
httpClientFuture.get(httpClientTimeout, TimeUnit.MILLISECONDS);
236+
LOG.debug("Successfully closed SdkHttpClient");
237+
} catch (TimeoutException e) {
238+
LOG.warn("Closing SdkHttpClient timed out after {} ms", httpClientTimeout);
239+
} catch (Exception e) {
240+
LOG.warn("Error while closing SdkHttpClient", e);
241+
}
242+
243+
LOG.debug("Completed graceful shutdown of Kinesis clients");
244+
} catch (Exception e) {
245+
LOG.warn("Error during graceful shutdown of Kinesis clients", e);
246+
}
247+
}
248+
204249
/**
 * Closes the Kinesis client and its HTTP client by delegating to
 * {@link #gracefulClose(long)} with {@code DEFAULT_SHUTDOWN_TIMEOUT_MILLIS}.
 *
 * <p>In this diff view the two direct {@code close()} calls (gutter marked {@code -}) are the
 * removed implementation; the {@code gracefulClose} call (gutter marked {@code +}) replaces
 * them.
 *
 * @throws IOException declared by the method signature
 */
@Override
205250
public void close() throws IOException {
206-
kinesisClient.close();
207-
httpClient.close();
251+
gracefulClose(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
208252
}
209253
}

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
2828
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
2929

30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
3032
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
3133

3234
import java.time.Duration;
@@ -41,6 +43,8 @@
4143
*/
4244
@Internal
4345
public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
46+
private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class);
47+
4448
private final AsyncStreamProxy asyncStreamProxy;
4549
private final String consumerArn;
4650
private final Duration subscriptionTimeout;
@@ -93,6 +97,18 @@ public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) {
9397

9498
/**
 * Closes this reader: drops the entries tracked in {@code splitSubscriptions}, then closes the
 * {@code asyncStreamProxy}. Progress is logged at DEBUG level before and after each step.
 *
 * @throws Exception if closing the {@code asyncStreamProxy} fails
 */
@Override
9599
public void close() throws Exception {
100+
LOG.debug("Closing FanOutKinesisShardSplitReader");
101+
102+
// Clear all subscriptions - they will be automatically cleaned up
103+
// when the AsyncStreamProxy is closed
// NOTE(review): only the map entries are removed here; no per-subscription cancel or
// close call is visible in this method - confirm the proxy close really releases them.
104+
if (!splitSubscriptions.isEmpty()) {
105+
LOG.debug("Clearing {} active shard subscriptions", splitSubscriptions.size());
106+
splitSubscriptions.clear();
107+
}
108+
109+
// Close the proxy with a graceful timeout
// NOTE(review): the timeout is applied by the proxy implementation, not by this call -
// presumably KinesisAsyncStreamProxy.close(); verify for other AsyncStreamProxy types.
110+
LOG.debug("Closing AsyncStreamProxy");
96111
asyncStreamProxy.close();
112+
LOG.debug("FanOutKinesisShardSplitReader closed");
97113
}
98114
}

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
2727
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
2828

29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2931
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
3032

3133
import java.util.Map;
@@ -36,6 +38,8 @@
3638
*/
3739
@Internal
3840
public class PollingKinesisShardSplitReader extends KinesisShardSplitReaderBase {
41+
private static final Logger LOG = LoggerFactory.getLogger(PollingKinesisShardSplitReader.class);
42+
3943
private final StreamProxy kinesis;
4044
private final Configuration configuration;
4145
private final int maxRecordsToGet;
@@ -65,6 +69,11 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
6569

6670
/**
 * Closes this reader by closing the underlying {@code kinesis} stream proxy. Progress is
 * logged at DEBUG level before and after the proxy is closed.
 *
 * @throws Exception if closing the {@code kinesis} proxy fails
 */
@Override
6771
public void close() throws Exception {
72+
LOG.debug("Closing PollingKinesisShardSplitReader");
73+
74+
// Close the proxy with a graceful timeout
// NOTE(review): the timeout is applied by the proxy implementation, not by this call -
// presumably KinesisStreamProxy.close(); verify for other StreamProxy types.
75+
LOG.debug("Closing StreamProxy");
6876
kinesis.close();
77+
LOG.debug("PollingKinesisShardSplitReader closed");
6978
}
7079
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ under the License.
5454

5555
<properties>
5656
<aws.sdkv2.version>2.26.19</aws.sdkv2.version>
57-
<netty.version>4.1.86.Final</netty.version>
57+
<netty.version>4.1.122.Final</netty.version>
5858
<flink.version>2.0.0</flink.version>
5959
<jackson-bom.version>2.14.3</jackson-bom.version>
6060
<glue.schema.registry.version>1.1.18</glue.schema.registry.version>

0 commit comments

Comments
 (0)