Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions java/FlinkDataGenerator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@

<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public static void main(String[] args) throws Exception {
KinesisStreamsSink<StockPrice> kinesisSink = createKinesisSink(
kinesisProperties,
// Serialize the Kinesis record as JSON
new JsonSerializationSchema<>(),
new JsonSerializationSchema<StockPrice>(),
// Shard by `ticker`
partitionKeyGenerator
);
Expand All @@ -203,7 +203,7 @@ public static void main(String[] args) throws Exception {
// Create Kafka sink with JSON serialization (only if configured)
if (hasKafkaSink) {
String kafkaTopic = Preconditions.checkNotNull(StringUtils.trimToNull(kafkaProperties.getProperty("topic")), "Kafka topic not defined");
SerializationSchema<StockPrice> valueSerializationSchema = new JsonSerializationSchema<>();
SerializationSchema<StockPrice> valueSerializationSchema = new JsonSerializationSchema<StockPrice>();
SerializationSchema<StockPrice> keySerializationSchema = (stockPrice) -> stockPrice.getTicker().getBytes();
KafkaRecordSerializationSchema<StockPrice> kafkaRecordSerializationSchema =
KafkaRecordSerializationSchema.<StockPrice>builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.amazonaws.services.msf.domain;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class StockPrice {
// This annotation as well as the associated jackson2 import is needed to correctly map the JSON input key to the
// appropriate POJO property name to ensure event_time isn't missed in serialization and deserialization
@JsonProperty("event_time")
// IMPORTANT: you must use the shaded Jackson annotations (under org.apache.flink.shaded.jackson2.com.fasterxml...)
// otherwise JsonSerializationSchema, which is also shaded, will not honor it.
@org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty("event_time")
private String eventTime;
private String ticker;
private float price;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.junit.Test;
import static org.junit.Assert.*;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import java.util.Properties;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -33,20 +33,20 @@ public void testCreateDataGeneratorSource() throws Exception {
DataGeneratorSource<StockPrice> source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke(
null, dataGenProps, generatorFunction, typeInfo);

assertNotNull("DataGeneratorSource should not be null", source);
assertNotNull(source, "DataGeneratorSource should not be null");

// Test with null properties (should use default rate)
source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke(
null, null, generatorFunction, typeInfo);

assertNotNull("DataGeneratorSource should not be null with null properties", source);
assertNotNull(source, "DataGeneratorSource should not be null with null properties");

// Test with empty properties (should use default rate)
Properties emptyProps = new Properties();
source = (DataGeneratorSource<StockPrice>) createDataGeneratorSourceMethod.invoke(
null, emptyProps, generatorFunction, typeInfo);

assertNotNull("DataGeneratorSource should not be null with empty properties", source);
assertNotNull(source, "DataGeneratorSource should not be null with empty properties");
}

@Test
Expand All @@ -72,7 +72,7 @@ public void testCreateKafkaSink() throws Exception {
KafkaSink<StockPrice> kafkaSink = (KafkaSink<StockPrice>) createKafkaSinkMethod.invoke(
null, kafkaProps, recordSerializationSchema);

assertNotNull("KafkaSink should not be null", kafkaSink);
assertNotNull(kafkaSink, "KafkaSink should not be null");
}

@Test
Expand All @@ -85,20 +85,18 @@ public void testKafkaPartitioningKey() {
byte[] key1 = stock1.getTicker().getBytes();
byte[] key2 = stock2.getTicker().getBytes();

assertNotNull("Kafka key should not be null", key1);
assertNotNull("Kafka key should not be null", key2);
assertTrue("Kafka key should not be empty", key1.length > 0);
assertTrue("Kafka key should not be empty", key2.length > 0);
assertNotNull(key1, "Kafka key should not be null");
assertNotNull(key2, "Kafka key should not be null");
assertTrue(key1.length > 0, "Kafka key should not be empty");
assertTrue(key2.length > 0, "Kafka key should not be empty");

// Test that different tickers produce different keys
assertFalse("Different tickers should produce different keys",
java.util.Arrays.equals(key1, key2));
assertFalse(java.util.Arrays.equals(key1, key2), "Different tickers should produce different keys");

// Test that same ticker produces same key
StockPrice stock3 = new StockPrice("2024-01-15T10:30:47", "AAPL", 175.50f);
byte[] key3 = stock3.getTicker().getBytes();
assertTrue("Same ticker should produce same key",
java.util.Arrays.equals(key1, key3));
assertTrue(java.util.Arrays.equals(key1, key3), "Same ticker should produce same key");
}

@Test
Expand All @@ -109,9 +107,9 @@ public void testConditionalSinkValidation() {
// Test with no sinks configured - should be invalid
boolean hasKinesis = appProperties.get("KinesisSink") != null;
boolean hasKafka = appProperties.get("KafkaSink") != null;
assertFalse("Should not have Kinesis sink when not configured", hasKinesis);
assertFalse("Should not have Kafka sink when not configured", hasKafka);
assertTrue("Should require at least one sink", !hasKinesis && !hasKafka);
assertFalse(hasKinesis, "Should not have Kinesis sink when not configured");
assertFalse(hasKafka, "Should not have Kafka sink when not configured");
assertTrue(!hasKinesis && !hasKafka, "Should require at least one sink");

// Test with only Kinesis configured - should be valid
Properties kinesisProps = new Properties();
Expand All @@ -121,9 +119,9 @@ public void testConditionalSinkValidation() {

hasKinesis = appProperties.get("KinesisSink") != null;
hasKafka = appProperties.get("KafkaSink") != null;
assertTrue("Should have Kinesis sink when configured", hasKinesis);
assertFalse("Should not have Kafka sink when not configured", hasKafka);
assertTrue("Should be valid with one sink", hasKinesis || hasKafka);
assertTrue(hasKinesis, "Should have Kinesis sink when configured");
assertFalse(hasKafka, "Should not have Kafka sink when not configured");
assertTrue(hasKinesis || hasKafka, "Should be valid with one sink");

// Test with only Kafka configured - should be valid
appProperties.clear();
Expand All @@ -134,17 +132,17 @@ public void testConditionalSinkValidation() {

hasKinesis = appProperties.get("KinesisSink") != null;
hasKafka = appProperties.get("KafkaSink") != null;
assertFalse("Should not have Kinesis sink when not configured", hasKinesis);
assertTrue("Should have Kafka sink when configured", hasKafka);
assertTrue("Should be valid with one sink", hasKinesis || hasKafka);
assertFalse(hasKinesis, "Should not have Kinesis sink when not configured");
assertTrue(hasKafka, "Should have Kafka sink when configured");
assertTrue(hasKinesis || hasKafka, "Should be valid with one sink");

// Test with both configured - should be valid
appProperties.put("KinesisSink", kinesisProps);

hasKinesis = appProperties.get("KinesisSink") != null;
hasKafka = appProperties.get("KafkaSink") != null;
assertTrue("Should have Kinesis sink when configured", hasKinesis);
assertTrue("Should have Kafka sink when configured", hasKafka);
assertTrue("Should be valid with both sinks", hasKinesis || hasKafka);
assertTrue(hasKinesis, "Should have Kinesis sink when configured");
assertTrue(hasKafka, "Should have Kafka sink when configured");
assertTrue(hasKinesis || hasKafka, "Should be valid with both sinks");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.amazonaws.services.msf;

import com.amazonaws.services.msf.domain.StockPrice;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

public class JsonSerializationTest {

@Test
public void testStockPriceJsonSerialization() throws Exception {
StockPrice stock = new StockPrice("2024-05-28T19:53:17.497201", "AMZN", 42.88f);

JsonSerializationSchema<StockPrice> serializer = new JsonSerializationSchema<StockPrice>();
serializer.open(null);

byte[] jsonBytes = serializer.serialize(stock);
String json = new String(jsonBytes);

System.out.println("Actual JSON: " + json);

// Verify the JSON contains event_time (not eventTime)
assertTrue(json.contains("\"event_time\""), "JSON should contain 'event_time' field");
assertFalse(json.contains("\"eventTime\""), "JSON should not contain 'eventTime' field");

// Verify other fields
assertTrue(json.contains("\"ticker\":\"AMZN\""), "JSON should contain ticker");
assertTrue(json.contains("\"price\":42.88"), "JSON should contain price");
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.amazonaws.services.msf.domain;

import org.junit.Test;
import static org.junit.Assert.*;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -45,19 +45,18 @@ public void testStockGeneratorFunction() throws Exception {

// Verify ticker is one of the expected values
List<String> expectedTickers = Arrays.asList(TICKERS);
assertTrue("Ticker should be one of the expected values",
expectedTickers.contains(stock.getTicker()));
assertTrue(expectedTickers.contains(stock.getTicker()), "Ticker should be one of the expected values");

// Verify price is within expected range (0 to 100)
assertTrue("Price should be >= 0", stock.getPrice() >= 0);
assertTrue("Price should be <= 100", stock.getPrice() <= 100);
assertTrue(stock.getPrice() >= 0, "Price should be >= 0");
assertTrue(stock.getPrice() <= 100, "Price should be <= 100");

// Verify price has at most 2 decimal places
String priceStr = String.valueOf(stock.getPrice());
int decimalIndex = priceStr.indexOf('.');
if (decimalIndex != -1) {
int decimalPlaces = priceStr.length() - decimalIndex - 1;
assertTrue("Price should have at most 2 decimal places", decimalPlaces <= 2);
assertTrue(decimalPlaces <= 2, "Price should have at most 2 decimal places");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.amazonaws.services.msf.domain;

import org.junit.Test;
import static org.junit.Assert.*;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

public class StockPriceTest {

Expand Down Expand Up @@ -41,24 +41,22 @@ public void testStockHashCodeForPartitioning() {
StockPrice stock3 = new StockPrice("2024-01-15T10:30:45", "AAPL", 150.25f); // Same as stock1

// Test that hashCode is consistent for equal objects
assertEquals("Equal stock objects should have same hashCode",
stock1.hashCode(), stock3.hashCode());
assertEquals(stock1.hashCode(), stock3.hashCode(), "Equal stock objects should have same hashCode");

// Test that equals works correctly
assertEquals("Same stock objects should be equal", stock1, stock3);
assertNotEquals("Different stock objects should not be equal", stock1, stock2);
assertEquals(stock1, stock3, "Same stock objects should be equal");
assertNotEquals(stock1, stock2, "Different stock objects should not be equal");

// Test that different stocks likely have different hashCodes
assertNotEquals("Different stock objects should likely have different hashCodes",
stock1.hashCode(), stock2.hashCode());
assertNotEquals(stock1.hashCode(), stock2.hashCode(), "Different stock objects should likely have different hashCodes");

// Test that hashCode can be used as partition key (should not throw exception)
String partitionKey1 = String.valueOf(stock1.hashCode());
String partitionKey2 = String.valueOf(stock2.hashCode());

assertNotNull("Partition key should not be null", partitionKey1);
assertNotNull("Partition key should not be null", partitionKey2);
assertFalse("Partition key should not be empty", partitionKey1.isEmpty());
assertFalse("Partition key should not be empty", partitionKey2.isEmpty());
assertNotNull(partitionKey1, "Partition key should not be null");
assertNotNull(partitionKey2, "Partition key should not be null");
assertFalse(partitionKey1.isEmpty(), "Partition key should not be empty");
assertFalse(partitionKey2.isEmpty(), "Partition key should not be empty");
}
}