diff --git a/loadgen/issue_query_controller.cc b/loadgen/issue_query_controller.cc
index c1abea9d14..376ed98b8b 100644
--- a/loadgen/issue_query_controller.cc
+++ b/loadgen/issue_query_controller.cc
@@ -30,7 +30,8 @@ namespace loadgen {
 QueryMetadata::QueryMetadata(
     const std::vector<QuerySampleIndex>& query_sample_indices,
     std::chrono::nanoseconds scheduled_delta,
-    ResponseDelegate* response_delegate, SequenceGen* sequence_gen)
+    ResponseDelegate* response_delegate, SequenceGen* sequence_gen,
+    uint64_t repeat_index)
     : scheduled_delta(scheduled_delta),
       response_delegate(response_delegate),
       sequence_id(sequence_gen->NextQueryId()),
@@ -38,7 +39,7 @@ QueryMetadata::QueryMetadata(
   samples_.reserve(query_sample_indices.size());
   for (QuerySampleIndex qsi : query_sample_indices) {
     samples_.push_back({this, sequence_gen->NextSampleId(), qsi,
-                        sequence_gen->NextAccLogRng()});
+                        sequence_gen->NextAccLogRng(), repeat_index});
   }
   query_to_send.reserve(query_sample_indices.size());
   for (auto& s : samples_) {
@@ -459,8 +460,8 @@ void IssueQueryController::IssueQueriesInternal(size_t query_stride,
 #if USE_NEW_LOGGING_FORMAT
         std::stringstream ss;
         ss << "IssueQueryThread " << thread_idx
-           << " Ending early: Too many outstanding queries." << " issued "
-           << queries_issued_total << " outstanding "
+           << " Ending early: Too many outstanding queries."
+           << " issued " << queries_issued_total << " outstanding "
            << queries_outstanding;
         MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
 #else
@@ -499,8 +500,8 @@ void IssueQueryController::IssueQueriesInternal(size_t query_stride,
 #if USE_NEW_LOGGING_FORMAT
         std::stringstream ss;
         ss << "IssueQueryThread " << thread_idx
-           << " Ending early: Max query count reached." << " query_count "
-           << queries_issued;
+           << " Ending early: Max query count reached."
+           << " query_count " << queries_issued;
         MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
 #else
         detail.Error("IssueQueryThread ", std::to_string(thread_idx),
@@ -519,8 +520,8 @@ void IssueQueryController::IssueQueriesInternal(size_t query_stride,
 #if USE_NEW_LOGGING_FORMAT
         std::stringstream ss;
         ss << "IssueQueryThread " << thread_idx
-           << " Ending early: Max test duration reached." << " duration_ns "
-           << duration.count();
+           << " Ending early: Max test duration reached."
+           << " duration_ns " << duration.count();
         MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
 #else
         detail.Error("IssueQueryThread ", std::to_string(thread_idx),
diff --git a/loadgen/issue_query_controller.h b/loadgen/issue_query_controller.h
index 5668c574ed..2cdb4e0a92 100644
--- a/loadgen/issue_query_controller.h
+++ b/loadgen/issue_query_controller.h
@@ -83,6 +83,7 @@ struct SampleMetadata {
   uint64_t sequence_id;
   QuerySampleIndex sample_index;
   double accuracy_log_val;
+  uint64_t repeat_index;  // Index for repeated sampling (0 to k-1)
 };
 
 /// \brief Maintains data and timing info for a query and all its samples.
@@ -90,7 +91,8 @@ class QueryMetadata {
  public:
   QueryMetadata(const std::vector<QuerySampleIndex>& query_sample_indices,
                 std::chrono::nanoseconds scheduled_delta,
-                ResponseDelegate* response_delegate, SequenceGen* sequence_gen);
+                ResponseDelegate* response_delegate, SequenceGen* sequence_gen,
+                uint64_t repeat_index = 0);
   QueryMetadata(QueryMetadata&& src);
 
   void NotifyOneSampleCompleted(PerfClock::time_point timestamp);
diff --git a/loadgen/loadgen.cc b/loadgen/loadgen.cc
index 42b2140de2..0a5d2585ab 100644
--- a/loadgen/loadgen.cc
+++ b/loadgen/loadgen.cc
@@ -121,7 +121,8 @@ struct ResponseDelegateDetailed : public ResponseDelegate {
 
     if (sample_data_copy) {
       log.LogAccuracy(sample->sequence_id, sample->sample_index,
-                      LogBinaryAsHexString{sample_data_copy}, n_tokens);
+                      LogBinaryAsHexString{sample_data_copy}, n_tokens,
+                      sample->repeat_index);
       delete sample_data_copy;
     }
 
@@ -263,6 +264,12 @@ std::vector<QueryMetadata> GenerateQueries(
   auto tracer =
       MakeScopedTracer([](AsyncTrace& trace) { trace("GenerateQueries"); });
 
+  // In PerformanceOnly mode, repeats_per_sample must be 1
+  if (mode != TestMode::AccuracyOnly) {
+    assert(settings.repeats_per_sample == 1 &&
+           "repeats_per_sample must be 1 in PerformanceOnly mode");
+  }
+
   auto& loaded_samples = loaded_sample_set.set;
 
   // Generate 2x more samples than we think we'll need given the expected
@@ -287,6 +294,11 @@ std::vector<QueryMetadata> GenerateQueries(
     // For MultiStream, loaded samples is properly padded.
     // For Offline, we create a 'remainder' query at the end of this function.
     min_queries = loaded_samples.size() / samples_per_query;
+
+    // For repeated sampling, multiply min_queries by repeats
+    if (mode == TestMode::AccuracyOnly) {
+      min_queries *= settings.repeats_per_sample;
+    }
   }
 
   std::vector<QueryMetadata> queries;
@@ -395,9 +407,15 @@ std::vector<QueryMetadata> GenerateQueries(
                        : sample_distribution(sample_rng)];
       }
     }
-    queries.emplace_back(samples, timestamp, response_delegate, sequence_gen);
-    prev_timestamp = timestamp;
-    timestamp += schedule_distribution(schedule_rng);
+
+    // Handle repeated sampling: create repeats_per_sample queries for the same sample(s)
+    // In PerformanceOnly mode, this is always 1 (single query per sample)
+    for (uint64_t k = 0; k < settings.repeats_per_sample; k++) {
+      queries.emplace_back(samples, timestamp, response_delegate, sequence_gen, k);
+      prev_timestamp = timestamp;
+      timestamp += schedule_distribution(schedule_rng);
+    }
+
     // In equal_issue mode, the min_queries will be bumped up by a multiple of
     // the dataset size if the test time has not met the threshold.
     if (enable_equal_issue && (queries.size() >= min_queries) &&
@@ -417,7 +435,11 @@ std::vector<QueryMetadata> GenerateQueries(
       for (auto& s : samples) {
         s = loaded_samples[sample_distribution(sample_rng)];
       }
-      queries.emplace_back(samples, timestamp, response_delegate, sequence_gen);
+
+      // Handle repeated sampling for remainder query as well
+      for (uint64_t k = 0; k < settings.repeats_per_sample; k++) {
+        queries.emplace_back(samples, timestamp, response_delegate, sequence_gen, k);
+      }
     }
   }
 
diff --git a/loadgen/logging.cc b/loadgen/logging.cc
index 807c1954a8..d9d4883be7 100644
--- a/loadgen/logging.cc
+++ b/loadgen/logging.cc
@@ -280,22 +280,24 @@ void AsyncLog::StopTrace() {
 
 void AsyncLog::LogAccuracy(uint64_t seq_id, const QuerySampleIndex qsl_idx,
                            const LogBinaryAsHexString& response,
-                           int64_t n_tokens = 0) {
+                           int64_t n_tokens, uint64_t repeat_index) {
   std::unique_lock<std::mutex> lock(log_mutex_);
   if (!accuracy_out_) {
     return;
   }
   *accuracy_out_ << (accuracy_needs_comma_ ? ",\n{ " : "\n{ ");
",\n{ " : "\n{ "); + if (!use_tokens_) { - LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, "data", - response); + LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, + "repeat_idx", repeat_index, "data", response); } else if (!needs_first_token_) { - LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, "data", - response, "token_count", n_tokens); + LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, + "repeat_idx", repeat_index, "data", response, "token_count", n_tokens); } else { const size_t i = seq_id - latencies_first_sample_sequence_id_; - LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, "data", - response, "token_data", token_records_[i], "token_count", n_tokens); + LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, + "repeat_idx", repeat_index, "data", response, "token_data", + token_records_[i], "token_count", n_tokens); } *accuracy_out_ << " }"; @@ -812,7 +814,8 @@ void Logger::CollectTlsLoggerStats(TlsLogger* tls_logger) { if (max_entry_vector_size > kTlsLogReservedEntryCount) { #if USE_NEW_LOGGING_FORMAT std::stringstream msg; - msg << "Logging allocation detected:" << " tid: " << tls_logger->Tid() + msg << "Logging allocation detected:" + << " tid: " << tls_logger->Tid() << " reserved_entries: " << kTlsLogReservedEntryCount << " max_entries: " << max_entry_vector_size; MLPERF_LOG_WARNING((*this), "warning_generic_message", msg.str()); diff --git a/loadgen/logging.h b/loadgen/logging.h index 8f1a398e9d..2c770438d6 100644 --- a/loadgen/logging.h +++ b/loadgen/logging.h @@ -213,7 +213,8 @@ class AsyncLog { void SetCurrentPidTid(uint64_t pid, uint64_t tid); void LogAccuracy(uint64_t seq_id, const QuerySampleIndex qsl_idx, - const LogBinaryAsHexString& response, int64_t n_tokens); + const LogBinaryAsHexString& response, int64_t n_tokens, + uint64_t repeat_index = 0); void CacheToken(uint64_t seq_id, const LogBinaryAsHexString& response); template diff --git a/loadgen/test_settings.h b/loadgen/test_settings.h index 584d073bb8..5470277e2c 100644 --- a/loadgen/test_settings.h +++ b/loadgen/test_settings.h @@ -271,6 +271,15 @@ struct TestSettings { /// \brief Infer token latencies bool infer_token_latencies = false; uint64_t token_latency_scaling_factor; + + /// \brief Enable repeated sampling in accuracy mode + /// \details When enabled, each sample is issued + /// repeats_per_sample times and multiple responses are collected. + /// This is used for code generation benchmarks like gpt-oss where multiple + /// solutions are generated and evaluated (pass@k metric). + /// Default is 1 (single sample). Set to k (e.g., 5 for pass@5) to enable. + /// Must be 1 for PerformanceOnly mode. + uint64_t repeats_per_sample = 1; /**@}*/ }; diff --git a/loadgen/test_settings_internal.cc b/loadgen/test_settings_internal.cc index 3f2cd88473..425d4db833 100644 --- a/loadgen/test_settings_internal.cc +++ b/loadgen/test_settings_internal.cc @@ -54,7 +54,8 @@ TestSettingsInternal::TestSettingsInternal( server_ttft_latency(requested.server_ttft_latency), server_tpot_latency(requested.server_tpot_latency), infer_token_latencies(requested.infer_token_latencies), - token_latency_scaling_factor(requested.token_latency_scaling_factor) { + token_latency_scaling_factor(requested.token_latency_scaling_factor), + repeats_per_sample(requested.repeats_per_sample) { // Target QPS, target latency, and max_async_queries. 
   switch (requested.scenario) {
     case TestScenario::SingleStream:
@@ -779,6 +780,7 @@ int TestSettings::FromConfig(const std::string &path, const std::string &model,
   lookupkv(model, scenario, "max_query_count", &max_query_count, nullptr);
   lookupkv(model, scenario, "performance_sample_count_override",
            &performance_sample_count_override, nullptr);
+  lookupkv(model, scenario, "repeats_per_sample", &repeats_per_sample, nullptr);
   lookupkv(model, "SingleStream", "target_latency", nullptr,
            &single_stream_expected_latency_ns, 1000 * 1000);
   lookupkv(model, "MultiStream", "target_latency", nullptr,
diff --git a/loadgen/test_settings_internal.h b/loadgen/test_settings_internal.h
index ab2773bd18..44a430d869 100644
--- a/loadgen/test_settings_internal.h
+++ b/loadgen/test_settings_internal.h
@@ -88,6 +88,8 @@ struct TestSettingsInternal {
 
   bool infer_token_latencies = false;
   int64_t token_latency_scaling_factor;
+
+  uint64_t repeats_per_sample;
 };
 
 /// \brief A namespace of collections of FindPeakPerformance helper functions,
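
Usage note (not part of the patch): a minimal sketch of how a harness might enable the feature added above. RunPassAtKAccuracy is a hypothetical helper; TestSettings, TestScenario, TestMode, LogSettings, and StartTest are loadgen's existing public entry points, and repeats_per_sample is the field introduced by this change.

#include "loadgen.h"
#include "query_sample_library.h"
#include "system_under_test.h"
#include "test_settings.h"

// Run an AccuracyOnly pass that issues each sample 5 times (pass@5).
// PerformanceOnly runs must keep repeats_per_sample == 1, matching the
// assert added in GenerateQueries.
void RunPassAtKAccuracy(mlperf::SystemUnderTest* sut,
                        mlperf::QuerySampleLibrary* qsl) {
  mlperf::TestSettings settings;
  settings.scenario = mlperf::TestScenario::Offline;
  settings.mode = mlperf::TestMode::AccuracyOnly;
  settings.repeats_per_sample = 5;  // each sample issued with repeat_index 0..4

  mlperf::LogSettings log_settings;
  mlperf::StartTest(sut, qsl, settings, log_settings);
}

Equivalently, the new lookupkv line lets the value come from a config file, e.g. a user.conf entry such as gpt-oss.Offline.repeats_per_sample = 5 (the model and scenario names here are illustrative).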
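With repeats enabled, each of the k responses for a given qsl_idx lands in mlperf_log_accuracy.json as its own record carrying a distinct repeat_idx, so an accuracy script can group records by qsl_idx to score pass@k. A sketch of that post-processing, under the assumption that AccuracyRecord mirrors the logged fields and that the correctness check is benchmark-specific (neither is part of loadgen):

#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

// One parsed entry from mlperf_log_accuracy.json (assumed shape).
struct AccuracyRecord {
  uint64_t qsl_idx;
  uint64_t repeat_idx;              // 0..k-1, from this patch
  std::vector<uint8_t> data;        // hex-decoded response payload
};

// pass@k with exactly k generations per sample: a sample passes if any of
// its repeats is judged correct; the score is the fraction of samples passing.
double PassAtK(const std::vector<AccuracyRecord>& records,
               const std::function<bool(const AccuracyRecord&)>& is_correct) {
  std::unordered_map<uint64_t, bool> passed;  // qsl_idx -> any repeat correct
  for (const auto& r : records) {
    passed[r.qsl_idx] = passed[r.qsl_idx] || is_correct(r);
  }
  if (passed.empty()) return 0.0;
  size_t pass_count = 0;
  for (const auto& [idx, ok] : passed) {
    if (ok) ++pass_count;
  }
  return static_cast<double>(pass_count) / passed.size();
}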