src/xpk/commands/workload.py (146 additions, 74 deletions)
@@ -260,7 +260,112 @@
operator: "All"
targetReplicatedJobs:
- {args.targetReplicatedJob}
startupPolicy:
startupPolicyOrder: InOrder
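# InOrder starts the replicated jobs in the order listed: rm, then proxy, then workers.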
replicatedJobs:
- name: rm
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_rm_args}
env:
- name: REPLICATED_JOB_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name']
- name: JOBSET_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name']
- name: HOST_ADDRESS
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-rm"
- name: NAMESPACE
value: "cloud_prod"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
ports:
- containerPort: 29001
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
name: shared-tmp
nodeSelector:
cloud.google.com/gke-nodepool: cpu-rm-np
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: proxy
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_proxy_args}
env:
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-proxy"
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
ports:
- containerPort: 29000
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
nodeSelector:
cloud.google.com/gke-nodepool: cpu-proxy-np
{user_workload}
- name: worker
replicas: {args.num_slices}
template:
@@ -277,10 +382,16 @@
metadata:
annotations:
{storage_annotations}
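# Enable the GCS FUSE CSI sidecar and set its resource limits.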
gke-gcsfuse/volumes: "true"
gke-gcsfuse/cpu-limit: "500m"
gke-gcsfuse/memory-limit: "350Gi"
gke-gcsfuse/ephemeral-storage-limit: "40Gi"
spec:
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
serviceAccountName: {service_account}
containers:
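# Pinning the sidecar image overrides the version the GKE gcsfuse webhook would otherwise inject (assumption).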
- name: gke-gcsfuse-sidecar
image: gcr.io/gcs-tess/gcs-fuse-csi-driver-sidecar-mounter:v2.10.0_linux_amd64
- args:
{pathways_worker_args}
image: {args.server_image}
@@ -298,6 +409,12 @@
volumeMounts:
- mountPath: /tmp
name: shared-tmp
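# GCS FUSE-backed mounts: checkpoints under /tmp/gcsfuse, cached datasets under /tmp/dataset.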
- mountPath: /tmp/gcsfuse
name: gcs-ckpt-pvc
readOnly: false
- mountPath: /tmp/dataset
name: gcs-dataset-pvc
readOnly: false
{storage_volume_mounts}
env:
# Workaround for v6e
@@ -321,6 +438,20 @@
fieldPath: "metadata.labels['jobset.sigs.k8s.io/job-index']"
- name: MEGASCALE_COORDINATOR_ADDRESS
value: "$(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-$(MEGASCALE_SLICE_ID)-0.$(JOBSET_NAME)"
- name: PROJECT_ID
value: {args.project}
- name: LOCATION
value: {args.zone}
- name: CLUSTER_NAME
value: {args.cluster}
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: CONTAINER_NAME
value: "pathways-worker"
- name: NAMESPACE
value: "cloud_prod"
{pathways_sidecar_container}
nodeSelector:
{accelerator_label}
@@ -334,79 +465,17 @@
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
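# PersistentVolumeClaims for the checkpoint and dataset buckets; the claims themselves are assumed to exist in the cluster.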
- name: gcs-ckpt-pvc
persistentVolumeClaim:
claimName: ckpt-bucket-pvc
- name: gcs-dataset-pvc
persistentVolumeClaim:
claimName: cached-dataset-bucket-pvc
- name: gke-gcsfuse-cache
emptyDir:
medium: Memory
sizeLimit: 100Gi
{storage_volumes}
- name: rm
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_rm_args}
env:
- name: REPLICATED_JOB_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name']
- name: JOBSET_NAME
valueFrom:
fieldRef:
fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name']
- name: HOST_ADDRESS
value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME)
- name: TPU_SKIP_MDS_QUERY
value: "true"
image: {args.server_image}
imagePullPolicy: Always
name: pathways-rm
ports:
- containerPort: 29001
securityContext:
privileged: true
volumeMounts:
- mountPath: /tmp
name: shared-tmp
nodeSelector:
cloud.google.com/gke-nodepool: cpu-rm-np
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
volumes:
- hostPath:
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
- name: proxy
replicas: 1
template:
metadata:
labels:
xpk.google.com/workload: {args.workload}
spec:
backoffLimit: 0
completions: 1
parallelism: 1
template:
spec:
containers:
- args:
{pathways_proxy_args}
image: {args.proxy_server_image}
imagePullPolicy: Always
name: pathways-proxy
ports:
- containerPort: 29000
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
nodeSelector:
cloud.google.com/gke-nodepool: cpu-proxy-np
{user_workload}
"""


@@ -534,7 +603,10 @@ def workload_create(args) -> None:
- PodFailurePolicy"""
restart_on_exit_codes = get_restart_exit_codes(args)
restart_on_exit_codes = ','.join(map(str, restart_on_exit_codes))
pod_failure_policy = f"""
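# Pathways workloads omit the podFailurePolicy block; all other workloads keep it.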
if args.use_pathways:
pod_failure_policy = ''
else:
pod_failure_policy = f"""
podFailurePolicy:
rules:
- action: FailJob
Expand Down Expand Up @@ -625,7 +697,7 @@ def workload_create(args) -> None:
].resource_type,
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
backoff_limit=system.vms_per_slice * 4,
backoff_limit=system.vms_per_slice * 10,
storage_annotations=storage_annotations,
storage_volumes=get_storage_volumes_yaml(all_storages),
storage_volume_mounts=get_storage_volume_mounts_yaml(all_storages),
src/xpk/core/cluster.py (1 addition, 1 deletion)
@@ -31,7 +31,7 @@
from .nodepool import upgrade_gke_nodepools_version
from .system_characteristics import SystemCharacteristics

JOBSET_VERSION = 'v0.7.2'
JOBSET_VERSION = 'v0.8.0'
INSTALLER_NCC_TCPX = 'https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/gpudirect-tcpx/nccl-tcpx-installer.yaml'
INSTALLER_NCC_TCPXO = 'https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/gpudirect-tcpxo/nccl-tcpxo-installer.yaml'

src/xpk/core/docker_resources.py (6 additions, 0 deletions)
@@ -237,6 +237,12 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str:
if args.use_pathways:
volume_mount_yaml = """- mountPath: /tmp
name: shared-tmp
- mountPath: /tmp/dataset
name: gcs-dataset-pvc
readOnly: false
- mountPath: /tmp/gcsfuse
name: gcs-ckpt-pvc
readOnly: false
"""
elif (
system.accelerator_type == AcceleratorType['TPU']
Expand Down
src/xpk/core/nodepool.py (1 addition, 1 deletion)
@@ -318,7 +318,7 @@ def run_gke_node_pool_create_command(
create_commands.append(command)
create_task_names.append(task)

desired_pw_cpu_node_pools = ['cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np']
desired_pw_cpu_node_pools = ['cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np', 'high-mem-pool']
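# high-mem-pool is new here; it hosts the Pathways user workload (see get_user_workload_for_pathways).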
if args.enable_pathways:
# Pathways needs CPU nodepools in addition to TPU nodepools
for node_pool_name in desired_pw_cpu_node_pools:
src/xpk/core/pathways.py (32 additions, 3 deletions)
@@ -131,6 +131,14 @@ def add_pw_resource_flavors(args):
spec:
nodeLabels:
cloud.google.com/gke-nodepool: cpu-user-np
---
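# New ResourceFlavor backing the high-mem-pool nodepool used by the Pathways user job.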
apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
name: high-mem
spec:
nodeLabels:
cloud.google.com/gke-nodepool: high-mem-pool
---"""
if args.enable_pathways:
return resource_flavor_yaml
@@ -154,6 +162,12 @@ def add_pw_resources_to_kueue(args):
- name: "memory"
nominalQuota: 2000G
- name: cpu-user
resources:
- name: "cpu"
nominalQuota: 480
- name: "memory"
nominalQuota: 2000G
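# The new high-mem flavor receives the same quota as cpu-user (480 CPU, 2000G memory).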
- name: high-mem
resources:
- name: "cpu"
nominalQuota: 480
@@ -186,7 +200,7 @@ def ensure_pathways_workload_prerequisites(args, system) -> bool:

# Ensure the cluster and CPU nodepools were created with create-pathways
all_node_pools = get_all_nodepools_programmatic(args)
desired_pw_cpu_node_pools = {'cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np'}
desired_pw_cpu_node_pools = {'cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np', 'high-mem-pool'}
if not desired_pw_cpu_node_pools.issubset(set(all_node_pools[0])):
xpk_print(
'Cluster needs to be created with `xpk create-pathways` to run'
@@ -303,12 +317,17 @@ def get_user_workload_for_pathways(
metadata:
annotations:
{gcs_fuse_annotation}
gke-gcsfuse/volumes: "true"
gke-gcsfuse/cpu-limit: "500m"
gke-gcsfuse/memory-limit: "0"
gke-gcsfuse/ephemeral-storage-limit: "40Gi"
spec:
containers:
- name: gke-gcsfuse-sidecar
image: gcr.io/gcs-tess/gcs-fuse-csi-driver-sidecar-mounter:v2.10.0_linux_amd64
{container}
serviceAccountName: {service_account}
nodeSelector:
cloud.google.com/gke-nodepool: cpu-user-np
cloud.google.com/gke-nodepool: high-mem-pool
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
restartPolicy: Never
@@ -317,6 +336,16 @@
path: /tmp
type: DirectoryOrCreate
name: shared-tmp
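# RAM-backed cache volume used by the gcsfuse sidecar (assumption), plus the same bucket PVCs the workers mount.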
- name: gke-gcsfuse-cache
emptyDir:
medium: Memory
sizeLimit: 150Gi
- name: gcs-dataset-pvc
persistentVolumeClaim:
claimName: cached-dataset-bucket-pvc
- name: gcs-ckpt-pvc
persistentVolumeClaim:
claimName: ckpt-bucket-pvc
{storage_volumes}"""
if args.headless:
return ''
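Note: the ckpt-bucket-pvc and cached-dataset-bucket-pvc claims referenced above are not created anywhere in this diff. A minimal sketch of a statically provisioned GCS FUSE volume and claim for the checkpoint bucket (bucket name, capacity, and storageClassName are placeholders) could look like:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: ckpt-bucket-pv
spec:
  accessModes:
    - ReadWriteMany
  capacity:
    storage: 64Gi
  storageClassName: gcsfuse-static
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: my-checkpoint-bucket  # placeholder: GCS bucket name
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ckpt-bucket-pvc
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 64Gi
  storageClassName: gcsfuse-static
  volumeName: ckpt-bucket-pv

The dataset claim (cached-dataset-bucket-pvc) would follow the same pattern against its own bucket.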