Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion columnar/src/columnar/column_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub enum ColumnType {
Bool = 5u8,
IpAddr = 6u8,
DateTime = 7u8,
U128 = 8u8,
}

impl fmt::Display for ColumnType {
Expand All @@ -33,6 +34,7 @@ impl fmt::Display for ColumnType {
ColumnType::Bool => "bool",
ColumnType::IpAddr => "ip",
ColumnType::DateTime => "datetime",
ColumnType::U128 => "u128",
};
write!(f, "{short_str}")
}
Expand Down Expand Up @@ -83,7 +85,8 @@ impl ColumnType {
| ColumnType::Str
| ColumnType::Bool
| ColumnType::IpAddr
| ColumnType::DateTime => None,
| ColumnType::DateTime
| ColumnType::U128 => None,
}
}
}
Expand Down
35 changes: 34 additions & 1 deletion columnar/src/columnar/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub(crate) enum ColumnTypeCategory {
Bool,
IpAddr,
DateTime,
U128,
}

impl From<ColumnType> for ColumnTypeCategory {
Expand All @@ -51,6 +52,7 @@ impl From<ColumnType> for ColumnTypeCategory {
ColumnType::Bool => ColumnTypeCategory::Bool,
ColumnType::IpAddr => ColumnTypeCategory::IpAddr,
ColumnType::DateTime => ColumnTypeCategory::DateTime,
ColumnType::U128 => ColumnTypeCategory::U128,
}
}
}
Expand Down Expand Up @@ -123,7 +125,10 @@ fn dynamic_column_to_u64_monotonic(dynamic_column: DynamicColumn) -> Option<Colu
DynamicColumn::U64(column) => Some(column.to_u64_monotonic()),
DynamicColumn::F64(column) => Some(column.to_u64_monotonic()),
DynamicColumn::DateTime(column) => Some(column.to_u64_monotonic()),
DynamicColumn::IpAddr(_) | DynamicColumn::Bytes(_) | DynamicColumn::Str(_) => None,
DynamicColumn::IpAddr(_)
| DynamicColumn::Bytes(_)
| DynamicColumn::Str(_)
| DynamicColumn::U128(_) => None,
}
}

Expand Down Expand Up @@ -194,6 +199,33 @@ fn merge_column(

serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
}
ColumnType::U128 => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
let mut column_values: Vec<Option<Arc<dyn ColumnValues<u128>>>> =
Vec::with_capacity(columns_to_merge.len());
for (i, dynamic_column_opt) in columns_to_merge.into_iter().enumerate() {
if let Some(DynamicColumn::U128(Column { index: idx, values })) = dynamic_column_opt
{
column_indexes.push(idx);
column_values.push(Some(values));
} else {
column_indexes.push(ColumnIndex::Empty {
num_docs: num_docs_per_column[i],
});
column_values.push(None);
}
}

let merged_column_index =
crate::column_index::merge_column_index(&column_indexes[..], merge_row_order);
let merge_column_values = MergedColumnValues {
column_indexes: &column_indexes[..],
column_values: &column_values,
merge_row_order,
};

serialize_column_mappable_to_u128(merged_column_index, &merge_column_values, wrt)?;
}
ColumnType::Bytes | ColumnType::Str => {
let mut column_indexes: Vec<ColumnIndex> = Vec::with_capacity(columns_to_merge.len());
let mut bytes_columns: Vec<Option<BytesColumn>> =
Expand Down Expand Up @@ -467,6 +499,7 @@ fn min_max_if_numerical(column: &DynamicColumn) -> Option<(NumericalValue, Numer
DynamicColumn::F64(column) => Some((column.min_value().into(), column.max_value().into())),
DynamicColumn::Bool(_)
| DynamicColumn::IpAddr(_)
| DynamicColumn::U128(_)
| DynamicColumn::DateTime(_)
| DynamicColumn::Bytes(_)
| DynamicColumn::Str(_) => None,
Expand Down
12 changes: 12 additions & 0 deletions columnar/src/columnar/writer/column_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,18 @@ impl SymbolValue for Ipv6Addr {
}
}

impl SymbolValue for u128 {
fn serialize(self, buffer: &mut [u8]) -> u8 {
buffer[0..16].copy_from_slice(&self.to_be_bytes());
16
}

fn deserialize(bytes: &[u8]) -> Self {
let octets: [u8; 16] = bytes[0..16].try_into().unwrap();
u128::from_be_bytes(octets)
}
}

#[derive(Default)]
struct MiniBuffer {
pub bytes: [u8; 17],
Expand Down
56 changes: 56 additions & 0 deletions columnar/src/columnar/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct SpareBuffers {
value_index_builders: PreallocatedIndexBuilders,
u64_values: Vec<u64>,
ip_addr_values: Vec<Ipv6Addr>,
u128_values: Vec<u128>,
}

/// Makes it possible to create a new columnar.
Expand All @@ -52,6 +53,7 @@ pub struct ColumnarWriter {
datetime_field_hash_map: ArenaHashMap,
bool_field_hash_map: ArenaHashMap,
ip_addr_field_hash_map: ArenaHashMap,
u128_field_hash_map: ArenaHashMap,
bytes_field_hash_map: ArenaHashMap,
str_field_hash_map: ArenaHashMap,
arena: MemoryArena,
Expand Down Expand Up @@ -145,6 +147,10 @@ impl ColumnarWriter {
column_name.as_bytes(),
|column_opt: Option<ColumnWriter>| column_opt.unwrap_or_default(),
),
ColumnType::U128 => self.u128_field_hash_map.mutate_or_create(
column_name.as_bytes(),
|column_opt: Option<ColumnWriter>| column_opt.unwrap_or_default(),
),
}
}

Expand Down Expand Up @@ -177,6 +183,18 @@ impl ColumnarWriter {
);
}

pub fn record_u128(&mut self, doc: RowId, column_name: &str, u128: u128) {
let (hash_map, arena) = (&mut self.u128_field_hash_map, &mut self.arena);
hash_map.mutate_or_create(
column_name.as_bytes(),
|column_opt: Option<ColumnWriter>| {
let mut column: ColumnWriter = column_opt.unwrap_or_default();
column.record(doc, u128, arena);
column
},
);
}

pub fn record_bool(&mut self, doc: RowId, column_name: &str, val: bool) {
let (hash_map, arena) = (&mut self.bool_field_hash_map, &mut self.arena);
hash_map.mutate_or_create(
Expand Down Expand Up @@ -323,6 +341,20 @@ impl ColumnarWriter {
)?;
column_serializer.finalize()?;
}
ColumnType::U128 => {
let column_writer: ColumnWriter = self.u128_field_hash_map.read(addr);
let cardinality = column_writer.get_cardinality(num_docs);
let mut column_serializer =
serializer.start_serialize_column(column_name, ColumnType::U128);
serialize_u128_column(
cardinality,
num_docs,
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&mut column_serializer,
)?;
column_serializer.finalize()?;
}
ColumnType::Bytes | ColumnType::Str => {
let str_or_bytes_column_writer: StrOrBytesColumnWriter =
if column_type == ColumnType::Bytes {
Expand Down Expand Up @@ -536,6 +568,30 @@ fn serialize_ip_addr_column(
Ok(())
}

fn serialize_u128_column(
cardinality: Cardinality,
num_docs: RowId,
column_operations_it: impl Iterator<Item = ColumnOperation<u128>>,
buffers: &mut SpareBuffers,
wrt: &mut impl io::Write,
) -> io::Result<()> {
let SpareBuffers {
value_index_builders,
u128_values,
..
} = buffers;
send_to_serialize_column_mappable_to_u128(
column_operations_it,
cardinality,
num_docs,
value_index_builders,
u128_values,
wrt,
)?;
Ok(())
}


fn send_to_serialize_column_mappable_to_u128<
T: Copy + Ord + std::fmt::Debug + Send + Sync + MonotonicallyMappableToU128 + PartialOrd,
>(
Expand Down
16 changes: 16 additions & 0 deletions columnar/src/dynamic_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub enum DynamicColumn {
I64(Column<i64>),
U64(Column<u64>),
F64(Column<f64>),
U128(Column<u128>),
IpAddr(Column<Ipv6Addr>),
DateTime(Column<DateTime>),
Bytes(BytesColumn),
Expand All @@ -30,6 +31,7 @@ impl fmt::Debug for DynamicColumn {
DynamicColumn::I64(col) => write!(f, " {col:?}")?,
DynamicColumn::U64(col) => write!(f, " {col:?}")?,
DynamicColumn::F64(col) => write!(f, "{col:?}")?,
DynamicColumn::U128(col) => write!(f, "{col:?}")?,
DynamicColumn::IpAddr(col) => write!(f, "{col:?}")?,
DynamicColumn::DateTime(col) => write!(f, "{col:?}")?,
DynamicColumn::Bytes(col) => write!(f, "{col:?}")?,
Expand All @@ -46,6 +48,7 @@ impl DynamicColumn {
DynamicColumn::I64(c) => &c.index,
DynamicColumn::U64(c) => &c.index,
DynamicColumn::F64(c) => &c.index,
DynamicColumn::U128(c) => &c.index,
DynamicColumn::IpAddr(c) => &c.index,
DynamicColumn::DateTime(c) => &c.index,
DynamicColumn::Bytes(c) => &c.ords().index,
Expand All @@ -63,6 +66,7 @@ impl DynamicColumn {
DynamicColumn::I64(c) => c.values.num_vals(),
DynamicColumn::U64(c) => c.values.num_vals(),
DynamicColumn::F64(c) => c.values.num_vals(),
DynamicColumn::U128(c) => c.values.num_vals(),
DynamicColumn::IpAddr(c) => c.values.num_vals(),
DynamicColumn::DateTime(c) => c.values.num_vals(),
DynamicColumn::Bytes(c) => c.ords().values.num_vals(),
Expand All @@ -76,6 +80,7 @@ impl DynamicColumn {
DynamicColumn::I64(_) => ColumnType::I64,
DynamicColumn::U64(_) => ColumnType::U64,
DynamicColumn::F64(_) => ColumnType::F64,
DynamicColumn::U128(_) => ColumnType::U128,
DynamicColumn::IpAddr(_) => ColumnType::IpAddr,
DynamicColumn::DateTime(_) => ColumnType::DateTime,
DynamicColumn::Bytes(_) => ColumnType::Bytes,
Expand Down Expand Up @@ -227,6 +232,7 @@ static_dynamic_conversions!(Column<DateTime>, DateTime);
static_dynamic_conversions!(StrColumn, Str);
static_dynamic_conversions!(BytesColumn, Bytes);
static_dynamic_conversions!(Column<Ipv6Addr>, IpAddr);
static_dynamic_conversions!(Column<u128>, U128);

#[derive(Clone, Debug)]
pub struct DynamicColumnHandle {
Expand Down Expand Up @@ -272,6 +278,13 @@ impl DynamicColumnHandle {
)?;
Ok(Some(column))
}
ColumnType::U128 => {
let column = crate::column::open_column_u128_as_compact_u64(
column_bytes,
self.format_version,
)?;
Ok(Some(column))
}
ColumnType::Bool
| ColumnType::I64
| ColumnType::U64
Expand Down Expand Up @@ -301,6 +314,9 @@ impl DynamicColumnHandle {
ColumnType::F64 => {
crate::column::open_column_u64::<f64>(column_bytes, self.format_version)?.into()
}
ColumnType::U128 => {
crate::column::open_column_u128::<u128>(column_bytes, self.format_version)?.into()
}
ColumnType::Bool => {
crate::column::open_column_u64::<bool>(column_bytes, self.format_version)?.into()
}
Expand Down
17 changes: 17 additions & 0 deletions columnar/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ enum ColumnValue {
Bytes(&'static [u8]),
Numerical(NumericalValue),
IpAddr(Ipv6Addr),
U128(u128),
Bool(bool),
DateTime(DateTime),
}
Expand All @@ -269,6 +270,7 @@ impl ColumnValue {
ColumnValue::Bytes(_) => ColumnTypeCategory::Bytes,
ColumnValue::Numerical(_) => ColumnTypeCategory::Numerical,
ColumnValue::IpAddr(_) => ColumnTypeCategory::IpAddr,
ColumnValue::U128(_) => ColumnTypeCategory::U128,
ColumnValue::Bool(_) => ColumnTypeCategory::Bool,
ColumnValue::DateTime(_) => ColumnTypeCategory::DateTime,
}
Expand Down Expand Up @@ -303,6 +305,7 @@ fn column_value_strategy() -> impl Strategy<Value = ColumnValue> {
0,
ip_addr_byte
))),
1 => any::<u128>().prop_map(|val| ColumnValue::U128(val)),
1 => any::<bool>().prop_map(ColumnValue::Bool),
1 => (679_723_993i64..1_679_723_995i64)
.prop_map(|val| { ColumnValue::DateTime(DateTime::from_timestamp_secs(val)) })
Expand Down Expand Up @@ -353,6 +356,9 @@ fn build_columnar_with_mapping(docs: &[Vec<(&'static str, ColumnValue)>]) -> Col
ColumnValue::IpAddr(ip_addr) => {
columnar_writer.record_ip_addr(doc_id as u32, column_name, ip_addr);
}
ColumnValue::U128(u128) => {
columnar_writer.record_u128(doc_id as u32, column_name, u128);
}
ColumnValue::Bool(bool_val) => {
columnar_writer.record_bool(doc_id as u32, column_name, bool_val);
}
Expand Down Expand Up @@ -506,6 +512,15 @@ impl AssertEqualToColumnValue for Ipv6Addr {
}
}

impl AssertEqualToColumnValue for u128 {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::U128(val) = column_value else {
panic!()
};
assert_eq!(self, val);
}
}

impl<T: Coerce + PartialEq + Debug + Into<NumericalValue>> AssertEqualToColumnValue for T {
fn assert_equal_to_column_value(&self, column_value: &ColumnValue) {
let ColumnValue::Numerical(num) = column_value else {
Expand Down Expand Up @@ -617,6 +632,8 @@ proptest! {
assert_column_values(col, expected_col_values),
DynamicColumn::IpAddr(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::U128(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::DateTime(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::Bytes(col) =>
Expand Down
7 changes: 7 additions & 0 deletions src/aggregation/metric/top_hits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ impl TopHitsAggregationReq {
.values_for_doc(doc_id)
.map(FastFieldValue::IpAddr)
.collect::<Vec<_>>(),
DynamicColumn::U128(accessor) => accessor
.values_for_doc(doc_id)
.map(FastFieldValue::U128)
.collect::<Vec<_>>(),
DynamicColumn::DateTime(accessor) => accessor
.values_for_doc(doc_id)
.map(FastFieldValue::Date)
Expand Down Expand Up @@ -334,6 +338,8 @@ pub enum FastFieldValue {
IpAddr(Ipv6Addr),
/// A list of values.
Array(Vec<Self>),
/// U128
U128(u128),
}

impl From<FastFieldValue> for OwnedValue {
Expand All @@ -350,6 +356,7 @@ impl From<FastFieldValue> for OwnedValue {
FastFieldValue::Array(a) => {
OwnedValue::Array(a.into_iter().map(OwnedValue::from).collect())
}
FastFieldValue::U128(u128) => OwnedValue::U128(u128),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/core/json_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ pub(crate) fn index_json_value<'a, V: Value<'a>>(
ReferenceValueLeaf::IpAddr(_) => {
unimplemented!("IP address support in dynamic fields is not yet implemented")
}
ReferenceValueLeaf::U128(_) => {
unimplemented!("U128 support in dynamic fields is not yet implemented")
}
},
ReferenceValue::Array(elements) => {
for val in elements {
Expand Down
Loading