Skip to content

Commit 383e5a9

Browse files
committed
* Fix typos in docs and Javadocs for SI SNS
* Simplify `try..catch` logic in the `SnsMessageHandler` and add missed `currentThread().interrupt()` * Add `.fifo` logic to the `SnsAsyncTopicArnResolver`
1 parent 8bb2db4 commit 383e5a9

File tree

6 files changed

+43
-34
lines changed

6 files changed

+43
-34
lines changed

docs/src/main/asciidoc/sns.adoc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public static class MyConfiguration {
331331
332332
@Bean
333333
public HttpRequestHandler sqsMessageDrivenChannelAdapter(PollableChannel inputChannel) {
334-
SnsInboundChannelAdapter adapter = new SnsInboundChannelAdapter(amazonSns(), "/mySampleTopic");
334+
SnsInboundChannelAdapter adapter = new SnsInboundChannelAdapter(this.amazonSns, "/mySampleTopic");
335335
adapter.setRequestChannel(inputChannel);
336336
adapter.setHandleNotificationStatus(true);
337337
return adapter;
@@ -360,8 +360,8 @@ The Java Config looks like:
360360
[source,java]
361361
----
362362
@Bean
363-
public MessageHandler snsMessageHandler() {
364-
SnsMessageHandler handler = new SnsMessageHandler(amazonSns());
363+
public MessageHandler snsMessageHandler(SnsAsyncClient amazonSns) {
364+
SnsMessageHandler handler = new SnsMessageHandler(amazonSns);
365365
handler.setTopicArn("arn:aws:sns:eu-west:123456789012:test");
366366
String bodyExpression = "T(SnsBodyBuilder).withDefault(payload).forProtocols(payload.substring(0, 140), 'sms')";
367367
handler.setBodyExpression(spelExpressionParser.parseExpression(bodyExpression));

spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/core/SnsAsyncTopicArnResolver.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
*/
1616
package io.awspring.cloud.sns.core;
1717

18+
import java.util.Map;
1819
import org.springframework.util.Assert;
1920
import software.amazon.awssdk.arns.Arn;
2021
import software.amazon.awssdk.services.sns.SnsAsyncClient;
22+
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
2123

2224
/**
2325
* A {@link TopicArnResolver} implementation to determine topic ARN by name against an {@link SnsAsyncClient}.
@@ -27,6 +29,7 @@
2729
* @since 4.0
2830
*/
2931
public class SnsAsyncTopicArnResolver implements TopicArnResolver {
32+
3033
private final SnsAsyncClient snsClient;
3134

3235
public SnsAsyncTopicArnResolver(SnsAsyncClient snsClient) {
@@ -36,7 +39,7 @@ public SnsAsyncTopicArnResolver(SnsAsyncClient snsClient) {
3639

3740
/**
3841
* Resolve topic ARN by topic name. If topicName is already an ARN, it returns {@link Arn}. If topicName is just a
39-
* string with a topic name, it attempts to create a topic or if the topic already exists, just returns its ARN.
42+
* string with a topic name, it attempts to create a topic, or if the topic already exists, just returns its ARN.
4043
*/
4144
@Override
4245
public Arn resolveTopicArn(String topicName) {
@@ -45,8 +48,15 @@ public Arn resolveTopicArn(String topicName) {
4548
return Arn.fromString(topicName);
4649
}
4750
else {
51+
CreateTopicRequest.Builder builder = CreateTopicRequest.builder().name(topicName);
52+
53+
// fix for https://github.com/awspring/spring-cloud-aws/issues/707
54+
if (topicName.endsWith(".fifo")) {
55+
builder.attributes(Map.of("FifoTopic", "true"));
56+
}
57+
4858
// if the topic exists, createTopic returns a successful response with the topic arn
49-
return Arn.fromString(this.snsClient.createTopic(request -> request.name(topicName)).join().topicArn());
59+
return Arn.fromString(this.snsClient.createTopic(builder.build()).join().topicArn());
5060
}
5161
}
5262

spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/integration/SnsHeaderMapper.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
*
4141
* @author Artem Bilan
4242
*
43-
* @since 2.0
43+
* @since 4.0
4444
*/
4545
public class SnsHeaderMapper implements HeaderMapper<Map<String, MessageAttributeValue>> {
4646

@@ -55,7 +55,7 @@ public class SnsHeaderMapper implements HeaderMapper<Map<String, MessageAttribut
5555
* Spring Integration Message's headers. The values can also contain simple wildcard patterns (e.g. "foo*" or
5656
* "*foo") to be matched. Also supports negated ('!') patterns. First match wins (positive or negative). To match
5757
* the names starting with {@code !} symbol, you have to escape it prepending with the {@code \} symbol in the
58-
* pattern definition. Defaults to map all ({@code *}) if the type is supported by SQS. The
58+
* pattern definition. Defaults to map all ({@code *}) if the type is supported by SNS. The
5959
* {@link MessageHeaders#ID}, {@link MessageHeaders#TIMESTAMP}, {@link NativeMessageHeaderAccessor#NATIVE_HEADERS},
6060
* {@link SnsHeaders#MESSAGE_ID_HEADER and {@link SnsHeaders#TOPIC_HEADER} are ignored by default.
6161
* @param outboundHeaderNames The inbound header names.
@@ -101,22 +101,23 @@ else if (messageHeaderValue instanceof byte[] bytes) {
101101
}
102102
}
103103

104-
private MessageAttributeValue getBinaryMessageAttribute(ByteBuffer messageHeaderValue) {
104+
private static MessageAttributeValue getBinaryMessageAttribute(ByteBuffer messageHeaderValue) {
105105
return buildMessageAttribute("Binary", messageHeaderValue);
106106
}
107107

108-
private MessageAttributeValue getStringMessageAttribute(String messageHeaderValue) {
108+
private static MessageAttributeValue getStringMessageAttribute(String messageHeaderValue) {
109109
return buildMessageAttribute("String", messageHeaderValue);
110110
}
111111

112-
private MessageAttributeValue getNumberMessageAttribute(Object messageHeaderValue) {
113-
Assert.isTrue(NumberUtils.STANDARD_NUMBER_TYPES.contains(messageHeaderValue.getClass()),
112+
private static MessageAttributeValue getNumberMessageAttribute(Object messageHeaderValue) {
113+
Class<?> messageHeaderValueClass = messageHeaderValue.getClass();
114+
Assert.isTrue(NumberUtils.STANDARD_NUMBER_TYPES.contains(messageHeaderValueClass),
114115
"Only standard number types are accepted as message header.");
115116

116-
return buildMessageAttribute("Number." + messageHeaderValue.getClass().getName(), messageHeaderValue);
117+
return buildMessageAttribute("Number." + messageHeaderValueClass.getName(), messageHeaderValue);
117118
}
118119

119-
private MessageAttributeValue buildMessageAttribute(String dataType, Object value) {
120+
private static MessageAttributeValue buildMessageAttribute(String dataType, Object value) {
120121
MessageAttributeValue.Builder messageAttributeValue = MessageAttributeValue.builder().dataType(dataType);
121122
if (value instanceof ByteBuffer byteBuffer) {
122123
messageAttributeValue.binaryValue(SdkBytes.fromByteBuffer(byteBuffer));

spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/integration/SnsInboundChannelAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ public SnsInboundChannelAdapter(SnsClient amazonSns, String... path) {
9494
}
9595

9696
/**
97-
* The flag indicating if the adapter should send {@code SubscriptionConfirmation/UnsubscribeConfirmation}
98-
* message to the `output-channel` or not.
97+
* The flag indicating if the adapter should send {@code SubscriptionConfirmation/UnsubscribeConfirmation} message
98+
* to the `output-channel` or not.
9999
* @param handleNotificationStatus the flag to set. Default is {@code false}.
100100
*/
101101
public void setHandleNotificationStatus(boolean handleNotificationStatus) {

spring-cloud-aws-sns/src/main/java/io/awspring/cloud/sns/integration/SnsMessageHandler.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@
5757
* from there and the {@code messageStructure} of the {@link PublishRequest} is set to {@code json}. For the convenience
5858
* the package {@code org.springframework.integration.aws.support} is imported to the {@link #evaluationContext} to
5959
* allow bypassing it for the {@link SnsBodyBuilder} from the {@link #bodyExpression} definition. For example:
60-
*
60+
*
6161
* <pre class="code">
6262
* {@code
6363
* String bodyExpression = "SnsBodyBuilder.withDefault(payload).forProtocols(payload.substring(0, 140), 'sms')";
6464
* snsMessageHandler.setBodyExpression(spelExpressionParser.parseExpression(bodyExpression));
6565
* }
6666
* </pre>
67-
*
67+
*
6868
* </li>
6969
* <li>Otherwise the {@code payload} (or the {@link #bodyExpression} evaluation result) is converted to the
7070
* {@link String} using {@link #getConversionService()}.</li>
@@ -245,24 +245,23 @@ protected void handleMessageInternal(Message<?> message) {
245245
}
246246

247247
Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
248-
if (sendTimeout == null || sendTimeout < 0) {
249-
try {
248+
try {
249+
if (sendTimeout == null || sendTimeout < 0) {
250250
resultFuture.get();
251251
}
252-
catch (InterruptedException | ExecutionException ex) {
253-
throw new IllegalStateException(ex);
254-
}
255-
}
256-
else {
257-
try {
252+
else {
258253
resultFuture.get(sendTimeout, TimeUnit.MILLISECONDS);
259254
}
260-
catch (TimeoutException te) {
261-
throw new MessageTimeoutException(message, "Timeout waiting for response from AmazonKinesis", te);
262-
}
263-
catch (InterruptedException | ExecutionException ex) {
264-
throw new IllegalStateException(ex);
265-
}
255+
}
256+
catch (TimeoutException te) {
257+
throw new MessageTimeoutException(message, "Timeout waiting for response from AmazonKinesis", te);
258+
}
259+
catch (InterruptedException ex) {
260+
Thread.currentThread().interrupt();
261+
throw new IllegalStateException(ex);
262+
}
263+
catch (ExecutionException ex) {
264+
throw new IllegalStateException(ex);
266265
}
267266
}
268267

spring-cloud-aws-sns/src/test/java/io/awspring/cloud/sns/integration/SnsMessageHandlerTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.awspring.cloud.sns.core.SnsHeaders;
2525
import java.util.Map;
2626
import java.util.concurrent.CompletableFuture;
27-
import java.util.function.Consumer;
2827
import org.junit.jupiter.api.Test;
2928
import org.mockito.ArgumentCaptor;
3029
import org.springframework.beans.factory.annotation.Autowired;
@@ -43,6 +42,7 @@
4342
import org.springframework.test.annotation.DirtiesContext;
4443
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4544
import software.amazon.awssdk.services.sns.SnsAsyncClient;
45+
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
4646
import software.amazon.awssdk.services.sns.model.CreateTopicResponse;
4747
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
4848
import software.amazon.awssdk.services.sns.model.PublishRequest;
@@ -110,13 +110,12 @@ void snsMessageHandler() {
110110
public static class ContextConfiguration {
111111

112112
@Bean
113-
@SuppressWarnings("unchecked")
114113
public SnsAsyncClient amazonSNS() {
115114
SnsAsyncClient mock = mock();
116115

117116
willAnswer(invocation -> CompletableFuture.completedFuture(
118117
CreateTopicResponse.builder().topicArn("arn:aws:sns:eu-west-1:111111111111:topic.fifo").build()))
119-
.given(mock).createTopic(any(Consumer.class));
118+
.given(mock).createTopic(any(CreateTopicRequest.class));
120119

121120
willAnswer(
122121
invocation -> CompletableFuture.completedFuture(PublishResponse.builder().messageId("111").build()))

0 commit comments

Comments
 (0)