Skip to content

perf improvement - Summary metric #729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 44 additions & 31 deletions core/src/detail/ckms_quantiles.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,67 +86,80 @@ double CKMSQuantiles::allowableError(int rank) {
}

/// Merges the buffered observations into the sorted sample set.
///
/// The first `buffer_count_` entries of `buffer_` are sorted and inserted one
/// by one at their ordered position in `sample_`; `count_` is incremented for
/// every inserted observation and the buffer is marked empty afterwards.
///
/// @return false when the buffer holds no observations (nothing is changed),
///         true once all buffered observations have been inserted.
bool CKMSQuantiles::insertBatch() {
  // Nothing buffered: report that no work was done.
  if (buffer_count_ == 0) {
    return false;
  }

  // Only the first buffer_count_ slots hold pending observations.
  std::sort(buffer_.begin(), buffer_.begin() + buffer_count_);

  std::size_t start = 0;

  // Guarantee capacity for at least one buffer's worth of samples up front.
  sample_.reserve(buffer_count_);

  // An empty sample set is seeded with the smallest buffered value
  // (g = 1, delta = 0); the insertion loop then starts at the next one.
  if (sample_.empty()) {
    sample_.emplace_back(buffer_[0], 1, 0);
    ++start;
    ++count_;
  }

  for (std::size_t i = start; i < buffer_count_; ++i) {
    // Keep the observation in double precision; storing it in a float would
    // silently truncate the value before it is compared and stored.
    const double value = buffer_[i];

    // First sample whose value is not less than `value`; inserting in front
    // of it keeps sample_ ordered by value.
    const auto position = std::lower_bound(
        sample_.begin(), sample_.end(), value,
        [](const Item& item, double val) { return item.value < val; });
    const auto idx =
        static_cast<std::size_t>(std::distance(sample_.begin(), position));

    // A new minimum (idx == 0) or new maximum (idx == size) gets delta 0;
    // interior insertions derive delta from the allowable error at the
    // following rank.
    int delta = 0;
    if (idx > 0 && idx < sample_.size()) {
      delta = static_cast<int>(
                  std::floor(allowableError(static_cast<int>(idx) + 1))) +
              1;
    }

    // `position` was obtained in this iteration, so it is still valid here.
    sample_.emplace(position, value, 1, delta);
    ++count_;
  }

  buffer_count_ = 0;
  return true;
}

/// Shrinks the sample set by merging adjacent samples whose combined weight
/// stays within the allowable error.
///
/// The compressed set is built in a single pass into a fresh vector and then
/// moved into `sample_`, instead of erasing elements from `sample_` in place.
void CKMSQuantiles::compress() {
  // Fewer than two samples: there is no neighbour to merge with.
  if (sample_.size() < 2) {
    return;
  }

  std::vector<Item> compressed_samples;
  // The result can never be larger than the input; allocate once.
  compressed_samples.reserve(sample_.size());

  // The first sample always opens the compressed set.
  compressed_samples.push_back(sample_[0]);

  for (std::size_t idx = 1; idx < sample_.size(); ++idx) {
    const Item& current_sample = sample_[idx];
    Item& last_compressed_sample = compressed_samples.back();

    // NOTE(review): the rank passed here is the index of the last compressed
    // sample; the erase-based loop this replaces passed the index of the
    // later sample - confirm which rank is intended.
    if (last_compressed_sample.g + current_sample.g + current_sample.delta <=
        allowableError(static_cast<int>(compressed_samples.size()) - 1)) {
      // Merge the pair: the later sample survives (its value and delta are
      // kept) and absorbs the weight of the earlier one. Keeping the earlier
      // sample's value instead would attribute the combined weight to the
      // wrong value.
      const auto merged_g = last_compressed_sample.g + current_sample.g;
      last_compressed_sample = current_sample;
      last_compressed_sample.g = merged_g;
    } else {
      // Not mergeable: the current sample starts a new compressed entry.
      compressed_samples.push_back(current_sample);
    }
  }

  // Replace the old samples with the compressed ones.
  sample_ = std::move(compressed_samples);
}

} // namespace detail
Expand Down