Skip to content

Commit f23f25a

Browse files
jvh-awsmattcreaserthisisabhash
authored
chore: Fix kinesis retry logic for request-level errors (#3223)
Co-authored-by: Matt Creaser <mattwcc@amazon.com> Co-authored-by: Abhash Kumar Singh <thisisabhash@gmail.com>
1 parent 87d2b46 commit f23f25a

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

aws-kinesis/src/main/java/com/amplifyframework/recordcache/RecordClient.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ internal class RecordClient(
8080

8181
private suspend fun handleFailedRequest(records: List<Record>) {
8282
try {
83-
val (recordsToRetry, recordsToDelete) = records.partition { it.retryCount + 1 < maxRetries }
83+
val (recordsToRetry, recordsToDelete) = records.partition { it.retryCount < maxRetries }
8484
val recordIdsToIncrement = recordsToRetry.map { it.id }
8585
val recordIdsToDelete = recordsToDelete.map { it.id }
8686

@@ -91,7 +91,7 @@ internal class RecordClient(
9191
val streamName = records.first().streamName
9292
logger.warn {
9393
"Deleted ${recordIdsToDelete.size} records from stream $streamName " +
94-
"that exceeded retry limit of $maxRetries after failed request"
94+
"that exceeded retry limit of $maxRetries after failed retries"
9595
}
9696
}
9797
} catch (storageError: Throwable) {

aws-kinesis/src/test/java/com/amplifyframework/recordcache/RecordClientFlushTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,12 @@ class RecordClientFlushTest {
126126
storage.addRecord(RecordInput(streamName, "key2", byteArrayOf(2))).getOrThrow()
127127
storage.addRecord(RecordInput(streamName, "key3", byteArrayOf(3))).getOrThrow()
128128

129-
// Set record 2 and 3 to retry count 2 (will be deleted on next failure since retryCount + 1 >= maxRetries)
129+
// Set record 2 and 3 to retry count 3 (will be deleted on next failure since retryCount >= maxRetries)
130130
val allRecords = storage.getRecordsByStream().getOrThrow().flatten()
131131
val record2Id = allRecords[1].id
132132
val record3Id = allRecords[2].id
133133

134-
repeat(2) { storage.incrementRetryCount(listOf(record2Id, record3Id)).getOrThrow() }
134+
repeat(3) { storage.incrementRetryCount(listOf(record2Id, record3Id)).getOrThrow() }
135135

136136
// Configure mock sender to fail with a non-SDK error
137137
coEvery { mockSender.putRecords(streamName, any()) } returns

0 commit comments

Comments
 (0)