Commit e08b738

Merge branch 'main' into 1457-kinesis-autoconfiguration
# Conflicts: # pom.xml
2 parents 3ad0c0e + aaedf69 commit e08b738

68 files changed: +8122 −75 lines


docs/src/main/asciidoc/dynamodb.adoc

Lines changed: 1 addition & 0 deletions
@@ -187,6 +187,7 @@ Note that `DynamoDbClientCustomizer` beans are applied **after** `AwsSyncClientC
 Since the required permissions depend on how you use the DynamoDb integration, providing a list of IAM policies here would be pointless; the least-privilege model should be used instead.
 To check which IAM policies DynamoDb uses, and to see which ones you need, please check https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/using-identity-based-policies.html[IAM policies].
 
+[#spring-integration-support]
 === Spring Integration Support
 
 Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] components for Amazon DynamoDB.

docs/src/main/asciidoc/index.adoc

Lines changed: 2 additions & 0 deletions
@@ -140,6 +140,8 @@ include::core.adoc[]
 
 include::dynamodb.adoc[]
 
+include::kinesis.adoc[]
+
 include::s3.adoc[]
 
 include::ses.adoc[]
Lines changed: 121 additions & 0 deletions
@@ -0,0 +1,121 @@
[#spring-cloud-aws-kinesis]
== Kinesis Integration

https://aws.amazon.com/kinesis/[Amazon Kinesis] is a platform for streaming data on AWS, making it easy to load and analyze streaming data, and also providing the ability to build custom streaming data applications for specialized needs.

// TODO: auto-configuration

=== Spring Integration Support

Also, starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon Kinesis.

The `KinesisMessageHandler` is an `AbstractMessageHandler` implementation that puts a record (or records) into a Kinesis stream.
The stream, partition key (or explicit hash key) and sequence number can be determined against the request message by evaluating the provided expressions, or they can be specified statically.
They can also be supplied as the `KinesisHeaders.STREAM`, `KinesisHeaders.PARTITION_KEY` and `KinesisHeaders.SEQUENCE_NUMBER` message headers, respectively.

The `KinesisMessageHandler` can be configured with an `outputChannel` for sending a `Message` on a successful put operation.
The payload is the original request, and additional `KinesisHeaders.SHARD` and `KinesisHeaders.SEQUENCE_NUMBER` headers are populated from the `PutRecordResponse`.
If the request payload is a `PutRecordsRequest`, the full `PutRecordsResponse` is populated in the `KinesisHeaders.SERVICE_RESULT` header instead.

When an asynchronous failure happens on the put operation, an `ErrorMessage` is sent to the channel from the `errorChannel` header, or to the global error channel.
The payload of that message is a `MessageHandlingException`.

The `payload` of the request message can be:

- a `PutRecordsRequest` to perform `KinesisAsyncClient.putRecords`
- a `PutRecordRequest` to perform `KinesisAsyncClient.putRecord`
- a `ByteBuffer` to represent the data of a `PutRecordRequest`
- a `byte[]`, which is wrapped into a `ByteBuffer`
- any other type that is converted to a `byte[]` by the provided `Converter`; the `SerializingConverter` is used by default.
31+
The Java Configuration for the message handler is:
32+
33+
[source,java]
34+
----
35+
@Bean
36+
@ServiceActivator(inputChannel = "kinesisSendChannel")
37+
public MessageHandler kinesisMessageHandler(KinesisAsyncClient amazonKinesis,
38+
MessageChannel channel) {
39+
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
40+
kinesisMessageHandler.setPartitionKey("1");
41+
kinesisMessageHandler.setOutputChannel(channel);
42+
return kinesisMessageHandler;
43+
}
44+
----
45+
The `KinesisMessageDrivenChannelAdapter` is a `MessageProducerSupport` implementation that consumes records from the Kinesis stream(s).
Each shard from the provided streams is managed by an internal state machine to ensure shard record ordering and exclusive shard access within the same consumer group.
The offset of the records for the consumed shard is stored in a `ConcurrentMetadataStore` via the `Checkpointer` abstraction.
If the `CheckpointMode` of the `KinesisMessageDrivenChannelAdapter` is set to `manual`, a `KinesisHeaders.CHECKPOINTER` header is populated into the messages this channel adapter produces downstream.
The check-pointed offset is used to initiate an iterator on the shard after an application restart.

The `KinesisMessageDrivenChannelAdapter` can be configured with a distributed `LockRegistry<?>` to manage exclusive access to a shard across the cluster of application instances consuming from Kinesis.
For example, the xref:dynamodb.adoc#spring-integration-support[`DynamoDbLockRegistry`] can be used to manage distributed locks via DynamoDB.
When many instances of the same consuming application run in a cluster, a distributed implementation of the `ConcurrentMetadataStore` is recommended too, e.g. the xref:dynamodb.adoc#spring-integration-support[`DynamoDbMetadataStore`].
This way, when one instance leaves the cluster, it releases its shard locks, and another instance can obtain those locks and continue consuming from the shard according to the offset stored in the shared metadata store.

See also the `CheckpointMode` Javadocs and related options on the `KinesisMessageDrivenChannelAdapter` for the different checkpoint-handling algorithms.
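To make the check-pointing contract tangible, here is a minimal in-memory sketch (a deliberate simplification and an assumption, not the real implementation; in production the store would be a distributed `ConcurrentMetadataStore` such as the `DynamoDbMetadataStore`) of storing and advancing a shard offset under a hypothetical `consumerGroup:stream:shardId`-style key:

[source,java]
----
import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CheckpointSketch {

    // Hypothetical in-memory stand-in for a distributed ConcurrentMetadataStore.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Record the highest processed sequence number for a shard, never moving backwards.
    public boolean checkpoint(String key, String sequenceNumber) {
        while (true) {
            String current = store.putIfAbsent(key, sequenceNumber);
            if (current == null) {
                return true; // first checkpoint for this shard
            }
            if (new BigInteger(sequenceNumber).compareTo(new BigInteger(current)) <= 0) {
                return false; // stale: never move the offset backwards
            }
            if (store.replace(key, current, sequenceNumber)) {
                return true;
            }
            // lost a race with a concurrent writer: re-read and retry
        }
    }

    public String lastCheckpoint(String key) {
        return store.get(key);
    }
}
----

After a restart, the last stored offset would seed the shard iterator, which is the behavior the adapter's `Checkpointer` abstraction provides; with a shared store, another instance can resume from the same offset.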
The `KinesisShardOffset` abstraction is used to represent an initial shard iterator request for all shards in the provided streams when there is no specific checkpoint record in the `ConcurrentMetadataStore`.
It can also be used as an argument of the overloaded `KinesisMessageDrivenChannelAdapter` constructor to consume from specific shards.
The concurrency and checkpoint management remain the same even for an explicit shard configuration.

The `KinesisMessageDrivenChannelAdapter` also supports a `batch` mode, producing a message whose payload is the list of records just returned by the shard iterator.
Each record's data can be converted from `byte[]` via the `Converter` setting.
By default, a `DeserializingConverter` based on Java serialization is used, which is aligned with the default settings of the `KinesisMessageHandler` mentioned above.

The Kinesis service does not provide a "headers" (attributes) abstraction, so the `KinesisMessageHandler` and `KinesisMessageDrivenChannelAdapter` can be configured with an `OutboundMessageMapper` and an `InboundMessageMapper` to embed (and extract) message headers into/from the record data alongside the payload.
See the `EmbeddedHeadersJsonMessageMapper` implementation for more information.
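To illustrate the header-embedding idea, the sketch below packs a header string and the payload into one record body with a hypothetical length-prefixed layout; this is a deliberately simplified stand-in, not the actual JSON wire format of the `EmbeddedHeadersJsonMessageMapper`:

[source,java]
----
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class EmbeddedHeadersSketch {

    // Pack one header string and the payload into a single record body:
    // [4-byte header length][header bytes][payload bytes].
    static byte[] embed(String header, byte[] payload) {
        byte[] h = header.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + h.length + payload.length);
        buf.putInt(h.length).put(h).put(payload);
        return buf.array();
    }

    // Reverse the embedding on the consumer side: returns {header, payload}.
    static String[] extract(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte[] h = new byte[buf.getInt()];
        buf.get(h);
        byte[] p = new byte[buf.remaining()];
        buf.get(p);
        return new String[] { new String(h, StandardCharsets.UTF_8), new String(p, StandardCharsets.UTF_8) };
    }
}
----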
When a shard is closed on the Kinesis service, the `KinesisMessageDrivenChannelAdapter` emits a `KinesisShardEndedEvent` into the application context, with a key based on the pattern `consumerGroup + ":" + stream + ":" + shardId`.
Such an event can be useful in any arbitrary application logic where the end of the shard is a crucial indicator, e.g. to perform resharding on the stream.

The following Java Configuration demonstrates a `KinesisMessageDrivenChannelAdapter`:
[source,java]
----
@Bean
public ConcurrentMetadataStore checkpointStore() {
    return new SimpleMetadataStore();
}

@Bean
public LockRegistry lockRegistry() {
    return new DefaultLockRegistry();
}

@Bean
KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter() {
    var adapter = new KinesisMessageDrivenChannelAdapter(AMAZON_KINESIS_ASYNC, TEST_STREAM);
    adapter.setOutputChannel(kinesisReceiveChannel());
    adapter.setErrorChannel(errorChannel());
    adapter.setErrorMessageStrategy(new KinesisMessageHeaderErrorMessageStrategy());
    adapter.setCheckpointStore(checkpointStore());
    adapter.setLockRegistry(lockRegistry());
    adapter.setEmbeddedHeadersMapper(new EmbeddedHeadersJsonMessageMapper("embedded_header"));
    adapter.setBindSourceRecord(true);
    adapter.setDescribeStreamBackoff(10);
    adapter.setConsumerBackoff(10);
    adapter.setIdleBetweenPolls(1);
    return adapter;
}

@Bean
public PollableChannel kinesisReceiveChannel() {
    QueueChannel queueChannel = new QueueChannel();
    queueChannel.setDatatypes(Date.class);
    return queueChannel;
}

@Bean
public PollableChannel errorChannel() {
    return new QueueChannel();
}
----

=== Spring Integration Starters

The Spring Integration dependency in the `spring-cloud-aws-kinesis` module is `optional`, to avoid unnecessary artifacts on the classpath when Spring Integration is not used.
For convenience, a dedicated `spring-cloud-aws-starter-integration-kinesis` is provided, managing all the required dependencies for Spring Integration support with a classical Amazon Kinesis client.
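As a sketch, the starter might be added like this; the artifact name comes from the paragraph above, the `io.awspring.cloud` groupId matches the starter dependency shown elsewhere in this commit, and version management (e.g. via the Spring Cloud AWS BOM) is assumed:

[source,xml]
----
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-integration-kinesis</artifactId>
</dependency>
----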

docs/src/main/asciidoc/s3.adoc

Lines changed: 185 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -604,17 +604,51 @@ The Spring Boot Starter for S3 provides the following configuration options:
 | `spring.cloud.aws.s3.config.reload.max-wait-time-for-restart` | `Duration`| `2s` | The maximum time between the detection of changes in property source and the application context restart when `restart_context` strategy is used.
 |===
 
+=== S3 Vector Client support
+
+https://aws.amazon.com/blogs/aws/introducing-amazon-s3-vectors-first-cloud-storage-with-native-vector-support-at-scale/[S3 Vector Store] is a vector storage service that supports uploading, storing and querying vectors.
+To make it simpler to use the `S3VectorsClient` with the https://github.com/spring-projects/spring-ai[Spring AI] project, or as a plain client, Spring Cloud AWS supports auto-configuration of the `S3VectorsClient`.
+
+To enable the auto-configuration, add the following dependencies:
+[source,xml]
+----
+<dependency>
+    <groupId>io.awspring.cloud</groupId>
+    <artifactId>spring-cloud-aws-starter</artifactId>
+</dependency>
+
+<dependency>
+    <groupId>software.amazon.awssdk</groupId>
+    <artifactId>s3vectors</artifactId>
+</dependency>
+----
+
+After these dependencies are introduced, Spring Cloud AWS will automatically create an `S3VectorsClient` bean, which can then be autowired and used.
+
+[cols="2,3,1,1"]
+|===
+| Name | Description | Required | Default value
+| `spring.cloud.aws.s3.vector.config.enabled` | Enables the S3 config import integration. | No | `true`
+| `spring.cloud.aws.s3.config.reload.strategy` | `Enum` | `refresh` | The strategy to use when firing a reload (`refresh`, `restart_context`)
+| `spring.cloud.aws.s3.config.reload.period` | `Duration`| `15s` | The period for verifying changes
+| `spring.cloud.aws.s3.config.reload.max-wait-time-for-restart` | `Duration`| `2s` | The maximum time between the detection of changes in property source and the application context restart when `restart_context` strategy is used.
+|===
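For illustration, a hypothetical `application.properties` fragment using the `spring.cloud.aws.s3.vector.*` properties documented in this commit (the values shown are the defaults listed in the property table; treat them as illustrative):

[source,properties]
----
spring.cloud.aws.s3.vector.enabled=true
spring.cloud.aws.s3.vector.region=eu-west-1
spring.cloud.aws.s3.vector.endpoint=http://localhost:4566
----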
 
 === IAM Permissions
 
 Following IAM permissions are required by Spring Cloud AWS:
 
-[cols="2,1"]
-|===
-| Downloading files | `s3:GetObject`
-| Searching files | `s3:ListObjects`
-| Uploading files | `s3:PutObject`
+[cols="2,3,1,1"]
 |===
+| Name | Description | Required | Default value
+| `spring.cloud.aws.s3.vector.enabled` | Enables the `S3VectorsClient` auto-configuration. | No | `true`
+| `spring.cloud.aws.s3.vector.endpoint` | Configures the endpoint used by the `S3VectorsClient`. | No | `http://localhost:4566`
+| `spring.cloud.aws.s3.vector.region` | Configures the region used by the `S3VectorsClient`. | No | `eu-west-1`
+|===
+
+=== Example of IAM policy for Spring Cloud AWS demo bucket
 
 Sample IAM policy granting access to `spring-cloud-aws-demo` bucket:
@@ -641,3 +675,149 @@ Sample IAM policy granting access to `spring-cloud-aws-demo` bucket:
 ]
 }
 ----

=== Spring Integration Support

Starting with version 4.0, Spring Cloud AWS provides https://spring.io/projects/spring-integration[Spring Integration] channel adapters for Amazon S3.

The S3 channel adapters are based on the `S3Client` template and the `S3TransferManager`.
See their specification and Javadocs for more information.

The S3 Inbound Channel Adapter is represented by the `S3InboundFileSynchronizingMessageSource` and allows pulling S3 objects as files from an S3 bucket into a local directory for synchronization.
This adapter is fully similar to the Inbound Channel Adapters in the FTP and SFTP Spring Integration modules.
See the https://docs.spring.io/spring-integration/reference/ftp.html[FTP/FTPS Adapters Chapter] for more information about common options and the `SessionFactory`, `RemoteFileTemplate` and `FileListFilter` abstractions.
The Java Configuration is:

[source,java]
----
@SpringBootApplication
public class MyConfiguration {

    @Autowired
    private S3Client amazonS3;

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(this.amazonS3);
        synchronizer.setDeleteRemoteFiles(true);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setRemoteDirectory(S3_BUCKET);
        synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.test$"));
        Expression expression = PARSER.parseExpression("#this.toUpperCase() + '.a'");
        synchronizer.setLocalFilenameGeneratorExpression(expression);
        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "100"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
        S3InboundFileSynchronizingMessageSource messageSource =
                new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(LOCAL_FOLDER);
        messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return messageSource;
    }

    @Bean
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }
}
----
With this configuration, you receive messages with a `java.io.File` `payload` from the `s3FilesChannel` after periodic synchronization of content from the Amazon S3 bucket into the local directory.

The `S3StreamingMessageSource` adapter produces messages with payloads of type `InputStream`, allowing S3 objects to be fetched without writing to the local file system.
Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed.
The session is provided in the `closeableResource` header (`IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE`).
Standard framework components, such as the `FileSplitter` and `StreamTransformer`, will automatically close the session.

The following Spring Boot application provides an example of configuring the S3 inbound streaming adapter using Java configuration:
738+
739+
[source,java]
740+
----
741+
@SpringBootApplication
742+
public class S3JavaApplication {
743+
744+
public static void main(String[] args) {
745+
new SpringApplicationBuilder(S3JavaApplication.class)
746+
.web(false)
747+
.run(args);
748+
}
749+
750+
@Autowired
751+
private S3Client amazonS3;
752+
753+
@Bean
754+
@InboundChannelAdapter(value = "s3Channel", poller = @Poller(fixedDelay = "100"))
755+
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
756+
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
757+
messageSource.setRemoteDirectory(S3_BUCKET);
758+
messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
759+
"streaming"));
760+
return messageSource;
761+
}
762+
763+
@Bean
764+
@Transformer(inputChannel = "s3Channel", outputChannel = "data")
765+
public org.springframework.integration.transformer.Transformer transformer() {
766+
return new StreamTransformer();
767+
}
768+
769+
@Bean
770+
public S3RemoteFileTemplate template() {
771+
return new S3RemoteFileTemplate(new S3SessionFactory(this.amazonS3));
772+
}
773+
774+
@Bean
775+
public PollableChannel s3Channel() {
776+
return new QueueChannel();
777+
}
778+
}
779+
----
NOTE: Unlike the non-streaming inbound channel adapter, this adapter does not prevent duplicates by default.
If you do not delete the remote file and wish to prevent the file being processed again, you can configure an `S3PersistentFileListFilter` in the `filter` attribute.
If you don't want to persist the state, an in-memory `SimpleMetadataStore` can be used with the filter.
If you wish to use a filename pattern (or regex) as well, use a `CompositeFileListFilter`.
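The duplicate-prevention idea behind these filters can be sketched in plain Java; this is a deliberate simplification (the real `AcceptOnceFileListFilter` and `S3PersistentAcceptOnceFileListFilter` operate on file or object metadata and support persistent stores and rollback), and the class name is illustrative:

[source,java]
----
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class AcceptOnceSketch {

    // Object keys already handed to the downstream flow.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true only the first time a given object key is offered.
    public boolean accept(String objectKey) {
        return seen.add(objectKey);
    }
}
----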
The `S3MessageHandler` is an Outbound Channel Adapter that performs `upload`, `download` and `copy` operations (see the `S3MessageHandler.Command` enum) on the provided S3 bucket.

The Java Configuration is:

[source,java]
----
@SpringBootApplication
public class MyConfiguration {

    @Autowired
    private S3AsyncClient amazonS3;

    @Bean
    @ServiceActivator(inputChannel = "s3UploadChannel")
    public MessageHandler s3MessageHandler() {
        return new S3MessageHandler(this.amazonS3, "my-bucket");
    }

}
----

With this configuration, you can send a message with a `java.io.File` as the `payload`, and the `transferManager.upload()` operation will be performed, where the file name is used as the S3 object key.

See the `S3MessageHandler` Javadocs for more information.

NOTE: The AWS SDK recommends using the `S3CrtAsyncClient` for the `S3TransferManager`, therefore an `S3AsyncClient.crtBuilder()` has to be used to achieve the respective upload and download requirements; this is done internally in the `S3MessageHandler` when the `S3CrtAsyncClient`-based constructor is used.

The `S3MessageHandler` can be used as an Outbound Gateway with the `produceReply = true` constructor argument in Java Configuration.

The "request-reply" nature of this gateway is asynchronous, and the `Transfer` result of the `TransferManager` operation is sent to the `outputChannel`, assuming transfer-progress observation in the downstream flow.

A `TransferListener` can be supplied to the `S3MessageHandler` to track the transfer progress per request.

See the `S3MessageHandler` Javadocs for more information.

The Spring Integration dependency (as well as `s3-transfer-manager` and `aws-crt-client`) in the `spring-cloud-aws-s3` module is `optional`, to avoid unnecessary artifacts on the classpath when Spring Integration is not used.
For convenience, a dedicated `spring-cloud-aws-starter-integration-s3` is provided, managing all the required dependencies for Spring Integration support with Amazon S3.
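As a sketch, this starter might be added like this; the artifact name comes from the paragraph above, the `io.awspring.cloud` groupId matches the starter dependency shown earlier in this commit, and version management (e.g. via the Spring Cloud AWS BOM) is assumed:

[source,xml]
----
<dependency>
    <groupId>io.awspring.cloud</groupId>
    <artifactId>spring-cloud-aws-starter-integration-s3</artifactId>
</dependency>
----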
