Skip to content

Commit f9075d9

Browse files
committed
Introduce Spring Integration for Kinesis support
* Create a `spring-cloud-aws-kinesis` module as a generic place for anything Kinesis-related. Similar to any other modules in the project * Add `KinesisMessageHandler` and respective tests for it * Add the `spring-cloud-aws-starter-integration-kinesis` for a generic Kinesis client with Spring Integration * Managed KCL/KPL dependencies as the next evolution of the `spring-cloud-aws-kinesis` as respective Spring Integration channel adapter will be contributed later * Initial `kinesis.adoc` with a `KinesisMessageHandler` explanation
1 parent da4b734 commit f9075d9

File tree

13 files changed

+1182
-1
lines changed

13 files changed

+1182
-1
lines changed

docs/src/main/asciidoc/index.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ include::core.adoc[]
140140

141141
include::dynamodb.adoc[]
142142

143+
include::kinesis.adoc[]
144+
143145
include::s3.adoc[]
144146

145147
include::ses.adoc[]
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
[#spring-cloud-aws-kinesis]
2+
== Kinesis Integration
3+
4+
The https://aws.amazon.com/kinesis/[Kinesis] is a platform for streaming data on AWS, making it easy to load and analyze streaming data and also providing the ability for you to build custom streaming data applications for specialized needs.
5+
6+
// TODO: auto-configuration
7+
8+
=== Spring Integration Support
9+
10+
Also, starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon Kinesis.
11+
12+
The `KinesisMessageHandler` is an `AbstractMessageHandler` to perform put record(s) to the Kinesis stream.
13+
The stream, partition key (or explicit hash key) and sequence number can be determined against a request message via evaluation provided expressions or can be specified statically.
14+
They also can be specified as `KinesisHeaders.STREAM`, `KinesisHeaders.PARTITION_KEY` and `KinesisHeaders.SEQUENCE_NUMBER` respectively.
15+
16+
The `KinesisMessageHandler` can be configured with the `outputChannel` for sending a `Message` on successful put operation.
17+
The payload is the original request and additional `KinesisHeaders.SHARD` and `KinesisHeaders.SEQUENCE_NUMBER` headers are populated from the `PutRecordResposne`.
18+
If the request payload is a `PutRecordsRequest`, the full `PutRecordsResponse` is populated in the `KinesisHeaders.SERVICE_RESULT` header instead.
19+
20+
When an async failure is happened on the put operation, the `ErrorMessage` is sent to the `errorChannel` header or global one.
21+
The payload is an `MessageHandlingException`.
22+
23+
The `payload` of request message can be:
24+
25+
- `PutRecordsRequest` to perform `KinesisAsyncClient.putRecords`
26+
- `PutRecordRequest` to perform `KinesisAsyncClient.putRecord`
27+
- `ByteBuffer` to represent data of the `PutRecordRequest`
28+
- `byte[]` which is wrapped to the `ByteBuffer`
29+
- any other type that is converted to the `byte[]` by the provided `Converter`; the `SerializingConverter` is used by default.
30+
31+
The Java Configuration for the message handler is:
32+
33+
[source,java]
34+
----
35+
@Bean
36+
@ServiceActivator(inputChannel = "kinesisSendChannel")
37+
public MessageHandler kinesisMessageHandler(KinesisAsyncClient amazonKinesis,
38+
MessageChannel channel) {
39+
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
40+
kinesisMessageHandler.setPartitionKey("1");
41+
kinesisMessageHandler.setOutputChannel(channel);
42+
return kinesisMessageHandler;
43+
}
44+
----
45+
46+
The Kinesis service does not provide a "headers"(attributes) abstraction, so the `KinesisMessageHandler` can be configured with the `OutboundMessageMapper` to embed message headers into the record data alongside the payload.
47+
See `EmbeddedHeadersJsonMessageMapper` implementation for more information.
48+
49+
The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional` to avoid unnecessary artifacts on classpath when Spring Integration is not used.
50+
For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.

pom.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
<module>spring-cloud-aws-sns</module>
4545
<module>spring-cloud-aws-sqs</module>
4646
<module>spring-cloud-aws-dynamodb</module>
47+
<module>spring-cloud-aws-kinesis</module>
48+
<module>spring-cloud-aws-starters/spring-cloud-aws-starter-integration-kinesis</module>
4749
<module>spring-cloud-aws-s3</module>
4850
<module>spring-cloud-aws-testcontainers</module>
4951
<module>spring-cloud-aws-starters/spring-cloud-aws-starter</module>
@@ -60,7 +62,7 @@
6062
<module>spring-cloud-aws-test</module>
6163
<module>spring-cloud-aws-modulith</module>
6264
<module>docs</module>
63-
</modules>
65+
</modules>
6466

6567
<dependencyManagement>
6668
<dependencies>

spring-cloud-aws-dependencies/pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
<properties>
2626
<spotless.version>2.31.0</spotless.version>
2727
<awssdk-v2.version>2.32.28</awssdk-v2.version>
28+
<kcl.version>3.1.2</kcl.version>
29+
<kpl.version>1.0.4</kpl.version>
2830
<amazon.dax.version>2.0.5</amazon.dax.version>
2931
<amazon.encryption.s3.version>3.3.5</amazon.encryption.s3.version>
3032
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
@@ -111,6 +113,16 @@
111113
<artifactId>spring-cloud-aws-dynamodb</artifactId>
112114
<version>${project.version}</version>
113115
</dependency>
116+
<dependency>
117+
<groupId>io.awspring.cloud</groupId>
118+
<artifactId>spring-cloud-aws-kinesis</artifactId>
119+
<version>${project.version}</version>
120+
</dependency>
121+
<dependency>
122+
<groupId>io.awspring.cloud</groupId>
123+
<artifactId>spring-cloud-aws-starter-integration-kinesis</artifactId>
124+
<version>${project.version}</version>
125+
</dependency>
114126

115127
<dependency>
116128
<groupId>io.awspring.cloud</groupId>
@@ -237,6 +249,17 @@
237249
<artifactId>jakarta.mail</artifactId>
238250
<version>${eclipse.jakarta.mail.version}</version>
239251
</dependency>
252+
<dependency>
253+
<groupId>software.amazon.kinesis</groupId>
254+
<artifactId>amazon-kinesis-client</artifactId>
255+
<version>${kcl.version}</version>
256+
</dependency>
257+
<dependency>
258+
<groupId>software.amazon.kinesis</groupId>
259+
<artifactId>amazon-kinesis-producer</artifactId>
260+
<version>${kpl.version}</version>
261+
</dependency>
262+
240263
</dependencies>
241264
</dependencyManagement>
242265

spring-cloud-aws-kinesis/pom.xml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<parent>
7+
<groupId>io.awspring.cloud</groupId>
8+
<artifactId>spring-cloud-aws</artifactId>
9+
<version>4.0.0-SNAPSHOT</version>
10+
</parent>
11+
12+
<artifactId>spring-cloud-aws-kinesis</artifactId>
13+
<name>Spring Cloud AWS Kinesis Integration</name>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>software.amazon.awssdk</groupId>
18+
<artifactId>kinesis</artifactId>
19+
<optional>true</optional>
20+
</dependency>
21+
<dependency>
22+
<groupId>software.amazon.kinesis</groupId>
23+
<artifactId>amazon-kinesis-client</artifactId>
24+
<optional>true</optional>
25+
</dependency>
26+
<dependency>
27+
<groupId>software.amazon.kinesis</groupId>
28+
<artifactId>amazon-kinesis-producer</artifactId>
29+
<optional>true</optional>
30+
</dependency>
31+
<dependency>
32+
<groupId>org.springframework.integration</groupId>
33+
<artifactId>spring-integration-core</artifactId>
34+
<optional>true</optional>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.testcontainers</groupId>
38+
<artifactId>localstack</artifactId>
39+
<scope>test</scope>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.testcontainers</groupId>
43+
<artifactId>junit-jupiter</artifactId>
44+
<scope>test</scope>
45+
</dependency>
46+
<dependency>
47+
<groupId>org.springframework.integration</groupId>
48+
<artifactId>spring-integration-test</artifactId>
49+
<scope>test</scope>
50+
</dependency>
51+
</dependencies>
52+
53+
54+
55+
</project>
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.kinesis.integration;
17+
18+
import org.springframework.core.convert.converter.Converter;
19+
import org.springframework.messaging.Message;
20+
import org.springframework.messaging.MessageHeaders;
21+
import org.springframework.messaging.converter.MessageConverter;
22+
23+
/**
24+
* A simple {@link MessageConverter} that delegates to a {@link Converter}.
25+
*
26+
* @author Artem Bilan
27+
*
28+
* @since 4.0
29+
*/
30+
record ConvertingFromMessageConverter(Converter<Object, ?> delegate) implements MessageConverter {
31+
32+
@Override
33+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
34+
return this.delegate.convert(message.getPayload());
35+
}
36+
37+
@Override
38+
public Message<?> toMessage(Object payload, MessageHeaders headers) {
39+
throw new UnsupportedOperationException();
40+
}
41+
42+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.kinesis.integration;
17+
18+
/**
19+
* Constants for Kinesis message headers.
20+
*
21+
* @author Artem Bilan
22+
*
23+
* @since 4.0
24+
*/
25+
public final class KinesisHeaders {
26+
27+
/**
28+
* Kinesis headers prefix to be used by all headers added by the framework.
29+
*/
30+
public static final String PREFIX = "Kinesis_";
31+
32+
/**
33+
* The {@value STREAM} header for sending data to Kinesis.
34+
*/
35+
public static final String STREAM = PREFIX + "stream";
36+
37+
/**
38+
* The {@value RECEIVED_STREAM} header for receiving data from Kinesis.
39+
*/
40+
public static final String RECEIVED_STREAM = PREFIX + "receivedStream";
41+
42+
/**
43+
* The {@value PARTITION_KEY} header for sending data to Kinesis.
44+
*/
45+
public static final String PARTITION_KEY = PREFIX + "partitionKey";
46+
47+
/**
48+
* The {@value SEQUENCE_NUMBER} header for sending data to Kinesis.
49+
*/
50+
public static final String SEQUENCE_NUMBER = PREFIX + "sequenceNumber";
51+
52+
/**
53+
* The {@value SHARD} header to represent Kinesis shardId.
54+
*/
55+
public static final String SHARD = PREFIX + "shard";
56+
57+
/**
58+
* The {@value SERVICE_RESULT} header represents a
59+
* {@link software.amazon.awssdk.services.kinesis.model.KinesisResponse}.
60+
*/
61+
public static final String SERVICE_RESULT = PREFIX + "serviceResult";
62+
63+
/**
64+
* The {@value RECEIVED_PARTITION_KEY} header for receiving data from Kinesis.
65+
*/
66+
public static final String RECEIVED_PARTITION_KEY = PREFIX + "receivedPartitionKey";
67+
68+
/**
69+
* The {@value RECEIVED_SEQUENCE_NUMBER} header for receiving data from Kinesis.
70+
*/
71+
public static final String RECEIVED_SEQUENCE_NUMBER = PREFIX + "receivedSequenceNumber";
72+
73+
/**
74+
* The {@value CHECKPOINTER} header for checkpoint the shard sequenceNumber.
75+
*/
76+
public static final String CHECKPOINTER = PREFIX + "checkpointer";
77+
78+
/**
79+
* The {@value RAW_RECORD} header represents received Kinesis record(s).
80+
*/
81+
public static final String RAW_RECORD = PREFIX + "rawRecord";
82+
83+
private KinesisHeaders() {
84+
}
85+
86+
}

0 commit comments

Comments
 (0)