From b6ac995f8cdefb63647d914671e9896b6cd149a6 Mon Sep 17 00:00:00 2001
From: yanmin
Date: Sat, 11 Oct 2025 17:04:06 +0800
Subject: [PATCH 1/4] Support distributing files to executors

---
 .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 923e8258..0b2d6ef5 100644
--- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -234,6 +234,7 @@ private[spark] class SparkSubmit extends Logging {
       case m if m.startsWith("mesos") => MESOS
       case m if m.startsWith("k8s") => KUBERNETES
       case m if m.startsWith("local") => LOCAL
+      case m if m.startsWith("ray") => RAY
       case _ => OTHERS
       // error("Master must either be yarn or start with spark, mesos, k8s, or local")
       // -1
@@ -634,9 +635,9 @@ private[spark] class SparkSubmit extends Logging {
         confKey = EXECUTOR_MEMORY.key),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
         confKey = CORES_MAX.key),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES | RAY, ALL_DEPLOY_MODES,
         confKey = FILES.key),
-      OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+      OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES | RAY, ALL_DEPLOY_MODES,
         confKey = ARCHIVES.key),
       OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
       OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES | OTHERS, ALL_DEPLOY_MODES,
@@ -995,7 +996,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
   private val LOCAL = 8
   private val KUBERNETES = 16
   private val OTHERS = 32
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES | OTHERS
+  private val RAY = 64
+  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES | OTHERS | RAY
 
   // Deploy modes
   private val CLIENT = 1

From 72cd4e6fdc4977646b4f5d32b2a2443d8d51bd3e Mon Sep 17 00:00:00 2001
From: yanmin
Date: Sun, 12 Oct 2025 00:41:09 +0800
Subject: [PATCH 2/4] pin click version

---
 .github/workflows/ray_nightly_test.yml | 2 +-
 .github/workflows/raydp.yml            | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ray_nightly_test.yml b/.github/workflows/ray_nightly_test.yml
index 9483276b..24f82fa7 100644
--- a/.github/workflows/ray_nightly_test.yml
+++ b/.github/workflows/ray_nightly_test.yml
@@ -73,7 +73,7 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           pip install wheel
-          pip install "numpy<1.24"
+          pip install "numpy<1.24" "click<8.3.0"
           SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
           if [ "$(uname -s)" == "Linux" ]
           then
diff --git a/.github/workflows/raydp.yml b/.github/workflows/raydp.yml
index f58f6f44..64e7754e 100644
--- a/.github/workflows/raydp.yml
+++ b/.github/workflows/raydp.yml
@@ -74,7 +74,7 @@ jobs:
           python -m pip install --upgrade pip
           pip install wheel
           pip install "numpy<1.24"
-          pip install "pydantic<2.0"
+          pip install "pydantic<2.0" "click<8.3.0"
           SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
           if [ "$(uname -s)" == "Linux" ]
           then

From 05ca7a4f22e362eae6e7d4a043cef94a285524c8 Mon Sep 17 00:00:00 2001
From: yanmin
Date: Mon, 13 Oct 2025 01:45:39 +0800
Subject: [PATCH 3/4] add ut

---
 python/raydp/tests/test_spark_cluster.py | 46 ++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/python/raydp/tests/test_spark_cluster.py b/python/raydp/tests/test_spark_cluster.py
index f1fae5b7..bf999917 100644
--- a/python/raydp/tests/test_spark_cluster.py
+++ b/python/raydp/tests/test_spark_cluster.py
@@ -22,6 +22,7 @@
 import pytest
 import pyarrow
 import ray
+import zipfile
 
 from multiprocessing import get_context
 
@@ -60,6 +61,51 @@ def test_legacy_spark_on_fractional_cpu():
     cluster.shutdown()
 
 
+def test_raydp_submit_py_files(tmp_path):
+    cluster = Cluster(
+        initialize_head=True,
+        head_node_args={"num_cpus": 4},
+    )
+
+    spark = None
+    try:
+        module_path = tmp_path / "extra_module.py"
+        module_path.write_text("VALUE = 'pyfiles works'\n")
+
+        py_files_path = tmp_path / "extra_module.zip"
+        with zipfile.ZipFile(py_files_path, "w") as zip_file:
+            zip_file.write(module_path, arcname="extra_module.py")
+
+        ray.init(address=cluster.address, include_dashboard=False)
+        spark = raydp.init_spark(
+            app_name="test_raydp_submit_py_files",
+            num_executors=1,
+            executor_cores=1,
+            executor_memory="500M",
+            configs={"spark.submit.pyFiles": py_files_path.as_uri()},
+        )
+
+        py_files_conf = spark.sparkContext.getConf().get("spark.submit.pyFiles")
+        assert py_files_conf is not None
+        assert py_files_path.name in py_files_conf
+
+        def use_extra_module(_):
+            from extra_module import VALUE
+
+            return VALUE
+
+        result = spark.sparkContext.parallelize([0]).map(use_extra_module).collect()
+        assert result == ["pyfiles works"]
+    finally:
+        if spark is not None:
+            spark.stop()
+        raydp.stop_spark()
+        time.sleep(5)
+        if ray.is_initialized():
+            ray.shutdown()
+        cluster.shutdown()
+
+
 def test_spark_executor_on_fractional_cpu():
     cluster = Cluster(
         initialize_head=True,

From 277ac9623128e0712b70d7891af2b57d0bdb7828 Mon Sep 17 00:00:00 2001
From: yanmin
Date: Mon, 13 Oct 2025 11:09:16 +0800
Subject: [PATCH 4/4] fix ut

---
 python/raydp/tests/test_spark_cluster.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/raydp/tests/test_spark_cluster.py b/python/raydp/tests/test_spark_cluster.py
index bf999917..d17258b1 100644
--- a/python/raydp/tests/test_spark_cluster.py
+++ b/python/raydp/tests/test_spark_cluster.py
@@ -73,8 +73,8 @@ def test_raydp_submit_py_files(tmp_path):
         module_path.write_text("VALUE = 'pyfiles works'\n")
 
         py_files_path = tmp_path / "extra_module.zip"
-        with zipfile.ZipFile(py_files_path, "w") as zip_file:
-            zip_file.write(module_path, arcname="extra_module.py")
+        #with zipfile.ZipFile(py_files_path, "w") as zip_file:
+        #    zip_file.write(module_path, arcname="extra_module.py")
 
         ray.init(address=cluster.address, include_dashboard=False)
         spark = raydp.init_spark(
@@ -82,12 +82,12 @@ def test_raydp_submit_py_files(tmp_path):
             num_executors=1,
             executor_cores=1,
             executor_memory="500M",
-            configs={"spark.submit.pyFiles": py_files_path.as_uri()},
+            configs={"spark.submit.pyFiles": module_path.as_uri()},
         )
 
         py_files_conf = spark.sparkContext.getConf().get("spark.submit.pyFiles")
         assert py_files_conf is not None
-        assert py_files_path.name in py_files_conf
+        assert module_path.name in py_files_conf
 
         def use_extra_module(_):
             from extra_module import VALUE
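
For context on PATCH 1/4: SparkSubmit encodes each cluster manager as a power-of-two flag and routes submit-time options through OptionAssigner masks, so an option reaches the Spark conf only when the detected manager's bit is set. The sketch below is a minimal, self-contained illustration of that pattern, not RayDP's actual code (ClusterManagerSketch, resolve, and applyOptions are invented names); adding the RAY bit to the files/archives assigners is what lets --files and --archives flow into spark.files and spark.archives for ray:// masters.

// Minimal sketch of SparkSubmit's bitmask dispatch (illustrative names only).
object ClusterManagerSketch {
  // Each cluster manager is a power-of-two flag, mirroring SparkSubmit's constants.
  private val STANDALONE = 2
  private val KUBERNETES = 16
  private val RAY = 64 // the flag PATCH 1/4 introduces

  // An assigner copies a CLI argument into a Spark conf key
  // only when the detected manager's bit is set in its mask.
  final case class OptionAssigner(value: String, clusterManagers: Int, confKey: String)

  def resolve(master: String): Int = master match {
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("k8s")   => KUBERNETES
    case m if m.startsWith("ray")   => RAY // the case PATCH 1/4 adds
    case _                          => 0
  }

  def applyOptions(master: String, assigners: Seq[OptionAssigner]): Map[String, String] = {
    val manager = resolve(master)
    assigners.collect {
      case OptionAssigner(v, mask, key) if v != null && (mask & manager) != 0 => key -> v
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val assigners = Seq(
      OptionAssigner("deps.zip", STANDALONE | KUBERNETES | RAY, "spark.files"))
    // Before the patch, a ray:// master fell through to OTHERS and spark.files stayed
    // unset; with the RAY bit in the mask, the file list is propagated to executors.
    println(applyOptions("ray://head-node:10001", assigners)) // Map(spark.files -> deps.zip)
  }
}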