2 changes: 1 addition & 1 deletion .github/workflows/ray_nightly_test.yml
@@ -32,7 +32,7 @@ jobs:
matrix:
os: [ ubuntu-latest ]
python-version: [3.9, 3.10.14]
spark-version: [3.3.2, 3.4.0, 3.5.0]
spark-version: [4.0.0]

runs-on: ${{ matrix.os }}

6 changes: 3 additions & 3 deletions .github/workflows/raydp.yml
@@ -33,7 +33,7 @@ jobs:
matrix:
os: [ubuntu-latest]
python-version: [3.9, 3.10.14]
spark-version: [3.3.2, 3.4.0, 3.5.0]
spark-version: [4.0.0]
ray-version: [2.34.0, 2.40.0]

runs-on: ${{ matrix.os }}
@@ -74,8 +74,8 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install wheel
pip install "numpy<1.24" "click<8.3.0"
pip install "pydantic<2.0"
pip install "numpy<1.24"
pip install "pydantic<2.0" "click<8.3.0"
SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
if [ "$(uname -s)" == "Linux" ]
then
12 changes: 9 additions & 3 deletions core/pom.xml
@@ -18,6 +18,7 @@
<spark330.version>3.3.0</spark330.version>
<spark340.version>3.4.0</spark340.version>
<spark350.version>3.5.0</spark350.version>
<spark400.version>4.0.0</spark400.version>
<snappy.version>1.1.10.4</snappy.version>
<netty.version>4.1.94.Final</netty.version>
<commons.text.version>1.10.0</commons.text.version>
@@ -29,9 +30,9 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.15</scala.version>
<jackson.version>2.15.0</jackson.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.13.12</scala.version>
<jackson.version>2.18.2</jackson.version>
<scala.binary.version>2.13</scala.binary.version>
<junit-jupiter.version>5.10.1</junit-jupiter.version>
</properties>

@@ -151,23 +152,27 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<!-- Guava is excluded because of SPARK-6149. The Guava version referenced in this module is
15.0, which causes runtime incompatibility issues. -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
@@ -179,6 +184,7 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
5 changes: 0 additions & 5 deletions core/raydp-main/pom.xml
@@ -134,24 +134,20 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Guava is excluded because of SPARK-6149. The Guava version referenced in this module is
15.0, which causes runtime incompatibility issues. -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<version>${jackson.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
@@ -162,7 +158,6 @@
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
@@ -22,15 +22,15 @@ import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import javax.xml.bind.DatatypeConverter

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.jdk.CollectionConverters._

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper
import io.ray.api.{ActorHandle, PlacementGroups, Ray}
import io.ray.api.id.PlacementGroupId
import io.ray.api.placementgroup.PlacementGroup
import io.ray.runtime.config.RayConfig
import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{RayDPException, SecurityManager, SparkConf}
import org.apache.spark.executor.RayDPExecutor
@@ -39,6 +39,7 @@ import org.apache.spark.raydp.{RayExecutorUtils, SparkOnRayConfigs}
import org.apache.spark.rpc._
import org.apache.spark.util.Utils


class RayAppMaster(host: String,
port: Int,
actorExtraClasspath: String) extends Serializable with Logging {
@@ -298,7 +299,7 @@ class RayAppMaster(host: String,
.map { case (name, amount) => (name, Double.box(amount)) }.asJava,
placementGroup,
getNextBundleIndex,
seqAsJavaList(appInfo.desc.command.javaOpts))
appInfo.desc.command.javaOpts.asJava)
appInfo.addPendingRegisterExecutor(executorId, handler, sparkCoresPerExecutor, memory)
}

@@ -356,11 +357,15 @@ object RayAppMaster extends Serializable {
val ACTOR_NAME = "RAY_APP_MASTER"

def setProperties(properties: String): Unit = {
implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
val parsed = parse(properties).extract[Map[String, String]]
parsed.foreach{ case (key, value) =>
System.setProperty(key, value)
// Use Jackson ObjectMapper directly to avoid JSON4S version conflicts
val mapper = new ObjectMapper()
val javaMap = mapper.readValue(properties, classOf[java.util.Map[String, Object]])
val scalaMap = javaMap.asScala.toMap
scalaMap.foreach{ case (key, value) =>
// Convert all values to strings since System.setProperty expects String
System.setProperty(key, value.toString)
}

// Use the same session dir as the python side
RayConfig.create().setSessionDir(System.getProperty("ray.session-dir"))
}
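
For context, a minimal sketch of how the Jackson-based parsing above behaves, assuming a small JSON payload like the one the Python side sends (the "spark.app.name" entry is illustrative; "ray.session-dir" matches the property read in setProperties):

import com.fasterxml.jackson.databind.ObjectMapper
import scala.jdk.CollectionConverters._

object SetPropertiesSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative payload; in RayDP the real JSON comes from the Python launcher.
    val json = """{"ray.session-dir": "/tmp/ray/session_latest", "spark.app.name": "raydp-demo"}"""
    val mapper = new ObjectMapper()
    val javaMap = mapper.readValue(json, classOf[java.util.Map[String, Object]])
    // Same conversion as setProperties: copy every entry into JVM system properties.
    javaMap.asScala.foreach { case (key, value) => System.setProperty(key, value.toString) }
    assert(System.getProperty("spark.app.name") == "raydp-demo")
  }
}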
@@ -27,9 +27,9 @@ import java.util.function.{Function => JFunction}
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.types.pojo.Schema
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.spark.{RayDPException, SparkContext}
import org.apache.spark.deploy.raydp._
@@ -85,13 +85,16 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
* Save the DataFrame to Ray object store with Apache Arrow format.
*/
def save(useBatch: Boolean, ownerName: String): List[RecordBatch] = {
val conf = df.queryExecution.sparkSession.sessionState.conf
val sparkSession = df.sparkSession
val conf = sparkSession.sessionState.conf
val timeZoneId = conf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
var batchSize = conf.getConf(SQLConf.ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
if (!useBatch) {
batchSize = 0
}
val schema = df.schema
val arrowSchemaJson = SparkShimLoader.getSparkShims.toArrowSchema(
schema, timeZoneId, sparkSession).toJson

val objectIds = df.queryExecution.toRdd.mapPartitions{ iter =>
val queue = ObjectRefHolder.getQueue(uuid)
@@ -103,7 +106,8 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
Iterator(iter)
}

val arrowSchema = SparkShimLoader.getSparkShims.toArrowSchema(schema, timeZoneId)
// Reconstruct arrow schema from JSON on executor
val arrowSchema = Schema.fromJSON(arrowSchemaJson)
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
s"ray object store writer", 0, Long.MaxValue)
val root = VectorSchemaRoot.create(arrowSchema, allocator)
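
A minimal sketch of the driver-to-executor schema hand-off used in save() above: the Arrow schema is rendered to JSON on the driver, presumably so that only a plain String is captured by the mapPartitions closure, and Schema.fromJSON rebuilds it on the executor. The single field here is illustrative:

import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
import scala.jdk.CollectionConverters._

object ArrowSchemaRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Driver side: render the schema to JSON so only a String needs to be shipped.
    val idField = new Field("id", FieldType.nullable(new ArrowType.Int(64, true)), null)
    val arrowSchemaJson = new Schema(Seq(idField).asJava).toJson
    // Executor side: rebuild an equivalent Schema from the JSON string.
    val rebuilt = Schema.fromJSON(arrowSchemaJson)
    assert(rebuilt.getFields.get(0).getName == "id")
  }
}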
@@ -213,9 +217,9 @@ object ObjectStoreWriter {
}

def toArrowSchema(df: DataFrame): Schema = {
val conf = df.queryExecution.sparkSession.sessionState.conf
val conf = df.sparkSession.sessionState.conf
val timeZoneId = conf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
SparkShimLoader.getSparkShims.toArrowSchema(df.schema, timeZoneId)
SparkShimLoader.getSparkShims.toArrowSchema(df.schema, timeZoneId, df.sparkSession)
}

def fromSparkRDD(df: DataFrame, storageLevel: StorageLevel): Array[Array[Byte]] = {
@@ -225,10 +229,10 @@
}
val uuid = dfToId.getOrElseUpdate(df, UUID.randomUUID())
val queue = ObjectRefHolder.getQueue(uuid)
val rdd = df.toArrowBatchRdd
val rdd = SparkShimLoader.getSparkShims.toArrowBatchRDD(df)
rdd.persist(storageLevel)
rdd.count()
var executorIds = df.sqlContext.sparkContext.getExecutorIds.toArray
val executorIds = df.sparkSession.sparkContext.getExecutorIds.toArray
val numExecutors = executorIds.length
val appMasterHandle = Ray.getActor(RayAppMaster.ACTOR_NAME)
.get.asInstanceOf[ActorHandle[RayAppMaster]]
@@ -21,6 +21,7 @@ import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.executor.RayDPExecutorBackendFactory
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}

@@ -39,5 +40,7 @@ trait SparkShims {

def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext

def toArrowSchema(schema : StructType, timeZoneId : String) : Schema
def toArrowSchema(schema : StructType, timeZoneId : String, sparkSession: SparkSession) : Schema

def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]]
}
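
A rough usage sketch of the extended shim API, assuming SparkShimLoader lives in the same com.intel.raydp.shims package as this trait, as the shim lookups elsewhere in this PR suggest:

import com.intel.raydp.shims.SparkShimLoader
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

object ShimUsageSketch {
  // Hypothetical helper mirroring how ObjectStoreWriter calls the two shim methods.
  def arrowArtifacts(df: DataFrame, timeZoneId: String): (Schema, RDD[Array[Byte]]) = {
    val shims = SparkShimLoader.getSparkShims
    // The SparkSession is passed through so a shim can read session-level Arrow settings if needed.
    val schema = shims.toArrowSchema(df.schema, timeZoneId, df.sparkSession)
    // Arrow batch conversion goes through the shim so per-version differences stay in one place.
    val batches = shims.toArrowBatchRDD(df)
    (schema, batches)
  }
}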
3 changes: 2 additions & 1 deletion core/shims/pom.xml
@@ -21,10 +21,11 @@
<module>spark330</module>
<module>spark340</module>
<module>spark350</module>
<module>spark400</module>
</modules>

<properties>
<scala.binary.version>2.12</scala.binary.version>
<scala.binary.version>2.13</scala.binary.version>
<scala.plugin.version>4.3.0</scala.plugin.version>
<scalatest-maven-plugin.version>3.2.2</scalatest-maven-plugin.version>
</properties>
3 changes: 1 addition & 2 deletions core/shims/spark322/pom.xml
@@ -16,8 +16,7 @@
<packaging>jar</packaging>

<properties>
<scala.version>2.12.15</scala.version>
<jackson.version>2.13.5</jackson.version>
<scala.version>2.13.12</scala.version>
</properties>

<build>
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.spark322.SparkSqlUtils
import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType

class Spark322Shims extends SparkShims {
@@ -46,7 +47,14 @@ class Spark322Shims extends SparkShims {
TaskContextUtils.getDummyTaskContext(partitionId, env)
}

override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
override def toArrowSchema(
schema : StructType,
timeZoneId : String,
sparkSession: SparkSession) : Schema = {
SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId, session = sparkSession)
}

override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
SparkSqlUtils.toArrowRDD(dataFrame, dataFrame.sparkSession)
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.spark322

import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.types.StructType
@@ -29,7 +30,11 @@ object SparkSqlUtils {
ArrowConverters.toDataFrame(rdd, schema, new SQLContext(session))
}

def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
def toArrowSchema(schema : StructType, timeZoneId : String, session: SparkSession) : Schema = {
ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
}

def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = {
dataFrame.toArrowBatchRdd
}
}
3 changes: 1 addition & 2 deletions core/shims/spark330/pom.xml
@@ -16,8 +16,7 @@
<packaging>jar</packaging>

<properties>
<scala.version>2.12.15</scala.version>
<jackson.version>2.13.5</jackson.version>
<scala.version>2.13.12</scala.version>
</properties>

<build>
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.spark330.SparkSqlUtils
import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StructType

class Spark330Shims extends SparkShims {
@@ -46,7 +47,18 @@ class Spark330Shims extends SparkShims {
TaskContextUtils.getDummyTaskContext(partitionId, env)
}

override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
override def toArrowSchema(
schema : StructType,
timeZoneId : String,
sparkSession: SparkSession) : Schema = {
SparkSqlUtils.toArrowSchema(
schema = schema,
timeZoneId = timeZoneId,
sparkSession = sparkSession
)
}

override def toArrowBatchRDD(dataFrame: DataFrame): RDD[Array[Byte]] = {
SparkSqlUtils.toArrowRDD(dataFrame, dataFrame.sparkSession)
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.spark330

import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.types.StructType
@@ -29,7 +30,11 @@ object SparkSqlUtils {
ArrowConverters.toDataFrame(rdd, schema, session)
}

def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
def toArrowSchema(schema : StructType, timeZoneId : String, sparkSession: SparkSession) : Schema = {
ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
}

def toArrowRDD(dataFrame: DataFrame, sparkSession: SparkSession): RDD[Array[Byte]] = {
dataFrame.toArrowBatchRdd
}
}
3 changes: 1 addition & 2 deletions core/shims/spark340/pom.xml
@@ -16,8 +16,7 @@
<packaging>jar</packaging>

<properties>
<scala.version>2.12.15</scala.version>
<jackson.version>2.13.5</jackson.version>
<scala.version>2.13.12</scala.version>
</properties>

<build>