diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py
index 3c6c65633..e41c7ba0f 100644
--- a/src/xpk/commands/workload.py
+++ b/src/xpk/commands/workload.py
@@ -48,6 +48,7 @@
 from ..core.kueue import LOCAL_QUEUE_NAME
 from ..core.nap import (
     get_autoprovisioning_node_selector_args,
+    get_autoprovisioning_tolerations,
     is_autoprovisioning_enabled,
 )
 from ..core.pathways import (
@@ -101,6 +102,8 @@
               hostNetwork: true
               dnsPolicy: ClusterFirstWithHostNet
               terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
+              tolerations:
+              {autoprovisioning_tolerations}
               containers:
               {container}
               volumes:
@@ -395,6 +398,7 @@ def workload_create(args) -> None:
 
   # Currently autoprovisioning is not enabled for Pathways workloads.
   autoprovisioning_args = ''
+  autoprovisioning_tolerations = ''
   autoprovisioning_enabled, return_code = is_autoprovisioning_enabled(
       args, system
   )
@@ -407,6 +411,11 @@ def workload_create(args) -> None:
     )
     if return_code != 0:
       xpk_exit(return_code)
+    autoprovisioning_tolerations, return_code = (
+        get_autoprovisioning_tolerations(args)
+    )
+    if return_code != 0:
+      xpk_exit(return_code)
 
   # Create the workload file based on accelerator type or workload type.
   if system.accelerator_type == AcceleratorType['GPU']:
@@ -467,6 +476,7 @@ def workload_create(args) -> None:
         local_queue_name=LOCAL_QUEUE_NAME,
         autoprovisioning_args=autoprovisioning_args,
         volumes=get_volumes(args, system),
+        autoprovisioning_tolerations=autoprovisioning_tolerations,
     )
   tmp = write_tmp_file(yml_string)
   command = f'kubectl apply -f {str(tmp.file.name)}'
diff --git a/src/xpk/core/core.py b/src/xpk/core/core.py
index 91db6298f..e43fa62a7 100644
--- a/src/xpk/core/core.py
+++ b/src/xpk/core/core.py
@@ -763,7 +763,7 @@ def get_capacity_node_selectors_from_capacity_type(
     case CapacityType.ON_DEMAND.name:
       node_selector = ''
     case CapacityType.SPOT.name:
-      node_selector = 'cloud.google.com/gke-spot="true"'
+      node_selector = 'cloud.google.com/gke-spot: "true"'
     case CapacityType.RESERVATION.name:
       node_selector = f'cloud.google.com/reservation-name: {args.reservation}'
     case _:
diff --git a/src/xpk/core/nap.py b/src/xpk/core/nap.py
index b6021e96f..a6192dec7 100644
--- a/src/xpk/core/nap.py
+++ b/src/xpk/core/nap.py
@@ -285,6 +285,49 @@ def is_autoprovisioning_enabled(
     return False, 1
 
 
+def get_capacity_type_str_from_args_or_cluster_default(args) -> tuple[str, int]:
+  """Determine the capacity type based on user arguments or cluster default.
+
+  Args:
+    args: user provided arguments for running the command.
+
+  Returns:
+    Tuple with string of the capacity type name and
+    int of 0 if successful and 1 otherwise.
+  """
+  # If the user doesn't specify args, then use the cluster settings.
+  capacity_type, return_code = get_capacity_type(args)
+  if return_code != 0:
+    xpk_print('Unable to get capacity type.')
+    return CapacityType.UNKNOWN.name, return_code
+
+  if capacity_type != CapacityType.UNKNOWN:
+    return capacity_type.name, 0
+
+  # Use default settings from cluster creation.
+  #
+  # Error out if the metadata config map doesn't exist, and is attempting to use
+  # autoprovisioning.
+  cluster_config_map = get_cluster_configmap(
+      args, f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}'
+  )
+  if cluster_config_map is None:
+    xpk_print(
+        'Unable to find config map. Please specify a capacity type'
+        ' --on-demand, --spot, --reservation=$RESERVATION_ID) to continue'
+        ' to use autoprovisioning (--enable-autoprovisioning).'
+    )
+    return CapacityType.UNKNOWN.name, 1
+
+  return_code, capacity_type_str = get_value_from_map(
+      CAPACITY_TYPE_CONFIG_KEY, cluster_config_map
+  )
+  if return_code != 0:
+    return CapacityType.UNKNOWN.name, return_code
+
+  return capacity_type_str, 0
+
+
 def get_autoprovisioning_node_selector_args(args) -> tuple[str, int]:
   """Determine the capacity type when autoprovisioning is enabled.
 
@@ -297,44 +340,42 @@ def get_autoprovisioning_node_selector_args(args) -> tuple[str, int]:
   """
   return_code = 0
   node_selector_args = ''
-  # If the user doesn't specify args, then use the cluster settings.
-  capacity_type, return_code = get_capacity_type(args)
-  capacity_type_str = capacity_type.name
+  capacity_type_str, return_code = (
+      get_capacity_type_str_from_args_or_cluster_default(args)
+  )
   if return_code != 0:
-    xpk_print('Unable to get capacity type.')
     return node_selector_args, return_code
 
-  if capacity_type_str == CapacityType.UNKNOWN.name:
-    # Use default settings from cluster creation.
+  # Only fall back to the reservation saved at cluster creation when the user
+  # did not pass --reservation. An explicit reservation must not be overwritten
+  # and must not require the cluster metadata config map.
+  use_cluster_reservation = (
+      capacity_type_str == CapacityType.RESERVATION.name
+      and not args.reservation
+  )
+  if use_cluster_reservation:
     metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}'
     cluster_config_map = get_cluster_configmap(args, metadata_configmap_name)
 
     # Error out if the metadata config map doesn't exist, and is attempting to use
     # autoprovisioning.
     if cluster_config_map is None:
       xpk_print(
           'Unable to find config map. Please specify a capacity type'
           ' --on-demand, --spot, --reservation=$RESERVATION_ID) to continue'
           ' to use autoprovisioning (--enable-autoprovisioning).'
       )
       return node_selector_args, 1
 
-    return_code, capacity_type_str = get_value_from_map(
-        CAPACITY_TYPE_CONFIG_KEY, cluster_config_map
+    return_code, args.reservation = get_value_from_map(
+        RESERVATION_CONFIG_KEY, cluster_config_map
     )
     if return_code != 0:
       return node_selector_args, return_code
-
-    if capacity_type_str == CapacityType.RESERVATION.name:
-      return_code, args.reservation = get_value_from_map(
-          RESERVATION_CONFIG_KEY, cluster_config_map
-      )
-      if return_code != 0:
-        return node_selector_args, return_code
-      return_code = verify_reservation_exists(args)
-      if return_code > 0:
-        xpk_print('Unable to verify reservation name saved in config map.')
-        return node_selector_args, return_code
+    return_code = verify_reservation_exists(args)
+    if return_code > 0:
+      xpk_print('Unable to verify reservation name saved in config map.')
+      return node_selector_args, return_code
 
   # Check if reservation id is valid. Shared function with cluster creation.
   node_selector_args, return_code = (
@@ -345,3 +386,36 @@ def get_autoprovisioning_node_selector_args(args) -> tuple[str, int]:
     return node_selector_args, return_code
 
   return node_selector_args, return_code
+
+
+def get_autoprovisioning_tolerations(args) -> tuple[str, int]:
+  """Determine the pod tolerations when autoprovisioning is enabled.
+
+  Args:
+    args: user provided arguments for running the command.
+
+  Returns:
+    Tuple with string of autoprovisioning tolerations and
+    int of 0 if successful and 1 otherwise.
+  """
+  capacity_type_str, return_code = (
+      get_capacity_type_str_from_args_or_cluster_default(args)
+  )
+  if return_code != 0:
+    return '', return_code
+
+  if capacity_type_str == CapacityType.SPOT.name:
+    # https://cloud.google.com/kubernetes-engine/docs/concepts/node-auto-provisioning#support_for_spot_vms
+    #
+    # > Creating node pools based on Spot VMs is only considered if
+    # > unschedulable pods with a toleration for the
+    # > cloud.google.com/gke-spot="true":NoSchedule taint exist
+    return (
+        '''- key: "cloud.google.com/gke-spot"
+                operator: "Equal"
+                value: "true"
+                effect: "NoSchedule"''',
+        0,
+    )
+
+  return '', 0