Skip to content

Commit 63a09e2

Browse files
committed
Some DynamoDbStreamToKinesisIntegrationTests code flow rearrangement
in attempt to mitigate race conditions * Add WARN for received CDC events in the `DynamoDbStreamToKinesisIntegrationTests`
1 parent 71ec3f8 commit 63a09e2

File tree

1 file changed

+31
-18
lines changed

1 file changed

+31
-18
lines changed

spring-cloud-aws-kinesis/src/test/java/io/awspring/cloud/kinesis/integration/DynamoDbStreamToKinesisIntegrationTests.java

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import io.awspring.cloud.kinesis.LocalstackContainerTest;
2323
import java.time.Duration;
2424
import java.util.Map;
25+
26+
import org.apache.commons.logging.Log;
27+
import org.apache.commons.logging.LogFactory;
2528
import org.junit.jupiter.api.BeforeAll;
2629
import org.junit.jupiter.api.Test;
2730
import org.springframework.beans.DirectFieldAccessor;
@@ -58,6 +61,8 @@
5861
@DirtiesContext
5962
class DynamoDbStreamToKinesisIntegrationTests implements LocalstackContainerTest {
6063

64+
static final Log LOGGER = LogFactory.getLog(DynamoDbStreamToKinesisIntegrationTests.class);
65+
6166
static final String TEST_TABLE = "StreamsDemoTable";
6267

6368
static final String TABLE_KEY = "id";
@@ -79,6 +84,17 @@ static void setup() {
7984
DYNAMODB = LocalstackContainerTest.dynamoDbClient();
8085
DYNAMODB_STREAMS = LocalstackContainerTest.dynamoDbStreamsClient();
8186
DYNAMODB_STREAM_ARN = createDemoTable();
87+
88+
AmazonDynamoDBStreamsAdapterClient streamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(
89+
DYNAMODB_STREAMS, null);
90+
91+
await().atMost(Duration.ofMinutes(2))
92+
.untilAsserted(() -> assertThat(
93+
streamsAdapterClient.describeStream(builder -> builder.streamName(DYNAMODB_STREAM_ARN)))
94+
.succeedsWithin(Duration.ofSeconds(60))
95+
.extracting(describeStreamResponse -> describeStreamResponse.streamDescription()
96+
.streamStatusAsString())
97+
.isEqualTo("ENABLED"));
8298
}
8399

84100
private static String createDemoTable() {
@@ -102,18 +118,6 @@ private static String createDemoTable() {
102118

103119
@Test
104120
void fromDynamoDbStreamToKinesis() {
105-
AmazonDynamoDBStreamsAdapterClient streamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(
106-
DYNAMODB_STREAMS, null);
107-
108-
await().atMost(Duration.ofMinutes(2))
109-
.untilAsserted(() -> assertThat(
110-
streamsAdapterClient.describeStream(builder -> builder.streamName(DYNAMODB_STREAM_ARN)))
111-
.succeedsWithin(Duration.ofSeconds(60))
112-
.extracting(describeStreamResponse -> describeStreamResponse.streamDescription()
113-
.streamStatusAsString())
114-
.isEqualTo("ENABLED"));
115-
116-
this.kinesisMessageDrivenChannelAdapter.start();
117121
Map<?, ?> shardConsumers = TestUtils.getPropertyValue(this.kinesisMessageDrivenChannelAdapter, "shardConsumers",
118122
Map.class);
119123

@@ -129,8 +133,11 @@ void fromDynamoDbStreamToKinesis() {
129133
.join();
130134

131135
Message<?> receive = this.kinesisReceiveChannel.receive(120_000);
132-
assertThat(receive).extracting(Message::getPayload).asString().contains("some_id", "some value",
133-
"\"eventName\":\"INSERT\"", "\"eventSource\":\"aws:dynamodb\"");
136+
assertThat(receive)
137+
.satisfies(LOGGER::warn)
138+
.extracting(Message::getPayload)
139+
.asString()
140+
.contains("some_id", "some value", "\"eventName\":\"INSERT\"", "\"eventSource\":\"aws:dynamodb\"");
134141

135142
DYNAMODB.updateItem(
136143
builder -> builder.tableName(TEST_TABLE).key(Map.of(TABLE_KEY, AttributeValue.fromS("some_id")))
@@ -139,15 +146,22 @@ void fromDynamoDbStreamToKinesis() {
139146
.join();
140147

141148
receive = this.kinesisReceiveChannel.receive(30_000);
142-
assertThat(receive).extracting(Message::getPayload).asString().contains("some_id", "some value",
143-
"updated value", "\"eventName\":\"MODIFY\"");
149+
assertThat(receive)
150+
.satisfies(LOGGER::warn)
151+
.extracting(Message::getPayload)
152+
.asString()
153+
.contains("some_id", "some value", "updated value", "\"eventName\":\"MODIFY\"");
144154

145155
DYNAMODB.deleteItem(
146156
builder -> builder.tableName(TEST_TABLE).key(Map.of(TABLE_KEY, AttributeValue.fromS("some_id"))))
147157
.join();
148158

149159
receive = this.kinesisReceiveChannel.receive(30_000);
150-
assertThat(receive).extracting(Message::getPayload).asString().contains("some_id", "\"eventName\":\"REMOVE\"");
160+
assertThat(receive)
161+
.satisfies(LOGGER::warn)
162+
.extracting(Message::getPayload)
163+
.asString()
164+
.contains("some_id", "\"eventName\":\"REMOVE\"");
151165
}
152166

153167
@Configuration(proxyBeanMethods = false)
@@ -163,7 +177,6 @@ PollableChannel kinesisReceiveChannel() {
163177
KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter(PollableChannel kinesisReceiveChannel) {
164178
KinesisMessageDrivenChannelAdapter adapter = new KinesisMessageDrivenChannelAdapter(
165179
new SpringDynamoDBStreamsAdapterClient(DYNAMODB_STREAMS), DYNAMODB_STREAM_ARN);
166-
adapter.setAutoStartup(false);
167180
adapter.setStreamInitialSequence(KinesisShardOffset.trimHorizon());
168181
adapter.setOutputChannel(kinesisReceiveChannel);
169182
adapter.setConverter(String::new);

0 commit comments

Comments
 (0)