44package com .example .sns ;
55
66// snippet-start:[sns.java2.PriceUpdateExample.import]
7+
78import software .amazon .awssdk .policybuilder .iam .IamConditionOperator ;
89import software .amazon .awssdk .policybuilder .iam .IamEffect ;
910import software .amazon .awssdk .policybuilder .iam .IamPolicy ;
@@ -39,14 +40,14 @@ public class PriceUpdateExample {
3940 public static void main (String [] args ) {
4041
4142 final String usage = "\n " +
42- "Usage: " +
43- " <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n \n " +
44- "Where:\n " +
45- " fifoTopicName - The name of the FIFO topic that you want to create. \n \n " +
46- " wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n \n "
47- +
48- " retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n \n " +
49- " analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n \n " ;
43+ "Usage: " +
44+ " <topicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n \n " +
45+ "Where:\n " +
46+ " fifoTopicName - The name of the FIFO topic that you want to create. \n \n " +
47+ " wholesaleQueueARN - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n \n "
48+ +
49+ " retailQueueARN - The name of a SQS FIFO queue that will created for the retail consumer. \n \n " +
50+ " analyticsQueueARN - The name of a SQS standard queue that will be created for the analytics consumer. \n \n " ;
5051 if (args .length != 4 ) {
5152 System .out .println (usage );
5253 System .exit (1 );
@@ -60,9 +61,9 @@ public static void main(String[] args) {
6061 // For convenience, the QueueData class holds metadata about a queue: ARN, URL,
6162 // name and type.
6263 List <QueueData > queues = List .of (
63- new QueueData (wholeSaleQueueName , QueueType .FIFO ),
64- new QueueData (retailQueueName , QueueType .FIFO ),
65- new QueueData (analyticsQueueName , QueueType .Standard ));
64+ new QueueData (wholeSaleQueueName , QueueType .FIFO ),
65+ new QueueData (retailQueueName , QueueType .FIFO ),
66+ new QueueData (analyticsQueueName , QueueType .Standard ));
6667
6768 // Create queues.
6869 createQueues (queues );
@@ -90,13 +91,14 @@ public static String createFIFOTopic(String topicName) {
9091 try {
9192 // Create a FIFO topic by using the SNS service client.
9293 Map <String , String > topicAttributes = Map .of (
93- "FifoTopic" , "true" ,
94- "ContentBasedDeduplication" , "false" );
94+ "FifoTopic" , "true" ,
95+ "ContentBasedDeduplication" , "false" ,
96+ "FifoThroughputScope" , "MessageGroup" );
9597
9698 CreateTopicRequest topicRequest = CreateTopicRequest .builder ()
97- .name (topicName )
98- .attributes (topicAttributes )
99- .build ();
99+ .name (topicName )
100+ .attributes (topicAttributes )
101+ .build ();
100102
101103 CreateTopicResponse response = snsClient .createTopic (topicRequest );
102104 String topicArn = response .topicArn ();
@@ -114,10 +116,10 @@ public static String createFIFOTopic(String topicName) {
114116 public static void subscribeQueues (List <QueueData > queues , String topicARN ) {
115117 queues .forEach (queue -> {
116118 SubscribeRequest subscribeRequest = SubscribeRequest .builder ()
117- .topicArn (topicARN )
118- .endpoint (queue .queueARN )
119- .protocol ("sqs" )
120- .build ();
119+ .topicArn (topicARN )
120+ .endpoint (queue .queueARN )
121+ .protocol ("sqs" )
122+ .build ();
121123
122124 // Subscribe to the endpoint by using the SNS service client.
123125 // Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO
@@ -138,20 +140,20 @@ public static void publishPriceUpdate(String topicArn, String payload, String gr
138140 String attributeValue = "wholesale" ;
139141
140142 MessageAttributeValue msgAttValue = MessageAttributeValue .builder ()
141- .dataType ("String" )
142- .stringValue (attributeValue )
143- .build ();
143+ .dataType ("String" )
144+ .stringValue (attributeValue )
145+ .build ();
144146
145147 Map <String , MessageAttributeValue > attributes = new HashMap <>();
146148 attributes .put (attributeName , msgAttValue );
147149 PublishRequest pubRequest = PublishRequest .builder ()
148- .topicArn (topicArn )
149- .subject (subject )
150- .message (payload )
151- .messageGroupId (groupId )
152- .messageDeduplicationId (dedupId )
153- .messageAttributes (attributes )
154- .build ();
150+ .topicArn (topicArn )
151+ .subject (subject )
152+ .message (payload )
153+ .messageGroupId (groupId )
154+ .messageDeduplicationId (dedupId )
155+ .messageAttributes (attributes )
156+ .build ();
155157
156158 final PublishResponse response = snsClient .publish (pubRequest );
157159 System .out .println (response .messageId ());
@@ -173,17 +175,17 @@ public static void createQueues(List<QueueData> queueData) {
173175 CreateQueueResponse response ;
174176 if (isFifoQueue ) {
175177 response = sqsClient .createQueue (r -> r
176- .queueName (queue .queueName )
177- .attributes (Map .of (
178- QueueAttributeName .FIFO_QUEUE , "true" )));
178+ .queueName (queue .queueName )
179+ .attributes (Map .of (
180+ QueueAttributeName .FIFO_QUEUE , "true" )));
179181 } else {
180182 response = sqsClient .createQueue (r -> r
181- .queueName (queue .queueName ));
183+ .queueName (queue .queueName ));
182184 }
183185 queue .queueURL = response .queueUrl ();
184186 queue .queueARN = sqsClient .getQueueAttributes (b -> b
185- .queueUrl (queue .queueURL )
186- .attributeNames (QueueAttributeName .QUEUE_ARN )).attributes ().get (QueueAttributeName .QUEUE_ARN );
187+ .queueUrl (queue .queueURL )
188+ .attributeNames (QueueAttributeName .QUEUE_ARN )).attributes ().get (QueueAttributeName .QUEUE_ARN );
187189 });
188190 }
189191
@@ -194,25 +196,25 @@ public static void addAccessPolicyToQueuesFINAL(List<QueueData> queues, String t
194196 }
195197 queues .forEach (queue -> {
196198 IamPolicy policy = IamPolicy .builder ()
197- .addStatement (b -> b // Allow account user to send messages to the queue.
198- .effect (IamEffect .ALLOW )
199- .addPrincipal (IamPrincipalType .AWS , account )
200- .addAction ("SQS:*" )
201- .addResource (queue .queueARN ))
202- .addStatement (b -> b // Allow the SNS FIFO topic to send messages to the queue.
203- .effect (IamEffect .ALLOW )
204- .addPrincipal (IamPrincipalType .AWS , "*" )
205- .addAction ("SQS:SendMessage" )
206- .addResource (queue .queueARN )
207- .addCondition (b1 -> b1
208- .operator (IamConditionOperator .ARN_LIKE )
209- .key ("aws:SourceArn" ).value (topicARN )))
210- .build ();
199+ .addStatement (b -> b // Allow account user to send messages to the queue.
200+ .effect (IamEffect .ALLOW )
201+ .addPrincipal (IamPrincipalType .AWS , account )
202+ .addAction ("SQS:*" )
203+ .addResource (queue .queueARN ))
204+ .addStatement (b -> b // Allow the SNS FIFO topic to send messages to the queue.
205+ .effect (IamEffect .ALLOW )
206+ .addPrincipal (IamPrincipalType .AWS , "*" )
207+ .addAction ("SQS:SendMessage" )
208+ .addResource (queue .queueARN )
209+ .addCondition (b1 -> b1
210+ .operator (IamConditionOperator .ARN_LIKE )
211+ .key ("aws:SourceArn" ).value (topicARN )))
212+ .build ();
211213 sqsClient .setQueueAttributes (b -> b
212- .queueUrl (queue .queueURL )
213- .attributes (Map .of (
214- QueueAttributeName .POLICY ,
215- policy .toJson ())));
214+ .queueUrl (queue .queueURL )
215+ .attributes (Map .of (
216+ QueueAttributeName .POLICY ,
217+ policy .toJson ())));
216218 });
217219 }
218220
0 commit comments