Skip to content

Commit c301e7b

Browse files
Merge pull request #2673 from paradedb/stuhood.fix-order-by-dup-string
Fix `TopDocs::order_by_string_fast_field` for duplicates
2 parents 811c68c + d9eb093 commit c301e7b

File tree

2 files changed

+104
-18
lines changed

2 files changed

+104
-18
lines changed

src/collector/top_score_collector.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,6 +1529,72 @@ mod tests {
15291529
Ok(())
15301530
}
15311531

1532+
proptest! {
1533+
// Property test: searching with `TopDocs` (limit + offset) ordered by the "city" string
// fast field must return exactly what a full sort of every document followed by
// `skip(offset).take(limit)` returns.
//
// Generated inputs:
// - `order`: either `Order::Desc` or `Order::Asc`.
// - `limit`: 1 to 255; `offset`: 0 to 255.
// - `segments_terms`: 0 to 7 inner vectors, each holding 1 to 31 values in 0..32.
//   One document is added per value and one commit is issued per inner vector.
//   With only 32 distinct values, repeated terms are likely to be generated.
#[test]
1534+
fn test_top_field_collect_string_prop(
1535+
order in prop_oneof!(Just(Order::Desc), Just(Order::Asc)),
1536+
limit in 1..256_usize,
1537+
offset in 0..256_usize,
1538+
segments_terms in
1539+
proptest::collection::vec(
1540+
proptest::collection::vec(0..32_u8, 1..32_usize),
1541+
0..8_usize,
1542+
)
1543+
) {
1544+
// Single-field schema: "city" is declared with the `TEXT | FAST` options.
let mut schema_builder = Schema::builder();
1545+
let city = schema_builder.add_text_field("city", TEXT | FAST);
1546+
let schema = schema_builder.build();
1547+
let index = Index::create_in_ram(schema);
1548+
let mut index_writer = index.writer_for_tests()?;
1549+
1550+
// A Vec<Vec<u8>>, where the outer Vec represents segments, and the inner Vec
1551+
// represents terms.
1552+
for segment_terms in segments_terms.into_iter() {
1553+
for term in segment_terms.into_iter() {
1554+
// Zero-pad to three characters ("000" ..= "031") so that the string order of the
// terms matches the numeric order of the generated values.
let term = format!("{term:0>3}");
1555+
index_writer.add_document(doc!(
1556+
city => term,
1557+
))?;
1558+
}
1559+
// One commit per inner Vec.
index_writer.commit()?;
1560+
}
1561+
1562+
let searcher = index.reader()?.searcher();
1563+
// Result under test: the collector's own top-N ordered by the string fast field.
let top_n_results = searcher.search(&AllQuery, &TopDocs::with_limit(limit)
1564+
.and_offset(offset)
1565+
.order_by_string_fast_field("city", order.clone()))?;
1566+
// Reference data: every matching document paired with its "city" term. This is a lazy
// iterator; it is consumed by exactly one of the two sort branches below.
let all_results = searcher.search(&AllQuery, &DocSetCollector)?.into_iter().map(|doc_address| {
1567+
// Get the term for this address.
1568+
// NOTE: We can't determine the SegmentIds that will be generated for Segments
1569+
// ahead of time, so we can't pre-compute the expected `DocAddress`es.
1570+
let column = searcher.segment_readers()[doc_address.segment_ord as usize].fast_fields().str("city").unwrap().unwrap();
1571+
// Each document was added with a single "city" value, so the first term ordinal is
// the one to resolve.
let term_ord = column.term_ords(doc_address.doc_id).next().unwrap();
1572+
let mut city = Vec::new();
1573+
column.dictionary().ord_to_term(term_ord, &mut city).unwrap();
1574+
(String::try_from(city).unwrap(), doc_address)
1575+
});
1576+
1577+
// Using the TopDocs collector should always be equivalent to sorting, skipping the
1578+
// offset, and then taking the limit.
//
// The third (boolean) parameter of `ComparableDoc` is `true` in the descending branch and
// `false` in the ascending one; presumably it selects the comparison direction used by
// `sort()` -- confirm against the `ComparableDoc` definition.
1579+
let sorted_docs: Vec<_> = if order.is_desc() {
1580+
let mut comparable_docs: Vec<ComparableDoc<_, _, true>> =
1581+
all_results.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc}).collect();
1582+
comparable_docs.sort();
1583+
comparable_docs.into_iter().map(|cd| (cd.feature, cd.doc)).collect()
1584+
} else {
1585+
let mut comparable_docs: Vec<ComparableDoc<_, _, false>> =
1586+
all_results.into_iter().map(|(feature, doc)| ComparableDoc { feature, doc}).collect();
1587+
comparable_docs.sort();
1588+
comparable_docs.into_iter().map(|cd| (cd.feature, cd.doc)).collect()
1589+
};
1590+
// Apply the same paging the collector was configured with.
let expected_docs = sorted_docs.into_iter().skip(offset).take(limit).collect::<Vec<_>>();
1591+
prop_assert_eq!(
1592+
expected_docs,
1593+
top_n_results
1594+
);
1595+
}
1596+
}
1597+
15321598
#[test]
15331599
#[should_panic]
15341600
fn test_field_does_not_exist() {

sstable/src/dictionary.rs

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -507,38 +507,58 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
507507
/// Returns true if and only if all terms have been found.
508508
pub fn sorted_ords_to_term_cb<F: FnMut(&[u8]) -> io::Result<()>>(
509509
&self,
510-
ord: impl Iterator<Item = TermOrdinal>,
510+
mut ords: impl Iterator<Item = TermOrdinal>,
511511
mut cb: F,
512512
) -> io::Result<bool> {
513+
// Calls `cb` once per ordinal yielded by `ords`, passing the bytes of the matching term.
// `ords` must be sorted in non-decreasing order (repeats are allowed); an out-of-order
// ordinal panics further down.
//
// An empty iterator needs no lookups, so it trivially succeeds.
let Some(mut ord) = ords.next() else {
514+
return Ok(true);
515+
};
516+
517+
// Open the block for the first ordinal.
513518
let mut bytes = Vec::new();
514-
let mut current_block_addr = self.sstable_index.get_block_with_ord(0);
519+
let mut current_block_addr = self.sstable_index.get_block_with_ord(ord);
515520
let mut current_sstable_delta_reader =
516521
self.sstable_delta_reader_block(current_block_addr.clone())?;
517-
let mut current_ordinal = 0;
518-
for ord in ord {
519-
assert!(ord >= current_ordinal);
520-
// check if block changed for new term_ord
521-
let new_block_addr = self.sstable_index.get_block_with_ord(ord);
522-
if new_block_addr != current_block_addr {
523-
current_block_addr = new_block_addr;
524-
current_ordinal = current_block_addr.first_ordinal;
525-
current_sstable_delta_reader =
526-
self.sstable_delta_reader_block(current_block_addr.clone())?;
527-
bytes.clear();
528-
}
522+
// Ordinal of the term that the next `advance()` on the delta reader will decode. It starts
// at the block's first ordinal and is bumped after every successful advance.
let mut current_block_ordinal = current_block_addr.first_ordinal;
529523

530-
// move to ord inside that block
531-
for _ in current_ordinal..=ord {
524+
loop {
525+
// move to the ord inside the current block
526+
while current_block_ordinal <= ord {
532527
// `advance()` returning false ends the lookup early with `Ok(false)`; presumably the
// reader ran out of terms before reaching `ord` -- confirm against the delta reader.
if !current_sstable_delta_reader.advance()? {
533528
return Ok(false);
534529
}
535530
// Rebuild the current term in place: keep the leading bytes shared with the previous
// term, then append this term's suffix.
bytes.truncate(current_sstable_delta_reader.common_prefix_len());
536531
bytes.extend_from_slice(current_sstable_delta_reader.suffix());
532+
current_block_ordinal += 1;
537533
}
538-
current_ordinal = ord + 1;
539534
// `bytes` now holds the term for `ord`; an error returned by `cb` is propagated.
cb(&bytes)?;
535+
536+
// fetch the next ordinal
537+
let Some(next_ord) = ords.next() else {
538+
return Ok(true);
539+
};
540+
541+
// advance forward if the new ord is different from the one we just processed
542+
//
543+
// this allows the input TermOrdinal iterator to contain duplicates, so long as it's
544+
// still sorted
545+
if next_ord < ord {
546+
panic!("Ordinals were not sorted: received {next_ord} after {ord}");
547+
} else if next_ord > ord {
548+
// check if block changed for new term_ord
549+
let new_block_addr = self.sstable_index.get_block_with_ord(next_ord);
550+
if new_block_addr != current_block_addr {
551+
// Start over in the new block: fresh reader, ordinal reset to the block's first
// ordinal, and the term buffer emptied.
current_block_addr = new_block_addr;
552+
current_block_ordinal = current_block_addr.first_ordinal;
553+
current_sstable_delta_reader =
554+
self.sstable_delta_reader_block(current_block_addr.clone())?;
555+
bytes.clear();
556+
}
557+
// Whether or not the block changed, the `while` loop at the top of the next iteration
// advances the reader up to the new `ord`.
ord = next_ord;
558+
} else {
559+
// The next ord is equal to the previous ord: no need to seek or advance.
// `bytes` still holds that term and `current_block_ordinal` is already past `ord`, so
// the next iteration skips the `while` loop and simply calls `cb` again.
560+
}
540561
}
541-
Ok(true)
542562
}
543563

544564
/// Returns the number of terms in the dictionary.

0 commit comments

Comments
 (0)