diff --git a/perfkitbenchmarker/benchmark_spec.py b/perfkitbenchmarker/benchmark_spec.py index 7a06f34696..780e537107 100644 --- a/perfkitbenchmarker/benchmark_spec.py +++ b/perfkitbenchmarker/benchmark_spec.py @@ -54,7 +54,6 @@ from perfkitbenchmarker import providers from perfkitbenchmarker import relational_db from perfkitbenchmarker import resource as resource_type -from perfkitbenchmarker import resources # pylint:disable=unused-import # Load the __init__.py from perfkitbenchmarker import smb_service from perfkitbenchmarker import stages from perfkitbenchmarker import static_virtual_machine as static_vm diff --git a/perfkitbenchmarker/configs/benchmark_config_spec.py b/perfkitbenchmarker/configs/benchmark_config_spec.py index 4331c5afac..5c791efcf2 100644 --- a/perfkitbenchmarker/configs/benchmark_config_spec.py +++ b/perfkitbenchmarker/configs/benchmark_config_spec.py @@ -40,8 +40,6 @@ from perfkitbenchmarker.configs import vm_group_decoders from perfkitbenchmarker.resources import example_resource_spec from perfkitbenchmarker.resources import jobs_setter -# Included to import & load Kubernetes' __init__.py somewhere. -from perfkitbenchmarker.resources import kubernetes # pylint:disable=unused-import from perfkitbenchmarker.resources import managed_ai_model_spec from perfkitbenchmarker.resources.pinecone import pinecone_resource_spec from perfkitbenchmarker.resources.vertex_vector_search import vvs_resource_spec diff --git a/perfkitbenchmarker/configs/default_benchmark_config.yaml b/perfkitbenchmarker/configs/default_benchmark_config.yaml index 71fe946b44..ac73b320e9 100644 --- a/perfkitbenchmarker/configs/default_benchmark_config.yaml +++ b/perfkitbenchmarker/configs/default_benchmark_config.yaml @@ -310,6 +310,43 @@ sparksql_tpcds_1t: provisioned_iops: 5000 provisioned_throughput: 480 +sparksql_tpcds_1t_tuned: + name: dpb_sparksql_benchmark + flags: + create_and_boot_post_task_delay: 2 + dpb_export_job_stats: True + dpb_sparksql_copy_to_hdfs: True + dpb_sparksql_create_hive_tables: False + dpb_sparksql_data_compression: snappy + dpb_sparksql_data_format: parquet + dpb_sparksql_order: 1,2,3,4,5,6,7,8,9,10,11,12,13,14a,14b,15,16,17,18,19,20,21,22,23a,23b,24a,24b,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39a,39b,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99 + dpb_sparksql_query: tpcds_2_4 + hadoop_hdfs_client_readahead: 0 + hadoop_hdfs_replication: 1 + hadoop_version: 3.4.1 + openjdk_version: 11 + spark_version: 3.5.6 + sparksql_readahead_kb: 1024 + dpb_service: + service_type: unmanaged_spark_cluster + worker_count: 8 + worker_group: + vm_spec: + GCP: + num_local_ssds: 0 + disk_count: 4 + disk_spec: + GCP: + disk_size: 320 + disk_type: hyperdisk-balanced + provisioned_iops: 160000 + provisioned_throughput: 2400 + AWS: + disk_size: 500 + disk_type: gp3 + provisioned_iops: 5000 + provisioned_throughput: 480 + sysbench_oltp_base: &sysbench_oltp_base sysbench_table_size: 50000000 sysbench_tables: 8 @@ -390,24 +427,6 @@ postgres_sysbench_tpcc: sysbench_scale: 100 sysbench_use_fk: False -mongodb: - name: mongodb_ycsb - flags: - mongodb_readahead_kb: 8 - iostat: True - sar: True - sar_interval: 1 - ycsb_fail_on_incomplete_loading: True - ycsb_measurement_type: hdrhistogram - ycsb_status: True - ycsb_status_interval_sec: 1 - ycsb_operation_count: 1000000000 - ycsb_record_command_line: False - ycsb_run_parameters: dataintegrity=true,readallfields=true,writeallfields=true - timeout_minutes: 360 - mongodb_primary_only: True - mongodb_pss: False - fio_latency: name: fio flags: @@ -446,7 +465,6 @@ hadoop_dfsio: hadoop_version: 3.4.1 num_cpus_override: 16 openjdk_version: 11 - os_type: ubuntu2404 yarn_scheduler: org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler dpb_service: service_type: unmanaged_dpb_svc_yarn_cluster @@ -461,3 +479,74 @@ hadoop_dfsio: disk_size: 6830 disk_type: hyperdisk-throughput provisioned_throughput: 600 + +diskspd_read_only: + name: diskspd + flags: &diskspd_read_only_flags + diskspd_prefill_duration: 2700 + diskspd_write_read_ratio: 0 + +diskspd_write_only: + name: diskspd + flags: &diskspd_write_only_flags + diskspd_write_read_ratio: 100 + +diskspd_single_thread_latency: + name: diskspd + flags: &diskspd_single_thread_latency_flags + diskspd_thread_number_per_file: 1 + diskspd_outstanding_io: 1 + diskspd_block_size: 4k + diskspd_access_pattern: r + +diskspd_max_iops: + name: diskspd + flags: &diskspd_max_iops_flags + diskspd_thread_number_per_file: 1,4,8,16 + diskspd_outstanding_io: 128 + diskspd_block_size: 4k + diskspd_access_pattern: r + +diskspd_max_throughput: + name: diskspd + flags: &diskspd_max_throughput_flags + diskspd_thread_number_per_file: 8,16 + diskspd_outstanding_io: 64,128 + diskspd_block_size: 1M + diskspd_access_pattern: s + +diskspd_single_thread_latency_read_only: + name: diskspd + flags: + <<: *diskspd_read_only_flags + <<: *diskspd_single_thread_latency_flags + +diskspd_single_thread_latency_write_only: + name: diskspd + flags: + <<: *diskspd_write_only_flags + <<: *diskspd_single_thread_latency_flags + +diskspd_max_iops_read_only: + name: diskspd + flags: + <<: *diskspd_read_only_flags + <<: *diskspd_max_iops_flags + +diskspd_max_iops_write_only: + name: diskspd + flags: + <<: *diskspd_write_only_flags + <<: *diskspd_max_iops_flags + +diskspd_max_throughput_read_only: + name: diskspd + flags: + <<: *diskspd_read_only_flags + <<: *diskspd_max_throughput_flags + +diskspd_max_throughput_write_only: + name: diskspd + flags: + <<: *diskspd_write_only_flags + <<: *diskspd_max_throughput_flags diff --git a/perfkitbenchmarker/data/docker/flink/cloudbuild.yaml.j2 b/perfkitbenchmarker/data/docker/flink/cloudbuild.yaml.j2 index 70f15049cb..e4b423d859 100644 --- a/perfkitbenchmarker/data/docker/flink/cloudbuild.yaml.j2 +++ b/perfkitbenchmarker/data/docker/flink/cloudbuild.yaml.j2 @@ -1,6 +1,6 @@ steps: -- name: gcr.io/cloud-builders/gsutil - args: ['cp', {{ dpb_job_jarfile }}, 'job.jar'] +- name: gcr.io/cloud-builders/gcloud + args: ['storage', 'cp', {{ dpb_job_jarfile }}, 'job.jar'] - name: 'gcr.io/cloud-builders/docker' args: ['build', '--build-arg', diff --git a/perfkitbenchmarker/data/spark/spark-defaults.conf.j2 b/perfkitbenchmarker/data/spark/spark-defaults.conf.j2 index c9d3d6a9c0..50bd4299b0 100644 --- a/perfkitbenchmarker/data/spark/spark-defaults.conf.j2 +++ b/perfkitbenchmarker/data/spark/spark-defaults.conf.j2 @@ -17,6 +17,10 @@ spark.master=spark://{{ leader_ip }}:7077 # Hadoop configs prefixed with spark.hadoop spark.hadoop.fs.defaultFS=hdfs://{{ leader_ip }}/ +{% if shuffle_partitions is not none %} +spark.sql.shuffle.partitions={{ shuffle_partitions }} +{% endif %} + {% for key, value in spark_conf.items() %} {{ key }}={{ value }} {% endfor %} diff --git a/perfkitbenchmarker/edw_service.py b/perfkitbenchmarker/edw_service.py index 35b0e31dcb..600296b9d0 100644 --- a/perfkitbenchmarker/edw_service.py +++ b/perfkitbenchmarker/edw_service.py @@ -118,10 +118,21 @@ None, 'The schema of the hosted snowflake database to use during the benchmark.', ) +flags.DEFINE_string( + 'snowflake_account', + None, + 'The Snowflake account to use during the benchmark. Must follow the format' + ' "-".', +) +flags.DEFINE_string( + 'snowflake_user', + None, + 'The Snowflake user to use during the benchmark.', +) flags.DEFINE_enum( 'snowflake_client_interface', 'JDBC', - ['JDBC'], + ['JDBC', 'PYTHON'], 'The Runtime Interface used when interacting with Snowflake.', ) flags.DEFINE_string( @@ -180,6 +191,9 @@ FLAGS = flags.FLAGS +EDW_PYTHON_DRIVER_LIB_FILE = 'edw_python_driver_lib.py' +EDW_PYTHON_DRIVER_LIB_DIR = 'edw/common/clients/python' + TYPE_2_PROVIDER = dict([ ('athena', 'aws'), ('redshift', 'aws'), diff --git a/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py b/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py index ca291a947c..d8146dfa27 100644 --- a/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py @@ -45,6 +45,7 @@ """ from collections.abc import MutableMapping +import functools import json import logging import os @@ -53,11 +54,13 @@ from typing import Any, List from absl import flags +from perfkitbenchmarker import background_tasks from perfkitbenchmarker import configs from perfkitbenchmarker import dpb_constants from perfkitbenchmarker import dpb_service from perfkitbenchmarker import dpb_sparksql_benchmark_helper from perfkitbenchmarker import errors +from perfkitbenchmarker import linux_virtual_machine from perfkitbenchmarker import object_storage_service from perfkitbenchmarker import sample from perfkitbenchmarker import temp_dir @@ -123,6 +126,10 @@ ' DPB services.', ) +_READAHEAD_KB = flags.DEFINE_integer( + 'sparksql_readahead_kb', None, 'Configure block device readahead settings.' +) + FLAGS = flags.FLAGS LOG_RESULTS_PATTERN = ( @@ -214,6 +221,14 @@ def CheckPrerequisites(benchmark_config): ) +def _PrepareNode(vm: linux_virtual_machine.BaseLinuxVirtualMachine) -> None: + if _READAHEAD_KB.value is not None: + vm.SetReadAhead( + _READAHEAD_KB.value * 2, + [d.GetDevicePath() for d in vm.scratch_disks], + ) + + def Prepare(benchmark_spec): """Installs and sets up dataset on the Spark clusters. @@ -223,6 +238,12 @@ def Prepare(benchmark_spec): Args: benchmark_spec: The benchmark specification """ + # Only unmanaged dpb services are VM-aware + if benchmark_spec.dpb_service.CLOUD == 'Unmanaged': + nodes = benchmark_spec.dpb_service.vms['worker_group'] + partials = [functools.partial(_PrepareNode, node) for node in nodes] + background_tasks.RunThreaded((lambda f: f()), partials) + cluster = benchmark_spec.dpb_service storage_service = cluster.storage_service diff --git a/perfkitbenchmarker/linux_benchmarks/mongodb_ycsb_benchmark.py b/perfkitbenchmarker/linux_benchmarks/mongodb_ycsb_benchmark.py index b6a5a44559..2d0b532c32 100644 --- a/perfkitbenchmarker/linux_benchmarks/mongodb_ycsb_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/mongodb_ycsb_benchmark.py @@ -40,11 +40,11 @@ from perfkitbenchmarker.linux_packages import ycsb flags.DEFINE_integer( - 'mongodb_readahead_kb', None, 'Configure block device readahead settings.' + 'mongodb_readahead_kb', 8, 'Configure block device readahead settings.' ) flags.DEFINE_bool( 'mongodb_primary_only', - False, + True, 'Run with a simple primary-only setup. Mutually exclusive with' ' --mongodb_pss. If both are False, the default PSA setup will be used.', ) @@ -78,7 +78,19 @@ description: Run YCSB against MongoDB. vm_groups: primary: - vm_spec: *default_dual_core + vm_spec: + GCP: + machine_type: n4-standard-2 + zone: us-central1-b + boot_disk_size: 100 + Azure: + machine_type: Standard_D2s_v6 + zone: eastus-1 + boot_disk_size: 100 + AWS: + machine_type: m7i.large + zone: us-east-1a + boot_disk_size: 100 disk_spec: GCP: disk_size: 500 @@ -94,7 +106,19 @@ mount_point: /scratch vm_count: 1 secondary: - vm_spec: *default_dual_core + vm_spec: + GCP: + machine_type: n4-standard-2 + zone: us-central1-b + boot_disk_size: 100 + Azure: + machine_type: Standard_D2s_v6 + zone: eastus-1 + boot_disk_size: 100 + AWS: + machine_type: m7i.large + zone: us-east-1a + boot_disk_size: 100 disk_spec: GCP: disk_size: 500 @@ -110,7 +134,19 @@ mount_point: /scratch vm_count: 1 secondary_2: - vm_spec: *default_dual_core + vm_spec: + GCP: + machine_type: n4-standard-2 + zone: us-central1-b + boot_disk_size: 100 + Azure: + machine_type: Standard_D2s_v6 + zone: eastus-1 + boot_disk_size: 100 + AWS: + machine_type: m7i.large + zone: us-east-1a + boot_disk_size: 100 disk_spec: GCP: disk_size: 500 @@ -138,6 +174,27 @@ fstab_options: noatime enable_transparent_hugepages: false create_and_boot_post_task_delay: 5 + ycsb_fail_on_incomplete_loading: True + ycsb_measurement_type: hdrhistogram + ycsb_status: True + ycsb_status_interval_sec: 1 + ycsb_operation_count: 1000000000 + ycsb_record_command_line: False + ycsb_run_parameters: dataintegrity=true,readallfields=true,writeallfields=true + ycsb_client_vms: 1 + ycsb_preload_threads: 512 + ycsb_threads_per_client: "2048" + ycsb_workload_files: workloadac,workloada,workloadx + ycsb_field_count: 10 + ycsb_field_length: 100 + ycsb_record_count: 20000000 + ycsb_sleep_after_load_in_sec: 300 + ycsb_timelimit: 300 + ycsb_sleep_between_thread_runs_sec: 120 + os_type: ubuntu2204 + timeout_minutes: 360 + sar: True + sar_interval: 1 """ _LinuxVM = linux_virtual_machine.BaseLinuxVirtualMachine diff --git a/perfkitbenchmarker/linux_benchmarks/unmanaged_postgresql_sysbench_benchmark.py b/perfkitbenchmarker/linux_benchmarks/unmanaged_postgresql_sysbench_benchmark.py index 817f70e562..6a2ebd7de6 100644 --- a/perfkitbenchmarker/linux_benchmarks/unmanaged_postgresql_sysbench_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/unmanaged_postgresql_sysbench_benchmark.py @@ -20,6 +20,7 @@ import copy import logging +import re import time from absl import flags @@ -105,10 +106,27 @@ _OLTP_WRITE_ONLY = 'oltp_write_only' _OLTP = [_OLTP_READ_WRITE, _OLTP_READ_ONLY, _OLTP_WRITE_ONLY] -_SHARED_BUFFER_SIZE = flags.DEFINE_integer( +_SHARED_BUFFER_SIZE = flags.DEFINE_string( 'postgresql_shared_buffer_size', - 10, - 'Size of the shared buffer in the postgresql cluster (in Gb).', + '10G', + 'Size of the shared buffer in the postgresql cluster.' + 'Format: [], where is one of (B, K, M, G). ' + 'Example: 16G, 512M. If no unit is specified, G is assumed by default.', +) + + +def _ValidateSharedBufferSizeFlagValue(value: str) -> bool: + """Validates the shared buffer size flag's format.""" + # Checks for one or more digits, optionally followed by B, K, M, or G. + return bool(re.fullmatch(r'^\d+[BKMG]?$', value)) + +flags.register_validator( + _SHARED_BUFFER_SIZE, + _ValidateSharedBufferSizeFlagValue, + message=( + '--postgresql_shared_buffer_size must be in the format [] ' + 'where is one of (B, K, M, G). Example: 16G, 512M, 1024K, 2048B.' + ) ) _MEASURE_MAX_QPS = flags.DEFINE_bool( 'postgresql_measure_max_qps', @@ -123,6 +141,16 @@ ) +def GetBufferSize() -> str: + """Returns the buffer key for the given buffer size.""" + buffer_size = _SHARED_BUFFER_SIZE.value + if buffer_size.endswith( + ('B', 'K', 'M', 'G') + ): + return buffer_size + return f'{buffer_size}G' + + def GetConfig(user_config): """Get the benchmark config, applying user overrides. @@ -185,7 +213,7 @@ def Prepare(benchmark_spec: bm_spec.BenchmarkSpec): postgresql.ConfigureAndRestart( primary_server, FLAGS.run_uri, - _SHARED_BUFFER_SIZE.value, + GetBufferSize(), _CONF_TEMPLATE_PATH.value, ) for index, replica in enumerate(replica_servers): @@ -194,7 +222,7 @@ def Prepare(benchmark_spec: bm_spec.BenchmarkSpec): replica, index, FLAGS.run_uri, - _SHARED_BUFFER_SIZE.value, + GetBufferSize(), _CONF_TEMPLATE_PATH.value, ) clients = benchmark_spec.vm_groups['client'] diff --git a/perfkitbenchmarker/linux_packages/hadoop.py b/perfkitbenchmarker/linux_packages/hadoop.py index 42c67baf19..e7528b32c8 100644 --- a/perfkitbenchmarker/linux_packages/hadoop.py +++ b/perfkitbenchmarker/linux_packages/hadoop.py @@ -37,7 +37,12 @@ FLAGS = flags.FLAGS -_VERSION = flags.DEFINE_string('hadoop_version', None, 'Version of Hadoop.') +_VERSION = flags.DEFINE_string( + 'hadoop_version', + '3.4.1', + 'Version of Hadoop. Default is 3.4.1 to get around' + ' https://issues.apache.org/jira/projects/INFRA/issues/INFRA-27182.', +) _URL_OVERRIDE = flags.DEFINE_string( 'hadoop_bin_url', None, 'Specify to override url from HADOOP_URL_BASE.' ) diff --git a/perfkitbenchmarker/linux_packages/mongodb_server.py b/perfkitbenchmarker/linux_packages/mongodb_server.py index 447703adec..2e1834ecb5 100644 --- a/perfkitbenchmarker/linux_packages/mongodb_server.py +++ b/perfkitbenchmarker/linux_packages/mongodb_server.py @@ -39,7 +39,7 @@ MONGOD_CONF_WIRED_TIGER_SESSION_MAX = flags.DEFINE_string( 'mongod_conf_wired_tiger_session_max', - None, + 'session_max=8192', 'An internal WiredTiger parameter that sets the maximum number of' ' concurrent sessions (or connections) the storage engine can handle. This' ' is often aligned with the maximum number of client connections. Default' @@ -48,7 +48,7 @@ MONGOD_CONF_NETWORK_MAX_INCOMING_CONNECTIONS = flags.DEFINE_integer( 'mongod_conf_network_max_incoming_connections', - None, + 8192, 'The maximum number of simultaneous client connections that the mongod' ' process will accept on network layer. Default for Windows is 1,000,000.' ' Default for Linux is (RLIMIT_NOFILE) * 0.8, which is OS-dependent.', diff --git a/perfkitbenchmarker/linux_packages/optimize_gpu.py b/perfkitbenchmarker/linux_packages/optimize_gpu.py index a09776c962..5dcd30233b 100644 --- a/perfkitbenchmarker/linux_packages/optimize_gpu.py +++ b/perfkitbenchmarker/linux_packages/optimize_gpu.py @@ -93,9 +93,9 @@ def SetContainerEnv(vm): return ( 'export LD_LIBRARY_PATH=' # pylint: disable=anomalous-backslash-in-string - '/opt/aws-ofi-nccl/lib:/opt/amazon/efa:\$LD_LIBRARY_PATH; ' + r'/opt/aws-ofi-nccl/lib:/opt/amazon/efa:\$LD_LIBRARY_PATH; ' # pylint: disable=anomalous-backslash-in-string - 'export PATH=/opt/amazon/openmpi/bin/:\$PATH; ' + r'export PATH=/opt/amazon/openmpi/bin/:\$PATH; ' 'export FI_PROVIDER=efa; export FI_EFA_USE_DEVICE_RDMA=1;' ) + tuner if FLAGS.cloud == 'GCP' and vm.machine_type == 'a3-megagpu-8g': @@ -112,7 +112,7 @@ def SetContainerEnv(vm): 'export NCCL_FASTRAK_USE_LLCM=1; ' 'export NCCL_FASTRAK_LLCM_DEVICE_DIRECTORY=/dev/aperture_devices; ' # pylint: disable=anomalous-backslash-in-string - 'export LD_LIBRARY_PATH=/var/lib/tcpxo/lib64:\$LD_LIBRARY_PATH; ' + r'export LD_LIBRARY_PATH=/var/lib/tcpxo/lib64:\$LD_LIBRARY_PATH; ' ) diff --git a/perfkitbenchmarker/linux_packages/postgresql.py b/perfkitbenchmarker/linux_packages/postgresql.py index 170a9e8f46..3594242ce6 100644 --- a/perfkitbenchmarker/linux_packages/postgresql.py +++ b/perfkitbenchmarker/linux_packages/postgresql.py @@ -33,25 +33,25 @@ SYSBENCH_PASSWORD = 'Syb3enCh#1' SHARED_BUFFERS_CONF = { - 'SIZE_10GB': { + 'SIZE_10G': { 'shared_buffers': '10GB', 'effective_cache_size': '30GB', 'max_memory': '40G', 'nr_hugepages': '5632', }, - 'SIZE_100GB': { + 'SIZE_100G': { 'shared_buffers': '100GB', 'effective_cache_size': '112.5GB', 'max_memory': '150G', 'nr_hugepages': '52736', }, - 'SIZE_80GB': { # run with 40M table size and 8 tables to write 80GB of data + 'SIZE_80G': { # run with 40M table size and 8 tables to write 80GB of data 'shared_buffers': '80GB', 'effective_cache_size': '90GB', 'max_memory': '120G', 'nr_hugepages': '45000', }, - 'SIZE_42GB': { + 'SIZE_42G': { # run with 21M table size and 8 tables to write 42GB of data 'shared_buffers': '42GB', 'effective_cache_size': '89.6GB', 'max_memory': '108.8G', @@ -215,7 +215,7 @@ def ConfigureAndRestart(vm, run_uri, buffer_size, conf_template_path): """ conf_path = GetOSDependentDefaults(vm.OS_TYPE)['conf_dir'] data_path = GetOSDependentDefaults(vm.OS_TYPE)['data_dir'] - buffer_size_key = f'SIZE_{buffer_size}GB' + buffer_size_key = f'SIZE_{buffer_size}' remote_temp_config = '/tmp/my.cnf' postgres_conf_path = os.path.join(conf_path, 'postgresql-custom.conf') pg_hba_conf_path = os.path.join(conf_path, 'pg_hba.conf') @@ -315,7 +315,7 @@ def SetupReplica( primary_vm, replica_vm, replica_id, run_uri, buffer_size, conf_template_path ): """Setup postgres replica.""" - buffer_size_key = f'SIZE_{buffer_size}GB' + buffer_size_key = f'SIZE_{buffer_size}' data_path = GetOSDependentDefaults(replica_vm.OS_TYPE)['data_dir'] conf_path = GetOSDependentDefaults(replica_vm.OS_TYPE)['conf_dir'] replica_vm.RemoteCommand(f'sudo mkdir -p {data_path}') diff --git a/perfkitbenchmarker/linux_packages/spark.py b/perfkitbenchmarker/linux_packages/spark.py index 15f68ec210..54834fff15 100644 --- a/perfkitbenchmarker/linux_packages/spark.py +++ b/perfkitbenchmarker/linux_packages/spark.py @@ -42,6 +42,14 @@ 'spark_version', None, 'Version of spark. Defaults to latest.' ) +_SHUFFLE_PARTITIONS = flags.DEFINE_integer( + 'spark_shuffle_partitions', + None, + 'Configure Spark shuffle partitions. See ' + 'https://spark.apache.org/docs/latest/sql-performance-tuning.html#tuning-partitions ' + 'for more details.', +) + DATA_FILES = [ 'spark/spark-defaults.conf.j2', 'spark/spark-env.sh.j2', @@ -173,6 +181,11 @@ def _RenderConfig( worker_cores = worker.NumCpusForBenchmark() worker_memory_mb = int((worker.total_memory_kb / 1024) * memory_fraction) driver_memory_mb = int((leader.total_memory_kb / 1024) * memory_fraction) + # Default to the recommended value from + # https://cloud.google.com/dataproc/docs/support/spark-job-tuning#configuring_partitions + shuffle_partitions = ( + _SHUFFLE_PARTITIONS.value or worker_cores * len(workers) * 3 + ) spark_conf = GetConfiguration( driver_memory_mb=driver_memory_mb, @@ -204,6 +217,7 @@ def _RenderConfig( 'hadoop_cmd': hadoop.HADOOP_CMD, 'python_cmd': 'python3', 'optional_tools': optional_tools, + 'shuffle_partitions': shuffle_partitions, } for file_name in DATA_FILES: diff --git a/perfkitbenchmarker/linux_packages/sysbench.py b/perfkitbenchmarker/linux_packages/sysbench.py index fa1796fa52..16b312c986 100644 --- a/perfkitbenchmarker/linux_packages/sysbench.py +++ b/perfkitbenchmarker/linux_packages/sysbench.py @@ -19,6 +19,7 @@ import logging import re import statistics +import time from absl import flags import immutabledict @@ -41,6 +42,12 @@ 'sysbench_max_commit_delay', None, 'Max commit delay for spanner oltp in ms' ) +_SYSBENCH_QPS_TIME_SERIES = flags.DEFINE_bool( + 'sysbench_qps_time_series', + False, + 'If true, collect qps time series.', +) + # release 1.0.20; committed Apr 24, 2020. When updating this, also update the # correct line for CONCURRENT_MODS, as it may have changed in between releases. DEFAULT_RELEASE_TAG = '1.0.20' @@ -199,6 +206,7 @@ def ParseSysbenchTimeSeries(sysbench_output, metadata) -> list[sample.Sample]: tps_numbers = [] latency_numbers = [] qps_numbers = [] + time_ms = [] for line in sysbench_output.split('\n'): # parse a line like (it's one line - broken up in the comment to fit): # [ 6s ] thds: 16 tps: 650.51 qps: 12938.26 (r/w/o: 9046.18/2592.05/1300.03) @@ -218,6 +226,11 @@ def ParseSysbenchTimeSeries(sysbench_output, metadata) -> list[sample.Sample]: qps_numbers.append(float(match.group(1))) if line.startswith('SQL statistics:'): break + if _SYSBENCH_QPS_TIME_SERIES.value: + match = re.search(r'\[ ([0-9]+)s \]', line) + if not match: + raise ValueError(f'no time in: {line}') + time_ms.append(float(match.group(1)) * 1000) tps_metadata = metadata.copy() tps_metadata.update({'tps': tps_numbers}) @@ -231,7 +244,28 @@ def ParseSysbenchTimeSeries(sysbench_output, metadata) -> list[sample.Sample]: qps_metadata.update({'qps': qps_numbers}) qps_sample = sample.Sample('qps_array', -1, 'qps', qps_metadata) - return [tps_sample, latency_sample, qps_sample] + samples = [tps_sample, latency_sample, qps_sample] + if _SYSBENCH_QPS_TIME_SERIES.value: + if len(time_ms) != len(qps_numbers): + raise ValueError( + f'time_ms and qps_numbers have different lengths: {time_ms},' + f' {qps_numbers}' + ) + # There might be small inaccuracy here owing to the time gap between the + # instant last metric was recorded and the instant time.time() was invoked. + start_time = (time.time() * 1000) - time_ms[-1] + timestamps = [start_time + time_ms[i] for i in range(len(time_ms))] + qps_time_series = sample.CreateTimeSeriesSample( + qps_numbers, + timestamps, + sample.QPS_TIME_SERIES, + 'qps', + 1, + additional_metadata=qps_metadata, + ) + samples.append(qps_time_series) + + return samples def ParseSysbenchLatency( diff --git a/perfkitbenchmarker/pkb.py b/perfkitbenchmarker/pkb.py index 20368256b3..34e12a0642 100644 --- a/perfkitbenchmarker/pkb.py +++ b/perfkitbenchmarker/pkb.py @@ -96,6 +96,7 @@ from perfkitbenchmarker import providers from perfkitbenchmarker import publisher from perfkitbenchmarker import requirements +from perfkitbenchmarker import resources from perfkitbenchmarker import sample from perfkitbenchmarker import stages from perfkitbenchmarker import static_virtual_machine @@ -548,6 +549,7 @@ def _CreateBenchmarkSpecs(): Returns: A list of BenchmarkSpecs. """ + resources.LoadModules() specs = [] benchmark_tuple_list = benchmark_sets.GetBenchmarksFromFlags() benchmark_counts = collections.defaultdict(itertools.count) diff --git a/perfkitbenchmarker/providers/gcp/bigquery.py b/perfkitbenchmarker/providers/gcp/bigquery.py index 1de7535ba8..00e9d4652c 100644 --- a/perfkitbenchmarker/providers/gcp/bigquery.py +++ b/perfkitbenchmarker/providers/gcp/bigquery.py @@ -431,7 +431,7 @@ def Prepare(self, package_name: str) -> None: ) self.client_vm.RemoteCommand('python3 -m venv .venv') self.client_vm.RemoteCommand( - 'source .venv/bin/activate && pip install google-cloud-bigquery' + 'source .venv/bin/activate && pip install google-cloud-bigquery absl-py' ' google-cloud-bigquery-storage pyarrow' ) @@ -439,6 +439,12 @@ def Prepare(self, package_name: str) -> None: self.client_vm.PushDataFile( os.path.join(BQ_PYTHON_CLIENT_DIR, BQ_PYTHON_CLIENT_FILE) ) + self.client_vm.PushDataFile( + os.path.join( + edw_service.EDW_PYTHON_DRIVER_LIB_DIR, + edw_service.EDW_PYTHON_DRIVER_LIB_FILE, + ) + ) def ExecuteQuery( self, query_name: str, print_results: bool = False diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc_serverless_prices.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc_serverless_prices.py index 997dd91109..a71542c9f7 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc_serverless_prices.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc_serverless_prices.py @@ -9,277 +9,277 @@ 'standard': { 'us-west1': { 'usd_per_milli_dcu_sec': 0.06 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.04 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.04 / 720 / 3600, }, 'us-west2': { 'usd_per_milli_dcu_sec': 0.072071 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.048 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.048 / 720 / 3600, }, 'us-west3': { 'usd_per_milli_dcu_sec': 0.072071 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.048 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.048 / 720 / 3600, }, 'us-west4': { 'usd_per_milli_dcu_sec': 0.067572 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.044 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.044 / 720 / 3600, }, 'us-east1': { 'usd_per_milli_dcu_sec': 0.06 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.04 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.04 / 720 / 3600, }, 'us-east4': { 'usd_per_milli_dcu_sec': 0.067572 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.044 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.044 / 720 / 3600, }, 'us-east5': { 'usd_per_milli_dcu_sec': 0.06 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.04 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.04 / 720 / 3600, }, 'us-central1': { 'usd_per_milli_dcu_sec': 0.06 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.04 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.04 / 720 / 3600, }, 'us-south1': { 'usd_per_milli_dcu_sec': 0.0708 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.0472 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.0472 / 720 / 3600, }, 'europe-north1': { 'usd_per_milli_dcu_sec': 0.066062 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.044 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.044 / 720 / 3600, }, 'europe-west1': { 'usd_per_milli_dcu_sec': 0.066007 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.04 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.04 / 720 / 3600, }, 'europe-west2': { 'usd_per_milli_dcu_sec': 0.077307 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.048 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.048 / 720 / 3600, }, 'europe-west3': { 'usd_per_milli_dcu_sec': 0.077307 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.048 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.048 / 720 / 3600, }, 'europe-west4': { 'usd_per_milli_dcu_sec': 0.066057 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.044 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.044 / 720 / 3600, }, 'europe-west6': { - 'usd_per_milli_dcu_sec': 0.083955 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.052 / 744 / 3600, + 'usd_per_milli_dcu_sec': 0.0786 / 1000 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.052 / 720 / 3600, }, 'europe-west8': { 'usd_per_milli_dcu_sec': 0.0696 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.0464 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.0464 / 720 / 3600, }, 'europe-west9': { 'usd_per_milli_dcu_sec': 0.0696 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.0464 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.0464 / 720 / 3600, }, 'europe-central2': { - 'usd_per_milli_dcu_sec': 0.077307 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.048 / 744 / 3600, + 'usd_per_milli_dcu_sec': 0.0726 / 1000 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.048 / 720 / 3600, }, 'europe-southwest1': { 'usd_per_milli_dcu_sec': 0.0708 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.047 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.047 / 720 / 3600, }, 'asia-northeast1': { 'usd_per_milli_dcu_sec': 0.076976 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.052 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.052 / 720 / 3600, }, 'asia-northeast2': { 'usd_per_milli_dcu_sec': 0.076976 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.052 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.052 / 720 / 3600, }, 'asia-northeast3': { 'usd_per_milli_dcu_sec': 0.076976 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.052 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.052 / 720 / 3600, }, 'asia-east1': { 'usd_per_milli_dcu_sec': 0.069477 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.04 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.04 / 720 / 3600, }, 'asia-east2': { 'usd_per_milli_dcu_sec': 0.083955 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.05 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.05 / 720 / 3600, }, 'asia-south1': { 'usd_per_milli_dcu_sec': 0.072071 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.048 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.048 / 720 / 3600, }, 'asia-south2': { 'usd_per_milli_dcu_sec': 0.072071 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.048 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.048 / 720 / 3600, }, 'asia-southeast1': { 'usd_per_milli_dcu_sec': 0.074015 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.044 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.044 / 720 / 3600, }, 'asia-southeast2': { 'usd_per_milli_dcu_sec': 0.080674 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.052 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.052 / 720 / 3600, }, 'australia-southeast1': { 'usd_per_milli_dcu_sec': 0.085135 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.054 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.054 / 720 / 3600, }, 'australia-southeast2': { 'usd_per_milli_dcu_sec': 0.085135 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.054 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.054 / 720 / 3600, }, 'northamerica-northeast1': { 'usd_per_milli_dcu_sec': 0.066057 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.044 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.044 / 720 / 3600, }, 'northamerica-northeast2': { 'usd_per_milli_dcu_sec': 0.066057 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.044 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.044 / 720 / 3600, }, 'southamerica-east1': { 'usd_per_milli_dcu_sec': 0.095246 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.06 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.06 / 720 / 3600, }, 'southamerica-west1': { 'usd_per_milli_dcu_sec': 0.08581 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.057 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.057 / 720 / 3600, }, }, 'premium': { 'us-west1': { 'usd_per_milli_dcu_sec': 0.089 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.1 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.1 / 720 / 3600, }, 'us-west2': { 'usd_per_milli_dcu_sec': 0.106905 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.12 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.12 / 720 / 3600, }, 'us-west3': { 'usd_per_milli_dcu_sec': 0.106905 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.12 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.12 / 720 / 3600, }, 'us-west4': { 'usd_per_milli_dcu_sec': 0.100232 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.11 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.11 / 720 / 3600, }, 'us-east1': { 'usd_per_milli_dcu_sec': 0.089 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.1 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.1 / 720 / 3600, }, 'us-east4': { 'usd_per_milli_dcu_sec': 0.100232 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.11 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.11 / 720 / 3600, }, 'us-east5': { 'usd_per_milli_dcu_sec': 0.089 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.1 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.1 / 720 / 3600, }, 'us-central1': { 'usd_per_milli_dcu_sec': 0.089 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.1 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.1 / 720 / 3600, }, 'us-south1': { 'usd_per_milli_dcu_sec': 0.10502 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.118 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.118 / 720 / 3600, }, 'europe-north1': { 'usd_per_milli_dcu_sec': 0.097992 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.11 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.11 / 720 / 3600, }, 'europe-west1': { 'usd_per_milli_dcu_sec': 0.09791 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.1 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.1 / 720 / 3600, }, 'europe-west2': { 'usd_per_milli_dcu_sec': 0.114672 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.12 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.12 / 720 / 3600, }, 'europe-west3': { 'usd_per_milli_dcu_sec': 0.114672 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.12 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.12 / 720 / 3600, }, 'europe-west4': { 'usd_per_milli_dcu_sec': 0.097984 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.11 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.11 / 720 / 3600, }, 'europe-west6': { - 'usd_per_milli_dcu_sec': 0.124533 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.13 / 744 / 3600, + 'usd_per_milli_dcu_sec': 0.11659 / 1000 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.13 / 720 / 3600, }, 'europe-west8': { 'usd_per_milli_dcu_sec': 0.10324 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.116 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.116 / 720 / 3600, }, 'europe-west9': { 'usd_per_milli_dcu_sec': 0.10324 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.116 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.116 / 720 / 3600, }, 'europe-central2': { - 'usd_per_milli_dcu_sec': 0.114672 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.12 / 744 / 3600, + 'usd_per_milli_dcu_sec': 0.10769 / 1000 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.12 / 720 / 3600, }, 'europe-southwest1': { 'usd_per_milli_dcu_sec': 0.10502 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.118 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.118 / 720 / 3600, }, 'asia-northeast1': { 'usd_per_milli_dcu_sec': 0.114181 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.13 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.13 / 720 / 3600, }, 'asia-northeast2': { 'usd_per_milli_dcu_sec': 0.114181 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.13 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.13 / 720 / 3600, }, 'asia-northeast3': { 'usd_per_milli_dcu_sec': 0.114181 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.13 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.13 / 720 / 3600, }, 'asia-east1': { 'usd_per_milli_dcu_sec': 0.103058 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.1 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.1 / 720 / 3600, }, 'asia-east2': { 'usd_per_milli_dcu_sec': 0.124533 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.125 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.125 / 720 / 3600, }, 'asia-south1': { 'usd_per_milli_dcu_sec': 0.106905 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.12 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.12 / 720 / 3600, }, 'asia-south2': { 'usd_per_milli_dcu_sec': 0.106905 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.12 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.12 / 720 / 3600, }, 'asia-southeast1': { 'usd_per_milli_dcu_sec': 0.10979 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.11 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.11 / 720 / 3600, }, 'asia-southeast2': { 'usd_per_milli_dcu_sec': 0.119666 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.13 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.13 / 720 / 3600, }, 'australia-southeast1': { 'usd_per_milli_dcu_sec': 0.126284 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.135 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.135 / 720 / 3600, }, 'australia-southeast2': { 'usd_per_milli_dcu_sec': 0.126284 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.135 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.135 / 720 / 3600, }, 'northamerica-northeast1': { 'usd_per_milli_dcu_sec': 0.097984 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.11 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.11 / 720 / 3600, }, 'northamerica-northeast2': { 'usd_per_milli_dcu_sec': 0.097984 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.11 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.11 / 720 / 3600, }, 'southamerica-east1': { 'usd_per_milli_dcu_sec': 0.141282 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.15 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.15 / 720 / 3600, }, 'southamerica-west1': { 'usd_per_milli_dcu_sec': 0.127285 / 1000 / 3600, - 'usd_per_shuffle_storage_gb_sec': 0.143 / 744 / 3600, + 'usd_per_shuffle_storage_gb_sec': 0.143 / 720 / 3600, }, }, } diff --git a/perfkitbenchmarker/providers/snowflake/snowflake.py b/perfkitbenchmarker/providers/snowflake/snowflake.py index 9f59d2b66b..21e389db0f 100644 --- a/perfkitbenchmarker/providers/snowflake/snowflake.py +++ b/perfkitbenchmarker/providers/snowflake/snowflake.py @@ -21,6 +21,7 @@ import copy import json import logging +import os from typing import Any, Union from absl import flags from perfkitbenchmarker import data @@ -34,15 +35,35 @@ provider_info.AWS: 'snowflake-jdbc-client-2.14-enterprise.jar', provider_info.AZURE: 'snowflake-jdbc-client-azure-external-2.0.jar', } +# The python client is developed internally as of now. This requires a client +# library which should call into the DB. Contact p3rf-team [at] google [dot] com +# if you would like to run this benchmark to get the file. +SNOWFLAKE_PYTHON_CLIENT_FILE = 'sf_python_driver.py' +SNOWFLAKE_PYTHON_CLIENT_DIR = 'edw/snowflake/clients/python' -class JdbcClientInterface(edw_service.EdwClientInterface): +class SnowflakeClientInterface(edw_service.EdwClientInterface): + """Base client interface for Snowflake.""" + + warehouse: str + database: str + schema: str + + def __init__(self, warehouse: str, database: str, schema: str): + super().__init__() + self.warehouse = warehouse + self.database = database + self.schema = schema + + def GetMetadata(self) -> dict[str, str]: + """Gets the Metadata attributes for the Client Interface.""" + return {'client': FLAGS.snowflake_client_interface} + + +class JdbcClientInterface(SnowflakeClientInterface): """Jdbc Client Interface class for Snowflake. Attributes: - warehouse: String name of the virtual warehouse used during benchmark - database: String name of the database to benchmark - schema: String name of the schema to benchmark jdbc_client: String filename of the JDBC client associated with the Snowflake backend being tested (AWS/Azure/etc.) """ @@ -50,9 +71,7 @@ class JdbcClientInterface(edw_service.EdwClientInterface): def __init__( self, warehouse: str, database: str, schema: str, jdbc_client: str ): - self.warehouse = warehouse - self.database = database - self.schema = schema + super().__init__(warehouse, database, schema) self.jdbc_client = jdbc_client def Prepare(self, package_name: str) -> None: @@ -146,14 +165,124 @@ def ExecuteThroughput( stdout, _ = self.client_vm.RemoteCommand(query_command) return stdout - def GetMetadata(self) -> dict[str, str]: - """Gets the Metadata attributes for the Client Interface.""" - return {'client': FLAGS.snowflake_client_interface} + +class PythonClientInterface(SnowflakeClientInterface): + """Python Client Interface class for Snowflake. + + Attributes: + user: The user to connect to snowflake with. + account: The snowflake account. + """ + + def __init__( + self, + warehouse: str, + database: str, + schema: str, + user: str, + account: str, + ): + super().__init__(warehouse, database, schema) + self.user = user + self.account = account + + def Prepare(self, package_name: str) -> None: + """Prepares the client vm to execute query.""" + + # Push the private key to the working directory on client vm + if FLAGS.snowflake_jdbc_key_file: + self.client_vm.PushFile(FLAGS.snowflake_jdbc_key_file) + else: + self.client_vm.InstallPreprovisionedPackageData( + package_name, ['snowflake_keyfile.p8'], '' + ) + + # Install dependencies for driver + self.client_vm.Install('pip') + self.client_vm.RemoteCommand( + 'sudo apt-get -qq update && DEBIAN_FRONTEND=noninteractive sudo apt-get' + ' -qq install python3.12-venv' + ) + self.client_vm.RemoteCommand('python3 -m venv .venv') + self.client_vm.RemoteCommand( + 'source .venv/bin/activate && pip install snowflake-connector-python' + ' pyarrow absl-py pandas' + ) + + # Push driver script to client vm + self.client_vm.PushDataFile( + os.path.join(SNOWFLAKE_PYTHON_CLIENT_DIR, SNOWFLAKE_PYTHON_CLIENT_FILE) + ) + self.client_vm.PushDataFile( + os.path.join( + edw_service.EDW_PYTHON_DRIVER_LIB_DIR, + edw_service.EDW_PYTHON_DRIVER_LIB_FILE, + ) + ) + + def _RunPythonClientCommand( + self, command: str, additional_args: list[str] + ) -> str: + """Runs a command on the python client. + + Args: + command: The command to run (e.g. 'single', 'throughput'). + additional_args: A list of additional arguments for the command. + + Returns: + The stdout from the command. + """ + cmd_parts = [ + f'.venv/bin/python {SNOWFLAKE_PYTHON_CLIENT_FILE}', + command, + f'--warehouse {self.warehouse}', + f'--database {self.database}', + f'--schema {self.schema}', + f'--user {self.user}', + f'--account {self.account}', + '--credentials_file snowflake_keyfile.p8', + ] + cmd_parts.extend(additional_args) + cmd = ' '.join(cmd_parts) + stdout, _ = self.client_vm.RemoteCommand(cmd) + return stdout + + def ExecuteQuery( + self, query_name: str, print_results: bool = False + ) -> tuple[float, dict[str, Any]]: + """Executes a query and returns performance details.""" + args = [f'--query_file {query_name}'] + if print_results: + args.append('--print_results') + stdout = self._RunPythonClientCommand('single', args) + details = copy.copy(self.GetMetadata()) + details.update(json.loads(stdout)['details']) + return json.loads(stdout)['query_wall_time_in_secs'], details + + def ExecuteThroughput( + self, + concurrency_streams: list[list[str]], + labels: dict[str, str] | None = None, + ) -> str: + """Executes queries simultaneously on client and return performance details.""" + del labels # Currently not supported by this implementation. + args = [f"--query_streams='{json.dumps(concurrency_streams)}'"] + return self._RunPythonClientCommand('throughput', args) + + def RunQueryWithResults(self, query_name: str) -> str: + """Executes a query and returns performance details and query output.""" + args = [f'--query_file {query_name}', '--print_results'] + return self._RunPythonClientCommand('single', args) def GetSnowflakeClientInterface( - warehouse: str, database: str, schema: str, cloud: str -) -> JdbcClientInterface: + warehouse: str, + database: str, + schema: str, + cloud: str, + user: str | None, + account: str | None, +) -> SnowflakeClientInterface: """Builds and Returns the requested Snowflake client Interface. Args: @@ -162,6 +291,8 @@ def GetSnowflakeClientInterface( database: String name of the Snowflake database to use during the benchmark schema: String name of the Snowflake schema to use during the benchmark cloud: String ID of the cloud service the client interface is for + user: The user to connect to snowflake with. + account: The snowflake account. Returns: A concrete Client Interface object (subclass of EdwClientInterface) @@ -173,6 +304,15 @@ def GetSnowflakeClientInterface( return JdbcClientInterface( warehouse, database, schema, JdbcClientDict[cloud] ) + elif FLAGS.snowflake_client_interface == 'PYTHON': + assert user and account + return PythonClientInterface( + warehouse, + database, + schema, + user, + account, + ) raise RuntimeError('Unknown Snowflake Client Interface requested.') @@ -187,8 +327,15 @@ def __init__(self, edw_service_spec): self.warehouse = FLAGS.snowflake_warehouse self.database = FLAGS.snowflake_database self.schema = FLAGS.snowflake_schema + self.user = FLAGS.snowflake_user + self.account = FLAGS.snowflake_account self.client_interface = GetSnowflakeClientInterface( - self.warehouse, self.database, self.schema, self.CLOUD + self.warehouse, + self.database, + self.schema, + self.CLOUD, + self.user, + self.account, ) def IsUserManaged(self, edw_service_spec): @@ -361,6 +508,10 @@ def GetMetadata(self): basic_data['warehouse'] = self.warehouse basic_data['database'] = self.database basic_data['schema'] = self.schema + if self.user: + basic_data['user'] = self.user + if self.account: + basic_data['account'] = self.account basic_data.update(self.client_interface.GetMetadata()) return basic_data diff --git a/perfkitbenchmarker/resources/__init__.py b/perfkitbenchmarker/resources/__init__.py index ccc7d7d879..c1f5e6258b 100644 --- a/perfkitbenchmarker/resources/__init__.py +++ b/perfkitbenchmarker/resources/__init__.py @@ -17,6 +17,3 @@ def LoadModules(): list(import_util.LoadModulesForPath(__path__, __name__)) - - -LoadModules() diff --git a/perfkitbenchmarker/resources/kubernetes/__init__.py b/perfkitbenchmarker/resources/kubernetes/__init__.py deleted file mode 100644 index be43b42116..0000000000 --- a/perfkitbenchmarker/resources/kubernetes/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Init file for the kubernetes package. - -Loads all modules, neccesary for registering ResourceTypes and SpecClasses. -""" - - -from perfkitbenchmarker import import_util - - -def LoadModules(): - list(import_util.LoadModulesForPath(__path__, __name__)) - - -LoadModules() diff --git a/perfkitbenchmarker/resources/kubernetes/wg_serving_inference_server.py b/perfkitbenchmarker/resources/kubernetes/wg_serving_inference_server.py index 19dccb5657..287bf3d058 100644 --- a/perfkitbenchmarker/resources/kubernetes/wg_serving_inference_server.py +++ b/perfkitbenchmarker/resources/kubernetes/wg_serving_inference_server.py @@ -884,11 +884,50 @@ def _Create(self) -> None: self.created_resources.extend(created_resources) deployment_name = self.deployment_metadata['metadata']['name'] - self.cluster.WaitForResource( - f'deployment/{deployment_name}', - 'available', - timeout=self.spec.deployment_timeout, - ) + try: + self.cluster.WaitForResource( + f'deployment/{deployment_name}', + 'available', + timeout=self.spec.deployment_timeout, + ) + except errors.VmUtil.IssueCommandError as e: + pods = self.cluster.GetResourceMetadataByName( + 'pods', + f'app={self.app_selector}', + output_format='name', + output_formatter=lambda res: res.splitlines(), + ) + events = self.cluster.GetEvents() + quota_failure = False + for pod in pods: + pod_name = pod.split('/')[1] + status_cmd = [ + 'get', + 'pod', + pod.split('/')[1], + '-o', + 'jsonpath={.status.phase}', + ] + status, _, _ = container_service.RunKubectlCommand(status_cmd) + if 'Pending' not in status: + continue + for event in events: + if ( + event.resource.kind == 'Pod' + and event.resource.name == pod_name + and 'GCE out of resources' in event.message + ): + quota_failure = True + break + if 'timed out waiting for the condition' in str(e) and quota_failure: + raise errors.Benchmarks.QuotaFailure( + f'TIMED OUT: Deployment {deployment_name} did not become available' + f' within {self.spec.deployment_timeout} seconds. This can be due' + ' to issues like resource exhaustion, but can also be due to image' + f' pull errors, or pod scheduling problems. Original error: {e}' + ) from e + else: + raise e def _ApplyGCSFusePVC(self): """Apply the PV & PVC to the environment.""" diff --git a/perfkitbenchmarker/resources/pinecone/__init__.py b/perfkitbenchmarker/resources/pinecone/__init__.py deleted file mode 100644 index 07e5b39b12..0000000000 --- a/perfkitbenchmarker/resources/pinecone/__init__.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright 2024 PerfKitBenchmarker Authors. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Init file for the pinecone package. - -Loads all modules, necessary for registering ResourceTypes and SpecClasses. -""" - - -from perfkitbenchmarker import import_util - - -def LoadModules(): - list(import_util.LoadModulesForPath(__path__, __name__)) - - -LoadModules() diff --git a/perfkitbenchmarker/sample.py b/perfkitbenchmarker/sample.py index d3a5f72ef4..39bf1de364 100644 --- a/perfkitbenchmarker/sample.py +++ b/perfkitbenchmarker/sample.py @@ -35,6 +35,7 @@ TPM_TIME_SERIES = 'TPM_time_series' OPS_TIME_SERIES = 'OPS_time_series' LATENCY_TIME_SERIES = 'Latency_time_series' +QPS_TIME_SERIES = 'QPS_time_series' # Metadata for time series VALUES = 'values' diff --git a/perfkitbenchmarker/time_triggers/maintenance_simulation_trigger.py b/perfkitbenchmarker/time_triggers/maintenance_simulation_trigger.py index 1a2fe70352..36304acd04 100644 --- a/perfkitbenchmarker/time_triggers/maintenance_simulation_trigger.py +++ b/perfkitbenchmarker/time_triggers/maintenance_simulation_trigger.py @@ -28,6 +28,7 @@ TIME_SERIES_SAMPLES_FOR_AGGREGATION = [ sample.TPM_TIME_SERIES, sample.OPS_TIME_SERIES, + sample.QPS_TIME_SERIES, ] PERCENTILES = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9] @@ -254,7 +255,6 @@ def _AggregateThroughputSample(self, s: sample.Sample) -> List[sample.Sample]: ) + self._ComputeLossWork( median, values_after_lm_starts, interval, metadata ) - if values_after_lm_ends: mean_after_lm_ends = statistics.mean(values_after_lm_ends) samples += self._ComputeDegradation(mean, mean_after_lm_ends, metadata) diff --git a/perfkitbenchmarker/windows_benchmarks/diskspd_benchmark.py b/perfkitbenchmarker/windows_benchmarks/diskspd_benchmark.py index 30dee98a4d..2fdc796624 100644 --- a/perfkitbenchmarker/windows_benchmarks/diskspd_benchmark.py +++ b/perfkitbenchmarker/windows_benchmarks/diskspd_benchmark.py @@ -39,7 +39,7 @@ def GetConfig(user_config): def Prepare(benchmark_spec): vm = benchmark_spec.vms[0] vm.Install('diskspd') - if FLAGS.diskspd_prefill: + if FLAGS.diskspd_prefill_duration is not None: diskspd.Prefill(vm) diff --git a/perfkitbenchmarker/windows_packages/diskspd.py b/perfkitbenchmarker/windows_packages/diskspd.py index 92ae435143..0710f39e35 100644 --- a/perfkitbenchmarker/windows_packages/diskspd.py +++ b/perfkitbenchmarker/windows_packages/diskspd.py @@ -26,7 +26,6 @@ import xml.etree.ElementTree from absl import flags -from perfkitbenchmarker import background_tasks from perfkitbenchmarker import errors from perfkitbenchmarker import flag_util from perfkitbenchmarker import sample @@ -38,12 +37,6 @@ LATENCY_PERCENTILES = [50, 90, 95, 99, 99.9, 99.99, 99.999, 99.9999, 99.99999] SampleData = collections.namedtuple('SampleData', ['metric', 'value', 'unit']) -flags.DEFINE_boolean( - 'diskspd_prefill', - False, - 'If we want to prefill the file with random data before running the test.', -) - flags.DEFINE_integer( 'diskspd_prefill_duration', None, @@ -54,25 +47,26 @@ flags.DEFINE_integer( 'diskspd_duration', - 20, - 'The number of seconds to run diskspd test.Defaults to 30s. Unit: seconds.', + 300, + 'The number of seconds to run diskspd test.Defaults to 300s. Unit:' + ' seconds.', ) flags.DEFINE_integer( 'diskspd_warmup', - 5, + 120, 'The warm up time for diskspd, the time needed to enter' 'steady state of I/O operation. ' - 'Defaults to 5s. Unit: seconds.', + 'Defaults to 120s. Unit: seconds.', ) flags.DEFINE_integer( 'diskspd_cooldown', - 5, + 30, 'The cool down time for diskspd, the time to ensure that' 'each instance of diskspd is active during each' 'measurement period of each instance. ' - 'Defaults: 5s. Unit: seconds', + 'Defaults: 30s. Unit: seconds', ) flags.DEFINE_enum( @@ -114,21 +108,23 @@ 'Please specify Unit for --diskspd_block_size', ) -flags.DEFINE_integer( - 'diskspd_stride_or_alignment', +DISKSPD_STRIDE = flags.DEFINE_string( + 'diskspd_stride', None, - 'If the access pattern is sequential, then this value' - 'means the stride for the access' - 'If the access pattern is random, then this value means' - 'the specified number of bytes that random I/O aligns to.' - 'Defaults: None.', + 'Stride is for sequential access, specifies the offset for an IO operation.' + ' For example, if a 64KiB stride is chosen for a 4KB block size, the first' + ' I/O will be at zero, the second at 64KB and so forth. Please use B|K|M|G' + ' to specify unit. Defaults: None.', ) -flags.DEFINE_enum( - 'diskspd_stride_or_alignment_unit', - 'K', - ['K', 'M', 'G', 'b'], - 'The unit of the stride_or_alignment,available option: K|M|G|bDefaults: K.', +flags.register_validator( + DISKSPD_STRIDE, + lambda x: x is None or x[-1] in ['B', 'K', 'M', 'G'] + and FLAGS.diskspd_access_pattern != 'r', + message=( + 'Diskspd_stride is only supported for sequential access pattern. Please' + ' specify Unit for --diskspd_stride.' + ), ) flags.DEFINE_bool( @@ -226,16 +222,9 @@ def Install(vm): def _RunDiskSpdWithOptions(vm, options): - total_runtime = ( - FLAGS.diskspd_warmup + FLAGS.diskspd_cooldown + FLAGS.diskspd_duration - ) - timeout_duration = total_runtime * DISKSPD_TIMEOUT_MULTIPLIER - diskspd_exe_dir = ntpath.join(vm.temp_dir, 'x86') - command = 'cd {diskspd_exe_dir}; .\\diskspd.exe {diskspd_options}'.format( - diskspd_exe_dir=diskspd_exe_dir, diskspd_options=options - ) - vm.RobustRemoteCommand(command, timeout=timeout_duration) + command = f'cd {diskspd_exe_dir}; .\\diskspd.exe {options}' + vm.RobustRemoteCommand(command) def _RemoveXml(vm): @@ -256,15 +245,21 @@ def _RemoveTempFile(vm): vm.RemoteCommand(rm_command, ignore_failure=True) -def _RunDiskSpd( - running_vm, outstanding_io, threads, metadata -): +def EnablePrefill(): + return ( + FLAGS.diskspd_prefill_duration is not None + and FLAGS.diskspd_prefill_duration != 0 + ) + + +def _RunDiskSpd(running_vm, outstanding_io, threads, metadata): """Run single iteration of Diskspd test.""" diskspd_config = _GenerateDiskspdConfig(outstanding_io, threads) - process_args = [(_RunDiskSpdWithOptions, (running_vm, diskspd_config), {})] - background_tasks.RunParallelProcesses(process_args, 200) + _RunDiskSpdWithOptions(running_vm, diskspd_config) result_xml = _CatXml(running_vm) - _RemoveTempFile(running_vm) + if not EnablePrefill(): + # Only remove the temp file if we did not prefill the file. + _RemoveTempFile(running_vm) _RemoveXml(running_vm) return ParseDiskSpdResults(result_xml, metadata) @@ -281,17 +276,14 @@ def _GenerateDiskspdConfig(outstanding_io, threads): access_pattern = FLAGS.diskspd_access_pattern diskspd_write_read_ratio = FLAGS.diskspd_write_read_ratio diskspd_block_size = FLAGS.diskspd_block_size - access_pattern_string = '' - if FLAGS.diskspd_stride_or_alignment: - access_pattern_string = ( - '-' - + str(access_pattern) - + str(FLAGS.diskspd_stride_or_alignment) - + str(FLAGS.diskspd_stride_or_alignment_unit) - ) os_hint = access_pattern if os_hint == 'si': os_hint = 's' + if DISKSPD_STRIDE.value: + access_pattern = ( + str(access_pattern) + + DISKSPD_STRIDE.value + ) throughput_per_ms_string = '' if FLAGS.diskspd_throughput_per_ms: @@ -304,7 +296,7 @@ def _GenerateDiskspdConfig(outstanding_io, threads): f' {large_page_string} {disable_affinity_string}' f' {software_cache_string} {write_through_string}' f' {throughput_per_ms_string} -b{diskspd_block_size}' - f' -{access_pattern} -f{os_hint} {access_pattern_string}' + f' -{access_pattern} -f{os_hint}' f' F:\\{DISKSPD_TMPFILE} > {DISKSPD_XMLFILE}' ) @@ -319,10 +311,7 @@ def RunDiskSpd(running_vm): # add the flag information to the metadata # some of the flags information has been included in the xml file - metadata['diskspd_stride_or_alignment'] = FLAGS.diskspd_stride_or_alignment - metadata['diskspd_stride_or_alignment_unit'] = ( - FLAGS.diskspd_stride_or_alignment_unit - ) + metadata['diskspd_stride'] = DISKSPD_STRIDE.value metadata['diskspd_large_page'] = FLAGS.diskspd_large_page metadata['diskspd_latency_stats'] = FLAGS.diskspd_latency_stats metadata['diskspd_disable_affinity'] = FLAGS.diskspd_disable_affinity @@ -353,7 +342,7 @@ def RunDiskSpd(running_vm): ) ) except errors.VirtualMachine.RemoteCommandError as e: - if 'Could not allocate a buffer for target' in e: + if 'Could not allocate a buffer for target' in str(e): logging.exception( 'Diskspd is not able to allocate buffer for this configuration,' ' try using smaller block size or reduce outstanding io or' @@ -376,21 +365,52 @@ def Prefill(running_vm): Args: running_vm: The VM to prefill the file on. """ - if not FLAGS.diskspd_prefill: + if not EnablePrefill(): + logging.info('Prefill duration is not set, skipping prefill.') return - prefill_duration = FLAGS.diskspd_prefill_duration - if prefill_duration is None: - raise errors.Setup.InvalidConfigurationError( - '--diskspd_prefill_duration is None' - ) logging.info('Prefilling file with random data') + prefill_duration = FLAGS.diskspd_prefill_duration diskspd_exe_dir = ntpath.join(running_vm.temp_dir, 'x86') diskspd_options = ( - f'-c{FLAGS.diskspd_file_size} -t16 -w100 -b4k -d{prefill_duration} -Sw' - f' -Su -o16 -r C:\\scratch\\{DISKSPD_TMPFILE}' + f'-c{FLAGS.diskspd_file_size} -t16 -w100 -b4k -d{prefill_duration} -Rxml' + f' -Sw -Su -o16 -r C:\\scratch\\{DISKSPD_TMPFILE} >' + f' {DISKSPD_XMLFILE}' ) command = f'cd {diskspd_exe_dir}; .\\diskspd.exe {diskspd_options}' running_vm.RobustRemoteCommand(command) + result_xml = _CatXml(running_vm) + _RemoveXml(running_vm) + + prefill_samples = ParseDiskSpdResults(result_xml, {}) + for prefill_sample in prefill_samples: + if prefill_sample.metric != 'write_bandwidth': + continue + write_bandwidth = prefill_sample.value + total_seconds = float(prefill_sample.metadata['TestTimeSeconds']) + total_data_written, unit = GetDiskspdFileSizeInPrefillSizeUnit( + write_bandwidth, total_seconds + ) + logging.info('Prefill Data written: %s %s', total_data_written, unit) + if total_data_written < float(DISKSPD_FILE_SIZE.value[0:-1]): + logging.error( + 'Prefill data written is less than the file size, please check the' + ' prefill duration.' + ) + + +def GetDiskspdFileSizeInPrefillSizeUnit( + write_bandwidth: float, test_duration: float +) -> float: + """Returns the diskspd file size in GB.""" + unit = DISKSPD_FILE_SIZE.value[-1] + written_data = 0 + if unit == 'G': + written_data = (write_bandwidth * test_duration) / 1024 + elif unit == 'M': + written_data = write_bandwidth * test_duration + elif unit == 'K': + written_data = (write_bandwidth * test_duration) * 1024 + return written_data, unit def ParseDiskSpdResults(result_xml, metadata): @@ -470,9 +490,9 @@ def ParseDiskSpdResults(result_xml, metadata): testtime = float(metadata['TestTimeSeconds']) # calculate the read and write speed (Byte -> MB) - read_bandwidth = int(read_bytes / testtime / 1024 / 1024) - write_bandwidth = int(write_bytes / testtime / 1024 / 1024) - total_bandwidth = int(total_byte / testtime / 1024 / 1024) + read_bandwidth = ConvertTotalBytesToMBPerSecond(read_bytes, testtime) + write_bandwidth = ConvertTotalBytesToMBPerSecond(write_bytes, testtime) + total_bandwidth = ConvertTotalBytesToMBPerSecond(total_byte, testtime) # calculate the read write times per second read_iops = int(read_count / testtime) @@ -499,6 +519,13 @@ def ParseDiskSpdResults(result_xml, metadata): return samples +def ConvertTotalBytesToMBPerSecond( + bytes_value: int, test_duration: float +) -> float: + """Converts total bytes to MB per second.""" + return bytes_value / test_duration / 1024 / 1024 + + def ParseLatencyBucket(latency_bucket): """Parse latency percentile data from bucket xml tag.""" percentile = latency_bucket.find('Percentile') diff --git a/tests/linux_benchmarks/kubernetes_ai_inference_benchmark_test.py b/tests/linux_benchmarks/kubernetes_ai_inference_benchmark_test.py new file mode 100644 index 0000000000..ef74ddec6b --- /dev/null +++ b/tests/linux_benchmarks/kubernetes_ai_inference_benchmark_test.py @@ -0,0 +1,453 @@ +import copy +import datetime +import json +import os +import unittest +from unittest import mock +import zoneinfo + +from perfkitbenchmarker import container_service +from perfkitbenchmarker import sample +from perfkitbenchmarker.linux_benchmarks import kubernetes_ai_inference_benchmark +from perfkitbenchmarker.resources.kubernetes import wg_serving_inference_server +from perfkitbenchmarker.resources.kubernetes import wg_serving_inference_server_spec as k8s_spec +from tests import pkb_common_test_case + +ZoneInfo = zoneinfo.ZoneInfo +tz_string = os.environ.get('TZ', 'America/Los_Angeles') +tz = ZoneInfo(tz_string) +_SAMPLE_RESULT_DATA_BASE = { + 'metrics': {'metric1': 10.0, 'request_rate': 1.0}, + 'dimensions': { + 'model_id': 'model_a', + 'tokenizer_id': 'tokenizer_a', + 'backend': 'vllm', + }, + 'config': {'start_time': {'seconds': 1000000000}}, +} + +_SAMPLE_RESULT_DATA_ANOTHER = { + 'metrics': {'metric2': 20.0, 'request_rate': 2.0}, + 'dimensions': { + 'model_id': 'model_b', + 'tokenizer_id': 'tokenizer_b', + 'backend': 'tgi', + }, + 'config': {'start_time': {'seconds': 2000000000}}, +} + + +class KubernetesAiInferenceBenchmarkTest( + pkb_common_test_case.PkbCommonTestCase +): + + def setUp(self): + super().setUp() + self.mock_listdir = self.enter_context( + mock.patch.object( + kubernetes_ai_inference_benchmark.os, 'listdir', autospec=True + ) + ) + self.mock_open = self.enter_context( + mock.patch.object( + kubernetes_ai_inference_benchmark, + 'open', + new_callable=mock.mock_open, + ) + ) + self.mock_log_exception = self.enter_context( + mock.patch.object( + kubernetes_ai_inference_benchmark.logging, + 'exception', + autospec=True, + ) + ) + + def _create_mock_k8s_server(self, hpa_enabled=False): + mock_k8s_spec = mock.create_autospec( + k8s_spec.WGServingInferenceServerConfigSpec, instance=True + ) + mock_k8s_spec.model_server = 'mock_model_server' + mock_k8s_spec.hpa_enabled = hpa_enabled + mock_k8s_spec.name = 'mock_server_name' + mock_k8s_spec.app_selector = 'vllm_inference_server' + mock_k8s_spec.catalog_components = '' + mock_k8s_server = mock.create_autospec( + wg_serving_inference_server.WGServingInferenceServer, + instance=True, + ) + mock_k8s_server.spec = mock_k8s_spec + mock_k8s_server.model_id = 'mock_model' + mock_k8s_server.tokenizer_id = 'mock_tokenizer' + mock_k8s_server.pod_names = [] + mock_k8s_server.cluster = mock.create_autospec( + container_service.KubernetesCluster, instance=True + ) + fixed_datetime = datetime.datetime(2025, 7, 18, 10, 0, 0) + fixed_timestamp = fixed_datetime.timestamp() + mock_k8s_server.model_load_timestamp = fixed_timestamp + mock_k8s_server.GetResourceMetadata.return_value = { + 'hpa_enabled': hpa_enabled + } + mock_k8s_server.timezone = tz_string + return mock_k8s_server + + def test_prepare_result_success(self): + self.mock_listdir.return_value = ['result1.json', 'result2.json'] + data1_str = json.dumps(_SAMPLE_RESULT_DATA_BASE) + data2_str = json.dumps(_SAMPLE_RESULT_DATA_ANOTHER) + self.mock_open.side_effect = [ + mock.mock_open(read_data=data1_str).return_value, + mock.mock_open(read_data=data2_str).return_value, + ] + mock_k8s_server = self._create_mock_k8s_server(hpa_enabled=False) + mock_k8s_server.GetResourceMetadata.return_value = { + 'model': 'mock_model', + 'tokenizer_id': 'mock_tokenizer', + 'inference_server': 'mock_model_server', + 'hpa_enabled': False, + } + expected_samples = [ + sample.Sample( + metric='metric1', + value=10.0, + unit='', + metadata={ + 'request_rate': 1.0, + 'model': 'mock_model', + 'tokenizer_id': 'mock_tokenizer', + 'inference_server': 'mock_model_server', + 'hpa_enabled': False, + }, + timestamp=1000000000.0, + ), + sample.Sample( + metric='request_rate', + value=1.0, + unit='', + metadata={ + 'request_rate': 1.0, + 'model': 'mock_model', + 'tokenizer_id': 'mock_tokenizer', + 'inference_server': 'mock_model_server', + 'hpa_enabled': False, + }, + timestamp=1000000000.0, + ), + sample.Sample( + metric='metric2', + value=20.0, + unit='', + metadata={ + 'request_rate': 2.0, + 'model': 'mock_model', + 'tokenizer_id': 'mock_tokenizer', + 'inference_server': 'mock_model_server', + 'hpa_enabled': False, + }, + timestamp=2000000000.0, + ), + sample.Sample( + metric='request_rate', + value=2.0, + unit='', + metadata={ + 'request_rate': 2.0, + 'model': 'mock_model', + 'tokenizer_id': 'mock_tokenizer', + 'inference_server': 'mock_model_server', + 'hpa_enabled': False, + }, + timestamp=2000000000.0, + ), + ] + + results = kubernetes_ai_inference_benchmark.CollectBenchmarkResult( + mock_k8s_server.GetResourceMetadata(), 'dummy_path' + ) + self.assertCountEqual(results, expected_samples) + + expected_calls = [ + mock.call( + os.path.join('dummy_path', 'result1.json'), 'r', encoding='utf-8' + ), + mock.call( + os.path.join('dummy_path', 'result2.json'), 'r', encoding='utf-8' + ), + ] + self.mock_open.assert_has_calls(expected_calls, any_order=True) + + def test_prepare_result_hpa_enabled(self): + self.mock_listdir.return_value = ['result1.json'] + data1_str = json.dumps(_SAMPLE_RESULT_DATA_BASE) + self.mock_open.return_value = mock.mock_open( + read_data=data1_str + ).return_value + mock_k8s_server = self._create_mock_k8s_server(hpa_enabled=True) + results = kubernetes_ai_inference_benchmark.CollectBenchmarkResult( + mock_k8s_server.GetResourceMetadata(), 'dummy_path' + ) + self.assertTrue(results[0].metadata['hpa_enabled']) + self.assertTrue(results[1].metadata['hpa_enabled']) + + def test_prepare_result_empty_directory(self): + self.mock_listdir.return_value = [] + mock_k8s_server = self._create_mock_k8s_server() + results = kubernetes_ai_inference_benchmark.CollectBenchmarkResult( + mock_k8s_server.GetResourceMetadata(), 'dummy_path' + ) + self.assertEmpty(results) + + def test_prepare_result_no_json_files(self): + self.mock_listdir.return_value = ['result.txt', 'another_file.log'] + mock_k8s_server = self._create_mock_k8s_server() + results = kubernetes_ai_inference_benchmark.CollectBenchmarkResult( + mock_k8s_server.GetResourceMetadata(), 'dummy_path' + ) + self.assertEmpty(results) + self.mock_open.assert_not_called() + + def test_prepare_result_malformed_json(self): + self.mock_listdir.return_value = ['malformed.json'] + self.mock_open.return_value.read.return_value = 'not valid json {' + mock_k8s_server = self._create_mock_k8s_server() + with self.assertRaises(json.JSONDecodeError): + kubernetes_ai_inference_benchmark.CollectBenchmarkResult( + mock_k8s_server.GetResourceMetadata(), 'dummy_path' + ) + + def test_prepare_result_missing_keys(self): + self.mock_listdir.return_value = ['missing_keys.json'] + data_missing_metrics = copy.deepcopy(_SAMPLE_RESULT_DATA_BASE) + del data_missing_metrics['metrics'] + self.mock_open.return_value.read.return_value = json.dumps( + data_missing_metrics + ) + mock_k8s_server = self._create_mock_k8s_server() + with self.assertRaises(KeyError): + kubernetes_ai_inference_benchmark.CollectBenchmarkResult( + mock_k8s_server, 'dummy_path' + ) + + def test_prepare_result_skips_weighted_json(self): + self.mock_listdir.return_value = ['result1.json', 'result1.weighted.json'] + data1_str = json.dumps(_SAMPLE_RESULT_DATA_BASE) + self.mock_open.return_value.read.return_value = data1_str + mock_k8s_server = self._create_mock_k8s_server() + results = kubernetes_ai_inference_benchmark.CollectBenchmarkResult( + mock_k8s_server.GetResourceMetadata(), 'dummy_path' + ) + self.assertLen(results, 2) + self.assertEqual(results[0].metric, 'metric1') + self.mock_open.assert_called_once_with( + os.path.join('dummy_path', 'result1.json'), 'r', encoding='utf-8' + ) + + def test_prepare_result_timestamp_conversion(self): + self.mock_listdir.return_value = ['time_test.json'] + + data_with_specific_time = copy.deepcopy(_SAMPLE_RESULT_DATA_BASE) + timestamp_seconds = 1000000000 + data_with_specific_time['config']['start_time'][ + 'seconds' + ] = timestamp_seconds + + self.mock_open.return_value.read.return_value = json.dumps( + data_with_specific_time + ) + mock_k8s_server = self._create_mock_k8s_server() + results = kubernetes_ai_inference_benchmark.CollectBenchmarkResult( + mock_k8s_server.GetResourceMetadata(), 'dummy_path' + ) + self.assertLen(results, 2) + + expected_timestamp = datetime.datetime.fromtimestamp( + timestamp_seconds, tz=datetime.timezone.utc + ).timestamp() + + self.assertEqual(results[0].timestamp, expected_timestamp) + self.assertEqual(results[1].timestamp, expected_timestamp) + + def test_format_utc_time_stamp_to_local_time(self): + mock_k8s_server = self._create_mock_k8s_server() + # 2025-07-18 22:17:01 UTC, result will be local time (Los Angeles) + # in tz specified. + utc_timestamp = datetime.datetime( + 2025, 7, 18, 22, 17, 1, tzinfo=datetime.timezone.utc + ).timestamp() + local_timestamp = ( + kubernetes_ai_inference_benchmark._FormatUTCTimeStampToLocalTime( + mock_k8s_server, utc_timestamp + ) + ) + utc_dt = datetime.datetime.fromtimestamp( + utc_timestamp, tz=datetime.timezone.utc + ) + tz_local = zoneinfo.ZoneInfo(mock_k8s_server.timezone) + local_dt = utc_dt.astimezone(tz_local) + dt_str_format = local_dt.strftime('%H:%M:%S') + self.assertEqual(local_timestamp, dt_str_format) + + def test_get_model_load_time_single_pod(self): + first_call_logs = """22:17:30 Starting to load model /data/models/llama3-8b-hf... + 22:17:52 Model loading took 14.9596 GiB and 21.619846 seconds + 22:19:00 Starting vLLM API server on http://0.0.0.0:8000 + """ + startup_event = container_service.KubernetesEvent( + resource=container_service.KubernetesEventResource( + kind='Pod', + name='pod1', + ), + reason='Started', + message='Started container inference-server', + timestamp=datetime.datetime( + 2025, 7, 18, 22, 17, 1, tzinfo=tz + ).timestamp(), + ) + mock_k8s_server = self._create_mock_k8s_server(hpa_enabled=False) + mock_k8s_server.app_selector = 'vllm_inference_server' + mock_k8s_server.cluster.GetResourceMetadataByName.return_value = [ + 'pod1', + ] + mock_k8s_server.cluster.GetEvents.return_value = [startup_event] + mock_k8s_server.GetStartupLogsFromPod.side_effect = [ + first_call_logs, + ] + results = kubernetes_ai_inference_benchmark.GetVLLMModelLoadTime( + mock_k8s_server + ) + fake_timestamp = mock_k8s_server.model_load_timestamp + metadata = { + 'hpa_enabled': False, + 'num_pods': 1, + } + expected_results = [ + sample.Sample( + 'ai_inference_container_init_time', + 29.0, + 'seconds', + metadata, + fake_timestamp, + ), + sample.Sample( + 'ai_inference_storage_model_load_time', + 22.0, + 'seconds', + metadata, + fake_timestamp, + ), + sample.Sample( + 'ai_inference_post_model_load_to_start_time', + 68.0, + 'seconds', + metadata, + fake_timestamp, + ), + ] + self.assertEqual(results, expected_results) + + def test_get_model_load_time_multiple_pods(self): + first_call_logs = """22:17:01 Automatically detected platform cuda + 22:17:30 Starting to load model /data/models/llama3-8b-hf... + 22:17:52 Model loading took 14.9596 GiB and 21.619846 seconds + 22:19:00 Starting vLLM API server on http://0.0.0.0:8000 + """ + second_call_logs = """22:17:05 Automatically detected platform cuda + 22:17:15 Starting to load model /data/models/llama3-8b-hf... + 22:17:30 Model loading took 14.9596 GiB and 21.619846 seconds + 22:19:45 Starting vLLM API server on http://0.0.0.0:8000 + """ + third_call_logs = """22:17:10 Automatically detected platform cuda + 22:17:30 Starting to load model /data/models/llama3-8b-hf... + 22:18:10 Model loading took 14.9596 GiB and 21.619846 seconds + 22:22:25 Starting vLLM API server on http://0.0.0.0:8000 + """ + startup_events = [ + container_service.KubernetesEvent( + resource=container_service.KubernetesEventResource( + kind='Pod', + name='pod1', + ), + reason='Started', + message='Started container inference-server', + timestamp=datetime.datetime( + 2025, 7, 18, 22, 17, 1, tzinfo=tz + ).timestamp(), + ), + container_service.KubernetesEvent( + resource=container_service.KubernetesEventResource( + kind='Pod', + name='pod2', + ), + reason='Started', + message='Started container inference-server', + timestamp=datetime.datetime( + 2025, 7, 18, 22, 17, 5, tzinfo=tz + ).timestamp(), + ), + container_service.KubernetesEvent( + resource=container_service.KubernetesEventResource( + kind='Pod', + name='pod3', + ), + reason='Started', + message='Started container inference-server', + timestamp=datetime.datetime( + 2025, 7, 18, 22, 17, 10, tzinfo=tz + ).timestamp(), + ), + ] + + mock_k8s_server = self._create_mock_k8s_server(hpa_enabled=True) + mock_k8s_server.app_selector = 'vllm_inference_server' + mock_k8s_server.cluster.GetResourceMetadataByName.return_value = [ + 'pod1', + 'pod2', + 'pod3', + ] + mock_k8s_server.cluster.GetEvents.return_value = [ + startup_events[0], + startup_events[1], + startup_events[2], + ] + mock_k8s_server.GetStartupLogsFromPod.side_effect = [ + first_call_logs, + second_call_logs, + third_call_logs, + ] + results = kubernetes_ai_inference_benchmark.GetVLLMModelLoadTime( + mock_k8s_server + ) + fake_timestamp = mock_k8s_server.model_load_timestamp + metadata = { + 'hpa_enabled': True, + 'num_pods': 3, + } + expected_results = [ + sample.Sample( + 'avg_ai_inference_container_init_time', + 19.667, # avg: (29 + 10 + 20) / 3 + 'seconds', + metadata, + fake_timestamp, + ), + sample.Sample( + 'avg_ai_inference_storage_model_load_time', + 25.667, # avg: (22 + 15 + 40) / 3 + 'seconds', + metadata, + fake_timestamp, + ), + sample.Sample( + 'avg_ai_inference_post_model_load_to_start_time', + 152.667, # avg: (68 + 135 + 255) / 3 + 'seconds', + metadata, + fake_timestamp, + ), + ] + self.assertEqual(results, expected_results) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/linux_packages/hadoop_test.py b/tests/linux_packages/hadoop_test.py index 47ba95c5fa..725fa8110e 100644 --- a/tests/linux_packages/hadoop_test.py +++ b/tests/linux_packages/hadoop_test.py @@ -48,6 +48,7 @@ def setUp(self): hadoop.HadoopVersion.cache_clear() @requests_mock.Mocker() + @flagsaver.flagsaver(hadoop_version='') def testDefaultHadoopVersion(self, mock_requests): mock_requests.get( 'https://dlcdn.apache.org/hadoop/common/stable', @@ -60,6 +61,7 @@ def testDefaultHadoopVersion(self, mock_requests): self.assertEqual(1, mock_requests.call_count) @requests_mock.Mocker() + @flagsaver.flagsaver(hadoop_version='') def testHadoopVersionConnectionError(self, mock_requests): mock_requests.get( 'https://dlcdn.apache.org/hadoop/common/stable', status_code=404 @@ -71,6 +73,7 @@ def testHadoopVersionConnectionError(self, mock_requests): hadoop.HadoopVersion() @requests_mock.Mocker() + @flagsaver.flagsaver(hadoop_version='') def testHadoopVersionParsingError(self, mock_requests): mock_requests.get( 'https://dlcdn.apache.org/hadoop/common/stable', @@ -91,7 +94,9 @@ def testHadoopVersionProvider(self, mock_requests): self.assertEqual(version.Version('4.2.0'), observed) @requests_mock.Mocker() - @flagsaver.flagsaver(hadoop_bin_url='http://my/hadooop-4.2.0.tar.gz') + @flagsaver.flagsaver( + hadoop_version='', hadoop_bin_url='http://my/hadooop-4.2.0.tar.gz' + ) def testHadoopVersionUrlOverride(self, mock_requests): observed = hadoop.HadoopVersion() self.assertFalse(mock_requests.called) diff --git a/tests/linux_packages/sysbench_test.py b/tests/linux_packages/sysbench_test.py index 9b597a2c97..4f190696ae 100644 --- a/tests/linux_packages/sysbench_test.py +++ b/tests/linux_packages/sysbench_test.py @@ -3,6 +3,7 @@ import os import unittest +from absl import flags from absl.testing import flagsaver import mock from perfkitbenchmarker import sample @@ -11,6 +12,10 @@ from tests import pkb_common_test_case +FLAGS = flags.FLAGS +FLAGS.mark_as_parsed() + + class SysbenchTest(pkb_common_test_case.PkbCommonTestCase): @flagsaver.flagsaver(sysbench_ignore_concurrent_modification=True) @@ -31,13 +36,7 @@ def setUp(self): ) with open(path) as fp: self.contents = fp.read() - - def testParseSysbenchResult(self): - metadata = {} - results = sysbench.ParseSysbenchTimeSeries( - self.contents, metadata - ) - expected_results = [ + self.expected_results = [ sample.Sample( 'tps_array', -1, @@ -111,7 +110,86 @@ def testParseSysbenchResult(self): }, ), ] - self.assertSampleListsEqualUpToTimestamp(results, expected_results) + + @flagsaver.flagsaver(sysbench_qps_time_series=False) + def testParseSysbenchResult(self): + metadata = {} + results = sysbench.ParseSysbenchTimeSeries( + self.contents, metadata + ) + self.assertSampleListsEqualUpToTimestamp(results, self.expected_results) + + @mock.patch('time.time', mock.MagicMock(return_value=28.0)) + @flagsaver.flagsaver(sysbench_qps_time_series=True) + def testParseSysbenchResultWithTimeSeries(self): + metadata = {} + results = sysbench.ParseSysbenchTimeSeries(self.contents, metadata) + expected_results_with_time_series = self.expected_results + [ + sample.Sample( + metric='QPS_time_series', + value=0.0, + unit='qps', + metadata={ + 'values': [ + 20333.18, + 20156.38, + 20448.49, + 20334.15, + 20194.07, + 20331.31, + 20207.00, + 20348.96, + 20047.11, + 19972.86, + 19203.97, + 18221.83, + 18689.14, + 18409.68, + 19155.63, + ], + 'timestamps': [ + 0.0, + 2000.0, + 4000.0, + 6000.0, + 8000.0, + 10000.0, + 12000.0, + 14000.0, + 16000.0, + 18000.0, + 20000.0, + 22000.0, + 24000.0, + 26000.0, + 28000.0, + ], + 'qps': [ + 20333.18, + 20156.38, + 20448.49, + 20334.15, + 20194.07, + 20331.31, + 20207.00, + 20348.96, + 20047.11, + 19972.86, + 19203.97, + 18221.83, + 18689.14, + 18409.68, + 19155.63, + ], + 'interval': 1, + }, + timestamp=0, + ), + ] + self.assertSampleListsEqualUpToTimestamp( + results, expected_results_with_time_series + ) + if __name__ == '__main__': unittest.main()