Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions docs/src/main/asciidoc/sns.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,77 @@ class NotificationService {
}
----

==== SNS Batch Template

The starter automatically configures and registers a `SnsBatchTemplate` bean providing higher level abstractions for sending multiple SNS notifications in a single batch request.
Batch operations are more efficient than sending individual messages, as they reduce the number of API calls to SNS.

SNS has a limit of 10 messages per batch request. `SnsBatchTemplate` automatically splits larger collections into multiple batches and combines the results.

NOTE: Batch messaging support requires Jackson 3 (`tools.jackson.databind.json.JsonMapper`) on the classpath.

It supports the same payload types as `SnsTemplate`:

* `String` - using `org.springframework.messaging.converter.StringMessageConverter`
* `Object` - serialized to JSON using `org.springframework.messaging.converter.JacksonJsonMessageConverter` and Jackson 3's `tools.jackson.databind.json.JsonMapper`

[source,java]
----
import io.awspring.cloud.sns.core.batch.SnsBatchTemplate;
import io.awspring.cloud.sns.core.batch.BatchResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.List;

class NotificationService {
private final SnsBatchTemplate snsBatchTemplate;

NotificationService(SnsBatchTemplate snsBatchTemplate) {
this.snsBatchTemplate = snsBatchTemplate;
}

void sendBatchWithObjects(List<Person> people) {
List<Message<Person>> messages = people.stream()
.map(person -> MessageBuilder.withPayload(person).build())
.toList();

BatchResult result = snsBatchTemplate.sendBatch("topic-name", messages);
}
}
----

==== Batch Operations

Similar to `SnsOperations`, `SnsBatchOperations` is an interface implemented by `SnsBatchTemplate` that provides a convenient method for sending batch SNS notifications.

[source,java]
----
import io.awspring.cloud.sns.core.batch.SnsBatchOperations;
import io.awspring.cloud.sns.core.batch.BatchResult;

class NotificationService {
private final SnsBatchOperations snsBatchOperations;

NotificationService(SnsBatchOperations snsBatchOperations) {
this.snsBatchOperations = snsBatchOperations;
}

void sendBatchNotifications(List<Message<String>> messages) {
BatchResult result = snsBatchOperations.sendBatch("topic-name", messages);
}
}
----

==== Customizing Batch Behavior

The default batch messaging implementation provided by Spring Cloud AWS handles most common use cases efficiently. The `DefaultBatchExecutionStrategy` automatically splits large message collections into batches of 10 (the SNS API limit) and processes them sequentially, while the `DefaultSnsMessageConverter` handles payload serialization using Jackson 3 and maps Spring message headers to SNS message attributes. These default implementations are designed to work seamlessly with minimal configuration.

The `BatchExecutionStrategy` interface controls how batches are executed and results are aggregated. You might want to provide a custom implementation when you need parallel processing of multiple batches to improve throughput for example.

The `SnsMessageConverter` interface controls how Spring messages are converted to SNS batch request entries. Custom implementations are useful when you need custom serialization formats.



=== Sending SMS Messages

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@
import io.awspring.cloud.autoconfigure.core.CredentialsProviderAutoConfiguration;
import io.awspring.cloud.autoconfigure.core.RegionProviderAutoConfiguration;
import io.awspring.cloud.core.support.JacksonPresent;
import io.awspring.cloud.sns.core.CachingTopicArnResolver;
import io.awspring.cloud.sns.core.DefaultTopicArnResolver;
import io.awspring.cloud.sns.core.SnsOperations;
import io.awspring.cloud.sns.core.SnsTemplate;
import io.awspring.cloud.sns.core.TopicArnResolver;
import io.awspring.cloud.sns.core.batch.SnsBatchOperations;
import io.awspring.cloud.sns.core.batch.SnsBatchTemplate;
import io.awspring.cloud.sns.core.batch.converter.DefaultSnsMessageConverter;
import io.awspring.cloud.sns.core.batch.converter.SnsMessageConverter;
import io.awspring.cloud.sns.core.batch.executor.BatchExecutionStrategy;
import io.awspring.cloud.sns.core.batch.executor.SequentialBatchExecutionStrategy;
import io.awspring.cloud.sns.sms.SnsSmsOperations;
import io.awspring.cloud.sns.sms.SnsSmsTemplate;
import java.util.List;
Expand Down Expand Up @@ -124,6 +132,34 @@ public SnsTemplate snsTemplate(SnsClient snsClient, Optional<ObjectMapper> objec
}
}

@ConditionalOnClass(name = "tools.jackson.databind.json.JsonMapper")
@Configuration
static class SnsBatchConfiguration {
@ConditionalOnMissingBean(SnsMessageConverter.class)
@Bean
public DefaultSnsMessageConverter defaultSnsMessageConverter(Optional<JsonMapper> jsonMapper) {
JacksonJsonMessageConverter converter = new JacksonJsonMessageConverter(
jsonMapper.orElseGet(JsonMapper::new));
converter.setSerializedPayloadClass(String.class);
return new DefaultSnsMessageConverter(converter);
}

@ConditionalOnMissingBean(BatchExecutionStrategy.class)
@Bean
public SequentialBatchExecutionStrategy defaultBatchExecutionStrategy(SnsClient snsClient) {
return new SequentialBatchExecutionStrategy(snsClient);
}

@ConditionalOnMissingBean(SnsBatchOperations.class)
@Bean
public SnsBatchTemplate snsBatchTemplate(SnsMessageConverter snsMessageConverter,
BatchExecutionStrategy batchExecutionStrategy, SnsClient snsClient,
Optional<TopicArnResolver> topicArnResolver) {
return new SnsBatchTemplate(snsMessageConverter, batchExecutionStrategy, topicArnResolver
.orElseGet(() -> new CachingTopicArnResolver(new DefaultTopicArnResolver(snsClient))));
}
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(WebMvcConfigurer.class)
static class SnsWebConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@
import io.awspring.cloud.sns.core.SnsOperations;
import io.awspring.cloud.sns.core.SnsTemplate;
import io.awspring.cloud.sns.core.TopicArnResolver;
import io.awspring.cloud.sns.core.batch.SnsBatchTemplate;
import io.awspring.cloud.sns.core.batch.converter.SnsMessageConverter;
import io.awspring.cloud.sns.core.batch.executor.BatchExecutionStrategy;
import io.awspring.cloud.sns.sms.SnsSmsOperations;
import io.awspring.cloud.sns.sms.SnsSmsTemplate;

import java.net.URI;

import org.junit.jupiter.api.Test;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.AutoConfigurations;
Expand All @@ -49,15 +54,15 @@
class SnsAutoConfigurationTest {

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues("spring.cloud.aws.region.static:eu-west-1")
.withConfiguration(AutoConfigurations.of(RegionProviderAutoConfiguration.class,
CredentialsProviderAutoConfiguration.class, SnsAutoConfiguration.class,
AwsAutoConfiguration.class));
.withPropertyValues("spring.cloud.aws.region.static:eu-west-1")
.withConfiguration(AutoConfigurations.of(RegionProviderAutoConfiguration.class,
CredentialsProviderAutoConfiguration.class, SnsAutoConfiguration.class,
AwsAutoConfiguration.class));

@Test
void snsAutoConfigurationIsDisabled() {
this.contextRunner.withPropertyValues("spring.cloud.aws.sns.enabled:false")
.run(context -> assertThat(context).doesNotHaveBean(SnsClient.class));
.run(context -> assertThat(context).doesNotHaveBean(SnsClient.class));
}

@Test
Expand Down Expand Up @@ -89,14 +94,17 @@ void withCustomEndpoint() {
@Test
void customTopicArnResolverCanBeConfigured() {
this.contextRunner.withUserConfiguration(CustomTopicArnResolverConfiguration.class)
.run(context -> assertThat(context).hasSingleBean(CustomTopicArnResolver.class));
.run(context -> assertThat(context).hasSingleBean(CustomTopicArnResolver.class));
}

@Test
void doesNotConfigureArgumentResolversWhenSpringWebNotOnTheClasspath() {
this.contextRunner.withClassLoader(new FilteredClassLoader(WebMvcConfigurer.class)).run(context -> {
assertThat(context).hasSingleBean(SnsClient.class);
assertThat(context).hasSingleBean(SnsTemplate.class);
assertThat(context).hasSingleBean(SnsBatchTemplate.class);
assertThat(context).hasSingleBean(SnsMessageConverter.class);
assertThat(context).hasSingleBean(BatchExecutionStrategy.class);
assertThat(context).hasSingleBean(SnsSmsTemplate.class);
assertThat(context).doesNotHaveBean("snsWebMvcConfigurer");
});
Expand All @@ -120,9 +128,10 @@ void bothTemplatesAndOperationsAreInjectable() {
@Test
void customChannelInterceptorCanBeConfigured() {
this.contextRunner.withUserConfiguration(CustomChannelInterceptorConfiguration.class)
.run(context -> assertThat(context).hasSingleBean(CustomChannelInterceptor.class));
.run(context -> assertThat(context).hasSingleBean(CustomChannelInterceptor.class));
}


@Configuration(proxyBeanMethods = false)
static class CustomTopicArnResolverConfiguration {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2013-2026 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sns.core;

import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_DEDUPLICATION_ID_HEADER;
import static io.awspring.cloud.sns.core.SnsHeaders.MESSAGE_GROUP_ID_HEADER;
import static io.awspring.cloud.sns.core.SnsHeaders.NOTIFICATION_SUBJECT_HEADER;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.NumberUtils;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;

/**
* Util class used to convert {@link Message} headers to SNS messageAttributes.
*
* @author Agim Emruli
* @author Alain Sahli
* @author Gyozo Papp
* @author Matej Nedic
* @since 4.1.0
*/
public class SnsHeaderConverterUtil {

private static final JsonStringEncoder jsonStringEncoder = JsonStringEncoder.create();
private static final Log logger = LogFactory.getLog(SnsHeaderConverterUtil.class);

public static Map<String, MessageAttributeValue> toSnsMessageAttributes(Message<?> message) {
HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<>();
for (Map.Entry<String, Object> messageHeader : message.getHeaders().entrySet()) {
String messageHeaderName = messageHeader.getKey();
Object messageHeaderValue = messageHeader.getValue();

if (isSkipHeader(messageHeaderName)) {
continue;
}

if (MessageHeaders.CONTENT_TYPE.equals(messageHeaderName) && messageHeaderValue != null) {
messageAttributes.put(messageHeaderName, getContentTypeMessageAttribute(messageHeaderValue));
}
else if (MessageHeaders.ID.equals(messageHeaderName) && messageHeaderValue != null) {
messageAttributes.put(messageHeaderName, getStringMessageAttribute(messageHeaderValue.toString()));
}
else if (MessageHeaders.TIMESTAMP.equals(messageHeaderName) && messageHeaderValue != null) {
messageAttributes.put(messageHeaderName, getDetailedNumberMessageAttribute(messageHeaderValue));
}
else if (messageHeaderValue instanceof String) {
messageAttributes.put(messageHeaderName, getStringMessageAttribute((String) messageHeaderValue));
}
else if (messageHeaderValue instanceof Number) {
messageAttributes.put(messageHeaderName, getDetailedNumberMessageAttribute(messageHeaderValue));
}
else if (messageHeaderValue instanceof ByteBuffer) {
messageAttributes.put(messageHeaderName, getBinaryMessageAttribute((ByteBuffer) messageHeaderValue));
}
else if (messageHeaderValue instanceof List) {
messageAttributes.put(messageHeaderName, getStringArrayMessageAttribute((List<?>) messageHeaderValue));
}
else {
logger.warn(String.format(
"Message header with name '%s' and type '%s' cannot be sent as"
+ " message attribute because it is not supported by SNS.",
messageHeaderName, messageHeaderValue != null ? messageHeaderValue.getClass().getName() : ""));
}
}

return messageAttributes;
}

private static boolean isSkipHeader(String headerName) {
return NOTIFICATION_SUBJECT_HEADER.equals(headerName) || MESSAGE_GROUP_ID_HEADER.equals(headerName)
|| MESSAGE_DEDUPLICATION_ID_HEADER.equals(headerName);
}

private static <T> MessageAttributeValue getStringArrayMessageAttribute(List<T> messageHeaderValue) {
String stringValue = messageHeaderValue.stream().map(item -> {
StringBuilder sb = new StringBuilder();
jsonStringEncoder.quoteAsString(item.toString(), sb);
return "\"" + sb + "\"";
}).collect(Collectors.joining(", ", "[", "]"));

return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING_ARRAY).stringValue(stringValue)
.build();
}

private static MessageAttributeValue getBinaryMessageAttribute(ByteBuffer messageHeaderValue) {
return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.BINARY)
.binaryValue(SdkBytes.fromByteBuffer(messageHeaderValue)).build();
}

@Nullable
private static MessageAttributeValue getContentTypeMessageAttribute(Object messageHeaderValue) {
if (messageHeaderValue instanceof MimeType) {
return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING)
.stringValue(messageHeaderValue.toString()).build();
}
else if (messageHeaderValue instanceof String) {
return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING)
.stringValue((String) messageHeaderValue).build();
}
return null;
}

private static MessageAttributeValue getStringMessageAttribute(String messageHeaderValue) {
return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING)
.stringValue(messageHeaderValue).build();
}

private static MessageAttributeValue getDetailedNumberMessageAttribute(Object messageHeaderValue) {
Assert.isTrue(NumberUtils.STANDARD_NUMBER_TYPES.contains(messageHeaderValue.getClass()),
"Only standard number types are accepted as message header.");

return MessageAttributeValue.builder()
.dataType(MessageAttributeDataTypes.NUMBER + "." + messageHeaderValue.getClass().getName())
.stringValue(messageHeaderValue.toString()).build();
}

private static MessageAttributeValue getNumberMessageAttribute(Object messageHeaderValue) {
return MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.NUMBER)
.stringValue(messageHeaderValue.toString()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.awspring.cloud.sns.integration.LegacyJackson2SnsInboundChannelAdapter;
import io.awspring.cloud.sns.integration.SnsInboundChannelAdapter;
import org.springframework.messaging.Message;
import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;

Expand Down
Loading