17 changes: 9 additions & 8 deletions loadgen/issue_query_controller.cc
@@ -30,15 +30,16 @@ namespace loadgen {
 QueryMetadata::QueryMetadata(
     const std::vector<QuerySampleIndex>& query_sample_indices,
     std::chrono::nanoseconds scheduled_delta,
-    ResponseDelegate* response_delegate, SequenceGen* sequence_gen)
+    ResponseDelegate* response_delegate, SequenceGen* sequence_gen,
+    uint64_t repeat_index)
     : scheduled_delta(scheduled_delta),
       response_delegate(response_delegate),
       sequence_id(sequence_gen->NextQueryId()),
       wait_count_(query_sample_indices.size()) {
   samples_.reserve(query_sample_indices.size());
   for (QuerySampleIndex qsi : query_sample_indices) {
     samples_.push_back({this, sequence_gen->NextSampleId(), qsi,
-                        sequence_gen->NextAccLogRng()});
+                        sequence_gen->NextAccLogRng(), repeat_index});
   }
   query_to_send.reserve(query_sample_indices.size());
   for (auto& s : samples_) {
@@ -459,8 +460,8 @@ void IssueQueryController::IssueQueriesInternal(size_t query_stride,
 #if USE_NEW_LOGGING_FORMAT
       std::stringstream ss;
       ss << "IssueQueryThread " << thread_idx
-         << " Ending early: Too many outstanding queries." << " issued "
-         << queries_issued_total << " outstanding "
+         << " Ending early: Too many outstanding queries."
+         << " issued " << queries_issued_total << " outstanding "
          << queries_outstanding;
       MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
 #else
@@ -499,8 +500,8 @@ void IssueQueryController::IssueQueriesInternal(size_t query_stride,
 #if USE_NEW_LOGGING_FORMAT
       std::stringstream ss;
       ss << "IssueQueryThread " << thread_idx
-         << " Ending early: Max query count reached." << " query_count "
-         << queries_issued;
+         << " Ending early: Max query count reached."
+         << " query_count " << queries_issued;
       MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
 #else
       detail.Error("IssueQueryThread ", std::to_string(thread_idx),
Expand All @@ -519,8 +520,8 @@ void IssueQueryController::IssueQueriesInternal(size_t query_stride,
#if USE_NEW_LOGGING_FORMAT
std::stringstream ss;
ss << "IssueQueryThread " << thread_idx
<< " Ending early: Max test duration reached." << " duration_ns "
<< duration.count();
<< " Ending early: Max test duration reached."
<< " duration_ns " << duration.count();
MLPERF_LOG_ERROR(detail, "error_runtime", ss.str());
#else
detail.Error("IssueQueryThread ", std::to_string(thread_idx),
Expand Down
4 changes: 3 additions & 1 deletion loadgen/issue_query_controller.h
@@ -83,14 +83,16 @@ struct SampleMetadata {
   uint64_t sequence_id;
   QuerySampleIndex sample_index;
   double accuracy_log_val;
+  uint64_t repeat_index;  // Index for repeated sampling (0 to k-1)
 };
 
 /// \brief Maintains data and timing info for a query and all its samples.
 class QueryMetadata {
  public:
   QueryMetadata(const std::vector<QuerySampleIndex>& query_sample_indices,
                 std::chrono::nanoseconds scheduled_delta,
-                ResponseDelegate* response_delegate, SequenceGen* sequence_gen);
+                ResponseDelegate* response_delegate, SequenceGen* sequence_gen,
+                uint64_t repeat_index = 0);
   QueryMetadata(QueryMetadata&& src);
 
   void NotifyOneSampleCompleted(PerfClock::time_point timestamp);
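Because the new `repeat_index` parameter is defaulted to 0, the four-argument call sites that predate this change keep compiling unchanged. A standalone sketch of that idea (toy type, not loadgen itself):

```cpp
#include <cstdint>
#include <iostream>

// Toy model of the widened constructor: the defaulted trailing parameter
// keeps legacy callers source-compatible while new callers tag each repeat.
struct ToyQueryMetadata {
  explicit ToyQueryMetadata(uint64_t repeat_index = 0)
      : repeat_index(repeat_index) {}
  uint64_t repeat_index;
};

int main() {
  ToyQueryMetadata legacy;     // pre-existing call shape: repeat_index == 0
  ToyQueryMetadata tagged(3);  // new call shape: fourth repeat of a sample
  std::cout << legacy.repeat_index << " " << tagged.repeat_index << "\n";
}
```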
32 changes: 27 additions & 5 deletions loadgen/loadgen.cc
@@ -121,7 +121,8 @@ struct ResponseDelegateDetailed : public ResponseDelegate {
 
     if (sample_data_copy) {
       log.LogAccuracy(sample->sequence_id, sample->sample_index,
-                      LogBinaryAsHexString{sample_data_copy}, n_tokens);
+                      LogBinaryAsHexString{sample_data_copy}, n_tokens,
+                      sample->repeat_index);
       delete sample_data_copy;
     }
 
@@ -263,6 +264,12 @@ std::vector<QueryMetadata> GenerateQueries(
   auto tracer =
       MakeScopedTracer([](AsyncTrace& trace) { trace("GenerateQueries"); });
 
+  // In PerformanceOnly mode, repeats_per_sample must be 1.
+  if (mode != TestMode::AccuracyOnly) {
+    assert(settings.repeats_per_sample == 1 &&
+           "repeats_per_sample must be 1 in PerformanceOnly mode");
+  }
+
   auto& loaded_samples = loaded_sample_set.set;
 
   // Generate 2x more samples than we think we'll need given the expected
@@ -287,6 +294,11 @@
     // For MultiStream, loaded samples is properly padded.
     // For Offline, we create a 'remainder' query at the end of this function.
     min_queries = loaded_samples.size() / samples_per_query;
+
+    // For repeated sampling, multiply min_queries by repeats.
+    if (mode == TestMode::AccuracyOnly) {
+      min_queries *= settings.repeats_per_sample;
+    }
   }
 
   std::vector<QueryMetadata> queries;
@@ -395,9 +407,15 @@ std::vector<QueryMetadata> GenerateQueries(
                                : sample_distribution(sample_rng)];
       }
     }
-    queries.emplace_back(samples, timestamp, response_delegate, sequence_gen);
-    prev_timestamp = timestamp;
-    timestamp += schedule_distribution(schedule_rng);
+
+    // Handle repeated sampling: create repeats_per_sample queries for the
+    // same sample(s). In PerformanceOnly mode, this is always 1 (a single
+    // query per sample).
+    for (uint64_t k = 0; k < settings.repeats_per_sample; k++) {
+      queries.emplace_back(samples, timestamp, response_delegate,
+                           sequence_gen, k);
+      prev_timestamp = timestamp;
+      timestamp += schedule_distribution(schedule_rng);
+    }
 
     // In equal_issue mode, the min_queries will be bumped up by a multiple of
     // the dataset size if the test time has not met the threshold.
     if (enable_equal_issue && (queries.size() >= min_queries) &&
@@ -417,7 +435,11 @@ std::vector<QueryMetadata> GenerateQueries(
       for (auto& s : samples) {
         s = loaded_samples[sample_distribution(sample_rng)];
       }
-      queries.emplace_back(samples, timestamp, response_delegate, sequence_gen);
+
+      // Handle repeated sampling for the remainder query as well.
+      for (uint64_t k = 0; k < settings.repeats_per_sample; k++) {
+        queries.emplace_back(samples, timestamp, response_delegate,
+                             sequence_gen, k);
+      }
     }
   }
 
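To make the scheduling change concrete, here is a self-contained sketch of how one drawn sample now fans out into k queries. The names `repeats_per_sample` and `repeat_index` mirror the diff; everything else (toy types, sample values) is illustrative only:

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Toy stand-in for a scheduled query: which QSL sample it covers and
// which repeat of that sample it represents.
struct ToyQuery {
  size_t sample_index;
  uint64_t repeat_index;
};

int main() {
  const uint64_t repeats_per_sample = 3;  // e.g. pass@3
  const std::vector<size_t> scheduled_samples = {7, 2};

  std::vector<ToyQuery> queries;
  for (size_t sample : scheduled_samples) {
    // Mirrors the new loop in GenerateQueries: k repeats per drawn sample;
    // loadgen also advances the scheduled timestamp once per repeat
    // (omitted here).
    for (uint64_t k = 0; k < repeats_per_sample; k++) {
      queries.push_back({sample, k});
    }
  }

  for (const auto& q : queries) {
    std::cout << "qsl_idx=" << q.sample_index
              << " repeat_idx=" << q.repeat_index << "\n";
  }
  // Prints six queries: qsl_idx 7 with repeat_idx 0,1,2, then qsl_idx 2
  // likewise.
}
```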
19 changes: 11 additions & 8 deletions loadgen/logging.cc
@@ -280,22 +280,24 @@ void AsyncLog::StopTrace() {
 
 void AsyncLog::LogAccuracy(uint64_t seq_id, const QuerySampleIndex qsl_idx,
                            const LogBinaryAsHexString& response,
-                           int64_t n_tokens = 0) {
+                           int64_t n_tokens, uint64_t repeat_index) {
   std::unique_lock<std::mutex> lock(log_mutex_);
   if (!accuracy_out_) {
     return;
   }
   *accuracy_out_ << (accuracy_needs_comma_ ? ",\n{ " : "\n{ ");
 
   if (!use_tokens_) {
-    LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, "data",
-            response);
+    LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx,
+            "repeat_idx", repeat_index, "data", response);
   } else if (!needs_first_token_) {
-    LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, "data",
-            response, "token_count", n_tokens);
+    LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx,
+            "repeat_idx", repeat_index, "data", response, "token_count",
+            n_tokens);
   } else {
     const size_t i = seq_id - latencies_first_sample_sequence_id_;
-    LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx, "data",
-            response, "token_data", token_records_[i], "token_count", n_tokens);
+    LogArgs(accuracy_out_, "seq_id", seq_id, "qsl_idx", qsl_idx,
+            "repeat_idx", repeat_index, "data", response, "token_data",
+            token_records_[i], "token_count", n_tokens);
   }
 
   *accuracy_out_ << " }";
@@ -812,7 +814,8 @@ void Logger::CollectTlsLoggerStats(TlsLogger* tls_logger) {
   if (max_entry_vector_size > kTlsLogReservedEntryCount) {
 #if USE_NEW_LOGGING_FORMAT
     std::stringstream msg;
-    msg << "Logging allocation detected:" << " tid: " << tls_logger->Tid()
+    msg << "Logging allocation detected:"
+        << " tid: " << tls_logger->Tid()
        << " reserved_entries: " << kTlsLogReservedEntryCount
        << " max_entries: " << max_entry_vector_size;
     MLPERF_LOG_WARNING((*this), "warning_generic_message", msg.str());
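Given how `LogArgs` serializes key/value pairs, a token-path record in `mlperf_log_accuracy.json` should now look roughly like the following (illustrative values; exact spacing depends on `LogArgs`):

```json
{ "seq_id" : 12, "qsl_idx" : 7, "repeat_idx" : 2, "data" : "4A6F68...", "token_count" : 128 }
```

Downstream accuracy scripts can then group records by (`qsl_idx`, `repeat_idx`) to score pass@k.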
3 changes: 2 additions & 1 deletion loadgen/logging.h
@@ -213,7 +213,8 @@ class AsyncLog {
   void SetCurrentPidTid(uint64_t pid, uint64_t tid);
 
   void LogAccuracy(uint64_t seq_id, const QuerySampleIndex qsl_idx,
-                   const LogBinaryAsHexString& response, int64_t n_tokens);
+                   const LogBinaryAsHexString& response, int64_t n_tokens,
+                   uint64_t repeat_index = 0);
   void CacheToken(uint64_t seq_id, const LogBinaryAsHexString& response);
 
   template <typename... Args>
9 changes: 9 additions & 0 deletions loadgen/test_settings.h
@@ -271,6 +271,15 @@ struct TestSettings {
   /// \brief Infer token latencies
   bool infer_token_latencies = false;
   uint64_t token_latency_scaling_factor;
+
+  /// \brief Enable repeated sampling in accuracy mode.
+  /// \details When enabled, each sample is issued repeats_per_sample times
+  /// and multiple responses are collected. This is used for code-generation
+  /// benchmarks like gpt-oss, where multiple solutions are generated and
+  /// evaluated (pass@k metric). Default is 1 (single sample); set it to k
+  /// (e.g., 5 for pass@5) to enable. Must be 1 in PerformanceOnly mode.
+  uint64_t repeats_per_sample = 1;
   /**@}*/
 };
 
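A minimal usage sketch of the new setting, assuming the standard loadgen entry points (`mlperf::StartTest` and friends); SUT and QSL construction are elided:

```cpp
#include "loadgen.h"
#include "query_sample_library.h"
#include "system_under_test.h"
#include "test_settings.h"

// Sketch only: enable repeated sampling for a pass@5 accuracy run.
void RunPassAtFiveAccuracy(mlperf::SystemUnderTest* sut,
                           mlperf::QuerySampleLibrary* qsl) {
  mlperf::TestSettings settings;
  settings.scenario = mlperf::TestScenario::Offline;
  settings.mode = mlperf::TestMode::AccuracyOnly;  // repeats > 1 require this
  settings.repeats_per_sample = 5;                 // the k in pass@k
  mlperf::StartTest(sut, qsl, settings);
}
```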
4 changes: 3 additions & 1 deletion loadgen/test_settings_internal.cc
@@ -54,7 +54,8 @@ TestSettingsInternal::TestSettingsInternal(
       server_ttft_latency(requested.server_ttft_latency),
       server_tpot_latency(requested.server_tpot_latency),
       infer_token_latencies(requested.infer_token_latencies),
-      token_latency_scaling_factor(requested.token_latency_scaling_factor) {
+      token_latency_scaling_factor(requested.token_latency_scaling_factor),
+      repeats_per_sample(requested.repeats_per_sample) {
   // Target QPS, target latency, and max_async_queries.
   switch (requested.scenario) {
     case TestScenario::SingleStream:
@@ -779,6 +780,7 @@ int TestSettings::FromConfig(const std::string &path, const std::string &model,
   lookupkv(model, scenario, "max_query_count", &max_query_count, nullptr);
   lookupkv(model, scenario, "performance_sample_count_override",
            &performance_sample_count_override, nullptr);
+  lookupkv(model, scenario, "repeats_per_sample", &repeats_per_sample, nullptr);
   lookupkv(model, "SingleStream", "target_latency", nullptr,
            &single_stream_expected_latency_ns, 1000 * 1000);
   lookupkv(model, "MultiStream", "target_latency", nullptr,
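Since the knob goes through `lookupkv` with the usual model/scenario scoping, it should also be settable from a config file in the standard `model.scenario.key = value` form. A hypothetical `user.conf` entry (the model and scenario names are placeholders):

```
# Hypothetical user.conf entries; "gpt-oss" / "Offline" are placeholders.
gpt-oss.Offline.repeats_per_sample = 5
# Or for every model and scenario:
*.*.repeats_per_sample = 5
```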
2 changes: 2 additions & 0 deletions loadgen/test_settings_internal.h
@@ -88,6 +88,8 @@ struct TestSettingsInternal {
 
   bool infer_token_latencies = false;
   int64_t token_latency_scaling_factor;
+
+  uint64_t repeats_per_sample;
 };
 
 /// \brief A namespace of collections of FindPeakPerformance helper functions,