Skip to content

Commit 71ec3f8

Browse files
committed
Re-enable KinesisBinderFunctionalTests
Looks like the batch consumer for `Message<List<Message<?>>>` cannot be deduced yet. * Use plain `Consumer<Message<?>>` for the `Message<List<Message<?>>>` from the Kinesis binder * Fix typos in docs * Optimize some code flows
1 parent bc8e16e commit 71ec3f8

File tree

7 files changed

+23
-20
lines changed

7 files changed

+23
-20
lines changed

docs/src/main/asciidoc/kinesis.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,13 @@ NOTE: Unlike `KinesisMessageDrivenChannelAdapter`, the `KclMessageDrivenChannelA
181181

182182
==== DynamoDB Streams Support
183183

184-
The `KinesisMessageDrivenChannelAdapter` and `KclMessageDrivenChannelAdapter` provides support for consuming CDC events from the https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html[DynamoDB Streams].
184+
The `KinesisMessageDrivenChannelAdapter` and `KclMessageDrivenChannelAdapter` provide support for consuming CDC events from the https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html[DynamoDB Streams].
185185
The `com.amazonaws:dynamodb-streams-kinesis-adapter` dependency must be present on classpath.
186186
The KCL provides native support for the mentioned adapter, and only instance of `DynamoDbStreamsClient` has to be injected into the `KclMessageDrivenChannelAdapter` to switch consuming logic from the Kinesis stream to DynamoDB stream.
187187
For consuming via `KinesisMessageDrivenChannelAdapter`, the `SpringDynamoDBStreamsAdapterClient` has to be injected instead of regular `KinesisAsyncClient`.
188188
The `SpringDynamoDBStreamsAdapterClient` is an extension of the `AmazonDynamoDBStreamsAdapterClient` with overridden `listShards()` and `getRecords()` operations to mimic `KinesisAsyncClient` API called from the `KinesisMessageDrivenChannelAdapter` logic.
189189
Both channel adapters require a Stream ARN for DynamoDB Stream on the table.
190-
Using AWS SDK API, such a value is available as a result of the `DescribeTableResponse.table().latestStreamArn()` in the answer to `DynamoDbAsyncClient.describeTable()` request.
190+
Using AWS SDK API, such a Stream ARN value is available as a result of the `DescribeTableResponse.table().latestStreamArn()` in the answer to `DynamoDbAsyncClient.describeTable()` request.
191191

192192
==== Spring Integration Starters
193193

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/KinesisBinderHealthIndicator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
/**
2828
* @author Artem Bilan
2929
*
30-
* @since 2.0
30+
* @since 4.0
3131
*/
3232
public class KinesisBinderHealthIndicator implements HealthIndicator {
3333

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/KinesisMessageChannelBinder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ private MessageProducerSupport createKclConsumerEndpoint(ConsumerDestination des
406406
adapter.setPollingMaxRecords(kinesisConsumerProperties.getPollingMaxRecords());
407407
adapter.setPollingIdleTime(kinesisConsumerProperties.getPollingIdleTime());
408408
adapter.setGracefulShutdownTimeout(kinesisConsumerProperties.getGracefulShutdownTimeout());
409-
if (properties.getExtension().isEmbedHeaders() && !properties.isUseNativeDecoding()) {
409+
if (kinesisConsumerProperties.isEmbedHeaders() && !properties.isUseNativeDecoding()) {
410410
adapter.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
411411
}
412412

@@ -509,7 +509,7 @@ else if (shardId != null) {
509509
adapter.setStreamInitialSequence(streamInitialSequence);
510510

511511
adapter.setListenerMode(kinesisConsumerProperties.getListenerMode());
512-
if (properties.getExtension().isEmbedHeaders() && !properties.isUseNativeDecoding()) {
512+
if (kinesisConsumerProperties.isEmbedHeaders() && !properties.isUseNativeDecoding()) {
513513
adapter.setEmbeddedHeadersMapper(this.embeddedHeadersMapper);
514514
}
515515

@@ -559,8 +559,9 @@ private static String[] headersToMap(KinesisBinderConfigurationProperties config
559559
headers.add("X-B3*");
560560
headers.add("b3");
561561
}
562-
if (!ObjectUtils.isEmpty(configurationProperties.getHeaders())) {
563-
Collections.addAll(headers, configurationProperties.getHeaders());
562+
String[] additionalHeaders = configurationProperties.getHeaders();
563+
if (!ObjectUtils.isEmpty(additionalHeaders)) {
564+
Collections.addAll(headers, additionalHeaders);
564565
}
565566
return headers.toArray(new String[0]);
566567
}

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/config/KinesisBinderConfiguration.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ public DynamoDbLockRepository dynamoDbLockRepository(@Autowired(required = false
160160
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled", havingValue = "false", matchIfMissing = true)
161161
public LockRegistry<?> dynamoDBLockRegistry(
162162
@Autowired(required = false) DynamoDbLockRepository dynamoDbLockRepository) {
163+
163164
if (dynamoDbLockRepository != null) {
164165
KinesisBinderConfigurationProperties.Locks locks = this.configurationProperties.getLocks();
165166
DynamoDbLockRegistry dynamoDbLockRegistry = new DynamoDbLockRegistry(dynamoDbLockRepository);
@@ -185,8 +186,9 @@ public ConcurrentMetadataStore kinesisCheckpointStore(@Autowired(required = fals
185186
kinesisCheckpointStore.setWriteCapacity(checkpoint.getWriteCapacity());
186187
kinesisCheckpointStore.setCreateTableDelay(checkpoint.getCreateDelay());
187188
kinesisCheckpointStore.setCreateTableRetries(checkpoint.getCreateRetries());
188-
if (checkpoint.getTimeToLive() != null) {
189-
kinesisCheckpointStore.setTimeToLive(checkpoint.getTimeToLive());
189+
Integer timeToLive = checkpoint.getTimeToLive();
190+
if (timeToLive != null) {
191+
kinesisCheckpointStore.setTimeToLive(timeToLive);
190192
}
191193
return kinesisCheckpointStore;
192194
}
@@ -200,6 +202,7 @@ public ConcurrentMetadataStore kinesisCheckpointStore(@Autowired(required = fals
200202
@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled")
201203
public CloudWatchAsyncClient cloudWatch(CloudWatchProperties properties,
202204
ObjectProvider<AwsClientCustomizer<CloudWatchAsyncClientBuilder>> configurer) {
205+
203206
if (this.hasInputs) {
204207
return awsClientBuilderConfigurer.configureAsyncClient(CloudWatchAsyncClient.builder(), properties, null,
205208
Stream.of(configurer.getIfAvailable()), null).build();
@@ -223,6 +226,7 @@ public KinesisProducerConfiguration kinesisProducerConfiguration() {
223226
@ConditionalOnMissingBean
224227
public DynamoDbStreamsClient dynamoDBStreams(DynamoDbStreamsProperties properties,
225228
ObjectProvider<AwsClientCustomizer<DynamoDbStreamsClientBuilder>> configurer) {
229+
226230
if (this.hasInputs) {
227231
return awsClientBuilderConfigurer
228232
.configureAsyncClient(DynamoDbStreamsClient.builder(), properties, null, configurer.stream(), null)

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/properties/KinesisExtendedBindingProperties.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ public KinesisConsumerProperties getExtendedConsumerProperties(String channelNam
5757

5858
@Override
5959
public KinesisProducerProperties getExtendedProducerProperties(String channelName) {
60-
if (this.bindings.containsKey(channelName) && this.bindings.get(channelName).getProducer() != null) {
61-
return this.bindings.get(channelName).getProducer();
60+
KinesisBindingProperties kinesisBindingProperties = this.bindings.get(channelName);
61+
if (kinesisBindingProperties != null && kinesisBindingProperties.getProducer() != null) {
62+
return kinesisBindingProperties.getProducer();
6263
}
6364
else {
6465
return new KinesisProducerProperties();

spring-cloud-aws-kinesis-stream-binder/src/main/java/io/awspring/cloud/kinesis/stream/binder/provisioning/KinesisStreamProvisioner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@ public ProducerDestination provisionProducerDestination(String name,
7878

7979
logger.info(() -> "Using Kinesis stream for outbound: " + name);
8080

81-
KinesisProducerProperties kinesisProducerProperties = properties.getExtension();
82-
if (kinesisProducerProperties.isEmbedHeaders()) {
81+
if (properties.getExtension().isEmbedHeaders()) {
8382
properties.setHeaderMode(HeaderMode.none);
8483
}
8584

spring-cloud-aws-kinesis-stream-binder/src/test/java/io/awspring/cloud/kinesis/stream/binder/KinesisBinderFunctionalTests.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.concurrent.atomic.AtomicReference;
3131
import java.util.function.Consumer;
3232
import org.assertj.core.api.Condition;
33-
import org.junit.jupiter.api.Disabled;
3433
import org.junit.jupiter.api.Test;
3534
import org.springframework.beans.factory.annotation.Autowired;
3635
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -58,16 +57,15 @@
5857
* @since 4.0
5958
*/
6059
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
61-
"spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.multiplex=true",
62-
"spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.destination=some_other_stream,"
60+
"spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.multiplex = true",
61+
"spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.destination = some_other_stream,"
6362
+ KinesisBinderFunctionalTests.KINESIS_STREAM,
6463
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.idleBetweenPolls = 1",
6564
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.listenerMode = batch",
6665
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.checkpointMode = manual",
6766
"spring.cloud.stream.kinesis.binder.headers = event.eventType",
6867
"spring.cloud.stream.kinesis.binder.autoAddShards = true" })
6968
@DirtiesContext
70-
@Disabled("Something is off with generics in Spring Cloud Stream for batch processing")
7169
public class KinesisBinderFunctionalTests implements LocalstackContainerTest {
7270

7371
static final String KINESIS_STREAM = "test_stream";
@@ -171,7 +169,7 @@ public ObjectMapper objectMapper() {
171169
}
172170

173171
@Bean
174-
public AtomicReference<Message<List<Message<?>>>> messageHolder() {
172+
public AtomicReference<Message<?>> messageHolder() {
175173
return new AtomicReference<>();
176174
}
177175

@@ -181,8 +179,8 @@ public CountDownLatch messageBarrier() {
181179
}
182180

183181
@Bean
184-
public Consumer<Message<List<Message<?>>>> eventConsumerBatchProcessingWithHeaders(
185-
AtomicReference<Message<List<Message<?>>>> messageHolder, CountDownLatch messageBarrier) {
182+
public Consumer<Message<?>> eventConsumerBatchProcessingWithHeaders(AtomicReference<Message<?>> messageHolder,
183+
CountDownLatch messageBarrier) {
186184

187185
return eventMessages -> {
188186
messageHolder.set(eventMessages);

0 commit comments

Comments
 (0)