From aab410b8a43818972dfaa1bebc52baf61117d97c Mon Sep 17 00:00:00 2001 From: Edward Guy Capriolo Date: Thu, 27 May 2021 03:13:41 -0400 Subject: [PATCH 1/2] Add full end to end --- pom.xml | 59 +++++++++++++++++++ .../examples/ConfluentKafkaAvroWriter.scala | 21 ++++++- .../absa/abris/avro/endtoend/FullSpec.scala | 56 ++++++++++++++++++ 3 files changed, 135 insertions(+), 1 deletion(-) create mode 100644 src/test/scala/za/co/absa/abris/avro/endtoend/FullSpec.scala diff --git a/pom.xml b/pom.xml index a782ddda..890fdb9b 100644 --- a/pom.xml +++ b/pom.xml @@ -225,6 +225,65 @@ + + com.googlecode.maven-download-plugin + download-maven-plugin + 1.6.1 + + + install-schemaReg + test-compile + + artifact + + + + + spring-schema-registry-embedded + com.github.mvallim + 1.0.0 + false + ${project.build.directory} + + + + com.bazaarvoice.maven.plugins + process-exec-maven-plugin + 0.4 + + + cleanup-test + validate + + stop-all + + + + start-jar + process-test-classes + + start + + + schemaReg + + java + -jar + -Xms64m + -Xmx128m + spring-schema-registry-embedded-1.0.0.jar + + + + + stop-test + prepare-package + + stop-all + + + + net.alchim31.maven diff --git a/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala b/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala index be6dff1e..6bcaf42f 100644 --- a/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala +++ b/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala @@ -17,7 +17,7 @@ package za.co.absa.abris.examples import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.functions.struct +import org.apache.spark.sql.functions.{col, struct} import org.apache.spark.sql.{DataFrame, Encoder, Row, SparkSession} import za.co.absa.abris.avro.format.SparkAvroConversions import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils @@ -67,6 +67,25 @@ object ConfluentKafkaAvroWriter { .option("kafka.bootstrap.servers", "localhost:9092") 
.option("topic", kafkaTopicName) .save() + + val readConfig = AbrisConfig + .fromConfluentAvro + .downloadReaderSchemaByLatestVersion + .andTopicNameStrategy(kafkaTopicName) + .usingSchemaRegistry("http://localhost:8081") + + import za.co.absa.abris.avro.functions.from_avro + val deserialized = dataFrame.select(from_avro(col("value"), readConfig) as 'data) + + deserialized.printSchema() + + deserialized + .writeStream + .format("console") + .option("truncate", "false") + .start() + .awaitTermination(5000) + } private def generateRandomDataFrame(spark: SparkSession): DataFrame = { diff --git a/src/test/scala/za/co/absa/abris/avro/endtoend/FullSpec.scala b/src/test/scala/za/co/absa/abris/avro/endtoend/FullSpec.scala new file mode 100644 index 00000000..ccd05411 --- /dev/null +++ b/src/test/scala/za/co/absa/abris/avro/endtoend/FullSpec.scala @@ -0,0 +1,56 @@ +package za.co.absa.abris.avro.endtoend + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.functions.struct +import org.scalatest.FlatSpec +import za.co.absa.abris.avro.functions.to_avro +import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils +import za.co.absa.abris.config.AbrisConfig +import za.co.absa.abris.examples.ConfluentKafkaAvroWriter.kafkaTopicName + +import java.net.Socket +import java.nio.charset.StandardCharsets + +case class SimulatedKafkaRow(key: Array[Byte], value: Array[Byte]) +class FullSpec extends FlatSpec { + + it should "work end to end" in { + assume(hostPortInUse("localhost", 8081)) + val spark = SparkSession.builder().appName("end2end").master("local[1]").getOrCreate() + implicit val ctx = spark.sqlContext + import spark.implicits._ + val events = MemoryStream[SimulatedKafkaRow] + events.addData(SimulatedKafkaRow("key".getBytes(StandardCharsets.UTF_8), + """{"a":"b", "c": 1} """.getBytes(StandardCharsets.UTF_8))) + + val dataFrame = events.toDF() + val allColumns = struct(dataFrame.columns.head, 
dataFrame.columns.tail: _*) + + val schema = AvroSchemaUtils.toAvroSchema(dataFrame) + val abrisConfig = AbrisConfig + .toConfluentAvro + .provideAndRegisterSchema(schema.toString) + .usingTopicNameStrategy(kafkaTopicName) + .usingSchemaRegistry("http://localhost:8081") + + val avroFrame = dataFrame.select(to_avro(allColumns, abrisConfig) as 'value) + + avroFrame + .write + .format("kafka") + .option("kafka.bootstrap.servers", "localhost:9092") + .option("topic", kafkaTopicName) + .save() + + + + } + + def hostPortInUse(host: String, port: Int): Boolean = { + scala.util.Try[Boolean] { + new Socket(host, port).close() + true + }.getOrElse(false) + } +} From 93a00f54d3716f58b5d7636e3a2780297bb8d138 Mon Sep 17 00:00:00 2001 From: Edward Guy Capriolo Date: Thu, 27 May 2021 03:30:26 -0400 Subject: [PATCH 2/2] Added to wrong file --- .../examples/ConfluentKafkaAvroWriter.scala | 18 ----------------- .../absa/abris/avro/endtoend/FullSpec.scala | 20 ++++++++++++++++++- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala b/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala index 6bcaf42f..bbfa5ebc 100644 --- a/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala +++ b/src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroWriter.scala @@ -68,24 +68,6 @@ object ConfluentKafkaAvroWriter { .option("topic", kafkaTopicName) .save() - val readConfig = AbrisConfig - .fromConfluentAvro - .downloadReaderSchemaByLatestVersion - .andTopicNameStrategy(kafkaTopicName) - .usingSchemaRegistry("http://localhost:8081") - - import za.co.absa.abris.avro.functions.from_avro - val deserialized = dataFrame.select(from_avro(col("value"), readConfig) as 'data) - - deserialized.printSchema() - - deserialized - .writeStream - .format("console") - .option("truncate", "false") - .start() - .awaitTermination(5000) - } private def generateRandomDataFrame(spark: SparkSession): 
DataFrame = { diff --git a/src/test/scala/za/co/absa/abris/avro/endtoend/FullSpec.scala b/src/test/scala/za/co/absa/abris/avro/endtoend/FullSpec.scala index ccd05411..9111851c 100644 --- a/src/test/scala/za/co/absa/abris/avro/endtoend/FullSpec.scala +++ b/src/test/scala/za/co/absa/abris/avro/endtoend/FullSpec.scala @@ -2,7 +2,7 @@ package za.co.absa.abris.avro.endtoend import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.MemoryStream -import org.apache.spark.sql.functions.struct +import org.apache.spark.sql.functions.{col, struct} import org.scalatest.FlatSpec import za.co.absa.abris.avro.functions.to_avro import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils @@ -44,6 +44,24 @@ class FullSpec extends FlatSpec { .save() + val readConfig = AbrisConfig + .fromConfluentAvro + .downloadReaderSchemaByLatestVersion + .andTopicNameStrategy(kafkaTopicName) + .usingSchemaRegistry("http://localhost:8081") + + import za.co.absa.abris.avro.functions.from_avro + val deserialized = dataFrame.select(from_avro(col("value"), readConfig) as 'data) + + deserialized.printSchema() + + deserialized + .writeStream + .format("console") + .option("truncate", "false") + .start() + .awaitTermination(5000) + }