@@ -93,8 +93,8 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
       batchSize = 0
     }
     val schema = df.schema
-    val arrowSchema = SparkShimLoader.getSparkShims.toArrowSchema(
-      schema, timeZoneId, sparkSession)
+    val arrowSchemaJson = SparkShimLoader.getSparkShims.toArrowSchema(
+      schema, timeZoneId, sparkSession).toJson
 
     val objectIds = df.queryExecution.toRdd.mapPartitions{ iter =>
       val queue = ObjectRefHolder.getQueue(uuid)
@@ -105,6 +105,9 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
       } else {
         Iterator(iter)
       }
+
+      // Reconstruct arrow schema from JSON on executor
+      val arrowSchema = Schema.fromJSON(arrowSchemaJson)
       val allocator = ArrowUtils.rootAllocator.newChildAllocator(
         s"ray object store writer", 0, Long.MaxValue)
       val root = VectorSchemaRoot.create(arrowSchema, allocator)
@@ -229,7 +232,7 @@ object ObjectStoreWriter {
     val rdd = SparkShimLoader.getSparkShims.toArrowBatchRDD(df)
     rdd.persist(storageLevel)
     rdd.count()
-    var executorIds = df.sqlContext.sparkContext.getExecutorIds.toArray
+    val executorIds = df.sparkSession.sparkContext.getExecutorIds.toArray
     val numExecutors = executorIds.length
     val appMasterHandle = Ray.getActor(RayAppMaster.ACTOR_NAME)
       .get.asInstanceOf[ActorHandle[RayAppMaster]]
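
The change above passes the Arrow schema into the mapPartitions closure as a JSON string rather than as a Schema object, and each executor rebuilds it with Schema.fromJSON before creating the VectorSchemaRoot. A minimal standalone sketch of that round trip is below; the object name and the single "id" field are made up for illustration and are not part of this commit.

import java.util.Collections

import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}

// Sketch of the schema round trip assumed by the diff: the Schema is turned
// into a plain (serializable) JSON string on the driver and rebuilt from that
// string on the executor side.
object ArrowSchemaRoundTrip {
  def main(args: Array[String]): Unit = {
    // Hypothetical single-column schema, standing in for toArrowSchema(...)
    val field = new Field("id",
      FieldType.nullable(new ArrowType.Int(64, true)),
      Collections.emptyList[Field]())
    val schema = new Schema(Collections.singletonList(field))

    // Driver side: only this String needs to be captured by the Spark closure.
    val schemaJson: String = schema.toJson

    // Executor side: reconstruct the Schema before allocating vectors.
    val rebuilt: Schema = Schema.fromJSON(schemaJson)
    assert(rebuilt == schema)
  }
}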