From 0b409cac1ed4c06d55a6f5db6895be14ac53056e Mon Sep 17 00:00:00 2001 From: Tom Zhu Date: Tue, 18 Mar 2025 12:02:53 -0700 Subject: [PATCH 1/2] Support Trigger.AvailableNow --- .../sharing/spark/DeltaSharingSource.scala | 44 ++++++++++++++++++- version.sbt | 2 +- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala index 58e0debcb..5e6a32d7e 100644 --- a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala +++ b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala @@ -26,7 +26,7 @@ import org.apache.spark.delta.sharing.{CachedTableManager, TableRefreshResult} import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, DeltaSharingScanUtils, SparkSession} import org.apache.spark.sql.connector.read.streaming -import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl} +import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.types.StructType @@ -95,6 +95,7 @@ case class DeltaSharingSource( deltaLog: RemoteDeltaLog, options: DeltaSharingOptions) extends Source with SupportsAdmissionControl + with SupportsTriggerAvailableNow with Logging { // This is to ensure that the request sent from the client contains the http header for streaming. 
@@ -120,6 +121,49 @@ case class DeltaSharingSource(
   // This is checked before creating DeltaSharingSource
   assert(schema.nonEmpty, "schema cannot be empty in DeltaSharingSource.")
 
+  /** True once [[prepareForTriggerAvailableNow]] has been called for this source. */
+  private var isTriggerAvailableNow = false
+
+  /**
+   * Guards the one-time computation of [[lastOffsetForTriggerAvailableNow]]. It is flipped
+   * before the computation starts, so a failed or null-returning attempt is not retried.
+   */
+  private var isLastOffsetForTriggerAvailableNowInitialized = false
+
+  /**
+   * Offset captured by [[initLastOffsetForTriggerAvailableNow]]. Stays None until that method
+   * has run, and also when the offset lookup it performs returned null.
+   */
+  private var lastOffsetForTriggerAvailableNow: Option[DeltaSharingSourceOffset] = None
+
+  /** Records that the query uses Trigger.AvailableNow; the bound itself is computed later. */
+  override def prepareForTriggerAvailableNow(): Unit = {
+    logInfo("The streaming query reports to use Trigger.AvailableNow")
+    isTriggerAvailableNow = true
+  }
+
+  /** Computes the Trigger.AvailableNow bound on first use; otherwise does nothing. */
+  private def initForTriggerAvailableNowIfNeeded(): Unit = {
+    if (isTriggerAvailableNow && !isLastOffsetForTriggerAvailableNowInitialized) {
+      // Flip the flag first so this branch is entered at most once.
+      isLastOffsetForTriggerAvailableNowInitialized = true
+      initLastOffsetForTriggerAvailableNow()
+    }
+  }
+
+  /**
+   * Stores the offset returned by latestOffsetInternal for ReadLimit.allAvailable() as the
+   * Trigger.AvailableNow bound. A null result leaves the bound unset.
+   */
+  private def initLastOffsetForTriggerAvailableNow(): Unit = {
+    // Option(...) maps a null offset to None, so the body only runs for a real offset.
+    Option(latestOffsetInternal(ReadLimit.allAvailable())).foreach { offset =>
+      val bound = DeltaSharingSourceOffset(tableId, offset)
+      lastOffsetForTriggerAvailableNow = Some(bound)
+      logInfo(s"lastOffset for Trigger.AvailableNow has set to ${bound.json}")
+    }
+  }
+
   /** A check on the source table that skips commits that contain removes from the set of files.
*/ private val skipChangeCommits = options.skipChangeCommits @@ -286,6 +314,13 @@ case class DeltaSharingSource( } else { getCDFFileChanges(fromVersion, fromIndex, endingVersionForQuery) } + + lastOffsetForTriggerAvailableNow.foreach { bound => + sortedFetchedFiles = sortedFetchedFiles.filter { + case IndexedFile(version, index, _, _, _, _, _) => + version < bound.tableVersion || (version == bound.tableVersion && index <= bound.index) + } + } } private def resetGlobalTimestamp(): Unit = { @@ -996,6 +1031,11 @@ case class DeltaSharingSource( } override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = { + initForTriggerAvailableNowIfNeeded() + latestOffsetInternal(limit) + } + + private def latestOffsetInternal(limit: ReadLimit): streaming.Offset = { val limits = AdmissionLimits(limit) val currentOffset = if (previousOffset == null) { @@ -1017,6 +1057,8 @@ case class DeltaSharingSource( getTableInfoForLogging) val endOffset = DeltaSharingSourceOffset(tableId, end) + initForTriggerAvailableNowIfNeeded() + val (startVersion, startIndex, isStartingVersion, startSourceVersion) = if ( startOffsetOption.isEmpty) { getStartingVersion match { diff --git a/version.sbt b/version.sbt index 48af491ec..bd727575e 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.0" +version in ThisBuild := "1.1.0-SNAPSHOT" From 168f7ccb1c471bc6b413b92bb2a5d162f77e37d5 Mon Sep 17 00:00:00 2001 From: Tom Zhu Date: Tue, 18 Mar 2025 13:12:45 -0700 Subject: [PATCH 2/2] fix --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index bd727575e..48af491ec 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.0-SNAPSHOT" +version in ThisBuild := "1.1.0"