diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index e0099e8f9..0146e5477 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -260,7 +260,112 @@ operator: "All" targetReplicatedJobs: - {args.targetReplicatedJob} + startupPolicy: + startupPolicyOrder: InOrder replicatedJobs: + - name: rm + replicas: 1 + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: 0 + completions: 1 + parallelism: 1 + template: + spec: + containers: + - args: + {pathways_rm_args} + env: + - name: REPLICATED_JOB_NAME + valueFrom: + fieldRef: + fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name'] + - name: JOBSET_NAME + valueFrom: + fieldRef: + fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name'] + - name: HOST_ADDRESS + value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME) + - name: TPU_SKIP_MDS_QUERY + value: "true" + - name: PROJECT_ID + value: {args.project} + - name: LOCATION + value: {args.zone} + - name: CLUSTER_NAME + value: {args.cluster} + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: CONTAINER_NAME + value: "pathways-rm" + - name: NAMESPACE + value: "cloud_prod" + image: {args.server_image} + imagePullPolicy: Always + name: pathways-rm + ports: + - containerPort: 29001 + securityContext: + privileged: true + volumeMounts: + - mountPath: /tmp + name: shared-tmp + nodeSelector: + cloud.google.com/gke-nodepool: cpu-rm-np + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + volumes: + - hostPath: + path: /tmp + type: DirectoryOrCreate + name: shared-tmp + - name: proxy + replicas: 1 + template: + metadata: + labels: + xpk.google.com/workload: {args.workload} + spec: + backoffLimit: 0 + completions: 1 + parallelism: 1 + template: + spec: + containers: + - args: + {pathways_proxy_args} + env: + - name: PROJECT_ID + value: {args.project} + - name: LOCATION + value: {args.zone} + - name: CLUSTER_NAME + value: {args.cluster} + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: CONTAINER_NAME + value: "pathways-proxy" + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + image: {args.proxy_server_image} + imagePullPolicy: Always + name: pathways-proxy + ports: + - containerPort: 29000 + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + nodeSelector: + cloud.google.com/gke-nodepool: cpu-proxy-np + {user_workload} - name: worker replicas: {args.num_slices} template: @@ -277,10 +382,16 @@ metadata: annotations: {storage_annotations} + gke-gcsfuse/volumes: "true" + gke-gcsfuse/cpu-limit: "500m" + gke-gcsfuse/memory-limit: "350Gi" + gke-gcsfuse/ephemeral-storage-limit: "40Gi" spec: terminationGracePeriodSeconds: {args.termination_grace_period_seconds} serviceAccountName: {service_account} containers: + - name: gke-gcsfuse-sidecar + image: gcr.io/gcs-tess/gcs-fuse-csi-driver-sidecar-mounter:v2.10.0_linux_amd64 - args: {pathways_worker_args} image: {args.server_image} @@ -298,6 +409,12 @@ volumeMounts: - mountPath: /tmp name: shared-tmp + - mountPath: /tmp/gcsfuse + name: gcs-ckpt-pvc + readOnly: false + - mountPath: /tmp/dataset + name: gcs-dataset-pvc + readOnly: false {storage_volume_mounts} env: # Workaround for v6e @@ -321,6 +438,20 @@ fieldPath: "metadata.labels['jobset.sigs.k8s.io/job-index']" - name: MEGASCALE_COORDINATOR_ADDRESS value: "$(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-$(MEGASCALE_SLICE_ID)-0.$(JOBSET_NAME)" + - name: PROJECT_ID + value: {args.project} + - name: LOCATION + value: {args.zone} + - name: CLUSTER_NAME + value: {args.cluster} + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: CONTAINER_NAME + value: "pathways-worker" + - name: NAMESPACE + value: "cloud_prod" {pathways_sidecar_container} nodeSelector: {accelerator_label} @@ -334,79 +465,17 @@ path: /tmp type: DirectoryOrCreate name: shared-tmp + - name: gcs-ckpt-pvc + persistentVolumeClaim: + claimName: ckpt-bucket-pvc + - name: gcs-dataset-pvc + persistentVolumeClaim: + claimName: cached-dataset-bucket-pvc + - name: gke-gcsfuse-cache + emptyDir: + medium: Memory + sizeLimit: 100Gi {storage_volumes} - - name: rm - replicas: 1 - template: - metadata: - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: 0 - completions: 1 - parallelism: 1 - template: - spec: - containers: - - args: - {pathways_rm_args} - env: - - name: REPLICATED_JOB_NAME - valueFrom: - fieldRef: - fieldPath: metadata.annotations['jobset.sigs.k8s.io/replicatedjob-name'] - - name: JOBSET_NAME - valueFrom: - fieldRef: - fieldPath: metadata.annotations['jobset.sigs.k8s.io/jobset-name'] - - name: HOST_ADDRESS - value: $(JOBSET_NAME)-$(REPLICATED_JOB_NAME)-0-0.$(JOBSET_NAME) - - name: TPU_SKIP_MDS_QUERY - value: "true" - image: {args.server_image} - imagePullPolicy: Always - name: pathways-rm - ports: - - containerPort: 29001 - securityContext: - privileged: true - volumeMounts: - - mountPath: /tmp - name: shared-tmp - nodeSelector: - cloud.google.com/gke-nodepool: cpu-rm-np - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - volumes: - - hostPath: - path: /tmp - type: DirectoryOrCreate - name: shared-tmp - - name: proxy - replicas: 1 - template: - metadata: - labels: - xpk.google.com/workload: {args.workload} - spec: - backoffLimit: 0 - completions: 1 - parallelism: 1 - template: - spec: - containers: - - args: - {pathways_proxy_args} - image: {args.proxy_server_image} - imagePullPolicy: Always - name: pathways-proxy - ports: - - containerPort: 29000 - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - nodeSelector: - cloud.google.com/gke-nodepool: cpu-proxy-np - {user_workload} """ @@ -534,7 +603,10 @@ def workload_create(args) -> None: - PodFailurePolicy""" restart_on_exit_codes = get_restart_exit_codes(args) restart_on_exit_codes = ','.join(map(str, restart_on_exit_codes)) - pod_failure_policy = f""" + if args.use_pathways == True: + pod_failure_policy = '' + else: + pod_failure_policy = f""" podFailurePolicy: rules: - action: FailJob @@ -625,7 +697,7 @@ def workload_create(args) -> None: ].resource_type, local_queue_name=LOCAL_QUEUE_NAME, autoprovisioning_args=autoprovisioning_args, - backoff_limit=system.vms_per_slice * 4, + backoff_limit=system.vms_per_slice * 10, storage_annotations=storage_annotations, storage_volumes=get_storage_volumes_yaml(all_storages), storage_volume_mounts=get_storage_volume_mounts_yaml(all_storages), diff --git a/src/xpk/core/cluster.py b/src/xpk/core/cluster.py index 2a5e3d840..34c282a18 100644 --- a/src/xpk/core/cluster.py +++ b/src/xpk/core/cluster.py @@ -31,7 +31,7 @@ from .nodepool import upgrade_gke_nodepools_version from .system_characteristics import SystemCharacteristics -JOBSET_VERSION = 'v0.7.2' +JOBSET_VERSION = 'v0.8.0' INSTALLER_NCC_TCPX = 'https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/gpudirect-tcpx/nccl-tcpx-installer.yaml' INSTALLER_NCC_TCPXO = 'https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/gpudirect-tcpxo/nccl-tcpxo-installer.yaml' diff --git a/src/xpk/core/docker_resources.py b/src/xpk/core/docker_resources.py index e52d6daee..dea0b1d10 100644 --- a/src/xpk/core/docker_resources.py +++ b/src/xpk/core/docker_resources.py @@ -237,6 +237,12 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str: if args.use_pathways: volume_mount_yaml = """- mountPath: /tmp name: shared-tmp + - mountPath: /tmp/dataset + name: gcs-dataset-pvc + readOnly: false + - mountPath: /tmp/gcsfuse + name: gcs-ckpt-pvc + readOnly: false """ elif ( system.accelerator_type == AcceleratorType['TPU'] diff --git a/src/xpk/core/nodepool.py b/src/xpk/core/nodepool.py index b1ebddeb1..539f47b9c 100644 --- a/src/xpk/core/nodepool.py +++ b/src/xpk/core/nodepool.py @@ -318,7 +318,7 @@ def run_gke_node_pool_create_command( create_commands.append(command) create_task_names.append(task) - desired_pw_cpu_node_pools = ['cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np'] + desired_pw_cpu_node_pools = ['cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np', 'high-mem-pool'] if args.enable_pathways: # Pathways needs CPU nodepools in addition to TPU nodepools for node_pool_name in desired_pw_cpu_node_pools: diff --git a/src/xpk/core/pathways.py b/src/xpk/core/pathways.py index f72375c99..614256174 100644 --- a/src/xpk/core/pathways.py +++ b/src/xpk/core/pathways.py @@ -131,6 +131,14 @@ def add_pw_resource_flavors(args): spec: nodeLabels: cloud.google.com/gke-nodepool: cpu-user-np +--- +apiVersion: kueue.x-k8s.io/v1beta1 +kind: ResourceFlavor +metadata: + name: high-mem +spec: + nodeLabels: + cloud.google.com/gke-nodepool: high-mem-pool ---""" if args.enable_pathways: return resource_flavor_yaml @@ -154,6 +162,12 @@ def add_pw_resources_to_kueue(args): - name: "memory" nominalQuota: 2000G - name: cpu-user + resources: + - name: "cpu" + nominalQuota: 480 + - name: "memory" + nominalQuota: 2000G + - name: high-mem resources: - name: "cpu" nominalQuota: 480 @@ -186,7 +200,7 @@ def ensure_pathways_workload_prerequisites(args, system) -> bool: # Ensure the cluster and CPU nodepools were created with create-pathways all_node_pools = get_all_nodepools_programmatic(args) - desired_pw_cpu_node_pools = {'cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np'} + desired_pw_cpu_node_pools = {'cpu-user-np', 'cpu-rm-np', 'cpu-proxy-np', 'high-mem-pool'} if not desired_pw_cpu_node_pools.issubset(set(all_node_pools[0])): xpk_print( 'Cluster needs to be created with `xpk create-pathways` to run' @@ -303,12 +317,17 @@ def get_user_workload_for_pathways( metadata: annotations: {gcs_fuse_annotation} + gke-gcsfuse/volumes: "true" + gke-gcsfuse/cpu-limit: "500m" + gke-gcsfuse/memory-limit: "0" + gke-gcsfuse/ephemeral-storage-limit: "40Gi" spec: containers: + - name: gke-gcsfuse-sidecar + image: gcr.io/gcs-tess/gcs-fuse-csi-driver-sidecar-mounter:v2.10.0_linux_amd64 {container} - serviceAccountName: {service_account} nodeSelector: - cloud.google.com/gke-nodepool: cpu-user-np + cloud.google.com/gke-nodepool: high-mem-pool hostNetwork: true dnsPolicy: ClusterFirstWithHostNet restartPolicy: Never @@ -317,6 +336,16 @@ def get_user_workload_for_pathways( path: /tmp type: DirectoryOrCreate name: shared-tmp + - name: gke-gcsfuse-cache + emptyDir: + medium: Memory + sizeLimit: 150Gi + - name: gcs-dataset-pvc + persistentVolumeClaim: + claimName: cached-dataset-bucket-pvc + - name: gcs-ckpt-pvc + persistentVolumeClaim: + claimName: ckpt-bucket-pvc {storage_volumes}""" if args.headless: return ''