[SPARK-52823][SQL] Support DSv2 Join pushdown for Oracle connector #51519

Open · wants to merge 4 commits into base: master
@@ -22,20 +22,21 @@ import org.apache.logging.log4j.Level
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Offset, Sample, Sort}
import org.apache.spark.sql.connector.DataSourcePushdownTestUtils
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.NullOrdering
import org.apache.spark.sql.connector.expressions.aggregate.GeneralAggregateFunc
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.tags.DockerTest

@DockerTest
private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFunSuite {
private[v2] trait V2JDBCTest
extends DataSourcePushdownTestUtils
with DockerIntegrationFunSuite
with SharedSparkSession {
import testImplicits._

val catalogName: String
@@ -468,56 +469,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def supportsTableSample: Boolean = false

private def checkSamplePushed(df: DataFrame, pushed: Boolean = true): Unit = {
val sample = df.queryExecution.optimizedPlan.collect {
case s: Sample => s
}
if (pushed) {
assert(sample.isEmpty)
} else {
assert(sample.nonEmpty)
}
}

protected def checkFilterPushed(df: DataFrame, pushed: Boolean = true): Unit = {
val filter = df.queryExecution.optimizedPlan.collect {
case f: Filter => f
}
if (pushed) {
assert(filter.isEmpty)
} else {
assert(filter.nonEmpty)
}
}

protected def checkLimitRemoved(df: DataFrame, pushed: Boolean = true): Unit = {
val limit = df.queryExecution.optimizedPlan.collect {
case l: LocalLimit => l
case g: GlobalLimit => g
}
if (pushed) {
assert(limit.isEmpty)
} else {
assert(limit.nonEmpty)
}
}

private def checkLimitPushed(df: DataFrame, limit: Option[Int]): Unit = {
df.queryExecution.optimizedPlan.collect {
case relation: DataSourceV2ScanRelation => relation.scan match {
case v1: V1ScanWrapper =>
assert(v1.pushedDownOperators.limit == limit)
}
}
}

private def checkColumnPruned(df: DataFrame, col: String): Unit = {
val scan = df.queryExecution.optimizedPlan.collectFirst {
case s: DataSourceV2ScanRelation => s
}.get
assert(scan.schema.names.sameElements(Seq(col)))
}

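These assertion helpers are removed from this trait and, given the new DataSourcePushdownTestUtils mixin above, are expected to be provided there instead. A rough sketch of how a pushdown test typically combines them; the employee table, its columns, and the visibility of the helpers are assumptions for illustration, not something this PR guarantees:

// Sketch only: assumes an "employee" table in $catalogAndNamespace and that the
// check* helpers remain accessible via the DataSourcePushdownTestUtils mixin.
val df = sql(
  s"SELECT name FROM $catalogAndNamespace.employee WHERE bonus > 100 LIMIT 5")
checkFilterPushed(df)          // no Filter node should remain in the optimized plan
checkLimitRemoved(df)          // LocalLimit/GlobalLimit should be gone as well
checkLimitPushed(df, Some(5))  // the limit should appear among the pushed-down operators
checkColumnPruned(df, "name")  // the scan schema should be pruned to the selected column
df.collect()                   // finally, execute the query against the database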
test("SPARK-48172: Test CONTAINS") {
val df1 = spark.sql(
s"""
@@ -841,39 +792,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
}
}

private def checkSortRemoved(df: DataFrame, pushed: Boolean = true): Unit = {
val sorts = df.queryExecution.optimizedPlan.collect {
case s: Sort => s
}

if (pushed) {
assert(sorts.isEmpty)
} else {
assert(sorts.nonEmpty)
}
}

private def checkOffsetRemoved(df: DataFrame, pushed: Boolean = true): Unit = {
val offsets = df.queryExecution.optimizedPlan.collect {
case o: Offset => o
}

if (pushed) {
assert(offsets.isEmpty)
} else {
assert(offsets.nonEmpty)
}
}

private def checkOffsetPushed(df: DataFrame, offset: Option[Int]): Unit = {
df.queryExecution.optimizedPlan.collect {
case relation: DataSourceV2ScanRelation => relation.scan match {
case v1: V1ScanWrapper =>
assert(v1.pushedDownOperators.offset == offset)
}
}
}

gridTest("simple scan")(partitioningEnabledTestCase) { partitioningEnabled =>
val (tableOptions, partitionInfo) = getTableOptions("employee", partitioningEnabled)
val df = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
@@ -1028,27 +946,6 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
}
}

private def checkAggregateRemoved(df: DataFrame): Unit = {
val aggregates = df.queryExecution.optimizedPlan.collect {
case agg: Aggregate => agg
}
assert(aggregates.isEmpty)
}

private def checkAggregatePushed(df: DataFrame, funcName: String): Unit = {
(Reviewer comment from a Contributor: Good change)
df.queryExecution.optimizedPlan.collect {
case DataSourceV2ScanRelation(_, scan, _, _, _) =>
assert(scan.isInstanceOf[V1ScanWrapper])
val wrapper = scan.asInstanceOf[V1ScanWrapper]
assert(wrapper.pushedDownOperators.aggregation.isDefined)
val aggregationExpressions =
wrapper.pushedDownOperators.aggregation.get.aggregateExpressions()
assert(aggregationExpressions.length == 1)
assert(aggregationExpressions(0).isInstanceOf[GeneralAggregateFunc])
assert(aggregationExpressions(0).asInstanceOf[GeneralAggregateFunc].name() == funcName)
}
}

protected def caseConvert(tableName: String): String = tableName

Seq(true, false).foreach { isDistinct =>
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc.v2.join

import java.sql.Connection
import java.util.Locale

import org.apache.spark.sql.jdbc.{DockerJDBCIntegrationSuite, JdbcDialect, OracleDatabaseOnDocker, OracleDialect}
import org.apache.spark.sql.jdbc.v2.JDBCV2JoinPushdownIntegrationSuiteBase
import org.apache.spark.tags.DockerTest

/**
* The following are the steps to test this:
*
* 1. Choose to use a prebuilt image or build Oracle database in a container
* - The documentation on how to build Oracle RDBMS in a container is at
* https://github.com/oracle/docker-images/blob/master/OracleDatabase/SingleInstance/README.md
* - Official Oracle container images can be found at https://container-registry.oracle.com
* - Trusted and streamlined Oracle Database Free images can be found on Docker Hub at
* https://hub.docker.com/r/gvenzl/oracle-free
* see also https://github.com/gvenzl/oci-oracle-free
* 2. Run: export ORACLE_DOCKER_IMAGE_NAME=image_you_want_to_use_for_testing
* - Example: export ORACLE_DOCKER_IMAGE_NAME=gvenzl/oracle-free:latest
* 3. Run: export ENABLE_DOCKER_INTEGRATION_TESTS=1
* 4. Start docker: sudo service docker start
* - Optionally, docker pull $ORACLE_DOCKER_IMAGE_NAME
* 5. Run Spark integration tests for Oracle with: ./build/sbt -Pdocker-integration-tests
* "testOnly org.apache.spark.sql.jdbc.v2.join.OracleJoinPushdownIntegrationSuite"
*
* A sequence of commands to build the Oracle Database Free container image:
* $ git clone https://github.com/oracle/docker-images.git
* $ cd docker-images/OracleDatabase/SingleInstance/dockerfiles
* $ ./buildContainerImage.sh -v 23.4.0 -f
* $ export ORACLE_DOCKER_IMAGE_NAME=oracle/database:23.4.0-free
*
* This procedure has been validated with Oracle Database Free version 23.4.0,
* and with Oracle Express Edition versions 18.4.0 and 21.4.0
*/
@DockerTest
class OracleJoinPushdownIntegrationSuite
extends DockerJDBCIntegrationSuite
with JDBCV2JoinPushdownIntegrationSuiteBase {
override val namespaceOpt: Option[String] = Some("SYSTEM")

override val db = new OracleDatabaseOnDocker

override val url = db.getJdbcUrl(dockerIp, externalPort)

override val jdbcDialect: JdbcDialect = OracleDialect()

override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)

override def schemaPreparation(connection: Connection): Unit = {}
}
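For context, a minimal sketch of how a DSv2 JDBC catalog pointing at Oracle could be configured so that a join between two of its tables becomes a candidate for pushdown. The catalog name, credentials, URL, and table names below are illustrative assumptions, not part of this PR:

import org.apache.spark.sql.SparkSession

// Sketch only: register Oracle as a DSv2 JDBC catalog named "oracle".
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.oracle",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.oracle.url",
    "jdbc:oracle:thin:system/example@//localhost:1521/FREEPDB1")
  .config("spark.sql.catalog.oracle.driver", "oracle.jdbc.OracleDriver")
  .getOrCreate()

// With the dialect reporting supportsJoin, a join between two tables of the same
// Oracle catalog can be compiled into a single Oracle-side join query. Depending on
// the Spark version, join pushdown may additionally be gated by a SQL conf or option.
val joined = spark.sql(
  """SELECT e.NAME, d.DEPT_NAME
    |FROM oracle.SYSTEM.EMPLOYEE e
    |JOIN oracle.SYSTEM.DEPT d ON e.DEPT_ID = d.ID""".stripMargin)
joined.explain("extended") // inspect the scan node to see whether the join was pushed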
@@ -208,7 +208,7 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions) {

// If join has been pushed down, reuse join query as a subquery. Otherwise, fallback to
// what is provided in options.
private def tableOrQuery = joinQuery.getOrElse(options.tableOrQuery)
protected final def tableOrQuery: String = joinQuery.getOrElse(options.tableOrQuery)

/**
* Build the final SQL query that follows the dialect's SQL syntax.
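To make the tableOrQuery comment above concrete, here is a rough sketch of what the builder works with once a join has been pushed down: the compiled join query is reused as an inline view in the FROM clause instead of the table or query from the JDBC options. All identifiers below are illustrative assumptions; the real join query text is generated by Spark, not written by hand:

// Illustrative only: mirrors `joinQuery.getOrElse(options.tableOrQuery)` above.
val joinQuery: Option[String] =
  Some("""(SELECT "T0"."ID", "T1"."NAME" FROM "EMP" "T0" JOIN "DEPT" "T1" ON "T0"."DEPT_ID" = "T1"."ID")""")
val optionsTableOrQuery = "\"SYSTEM\".\"EMP\""

// When a join was pushed, the FROM source is the join subquery; otherwise the configured table.
val fromSource = joinQuery.getOrElse(optionsTableOrQuery)
val selectStmt = s"SELECT ID, NAME FROM $fromSource WHERE ID > 10"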
@@ -233,7 +233,7 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N
extends JdbcSQLQueryBuilder(dialect, options) {

override def build(): String = {
val selectStmt = s"SELECT $hintClause$columnList FROM ${options.tableOrQuery}" +
val selectStmt = s"SELECT $hintClause$columnList FROM $tableOrQuery" +
s" $tableSampleClause $whereClause $groupByClause $orderByClause"
val finalSelectStmt = if (limit > 0) {
if (offset > 0) {
@@ -268,6 +268,8 @@

override def supportsHint: Boolean = true

override def supportsJoin: Boolean = true

override def classifyException(
e: Throwable,
condition: String,
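With supportsJoin enabled, the Oracle query builder above can receive a pushed join as its FROM source while still applying Oracle-specific paging. A rough sketch of the resulting SQL shape when a limit is pushed down as well; the identifiers are illustrative assumptions, and the actual query text is generated by Spark:

// Illustrative only: the pushed join subquery and column names are made up.
val pushedJoin =
  """(SELECT "T0"."NAME", "T1"."DEPT_NAME" FROM "EMPLOYEE" "T0" JOIN "DEPT" "T1" ON "T0"."DEPT_ID" = "T1"."ID")"""
val selectStmt = s"SELECT NAME, DEPT_NAME FROM $pushedJoin WHERE SALARY > 1000"
// Oracle has no LIMIT keyword, so the builder wraps the statement, e.g. with a ROWNUM filter:
val finalSelectStmt = s"SELECT tab.* FROM ($selectStmt) tab WHERE rownum <= 10"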