Skip to content

Commit 3d9ae5e

Browse files
feat(dataproc): create pyspark nodegroup cluster sample (#13513)
* Create submit_pyspark_job_to_driver_node_group_cluster.py Sample code to submit a PySpark job to a Dataproc driver node group cluster. * Update submit_pyspark_job_to_driver_node_group_cluster.py Added a END region tag * Update submit_pyspark_job_to_driver_node_group_cluster.py Improving readability * Delete submit_pyspark_job_to_driver_node_group_cluster.py * Update submit_pyspark_job_to_driver_node_group_cluster.py - moved the file to dataproc/snippets/ - updated the description for main_python_file_uri * Update submit_pyspark_job_to_driver_node_group_cluster.py changing the main method * Create submit_pyspark_job_to_driver_node_group_cluster_test.py adding the test file * fix: reduce quota issues by dropping tested versions Addresses #13456 * auto format document * ci: test current required versions, only * Ref #13456 only test one python version, quota issues * ci: unique cluster name for tests * confirm name to naming regex --------- Co-authored-by: Katie McLaughlin <[email protected]>
1 parent cd8c39d commit 3d9ae5e

File tree

3 files changed

+196
-1
lines changed

3 files changed

+196
-1
lines changed

dataproc/snippets/noxfile_config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
TEST_CONFIG_OVERRIDE = {
2424
# You can opt out from the test for specific Python versions.
25-
"ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.11"],
25+
"ignored_versions": ["2.7", "3.7", "3.8", "3.10", "3.11", "3.12", "3.13"],
2626
# Old samples are opted out of enforcing Python type hints
2727
# All new samples should feature them
2828
# "enforce_type_hints": True,
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2025 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
# This sample walks a user through submitting a Spark job to a
18+
# Dataproc driver node group cluster using the Dataproc
19+
# client library.
20+
21+
# Usage:
22+
# python submit_pyspark_job_to_driver_node_group_cluster.py \
23+
# --project_id <PROJECT_ID> --region <REGION> \
24+
# --cluster_name <CLUSTER_NAME>
25+
26+
# [START dataproc_submit_pyspark_job_to_driver_node_group_cluster]
27+
28+
import re
29+
30+
from google.cloud import dataproc_v1 as dataproc
31+
from google.cloud import storage
32+
33+
34+
def submit_job(project_id, region, cluster_name):
35+
"""Submits a PySpark job to a Dataproc cluster with a driver node group.
36+
37+
Args:
38+
project_id (str): The ID of the Google Cloud project.
39+
region (str): The region where the Dataproc cluster is located.
40+
cluster_name (str): The name of the Dataproc cluster.
41+
"""
42+
# Create the job client.
43+
job_client = dataproc.JobControllerClient(
44+
client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
45+
)
46+
47+
driver_scheduling_config = dataproc.DriverSchedulingConfig(
48+
memory_mb=2048, # Example memory in MB
49+
vcores=2, # Example number of vcores
50+
)
51+
52+
# Create the job config. The main Python file URI points to the script in
53+
# a Google Cloud Storage bucket.
54+
job = {
55+
"placement": {"cluster_name": cluster_name},
56+
"pyspark_job": {
57+
"main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
58+
},
59+
"driver_scheduling_config": driver_scheduling_config,
60+
}
61+
62+
operation = job_client.submit_job_as_operation(
63+
request={"project_id": project_id, "region": region, "job": job}
64+
)
65+
response = operation.result()
66+
67+
# Dataproc job output gets saved to the Google Cloud Storage bucket
68+
# allocated to the job. Use a regex to obtain the bucket and blob info.
69+
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
70+
if not matches:
71+
raise ValueError(
72+
f"Unexpected driver output URI: {response.driver_output_resource_uri}"
73+
)
74+
75+
output = (
76+
storage.Client()
77+
.get_bucket(matches.group(1))
78+
.blob(f"{matches.group(2)}.000000000")
79+
.download_as_bytes()
80+
.decode("utf-8")
81+
)
82+
83+
print(f"Job finished successfully: {output}")
84+
85+
86+
# [END dataproc_submit_pyspark_job_to_driver_node_group_cluster]
87+
88+
if __name__ == "__main__":
89+
import argparse
90+
91+
parser = argparse.ArgumentParser(
92+
description="Submits a Spark job to a Dataproc driver node group cluster."
93+
)
94+
parser.add_argument(
95+
"--project_id", help="The Google Cloud project ID.", required=True
96+
)
97+
parser.add_argument(
98+
"--region",
99+
help="The Dataproc region where the cluster is located.",
100+
required=True,
101+
)
102+
parser.add_argument(
103+
"--cluster_name", help="The name of the Dataproc cluster.", required=True
104+
)
105+
106+
args = parser.parse_args()
107+
submit_job(args.project_id, args.region, args.cluster_name)
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# Copyright 2020 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import os
16+
import subprocess
17+
import uuid
18+
19+
import backoff
20+
from google.api_core.exceptions import (
21+
Aborted,
22+
InternalServerError,
23+
NotFound,
24+
ServiceUnavailable,
25+
)
26+
from google.cloud import dataproc_v1 as dataproc
27+
28+
import submit_pyspark_job_to_driver_node_group_cluster
29+
30+
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
31+
REGION = "us-central1"
32+
CLUSTER_NAME = f"py-ps-test-{str(uuid.uuid4())}"
33+
34+
cluster_client = dataproc.ClusterControllerClient(
35+
client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
36+
)
37+
38+
39+
@backoff.on_exception(backoff.expo, (Exception), max_tries=5)
40+
def teardown():
41+
try:
42+
operation = cluster_client.delete_cluster(
43+
request={
44+
"project_id": PROJECT_ID,
45+
"region": REGION,
46+
"cluster_name": CLUSTER_NAME,
47+
}
48+
)
49+
# Wait for cluster to delete
50+
operation.result()
51+
except NotFound:
52+
print("Cluster already deleted")
53+
54+
55+
@backoff.on_exception(
56+
backoff.expo,
57+
(
58+
InternalServerError,
59+
ServiceUnavailable,
60+
Aborted,
61+
),
62+
max_tries=5,
63+
)
64+
def test_workflows(capsys):
65+
# Setup driver node group cluster. TODO: cleanup b/424371877
66+
command = f"""gcloud dataproc clusters create {CLUSTER_NAME} \
67+
--region {REGION} \
68+
--project {PROJECT_ID} \
69+
--driver-pool-size=1 \
70+
--driver-pool-id=pytest"""
71+
72+
output = subprocess.run(
73+
command,
74+
capture_output=True,
75+
shell=True,
76+
check=True,
77+
)
78+
print(output)
79+
80+
# Wrapper function for client library function
81+
submit_pyspark_job_to_driver_node_group_cluster.submit_job(
82+
PROJECT_ID, REGION, CLUSTER_NAME
83+
)
84+
85+
out, _ = capsys.readouterr()
86+
assert "Job finished successfully" in out
87+
88+
# cluster deleted in teardown()

0 commit comments

Comments
 (0)