diff --git a/core/src/detail/ckms_quantiles.cc b/core/src/detail/ckms_quantiles.cc index 7ab6f1f2..0707a7c2 100644 --- a/core/src/detail/ckms_quantiles.cc +++ b/core/src/detail/ckms_quantiles.cc @@ -86,42 +86,42 @@ double CKMSQuantiles::allowableError(int rank) { } bool CKMSQuantiles::insertBatch() { + // If there is no data to insert return false if (buffer_count_ == 0) { return false; } + // Sort the buffer upto buffer_count_ to prepare for inserting items std::sort(buffer_.begin(), buffer_.begin() + buffer_count_); std::size_t start = 0; + + sample_.reserve(buffer_count_); + // If the sample set is empty, add the first item if (sample_.empty()) { sample_.emplace_back(buffer_[0], 1, 0); - ++start; + ++start; // Skip the first item since it's already added to the sample ++count_; } - std::size_t idx = 0; - std::size_t item = idx++; - + // Loop through the buffer and insert the items into the sample set for (std::size_t i = start; i < buffer_count_; ++i) { - double v = buffer_[i]; - while (idx < sample_.size() && sample_[item].value < v) { - item = idx++; - } - - if (sample_[item].value > v) { - --idx; + float value = buffer_[i]; + + auto iterator = std::lower_bound( + sample_.begin(), sample_.end(), value, + [](const Item& item, float val) { return item.value < val; }); + std::size_t idx = std::distance(sample_.begin(), iterator); + + int delta = 0; + if (idx > 0 && idx < sample_.size()) { + delta = static_cast( + std::floor(allowableError(static_cast(idx) + 1))) + + 1; } - int delta; - if (idx - 1 == 0 || idx + 1 == sample_.size()) { - delta = 0; - } else { - delta = static_cast(std::floor(allowableError(idx + 1))) + 1; - } - - sample_.emplace(sample_.begin() + idx, v, 1, delta); - count_++; - item = idx++; + sample_.emplace(iterator, value, 1, delta); + ++count_; } buffer_count_ = 0; @@ -129,24 +129,37 @@ bool CKMSQuantiles::insertBatch() { } void CKMSQuantiles::compress() { + // If there are less than 2 items in the sample set, there's nothing to + // compress if (sample_.size() < 2) { return; } - std::size_t idx = 0; - std::size_t prev; - std::size_t next = idx++; + std::vector compressed_samples; // Vector to hold compressed samples + compressed_samples.reserve( + sample_.size()); // Reserve space to avoid multiple allocations + + // Start with the first sample + compressed_samples.push_back(sample_[0]); - while (idx < sample_.size()) { - prev = next; - next = idx++; + for (std::size_t idx = 1; idx < sample_.size(); ++idx) { + const Item& current_sample = sample_[idx]; + Item& last_compressed_sample = compressed_samples.back(); - if (sample_[prev].g + sample_[next].g + sample_[next].delta <= - allowableError(idx - 1)) { - sample_[next].g += sample_[prev].g; - sample_.erase(sample_.begin() + prev); + // Check if we can compress the current sample into the last compressed + // sample + if (last_compressed_sample.g + current_sample.g + current_sample.delta <= + allowableError(static_cast(compressed_samples.size()) - 1)) { + // Merge current sample into last compressed sample + last_compressed_sample.g += current_sample.g; // Update weight + } else { + // If not compressible, add current sample to compressed samples + compressed_samples.push_back(current_sample); } } + + // Replace old samples with new compressed samples + sample_ = std::move(compressed_samples); } } // namespace detail