Skip to content

Commit 70c84cb

Browse files
darenwktdannycranmer
authored andcommitted
[FLINK-30418] [Connectors/Kinesis] Implement synchronous KinesisClient in EFO
1 parent 2a24e7c commit 70c84cb

File tree

34 files changed

+2458
-720
lines changed

34 files changed

+2458
-720
lines changed

flink-connector-aws-base/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ under the License.
5252
<artifactId>netty-nio-client</artifactId>
5353
</dependency>
5454

55+
<dependency>
56+
<groupId>software.amazon.awssdk</groupId>
57+
<artifactId>apache-client</artifactId>
58+
</dependency>
59+
5560
<dependency>
5661
<groupId>software.amazon.awssdk</groupId>
5762
<artifactId>sts</artifactId>

flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java renamed to flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSClientUtil.java

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424

2525
import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
2626
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
27+
import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
2728
import software.amazon.awssdk.core.SdkClient;
2829
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
2930
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
3031
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
3132
import software.amazon.awssdk.core.client.config.SdkClientOption;
33+
import software.amazon.awssdk.http.SdkHttpClient;
3234
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
3335

3436
import java.net.URI;
@@ -37,7 +39,7 @@
3739

3840
/** Some utilities specific to Amazon Web Service. */
3941
@Internal
40-
public class AWSAsyncSinkUtil extends AWSGeneralUtil {
42+
public class AWSClientUtil extends AWSGeneralUtil {
4143

4244
/** V2 suffix to denote the unified sinks. V1 sinks are based on KPL etc. */
4345
static final String V2_USER_AGENT_SUFFIX = " V2";
@@ -57,9 +59,11 @@ public static String formatFlinkUserAgentPrefix(String userAgentFormat) {
5759
}
5860

5961
/**
62+
* Creates an AWS Async Client.
63+
*
6064
* @param configProps configuration properties
6165
* @param httpClient the underlying HTTP client used to talk to AWS
62-
* @return a new AWS Client
66+
* @return a new AWS Async Client
6367
*/
6468
public static <
6569
S extends SdkClient,
@@ -83,10 +87,15 @@ S createAwsAsyncClient(
8387
}
8488

8589
/**
90+
* Creates an AWS Async Client.
91+
*
8692
* @param configProps configuration properties
8793
* @param clientConfiguration the AWS SDK v2 config to instantiate the client
8894
* @param httpClient the underlying HTTP client used to talk to AWS
89-
* @return a new AWS Client
95+
* @param clientBuilder httpClientBuilder to build the underlying HTTP client
96+
* @param awsUserAgentPrefixFormat user agent prefix for Flink
97+
* @param awsClientUserAgentPrefix user agent prefix for kinesis client
98+
* @return a new AWS Async Client
9099
*/
91100
public static <
92101
S extends SdkClient,
@@ -101,10 +110,8 @@ S createAwsAsyncClient(
101110
final String awsUserAgentPrefixFormat,
102111
final String awsClientUserAgentPrefix) {
103112
String flinkUserAgentPrefix =
104-
Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
105-
.orElse(
106-
formatFlinkUserAgentPrefix(
107-
awsUserAgentPrefixFormat + V2_USER_AGENT_SUFFIX));
113+
getFlinkUserAgentPrefix(
114+
configProps, awsUserAgentPrefixFormat, awsClientUserAgentPrefix);
108115

109116
final ClientOverrideConfiguration overrideConfiguration =
110117
createClientOverrideConfiguration(
@@ -148,11 +155,49 @@ S createAwsAsyncClient(
148155
final SdkAsyncHttpClient httpClient,
149156
final ClientOverrideConfiguration overrideConfiguration) {
150157

151-
if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
152-
final URI endpointOverride =
153-
URI.create(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
154-
clientBuilder.endpointOverride(endpointOverride);
155-
}
158+
updateEndpointOverride(configProps, clientBuilder);
159+
160+
return clientBuilder
161+
.httpClient(httpClient)
162+
.overrideConfiguration(overrideConfiguration)
163+
.credentialsProvider(getCredentialsProvider(configProps))
164+
.region(getRegion(configProps))
165+
.build();
166+
}
167+
168+
/**
169+
* Creates an AWS Sync Client.
170+
*
171+
* @param configProps configuration properties
172+
* @param httpClient the underlying HTTP client used to talk to AWS
173+
* @param clientBuilder httpClientBuilder to build the underlying HTTP client
174+
* @param awsUserAgentPrefixFormat user agent prefix for Flink
175+
* @param awsClientUserAgentPrefix user agent prefix for kinesis client
176+
* @return a new AWS Sync Client
177+
*/
178+
public static <
179+
S extends SdkClient,
180+
T extends
181+
AwsSyncClientBuilder<? extends T, S> & AwsClientBuilder<? extends T, S>>
182+
S createAwsSyncClient(
183+
final Properties configProps,
184+
final SdkHttpClient httpClient,
185+
final T clientBuilder,
186+
final String awsUserAgentPrefixFormat,
187+
final String awsClientUserAgentPrefix) {
188+
SdkClientConfiguration clientConfiguration = SdkClientConfiguration.builder().build();
189+
190+
String flinkUserAgentPrefix =
191+
getFlinkUserAgentPrefix(
192+
configProps, awsUserAgentPrefixFormat, awsClientUserAgentPrefix);
193+
194+
final ClientOverrideConfiguration overrideConfiguration =
195+
createClientOverrideConfiguration(
196+
clientConfiguration,
197+
ClientOverrideConfiguration.builder(),
198+
flinkUserAgentPrefix);
199+
200+
updateEndpointOverride(configProps, clientBuilder);
156201

157202
return clientBuilder
158203
.httpClient(httpClient)
@@ -161,4 +206,24 @@ S createAwsAsyncClient(
161206
.region(getRegion(configProps))
162207
.build();
163208
}
209+
210+
private static String getFlinkUserAgentPrefix(
211+
final Properties configProps,
212+
final String awsUserAgentPrefixFormat,
213+
final String awsClientUserAgentPrefix) {
214+
215+
return Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
216+
.orElse(
217+
formatFlinkUserAgentPrefix(
218+
awsUserAgentPrefixFormat + V2_USER_AGENT_SUFFIX));
219+
}
220+
221+
private static <S extends SdkClient, T extends AwsClientBuilder<? extends T, S>>
222+
void updateEndpointOverride(final Properties configProps, final T clientBuilder) {
223+
if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
224+
final URI endpointOverride =
225+
URI.create(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
226+
clientBuilder.endpointOverride(endpointOverride);
227+
}
228+
}
164229
}

flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
3232
import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
3333
import software.amazon.awssdk.http.Protocol;
34+
import software.amazon.awssdk.http.SdkHttpClient;
3435
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
36+
import software.amazon.awssdk.http.apache.ApacheHttpClient;
3537
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
3638
import software.amazon.awssdk.http.nio.netty.Http2Configuration;
3739
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
@@ -301,6 +303,12 @@ public static SdkAsyncHttpClient createAsyncHttpClient(
301303
return httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
302304
}
303305

306+
public static SdkHttpClient createSyncHttpClient(
307+
final AttributeMap config, final ApacheHttpClient.Builder httpClientBuilder) {
308+
httpClientBuilder.connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT);
309+
return httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
310+
}
311+
304312
/**
305313
* Creates a {@link Region} object from the given Properties.
306314
*
Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,16 @@
3838

3939
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
4040
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
41-
import static org.apache.flink.connector.aws.util.AWSAsyncSinkUtil.formatFlinkUserAgentPrefix;
41+
import static org.apache.flink.connector.aws.util.AWSClientUtil.formatFlinkUserAgentPrefix;
4242
import static org.mockito.ArgumentMatchers.any;
4343
import static org.mockito.ArgumentMatchers.argThat;
4444
import static org.mockito.Mockito.mock;
4545
import static org.mockito.Mockito.never;
4646
import static org.mockito.Mockito.verify;
4747
import static org.mockito.Mockito.when;
4848

49-
/** Tests for {@link AWSAsyncSinkUtil}. */
50-
class AWSAsyncSinkUtilTest {
49+
/** Tests for {@link AWSClientUtil}. */
50+
class AWSClientUtilTest {
5151

5252
private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT =
5353
"Apache Flink %s (%s) *Destination* Connector";
@@ -62,7 +62,7 @@ void testCreateKinesisAsyncClient() {
6262
ClientOverrideConfiguration.builder().build();
6363
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build();
6464

65-
AWSAsyncSinkUtil.createAwsAsyncClient(
65+
AWSClientUtil.createAwsAsyncClient(
6666
properties, builder, httpClient, clientOverrideConfiguration);
6767

6868
verify(builder).overrideConfiguration(clientOverrideConfiguration);
@@ -83,7 +83,7 @@ void testCreateKinesisAsyncClientWithEndpointOverride() {
8383
ClientOverrideConfiguration.builder().build();
8484
SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build();
8585

86-
AWSAsyncSinkUtil.createAwsAsyncClient(
86+
AWSClientUtil.createAwsAsyncClient(
8787
properties, builder, httpClient, clientOverrideConfiguration);
8888

8989
verify(builder).endpointOverride(URI.create("https://localhost"));
@@ -95,11 +95,11 @@ void testClientOverrideConfigurationWithDefaults() {
9595

9696
ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
9797

98-
AWSAsyncSinkUtil.createClientOverrideConfiguration(
98+
AWSClientUtil.createClientOverrideConfiguration(
9999
clientConfiguration,
100100
builder,
101101
formatFlinkUserAgentPrefix(
102-
DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
102+
DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSClientUtil.V2_USER_AGENT_SUFFIX));
103103

104104
verify(builder).build();
105105
verify(builder)
@@ -120,11 +120,11 @@ void testClientOverrideConfigurationUserAgentSuffix() {
120120

121121
ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
122122

123-
AWSAsyncSinkUtil.createClientOverrideConfiguration(
123+
AWSClientUtil.createClientOverrideConfiguration(
124124
clientConfiguration,
125125
builder,
126126
formatFlinkUserAgentPrefix(
127-
DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
127+
DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSClientUtil.V2_USER_AGENT_SUFFIX));
128128

129129
verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix");
130130
}
@@ -138,12 +138,11 @@ void testClientOverrideConfigurationApiCallAttemptTimeout() {
138138

139139
ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
140140

141-
AWSAsyncSinkUtil.createClientOverrideConfiguration(
141+
AWSClientUtil.createClientOverrideConfiguration(
142142
clientConfiguration,
143143
builder,
144144
formatFlinkUserAgentPrefix(
145-
DEFAULT_USER_AGENT_PREFIX_FORMAT_V2
146-
+ AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
145+
DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 + AWSClientUtil.V2_USER_AGENT_SUFFIX));
147146

148147
verify(builder).apiCallAttemptTimeout(Duration.ofMillis(500));
149148
}
@@ -157,12 +156,11 @@ void testClientOverrideConfigurationApiCallTimeout() {
157156

158157
ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
159158

160-
AWSAsyncSinkUtil.createClientOverrideConfiguration(
159+
AWSClientUtil.createClientOverrideConfiguration(
161160
clientConfiguration,
162161
builder,
163162
formatFlinkUserAgentPrefix(
164-
DEFAULT_USER_AGENT_PREFIX_FORMAT_V2
165-
+ AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
163+
DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 + AWSClientUtil.V2_USER_AGENT_SUFFIX));
166164

167165
verify(builder).apiCallTimeout(Duration.ofMillis(600));
168166
}

flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.flink.annotation.Internal;
2121
import org.apache.flink.api.connector.sink2.Sink;
22-
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
22+
import org.apache.flink.connector.aws.util.AWSClientUtil;
2323
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
2424
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
2525
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
@@ -73,7 +73,7 @@ private static SdkAsyncHttpClient createHttpClient(Properties firehoseClientProp
7373
private static FirehoseAsyncClient createFirehoseClient(
7474
Properties firehoseClientProperties, SdkAsyncHttpClient httpClient) {
7575
AWSGeneralUtil.validateAwsCredentials(firehoseClientProperties);
76-
return AWSAsyncSinkUtil.createAwsAsyncClient(
76+
return AWSClientUtil.createAwsAsyncClient(
7777
firehoseClientProperties,
7878
httpClient,
7979
FirehoseAsyncClient.builder(),

flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.flink.connector.kinesis.sink;
1919

2020
import org.apache.flink.api.connector.sink2.Sink;
21-
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
21+
import org.apache.flink.connector.aws.util.AWSClientUtil;
2222
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
2323
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
2424
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
@@ -158,7 +158,7 @@ private KinesisAsyncClient buildClient(
158158
Properties kinesisClientProperties, SdkAsyncHttpClient httpClient) {
159159
AWSGeneralUtil.validateAwsCredentials(kinesisClientProperties);
160160

161-
return AWSAsyncSinkUtil.createAwsAsyncClient(
161+
return AWSClientUtil.createAwsAsyncClient(
162162
kinesisClientProperties,
163163
httpClient,
164164
KinesisAsyncClient.builder(),

flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@
4444
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;
4545
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
4646
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
47+
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
4748
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
4849
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Factory;
49-
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
5050
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
5151
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
5252
import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
@@ -273,7 +273,7 @@ public interface FlinkKinesisProxyFactory {
273273

274274
/** Factory to create Kinesis proxy V@ instances used by a fetcher. */
275275
public interface FlinkKinesisProxyV2Factory {
276-
KinesisProxyV2Interface create(Properties configProps);
276+
KinesisProxyAsyncV2Interface create(Properties configProps);
277277
}
278278

279279
/**
@@ -377,7 +377,7 @@ public KinesisDataFetcher(
377377
new ArrayList<>(),
378378
createInitialSubscribedStreamsToLastDiscoveredShardsState(streams),
379379
KinesisProxy::create,
380-
KinesisProxyV2Factory::createKinesisProxyV2);
380+
KinesisProxyV2Factory::createKinesisProxyAsyncV2);
381381
}
382382

383383
@VisibleForTesting

flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
2828
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
2929
import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
30-
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
30+
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2Interface;
3131
import org.apache.flink.util.Preconditions;
3232

3333
import org.slf4j.Logger;
@@ -64,7 +64,7 @@ public class FanOutRecordPublisher implements RecordPublisher {
6464

6565
private final String consumerArn;
6666

67-
private final KinesisProxyV2Interface kinesisProxy;
67+
private final KinesisProxyAsyncV2Interface kinesisProxy;
6868

6969
private final StreamShardHandle subscribedShard;
7070

@@ -89,7 +89,7 @@ public FanOutRecordPublisher(
8989
final StartingPosition startingPosition,
9090
final String consumerArn,
9191
final StreamShardHandle subscribedShard,
92-
final KinesisProxyV2Interface kinesisProxy,
92+
final KinesisProxyAsyncV2Interface kinesisProxy,
9393
final FanOutRecordPublisherConfiguration configuration,
9494
final FullJitterBackoff backoff) {
9595
this.nextStartingPosition = Preconditions.checkNotNull(startingPosition);

0 commit comments

Comments
 (0)