diff --git a/.github/workflows/ray_nightly_test.yml b/.github/workflows/ray_nightly_test.yml
index 9483276b..24f82fa7 100644
--- a/.github/workflows/ray_nightly_test.yml
+++ b/.github/workflows/ray_nightly_test.yml
@@ -73,7 +73,7 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           pip install wheel
-          pip install "numpy<1.24"
+          pip install "numpy<1.24" "click<8.3.0"
           SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
           if [ "$(uname -s)" == "Linux" ]
           then
diff --git a/.github/workflows/raydp.yml b/.github/workflows/raydp.yml
index f58f6f44..64e7754e 100644
--- a/.github/workflows/raydp.yml
+++ b/.github/workflows/raydp.yml
@@ -74,7 +74,7 @@ jobs:
           python -m pip install --upgrade pip
           pip install wheel
           pip install "numpy<1.24"
-          pip install "pydantic<2.0"
+          pip install "pydantic<2.0" "click<8.3.0"
           SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
           if [ "$(uname -s)" == "Linux" ]
           then
diff --git a/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 923e8258..0b2d6ef5 100644
--- a/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/raydp-main/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -234,6 +234,7 @@ private[spark] class SparkSubmit extends Logging {
       case m if m.startsWith("mesos") => MESOS
       case m if m.startsWith("k8s") => KUBERNETES
       case m if m.startsWith("local") => LOCAL
+      case m if m.startsWith("ray") => RAY
       case _ => OTHERS
       // error("Master must either be yarn or start with spark, mesos, k8s, or local")
       // -1
@@ -634,9 +635,9 @@ private[spark] class SparkSubmit extends Logging {
         confKey = EXECUTOR_MEMORY.key),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
         confKey = CORES_MAX.key),
-      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES | RAY, ALL_DEPLOY_MODES,
        confKey = FILES.key),
-      OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+      OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES | RAY, ALL_DEPLOY_MODES,
        confKey = ARCHIVES.key),
       OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
       OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES | OTHERS, ALL_DEPLOY_MODES,
@@ -995,7 +996,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
   private val LOCAL = 8
   private val KUBERNETES = 16
   private val OTHERS = 32
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES | OTHERS
+  private val RAY = 64
+  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES | OTHERS | RAY
 
   // Deploy modes
   private val CLIENT = 1
diff --git a/python/raydp/tests/test_spark_cluster.py b/python/raydp/tests/test_spark_cluster.py
index f1fae5b7..d17258b1 100644
--- a/python/raydp/tests/test_spark_cluster.py
+++ b/python/raydp/tests/test_spark_cluster.py
@@ -22,6 +22,7 @@ import pytest
 import pyarrow
 import ray
+import zipfile
 
 from multiprocessing import get_context
 
@@ -60,6 +61,51 @@ def test_legacy_spark_on_fractional_cpu():
     cluster.shutdown()
 
 
+def test_raydp_submit_py_files(tmp_path):
+    cluster = Cluster(
+        initialize_head=True,
+        head_node_args={"num_cpus": 4},
+    )
+
+    spark = None
+    try:
+        module_path = tmp_path / "extra_module.py"
+        module_path.write_text("VALUE = 'pyfiles works'\n")
+
+        py_files_path = tmp_path / "extra_module.zip"
+        with zipfile.ZipFile(py_files_path, "w") as zip_file:
+            zip_file.write(module_path, arcname="extra_module.py")
+
+        ray.init(address=cluster.address, include_dashboard=False)
+        spark = raydp.init_spark(
+            app_name="test_raydp_submit_py_files",
+            num_executors=1,
+            executor_cores=1,
+            executor_memory="500M",
+            configs={"spark.submit.pyFiles": py_files_path.as_uri()},
+        )
+
+        py_files_conf = spark.sparkContext.getConf().get("spark.submit.pyFiles")
+        assert py_files_conf is not None
+        assert py_files_path.name in py_files_conf
+
+        def use_extra_module(_):
+            from extra_module import VALUE
+
+            return VALUE
+
+        result = spark.sparkContext.parallelize([0]).map(use_extra_module).collect()
+        assert result == ["pyfiles works"]
+    finally:
+        if spark is not None:
+            spark.stop()
+            raydp.stop_spark()
+        time.sleep(5)
+        if ray.is_initialized():
+            ray.shutdown()
+        cluster.shutdown()
+
+
 def test_spark_executor_on_fractional_cpu():
     cluster = Cluster(
         initialize_head=True,
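
For reference, the behavior covered by the new test can be exercised outside pytest roughly as follows. This is a minimal sketch, assuming a reachable Ray instance and a raydp build that includes this patch; the module name extra_module, the VALUE constant, and the app name are illustrative stand-ins taken from the test, not part of any public API.

# Sketch: ship a helper module to Spark-on-Ray executors via spark.submit.pyFiles,
# which the new RAY branch in SparkSubmit resolves like the other cluster managers.
import pathlib
import tempfile
import zipfile

import ray
import raydp

work_dir = pathlib.Path(tempfile.mkdtemp())
module_path = work_dir / "extra_module.py"
module_path.write_text("VALUE = 'pyfiles works'\n")

# Package the module as a zip archive, the usual pyFiles payload.
py_files_zip = work_dir / "extra_module.zip"
with zipfile.ZipFile(py_files_zip, "w") as zip_file:
    zip_file.write(module_path, arcname="extra_module.py")

ray.init()  # assumes a local or already-running Ray instance
spark = raydp.init_spark(
    app_name="pyfiles_demo",
    num_executors=1,
    executor_cores=1,
    executor_memory="500M",
    configs={"spark.submit.pyFiles": py_files_zip.as_uri()},
)

def use_extra_module(_):
    # The import succeeds on executors because the zip was distributed to them.
    from extra_module import VALUE
    return VALUE

print(spark.sparkContext.parallelize([0]).map(use_extra_module).collect())

spark.stop()
raydp.stop_spark()
ray.shutdown()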