Commit d27c2d4: arrow to rdd
Parent: c17af0b

12 files changed: +62 additions, -5 deletions

core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -225,7 +225,7 @@ object ObjectStoreWriter {
     }
     val uuid = dfToId.getOrElseUpdate(df, UUID.randomUUID())
     val queue = ObjectRefHolder.getQueue(uuid)
-    val rdd = df.toArrowBatchRdd
+    val rdd = SparkShimLoader.getSparkShims.toArrowBatchRDD(df)
     rdd.persist(storageLevel)
     rdd.count()
     var executorIds = df.sqlContext.sparkContext.getExecutorIds.toArray
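
This one-line change swaps the direct call to Dataset.toArrowBatchRdd for a call through the shim layer, so ObjectStoreWriter no longer depends on a Spark-version-specific API. A minimal sketch of the new call path (SparkShimLoader and toArrowBatchRDD are the names used in this diff; the wrapper function and the assumption that the loader lives in com.intel.raydp.shims are illustrative):

    import com.intel.raydp.shims.SparkShimLoader
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame

    // Convert a DataFrame into serialized Arrow record batches through the
    // shim layer; the loader resolves the implementation that matches the
    // running Spark version. (Illustrative wrapper, not project code.)
    def toArrowBatches(df: DataFrame): RDD[Array[Byte]] =
      SparkShimLoader.getSparkShims.toArrowBatchRDD(df)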

core/shims/common/src/main/scala/com/intel/raydp/shims/SparkShims.scala

Lines changed: 3 additions & 0 deletions
@@ -21,6 +21,7 @@ import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.executor.RayDPExecutorBackendFactory
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SparkSession}

@@ -40,4 +41,6 @@ trait SparkShims {
   def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext

   def toArrowSchema(schema : StructType, timeZoneId : String) : Schema
+
+  def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]]
 }
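
The trait is the version-neutral surface; a concrete shim is picked at runtime by Spark version. The loader's internals are not part of this diff, so the following is only an illustrative sketch of how such dispatch is typically wired (the hard-coded match and the resolver object are assumptions, not the project's actual code):

    import org.apache.spark.SPARK_VERSION
    import com.intel.raydp.shims.SparkShims

    // Illustrative sketch only: each Spark*Shims class ships in its own
    // shim module, so RayDP's real SparkShimLoader would resolve the
    // provider reflectively or via descriptors rather than linking all of
    // them at once as this match pretends to.
    object DemoShimResolver {
      def resolve(): SparkShims = SPARK_VERSION match {
        case v if v.startsWith("3.2") => new Spark322Shims
        case v if v.startsWith("3.3") => new Spark330Shims
        case v if v.startsWith("3.4") => new Spark340Shims
        case v if v.startsWith("3.5") => new Spark350Shims
        case v => throw new UnsupportedOperationException(s"Unsupported Spark version: $v")
      }
    }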

core/shims/spark322/src/main/scala/com/intel/raydp/shims/SparkShims.scala

Lines changed: 5 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.spark322.SparkSqlUtils
 import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
 import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType

 class Spark322Shims extends SparkShims {
@@ -49,4 +50,8 @@ class Spark322Shims extends SparkShims {
   override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
     SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
   }
+
+  override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
+    SparkSqlUtils.toArrowRDD(dataFrame, dataFrame.sparkSession)
+  }
 }

core/shims/spark322/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala

Lines changed: 5 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.spark322

 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types.StructType
@@ -32,4 +33,8 @@ object SparkSqlUtils {
   def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
     ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
   }
+
+  def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = {
+    dataFrame.toArrowBatchRdd
+  }
 }
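
This utils object sits under org.apache.spark.sql (package org.apache.spark.sql.spark322) rather than RayDP's own namespace because Dataset.toArrowBatchRdd is declared private[sql] in Spark: only code compiled inside that package hierarchy can call it. A minimal sketch of the access trick (the package and object names below are illustrative):

    package org.apache.spark.sql.demo

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame

    object ArrowAccess {
      // Compiles only because this file lives inside org.apache.spark.sql.*,
      // which makes Dataset's private[sql] members visible.
      def batches(df: DataFrame): RDD[Array[Byte]] = df.toArrowBatchRdd
    }

The sparkSession parameter of toArrowRDD goes unused here; presumably it keeps one uniform signature across shim versions.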

core/shims/spark330/src/main/scala/com/intel/raydp/shims/SparkShims.scala

Lines changed: 5 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.spark330.SparkSqlUtils
 import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
 import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType

 class Spark330Shims extends SparkShims {
@@ -49,4 +50,8 @@ class Spark330Shims extends SparkShims {
   override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
     SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
   }
+
+  override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
+    SparkSqlUtils.toArrowRDD(dataFrame, dataFrame.sparkSession)
+  }
 }

core/shims/spark330/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala

Lines changed: 5 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.spark330

 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types.StructType
@@ -32,4 +33,8 @@ object SparkSqlUtils {
   def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
     ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
   }
+
+  def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = {
+    dataFrame.toArrowBatchRdd
+  }
 }

core/shims/spark340/src/main/scala/com/intel/raydp/shims/SparkShims.scala

Lines changed: 5 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.spark340.SparkSqlUtils
 import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
 import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType

 class Spark340Shims extends SparkShims {
@@ -49,4 +50,8 @@ class Spark340Shims extends SparkShims {
   override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
     SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
   }
+
+  override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
+    SparkSqlUtils.toArrowRDD(dataFrame, dataFrame.sparkSession)
+  }
 }

core/shims/spark340/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala

Lines changed: 5 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.spark340
 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.TaskContext
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types._
@@ -42,4 +43,8 @@ object SparkSqlUtils {
   def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
     ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
   }
+
+  def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = {
+    dataFrame.toArrowBatchRdd
+  }
 }

core/shims/spark350/src/main/scala/com/intel/raydp/shims/SparkShims.scala

Lines changed: 5 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.spark350.SparkSqlUtils
 import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
 import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType

 class Spark350Shims extends SparkShims {
@@ -49,4 +50,8 @@ class Spark350Shims extends SparkShims {
   override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
     SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
   }
+
+  override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
+    SparkSqlUtils.toArrowRDD(dataFrame, dataFrame.sparkSession)
+  }
 }

core/shims/spark350/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala

Lines changed: 5 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.spark350
 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.TaskContext
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.types._
@@ -42,4 +43,8 @@ object SparkSqlUtils {
   def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
     ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId, errorOnDuplicatedFieldNames = false)
   }
+
+  def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = {
+    dataFrame.toArrowBatchRdd
+  }
 }
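
Across all four versions the chain is the same: ObjectStoreWriter calls SparkShims.toArrowBatchRDD, which calls the version-specific SparkSqlUtils.toArrowRDD, which calls Dataset.toArrowBatchRdd. A hedged end-to-end sketch mirroring the persist-then-count pattern from the ObjectStoreWriter hunk above (the session setup, DataFrame, and storage level are illustrative, not project code):

    import com.intel.raydp.shims.SparkShimLoader
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    val spark = SparkSession.builder().appName("arrow-rdd-demo").getOrCreate()
    val df = spark.range(0, 1000).toDF("id")

    // Each partition is encoded as one or more serialized Arrow record
    // batches (Array[Byte]), via whichever shim matches the Spark version.
    val arrowRdd = SparkShimLoader.getSparkShims.toArrowBatchRDD(df)

    // Same lifecycle as ObjectStoreWriter: cache the batches, then force
    // materialization so later reads hit the cache instead of recomputing.
    arrowRdd.persist(StorageLevel.MEMORY_AND_DISK)
    arrowRdd.count()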
