diff --git a/dataproc/snippets/noxfile_config.py b/dataproc/snippets/noxfile_config.py
index 084fb0d01db..99f474dc0b6 100644
--- a/dataproc/snippets/noxfile_config.py
+++ b/dataproc/snippets/noxfile_config.py
@@ -22,7 +22,7 @@
 TEST_CONFIG_OVERRIDE = {
     # You can opt out from the test for specific Python versions.
-    "ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.11"],
+    "ignored_versions": ["2.7", "3.7", "3.8", "3.10", "3.11", "3.12", "3.13"],
     # Old samples are opted out of enforcing Python type hints
     # All new samples should feature them
     # "enforce_type_hints": True,
diff --git a/dataproc/snippets/submit_pyspark_job_to_driver_node_group_cluster.py b/dataproc/snippets/submit_pyspark_job_to_driver_node_group_cluster.py
new file mode 100644
index 00000000000..45334c82ee0
--- /dev/null
+++ b/dataproc/snippets/submit_pyspark_job_to_driver_node_group_cluster.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python
+
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This sample walks a user through submitting a PySpark job to a
+# Dataproc driver node group cluster using the Dataproc
+# client library.
+
+# Usage:
+#     python submit_pyspark_job_to_driver_node_group_cluster.py \
+#         --project_id <PROJECT_ID> --region <REGION> \
+#         --cluster_name <CLUSTER_NAME>
+
+# [START dataproc_submit_pyspark_job_to_driver_node_group_cluster]
+
+import re
+
+from google.cloud import dataproc_v1 as dataproc
+from google.cloud import storage
+
+
+def submit_job(project_id, region, cluster_name):
+    """Submits a PySpark job to a Dataproc cluster with a driver node group.
+
+    Args:
+        project_id (str): The ID of the Google Cloud project.
+        region (str): The region where the Dataproc cluster is located.
+        cluster_name (str): The name of the Dataproc cluster.
+    """
+    # Create the job client.
+    job_client = dataproc.JobControllerClient(
+        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
+    )
+
+    # Driver resources to reserve on the cluster's driver node group.
+    driver_scheduling_config = dataproc.DriverSchedulingConfig(
+        memory_mb=2048,  # Example memory in MB
+        vcores=2,  # Example number of vcores
+    )
+
+    # Create the job config. The main Python file URI points to the script in
+    # a Google Cloud Storage bucket.
+    job = {
+        "placement": {"cluster_name": cluster_name},
+        "pyspark_job": {
+            "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
+        },
+        "driver_scheduling_config": driver_scheduling_config,
+    }
+
+    operation = job_client.submit_job_as_operation(
+        request={"project_id": project_id, "region": region, "job": job}
+    )
+    response = operation.result()
+
+    # Dataproc job output gets saved to the Google Cloud Storage bucket
+    # allocated to the job. Use a regex to obtain the bucket and blob info.
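+    # The driver output is written under that prefix in numbered segments;
+    # the ".000000000" suffix below selects the first segment.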
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
+    if not matches:
+        raise ValueError(
+            f"Unexpected driver output URI: {response.driver_output_resource_uri}"
+        )
+
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_bytes()
+        .decode("utf-8")
+    )
+
+    print(f"Job finished successfully: {output}")
+
+
+# [END dataproc_submit_pyspark_job_to_driver_node_group_cluster]
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser(
+        description="Submits a PySpark job to a Dataproc driver node group cluster."
+    )
+    parser.add_argument(
+        "--project_id", help="The Google Cloud project ID.", required=True
+    )
+    parser.add_argument(
+        "--region",
+        help="The Dataproc region where the cluster is located.",
+        required=True,
+    )
+    parser.add_argument(
+        "--cluster_name", help="The name of the Dataproc cluster.", required=True
+    )
+
+    args = parser.parse_args()
+    submit_job(args.project_id, args.region, args.cluster_name)
diff --git a/dataproc/snippets/submit_pyspark_job_to_driver_node_group_cluster_test.py b/dataproc/snippets/submit_pyspark_job_to_driver_node_group_cluster_test.py
new file mode 100644
index 00000000000..38e3ebb24e3
--- /dev/null
+++ b/dataproc/snippets/submit_pyspark_job_to_driver_node_group_cluster_test.py
@@ -0,0 +1,88 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import uuid
+
+import backoff
+from google.api_core.exceptions import (
+    Aborted,
+    InternalServerError,
+    NotFound,
+    ServiceUnavailable,
+)
+from google.cloud import dataproc_v1 as dataproc
+
+import submit_pyspark_job_to_driver_node_group_cluster
+
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+REGION = "us-central1"
+CLUSTER_NAME = f"py-ps-test-{str(uuid.uuid4())}"
+
+cluster_client = dataproc.ClusterControllerClient(
+    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
+)
+
+
+@backoff.on_exception(backoff.expo, (Exception), max_tries=5)
+def teardown():
+    try:
+        operation = cluster_client.delete_cluster(
+            request={
+                "project_id": PROJECT_ID,
+                "region": REGION,
+                "cluster_name": CLUSTER_NAME,
+            }
+        )
+        # Wait for cluster to delete
+        operation.result()
+    except NotFound:
+        print("Cluster already deleted")
+
+
+@backoff.on_exception(
+    backoff.expo,
+    (
+        InternalServerError,
+        ServiceUnavailable,
+        Aborted,
+    ),
+    max_tries=5,
+)
+def test_workflows(capsys):
+    # Setup driver node group cluster. TODO: cleanup b/424371877
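+    # --driver-pool-size and --driver-pool-id are what create the driver
+    # node group on the test cluster.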
+    command = f"""gcloud dataproc clusters create {CLUSTER_NAME} \
+        --region {REGION} \
+        --project {PROJECT_ID} \
+        --driver-pool-size=1 \
+        --driver-pool-id=pytest"""
+
+    output = subprocess.run(
+        command,
+        capture_output=True,
+        shell=True,
+        check=True,
+    )
+    print(output)
+
+    # Wrapper function for client library function
+    submit_pyspark_job_to_driver_node_group_cluster.submit_job(
+        PROJECT_ID, REGION, CLUSTER_NAME
+    )
+
+    out, _ = capsys.readouterr()
+    assert "Job finished successfully" in out
+
+    # cluster deleted in teardown()