shardMetricGroupMap) {
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java
new file mode 100644
index 00000000..11725e6c
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records from
+ * the SplitReader implementation to the SplitReaderBase.
+ *
+ * <p>Input records are de-aggregated using the KCL 3.x library. It is expected that AWS SDK v2.x
+ * messages are converted to KCL 3.x {@link KinesisClientRecord}.
+ */
+@Internal
+public class RecordBatch {
+ private final List<KinesisClientRecord> deaggregatedRecords;
+ private final long millisBehindLatest;
+ private final boolean completed;
+
+ public RecordBatch(
+ final List<Record> records,
+ final KinesisShardSplit subscribedShard,
+ final long millisBehindLatest,
+ final boolean completed) {
+ this.deaggregatedRecords = deaggregateRecords(records, subscribedShard);
+ this.millisBehindLatest = millisBehindLatest;
+ this.completed = completed;
+ }
+
+ public List<KinesisClientRecord> getDeaggregatedRecords() {
+ return deaggregatedRecords;
+ }
+
+ public long getMillisBehindLatest() {
+ return millisBehindLatest;
+ }
+
+ public boolean isCompleted() {
+ return completed;
+ }
+
+ private List<KinesisClientRecord> deaggregateRecords(
+ final List<Record> records, final KinesisShardSplit subscribedShard) {
+ final List<KinesisClientRecord> kinesisClientRecords = new ArrayList<>();
+ for (Record record : records) {
+ kinesisClientRecords.add(KinesisClientRecord.fromRecord(record));
+ }
+
+ final String startingHashKey = subscribedShard.getStartingHashKey();
+ final String endingHashKey = subscribedShard.getEndingHashKey();
+
+ return new AggregatorUtil()
+ .deaggregate(kinesisClientRecords, startingHashKey, endingHashKey);
+ }
+}
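
For reference, a minimal usage sketch of the new class (not part of the patch; `records` and `split` are assumed to come from a single-shard fetch):

// Plain records pass through unchanged; KPL-aggregated records are expanded
// eagerly in the constructor, bounded by the split's hash-key range.
RecordBatch batch = new RecordBatch(records, split, /* millisBehindLatest */ 0L, /* completed */ false);
List<KinesisClientRecord> deaggregated = batch.getDeaggregatedRecords();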
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
index c0aefee5..2cd642ea 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
@@ -24,6 +24,7 @@
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
+import org.apache.flink.connector.kinesis.source.reader.RecordBatch;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
@@ -72,7 +73,11 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
if (shardCompleted) {
splitSubscriptions.remove(splitState.getShardId());
}
- return new RecordBatch(event.records(), event.millisBehindLatest(), shardCompleted);
+ return new RecordBatch(
+ event.records(),
+ splitState.getKinesisShardSplit(),
+ event.millisBehindLatest(),
+ shardCompleted);
}
@Override
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
index c3dca833..b0684fb5 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
@@ -24,6 +24,7 @@
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
+import org.apache.flink.connector.kinesis.source.reader.RecordBatch;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -59,8 +60,12 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
splitState.getNextStartingPosition(),
this.maxRecordsToGet);
boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
+
return new RecordBatch(
- getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted);
+ getRecordsResponse.records(),
+ splitState.getKinesisShardSplit(),
+ getRecordsResponse.millisBehindLatest(),
+ isCompleted);
}
@Override
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
index bf40d6cb..5f0598c8 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java
@@ -11,6 +11,8 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
+import com.amazonaws.kinesis.agg.AggRecord;
+import com.amazonaws.kinesis.agg.RecordAggregator;
import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
@@ -24,7 +26,6 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
-import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
@@ -41,7 +42,6 @@
import java.time.Duration;
import java.util.List;
-import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -117,6 +117,17 @@ void singleShardStreamIsConsumed() throws Exception {
.runScenario();
}
+ @Test
+ void singleShardStreamWithAggregationIsConsumed() throws Exception {
+ new Scenario()
+ .localstackStreamName("single-shard-stream-aggregation")
+ .shardCount(1)
+ .aggregationFactor(10)
+ .withSourceConnectionStreamArn(
+ "arn:aws:kinesis:ap-southeast-1:000000000000:stream/single-shard-stream-aggregation")
+ .runScenario();
+ }
+
@Test
void multipleShardStreamIsConsumed() throws Exception {
new Scenario()
@@ -127,6 +138,17 @@ void multipleShardStreamIsConsumed() throws Exception {
.runScenario();
}
+ @Test
+ void multipleShardStreamWithAggregationIsConsumed() throws Exception {
+ new Scenario()
+ .localstackStreamName("multiple-shard-stream-aggregation")
+ .shardCount(4)
+ .aggregationFactor(10)
+ .withSourceConnectionStreamArn(
+ "arn:aws:kinesis:ap-southeast-1:000000000000:stream/multiple-shard-stream-aggregation")
+ .runScenario();
+ }
+
@Test
void reshardedStreamIsConsumed() throws Exception {
new Scenario()
@@ -138,6 +160,18 @@ void reshardedStreamIsConsumed() throws Exception {
.runScenario();
}
+ @Test
+ void reshardedStreamWithAggregationIsConsumed() throws Exception {
+ new Scenario()
+ .localstackStreamName("resharded-stream-aggregation")
+ .shardCount(1)
+ .aggregationFactor(10)
+ .reshardStream(2)
+ .withSourceConnectionStreamArn(
+ "arn:aws:kinesis:ap-southeast-1:000000000000:stream/resharded-stream-aggregation")
+ .runScenario();
+ }
+
private Configuration getDefaultConfiguration() {
Configuration configuration = new Configuration();
configuration.setString(AWS_ENDPOINT, MOCK_KINESIS_CONTAINER.getEndpoint());
@@ -152,6 +186,7 @@ private Configuration getDefaultConfiguration() {
private class Scenario {
private final int expectedElements = 1000;
+ private int aggregationFactor = 1;
private String localstackStreamName = null;
private int shardCount = 1;
private boolean shouldReshardStream = false;
@@ -203,6 +238,11 @@ public Scenario reshardStream(int targetShardCount) {
return this;
}
+ public Scenario aggregationFactor(int aggregationFactor) {
+ this.aggregationFactor = aggregationFactor;
+ return this;
+ }
+
private void prepareStream(String streamName) throws Exception {
final RateLimiter rateLimiter =
RateLimiterBuilder.newBuilder()
@@ -242,13 +282,8 @@ private void putRecords(String streamName, int startInclusive, int endInclusive)
for (List<byte[]> partition : Lists.partition(messages, 500)) {
List<PutRecordsRequestEntry> entries =
- partition.stream()
- .map(
- msg ->
- PutRecordsRequestEntry.builder()
- .partitionKey(UUID.randomUUID().toString())
- .data(SdkBytes.fromByteArray(msg))
- .build())
+ Lists.partition(partition, aggregationFactor).stream()
+ .map(this::createAggregatePutRecordsRequestEntry)
.collect(Collectors.toList());
PutRecordsRequest requests =
PutRecordsRequest.builder().streamName(streamName).records(entries).build();
@@ -259,6 +294,22 @@ private void putRecords(String streamName, int startInclusive, int endInclusive)
}
}
+ private PutRecordsRequestEntry createAggregatePutRecordsRequestEntry(
+ List<byte[]> messages) {
+ RecordAggregator recordAggregator = new RecordAggregator();
+
+ for (byte[] message : messages) {
+ try {
+ recordAggregator.addUserRecord("key", message);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to add record to aggregator", e);
+ }
+ }
+
+ AggRecord aggRecord = recordAggregator.clearAndGet();
+ return aggRecord.toPutRecordsRequestEntry();
+ }
+
private void reshard(String streamName) {
kinesisClient.updateShardCount(
UpdateShardCountRequest.builder()
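
On the read path the connector must undo this aggregation: each entry built by the helper above carries `aggregationFactor` user records, and the KCL `AggregatorUtil` invoked inside `RecordBatch` expands it back. A hedged sketch (the hash-key bounds shown are the full shard range, assumed for illustration):

// One KPL-aggregated record fetched from the shard...
KinesisClientRecord fetched = KinesisClientRecord.fromRecord(recordReadFromShard);
// ...expands back into the original user records on the consumer side.
List<KinesisClientRecord> userRecords =
        new AggregatorUtil()
                .deaggregate(
                        Collections.singletonList(fetched),
                        "0",
                        "340282366920938463463374607431768211455");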
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java
index bc8b1b5f..b09a7aea 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java
@@ -202,7 +202,8 @@ public EmptyRecordReturningReader(
@Override
protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
super.fetchRecords(splitState);
- return new RecordBatch(Collections.emptyList(), 0L, false);
+ return new RecordBatch(
+ Collections.emptyList(), splitState.getKinesisShardSplit(), 0L, false);
}
}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java
index f9489f99..8f4c9a3d 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java
@@ -29,10 +29,11 @@
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;
-import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@@ -50,24 +51,18 @@ class KinesisStreamsRecordEmitterTest {
@Test
void testEmitRecord() throws Exception {
final Instant startTime = Instant.now();
- List<Record> inputRecords =
+ List<KinesisClientRecord> inputRecords =
Stream.of(
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-1")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-1")))
.approximateArrivalTimestamp(startTime)
.build(),
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-2")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-2")))
.approximateArrivalTimestamp(startTime.plusSeconds(10))
.build(),
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-3")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-3")))
.approximateArrivalTimestamp(startTime.plusSeconds(20))
.sequenceNumber("some-sequence-number")
.build())
@@ -79,7 +74,7 @@ void testEmitRecord() throws Exception {
KinesisStreamsRecordEmitter<String> emitter =
new KinesisStreamsRecordEmitter<>(KinesisDeserializationSchema.of(STRING_SCHEMA));
- for (Record record : inputRecords) {
+ for (KinesisClientRecord record : inputRecords) {
emitter.emitRecord(record, output, splitState);
}
@@ -97,26 +92,20 @@ void testEmitRecord() throws Exception {
@Test
void testEmitRecordBasedOnSequenceNumber() throws Exception {
final Instant startTime = Instant.now();
- List<Record> inputRecords =
+ List<KinesisClientRecord> inputRecords =
Stream.of(
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-1")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-1")))
.sequenceNumber("emit")
.approximateArrivalTimestamp(startTime)
.build(),
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-2")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-2")))
.sequenceNumber("emit")
.approximateArrivalTimestamp(startTime.plusSeconds(10))
.build(),
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-3")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-3")))
.approximateArrivalTimestamp(startTime.plusSeconds(20))
.sequenceNumber("do-not-emit")
.build())
@@ -126,7 +115,7 @@ void testEmitRecordBasedOnSequenceNumber() throws Exception {
KinesisStreamsRecordEmitter<String> emitter =
new KinesisStreamsRecordEmitter<>(new SequenceNumberBasedDeserializationSchema());
- for (Record record : inputRecords) {
+ for (KinesisClientRecord record : inputRecords) {
emitter.emitRecord(record, output, splitState);
}
@@ -139,24 +128,18 @@ void testEmitRecordBasedOnSequenceNumber() throws Exception {
@Test
void testEmitRecordWithMetadata() throws Exception {
final Instant startTime = Instant.now();
- List<Record> inputRecords =
+ List<KinesisClientRecord> inputRecords =
Stream.of(
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-1")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-1")))
.approximateArrivalTimestamp(startTime)
.build(),
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-2")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-2")))
.approximateArrivalTimestamp(startTime.plusSeconds(10))
.build(),
- Record.builder()
- .data(
- SdkBytes.fromByteArray(
- STRING_SCHEMA.serialize("data-3")))
+ KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-3")))
.approximateArrivalTimestamp(startTime.plusSeconds(20))
.sequenceNumber("some-sequence-number")
.build())
@@ -168,7 +151,7 @@ void testEmitRecordWithMetadata() throws Exception {
new KinesisStreamsRecordEmitter<>(
new AssertRecordMetadataDeserializationSchema(
splitState.getStreamArn(), splitState.getShardId()));
- for (Record record : inputRecords) {
+ for (KinesisClientRecord record : inputRecords) {
emitter.emitRecord(record, output, splitState);
}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
index e19cc70f..269bb22f 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java
@@ -32,8 +32,10 @@
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -46,6 +48,7 @@
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX;
import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy;
import static org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.convertToKinesisClientRecord;
import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId;
import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestRecord;
import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
@@ -80,7 +83,7 @@ public void init() {
@Test
void testNoAssignedSplitsHandledGracefully() throws Exception {
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
assertThat(retrievedRecords.nextSplit()).isNull();
@@ -95,7 +98,7 @@ void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception {
new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
// When fetching records
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
// Then retrieve no records
assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
@@ -116,7 +119,7 @@ void testSplitWithExpiredShardHandledAsCompleted() throws Exception {
splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit)));
// When fetching records
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
// Then retrieve no records and mark split as complete
assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
@@ -128,28 +131,24 @@ void testSplitWithExpiredShardHandledAsCompleted() throws Exception {
void testSingleAssignedSplitAllConsumed() throws Exception {
// Given assigned split with records
testStreamProxy.addShards(TEST_SHARD_ID);
- List<Record> expectedRecords =
+ List<Record> inputRecords =
Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
.collect(Collectors.toList());
testStreamProxy.addRecords(
- TestUtil.STREAM_ARN,
- TEST_SHARD_ID,
- Collections.singletonList(expectedRecords.get(0)));
+ TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(0)));
testStreamProxy.addRecords(
- TestUtil.STREAM_ARN,
- TEST_SHARD_ID,
- Collections.singletonList(expectedRecords.get(1)));
+ TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(1)));
testStreamProxy.addRecords(
- TestUtil.STREAM_ARN,
- TEST_SHARD_ID,
- Collections.singletonList(expectedRecords.get(2)));
+ TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(2)));
splitReader.handleSplitsChanges(
new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+ List<KinesisClientRecord> expectedRecords = convertToKinesisClientRecord(inputRecords);
+
// When fetching records
- List<Record> records = new ArrayList<>();
- for (int i = 0; i < expectedRecords.size(); i++) {
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ List<KinesisClientRecord> records = new ArrayList<>();
+ for (int i = 0; i < inputRecords.size(); i++) {
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
records.addAll(readAllRecords(retrievedRecords));
}
@@ -160,28 +159,24 @@ void testSingleAssignedSplitAllConsumed() throws Exception {
void testMultipleAssignedSplitsAllConsumed() throws Exception {
// Given assigned split with records
testStreamProxy.addShards(TEST_SHARD_ID);
- List<Record> expectedRecords =
+ List<Record> inputRecords =
Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
.collect(Collectors.toList());
testStreamProxy.addRecords(
- TestUtil.STREAM_ARN,
- TEST_SHARD_ID,
- Collections.singletonList(expectedRecords.get(0)));
+ TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(0)));
testStreamProxy.addRecords(
- TestUtil.STREAM_ARN,
- TEST_SHARD_ID,
- Collections.singletonList(expectedRecords.get(1)));
+ TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(1)));
testStreamProxy.addRecords(
- TestUtil.STREAM_ARN,
- TEST_SHARD_ID,
- Collections.singletonList(expectedRecords.get(2)));
+ TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(2)));
splitReader.handleSplitsChanges(
new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+ List<KinesisClientRecord> expectedRecords = convertToKinesisClientRecord(inputRecords);
+
// When records are fetched
- List<Record> fetchedRecords = new ArrayList<>();
+ List<KinesisClientRecord> fetchedRecords = new ArrayList<>();
for (int i = 0; i < expectedRecords.size(); i++) {
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
fetchedRecords.addAll(readAllRecords(retrievedRecords));
}
@@ -189,6 +184,41 @@ void testMultipleAssignedSplitsAllConsumed() throws Exception {
assertThat(fetchedRecords).containsExactlyInAnyOrderElementsOf(expectedRecords);
}
+ @Test
+ void testAggregatedRecordsAreDeaggregated() throws Exception {
+ // Given assigned split with aggregated records
+ testStreamProxy.addShards(TEST_SHARD_ID);
+ List<Record> inputRecords =
+ Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
+ .collect(Collectors.toList());
+
+ KinesisClientRecord aggregatedRecord = TestUtil.createKinesisAggregatedRecord(inputRecords);
+ testStreamProxy.addRecords(
+ TestUtil.STREAM_ARN,
+ TEST_SHARD_ID,
+ Collections.singletonList(TestUtil.convertToRecord(aggregatedRecord)));
+
+ splitReader.handleSplitsChanges(
+ new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
+
+ List<ByteBuffer> expectedRecords =
+ convertToKinesisClientRecord(inputRecords).stream()
+ .map(KinesisClientRecord::data)
+ .collect(Collectors.toList());
+
+ // When fetching records
+ List<KinesisClientRecord> fetchedRecords = readAllRecords(splitReader.fetch());
+
+ // Then all records are fetched
+ assertThat(fetchedRecords)
+ .allMatch(KinesisClientRecord::aggregated)
+ .allMatch(
+ record ->
+ record.explicitHashKey().equals(aggregatedRecord.explicitHashKey()))
+ .extracting("data")
+ .containsExactlyInAnyOrderElementsOf(expectedRecords);
+ }
+
@Test
void testHandleEmptyCompletedShard() throws Exception {
// Given assigned split with no records, and the shard is complete
@@ -199,7 +229,7 @@ void testHandleEmptyCompletedShard() throws Exception {
testStreamProxy.setShouldCompleteNextShard(true);
// When fetching records
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
// Returns completed split with no records
assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
@@ -211,21 +241,23 @@ void testHandleEmptyCompletedShard() throws Exception {
void testFinishedSplitsReturned() throws Exception {
// Given assigned split with records from completed shard
testStreamProxy.addShards(TEST_SHARD_ID);
- List<Record> expectedRecords =
+ List<Record> inputRecords =
Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
.collect(Collectors.toList());
- testStreamProxy.addRecords(TestUtil.STREAM_ARN, TEST_SHARD_ID, expectedRecords);
+ testStreamProxy.addRecords(TestUtil.STREAM_ARN, TEST_SHARD_ID, inputRecords);
KinesisShardSplit split = getTestSplit(TEST_SHARD_ID);
splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
// When fetching records
- List<Record> fetchedRecords = new ArrayList<>();
+ List<KinesisClientRecord> fetchedRecords = new ArrayList<>();
testStreamProxy.setShouldCompleteNextShard(true);
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
+
+ List<KinesisClientRecord> expectedRecords = convertToKinesisClientRecord(inputRecords);
// Then records can be read successfully, with finishedSplit returned once all records are
// completed
- for (int i = 0; i < expectedRecords.size(); i++) {
+ for (int i = 0; i < inputRecords.size(); i++) {
assertThat(retrievedRecords.nextSplit()).isEqualTo(split.splitId());
assertThat(retrievedRecords.finishedSplits()).isEmpty();
fetchedRecords.add(retrievedRecords.nextRecordFromSplit());
@@ -245,21 +277,19 @@ void testPauseOrResumeSplits() throws Exception {
testStreamProxy.addShards(TEST_SHARD_ID);
KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID);
- List<Record> expectedRecords =
+ List<Record> inputRecords =
Stream.of(getTestRecord("data-1"), getTestRecord("data-2"))
.collect(Collectors.toList());
testStreamProxy.addRecords(
- TestUtil.STREAM_ARN,
- TEST_SHARD_ID,
- Collections.singletonList(expectedRecords.get(0)));
+ TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(0)));
testStreamProxy.addRecords(
- TestUtil.STREAM_ARN,
- TEST_SHARD_ID,
- Collections.singletonList(expectedRecords.get(1)));
+ TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(1)));
splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit)));
+ List<KinesisClientRecord> expectedRecords = convertToKinesisClientRecord(inputRecords);
+
// read data from split
- RecordsWithSplitIds<Record> records = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> records = splitReader.fetch();
assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(0));
// pause split
@@ -330,12 +360,15 @@ record ->
Arrays.asList(testSplit1, testSplit3), Collections.emptyList());
// read data from splits and verify that only records from split 2 were fetched by reader
- List<Record> fetchedRecords = new ArrayList<>();
+ List<KinesisClientRecord> fetchedRecords = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- RecordsWithSplitIds<Record> records = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> records = splitReader.fetch();
fetchedRecords.addAll(readAllRecords(records));
}
- assertThat(fetchedRecords).containsExactly(recordsFromSplit2.toArray(new Record[0]));
+
+ List<KinesisClientRecord> expectedRecordsFromSplit2 =
+ convertToKinesisClientRecord(recordsFromSplit2);
+ assertThat(fetchedRecords).containsExactlyElementsOf(expectedRecordsFromSplit2);
// resume split 3
splitReader.pauseOrResumeSplits(
@@ -344,10 +377,13 @@ record ->
// read data from splits and verify that only records from split 3 had been read
fetchedRecords.clear();
for (int i = 0; i < 10; i++) {
- RecordsWithSplitIds<Record> records = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> records = splitReader.fetch();
fetchedRecords.addAll(readAllRecords(records));
}
- assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new Record[0]));
+
+ List<KinesisClientRecord> expectedRecordsFromSplit3 =
+ convertToKinesisClientRecord(recordsFromSplit3);
+ assertThat(fetchedRecords).containsExactlyElementsOf(expectedRecordsFromSplit3);
}
@Test
@@ -389,16 +425,17 @@ void testMaxRecordsToGetParameterPassed() throws IOException {
splitReader.handleSplitsChanges(
new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
- List<Record> records = new ArrayList<>(readAllRecords(retrievedRecords));
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
+ List<KinesisClientRecord> records = new ArrayList<>(readAllRecords(retrievedRecords));
assertThat(sentRecords.size() > maxRecordsToGet).isTrue();
assertThat(records.size()).isEqualTo(maxRecordsToGet);
}
- private List<Record> readAllRecords(RecordsWithSplitIds<Record> recordsWithSplitIds) {
- List<Record> outputRecords = new ArrayList<>();
- Record record;
+ private List<KinesisClientRecord> readAllRecords(
+ RecordsWithSplitIds<KinesisClientRecord> recordsWithSplitIds) {
+ List<KinesisClientRecord> outputRecords = new ArrayList<>();
+ KinesisClientRecord record;
do {
record = recordsWithSplitIds.nextRecordFromSplit();
if (record != null) {
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java
new file mode 100644
index 00000000..e05ccc4c
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.connector.kinesis.source.util.TestUtil;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestRecord;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class RecordBatchTest {
+
+ @Test
+ public void testDeaggregateRecordsPassThrough() {
+ List<Record> records =
+ Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
+ .collect(Collectors.toList());
+
+ RecordBatch result = new RecordBatch(records, getTestSplit(), 100L, true);
+
+ assertThat(result.getDeaggregatedRecords().size()).isEqualTo(3);
+ }
+
+ @Test
+ public void testDeaggregateRecordsWithAggregatedRecords() {
+ List<Record> records =
+ Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
+ .collect(Collectors.toList());
+
+ Record aggregatedRecord = TestUtil.createAggregatedRecord(records);
+
+ RecordBatch result =
+ new RecordBatch(
+ Collections.singletonList(aggregatedRecord), getTestSplit(), 100L, true);
+
+ assertThat(result.getDeaggregatedRecords().size()).isEqualTo(3);
+ }
+
+ @Test
+ public void testGetMillisBehindLatest() {
+ RecordBatch result =
+ new RecordBatch(
+ Collections.singletonList(getTestRecord("data-1")),
+ getTestSplit(),
+ 100L,
+ true);
+
+ assertThat(result.getMillisBehindLatest()).isEqualTo(100L);
+ }
+
+ @Test
+ public void testIsCompleted() {
+ RecordBatch result =
+ new RecordBatch(
+ Collections.singletonList(getTestRecord("data-1")),
+ getTestSplit(),
+ 100L,
+ true);
+
+ assertThat(result.isCompleted()).isTrue();
+ }
+}
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
index fbaaf696..22144785 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java
@@ -32,7 +32,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.time.Duration;
import java.util.ArrayList;
@@ -83,7 +83,7 @@ public void testNoAssignedSplitsHandledGracefully() throws Exception {
CONSUMER_ARN,
shardMetricGroupMap,
newConfigurationForTest());
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
assertThat(retrievedRecords.nextSplit()).isNull();
@@ -104,7 +104,7 @@ public void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception {
new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
// When fetching records
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
// Then retrieve no records
assertThat(retrievedRecords.nextRecordFromSplit()).isNull();
@@ -127,7 +127,7 @@ public void testSplitWithExpiredShardHandledAsCompleted() throws Exception {
new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID))));
// When fetching records
- RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = splitReader.fetch();
// Then shard is marked as completed
// Then retrieve no records and mark split as complete
@@ -174,17 +174,17 @@ public void testCloseClosesStreamProxy() throws Exception {
}
private void consumeAllRecordsFromKinesis(
- SplitReader<Record, KinesisShardSplit> splitReader, int numRecords) {
+ SplitReader<KinesisClientRecord, KinesisShardSplit> splitReader, int numRecords) {
consumeRecordsFromKinesis(splitReader, numRecords, true);
}
private void consumeSomeRecordsFromKinesis(
- SplitReader<Record, KinesisShardSplit> splitReader, int numRecords) {
+ SplitReader<KinesisClientRecord, KinesisShardSplit> splitReader, int numRecords) {
consumeRecordsFromKinesis(splitReader, numRecords, false);
}
private void consumeRecordsFromKinesis(
- SplitReader<Record, KinesisShardSplit> splitReader,
+ SplitReader<KinesisClientRecord, KinesisShardSplit> splitReader,
int numRecords,
boolean checkForShardCompletion) {
// Set timeout to prevent infinite loop on failure
@@ -192,10 +192,10 @@ private void consumeRecordsFromKinesis(
Duration.ofSeconds(60),
() -> {
int numRetrievedRecords = 0;
- RecordsWithSplitIds<Record> retrievedRecords = null;
+ RecordsWithSplitIds<KinesisClientRecord> retrievedRecords = null;
while (numRetrievedRecords < numRecords) {
retrievedRecords = splitReader.fetch();
- List<Record> records = readAllRecords(retrievedRecords);
+ List<KinesisClientRecord> records = readAllRecords(retrievedRecords);
numRetrievedRecords += records.size();
}
assertThat(numRetrievedRecords).isEqualTo(numRecords);
@@ -211,9 +211,10 @@ private void consumeRecordsFromKinesis(
"did not receive expected " + numRecords + " records within 10 seconds.");
}
- private List<Record> readAllRecords(RecordsWithSplitIds<Record> recordsWithSplitIds) {
- List<Record> outputRecords = new ArrayList<>();
- Record record;
+ private List<KinesisClientRecord> readAllRecords(
+ RecordsWithSplitIds<KinesisClientRecord> recordsWithSplitIds) {
+ List<KinesisClientRecord> outputRecords = new ArrayList<>();
+ KinesisClientRecord record;
do {
record = recordsWithSplitIds.nextRecordFromSplit();
if (record != null) {
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java
index 6f563229..a78465c6 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java
@@ -25,6 +25,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static com.google.common.collect.Lists.partition;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition;
@@ -121,7 +122,12 @@ List<SubscribeToShardEvent> getEventsToSend()
records.add(createRecord(sequenceNumber));
}
- eventBuilder.records(records);
+ List<Record> aggregatedRecords =
+ partition(records, aggregationFactor).stream()
+ .map(TestUtil::createAggregatedRecord)
+ .collect(Collectors.toList());
+
+ eventBuilder.records(aggregatedRecords);
String continuation =
sequenceNumber.get() < totalRecords
diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
index 035d4496..768a3dc6 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
@@ -27,17 +27,23 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.testutils.MetricListener;
+import com.amazonaws.kinesis.agg.AggRecord;
+import com.amazonaws.kinesis.agg.RecordAggregator;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -161,6 +167,43 @@ public static Record getTestRecord(String data) {
.build();
}
+ public static Record convertToRecord(KinesisClientRecord record) {
+ return Record.builder()
+ .data(SdkBytes.fromByteBuffer(record.data()))
+ .approximateArrivalTimestamp(record.approximateArrivalTimestamp())
+ .build();
+ }
+
+ public static List<KinesisClientRecord> convertToKinesisClientRecord(List<Record> records) {
+ return records.stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList());
+ }
+
+ public static Record createAggregatedRecord(List<Record> records) {
+ KinesisClientRecord aggregatedRecord = createKinesisAggregatedRecord(records);
+ return convertToRecord(aggregatedRecord);
+ }
+
+ public static KinesisClientRecord createKinesisAggregatedRecord(List<Record> records) {
+ RecordAggregator recordAggregator = new RecordAggregator();
+
+ for (Record record : records) {
+ try {
+ recordAggregator.addUserRecord("key", record.data().asByteArray());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to add record to aggregator", e);
+ }
+ }
+
+ AggRecord aggRecord = recordAggregator.clearAndGet();
+
+ return KinesisClientRecord.builder()
+ .data(ByteBuffer.wrap(aggRecord.toRecordBytes()))
+ .partitionKey(aggRecord.getPartitionKey())
+ .explicitHashKey(aggRecord.getExplicitHashKey())
+ .approximateArrivalTimestamp(Instant.now())
+ .build();
+ }
+
public static void assertMillisBehindLatest(
KinesisShardSplit split, long expectedValue, MetricListener metricListener) {
Arn kinesisArn = Arn.fromString(split.getStreamArn());
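
Taken together, these helpers give the round trip that RecordBatchTest and the split-reader tests rely on. A short sketch (assuming the test split's hash-key range covers the aggregated record, as in the tests above):

List<Record> originals = Arrays.asList(getTestRecord("data-1"), getTestRecord("data-2"));
Record aggregated = TestUtil.createAggregatedRecord(originals);
RecordBatch batch =
        new RecordBatch(Collections.singletonList(aggregated), getTestSplit(), 0L, false);
// De-aggregation restores one KinesisClientRecord per original user record.
assertThat(batch.getDeaggregatedRecords()).hasSize(originals.size());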
diff --git a/pom.xml b/pom.xml
index 5f7c5564..2e5e0c68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -332,11 +332,6 @@ under the License.
<artifactId>javassist</artifactId>
<version>3.24.0-GA</version>