Skip to content

Commit a2c18cd

Browse files
committed
Add KinesisMessageDrivenChannelAdapter and all the respective infrastructure
* Add mock tests for `KinesisMessageDrivenChannelAdapter` * Add integration tests based on the Localstack container between `KinesisMessageHandler` and `KinesisMessageDrivenChannelAdapter` * Document `KinesisMessageDrivenChannelAdapter` * Mention via link the `DynamoDbLockRegistry` and `DynamoDbMetadataStore`. Therefore, add a section id for the Spring Integration in the `dynamodb.adoc`
1 parent ccd758a commit a2c18cd

File tree

12 files changed

+3009
-1
lines changed

12 files changed

+3009
-1
lines changed

docs/src/main/asciidoc/dynamodb.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ Note that `DynamoDbClientCustomizer` beans are applied **after** `AwsSyncClientC
187187
Since it depends on how you will use DynamoDb integration providing a list of IAM policies would be pointless since least privilege model should be used.
188188
To check what IAM policies DynamoDb uses and see which ones you should use please check https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/using-identity-based-policies.html[IAM policies]
189189

190+
[#spring-integration-support]
190191
=== Spring Integration Support
191192

192193
Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] components for Amazon DynamoDB.

docs/src/main/asciidoc/kinesis.adoc

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,79 @@ public MessageHandler kinesisMessageHandler(KinesisAsyncClient amazonKinesis,
4343
}
4444
----
4545

46-
The Kinesis service does not provide a "headers"(attributes) abstraction, so the `KinesisMessageHandler` can be configured with the `OutboundMessageMapper` to embed message headers into the record data alongside the payload.
46+
The `KinesisMessageDrivenChannelAdapter` is a `MessageProducerSupport` implementation to perform record consumption from the Kinesis stream(s).
47+
Each shard from the provided streams is managed by an internal state machine to ensure shard records ordering and exclusive shard access from the same consumer group.
48+
An offset of the records for the consuming shard is stored in the `ConcurrentMetadataStore` via `Checkpointer` abstraction.
49+
If the `CheckpointMode` of the `KinesisMessageDrivenChannelAdapter` is set to `manual`, then `KinesisHeaders.CHECKPOINTER` is populated to the message this channel adapter produces downstream.
50+
The check-pointed offset is used to initiate an iterator on the shard after the application restart.
51+
52+
The `KinesisMessageDrivenChannelAdapter` can be configured with a distributed `LockRegistry<?>` to managed exclusive access to the shard in the cluster of the application consuming from the Kinesis.
53+
For example, the xref:dynamodb.adoc#spring-integration-support[`DynamoDbLockRegistry`] can be used to manage distributed locks via DynamoDB.
54+
In the case of many instances of the same consuming application in cluster, a distributed implementation of the `ConcurrentMetadataStore` is recommended, too, e.g. xref:dynamodb.adoc#spring-integration-support[`DynamoDbMetadataStore`].
55+
This way, when one instance leaves the cluster, it releases its locks for shards, and another instance can obtain those locks and continue consuming from the shard according to the stored offset in the mentioned shared meta-data store.
56+
57+
See also `CheckpointMode` Javadocs and related options on the `KinesisMessageDrivenChannelAdapter` for different checkpoint handling algorithms.
58+
59+
The `KinesisShardOffset` abstraction is used to represent an initial shard iterator request for all shards in the provided streams, if there is no specific checkpoint record in the `ConcurrentMetadataStore`.
60+
Or it can be used as an argument of the overloaded `KinesisMessageDrivenChannelAdapter` to consume from the specific shards.
61+
The concurrency and checkpoint management remain the same even for an explicit shard configuration.
62+
63+
The `KinesisMessageDrivenChannelAdapter` also supports a `batch` mode to produce a message with a payload as a list of just returned by the shard iterator records.
64+
Each record data can be converted from `byte[]` via `Converter` setting.
65+
By default, a `DeserializingConverter` is used based on Java serialization specification.
66+
Which is aligned with the settings of the mentioned above `KinesisMessageHandler`.
67+
68+
The Kinesis service does not provide a "headers"(attributes) abstraction, so the `KinesisMessageHandler` and `KinesisMessageDrivenChannelAdapter` can be configured with the `OutboundMessageMapper` and `InboundMessageMapper` to embed (and extract) message headers into/from the record data alongside the payload.
4769
See `EmbeddedHeadersJsonMessageMapper` implementation for more information.
4870

71+
When the shard is closed on the Kinesis service, `KinesisMessageDrivenChannelAdapter` emits a `KinesisShardEndedEvent` into the application context with the key based on the pattern `consumerGroup + ":" + stream + ":" + shardId`.
72+
Such an event can be useful in any arbitrary application logic where the end of the shard is a crucial indicator, e.g. to perform resharding on the stream.
73+
74+
The following Java Configuration demonstrates some `KinesisMessageDrivenChannelAdapter`:
75+
76+
[source,java]
77+
----
78+
@Bean
79+
public ConcurrentMetadataStore checkpointStore() {
80+
return new SimpleMetadataStore();
81+
}
82+
83+
@Bean
84+
public LockRegistry lockRegistry() {
85+
return new DefaultLockRegistry();
86+
}
87+
88+
@Bean
89+
KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter() {
90+
var adapter = new KinesisMessageDrivenChannelAdapter(AMAZON_KINESIS_ASYNC, TEST_STREAM);
91+
adapter.setOutputChannel(kinesisReceiveChannel());
92+
adapter.setErrorChannel(errorChannel());
93+
adapter.setErrorMessageStrategy(new KinesisMessageHeaderErrorMessageStrategy());
94+
adapter.setCheckpointStore(checkpointStore());
95+
adapter.setLockRegistry(lockRegistry());
96+
adapter.setEmbeddedHeadersMapper(new EmbeddedHeadersJsonMessageMapper("embedded_header"));
97+
adapter.setBindSourceRecord(true);
98+
adapter.setDescribeStreamBackoff(10);
99+
adapter.setConsumerBackoff(10);
100+
adapter.setIdleBetweenPolls(1);
101+
return adapter;
102+
}
103+
104+
@Bean
105+
public PollableChannel kinesisReceiveChannel() {
106+
QueueChannel queueChannel = new QueueChannel();
107+
queueChannel.setDatatypes(Date.class);
108+
return queueChannel;
109+
}
110+
111+
@Bean
112+
public PollableChannel errorChannel() {
113+
return new QueueChannel();
114+
}
115+
----
116+
117+
118+
=== Spring Integration Starters
119+
49120
The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
50121
For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.kinesis.integration;
17+
18+
/**
19+
* The listener mode, record or batch.
20+
*
21+
* @author Artem Bilan
22+
* @author Hervé Fortin
23+
*
24+
* @since 4.0
25+
*/
26+
public enum CheckpointMode {
27+
28+
/**
29+
* Checkpoint after each processed record. Makes sense only if {@link ListenerMode#record} is used.
30+
*/
31+
record,
32+
33+
/**
34+
* Checkpoint after each processed batch of records.
35+
*/
36+
batch,
37+
38+
/**
39+
* Checkpoint on demand via provided to the message {@link Checkpointer} callback.
40+
*/
41+
manual,
42+
43+
/**
44+
* Checkpoint at fixed time intervals.
45+
*/
46+
periodic
47+
48+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.kinesis.integration;
17+
18+
/**
19+
* A callback for target record process to perform checkpoint on the related shard.
20+
*
21+
* @author Artem Bilan
22+
*
23+
* @since 4.0
24+
*/
25+
public interface Checkpointer {
26+
27+
/**
28+
* Checkpoint the currently held sequence number if it is bigger than already stored.
29+
* @return true if checkpoint performed; false otherwise.
30+
*/
31+
boolean checkpoint();
32+
33+
/**
34+
* Checkpoint the provided sequence number, if it is bigger than already stored.
35+
* @param sequenceNumber the sequence number to checkpoint.
36+
* @return true if checkpoint performed; false otherwise.
37+
*/
38+
boolean checkpoint(String sequenceNumber);
39+
40+
}

0 commit comments

Comments
 (0)