Commit 111bfd2

pin click<8.3.0

1 parent 9009759 commit 111bfd2

2 files changed (+8, -5 lines)

.github/workflows/raydp.yml

Lines changed: 2 additions & 2 deletions
@@ -74,8 +74,8 @@ jobs:
      run: |
        python -m pip install --upgrade pip
        pip install wheel
-       pip install "numpy<1.24" "click<8.3.0"
-       pip install "pydantic<2.0"
+       pip install "numpy<1.24"
+       pip install "pydantic<2.0" "click<8.3.0"
        SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
        if [ "$(uname -s)" == "Linux" ]
        then

core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala

Lines changed: 6 additions & 3 deletions
@@ -93,8 +93,8 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
      batchSize = 0
    }
    val schema = df.schema
-   val arrowSchema = SparkShimLoader.getSparkShims.toArrowSchema(
-     schema, timeZoneId, sparkSession)
+   val arrowSchemaJson = SparkShimLoader.getSparkShims.toArrowSchema(
+     schema, timeZoneId, sparkSession).toJson

    val objectIds = df.queryExecution.toRdd.mapPartitions{ iter =>
      val queue = ObjectRefHolder.getQueue(uuid)
@@ -105,6 +105,9 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
      } else {
        Iterator(iter)
      }
+
+     // Reconstruct arrow schema from JSON on executor
+     val arrowSchema = Schema.fromJSON(arrowSchemaJson)
      val allocator = ArrowUtils.rootAllocator.newChildAllocator(
        s"ray object store writer", 0, Long.MaxValue)
      val root = VectorSchemaRoot.create(arrowSchema, allocator)
@@ -229,7 +232,7 @@ object ObjectStoreWriter {
    val rdd = SparkShimLoader.getSparkShims.toArrowBatchRDD(df)
    rdd.persist(storageLevel)
    rdd.count()
-   var executorIds = df.sqlContext.sparkContext.getExecutorIds.toArray
+   val executorIds = df.sparkSession.sparkContext.getExecutorIds.toArray
    val numExecutors = executorIds.length
    val appMasterHandle = Ray.getActor(RayAppMaster.ACTOR_NAME)
      .get.asInstanceOf[ActorHandle[RayAppMaster]]
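
The substantive change in ObjectStoreWriter is the schema handoff into the mapPartitions closure: instead of capturing the Arrow Schema object built on the driver, the code now captures only its JSON form (a plain String) and each executor rebuilds the Schema with Schema.fromJSON, presumably because Arrow's Schema is not Java-serializable and would otherwise break task serialization. Below is a minimal, self-contained sketch of that round-trip against the Arrow Java API; the toy id/name schema and the SchemaJsonRoundTrip object are illustrative and not part of the commit.

import java.util.Arrays

import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}

object SchemaJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    // Toy schema standing in for the one derived from df.schema on the driver.
    val schema = new Schema(Arrays.asList(
      Field.nullable("id", new ArrowType.Int(32, true)),
      Field.nullable("name", new ArrowType.Utf8())))

    // toJson yields a plain String, which a Spark task closure can capture
    // and ship to executors even when Schema itself cannot be serialized.
    val schemaJson: String = schema.toJson

    // "Executor" side: rebuild an equal Schema from the JSON string, as the
    // diff above does with Schema.fromJSON(arrowSchemaJson).
    val rebuilt: Schema = Schema.fromJSON(schemaJson)
    assert(rebuilt == schema, "round-tripped schema should equal the original")
    println(rebuilt)
  }
}

The remaining hunk at line 229 is independent of the schema fix: it swaps the legacy df.sqlContext entry point for df.sparkSession and tightens var executorIds to val.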
