Skip to content

Commit 51572c8

Browse files
authored
Fixed JSON serialization to honor Jackson annotations (#139)
1 parent 8450f8b commit 51572c8

File tree

7 files changed

+78
-54
lines changed

7 files changed

+78
-54
lines changed

java/FlinkDataGenerator/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,9 @@
114114

115115
<!-- Test dependencies -->
116116
<dependency>
117-
<groupId>junit</groupId>
118-
<artifactId>junit</artifactId>
119-
<version>4.13.2</version>
117+
<groupId>org.junit.jupiter</groupId>
118+
<artifactId>junit-jupiter</artifactId>
119+
<version>5.10.0</version>
120120
<scope>test</scope>
121121
</dependency>
122122
</dependencies>

java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/DataGeneratorJob.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public static void main(String[] args) throws Exception {
192192
KinesisStreamsSink<StockPrice> kinesisSink = createKinesisSink(
193193
kinesisProperties,
194194
// Serialize the Kinesis record as JSON
195-
new JsonSerializationSchema<>(),
195+
new JsonSerializationSchema<StockPrice>(),
196196
// Shard by `ticker`
197197
partitionKeyGenerator
198198
);
@@ -203,7 +203,7 @@ public static void main(String[] args) throws Exception {
203203
// Create Kafka sink with JSON serialization (only if configured)
204204
if (hasKafkaSink) {
205205
String kafkaTopic = Preconditions.checkNotNull(StringUtils.trimToNull(kafkaProperties.getProperty("topic")), "Kafka topic not defined");
206-
SerializationSchema<StockPrice> valueSerializationSchema = new JsonSerializationSchema<>();
206+
SerializationSchema<StockPrice> valueSerializationSchema = new JsonSerializationSchema<StockPrice>();
207207
SerializationSchema<StockPrice> keySerializationSchema = (stockPrice) -> stockPrice.getTicker().getBytes();
208208
KafkaRecordSerializationSchema<StockPrice> kafkaRecordSerializationSchema =
209209
KafkaRecordSerializationSchema.<StockPrice>builder()

java/FlinkDataGenerator/src/main/java/com/amazonaws/services/msf/domain/StockPrice.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package com.amazonaws.services.msf.domain;
22

3-
import com.fasterxml.jackson.annotation.JsonProperty;
43

54
import java.util.Objects;
65

76
public class StockPrice {
8-
// This annotation as well as the associated jackson2 import is needed to correctly map the JSON input key to the
9-
// appropriate POJO property name to ensure event_time isn't missed in serialization and deserialization
10-
@JsonProperty("event_time")
7+
// IMPORTANT: you must use the shaded Jackson annotations (under org.apache.flink.shaded.jackson2.com.fasterxml...)
8+
// otherwise JsonSerializationSchema, which is also shaded, will not honor it.
9+
@org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty("event_time")
1110
private String eventTime;
1211
private String ticker;
1312
private float price;

java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/DataGeneratorJobTest.java

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import org.apache.flink.connector.datagen.source.GeneratorFunction;
88
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
99
import org.apache.flink.connector.kafka.sink.KafkaSink;
10-
import org.junit.Test;
11-
import static org.junit.Assert.*;
10+
import org.junit.jupiter.api.Test;
11+
import static org.junit.jupiter.api.Assertions.*;
1212
import java.util.Properties;
1313
import java.util.HashMap;
1414
import java.util.Map;
@@ -33,20 +33,20 @@ public void testCreateDataGeneratorSource() throws Exception {
3333
DataGeneratorSource<StockPrice> source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke(
3434
null, dataGenProps, generatorFunction, typeInfo);
3535

36-
assertNotNull("DataGeneratorSource should not be null", source);
36+
assertNotNull(source, "DataGeneratorSource should not be null");
3737

3838
// Test with null properties (should use default rate)
3939
source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke(
4040
null, null, generatorFunction, typeInfo);
4141

42-
assertNotNull("DataGeneratorSource should not be null with null properties", source);
42+
assertNotNull(source, "DataGeneratorSource should not be null with null properties");
4343

4444
// Test with empty properties (should use default rate)
4545
Properties emptyProps = new Properties();
4646
source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke(
4747
null, emptyProps, generatorFunction, typeInfo);
4848

49-
assertNotNull("DataGeneratorSource should not be null with empty properties", source);
49+
assertNotNull(source, "DataGeneratorSource should not be null with empty properties");
5050
}
5151

5252
@Test
@@ -72,7 +72,7 @@ public void testCreateKafkaSink() throws Exception {
7272
KafkaSink<StockPrice> kafkaSink = (KafkaSink<StockPrice>) createKafkaSinkMethod.invoke(
7373
null, kafkaProps, recordSerializationSchema);
7474

75-
assertNotNull("KafkaSink should not be null", kafkaSink);
75+
assertNotNull(kafkaSink, "KafkaSink should not be null");
7676
}
7777

7878
@Test
@@ -85,20 +85,18 @@ public void testKafkaPartitioningKey() {
8585
byte[] key1 = stock1.getTicker().getBytes();
8686
byte[] key2 = stock2.getTicker().getBytes();
8787

88-
assertNotNull("Kafka key should not be null", key1);
89-
assertNotNull("Kafka key should not be null", key2);
90-
assertTrue("Kafka key should not be empty", key1.length > 0);
91-
assertTrue("Kafka key should not be empty", key2.length > 0);
88+
assertNotNull(key1, "Kafka key should not be null");
89+
assertNotNull(key2, "Kafka key should not be null");
90+
assertTrue(key1.length > 0, "Kafka key should not be empty");
91+
assertTrue(key2.length > 0, "Kafka key should not be empty");
9292

9393
// Test that different tickers produce different keys
94-
assertFalse("Different tickers should produce different keys",
95-
java.util.Arrays.equals(key1, key2));
94+
assertFalse(java.util.Arrays.equals(key1, key2), "Different tickers should produce different keys");
9695

9796
// Test that same ticker produces same key
9897
StockPrice stock3 = new StockPrice("2024-01-15T10:30:47", "AAPL", 175.50f);
9998
byte[] key3 = stock3.getTicker().getBytes();
100-
assertTrue("Same ticker should produce same key",
101-
java.util.Arrays.equals(key1, key3));
99+
assertTrue(java.util.Arrays.equals(key1, key3), "Same ticker should produce same key");
102100
}
103101

104102
@Test
@@ -109,9 +107,9 @@ public void testConditionalSinkValidation() {
109107
// Test with no sinks configured - should be invalid
110108
boolean hasKinesis = appProperties.get("KinesisSink") != null;
111109
boolean hasKafka = appProperties.get("KafkaSink") != null;
112-
assertFalse("Should not have Kinesis sink when not configured", hasKinesis);
113-
assertFalse("Should not have Kafka sink when not configured", hasKafka);
114-
assertTrue("Should require at least one sink", !hasKinesis && !hasKafka);
110+
assertFalse(hasKinesis, "Should not have Kinesis sink when not configured");
111+
assertFalse(hasKafka, "Should not have Kafka sink when not configured");
112+
assertTrue(!hasKinesis && !hasKafka, "Should require at least one sink");
115113

116114
// Test with only Kinesis configured - should be valid
117115
Properties kinesisProps = new Properties();
@@ -121,9 +119,9 @@ public void testConditionalSinkValidation() {
121119

122120
hasKinesis = appProperties.get("KinesisSink") != null;
123121
hasKafka = appProperties.get("KafkaSink") != null;
124-
assertTrue("Should have Kinesis sink when configured", hasKinesis);
125-
assertFalse("Should not have Kafka sink when not configured", hasKafka);
126-
assertTrue("Should be valid with one sink", hasKinesis || hasKafka);
122+
assertTrue(hasKinesis, "Should have Kinesis sink when configured");
123+
assertFalse(hasKafka, "Should not have Kafka sink when not configured");
124+
assertTrue(hasKinesis || hasKafka, "Should be valid with one sink");
127125

128126
// Test with only Kafka configured - should be valid
129127
appProperties.clear();
@@ -134,17 +132,17 @@ public void testConditionalSinkValidation() {
134132

135133
hasKinesis = appProperties.get("KinesisSink") != null;
136134
hasKafka = appProperties.get("KafkaSink") != null;
137-
assertFalse("Should not have Kinesis sink when not configured", hasKinesis);
138-
assertTrue("Should have Kafka sink when configured", hasKafka);
139-
assertTrue("Should be valid with one sink", hasKinesis || hasKafka);
135+
assertFalse(hasKinesis, "Should not have Kinesis sink when not configured");
136+
assertTrue(hasKafka, "Should have Kafka sink when configured");
137+
assertTrue(hasKinesis || hasKafka, "Should be valid with one sink");
140138

141139
// Test with both configured - should be valid
142140
appProperties.put("KinesisSink", kinesisProps);
143141

144142
hasKinesis = appProperties.get("KinesisSink") != null;
145143
hasKafka = appProperties.get("KafkaSink") != null;
146-
assertTrue("Should have Kinesis sink when configured", hasKinesis);
147-
assertTrue("Should have Kafka sink when configured", hasKafka);
148-
assertTrue("Should be valid with both sinks", hasKinesis || hasKafka);
144+
assertTrue(hasKinesis, "Should have Kinesis sink when configured");
145+
assertTrue(hasKafka, "Should have Kafka sink when configured");
146+
assertTrue(hasKinesis || hasKafka, "Should be valid with both sinks");
149147
}
150148
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.amazonaws.services.msf;
2+
3+
import com.amazonaws.services.msf.domain.StockPrice;
4+
import org.apache.flink.formats.json.JsonSerializationSchema;
5+
import org.junit.jupiter.api.Test;
6+
import static org.junit.jupiter.api.Assertions.*;
7+
8+
public class JsonSerializationTest {
9+
10+
@Test
11+
public void testStockPriceJsonSerialization() throws Exception {
12+
StockPrice stock = new StockPrice("2024-05-28T19:53:17.497201", "AMZN", 42.88f);
13+
14+
JsonSerializationSchema<StockPrice> serializer = new JsonSerializationSchema<StockPrice>();
15+
serializer.open(null);
16+
17+
byte[] jsonBytes = serializer.serialize(stock);
18+
String json = new String(jsonBytes);
19+
20+
System.out.println("Actual JSON: " + json);
21+
22+
// Verify the JSON contains event_time (not eventTime)
23+
assertTrue(json.contains("\"event_time\""), "JSON should contain 'event_time' field");
24+
assertFalse(json.contains("\"eventTime\""), "JSON should not contain 'eventTime' field");
25+
26+
// Verify other fields
27+
assertTrue(json.contains("\"ticker\":\"AMZN\""), "JSON should contain ticker");
28+
assertTrue(json.contains("\"price\":42.88"), "JSON should contain price");
29+
}
30+
}

java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/domain/StockPriceGeneratorFunctionTest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.amazonaws.services.msf.domain;
22

3-
import org.junit.Test;
4-
import static org.junit.Assert.*;
3+
import org.junit.jupiter.api.Test;
4+
import static org.junit.jupiter.api.Assertions.*;
55
import java.util.Arrays;
66
import java.util.List;
77

@@ -45,19 +45,18 @@ public void testStockGeneratorFunction() throws Exception {
4545

4646
// Verify ticker is one of the expected values
4747
List<String> expectedTickers = Arrays.asList(TICKERS);
48-
assertTrue("Ticker should be one of the expected values",
49-
expectedTickers.contains(stock.getTicker()));
48+
assertTrue(expectedTickers.contains(stock.getTicker()), "Ticker should be one of the expected values");
5049

5150
// Verify price is within expected range (0 to 100)
52-
assertTrue("Price should be >= 0", stock.getPrice() >= 0);
53-
assertTrue("Price should be <= 100", stock.getPrice() <= 100);
51+
assertTrue(stock.getPrice() >= 0, "Price should be >= 0");
52+
assertTrue(stock.getPrice() <= 100, "Price should be <= 100");
5453

5554
// Verify price has at most 2 decimal places
5655
String priceStr = String.valueOf(stock.getPrice());
5756
int decimalIndex = priceStr.indexOf('.');
5857
if (decimalIndex != -1) {
5958
int decimalPlaces = priceStr.length() - decimalIndex - 1;
60-
assertTrue("Price should have at most 2 decimal places", decimalPlaces <= 2);
59+
assertTrue(decimalPlaces <= 2, "Price should have at most 2 decimal places");
6160
}
6261
}
6362

java/FlinkDataGenerator/src/test/java/com/amazonaws/services/msf/domain/StockPriceTest.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.amazonaws.services.msf.domain;
22

3-
import org.junit.Test;
4-
import static org.junit.Assert.*;
3+
import org.junit.jupiter.api.Test;
4+
import static org.junit.jupiter.api.Assertions.*;
55

66
public class StockPriceTest {
77

@@ -41,24 +41,22 @@ public void testStockHashCodeForPartitioning() {
4141
StockPrice stock3 = new StockPrice("2024-01-15T10:30:45", "AAPL", 150.25f); // Same as stock1
4242

4343
// Test that hashCode is consistent for equal objects
44-
assertEquals("Equal stock objects should have same hashCode",
45-
stock1.hashCode(), stock3.hashCode());
44+
assertEquals(stock1.hashCode(), stock3.hashCode(), "Equal stock objects should have same hashCode");
4645

4746
// Test that equals works correctly
48-
assertEquals("Same stock objects should be equal", stock1, stock3);
49-
assertNotEquals("Different stock objects should not be equal", stock1, stock2);
47+
assertEquals(stock1, stock3, "Same stock objects should be equal");
48+
assertNotEquals(stock1, stock2, "Different stock objects should not be equal");
5049

5150
// Test that different stocks likely have different hashCodes
52-
assertNotEquals("Different stock objects should likely have different hashCodes",
53-
stock1.hashCode(), stock2.hashCode());
51+
assertNotEquals(stock1.hashCode(), stock2.hashCode(), "Different stock objects should likely have different hashCodes");
5452

5553
// Test that hashCode can be used as partition key (should not throw exception)
5654
String partitionKey1 = String.valueOf(stock1.hashCode());
5755
String partitionKey2 = String.valueOf(stock2.hashCode());
5856

59-
assertNotNull("Partition key should not be null", partitionKey1);
60-
assertNotNull("Partition key should not be null", partitionKey2);
61-
assertFalse("Partition key should not be empty", partitionKey1.isEmpty());
62-
assertFalse("Partition key should not be empty", partitionKey2.isEmpty());
57+
assertNotNull(partitionKey1, "Partition key should not be null");
58+
assertNotNull(partitionKey2, "Partition key should not be null");
59+
assertFalse(partitionKey1.isEmpty(), "Partition key should not be empty");
60+
assertFalse(partitionKey2.isEmpty(), "Partition key should not be empty");
6361
}
6462
}

0 commit comments

Comments
 (0)