
[SPARK-52813][CONNECT] Allow DAGs in Spark Connect #51516

Open · wants to merge 2 commits into base: master
336 changes: 168 additions & 168 deletions python/pyspark/sql/connect/proto/relations_pb2.py

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -108,6 +108,7 @@ class Relation(google.protobuf.message.Message):
TRANSPOSE_FIELD_NUMBER: builtins.int
UNRESOLVED_TABLE_VALUED_FUNCTION_FIELD_NUMBER: builtins.int
LATERAL_JOIN_FIELD_NUMBER: builtins.int
REFERENCED_PLAN_ID_FIELD_NUMBER: builtins.int
FILL_NA_FIELD_NUMBER: builtins.int
DROP_NA_FIELD_NUMBER: builtins.int
REPLACE_FIELD_NUMBER: builtins.int
@@ -215,6 +216,14 @@
def unresolved_table_valued_function(self) -> global___UnresolvedTableValuedFunction: ...
@property
def lateral_join(self) -> global___LateralJoin: ...
referenced_plan_id: builtins.int
"""Reference to a node else where in the tree. There are two use cases for this:
1. Reduce tree duplication. In this case the tree contains two or more subtrees that are
identical. The referenced plan can only be a back reference, to a subtree that was
already visited by the planner. The planner is expected to visit the tree bottom-up from
left to right.
1. Reduce tree depth.
"""
@property
def fill_na(self) -> global___NAFill:
"""NA functions"""
@@ -301,6 +310,7 @@
transpose: global___Transpose | None = ...,
unresolved_table_valued_function: global___UnresolvedTableValuedFunction | None = ...,
lateral_join: global___LateralJoin | None = ...,
referenced_plan_id: builtins.int = ...,
fill_na: global___NAFill | None = ...,
drop_na: global___NADrop | None = ...,
replace: global___NAReplace | None = ...,
@@ -394,6 +404,8 @@
b"range",
"read",
b"read",
"referenced_plan_id",
b"referenced_plan_id",
"rel_type",
b"rel_type",
"repartition",
@@ -519,6 +531,8 @@
b"range",
"read",
b"read",
"referenced_plan_id",
b"referenced_plan_id",
"rel_type",
b"rel_type",
"repartition",
@@ -614,6 +628,7 @@
"transpose",
"unresolved_table_valued_function",
"lateral_join",
"referenced_plan_id",
"fill_na",
"drop_na",
"replace",
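
The new referenced_plan_id field lets a relation point back at a subtree that already occurs elsewhere in the plan, so a tree with repeated subtrees can effectively be shipped as a DAG. Below is a minimal, self-contained Scala sketch of that idea; Plan, Node, Leaf, Ref, and dedup are illustrative names, not Spark's actual classes, and the sketch only assumes what the comment above states: identical subtrees are replaced by back references to nodes the planner has already visited, bottom-up and left to right.

// Illustrative sketch only: toy plan nodes, not Spark's real plan classes.
sealed trait Plan
case class Leaf(payload: String) extends Plan                  // stands in for a large local relation
case class Node(op: String, children: Seq[Plan]) extends Plan  // stands in for any relational operator
case class Ref(planId: Long) extends Plan                      // back reference to an already-visited subtree

object DedupSketch {
  // Replaces repeated occurrences of an identical subtree with a back reference to its
  // first occurrence, assigning ids in the order subtrees are first completed
  // (bottom-up, left to right), mirroring the visiting order described above.
  def dedup(plan: Plan): Plan = {
    val seen = scala.collection.mutable.LinkedHashMap.empty[Plan, Long]
    def visit(p: Plan): Plan = p match {
      case _ if seen.contains(p) => Ref(seen(p))
      case Node(op, children) =>
        val rewritten = Node(op, children.map(visit))
        seen.put(p, seen.size.toLong)
        rewritten
      case leaf =>
        seen.put(leaf, seen.size.toLong)
        leaf
    }
    visit(plan)
  }

  def main(args: Array[String]): Unit = {
    // A self-union repeats the same subtree twice; after dedup the second occurrence is
    // just a Ref, so the (notional) serialized cost of the big leaf is paid only once.
    val local = Leaf("128 rows x 1 KiB each")
    val union = Node("union", Seq(local, local))
    println(dedup(union)) // Node(union,List(Leaf(128 rows x 1 KiB each), Ref(0)))
  }
}

The test added to ClientE2ETestSuite below checks the same effect end to end by comparing the serialized sizes of the optimized and the fully expanded plan.
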
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.jdk.CollectionConverters._
import scala.util.Random

import org.apache.commons.io.FileUtils
import org.apache.commons.io.output.TeeOutputStream
@@ -1671,6 +1672,26 @@ class ClientE2ETestSuite
checkAnswer(df, (0 until 6).map(i => Row(i)))
}

test("Execute optimized plan - 33 duplicate local relations") {
val implicits = spark.implicits
import implicits._
val rng = new Random(61209389765L)
val data = IndexedSeq.tabulate(128) { id =>
id -> rng.nextBytes(1024)
}
val input = data.toDF("key", "value")
val unions = Iterator.range(0, 5).foldLeft(input) {
case (current, _) => current.union(current)
}
val df = unions.filter($"key".isin(input.select($"key").filter($"key" < 5)))
.groupBy($"key", $"value")
.count()
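// The full plan embeds the 128-row local relation 33 times: 2^5 = 32 leaves come from
// the five self-unions and one more from the IN-subquery over `input`. The optimized
// plan is expected to keep a single copy and point the remaining occurrences back at it.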
val compressionRatio =
df.optimizedPlan.getSerializedSize.toDouble / df.plan.getSerializedSize.toDouble
assert(compressionRatio < (1.0d / 32.0d)) // It should be very close to a 1/33 ratio.
checkAnswer(df, data.take(5).map(kv => Row(kv._1, kv._2, 32L)))
}

test("SPARK-52770: Support Time type") {
val df = spark.sql("SELECT TIME '12:13:14'")
