Skip to content

Commit 51f340f

Browse files
authored
Merge pull request #2837 from quickwit-oss/congxie/replaceHll
Replace hyperloglogplus with Apache DataSketches HLL (lg_k=11)
2 parents 57fe659 + 7eca331 commit 51f340f

File tree

2 files changed

+136
-44
lines changed

2 files changed

+136
-44
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ tantivy-bitpacker = { version = "0.9", path = "./bitpacker" }
6565
common = { version = "0.10", path = "./common/", package = "tantivy-common" }
6666
tokenizer-api = { version = "0.6", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
6767
sketches-ddsketch = { version = "0.3.0", features = ["use_serde"] }
68-
hyperloglogplus = { version = "0.4.1", features = ["const-loop"] }
68+
datasketches = "0.2.0"
6969
futures-util = { version = "0.3.28", optional = true }
7070
futures-channel = { version = "0.3.28", optional = true }
7171
fnv = "1.0.7"

src/aggregation/metric/cardinality.rs

Lines changed: 135 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
use std::collections::hash_map::DefaultHasher;
2-
use std::hash::{BuildHasher, Hasher};
1+
use std::hash::Hash;
32

43
use columnar::column_values::CompactSpaceU64Accessor;
54
use columnar::{Column, ColumnType, Dictionary, StrColumn};
65
use common::f64_to_u64;
7-
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
6+
use datasketches::hll::{HllSketch, HllType, HllUnion};
87
use rustc_hash::FxHashSet;
9-
use serde::{Deserialize, Serialize};
8+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
109

1110
use crate::aggregation::agg_data::AggregationsSegmentCtx;
1211
use crate::aggregation::intermediate_agg_result::{
@@ -16,29 +15,17 @@ use crate::aggregation::segment_agg_result::SegmentAggregationCollector;
1615
use crate::aggregation::*;
1716
use crate::TantivyError;
1817

19-
#[derive(Clone, Debug, Serialize, Deserialize)]
20-
struct BuildSaltedHasher {
21-
salt: u8,
22-
}
23-
24-
impl BuildHasher for BuildSaltedHasher {
25-
type Hasher = DefaultHasher;
26-
27-
fn build_hasher(&self) -> Self::Hasher {
28-
let mut hasher = DefaultHasher::new();
29-
hasher.write_u8(self.salt);
30-
31-
hasher
32-
}
33-
}
18+
/// Log2 of the number of registers for the HLL sketch.
19+
/// 2^11 = 2048 registers, giving ~2.3% relative standard error (1.04 / sqrt(2048)) and ~1KB per sketch (Hll4).
20+
const LG_K: u8 = 11;
3421

3522
/// # Cardinality
3623
///
3724
/// The cardinality aggregation allows for computing an estimate
3825
/// of the number of different values in a data set based on the
39-
/// HyperLogLog++ algorithm. This is particularly useful for understanding the
40-
/// uniqueness of values in a large dataset where counting each unique value
41-
/// individually would be computationally expensive.
26+
/// Apache DataSketches HyperLogLog algorithm. This is particularly useful for
27+
/// understanding the uniqueness of values in a large dataset where counting
28+
/// each unique value individually would be computationally expensive.
4229
///
4330
/// For example, you might use a cardinality aggregation to estimate the number
4431
/// of unique visitors to a website by aggregating on a field that contains
@@ -184,7 +171,7 @@ impl SegmentCardinalityCollectorBucket {
184171

185172
term_ids.sort_unstable();
186173
dict.sorted_ords_to_term_cb(term_ids.iter().map(|term| *term as u64), |term| {
187-
self.cardinality.sketch.insert_any(&term);
174+
self.cardinality.insert(term);
188175
Ok(())
189176
})?;
190177
if has_missing {
@@ -195,17 +182,17 @@ impl SegmentCardinalityCollectorBucket {
195182
);
196183
match missing_key {
197184
Key::Str(missing) => {
198-
self.cardinality.sketch.insert_any(&missing);
185+
self.cardinality.insert(missing.as_str());
199186
}
200187
Key::F64(val) => {
201188
let val = f64_to_u64(*val);
202-
self.cardinality.sketch.insert_any(&val);
189+
self.cardinality.insert(val);
203190
}
204191
Key::U64(val) => {
205-
self.cardinality.sketch.insert_any(&val);
192+
self.cardinality.insert(*val);
206193
}
207194
Key::I64(val) => {
208-
self.cardinality.sketch.insert_any(&val);
195+
self.cardinality.insert(*val);
209196
}
210197
}
211198
}
@@ -296,11 +283,11 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
296283
})?;
297284
for val in col_block_accessor.iter_vals() {
298285
let val: u128 = compact_space_accessor.compact_to_u128(val as u32);
299-
bucket.cardinality.sketch.insert_any(&val);
286+
bucket.cardinality.insert(val);
300287
}
301288
} else {
302289
for val in col_block_accessor.iter_vals() {
303-
bucket.cardinality.sketch.insert_any(&val);
290+
bucket.cardinality.insert(val);
304291
}
305292
}
306293

@@ -321,11 +308,18 @@ impl SegmentAggregationCollector for SegmentCardinalityCollector {
321308
}
322309
}
323310

324-
#[derive(Clone, Debug, Serialize, Deserialize)]
325-
/// The percentiles collector used during segment collection and for merging results.
311+
#[derive(Clone, Debug)]
312+
/// The cardinality collector used during segment collection and for merging results.
313+
/// Uses Apache DataSketches HLL (lg_k=11, Hll4) for compact binary serialization
314+
/// and cross-language compatibility (e.g. Java `datasketches` library).
326315
pub struct CardinalityCollector {
327-
sketch: HyperLogLogPlus<u64, BuildSaltedHasher>,
316+
sketch: HllSketch,
317+
/// Salt derived from `ColumnType`, used to differentiate values of different column types
318+
/// that map to the same u64 (e.g. bool `false` = 0 vs i64 `0`).
319+
/// Not serialized (deserialization resets it to 0) — only needed during insertion, not after sketch registers are populated.
320+
salt: u8,
328321
}
322+
329323
impl Default for CardinalityCollector {
330324
fn default() -> Self {
331325
Self::new(0)
@@ -338,25 +332,52 @@ impl PartialEq for CardinalityCollector {
338332
}
339333
}
340334

341-
impl CardinalityCollector {
342-
/// Compute the final cardinality estimate.
343-
pub fn finalize(self) -> Option<f64> {
344-
Some(self.sketch.clone().count().trunc())
335+
impl Serialize for CardinalityCollector {
336+
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
337+
let bytes = self.sketch.serialize();
338+
serializer.serialize_bytes(&bytes)
339+
}
340+
}
341+
342+
impl<'de> Deserialize<'de> for CardinalityCollector {
343+
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
344+
let bytes: Vec<u8> = Deserialize::deserialize(deserializer)?;
345+
let sketch = HllSketch::deserialize(&bytes).map_err(serde::de::Error::custom)?;
346+
Ok(Self { sketch, salt: 0 })
345347
}
348+
}
346349

350+
impl CardinalityCollector {
347351
fn new(salt: u8) -> Self {
348352
Self {
349-
sketch: HyperLogLogPlus::new(16, BuildSaltedHasher { salt }).unwrap(),
353+
sketch: HllSketch::new(LG_K, HllType::Hll4),
354+
salt,
350355
}
351356
}
352357

353-
pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> {
354-
self.sketch.merge(&right.sketch).map_err(|err| {
355-
TantivyError::AggregationError(AggregationError::InternalError(format!(
356-
"Error while merging cardinality {err:?}"
357-
)))
358-
})?;
358+
/// Insert a value into the HLL sketch, salted by the column type.
359+
/// The salt ensures that identical u64 values from different column types
360+
/// (e.g. bool `false` vs i64 `0`) are counted as distinct.
361+
pub(crate) fn insert<T: Hash>(&mut self, value: T) {
362+
self.sketch.update((self.salt, value));
363+
}
364+
365+
/// Compute the final cardinality estimate.
366+
pub fn finalize(self) -> Option<f64> {
367+
Some(self.sketch.estimate().trunc())
368+
}
359369

370+
/// Serialize the HLL sketch to its compact binary representation.
371+
/// The format is cross-language compatible with Apache DataSketches (Java, C++, Python).
372+
pub fn to_sketch_bytes(&self) -> Vec<u8> {
373+
self.sketch.serialize()
374+
}
375+
376+
pub(crate) fn merge_fruits(&mut self, right: CardinalityCollector) -> crate::Result<()> {
377+
let mut union = HllUnion::new(LG_K);
378+
union.update(&self.sketch);
379+
union.update(&right.sketch);
380+
self.sketch = union.get_result(HllType::Hll4);
360381
Ok(())
361382
}
362383
}
@@ -518,4 +539,75 @@ mod tests {
518539

519540
Ok(())
520541
}
542+
543+
#[test]
544+
fn cardinality_collector_serde_roundtrip() {
545+
use super::CardinalityCollector;
546+
547+
let mut collector = CardinalityCollector::default();
548+
collector.insert("hello");
549+
collector.insert("world");
550+
collector.insert("hello"); // duplicate
551+
552+
let serialized = serde_json::to_vec(&collector).unwrap();
553+
let deserialized: CardinalityCollector = serde_json::from_slice(&serialized).unwrap();
554+
555+
let original_estimate = collector.finalize().unwrap();
556+
let roundtrip_estimate = deserialized.finalize().unwrap();
557+
assert_eq!(original_estimate, roundtrip_estimate);
558+
assert_eq!(original_estimate, 2.0);
559+
}
560+
561+
#[test]
562+
fn cardinality_collector_merge() {
563+
use super::CardinalityCollector;
564+
565+
let mut left = CardinalityCollector::default();
566+
left.insert("a");
567+
left.insert("b");
568+
569+
let mut right = CardinalityCollector::default();
570+
right.insert("b");
571+
right.insert("c");
572+
573+
left.merge_fruits(right).unwrap();
574+
let estimate = left.finalize().unwrap();
575+
assert_eq!(estimate, 3.0);
576+
}
577+
578+
#[test]
579+
fn cardinality_collector_serialize_deserialize_binary() {
580+
use datasketches::hll::HllSketch;
581+
582+
use super::CardinalityCollector;
583+
584+
let mut collector = CardinalityCollector::default();
585+
collector.insert("apple");
586+
collector.insert("banana");
587+
collector.insert("cherry");
588+
589+
let bytes = collector.to_sketch_bytes();
590+
let deserialized = HllSketch::deserialize(&bytes).unwrap();
591+
assert!((deserialized.estimate() - 3.0).abs() < 0.01);
592+
}
593+
594+
#[test]
595+
fn cardinality_collector_salt_differentiates_types() {
596+
use super::CardinalityCollector;
597+
598+
// Without the salt, the same u64 value from different column types would collide
599+
let mut collector_bool = CardinalityCollector::new(5); // e.g. ColumnType::Bool
600+
collector_bool.insert(0u64); // false
601+
collector_bool.insert(1u64); // true
602+
603+
let mut collector_i64 = CardinalityCollector::new(2); // e.g. ColumnType::I64
604+
collector_i64.insert(0u64);
605+
collector_i64.insert(1u64);
606+
607+
// Merge them
608+
collector_bool.merge_fruits(collector_i64).unwrap();
609+
let estimate = collector_bool.finalize().unwrap();
610+
// Should be 4 because salt makes (5, 0) != (2, 0) and (5, 1) != (2, 1)
611+
assert_eq!(estimate, 4.0);
612+
}
521613
}

0 commit comments

Comments
 (0)