From 767f576d4295aa6c0ad04ac34ebb183cc57bd0cd Mon Sep 17 00:00:00 2001
From: Luis Marques
Date: Sat, 1 Feb 2025 14:57:46 +0000
Subject: [PATCH 1/9] [FLINK-32097][Connectors/Kinesis] Implemented record
 deaggregation.

To implement record deaggregation, RecordBatch uses the AggregatorUtil from
KCL 3.x, together with the subscribed shard, to deaggregate incoming records.
The deaggregated records are now instances of KinesisClientRecord, since
AggregatorUtil requires that type; this entailed some refactoring to update
the record type wherever records are received. The RecordBatch class was also
extracted out of KinesisShardSplitReaderBase.
---
 .../pom.xml                                   |  5 ++
 .../kinesis/source/KinesisStreamsSource.java  |  4 +-
 .../reader/KinesisShardSplitReaderBase.java   | 54 +++---
 .../reader/KinesisStreamsRecordEmitter.java   |  6 +-
 .../reader/KinesisStreamsSourceReader.java    |  8 +-
 .../kinesis/source/reader/RecordBatch.java    | 78 +++++++++++++++++
 .../fanout/FanOutKinesisShardSplitReader.java |  7 +-
 .../PollingKinesisShardSplitReader.java       |  7 +-
 .../KinesisDeserializationSchema.java         |  4 +-
 .../KinesisDeserializationSchemaWrapper.java  | 12 ++-
 .../KinesisStreamsRecordEmitterTest.java      | 67 ++++++++-------
 .../PollingKinesisShardSplitReaderTest.java   | 86 +++++++++++--------
 .../FanOutKinesisShardSplitReaderTest.java    | 26 +++---
 .../kinesis/source/util/TestUtil.java         |  9 ++
 pom.xml                                       |  5 ++
 15 files changed, 241 insertions(+), 137 deletions(-)
 create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
index 3ba20c639..33a99a469 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
@@ -52,6 +52,11 @@ under the License.
             <artifactId>kinesis</artifactId>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.kinesis</groupId>
+            <artifactId>amazon-kinesis-client</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>arns</artifactId>

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
index 5fba71c68..4c56aa09c 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java
@@ -70,9 +70,9 @@
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
 import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import java.time.Duration;
 import java.util.Map;
@@ -209,7 +209,7 @@ public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
         return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
     }
 
-    private Supplier<SplitReader<Record, KinesisShardSplit>> getKinesisShardSplitReaderSupplier(
+    private Supplier<SplitReader<KinesisClientRecord, KinesisShardSplit>> getKinesisShardSplitReaderSupplier(
             Configuration sourceConfig, Map<String, KinesisShardMetrics> shardMetricGroupMap) {
         KinesisSourceConfigOptions.ReaderType readerType = sourceConfig.get(READER_TYPE);
         switch (readerType) {

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
index 3de94484b..3063861fe 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java
@@ -29,8 +29,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import javax.annotation.Nullable;
 
@@ -41,7 +41,6 @@
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -50,10 +49,10 @@
 /** Base implementation of the SplitReader for reading from KinesisShardSplits.
 */
 @Internal
 public abstract class KinesisShardSplitReaderBase
-        implements SplitReader<Record, KinesisShardSplit> {
+        implements SplitReader<KinesisClientRecord, KinesisShardSplit> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KinesisShardSplitReaderBase.class);
-    private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS =
+    private static final RecordsWithSplitIds<KinesisClientRecord> INCOMPLETE_SHARD_EMPTY_RECORDS =
             new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);
 
     private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>();
@@ -65,7 +64,7 @@ protected KinesisShardSplitReaderBase(Map<String, KinesisShardMetrics> shardMetricGroupMap
     }
 
     @Override
-    public RecordsWithSplitIds<Record> fetch() throws IOException {
+    public RecordsWithSplitIds<KinesisClientRecord> fetch() throws IOException {
         KinesisShardSplitState splitState = assignedSplits.poll();
 
         // When there are no assigned splits, return quickly
@@ -103,7 +102,7 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
                 .get(splitState.getShardId())
                 .setMillisBehindLatest(recordBatch.getMillisBehindLatest());
 
-        if (recordBatch.getRecords().isEmpty()) {
+        if (recordBatch.getDeaggregatedRecords().isEmpty()) {
             if (recordBatch.isCompleted()) {
                 return new KinesisRecordsWithSplitIds(
                         Collections.emptyIterator(), splitState.getSplitId(), true);
@@ -115,12 +114,12 @@
         splitState.setNextStartingPosition(
                 StartingPosition.continueFromSequenceNumber(
                         recordBatch
-                                .getRecords()
-                                .get(recordBatch.getRecords().size() - 1)
+                                .getDeaggregatedRecords()
+                                .get(recordBatch.getDeaggregatedRecords().size() - 1)
                                 .sequenceNumber()));
 
         return new KinesisRecordsWithSplitIds(
-                recordBatch.getRecords().iterator(),
+                recordBatch.getDeaggregatedRecords().iterator(),
                 splitState.getSplitId(),
                 recordBatch.isCompleted());
     }
@@ -154,48 +153,19 @@ public void pauseOrResumeSplits(
         splitsToResume.forEach(split -> pausedSplitIds.remove(split.splitId()));
     }
 
-    /**
-     * Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records
-     * from the SplitReader implementation to the SplitReaderBase.
-     */
-    @Internal
-    protected static class RecordBatch {
-        private final List<Record> records;
-        private final long millisBehindLatest;
-        private final boolean completed;
-
-        public RecordBatch(List<Record> records, long millisBehindLatest, boolean completed) {
-            this.records = records;
-            this.millisBehindLatest = millisBehindLatest;
-            this.completed = completed;
-        }
-
-        public List<Record> getRecords() {
-            return records;
-        }
-
-        public long getMillisBehindLatest() {
-            return millisBehindLatest;
-        }
-
-        public boolean isCompleted() {
-            return completed;
-        }
-    }
-
     /**
      * Implementation of {@link RecordsWithSplitIds} for sending Kinesis records from fetcher to the
      * SourceReader.
      */
     @Internal
-    private static class KinesisRecordsWithSplitIds implements RecordsWithSplitIds<Record> {
+    private static class KinesisRecordsWithSplitIds implements RecordsWithSplitIds<KinesisClientRecord> {
 
-        private final Iterator<Record> recordsIterator;
+        private final Iterator<KinesisClientRecord> recordsIterator;
         private final String splitId;
         private final boolean isComplete;
 
         public KinesisRecordsWithSplitIds(
-                Iterator<Record> recordsIterator, String splitId, boolean isComplete) {
+                Iterator<KinesisClientRecord> recordsIterator, String splitId, boolean isComplete) {
             this.recordsIterator = recordsIterator;
             this.splitId = splitId;
             this.isComplete = isComplete;
@@ -209,7 +179,7 @@ public String nextSplit() {
 
         @Nullable
         @Override
-        public Record nextRecordFromSplit() {
+        public KinesisClientRecord nextRecordFromSplit() {
             return recordsIterator.hasNext() ?
recordsIterator.next() : null;
         }

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java
index 1126e1045..42cba095a 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java
@@ -26,7 +26,7 @@
 import org.apache.flink.connector.kinesis.source.split.StartingPosition;
 import org.apache.flink.util.Collector;
 
-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 /**
  * Emits record from the source into the Flink job graph. This serves as the interface between the
@@ -36,7 +36,7 @@
  */
 @Internal
 public class KinesisStreamsRecordEmitter<T>
-        implements RecordEmitter<Record, T, KinesisShardSplitState> {
+        implements RecordEmitter<KinesisClientRecord, T, KinesisShardSplitState> {
 
     private final KinesisDeserializationSchema<T> deserializationSchema;
     private final SourceOutputWrapper<T> sourceOutputWrapper = new SourceOutputWrapper<>();
@@ -47,7 +47,7 @@ public KinesisStreamsRecordEmitter(KinesisDeserializationSchema<T> deserializationSchema
 
     @Override
     public void emitRecord(
-            Record element, SourceOutput<T> output, KinesisShardSplitState splitState)
+            KinesisClientRecord element, SourceOutput<T> output, KinesisShardSplitState splitState)
             throws Exception {
         sourceOutputWrapper.setSourceOutput(output);
         sourceOutputWrapper.setTimestamp(element.approximateArrivalTimestamp().toEpochMilli());

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
index 281d7fe21..b484e50d1 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java
@@ -31,7 +31,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import java.util.HashSet;
 import java.util.List;
@@ -45,14 +45,14 @@
 @Internal
 public class KinesisStreamsSourceReader<T>
         extends SingleThreadMultiplexSourceReaderBase<
-                Record, T, KinesisShardSplit, KinesisShardSplitState> {
+                KinesisClientRecord, T, KinesisShardSplit, KinesisShardSplitState> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class);
     private final Map<String, KinesisShardMetrics> shardMetricGroupMap;
 
     public KinesisStreamsSourceReader(
-            SingleThreadFetcherManager<Record, KinesisShardSplit> splitFetcherManager,
-            RecordEmitter<Record, T, KinesisShardSplitState> recordEmitter,
+            SingleThreadFetcherManager<KinesisClientRecord, KinesisShardSplit> splitFetcherManager,
+            RecordEmitter<KinesisClientRecord, T, KinesisShardSplitState> recordEmitter,
             Configuration config,
             SourceReaderContext context,
             Map<String, KinesisShardMetrics> shardMetricGroupMap) {

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java
new file mode 100644
index 000000000..2206989fb
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records
+ * from the SplitReader implementation to the SplitReaderBase.
+ *
+ * <p>Input records are de-aggregated using the KCL 3.x library. It is expected that AWS SDK v2.x
+ * messages are converted to KCL 3.x {@link KinesisClientRecord}.
+ */
+@Internal
+public class RecordBatch {
+    private final List<KinesisClientRecord> deaggregatedRecords;
+    private final long millisBehindLatest;
+    private final boolean completed;
+
+    public RecordBatch(
+            final List<Record> records,
+            final KinesisShardSplit subscribedShard,
+            final long millisBehindLatest,
+            final boolean completed) {
+        this.deaggregatedRecords = deaggregateRecords(records, subscribedShard);
+        this.millisBehindLatest = millisBehindLatest;
+        this.completed = completed;
+    }
+
+    public List<KinesisClientRecord> getDeaggregatedRecords() {
+        return deaggregatedRecords;
+    }
+
+    public long getMillisBehindLatest() {
+        return millisBehindLatest;
+    }
+
+    public boolean isCompleted() {
+        return completed;
+    }
+
+    private List<KinesisClientRecord> deaggregateRecords(
+            final List<Record> records,
+            final KinesisShardSplit subscribedShard) {
+        final List<KinesisClientRecord> kinesisClientRecords = records.stream()
+                .map(KinesisClientRecord::fromRecord)
+                .collect(Collectors.toList());
+
+        final String startingHashKey = subscribedShard.getStartingHashKey();
+        final String endingHashKey = subscribedShard.getEndingHashKey();
+
+        return new AggregatorUtil().deaggregate(kinesisClientRecords, startingHashKey, endingHashKey);
+    }
+}

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
index 8ae20293d..e11e2f37c 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java
@@ -23,6 +23,7 @@
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
 import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
+import org.apache.flink.connector.kinesis.source.reader.RecordBatch;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
 
@@ -69,7 +70,11 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
         if (shardCompleted) {
             splitSubscriptions.remove(splitState.getShardId());
         }
-        return new RecordBatch(event.records(), event.millisBehindLatest(), shardCompleted);
+        return new RecordBatch(
+                event.records(),
+                splitState.getKinesisShardSplit(),
+                event.millisBehindLatest(),
+                shardCompleted);
     }
 
     @Override

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
index d2d6cd541..f4d3fc111 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java
@@ -24,6 +24,7 @@
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
 import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
+import org.apache.flink.connector.kinesis.source.reader.RecordBatch;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
 
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -59,8 +60,12 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) {
                         splitState.getNextStartingPosition(),
                         this.maxRecordsToGet);
         boolean isCompleted = getRecordsResponse.nextShardIterator() == null;
+
         return new RecordBatch(
-                getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted);
+                getRecordsResponse.records(),
+                splitState.getKinesisShardSplit(),
+                getRecordsResponse.millisBehindLatest(),
+                isCompleted);
     }
 
     @Override

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java
index e43756727..b3a9eb4fa 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java
@@ -23,7 +23,7 @@
 import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
 import org.apache.flink.util.Collector;
 
-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -60,7 +60,7 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
      * @param output the identifier of the shard the record was sent to
      * @throws IOException exception when deserializing record
      */
-    void deserialize(Record record, String stream, String shardId, Collector<T> output)
+    void deserialize(KinesisClientRecord record, String stream, String shardId, Collector<T> output)
             throws IOException;
 
     static <T> KinesisDeserializationSchema<T> of(DeserializationSchema<T> deserializationSchema) {

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java
index 074d8472a..deee974b8 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java
@@ -22,9 +22,10 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.Collector;
 
-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * A simple wrapper for using the {@link DeserializationSchema} with the {@link
@@ -48,9 +49,14 @@ public void open(DeserializationSchema.InitializationContext context) throws Exc
     }
 
     @Override
-    public void deserialize(Record record, String stream, String shardId, Collector<T> output)
+    public void deserialize(KinesisClientRecord record, String stream, String shardId, Collector<T> output)
             throws IOException {
-        deserializationSchema.deserialize(record.data().asByteArray(), output);
+        ByteBuffer recordData = record.data();
+
+        byte[] dataBytes = new byte[recordData.remaining()];
+        recordData.get(dataBytes);
+
+        deserializationSchema.deserialize(dataBytes, output);
     }
 
     @Override

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java
index f9489f99d..940d67141 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java
@@ -29,10 +29,10 @@
 import org.apache.flink.util.Collector;
 
 import org.junit.jupiter.api.Test;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
@@ -50,23 +50,23 @@ class KinesisStreamsRecordEmitterTest {
     @Test
     void testEmitRecord() throws Exception {
         final Instant startTime = Instant.now();
-        List<Record> inputRecords =
+        List<KinesisClientRecord> inputRecords =
                 Stream.of(
-                                Record.builder()
+                                KinesisClientRecord.builder()
                                         .data(
-                                                SdkBytes.fromByteArray(
+                                                ByteBuffer.wrap(
                                                         STRING_SCHEMA.serialize("data-1")))
                                         .approximateArrivalTimestamp(startTime)
                                         .build(),
-                                Record.builder()
+                                KinesisClientRecord.builder()
                                         .data(
-                                                SdkBytes.fromByteArray(
+
ByteBuffer.wrap( STRING_SCHEMA.serialize("data-2"))) .sequenceNumber("emit") .approximateArrivalTimestamp(startTime.plusSeconds(10)) .build(), - Record.builder() + KinesisClientRecord.builder() .data( - SdkBytes.fromByteArray( + ByteBuffer.wrap( STRING_SCHEMA.serialize("data-3"))) .approximateArrivalTimestamp(startTime.plusSeconds(20)) .sequenceNumber("do-not-emit") @@ -126,7 +126,7 @@ void testEmitRecordBasedOnSequenceNumber() throws Exception { KinesisStreamsRecordEmitter emitter = new KinesisStreamsRecordEmitter<>(new SequenceNumberBasedDeserializationSchema()); - for (Record record : inputRecords) { + for (KinesisClientRecord record : inputRecords) { emitter.emitRecord(record, output, splitState); } @@ -139,23 +139,23 @@ void testEmitRecordBasedOnSequenceNumber() throws Exception { @Test void testEmitRecordWithMetadata() throws Exception { final Instant startTime = Instant.now(); - List inputRecords = + List inputRecords = Stream.of( - Record.builder() + KinesisClientRecord.builder() .data( - SdkBytes.fromByteArray( + ByteBuffer.wrap( STRING_SCHEMA.serialize("data-1"))) .approximateArrivalTimestamp(startTime) .build(), - Record.builder() + KinesisClientRecord.builder() .data( - SdkBytes.fromByteArray( + ByteBuffer.wrap( STRING_SCHEMA.serialize("data-2"))) .approximateArrivalTimestamp(startTime.plusSeconds(10)) .build(), - Record.builder() + KinesisClientRecord.builder() .data( - SdkBytes.fromByteArray( + ByteBuffer.wrap( STRING_SCHEMA.serialize("data-3"))) .approximateArrivalTimestamp(startTime.plusSeconds(20)) .sequenceNumber("some-sequence-number") @@ -168,7 +168,7 @@ void testEmitRecordWithMetadata() throws Exception { new KinesisStreamsRecordEmitter<>( new AssertRecordMetadataDeserializationSchema( splitState.getStreamArn(), splitState.getShardId())); - for (Record record : inputRecords) { + for (KinesisClientRecord record : inputRecords) { emitter.emitRecord(record, output, splitState); } @@ -225,10 +225,13 @@ private static class SequenceNumberBasedDeserializationSchema @Override public void deserialize( - Record record, String stream, String shardId, Collector output) + KinesisClientRecord record, String stream, String shardId, Collector output) throws IOException { if (Objects.equals(record.sequenceNumber(), "emit")) { - STRING_SCHEMA.deserialize(record.data().asByteArray(), output); + ByteBuffer recordData = record.data(); + byte[] dataBytes = new byte[recordData.remaining()]; + recordData.get(dataBytes); + STRING_SCHEMA.deserialize(dataBytes, output); } } @@ -251,11 +254,15 @@ private AssertRecordMetadataDeserializationSchema( @Override public void deserialize( - Record record, String stream, String shardId, Collector output) + KinesisClientRecord record, String stream, String shardId, Collector output) throws IOException { assertThat(stream).isEqualTo(expectedStreamArn); assertThat(shardId).isEqualTo(expectedShardId); - STRING_SCHEMA.deserialize(record.data().asByteArray(), output); + + ByteBuffer recordData = record.data(); + byte[] dataBytes = new byte[recordData.remaining()]; + recordData.get(dataBytes); + STRING_SCHEMA.deserialize(dataBytes, output); } @Override diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java index 2308b6705..d2f4d41d9 100644 --- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.io.IOException; import java.util.ArrayList; @@ -46,6 +47,7 @@ import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX; import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy; import static org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.convertToKinesisClientRecord; import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestRecord; import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit; @@ -80,7 +82,7 @@ public void init() { @Test void testNoAssignedSplitsHandledGracefully() throws Exception { - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); assertThat(retrievedRecords.nextSplit()).isNull(); @@ -95,7 +97,7 @@ void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception { new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); // When fetching records - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); // Then retrieve no records assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); @@ -116,7 +118,7 @@ void testSplitWithExpiredShardHandledAsCompleted() throws Exception { splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit))); // When fetching records - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); // Then retrieve no records and mark split as complete assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); @@ -128,28 +130,30 @@ void testSplitWithExpiredShardHandledAsCompleted() throws Exception { void testSingleAssignedSplitAllConsumed() throws Exception { // Given assigned split with records testStreamProxy.addShards(TEST_SHARD_ID); - List expectedRecords = + List inputRecords = Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); testStreamProxy.addRecords( TestUtil.STREAM_ARN, TEST_SHARD_ID, - Collections.singletonList(expectedRecords.get(0))); + Collections.singletonList(inputRecords.get(0))); testStreamProxy.addRecords( TestUtil.STREAM_ARN, TEST_SHARD_ID, - Collections.singletonList(expectedRecords.get(1))); + Collections.singletonList(inputRecords.get(1))); testStreamProxy.addRecords( TestUtil.STREAM_ARN, TEST_SHARD_ID, - Collections.singletonList(expectedRecords.get(2))); + Collections.singletonList(inputRecords.get(2))); splitReader.handleSplitsChanges( new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + List expectedRecords 
= convertToKinesisClientRecord(inputRecords); + // When fetching records - List records = new ArrayList<>(); - for (int i = 0; i < expectedRecords.size(); i++) { - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + List records = new ArrayList<>(); + for (int i = 0; i < inputRecords.size(); i++) { + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); records.addAll(readAllRecords(retrievedRecords)); } @@ -160,28 +164,30 @@ void testSingleAssignedSplitAllConsumed() throws Exception { void testMultipleAssignedSplitsAllConsumed() throws Exception { // Given assigned split with records testStreamProxy.addShards(TEST_SHARD_ID); - List expectedRecords = + List inputRecords = Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); testStreamProxy.addRecords( TestUtil.STREAM_ARN, TEST_SHARD_ID, - Collections.singletonList(expectedRecords.get(0))); + Collections.singletonList(inputRecords.get(0))); testStreamProxy.addRecords( TestUtil.STREAM_ARN, TEST_SHARD_ID, - Collections.singletonList(expectedRecords.get(1))); + Collections.singletonList(inputRecords.get(1))); testStreamProxy.addRecords( TestUtil.STREAM_ARN, TEST_SHARD_ID, - Collections.singletonList(expectedRecords.get(2))); + Collections.singletonList(inputRecords.get(2))); splitReader.handleSplitsChanges( new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + List expectedRecords = convertToKinesisClientRecord(inputRecords); + // When records are fetched - List fetchedRecords = new ArrayList<>(); + List fetchedRecords = new ArrayList<>(); for (int i = 0; i < expectedRecords.size(); i++) { - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); fetchedRecords.addAll(readAllRecords(retrievedRecords)); } @@ -199,7 +205,7 @@ void testHandleEmptyCompletedShard() throws Exception { testStreamProxy.setShouldCompleteNextShard(true); // When fetching records - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); // Returns completed split with no records assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); @@ -211,21 +217,23 @@ void testHandleEmptyCompletedShard() throws Exception { void testFinishedSplitsReturned() throws Exception { // Given assigned split with records from completed shard testStreamProxy.addShards(TEST_SHARD_ID); - List expectedRecords = + List inputRecords = Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); - testStreamProxy.addRecords(TestUtil.STREAM_ARN, TEST_SHARD_ID, expectedRecords); + testStreamProxy.addRecords(TestUtil.STREAM_ARN, TEST_SHARD_ID, inputRecords); KinesisShardSplit split = getTestSplit(TEST_SHARD_ID); splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split))); // When fetching records - List fetchedRecords = new ArrayList<>(); + List fetchedRecords = new ArrayList<>(); testStreamProxy.setShouldCompleteNextShard(true); - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + + List expectedRecords = convertToKinesisClientRecord(inputRecords); // Then records can be read successfully, with finishedSplit returned once all records are // completed - for (int i = 0; i < expectedRecords.size(); i++) { + for (int i = 0; i < inputRecords.size(); i++) { 
assertThat(retrievedRecords.nextSplit()).isEqualTo(split.splitId()); assertThat(retrievedRecords.finishedSplits()).isEmpty(); fetchedRecords.add(retrievedRecords.nextRecordFromSplit()); @@ -245,21 +253,23 @@ void testPauseOrResumeSplits() throws Exception { testStreamProxy.addShards(TEST_SHARD_ID); KinesisShardSplit testSplit = getTestSplit(TEST_SHARD_ID); - List expectedRecords = + List inputRecords = Stream.of(getTestRecord("data-1"), getTestRecord("data-2")) .collect(Collectors.toList()); testStreamProxy.addRecords( TestUtil.STREAM_ARN, TEST_SHARD_ID, - Collections.singletonList(expectedRecords.get(0))); + Collections.singletonList(inputRecords.get(0))); testStreamProxy.addRecords( TestUtil.STREAM_ARN, TEST_SHARD_ID, - Collections.singletonList(expectedRecords.get(1))); + Collections.singletonList(inputRecords.get(1))); splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit))); + List expectedRecords = convertToKinesisClientRecord(inputRecords); + // read data from split - RecordsWithSplitIds records = splitReader.fetch(); + RecordsWithSplitIds records = splitReader.fetch(); assertThat(readAllRecords(records)).containsExactlyInAnyOrder(expectedRecords.get(0)); // pause split @@ -330,12 +340,14 @@ record -> Arrays.asList(testSplit1, testSplit3), Collections.emptyList()); // read data from splits and verify that only records from split 2 were fetched by reader - List fetchedRecords = new ArrayList<>(); + List fetchedRecords = new ArrayList<>(); for (int i = 0; i < 10; i++) { - RecordsWithSplitIds records = splitReader.fetch(); + RecordsWithSplitIds records = splitReader.fetch(); fetchedRecords.addAll(readAllRecords(records)); } - assertThat(fetchedRecords).containsExactly(recordsFromSplit2.toArray(new Record[0])); + + List expectedRecordsFromSplit2 = convertToKinesisClientRecord(recordsFromSplit2); + assertThat(fetchedRecords).containsExactlyElementsOf(expectedRecordsFromSplit2); // resume split 3 splitReader.pauseOrResumeSplits( @@ -344,10 +356,12 @@ record -> // read data from splits and verify that only records from split 3 had been read fetchedRecords.clear(); for (int i = 0; i < 10; i++) { - RecordsWithSplitIds records = splitReader.fetch(); + RecordsWithSplitIds records = splitReader.fetch(); fetchedRecords.addAll(readAllRecords(records)); } - assertThat(fetchedRecords).containsExactly(recordsFromSplit3.toArray(new Record[0])); + + List expectedRecordsFromSplit3 = convertToKinesisClientRecord(recordsFromSplit3); + assertThat(fetchedRecords).containsExactlyElementsOf(expectedRecordsFromSplit3); } @Test @@ -388,16 +402,16 @@ void testMaxRecordsToGetParameterPassed() throws IOException { splitReader.handleSplitsChanges( new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); - List records = new ArrayList<>(readAllRecords(retrievedRecords)); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + List records = new ArrayList<>(readAllRecords(retrievedRecords)); assertThat(sentRecords.size() > maxRecordsToGet).isTrue(); assertThat(records.size()).isEqualTo(maxRecordsToGet); } - private List readAllRecords(RecordsWithSplitIds recordsWithSplitIds) { - List outputRecords = new ArrayList<>(); - Record record; + private List readAllRecords(RecordsWithSplitIds recordsWithSplitIds) { + List outputRecords = new ArrayList<>(); + KinesisClientRecord record; do { record = recordsWithSplitIds.nextRecordFromSplit(); if (record != null) { diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java index e065d4d2f..6bc815274 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java @@ -31,7 +31,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.time.Duration; import java.util.ArrayList; @@ -51,7 +51,7 @@ public class FanOutKinesisShardSplitReaderTest { private static final String TEST_SHARD_ID = TestUtil.generateShardId(1); private static final Duration TEST_SUBSCRIPTION_TIMEOUT = Duration.ofMillis(1000); - SplitReader splitReader; + SplitReader splitReader; private AsyncStreamProxy testAsyncStreamProxy; private Map shardMetricGroupMap; @@ -78,7 +78,7 @@ public void testNoAssignedSplitsHandledGracefully() throws Exception { CONSUMER_ARN, shardMetricGroupMap, TEST_SUBSCRIPTION_TIMEOUT); - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); assertThat(retrievedRecords.nextSplit()).isNull(); @@ -99,7 +99,7 @@ public void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception { new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); // When fetching records - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); // Then retrieve no records assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); @@ -122,7 +122,7 @@ public void testSplitWithExpiredShardHandledAsCompleted() throws Exception { new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); // When fetching records - RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); // Then shard is marked as completed // Then retrieve no records and mark split as complete @@ -169,17 +169,17 @@ public void testCloseClosesStreamProxy() throws Exception { } private void consumeAllRecordsFromKinesis( - SplitReader splitReader, int numRecords) { + SplitReader splitReader, int numRecords) { consumeRecordsFromKinesis(splitReader, numRecords, true); } private void consumeSomeRecordsFromKinesis( - SplitReader splitReader, int numRecords) { + SplitReader splitReader, int numRecords) { consumeRecordsFromKinesis(splitReader, numRecords, false); } private void consumeRecordsFromKinesis( - SplitReader splitReader, + SplitReader splitReader, int numRecords, boolean checkForShardCompletion) { // Set timeout to prevent infinite loop on failure @@ -187,10 +187,10 @@ private void consumeRecordsFromKinesis( Duration.ofSeconds(60), () -> { int numRetrievedRecords = 0; - RecordsWithSplitIds retrievedRecords = null; + RecordsWithSplitIds retrievedRecords = null; while (numRetrievedRecords < numRecords) { retrievedRecords = splitReader.fetch(); - List records = 
readAllRecords(retrievedRecords);
+                        List<KinesisClientRecord> records = readAllRecords(retrievedRecords);
                         numRetrievedRecords += records.size();
                     }
                     assertThat(numRetrievedRecords).isEqualTo(numRecords);
@@ -206,9 +206,9 @@ private void consumeRecordsFromKinesis(
                 "did not receive expected " + numRecords + " records within 10 seconds.");
     }
 
-    private List<Record> readAllRecords(RecordsWithSplitIds<Record> recordsWithSplitIds) {
-        List<Record> outputRecords = new ArrayList<>();
-        Record record;
+    private List<KinesisClientRecord> readAllRecords(RecordsWithSplitIds<KinesisClientRecord> recordsWithSplitIds) {
+        List<KinesisClientRecord> outputRecords = new ArrayList<>();
+        KinesisClientRecord record;
         do {
             record = recordsWithSplitIds.nextRecordFromSplit();
             if (record != null) {

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
index 035d4496e..387770f79 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java
@@ -32,12 +32,15 @@
 import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
 
 import java.math.BigInteger;
 import java.time.Instant;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -161,6 +164,12 @@ public static Record getTestRecord(String data) {
                 .build();
     }
 
+    public static List<KinesisClientRecord> convertToKinesisClientRecord(List<Record> records) {
+        return records.stream()
+                .map(KinesisClientRecord::fromRecord)
+                .collect(Collectors.toList());
+    }
+
     public static void assertMillisBehindLatest(
             KinesisShardSplit split, long expectedValue, MetricListener metricListener) {
         Arn kinesisArn = Arn.fromString(split.getStreamArn());

diff --git a/pom.xml b/pom.xml
index d52f695ed..3dbfa73ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -347,6 +347,11 @@ under the License.
                 <artifactId>amazon-kinesis-client</artifactId>
                 <version>1.14.8</version>
             </dependency>
+            <dependency>
+                <groupId>software.amazon.kinesis</groupId>
+                <artifactId>amazon-kinesis-client</artifactId>
+                <version>3.0.1</version>
+            </dependency>
             <dependency>
                 <groupId>com.squareup.okio</groupId>
                 <artifactId>okio</artifactId>

From ddcb1a9cf291cd9eab73829925a08da76b2dc107 Mon Sep 17 00:00:00 2001
From: Luis Marques
Date: Sat, 1 Feb 2025 14:58:26 +0000
Subject: [PATCH 2/9] [FLINK-32097][Connectors/Kinesis] Added tests for
 RecordBatch deaggregation.

Testing that records are deaggregated correctly requires creating genuinely
aggregated records. A library exists for that, but no version published to
the Maven repository is compatible with KCL 3.x, so the build uses a
compatible release from GitHub instead (see the sketch below).
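For illustration, a minimal sketch of the round trip the new tests rely on:
pack a few user records into one KPL-aggregated record with the aggregator
library, then fan it back out with the KCL 3.x AggregatorUtil. The class
name, payloads and the full-range hash-key bounds below are illustrative
only, not part of this patch:

    import com.amazonaws.kinesis.agg.AggRecord;
    import com.amazonaws.kinesis.agg.RecordAggregator;
    import software.amazon.awssdk.core.SdkBytes;
    import software.amazon.awssdk.services.kinesis.model.Record;
    import software.amazon.kinesis.retrieval.AggregatorUtil;
    import software.amazon.kinesis.retrieval.KinesisClientRecord;

    import java.nio.charset.StandardCharsets;
    import java.time.Instant;
    import java.util.Collections;
    import java.util.List;

    public class DeaggregationRoundTrip {
        public static void main(String[] args) throws Exception {
            // Pack three user records into one KPL-aggregated Kinesis record.
            RecordAggregator aggregator = new RecordAggregator();
            for (String payload : new String[] {"data-1", "data-2", "data-3"}) {
                aggregator.addUserRecord("key", payload.getBytes(StandardCharsets.UTF_8));
            }
            AggRecord aggRecord = aggregator.clearAndGet();

            Record aggregated =
                    Record.builder()
                            .data(SdkBytes.fromByteArray(aggRecord.toRecordBytes()))
                            .approximateArrivalTimestamp(Instant.now())
                            .build();

            // Deaggregate against the shard's full hash-key range; sub-records
            // whose effective hash key falls outside the range are filtered out.
            List<KinesisClientRecord> deaggregated =
                    new AggregatorUtil()
                            .deaggregate(
                                    Collections.singletonList(
                                            KinesisClientRecord.fromRecord(aggregated)),
                                    "0",
                                    "340282366920938463463374607431768211455");

            System.out.println(deaggregated.size()); // prints 3
        }
    }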
The protobuf version pinned in the root dependency management was also
incompatible with KCL 3.x, so that pin is removed.
---
 .../pom.xml                                   |  17 +++
 .../source/reader/RecordBatchTest.java        | 101 ++++++++++++++++++
 pom.xml                                       |   5 -
 3 files changed, 118 insertions(+), 5 deletions(-)
 create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
index 33a99a469..162a650ca 100644
--- a/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/pom.xml
@@ -33,6 +33,14 @@ under the License.
     <name>Flink : Connectors : AWS : Amazon Kinesis Data Streams Connector v2</name>
     <packaging>jar</packaging>
 
+
+    <repositories>
+        <repository>
+            <id>jitpack.io</id>
+            <url>https://jitpack.io</url>
+        </repository>
+    </repositories>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -107,6 +115,15 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+
+        <!-- Creates KPL-aggregated records in tests; resolved via JitPack. -->
+        <dependency>
+            <groupId>com.github.awslabs.kinesis-aggregation</groupId>
+            <artifactId>amazon-kinesis-aggregator</artifactId>
+            <version>2.0.3</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>nl.jqno.equalsverifier</groupId>
             <artifactId>equalsverifier</artifactId>

diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java
new file mode 100644
index 000000000..7bbca8e09
--- /dev/null
+++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import com.amazonaws.kinesis.agg.AggRecord;
+import com.amazonaws.kinesis.agg.RecordAggregator;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestRecord;
+import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class RecordBatchTest {
+
+    @Test
+    public void testDeaggregateRecordsPassThrough() {
+        List<Record> records =
+                Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
+                        .collect(Collectors.toList());
+
+        RecordBatch result =
+                new RecordBatch(records, getTestSplit(), 100L, true);
+
+        assertThat(result.getDeaggregatedRecords().size()).isEqualTo(3);
+    }
+
+    @Test
+    public void testDeaggregateRecordsWithAggregatedRecords() {
+        List<Record> records =
+                Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3"))
+                        .collect(Collectors.toList());
+
+        Record aggregatedRecord = createAggregatedRecord(records);
+
+        RecordBatch result =
+                new RecordBatch(Collections.singletonList(aggregatedRecord), getTestSplit(), 100L, true);
+
+        assertThat(result.getDeaggregatedRecords().size()).isEqualTo(3);
+    }
+
+    @Test
+    public void testGetMillisBehindLatest() {
+        RecordBatch result =
+                new RecordBatch(
+                        Collections.singletonList(getTestRecord("data-1")), getTestSplit(), 100L, true);
+
+        assertThat(result.getMillisBehindLatest()).isEqualTo(100L);
+    }
+
+    @Test
+    public void testIsCompleted() {
+        RecordBatch result =
+                new RecordBatch(
+                        Collections.singletonList(getTestRecord("data-1")), getTestSplit(), 100L, true);
+
+        assertThat(result.isCompleted()).isTrue();
+    }
+
+    private static Record createAggregatedRecord(List<Record> records) {
+        RecordAggregator recordAggregator = new RecordAggregator();
+
+        for (Record record : records) {
+            try {
+                recordAggregator.addUserRecord("key", record.data().asByteArray());
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to add record to aggregator", e);
+            }
+        }
+
+        AggRecord aggRecord = recordAggregator.clearAndGet();
+
+        return Record.builder()
+                .data(SdkBytes.fromByteArray(aggRecord.toRecordBytes()))
+                .approximateArrivalTimestamp(Instant.now())
+                .build();
+    }
+}

diff --git a/pom.xml b/pom.xml
index 3dbfa73ac..6c3b83e94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -332,11 +332,6 @@ under the License.
                 <artifactId>javassist</artifactId>
                 <version>3.24.0-GA</version>
             </dependency>
-            <dependency>
-                <groupId>com.google.protobuf</groupId>
-                <artifactId>protobuf-java</artifactId>
-                <version>3.25.5</version>
-            </dependency>
             <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>

From fce6d035270509f31fd4cf3d9a13b0737cb985a1 Mon Sep 17 00:00:00 2001
From: Luis Marques
Date: Sat, 1 Feb 2025 14:59:42 +0000
Subject: [PATCH 3/9] [FLINK-32097][Connectors/Kinesis] Added integration
 tests for deaggregation.
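The aggregationFactor knob added below controls how many user records are
packed into each aggregated Kinesis record. A rough sketch of the resulting
batching arithmetic, assuming the scenario defaults used in the diff (1000
messages, PutRecords batches of 500, factor 10); the class name is
illustrative:

    import com.google.common.collect.Lists;

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    public class AggregationBatchingSketch {
        public static void main(String[] args) {
            List<Integer> messages =
                    IntStream.range(0, 1000).boxed().collect(Collectors.toList());

            for (List<Integer> putRecordsBatch : Lists.partition(messages, 500)) {
                // With aggregationFactor = 10, each PutRecords call carries
                // 50 aggregated records of 10 user records each.
                List<List<Integer>> aggregates = Lists.partition(putRecordsBatch, 10);
                System.out.println(aggregates.size()); // prints 50, twice
            }
        }
    }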
--- .../source/KinesisStreamsSourceITCase.java | 73 +++++++++++++++++-- .../source/reader/RecordBatchTest.java | 26 +------ .../kinesis/source/util/TestUtil.java | 21 ++++++ 3 files changed, 88 insertions(+), 32 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java index bf40d6cb5..ca14d6bf1 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java @@ -11,6 +11,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.junit5.MiniClusterExtension; +import com.amazonaws.kinesis.agg.AggRecord; +import com.amazonaws.kinesis.agg.RecordAggregator; import com.google.common.collect.Lists; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; @@ -41,7 +43,6 @@ import java.time.Duration; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -117,6 +118,17 @@ void singleShardStreamIsConsumed() throws Exception { .runScenario(); } + @Test + void singleShardStreamWithAggregationIsConsumed() throws Exception { + new Scenario() + .localstackStreamName("single-shard-stream-aggregation") + .shardCount(1) + .aggregationFactor(10) + .withSourceConnectionStreamArn( + "arn:aws:kinesis:ap-southeast-1:000000000000:stream/single-shard-stream-aggregation") + .runScenario(); + } + @Test void multipleShardStreamIsConsumed() throws Exception { new Scenario() @@ -127,6 +139,17 @@ void multipleShardStreamIsConsumed() throws Exception { .runScenario(); } + @Test + void multipleShardStreamWithAggregationIsConsumed() throws Exception { + new Scenario() + .localstackStreamName("multiple-shard-stream-aggregation") + .shardCount(4) + .aggregationFactor(10) + .withSourceConnectionStreamArn( + "arn:aws:kinesis:ap-southeast-1:000000000000:stream/multiple-shard-stream-aggregation") + .runScenario(); + } + @Test void reshardedStreamIsConsumed() throws Exception { new Scenario() @@ -138,6 +161,18 @@ void reshardedStreamIsConsumed() throws Exception { .runScenario(); } + @Test + void reshardedStreamWithAggregationIsConsumed() throws Exception { + new Scenario() + .localstackStreamName("resharded-stream-aggregation") + .shardCount(1) + .aggregationFactor(10) + .reshardStream(2) + .withSourceConnectionStreamArn( + "arn:aws:kinesis:ap-southeast-1:000000000000:stream/resharded-stream-aggregation") + .runScenario(); + } + private Configuration getDefaultConfiguration() { Configuration configuration = new Configuration(); configuration.setString(AWS_ENDPOINT, MOCK_KINESIS_CONTAINER.getEndpoint()); @@ -152,6 +187,7 @@ private Configuration getDefaultConfiguration() { private class Scenario { private final int expectedElements = 1000; + private int aggregationFactor = 1; private String localstackStreamName = null; private int shardCount = 1; private boolean shouldReshardStream = false; @@ -203,6 +239,11 @@ public Scenario reshardStream(int targetShardCount) { return this; } + public Scenario aggregationFactor(int aggregationFactor) { + this.aggregationFactor = aggregationFactor; + return this; 
+ } + private void prepareStream(String streamName) throws Exception { final RateLimiter rateLimiter = RateLimiterBuilder.newBuilder() @@ -242,13 +283,9 @@ private void putRecords(String streamName, int startInclusive, int endInclusive) for (List partition : Lists.partition(messages, 500)) { List entries = - partition.stream() - .map( - msg -> - PutRecordsRequestEntry.builder() - .partitionKey(UUID.randomUUID().toString()) - .data(SdkBytes.fromByteArray(msg)) - .build()) + Lists.partition(partition, aggregationFactor) + .stream() + .map(this::createAggregatePutRecordsRequestEntry) .collect(Collectors.toList()); PutRecordsRequest requests = PutRecordsRequest.builder().streamName(streamName).records(entries).build(); @@ -259,6 +296,26 @@ private void putRecords(String streamName, int startInclusive, int endInclusive) } } + private PutRecordsRequestEntry createAggregatePutRecordsRequestEntry(List messages) { + RecordAggregator recordAggregator = new RecordAggregator(); + + for (byte[] message : messages) { + try { + recordAggregator.addUserRecord("key", message); + } catch (Exception e) { + throw new RuntimeException("Failed to add record to aggregator", e); + } + } + + AggRecord aggRecord = recordAggregator.clearAndGet(); + + return PutRecordsRequestEntry.builder() + .data(SdkBytes.fromByteArray(aggRecord.toRecordBytes())) + .partitionKey(aggRecord.getPartitionKey()) + .explicitHashKey(aggRecord.getExplicitHashKey()) + .build(); + } + private void reshard(String streamName) { kinesisClient.updateShardCount( UpdateShardCountRequest.builder() diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java index 7bbca8e09..af9e89149 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java @@ -18,13 +18,10 @@ package org.apache.flink.connector.kinesis.source.reader; -import com.amazonaws.kinesis.agg.AggRecord; -import com.amazonaws.kinesis.agg.RecordAggregator; +import org.apache.flink.connector.kinesis.source.util.TestUtil; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.model.Record; -import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -54,7 +51,7 @@ public void testDeaggregateRecordsWithAggregatedRecords() { Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); - Record aggregatedRecord = createAggregatedRecord(records); + Record aggregatedRecord = TestUtil.createAggregatedRecord(records); RecordBatch result = new RecordBatch(Collections.singletonList(aggregatedRecord), getTestSplit(), 100L, true); @@ -79,23 +76,4 @@ public void testIsCompleted() { assertThat(result.isCompleted()).isTrue(); } - - private static Record createAggregatedRecord(List records) { - RecordAggregator recordAggregator = new RecordAggregator(); - - for (Record record : records) { - try { - recordAggregator.addUserRecord("key", record.data().asByteArray()); - } catch (Exception e) { - throw new RuntimeException("Failed to add record to aggregator", e); 
- } - } - - AggRecord aggRecord = recordAggregator.clearAndGet(); - - return Record.builder() - .data(SdkBytes.fromByteArray(aggRecord.toRecordBytes())) - .approximateArrivalTimestamp(Instant.now()) - .build(); - } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java index 387770f79..596efdfe7 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.kinesis.source.util; +import com.amazonaws.kinesis.agg.AggRecord; +import com.amazonaws.kinesis.agg.RecordAggregator; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.connector.kinesis.source.metrics.MetricConstants; @@ -170,6 +172,25 @@ public static List convertToKinesisClientRecord(List records) { + RecordAggregator recordAggregator = new RecordAggregator(); + + for (Record record : records) { + try { + recordAggregator.addUserRecord("key", record.data().asByteArray()); + } catch (Exception e) { + throw new RuntimeException("Failed to add record to aggregator", e); + } + } + + AggRecord aggRecord = recordAggregator.clearAndGet(); + + return Record.builder() + .data(SdkBytes.fromByteArray(aggRecord.toRecordBytes())) + .approximateArrivalTimestamp(Instant.now()) + .build(); + } + public static void assertMillisBehindLatest( KinesisShardSplit split, long expectedValue, MetricListener metricListener) { Arn kinesisArn = Arn.fromString(split.getStreamArn()); From 82c6ed23c57c91d708fd30b798b9394a36ec779a Mon Sep 17 00:00:00 2001 From: Luis Marques Date: Sat, 1 Feb 2025 15:00:16 +0000 Subject: [PATCH 4/9] [FLINK-32097][Connectors/Kinesis] Improved KinesisStreamsSourceITCase for aggregation tests. --- .../kinesis/source/KinesisStreamsSourceITCase.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java index ca14d6bf1..f4ea8a6d0 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java @@ -26,7 +26,6 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; -import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.SdkSystemSetting; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.regions.Region; @@ -308,12 +307,7 @@ private PutRecordsRequestEntry createAggregatePutRecordsRequestEntry(List Date: Sat, 1 Feb 2025 15:00:52 +0000 Subject: [PATCH 5/9] [FLINK-32097][Connectors/Kinesis] Added tests to PollingKinesisShardSplitReader for aggregated records. 
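The new test drives the full aggregation round-trip: records are first aggregated with the KPL aggregation library (RecordAggregator), and the reader is expected to hand back the deaggregated user records via the KCL. A self-contained sketch of that round-trip follows, assuming this module's amazon-kinesis-client and kinesis-aggregation dependencies; the class name and payloads are illustrative, the helpers mirror the TestUtil methods added in this series, and the single-list deaggregate overload stands in for the hash-range variant used in the production code:

    import com.amazonaws.kinesis.agg.AggRecord;
    import com.amazonaws.kinesis.agg.RecordAggregator;

    import software.amazon.kinesis.retrieval.AggregatorUtil;
    import software.amazon.kinesis.retrieval.KinesisClientRecord;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;

    public final class AggregationRoundTripSketch {
        public static void main(String[] args) throws Exception {
            // Aggregate three user records into one KPL-style aggregated record.
            RecordAggregator aggregator = new RecordAggregator();
            for (String payload : new String[] {"data-1", "data-2", "data-3"}) {
                aggregator.addUserRecord("key", payload.getBytes(StandardCharsets.UTF_8));
            }
            AggRecord aggRecord = aggregator.clearAndGet();

            // Wrap the aggregated bytes the way the split reader receives them.
            KinesisClientRecord aggregated =
                    KinesisClientRecord.builder()
                            .data(ByteBuffer.wrap(aggRecord.toRecordBytes()))
                            .partitionKey(aggRecord.getPartitionKey())
                            .explicitHashKey(aggRecord.getExplicitHashKey())
                            .build();

            // KCL deaggregation restores the three original user records.
            List<KinesisClientRecord> deaggregated =
                    new AggregatorUtil().deaggregate(Collections.singletonList(aggregated));
            System.out.println(deaggregated.size()); // expected: 3
        }
    }

testAggregatedRecordsAreDeaggregated asserts exactly this shape: every fetched record reports aggregated() and carries the parent record's explicitHashKey, while the payloads match the original user records.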
--- .../PollingKinesisShardSplitReaderTest.java | 32 +++++++++++++++++++ .../source/reader/RecordBatchTest.java | 1 + .../FakeKinesisFanOutBehaviorsFactory.java | 7 +++- .../kinesis/source/util/TestUtil.java | 23 ++++++++++--- 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java index d2f4d41d9..ced6be5a9 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java @@ -35,6 +35,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -195,6 +196,37 @@ void testMultipleAssignedSplitsAllConsumed() throws Exception { assertThat(fetchedRecords).containsExactlyInAnyOrderElementsOf(expectedRecords); } + @Test + void testAggregatedRecordsAreDeaggregated() throws Exception { + // Given assigned split with aggregated records + testStreamProxy.addShards(TEST_SHARD_ID); + List inputRecords = + Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) + .collect(Collectors.toList()); + + KinesisClientRecord aggregatedRecord = TestUtil.createKinesisAggregatedRecord(inputRecords); + testStreamProxy.addRecords( + TestUtil.STREAM_ARN, + TEST_SHARD_ID, + Collections.singletonList(TestUtil.convertToRecord(aggregatedRecord))); + + splitReader.handleSplitsChanges( + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + + List expectedRecords = convertToKinesisClientRecord(inputRecords).stream() + .map(KinesisClientRecord::data) + .collect(Collectors.toList()); + + // When fetching records + List fetchedRecords = readAllRecords(splitReader.fetch()); + + // Then all records are fetched + assertThat(fetchedRecords) + .allMatch(KinesisClientRecord::aggregated) + .allMatch(record -> record.explicitHashKey().equals(aggregatedRecord.explicitHashKey())) + .extracting("data").containsExactlyInAnyOrderElementsOf(expectedRecords); + } + @Test void testHandleEmptyCompletedShard() throws Exception { // Given assigned split with no records, and the shard is complete diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java index af9e89149..14230949b 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java @@ -19,6 +19,7 @@ package org.apache.flink.connector.kinesis.source.reader; import org.apache.flink.connector.kinesis.source.util.TestUtil; + import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.kinesis.model.Record; diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java index 6f563229b..e49425e5b 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static com.google.common.collect.Lists.partition; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition; @@ -121,7 +122,11 @@ List getEventsToSend() { records.add(createRecord(sequenceNumber)); } - eventBuilder.records(records); + List aggregatedRecords = partition(records, aggregationFactor).stream() + .map(TestUtil::createAggregatedRecord) + .collect(Collectors.toList()); + + eventBuilder.records(aggregatedRecords); String continuation = sequenceNumber.get() < totalRecords diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java index 596efdfe7..5c2715443 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java @@ -18,8 +18,6 @@ package org.apache.flink.connector.kinesis.source.util; -import com.amazonaws.kinesis.agg.AggRecord; -import com.amazonaws.kinesis.agg.RecordAggregator; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.connector.kinesis.source.metrics.MetricConstants; @@ -29,6 +27,8 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.testutils.MetricListener; +import com.amazonaws.kinesis.agg.AggRecord; +import com.amazonaws.kinesis.agg.RecordAggregator; import software.amazon.awssdk.arns.Arn; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; @@ -37,6 +37,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.math.BigInteger; +import java.nio.ByteBuffer; import java.time.Instant; import java.util.Collections; import java.util.List; @@ -166,6 +167,13 @@ public static Record getTestRecord(String data) { .build(); } + public static Record convertToRecord(KinesisClientRecord record) { + return Record.builder() + .data(SdkBytes.fromByteBuffer(record.data())) + .approximateArrivalTimestamp(record.approximateArrivalTimestamp()) + .build(); + } + public static List convertToKinesisClientRecord(List records) { return records.stream() .map(KinesisClientRecord::fromRecord) @@ -173,6 +181,11 @@ public static List convertToKinesisClientRecord(List records) { + KinesisClientRecord aggregatedRecord = 
createKinesisAggregatedRecord(records); + return convertToRecord(aggregatedRecord); + } + + public static KinesisClientRecord createKinesisAggregatedRecord(List records) { RecordAggregator recordAggregator = new RecordAggregator(); for (Record record : records) { @@ -185,8 +198,10 @@ public static Record createAggregatedRecord(List records) { AggRecord aggRecord = recordAggregator.clearAndGet(); - return Record.builder() - .data(SdkBytes.fromByteArray(aggRecord.toRecordBytes())) + return KinesisClientRecord.builder() + .data(ByteBuffer.wrap(aggRecord.toRecordBytes())) + .partitionKey(aggRecord.getPartitionKey()) + .explicitHashKey(aggRecord.getExplicitHashKey()) .approximateArrivalTimestamp(Instant.now()) .build(); } From 156dc6ba1cc79b67b9174649ccb396e577638701 Mon Sep 17 00:00:00 2001 From: Luis Marques Date: Sat, 1 Feb 2025 15:01:34 +0000 Subject: [PATCH 6/9] [FLINK-32097][Connectors/Kinesis] Changed RecordBatch deaggregate from stream to loop. --- .../connector/kinesis/source/reader/RecordBatch.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java index 2206989fb..e1ac7e695 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java @@ -25,8 +25,8 @@ import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** * Dataclass to store a batch of Kinesis records with metadata. 
Used to pass Kinesis records @@ -66,9 +66,10 @@ public boolean isCompleted() { private List deaggregateRecords( final List records, final KinesisShardSplit subscribedShard) { - final List kinesisClientRecords = records.stream() - .map(KinesisClientRecord::fromRecord) - .collect(Collectors.toList()); + final List kinesisClientRecords = new ArrayList<>(); + for (Record record : records) { + kinesisClientRecords.add(KinesisClientRecord.fromRecord(record)); + } final String startingHashKey = subscribedShard.getStartingHashKey(); final String endingHashKey = subscribedShard.getEndingHashKey(); From ae629b49173899d125ba966e3204331d41c9f2a7 Mon Sep 17 00:00:00 2001 From: Luis Marques Date: Fri, 7 Feb 2025 18:56:16 +0000 Subject: [PATCH 7/9] [FLINK-32097][Connectors/Kinesis] Spotless apply --- .../kinesis/source/KinesisStreamsSource.java | 6 +- .../reader/KinesisShardSplitReaderBase.java | 3 +- .../reader/KinesisStreamsSourceReader.java | 2 +- .../kinesis/source/reader/RecordBatch.java | 14 ++--- .../KinesisDeserializationSchemaWrapper.java | 3 +- .../source/KinesisStreamsSourceITCase.java | 6 +- .../KinesisStreamsRecordEmitterTest.java | 36 +++--------- .../PollingKinesisShardSplitReaderTest.java | 55 ++++++++----------- .../source/reader/RecordBatchTest.java | 16 ++++-- .../FanOutKinesisShardSplitReaderTest.java | 3 +- .../FakeKinesisFanOutBehaviorsFactory.java | 7 ++- .../kinesis/source/util/TestUtil.java | 4 +- 12 files changed, 69 insertions(+), 86 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 4c56aa09c..1d1dc0eac 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -209,8 +209,10 @@ public SimpleVersionedSerializer getSplitSerializer() { return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer()); } - private Supplier> getKinesisShardSplitReaderSupplier( - Configuration sourceConfig, Map shardMetricGroupMap) { + private Supplier> + getKinesisShardSplitReaderSupplier( + Configuration sourceConfig, + Map shardMetricGroupMap) { KinesisSourceConfigOptions.ReaderType readerType = sourceConfig.get(READER_TYPE); switch (readerType) { // We create a new stream proxy for each split reader since they have their own diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java index 3063861fe..5825f51de 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java @@ -158,7 +158,8 @@ public void pauseOrResumeSplits( * SourceReader. 
*/ @Internal - private static class KinesisRecordsWithSplitIds implements RecordsWithSplitIds { + private static class KinesisRecordsWithSplitIds + implements RecordsWithSplitIds { private final Iterator recordsIterator; private final String splitId; diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java index b484e50d1..eaf036807 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsSourceReader.java @@ -45,7 +45,7 @@ @Internal public class KinesisStreamsSourceReader extends SingleThreadMultiplexSourceReaderBase< - KinesisClientRecord, T, KinesisShardSplit, KinesisShardSplitState> { + KinesisClientRecord, T, KinesisShardSplit, KinesisShardSplitState> { private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceReader.class); private final Map shardMetricGroupMap; diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java index e1ac7e695..11725e6cf 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/RecordBatch.java @@ -29,11 +29,11 @@ import java.util.List; /** - * Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records - * from the SplitReader implementation to the SplitReaderBase. + * Dataclass to store a batch of Kinesis records with metadata. Used to pass Kinesis records from + * the SplitReader implementation to the SplitReaderBase. * - *
<p>Input records are de-aggregated using KCL 3.x library. It is expected that AWS SDK v2.x messages - * are converted to KCL 3.x {@link KinesisClientRecord}. + *
<p>
Input records are de-aggregated using KCL 3.x library. It is expected that AWS SDK v2.x + * messages are converted to KCL 3.x {@link KinesisClientRecord}. */ @Internal public class RecordBatch { @@ -64,8 +64,7 @@ public boolean isCompleted() { } private List deaggregateRecords( - final List records, - final KinesisShardSplit subscribedShard) { + final List records, final KinesisShardSplit subscribedShard) { final List kinesisClientRecords = new ArrayList<>(); for (Record record : records) { kinesisClientRecords.add(KinesisClientRecord.fromRecord(record)); @@ -74,6 +73,7 @@ private List deaggregateRecords( final String startingHashKey = subscribedShard.getStartingHashKey(); final String endingHashKey = subscribedShard.getEndingHashKey(); - return new AggregatorUtil().deaggregate(kinesisClientRecords, startingHashKey, endingHashKey); + return new AggregatorUtil() + .deaggregate(kinesisClientRecords, startingHashKey, endingHashKey); } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java index deee974b8..b594c5dc5 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java @@ -49,7 +49,8 @@ public void open(DeserializationSchema.InitializationContext context) throws Exc } @Override - public void deserialize(KinesisClientRecord record, String stream, String shardId, Collector output) + public void deserialize( + KinesisClientRecord record, String stream, String shardId, Collector output) throws IOException { ByteBuffer recordData = record.data(); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java index f4ea8a6d0..5f0598c85 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java @@ -282,8 +282,7 @@ private void putRecords(String streamName, int startInclusive, int endInclusive) for (List partition : Lists.partition(messages, 500)) { List entries = - Lists.partition(partition, aggregationFactor) - .stream() + Lists.partition(partition, aggregationFactor).stream() .map(this::createAggregatePutRecordsRequestEntry) .collect(Collectors.toList()); PutRecordsRequest requests = @@ -295,7 +294,8 @@ private void putRecords(String streamName, int startInclusive, int endInclusive) } } - private PutRecordsRequestEntry createAggregatePutRecordsRequestEntry(List messages) { + private PutRecordsRequestEntry createAggregatePutRecordsRequestEntry( + List messages) { RecordAggregator recordAggregator = new RecordAggregator(); for (byte[] message : messages) { diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java index 940d67141..b3a0809fe 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java @@ -53,21 +53,15 @@ void testEmitRecord() throws Exception { List inputRecords = Stream.of( KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-1"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-1"))) .approximateArrivalTimestamp(startTime) .build(), KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-2"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-2"))) .approximateArrivalTimestamp(startTime.plusSeconds(10)) .build(), KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-3"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-3"))) .approximateArrivalTimestamp(startTime.plusSeconds(20)) .sequenceNumber("some-sequence-number") .build()) @@ -100,23 +94,17 @@ void testEmitRecordBasedOnSequenceNumber() throws Exception { List inputRecords = Stream.of( KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-1"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-1"))) .sequenceNumber("emit") .approximateArrivalTimestamp(startTime) .build(), KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-2"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-2"))) .sequenceNumber("emit") .approximateArrivalTimestamp(startTime.plusSeconds(10)) .build(), KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-3"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-3"))) .approximateArrivalTimestamp(startTime.plusSeconds(20)) .sequenceNumber("do-not-emit") .build()) @@ -142,21 +130,15 @@ void testEmitRecordWithMetadata() throws Exception { List inputRecords = Stream.of( KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-1"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-1"))) .approximateArrivalTimestamp(startTime) .build(), KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-2"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-2"))) .approximateArrivalTimestamp(startTime.plusSeconds(10)) .build(), KinesisClientRecord.builder() - .data( - ByteBuffer.wrap( - STRING_SCHEMA.serialize("data-3"))) + .data(ByteBuffer.wrap(STRING_SCHEMA.serialize("data-3"))) .approximateArrivalTimestamp(startTime.plusSeconds(20)) .sequenceNumber("some-sequence-number") .build()) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java index ced6be5a9..850f28885 100644 --- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java @@ -135,17 +135,11 @@ void testSingleAssignedSplitAllConsumed() throws Exception { Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, - TEST_SHARD_ID, - Collections.singletonList(inputRecords.get(0))); + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(0))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, - TEST_SHARD_ID, - Collections.singletonList(inputRecords.get(1))); + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(1))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, - TEST_SHARD_ID, - Collections.singletonList(inputRecords.get(2))); + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(2))); splitReader.handleSplitsChanges( new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); @@ -169,17 +163,11 @@ void testMultipleAssignedSplitsAllConsumed() throws Exception { Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, - TEST_SHARD_ID, - Collections.singletonList(inputRecords.get(0))); + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(0))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, - TEST_SHARD_ID, - Collections.singletonList(inputRecords.get(1))); + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(1))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, - TEST_SHARD_ID, - Collections.singletonList(inputRecords.get(2))); + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(2))); splitReader.handleSplitsChanges( new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); @@ -213,9 +201,10 @@ void testAggregatedRecordsAreDeaggregated() throws Exception { splitReader.handleSplitsChanges( new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); - List expectedRecords = convertToKinesisClientRecord(inputRecords).stream() - .map(KinesisClientRecord::data) - .collect(Collectors.toList()); + List expectedRecords = + convertToKinesisClientRecord(inputRecords).stream() + .map(KinesisClientRecord::data) + .collect(Collectors.toList()); // When fetching records List fetchedRecords = readAllRecords(splitReader.fetch()); @@ -223,8 +212,11 @@ void testAggregatedRecordsAreDeaggregated() throws Exception { // Then all records are fetched assertThat(fetchedRecords) .allMatch(KinesisClientRecord::aggregated) - .allMatch(record -> record.explicitHashKey().equals(aggregatedRecord.explicitHashKey())) - .extracting("data").containsExactlyInAnyOrderElementsOf(expectedRecords); + .allMatch( + record -> + record.explicitHashKey().equals(aggregatedRecord.explicitHashKey())) + .extracting("data") + .containsExactlyInAnyOrderElementsOf(expectedRecords); } @Test @@ -289,13 +281,9 @@ void testPauseOrResumeSplits() throws Exception { Stream.of(getTestRecord("data-1"), getTestRecord("data-2")) .collect(Collectors.toList()); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, - TEST_SHARD_ID, - 
Collections.singletonList(inputRecords.get(0))); + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(0))); testStreamProxy.addRecords( - TestUtil.STREAM_ARN, - TEST_SHARD_ID, - Collections.singletonList(inputRecords.get(1))); + TestUtil.STREAM_ARN, TEST_SHARD_ID, Collections.singletonList(inputRecords.get(1))); splitReader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(testSplit))); List expectedRecords = convertToKinesisClientRecord(inputRecords); @@ -378,7 +366,8 @@ record -> fetchedRecords.addAll(readAllRecords(records)); } - List expectedRecordsFromSplit2 = convertToKinesisClientRecord(recordsFromSplit2); + List expectedRecordsFromSplit2 = + convertToKinesisClientRecord(recordsFromSplit2); assertThat(fetchedRecords).containsExactlyElementsOf(expectedRecordsFromSplit2); // resume split 3 @@ -392,7 +381,8 @@ record -> fetchedRecords.addAll(readAllRecords(records)); } - List expectedRecordsFromSplit3 = convertToKinesisClientRecord(recordsFromSplit3); + List expectedRecordsFromSplit3 = + convertToKinesisClientRecord(recordsFromSplit3); assertThat(fetchedRecords).containsExactlyElementsOf(expectedRecordsFromSplit3); } @@ -441,7 +431,8 @@ void testMaxRecordsToGetParameterPassed() throws IOException { assertThat(records.size()).isEqualTo(maxRecordsToGet); } - private List readAllRecords(RecordsWithSplitIds recordsWithSplitIds) { + private List readAllRecords( + RecordsWithSplitIds recordsWithSplitIds) { List outputRecords = new ArrayList<>(); KinesisClientRecord record; do { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java index 14230949b..e05ccc4c8 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/RecordBatchTest.java @@ -40,8 +40,7 @@ public void testDeaggregateRecordsPassThrough() { Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) .collect(Collectors.toList()); - RecordBatch result = - new RecordBatch(records, getTestSplit(), 100L, true); + RecordBatch result = new RecordBatch(records, getTestSplit(), 100L, true); assertThat(result.getDeaggregatedRecords().size()).isEqualTo(3); } @@ -55,7 +54,8 @@ public void testDeaggregateRecordsWithAggregatedRecords() { Record aggregatedRecord = TestUtil.createAggregatedRecord(records); RecordBatch result = - new RecordBatch(Collections.singletonList(aggregatedRecord), getTestSplit(), 100L, true); + new RecordBatch( + Collections.singletonList(aggregatedRecord), getTestSplit(), 100L, true); assertThat(result.getDeaggregatedRecords().size()).isEqualTo(3); } @@ -64,7 +64,10 @@ public void testDeaggregateRecordsWithAggregatedRecords() { public void testGetMillisBehindLatest() { RecordBatch result = new RecordBatch( - Collections.singletonList(getTestRecord("data-1")), getTestSplit(), 100L, true); + Collections.singletonList(getTestRecord("data-1")), + getTestSplit(), + 100L, + true); assertThat(result.getMillisBehindLatest()).isEqualTo(100L); } @@ -73,7 +76,10 @@ public void testGetMillisBehindLatest() { public void testIsCompleted() { RecordBatch result = new RecordBatch( - 
Collections.singletonList(getTestRecord("data-1")), getTestSplit(), 100L, true); + Collections.singletonList(getTestRecord("data-1")), + getTestSplit(), + 100L, + true); assertThat(result.isCompleted()).isTrue(); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java index 6bc815274..07ca0cc6b 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java @@ -206,7 +206,8 @@ private void consumeRecordsFromKinesis( "did not receive expected " + numRecords + " records within 10 seconds."); } - private List readAllRecords(RecordsWithSplitIds recordsWithSplitIds) { + private List readAllRecords( + RecordsWithSplitIds recordsWithSplitIds) { List outputRecords = new ArrayList<>(); KinesisClientRecord record; do { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java index e49425e5b..a78465c60 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java @@ -122,9 +122,10 @@ List getEventsToSend() { records.add(createRecord(sequenceNumber)); } - List aggregatedRecords = partition(records, aggregationFactor).stream() - .map(TestUtil::createAggregatedRecord) - .collect(Collectors.toList()); + List aggregatedRecords = + partition(records, aggregationFactor).stream() + .map(TestUtil::createAggregatedRecord) + .collect(Collectors.toList()); eventBuilder.records(aggregatedRecords); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java index 5c2715443..768a3dc63 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java @@ -175,9 +175,7 @@ public static Record convertToRecord(KinesisClientRecord record) { } public static List convertToKinesisClientRecord(List records) { - return records.stream() - .map(KinesisClientRecord::fromRecord) - .collect(Collectors.toList()); + return records.stream().map(KinesisClientRecord::fromRecord).collect(Collectors.toList()); } public static Record createAggregatedRecord(List records) { From 0d33ec322120f479fcb067a45976582d9c2c17ba Mon Sep 17 00:00:00 2001 From: Luis Marques Date: Fri, 16 May 2025 13:57:24 +0100 Subject: [PATCH 8/9] 
[FLINK-32097][Connectors/Kinesis] Update fork --- .../kinesis/source/reader/KinesisShardSplitReaderBase.java | 2 +- .../kinesis/source/reader/KinesisShardSplitReaderBaseTest.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java index f0d6ea6b5..f5d01679e 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java @@ -190,7 +190,7 @@ private boolean skipUntilScheduledFetchTime(KinesisShardSplitState splitState) * @param recordBatch recordBatch returned by fetchRecords() */ private void scheduleNextFetchTime(KinesisShardSplitState splitState, RecordBatch recordBatch) { - if (recordBatch == null || recordBatch.getRecords().isEmpty()) { + if (recordBatch == null || recordBatch.getDeaggregatedRecords().isEmpty()) { long scheduledGetRecordTimeMillis = System.currentTimeMillis() + emptyRecordsIntervalMillis; this.scheduledFetchTimes.put(splitState, scheduledGetRecordTimeMillis); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java index bc8b1b5f2..b09a7aea1 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBaseTest.java @@ -202,7 +202,8 @@ public EmptyRecordReturningReader( @Override protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { super.fetchRecords(splitState); - return new RecordBatch(Collections.emptyList(), 0L, false); + return new RecordBatch( + Collections.emptyList(), splitState.getKinesisShardSplit(), 0L, false); } } From 9de53bb0837f8037490c3bd8212b2be9b6be2bfe Mon Sep 17 00:00:00 2001 From: Luis Marques Date: Fri, 16 May 2025 15:26:05 +0100 Subject: [PATCH 9/9] [FLINK-32097][Connectors/Kinesis] Reverted the change that made KinesisDeserializationSchema use KinesisClientRecord instead of Record. In order not to change any @Public or @PublicEvolving classes, the record type change was reverted. But that adds the need to convert the KinesisClientRecord back to Record.
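Keeping the public interfaces on the SDK v2 Record type means a user schema written against the released API keeps compiling; as a rough sketch (the class name and the Types.STRING choice are illustrative and not part of this patch):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
    import org.apache.flink.util.Collector;

    import software.amazon.awssdk.services.kinesis.model.Record;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Illustrative only: tags each payload with the shard it came from.
    public class ShardTaggingDeserializationSchema implements KinesisDeserializationSchema<String> {

        @Override
        public void deserialize(Record record, String stream, String shardId, Collector<String> output)
                throws IOException {
            String payload = new String(record.data().asByteArray(), StandardCharsets.UTF_8);
            output.collect(shardId + ":" + payload);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

Internally, however, the reader still operates on KinesisClientRecord, so the connector has to convert back before invoking the schema.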
That conversion was added in the KinesisStreamsRecordEmitter --- .../reader/KinesisStreamsRecordEmitter.java | 17 ++++++++++++++++- .../KinesisDeserializationSchema.java | 4 ++-- .../KinesisDeserializationSchemaWrapper.java | 13 +++---------- .../reader/KinesisStreamsRecordEmitterTest.java | 16 +++++----------- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java index 42cba095a..f995d0f03 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java @@ -26,6 +26,8 @@ import org.apache.flink.connector.kinesis.source.split.StartingPosition; import org.apache.flink.util.Collector; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.retrieval.KinesisClientRecord; /** @@ -52,11 +54,24 @@ public void emitRecord( sourceOutputWrapper.setSourceOutput(output); sourceOutputWrapper.setTimestamp(element.approximateArrivalTimestamp().toEpochMilli()); deserializationSchema.deserialize( - element, splitState.getStreamArn(), splitState.getShardId(), sourceOutputWrapper); + convertKinesisClientRecordToRecord(element), + splitState.getStreamArn(), + splitState.getShardId(), + sourceOutputWrapper); splitState.setNextStartingPosition( StartingPosition.continueFromSequenceNumber(element.sequenceNumber())); } + private Record convertKinesisClientRecordToRecord(KinesisClientRecord record) { + return Record.builder() + .sequenceNumber(record.sequenceNumber()) + .approximateArrivalTimestamp(record.approximateArrivalTimestamp()) + .data(SdkBytes.fromByteBuffer(record.data())) + .partitionKey(record.partitionKey()) + .encryptionType(record.encryptionType()) + .build(); + } + private static class SourceOutputWrapper implements Collector { private SourceOutput sourceOutput; private long timestamp; diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java index b3a9eb4fa..e43756727 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchema.java @@ -23,7 +23,7 @@ import org.apache.flink.connector.kinesis.source.KinesisStreamsSource; import org.apache.flink.util.Collector; -import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.awssdk.services.kinesis.model.Record; import java.io.IOException; import java.io.Serializable; @@ -60,7 +60,7 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex * @param output the identifier of the shard the record was sent to * @throws 
IOException exception when deserializing record */ - void deserialize(KinesisClientRecord record, String stream, String shardId, Collector output) + void deserialize(Record record, String stream, String shardId, Collector output) throws IOException; static KinesisDeserializationSchema of(DeserializationSchema deserializationSchema) { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java index b594c5dc5..074d8472a 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/serialization/KinesisDeserializationSchemaWrapper.java @@ -22,10 +22,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; -import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.awssdk.services.kinesis.model.Record; import java.io.IOException; -import java.nio.ByteBuffer; /** * A simple wrapper for using the {@link DeserializationSchema} with the {@link @@ -49,15 +48,9 @@ public void open(DeserializationSchema.InitializationContext context) throws Exc } @Override - public void deserialize( - KinesisClientRecord record, String stream, String shardId, Collector output) + public void deserialize(Record record, String stream, String shardId, Collector output) throws IOException { - ByteBuffer recordData = record.data(); - - byte[] dataBytes = new byte[recordData.remaining()]; - recordData.get(dataBytes); - - deserializationSchema.deserialize(dataBytes, output); + deserializationSchema.deserialize(record.data().asByteArray(), output); } @Override diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java index b3a0809fe..8f4c9a3dd 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitterTest.java @@ -29,6 +29,7 @@ import org.apache.flink.util.Collector; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.io.IOException; @@ -207,13 +208,10 @@ private static class SequenceNumberBasedDeserializationSchema @Override public void deserialize( - KinesisClientRecord record, String stream, String shardId, Collector output) + Record record, String stream, String shardId, Collector output) throws IOException { if (Objects.equals(record.sequenceNumber(), "emit")) { - ByteBuffer recordData = record.data(); - byte[] dataBytes = new byte[recordData.remaining()]; - recordData.get(dataBytes); - STRING_SCHEMA.deserialize(dataBytes, output); + STRING_SCHEMA.deserialize(record.data().asByteArray(), 
output); } } @@ -236,15 +234,11 @@ private AssertRecordMetadataDeserializationSchema( @Override public void deserialize( - KinesisClientRecord record, String stream, String shardId, Collector output) + Record record, String stream, String shardId, Collector output) throws IOException { assertThat(stream).isEqualTo(expectedStreamArn); assertThat(shardId).isEqualTo(expectedShardId); - - ByteBuffer recordData = record.data(); - byte[] dataBytes = new byte[recordData.remaining()]; - recordData.get(dataBytes); - STRING_SCHEMA.deserialize(dataBytes, output); + STRING_SCHEMA.deserialize(record.data().asByteArray(), output); } @Override