diff --git a/rocketmq-spark/pom.xml b/rocketmq-spark/pom.xml index 87810a536..debba0eec 100644 --- a/rocketmq-spark/pom.xml +++ b/rocketmq-spark/pom.xml @@ -34,9 +34,9 @@ 1.8 1.8 4.9.4 - 2.3.0 - 2.11.8 - 2.11 + 3.3.1 + 2.12.10 + 2.12 2.5 diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala index efc671d7e..b526bf2cb 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala @@ -115,7 +115,6 @@ private class RocketMQSource( if (content(0) == 'v') { val indexOfNewLine = content.indexOf("\n") if (indexOfNewLine > 0) { - val version = parseVersion(content.substring(0, indexOfNewLine), VERSION) RocketMQSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1))) } else { throw new IllegalStateException( diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala index 7635de38b..c8f702cfe 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala @@ -25,15 +25,14 @@ package org.apache.spark.sql.rocketmq import org.apache.rocketmq.common.message.MessageQueue +import org.apache.spark.sql.connector.read.streaming.PartitionOffset import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} -import org.apache.spark.sql.sources.v2.reader.streaming.{PartitionOffset, Offset => OffsetV2} - /** * An [[Offset]] for the [[RocketMQSource]]. This one tracks all partitions of subscribed topics and * their offsets. */ private[rocketmq] -case class RocketMQSourceOffset(queueToOffsets: Map[MessageQueue, Long]) extends OffsetV2 { +case class RocketMQSourceOffset(queueToOffsets: Map[MessageQueue, Long]) extends Offset { override val json = JsonUtils.partitionOffsets(queueToOffsets) } diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala index d30eef1ba..0fcd7bafa 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala @@ -174,7 +174,7 @@ private[rocketmq] class RocketMQSourceRDD( } } // Release consumer, either by removing it or indicating we're no longer using it - context.addTaskCompletionListener { _ => + context.addTaskCompletionListener[Unit] { _ => underlying.closeIfNeeded() } underlying diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQWriter.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQWriter.scala index 46a64de23..bf4273314 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQWriter.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQWriter.scala @@ -25,15 +25,16 @@ package org.apache.spark.sql.rocketmq -import java.{util => ju} - import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.types.{BinaryType, StringType} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils +import java.{util => ju} + /** * The [[RocketMQWriter]] class is used to write data from a batch query * or structured streaming query, given by a [[QueryExecution]], to RocketMQ. @@ -53,12 +54,12 @@ private object RocketMQWriter extends Logging { options: ju.Map[String, String], topic: Option[String] = None): Unit = { schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse( - if (topic.isEmpty) { - throw new AnalysisException(s"topic option required when no " + + topic match { + case Some(topicValue) => Literal(UTF8String.fromString(topicValue), StringType) + case None => throw new AnalysisException( + s"topic option required when no " + s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " + s"${RocketMQConf.PRODUCER_TOPIC} option for setting a topic.") - } else { - Literal(topic.get, StringType) } ).dataType match { case StringType => // good diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala b/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala index 08a23520a..fc4e46bb5 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala @@ -178,7 +178,7 @@ class RocketMqRDD ( logDebug(s"Computing topic ${part.topic}, queueId ${part.queueId} " + s"offsets ${part.partitionOffsetRanges.mkString(",")}") - context.addTaskCompletionListener{ context => closeIfNeeded() } + context.addTaskCompletionListener[Unit]{ context => closeIfNeeded() } val consumer = if (useConsumerCache) {