Skip to content

Commit 9b27588

Browse files
committed
Not yet done
1 parent 83994be commit 9b27588

File tree

13 files changed

+152
-62
lines changed

13 files changed

+152
-62
lines changed

pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-sqs</module>
6363
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-sqs</module>
6464
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis</module>
65+
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-producer</module>
66+
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-kinesis-client-library</module>
6567
<module>spring-cloud-aws-samples</module>
6668
<module>spring-cloud-aws-test</module>
6769
<module>spring-cloud-aws-modulith</module>

spring-cloud-aws-autoconfigure/pom.xml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,18 +182,18 @@
182182
<optional>true</optional>
183183
</dependency>
184184
<dependency>
185-
<groupId>software.amazon.kinesis</groupId>
186-
<artifactId>amazon-kinesis-client</artifactId>
185+
<groupId>software.amazon.awssdk</groupId>
186+
<artifactId>kinesis</artifactId>
187187
<optional>true</optional>
188188
</dependency>
189189
<dependency>
190190
<groupId>software.amazon.kinesis</groupId>
191-
<artifactId>amazon-kinesis-producer</artifactId>
191+
<artifactId>amazon-kinesis-client</artifactId>
192192
<optional>true</optional>
193193
</dependency>
194194
<dependency>
195-
<groupId>software.amazon.awssdk</groupId>
196-
<artifactId>kinesis</artifactId>
195+
<groupId>software.amazon.kinesis</groupId>
196+
<artifactId>amazon-kinesis-producer</artifactId>
197197
<optional>true</optional>
198198
</dependency>
199199
<dependency>

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisAutoConfiguration.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package io.awspring.cloud.autoconfigure.kinesis;
1717

18-
import com.amazonaws.services.kinesis.producer.KinesisProducer;
19-
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
2018
import io.awspring.cloud.autoconfigure.AwsAsyncClientCustomizer;
2119
import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
2220
import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
@@ -29,10 +27,7 @@
2927
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
3028
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
3129
import org.springframework.boot.context.properties.EnableConfigurationProperties;
32-
import org.springframework.boot.context.properties.PropertyMapper;
3330
import org.springframework.context.annotation.Bean;
34-
import org.springframework.context.annotation.Configuration;
35-
import software.amazon.awssdk.regions.providers.AwsRegionProvider;
3631
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
3732

3833
@AutoConfiguration
@@ -54,21 +49,4 @@ public KinesisAsyncClient kinesisAsyncClient(KinesisProperties properties,
5449
kinesisAsyncClientCustomizer.orderedStream(), awsSyncClientCustomizers.orderedStream())
5550
.build();
5651
}
57-
58-
@ConditionalOnClass(name = { "com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration",
59-
"com.amazonaws.services.kinesis.producer.KinesisProducer" })
60-
public static class KinesisProducerAutoConfiguration {
61-
}
62-
63-
// In your configs
64-
ConfigsBuilder configsBuilder = new ConfigsBuilder(
65-
streamName,
66-
applicationName,
67-
KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClient), // <-- Use your bean
68-
dynamoDbClient,
69-
cloudWatchClient,
70-
workerId,
71-
recordProcessorFactory
72-
);
73-
7452
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.awspring.cloud.autoconfigure.kinesis;
2+
3+
4+
import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
5+
import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
6+
import org.springframework.beans.factory.ObjectProvider;
7+
import org.springframework.boot.autoconfigure.AutoConfiguration;
8+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
9+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
10+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
11+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
12+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
13+
import org.springframework.context.annotation.Bean;
14+
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
15+
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
16+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
17+
import software.amazon.kinesis.coordinator.Scheduler;
18+
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
19+
20+
import java.util.UUID;
21+
22+
@AutoConfiguration
23+
@ConditionalOnClass({ KinesisAsyncClient.class, Scheduler.class })
24+
@EnableConfigurationProperties({ KinesisClientLibraryProperties.class })
25+
@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class, KinesisAutoConfiguration.class })
26+
@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.client.library.enabled", havingValue = "true", matchIfMissing = true)
27+
public class KinesisClientLibraryAutoConfiguration {
28+
29+
30+
31+
@ConditionalOnMissingBean
32+
@Bean
33+
public Scheduler scheduler(ObjectProvider<DynamoDbAsyncClient> dynamoDbClient, ObjectProvider<CloudWatchAsyncClient> cloudWatchClient,
34+
KinesisAsyncClient kinesisAsyncClient, KinesisClientLibraryProperties properties,
35+
ShardRecordProcessorFactory processorFactory) {
36+
return null;
37+
}
38+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.awspring.cloud.autoconfigure.kinesis;
2+
3+
import org.springframework.boot.context.properties.ConfigurationProperties;
4+
5+
import static io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryProperties.PREFIX;
6+
7+
8+
@ConfigurationProperties(prefix = PREFIX)
9+
public class KinesisClientLibraryProperties {
10+
11+
public static final String PREFIX = "spring.cloud.aws.kinesis.client.library";
12+
13+
private String streamName;
14+
private String applicationName;
15+
16+
public String getStreamName() {
17+
return streamName;
18+
}
19+
20+
public void setStreamName(String streamName) {
21+
this.streamName = streamName;
22+
}
23+
24+
public String getApplicationName() {
25+
return applicationName;
26+
}
27+
28+
public void setApplicationName(String applicationName) {
29+
this.applicationName = applicationName;
30+
}
31+
}

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerAutoConfiguration.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.awspring.cloud.autoconfigure.kinesis;
22

33

4-
import com.amazonaws.services.kinesis.producer.KinesisProducer;
5-
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
4+
import software.amazon.kinesis.producer.KinesisProducer;
5+
import software.amazon.kinesis.producer.KinesisProducerConfiguration;
66
import io.awspring.cloud.autoconfigure.core.AwsClientBuilderConfigurer;
77
import io.awspring.cloud.autoconfigure.core.AwsConnectionDetails;
88
import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
@@ -20,20 +20,19 @@
2020
import software.amazon.awssdk.regions.providers.AwsRegionProvider;
2121

2222
@AutoConfiguration
23-
@ConditionalOnClass({ KinesisProducer.class })
23+
@ConditionalOnClass({ KinesisProducer.class, KinesisProducerConfiguration.class })
2424
@EnableConfigurationProperties({ KinesisProducerProperties.class })
2525
@AutoConfigureAfter({ CredentialsProviderAutoConfiguration.class, RegionProviderAutoConfiguration.class })
2626
@ConditionalOnProperty(value = "spring.cloud.aws.kinesis.producer.enabled", havingValue = "true", matchIfMissing = true)
2727
public class KinesisProducerAutoConfiguration {
2828

2929
@ConditionalOnMissingBean
3030
@Bean
31-
public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProperties kinesisProperties,
31+
public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProducerProperties prop,
3232
AwsCredentialsProvider credentialsProvider,
3333
AwsRegionProvider awsRegionProvider, ObjectProvider<AwsConnectionDetails> connectionDetails) {
3434
PropertyMapper propertyMapper = PropertyMapper.get();
3535
KinesisProducerConfiguration config = new KinesisProducerConfiguration();
36-
KinesisProducerProperties prop = kinesisProperties.getProducer();
3736
propertyMapper.from(prop::getAggregationEnabled).whenNonNull().to(config::setAggregationEnabled);
3837
propertyMapper.from(prop::getAggregationMaxCount).whenNonNull().to(config::setAggregationMaxCount);
3938
propertyMapper.from(prop::getAggregationMaxSize).whenNonNull().to(config::setAggregationMaxSize);
@@ -70,9 +69,9 @@ public KinesisProducerConfiguration kinesisProducerConfiguration(KinesisProperti
7069
propertyMapper.from(prop.getUserRecordTimeoutInMillis()).whenNonNull()
7170
.to(config::setUserRecordTimeoutInMillis);
7271

73-
config.setCredentialsProvider()
72+
config.setCredentialsProvider(credentialsProvider);
7473
config.setRegion(AwsClientBuilderConfigurer
75-
.resolveRegion(kinesisProperties, connectionDetails.getIfAvailable(), awsRegionProvider)
74+
.resolveRegion(prop, connectionDetails.getIfAvailable(), awsRegionProvider)
7675
.toString());
7776
connectionDetails.ifAvailable(cd -> {
7877
config.setKinesisPort(cd.getEndpoint().getPort());

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProducerProperties.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
*/
1616
package io.awspring.cloud.autoconfigure.kinesis;
1717

18-
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
18+
1919
import io.awspring.cloud.autoconfigure.AwsClientProperties;
2020
import org.springframework.boot.context.properties.ConfigurationProperties;
21+
import software.amazon.kinesis.producer.KinesisProducerConfiguration;
2122

22-
import static io.awspring.cloud.autoconfigure.kinesis.KinesisProperties.PREFIX;
23-
23+
import static io.awspring.cloud.autoconfigure.kinesis.KinesisProducerProperties.PREFIX;
2424

2525
/**
2626
* Properties related to KinesisProducer

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/kinesis/KinesisProperties.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,4 @@ public class KinesisProperties extends AwsClientProperties {
3333
* The prefix used for AWS Kinesis configuration.
3434
*/
3535
public static final String PREFIX = "spring.cloud.aws.kinesis";
36-
37-
@Nullable
38-
private KinesisProducerProperties producer = new KinesisProducerProperties();
39-
40-
public KinesisProducerProperties getProducer() {
41-
return producer;
42-
}
43-
44-
public void setProducer(KinesisProducerProperties producer) {
45-
this.producer = producer;
46-
}
4736
}

spring-cloud-aws-autoconfigure/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreReloadAutoCo
1717
io.awspring.cloud.autoconfigure.config.parameterstore.ParameterStoreAutoConfiguration
1818
io.awspring.cloud.autoconfigure.config.s3.S3ReloadAutoConfiguration
1919
io.awspring.cloud.autoconfigure.kinesis.KinesisAutoConfiguration
20+
io.awspring.cloud.autoconfigure.kinesis.KinesisProducerAutoConfiguration
21+
io.awspring.cloud.autoconfigure.kinesis.KinesisClientLibraryAutoConfiguration

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/listener/source/AbstractPollingMessageSourceTests.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424

2525
import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
2626
import io.awspring.cloud.sqs.listener.BackPressureMode;
27-
import io.awspring.cloud.sqs.listener.ContainerOptions;
28-
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
2927
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
3028
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementCallback;
3129
import io.awspring.cloud.sqs.listener.acknowledgement.AcknowledgementProcessor;
@@ -442,10 +440,9 @@ else if (currentPoll.compareAndSet(2, 3)) {
442440
void shouldRemovePollingFutureOnException() throws InterruptedException {
443441
String testName = "shouldClearPollingFuturesOnException";
444442

445-
BackPressureHandler backPressureHandler = BackPressureHandlerFactories
446-
.adaptiveThroughputBackPressureHandler()
447-
.createBackPressureHandler(SqsContainerOptions.builder()
448-
.maxDelayBetweenPolls(Duration.ofMillis(100)).backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build());
443+
BackPressureHandler backPressureHandler = BackPressureHandlerFactories.adaptiveThroughputBackPressureHandler()
444+
.createBackPressureHandler(SqsContainerOptions.builder().maxDelayBetweenPolls(Duration.ofMillis(100))
445+
.backPressureMode(BackPressureMode.ALWAYS_POLL_MAX_MESSAGES).build());
449446

450447
AbstractPollingMessageSource<Object, Message> source = new AbstractPollingMessageSource<>() {
451448
@Override

0 commit comments

Comments
 (0)