Skip to content

Commit ba209e2

Browse files
feat: Bigquery sink using depot (#154) (#185)
* feat: Bigquery sink using depot (#154) * chore: fix checkstyle * feat: Checkpointing on bq sink (#187) * feat: prepare for commit * fix: clear the messages for pushing to bq * docs: add documentation for BQ sink in Dagger - [#188] * chore: version bump of depot Co-authored-by: Sumit Aich <[email protected]>
1 parent 6133f7f commit ba209e2

35 files changed

+960
-214
lines changed

dagger-common/src/main/java/io/odpf/dagger/common/core/Constants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ public class Constants {
55
public static final boolean SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT = false;
66
public static final String SCHEMA_REGISTRY_STENCIL_URLS_KEY = "SCHEMA_REGISTRY_STENCIL_URLS";
77
public static final String SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT = "";
8-
public static final String SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY = "SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS";
9-
public static final Integer SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT = 60000;
8+
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS = "SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS";
9+
public static final Integer SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT = 60000;
1010
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_KEY = "SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS";
1111
public static final String SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS_DEFAULT = "";
1212

dagger-common/src/main/java/io/odpf/dagger/common/core/StencilClientOrchestrator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ public StencilClientOrchestrator(Configuration configuration) {
3838
}
3939

4040
StencilConfig createStencilConfig() {
41-
Integer timeoutMS = configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
42-
List<Header> headers = this.getHeaders(configuration);
43-
return StencilConfig.builder().fetchTimeoutMs(timeoutMS).fetchHeaders(headers).build();
41+
return StencilConfig.builder()
42+
.fetchHeaders(getHeaders(configuration))
43+
.fetchTimeoutMs(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT))
44+
.build();
4445
}
4546

4647
private List<Header> getHeaders(Configuration config) {

dagger-common/src/main/java/io/odpf/dagger/common/serde/proto/deserialization/ProtoDeserializer.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
package io.odpf.dagger.common.serde.proto.deserialization;
22

3-
import io.odpf.dagger.common.serde.DaggerDeserializer;
4-
import org.apache.flink.api.common.typeinfo.TypeInformation;
5-
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
6-
import org.apache.flink.types.Row;
7-
83
import com.google.protobuf.Descriptors;
94
import com.google.protobuf.DynamicMessage;
105
import com.google.protobuf.InvalidProtocolBufferException;
116
import io.odpf.dagger.common.core.StencilClientOrchestrator;
127
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
138
import io.odpf.dagger.common.exceptions.serde.DaggerDeserializationException;
9+
import io.odpf.dagger.common.serde.DaggerDeserializer;
1410
import io.odpf.dagger.common.serde.typehandler.RowFactory;
11+
import org.apache.flink.api.common.typeinfo.TypeInformation;
12+
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
13+
import org.apache.flink.types.Row;
1514
import org.apache.kafka.clients.consumer.ConsumerRecord;
1615
import org.slf4j.Logger;
1716
import org.slf4j.LoggerFactory;
@@ -25,11 +24,11 @@
2524
*/
2625
public class ProtoDeserializer implements KafkaDeserializationSchema<Row>, DaggerDeserializer<Row> {
2726

27+
private static final Logger LOGGER = LoggerFactory.getLogger(ProtoDeserializer.class);
2828
private final String protoClassName;
2929
private final int timestampFieldIndex;
3030
private final StencilClientOrchestrator stencilClientOrchestrator;
3131
private final TypeInformation<Row> typeInformation;
32-
private static final Logger LOGGER = LoggerFactory.getLogger(ProtoDeserializer.class);
3332

3433
/**
3534
* Instantiates a new Proto deserializer.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.odpf.dagger.common.serde.proto.serialization;
2+
3+
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
4+
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
5+
import org.apache.flink.types.Row;
6+
7+
import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import java.util.Objects;
13+
14+
public class KafkaProtoSerializer implements KafkaRecordSerializationSchema<Row> {
15+
private final String outputTopic;
16+
private final ProtoSerializer protoSerializer;
17+
private static final Logger LOGGER = LoggerFactory.getLogger("KafkaSink");
18+
19+
public KafkaProtoSerializer(ProtoSerializer protoSerializer) {
20+
this(protoSerializer, "");
21+
}
22+
23+
public KafkaProtoSerializer(ProtoSerializer protoSerializer, String outputTopic) {
24+
this.protoSerializer = protoSerializer;
25+
this.outputTopic = outputTopic;
26+
}
27+
28+
@Override
29+
public void open(InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
30+
KafkaRecordSerializationSchema.super.open(context, sinkContext);
31+
}
32+
33+
@Override
34+
public ProducerRecord<byte[], byte[]> serialize(Row row, KafkaSinkContext context, Long timestamp) {
35+
if (Objects.isNull(outputTopic) || outputTopic.equals("")) {
36+
throw new DaggerSerializationException("outputTopic is required");
37+
}
38+
LOGGER.info("row to kafka: " + row);
39+
byte[] key = protoSerializer.serializeKey(row);
40+
byte[] message = protoSerializer.serializeValue(row);
41+
return new ProducerRecord<>(outputTopic, key, message);
42+
}
43+
}
Lines changed: 22 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,38 @@
11
package io.odpf.dagger.common.serde.proto.serialization;
22

3-
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
4-
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
5-
import org.apache.flink.types.Row;
6-
73
import com.google.protobuf.Descriptors;
84
import com.google.protobuf.DynamicMessage;
95
import io.odpf.dagger.common.core.StencilClientOrchestrator;
106
import io.odpf.dagger.common.exceptions.DescriptorNotFoundException;
117
import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException;
128
import io.odpf.dagger.common.exceptions.serde.InvalidColumnMappingException;
9+
import org.apache.flink.types.Row;
1310
import io.odpf.dagger.common.serde.typehandler.TypeHandler;
1411
import io.odpf.dagger.common.serde.typehandler.TypeHandlerFactory;
15-
import org.apache.kafka.clients.producer.ProducerRecord;
16-
import org.slf4j.Logger;
17-
import org.slf4j.LoggerFactory;
1812

13+
import java.io.Serializable;
1914
import java.util.Arrays;
2015
import java.util.Objects;
2116

22-
public class ProtoSerializer implements KafkaRecordSerializationSchema<Row> {
23-
private String[] columnNames;
24-
private StencilClientOrchestrator stencilClientOrchestrator;
25-
private String keyProtoClassName;
26-
private String messageProtoClassName;
27-
private String outputTopic;
28-
private static final Logger LOGGER = LoggerFactory.getLogger("KafkaSink");
17+
public class ProtoSerializer implements Serializable {
18+
19+
private final String keyProtoClassName;
20+
private final String[] columnNames;
21+
private final StencilClientOrchestrator stencilClientOrchestrator;
22+
private final String messageProtoClassName;
2923

30-
/**
31-
* Instantiates a new Proto serializer with specified output topic name.
32-
*
33-
* @param keyProtoClassName the key proto class name
34-
* @param messageProtoClassName the message proto class name
35-
* @param columnNames the column names
36-
* @param stencilClientOrchestrator the stencil client orchestrator
37-
*/
3824
public ProtoSerializer(String keyProtoClassName, String messageProtoClassName, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator) {
39-
if (Objects.isNull(messageProtoClassName)) {
40-
throw new DaggerSerializationException("messageProtoClassName is required");
41-
}
4225
this.keyProtoClassName = keyProtoClassName;
43-
this.messageProtoClassName = messageProtoClassName;
4426
this.columnNames = columnNames;
4527
this.stencilClientOrchestrator = stencilClientOrchestrator;
28+
this.messageProtoClassName = messageProtoClassName;
29+
checkValidity();
4630
}
4731

48-
/**
49-
* Instantiates a new Proto serializer with specified output topic name.
50-
*
51-
* @param keyProtoClassName the key proto class name
52-
* @param messageProtoClassName the message proto class name
53-
* @param columnNames the column names
54-
* @param stencilClientOrchestrator the stencil client orchestrator
55-
* @param outputTopic the output topic
56-
*/
57-
public ProtoSerializer(String keyProtoClassName, String messageProtoClassName, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, String outputTopic) {
58-
this(keyProtoClassName, messageProtoClassName, columnNames, stencilClientOrchestrator);
59-
this.outputTopic = outputTopic;
60-
}
61-
62-
@Override
63-
public void open(InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
64-
KafkaRecordSerializationSchema.super.open(context, sinkContext);
65-
}
66-
67-
@Override
68-
public ProducerRecord<byte[], byte[]> serialize(Row row, KafkaSinkContext context, Long timestamp) {
69-
if (Objects.isNull(outputTopic) || outputTopic.equals("")) {
70-
throw new DaggerSerializationException("outputTopic is required");
32+
private void checkValidity() {
33+
if (Objects.isNull(messageProtoClassName) || messageProtoClassName.isEmpty()) {
34+
throw new DaggerSerializationException("messageProtoClassName is required");
7135
}
72-
LOGGER.info("row to kafka: " + row);
73-
byte[] key = serializeKey(row);
74-
byte[] message = serializeValue(row);
75-
return new ProducerRecord<>(outputTopic, key, message);
7636
}
7737

7838
/**
@@ -82,16 +42,10 @@ public ProducerRecord<byte[], byte[]> serialize(Row row, KafkaSinkContext contex
8242
* @return the byte [ ]
8343
*/
8444
public byte[] serializeKey(Row row) {
85-
return (Objects.isNull(keyProtoClassName) || keyProtoClassName.equals("")) ? null
45+
return (Objects.isNull(keyProtoClassName) || keyProtoClassName.isEmpty()) ? null
8646
: parse(row, getDescriptor(keyProtoClassName)).toByteArray();
8747
}
8848

89-
/**
90-
* Serialize value message.
91-
*
92-
* @param row the row
93-
* @return the byte [ ]
94-
*/
9549
public byte[] serializeValue(Row row) {
9650
return parse(row, getDescriptor(messageProtoClassName)).toByteArray();
9751
}
@@ -117,6 +71,14 @@ private DynamicMessage parse(Row element, Descriptors.Descriptor descriptor) {
11771
return builder.build();
11872
}
11973

74+
private Descriptors.Descriptor getDescriptor(String className) {
75+
Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(className);
76+
if (dsc == null) {
77+
throw new DescriptorNotFoundException();
78+
}
79+
return dsc;
80+
}
81+
12082
private DynamicMessage.Builder populateNestedBuilder(Descriptors.Descriptor parentDescriptor, String[] nestedColumnNames, DynamicMessage.Builder parentBuilder, Object data) {
12183
String childColumnName = nestedColumnNames[0];
12284
Descriptors.FieldDescriptor childFieldDescriptor = parentDescriptor.findFieldByName(childColumnName);
@@ -153,12 +115,4 @@ private DynamicMessage.Builder populateBuilder(DynamicMessage.Builder builder, D
153115

154116
return builder;
155117
}
156-
157-
private Descriptors.Descriptor getDescriptor(String className) {
158-
Descriptors.Descriptor dsc = stencilClientOrchestrator.getStencilClient().get(className);
159-
if (dsc == null) {
160-
throw new DescriptorNotFoundException();
161-
}
162-
return dsc;
163-
}
164118
}

dagger-common/src/test/java/io/odpf/dagger/common/core/StencilClientOrchestratorTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private Configuration getConfig(Map<String, String> mapConfig) {
3737
public void shouldReturnClassLoadStencilClientIfStencilDisabled() throws NoSuchFieldException, IllegalAccessException {
3838
when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT);
3939
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT);
40-
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
40+
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
4141
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
4242
stencilClient = stencilClientOrchestrator.getStencilClient();
4343

@@ -53,7 +53,7 @@ public void shouldReturnMultiURLStencilClient() throws NoSuchFieldException, Ill
5353
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest,"
5454
+ "http://localhost/events/latest,"
5555
+ "http://localhost/entities/release");
56-
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
56+
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
5757
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
5858
stencilClient = stencilClientOrchestrator.getStencilClient();
5959

@@ -66,7 +66,7 @@ public void shouldReturnMultiURLStencilClient() throws NoSuchFieldException, Ill
6666
@Test
6767
public void shouldEnrichStencilClient() throws NoSuchFieldException, IllegalAccessException {
6868
when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(true);
69-
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
69+
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
7070
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest,");
7171
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
7272
StencilClient oldStencilClient = stencilClientOrchestrator.getStencilClient();
@@ -93,7 +93,7 @@ public void shouldEnrichStencilClient() throws NoSuchFieldException, IllegalAcce
9393
public void shouldNotEnrichIfNoNewAdditionalURLsAdded() throws NoSuchFieldException, IllegalAccessException {
9494
when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(true);
9595
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn("http://localhost/latest,");
96-
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
96+
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
9797
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
9898
StencilClient oldStencilClient = stencilClientOrchestrator.getStencilClient();
9999

@@ -118,7 +118,7 @@ public void shouldNotEnrichIfNoNewAdditionalURLsAdded() throws NoSuchFieldExcept
118118
public void shouldReturnClassLoadStencilClientWhenStencilDisabledAndEnrichmentStencilUrlsIsNotNull() throws NoSuchFieldException, IllegalAccessException {
119119
when(configuration.getBoolean(SCHEMA_REGISTRY_STENCIL_ENABLE_KEY, SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_ENABLE_DEFAULT);
120120
when(configuration.getString(SCHEMA_REGISTRY_STENCIL_URLS_KEY, SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_URLS_DEFAULT);
121-
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT);
121+
when(configuration.getInteger(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT)).thenReturn(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT);
122122
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
123123

124124
List<String> enrichmentStencilURLs = Collections
@@ -138,13 +138,13 @@ public void shouldReturnDefaultTimeoutIfTimeoutMsConfigNotSet() {
138138
Configuration config = getConfig(configMap);
139139
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(config);
140140
StencilConfig stencilConfig = stencilClientOrchestrator.createStencilConfig();
141-
assertEquals(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_DEFAULT, stencilConfig.getFetchTimeoutMs());
141+
assertEquals(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS_DEFAULT, stencilConfig.getFetchTimeoutMs());
142142
}
143143

144144
@Test
145145
public void shouldReturnConfiguredTimeoutIfTimeoutMsConfigIsSet() {
146146
Map<String, String> configMap = new HashMap<String, String>() {{
147-
put(SCHEMA_REGISTRY_STENCIL_TIMEOUT_MS_KEY, "8000");
147+
put(SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS, "8000");
148148
}};
149149
Configuration config = getConfig(configMap);
150150
StencilClientOrchestrator stencilClientOrchestrator = new StencilClientOrchestrator(config);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.odpf.dagger.common.serde.proto.serialization;
2+
3+
import io.odpf.dagger.common.exceptions.serde.DaggerSerializationException;
4+
import org.apache.flink.types.Row;
5+
import org.apache.kafka.clients.producer.ProducerRecord;
6+
import org.junit.Assert;
7+
import org.junit.Test;
8+
import org.mockito.Mockito;
9+
10+
import static org.junit.Assert.assertEquals;
11+
import static org.junit.Assert.assertThrows;
12+
13+
public class KafkaProtoSerializerTest {
14+
15+
@Test
16+
public void shouldSerializeIntoKafkaRecord() {
17+
ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class);
18+
String outputTopic = "test";
19+
Row element = new Row(1);
20+
element.setField(0, "testing");
21+
byte[] keyBytes = "key".getBytes();
22+
byte[] valueBytes = "value".getBytes();
23+
Mockito.when(serializer.serializeKey(element)).thenReturn(keyBytes);
24+
Mockito.when(serializer.serializeValue(element)).thenReturn(valueBytes);
25+
KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, outputTopic);
26+
ProducerRecord<byte[], byte[]> record = kafkaProtoSerializer.serialize(element, null, null);
27+
ProducerRecord<byte[], byte[]> expectedRecord = new ProducerRecord<>("test", keyBytes, valueBytes);
28+
Assert.assertEquals(expectedRecord, record);
29+
}
30+
31+
@Test
32+
public void shouldThrowExceptionWhenOutputTopicIsNullForSerializeMethod() {
33+
ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class);
34+
KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, null);
35+
Row element = new Row(1);
36+
element.setField(0, "1234");
37+
DaggerSerializationException exception = assertThrows(DaggerSerializationException.class,
38+
() -> kafkaProtoSerializer.serialize(element, null, System.currentTimeMillis() / 1000));
39+
assertEquals("outputTopic is required", exception.getMessage());
40+
}
41+
42+
@Test
43+
public void shouldThrowExceptionWhenOutputTopicIsEmptyForSerializeMethod() {
44+
ProtoSerializer serializer = Mockito.mock(ProtoSerializer.class);
45+
KafkaProtoSerializer kafkaProtoSerializer = new KafkaProtoSerializer(serializer, "");
46+
Row element = new Row(1);
47+
element.setField(0, "1234");
48+
49+
DaggerSerializationException exception = assertThrows(DaggerSerializationException.class,
50+
() -> kafkaProtoSerializer.serialize(element, null, System.currentTimeMillis() / 1000));
51+
assertEquals("outputTopic is required", exception.getMessage());
52+
}
53+
}

0 commit comments

Comments
 (0)