Skip to content

Commit 17c7d3f

Browse files
authored
Timeout enabling for HNSW (#163)
* enabling timeout on hnsw * added timeout support for hnsw batch iterator * added tests * checks at least once every graph scan call * improve test * added comment * moved candidate iteration to a function * NULLing VecSimQueryResult_List after freeing * improved tests * review fixes * added timeout check to searchBottomLayerEP * test improvement
1 parent 3ce2aca commit 17c7d3f

File tree

7 files changed

+289
-69
lines changed

7 files changed

+289
-69
lines changed

src/VecSim/algorithms/hnsw/hnsw_batch_iterator.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ VecSimQueryResult_List HNSW_BatchIterator::prepareResults(candidatesMaxHeap top_
3333
VecSimQueryResult_SetScore(rl.results[i], top_candidates.top().first);
3434
top_candidates.pop();
3535
}
36-
rl.code = VecSim_QueryResult_OK;
3736
return rl;
3837
}
3938

4039
candidatesMaxHeap HNSW_BatchIterator::scanGraph(candidatesMinHeap &candidates,
4140
candidatesMinHeap &top_candidates_extras,
42-
float &lower_bound, idType entry_point) {
41+
float &lower_bound, idType entry_point,
42+
VecSimQueryResult_Code *rc) {
4343

4444
candidatesMaxHeap top_candidates(this->allocator);
4545
if (entry_point == HNSW_INVALID_ID) {
@@ -58,6 +58,11 @@ candidatesMaxHeap HNSW_BatchIterator::scanGraph(candidatesMinHeap &candidates,
5858
this->visitNode(entry_point);
5959
candidates.emplace(dist, entry_point);
6060
}
61+
// Checks that we didn't got timeout between iterations.
62+
if (__builtin_expect(VecSimIndex::timeoutCallback(this->getTimeoutCtx()), 0)) {
63+
*rc = VecSim_QueryResult_TimedOut;
64+
return top_candidates;
65+
}
6166

6267
// Move extras from previous iteration to the top candidates.
6368
while (top_candidates.size() < this->hnsw_index->getEf() && !top_candidates_extras.empty()) {
@@ -77,6 +82,10 @@ candidatesMaxHeap HNSW_BatchIterator::scanGraph(candidatesMinHeap &candidates,
7782
if (curr_node_dist > lower_bound && top_candidates.size() >= this->hnsw_index->getEf()) {
7883
break;
7984
}
85+
if (__builtin_expect(VecSimIndex::timeoutCallback(this->getTimeoutCtx()), 0)) {
86+
*rc = VecSim_QueryResult_TimedOut;
87+
return top_candidates;
88+
}
8089
if (top_candidates.size() < this->hnsw_index->getEf() || lower_bound > curr_node_dist) {
8190
top_candidates.emplace(curr_node_dist, curr_node_id);
8291
if (top_candidates.size() > this->hnsw_index->getEf()) {
@@ -152,6 +161,7 @@ HNSW_BatchIterator::HNSW_BatchIterator(void *query_vector, HNSWIndex *index_wrap
152161
VecSimQueryResult_List HNSW_BatchIterator::getNextResults(size_t n_res,
153162
VecSimQueryResult_Order order) {
154163

164+
VecSimQueryResult_List batch = {0};
155165
// If ef_runtime lower than the number of results to return, increase it. Therefore, we assume
156166
// that the number of results that return from the graph scan is at least n_res (if exist).
157167
size_t orig_ef = this->hnsw_index->getEf();
@@ -162,26 +172,33 @@ VecSimQueryResult_List HNSW_BatchIterator::getNextResults(size_t n_res,
162172
// In the first iteration, we search the graph from top bottom to find the initial entry point,
163173
// and then we scan the graph to get results (layer 0).
164174
if (this->getResultsCount() == 0) {
165-
idType bottom_layer_ep = this->hnsw_index->searchBottomLayerEP(this->getQueryBlob());
175+
idType bottom_layer_ep = this->hnsw_index->searchBottomLayerEP(
176+
this->getQueryBlob(), this->getTimeoutCtx(), &batch.code);
177+
if (VecSim_OK != batch.code) {
178+
return batch;
179+
}
166180
this->entry_point = bottom_layer_ep;
167181
}
168182
// We ask for at least n_res candidate from the scan. In fact, at most ef results will return,
169183
// and it could be that ef > n_res.
170184
auto top_candidates = this->scanGraph(this->candidates, this->top_candidates_extras,
171-
this->lower_bound, this->entry_point);
185+
this->lower_bound, this->entry_point, &batch.code);
186+
if (VecSim_OK != batch.code) {
187+
return batch;
188+
}
172189
// Move the spare results to the "extras" queue if needed, and create the batch results array.
173-
auto batch_results = this->prepareResults(top_candidates, n_res);
190+
batch = this->prepareResults(top_candidates, n_res);
174191

175-
this->updateResultsCount(VecSimQueryResult_Len(batch_results));
192+
this->updateResultsCount(VecSimQueryResult_Len(batch));
176193
if (this->getResultsCount() == this->index_wrapper->indexSize()) {
177194
this->depleted = true;
178195
}
179196
// By default, results are ordered by score.
180197
if (order == BY_ID) {
181-
sort_results_by_id(batch_results);
198+
sort_results_by_id(batch);
182199
}
183200
this->hnsw_index->setEf(orig_ef);
184-
return batch_results;
201+
return batch;
185202
}
186203

187204
bool HNSW_BatchIterator::isDepleted() {

src/VecSim/algorithms/hnsw/hnsw_batch_iterator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class HNSW_BatchIterator : public VecSimBatchIterator {
2929

3030
candidatesMaxHeap scanGraph(candidatesMinHeap &candidates,
3131
candidatesMinHeap &spare_top_candidates, float &lower_bound,
32-
idType entry_point);
32+
idType entry_point, VecSimQueryResult_Code *rc);
3333
VecSimQueryResult_List
3434
prepareResults(vecsim_stl::max_priority_queue<pair<float, idType>> top_candidates,
3535
size_t n_res);

src/VecSim/algorithms/hnsw/hnsw_wrapper.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ void HNSWIndex::setEf(size_t ef) { this->hnsw->setEf(ef); }
9595
VecSimQueryResult_List HNSWIndex::topKQuery(const void *query_data, size_t k,
9696
VecSimQueryParams *queryParams) {
9797
VecSimQueryResult_List rl = {0};
98+
void *timeoutCtx = nullptr;
9899
try {
99100
this->last_mode = STANDARD_KNN;
100101
float normalized_data[this->dim]; // This will be use only if metric == VecSimMetric_Cosine
@@ -108,11 +109,15 @@ VecSimQueryResult_List HNSWIndex::topKQuery(const void *query_data, size_t k,
108109
size_t originalEF = this->hnsw->getEf();
109110

110111
if (queryParams) {
112+
timeoutCtx = queryParams->timeoutCtx;
111113
if (queryParams->hnswRuntimeParams.efRuntime != 0) {
112114
hnsw->setEf(queryParams->hnswRuntimeParams.efRuntime);
113115
}
114116
}
115-
auto knn_res = hnsw->searchKnn(query_data, k);
117+
auto knn_res = hnsw->searchKnn(query_data, k, timeoutCtx, &rl.code);
118+
if (VecSim_OK != rl.code) {
119+
return rl;
120+
}
116121
rl.results = array_new_len<VecSimQueryResult>(knn_res.size(), knn_res.size());
117122
for (int i = (int)knn_res.size() - 1; i >= 0; --i) {
118123
VecSimQueryResult_SetId(rl.results[i], knn_res.top().second);
@@ -123,7 +128,6 @@ VecSimQueryResult_List HNSWIndex::topKQuery(const void *query_data, size_t k,
123128
hnsw->setEf(originalEF);
124129
assert(hnsw->getEf() == originalEF);
125130

126-
rl.code = VecSim_QueryResult_OK;
127131
} catch (...) {
128132
rl.code = VecSim_QueryResult_Err;
129133
}

src/VecSim/algorithms/hnsw/hnswlib.h

Lines changed: 137 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ typedef unsigned int linklistsizeint;
3535

3636
template <typename dist_t>
3737
using candidatesMaxHeap = vecsim_stl::max_priority_queue<pair<dist_t, tableint>>;
38+
template <typename dist_t>
39+
using candidatesLabelsMaxHeap = vecsim_stl::max_priority_queue<pair<dist_t, labeltype>>;
3840

3941
template <typename dist_t>
4042
class HierarchicalNSW : public VecsimBaseObject {
@@ -110,8 +112,17 @@ class HierarchicalNSW : public VecsimBaseObject {
110112
size_t Mcurmax, tableint *node_neighbors,
111113
const vecsim_stl::set<tableint> &orig_neighbors, tableint *removed_links,
112114
size_t *removed_links_num);
115+
inline dist_t processCandidate(tableint curNodeId, const void *data_point, size_t layer,
116+
size_t ef, tag_t visited_tag,
117+
candidatesMaxHeap<dist_t> &top_candidates,
118+
candidatesMaxHeap<dist_t> &candidates_set,
119+
dist_t lowerBound) const;
113120
candidatesMaxHeap<dist_t> searchLayer(tableint ep_id, const void *data_point, size_t layer,
114121
size_t ef) const;
122+
candidatesLabelsMaxHeap<dist_t> searchBottomLayer_WithTimeout(tableint ep_id,
123+
const void *data_point, size_t ef,
124+
size_t k, void *timeoutCtx,
125+
VecSimQueryResult_Code *rc) const;
115126
void getNeighborsByHeuristic2(candidatesMaxHeap<dist_t> &top_candidates, size_t M);
116127
tableint mutuallyConnectNewElement(tableint cur_c, candidatesMaxHeap<dist_t> &top_candidates,
117128
size_t level);
@@ -144,9 +155,10 @@ class HierarchicalNSW : public VecsimBaseObject {
144155
bool removePoint(labeltype label);
145156
void addPoint(const void *data_point, labeltype label);
146157
dist_t getDistanceByLabelFromPoint(labeltype label, const void *data_point);
147-
tableint searchBottomLayerEP(const void *query_data) const;
148-
vecsim_stl::max_priority_queue<pair<dist_t, labeltype>> searchKnn(const void *query_data,
149-
size_t k) const;
158+
tableint searchBottomLayerEP(const void *query_data, void *timeoutCtx,
159+
VecSimQueryResult_Code *rc) const;
160+
vecsim_stl::max_priority_queue<pair<dist_t, labeltype>>
161+
searchKnn(const void *query_data, size_t k, void *timeoutCtx, VecSimQueryResult_Code *rc) const;
150162
};
151163

152164
/**
@@ -337,6 +349,57 @@ void HierarchicalNSW<dist_t>::removeExtraLinks(linklistsizeint *node_ll,
337349
*removed_links_num = removed_idx;
338350
}
339351

352+
template <typename dist_t>
353+
dist_t HierarchicalNSW<dist_t>::processCandidate(tableint curNodeId, const void *data_point,
354+
size_t layer, size_t ef, tag_t visited_tag,
355+
candidatesMaxHeap<dist_t> &top_candidates,
356+
candidatesMaxHeap<dist_t> &candidate_set,
357+
dist_t lowerBound) const {
358+
359+
#ifdef ENABLE_PARALLELIZATION
360+
std::unique_lock<std::mutex> lock(link_list_locks_[curNodeId]);
361+
#endif
362+
linklistsizeint *node_ll = get_linklist_at_level(curNodeId, layer);
363+
size_t links_num = getListCount(node_ll);
364+
auto *node_links = (tableint *)(node_ll + 1);
365+
#ifdef USE_SSE
366+
_mm_prefetch((char *)(visited_nodes_handler->getElementsTags() + *(node_ll + 1)), _MM_HINT_T0);
367+
_mm_prefetch((char *)(visited_nodes_handler->getElementsTags() + *(node_ll + 1) + 64),
368+
_MM_HINT_T0);
369+
_mm_prefetch(getDataByInternalId(*node_links), _MM_HINT_T0);
370+
_mm_prefetch(getDataByInternalId(*(node_links + 1)), _MM_HINT_T0);
371+
#endif
372+
373+
for (size_t j = 0; j < links_num; j++) {
374+
tableint candidate_id = *(node_links + j);
375+
#ifdef USE_SSE
376+
_mm_prefetch((char *)(visited_nodes_handler->getElementsTags() + *(node_links + j + 1)),
377+
_MM_HINT_T0);
378+
_mm_prefetch(getDataByInternalId(*(node_links + j + 1)), _MM_HINT_T0);
379+
#endif
380+
if (this->visited_nodes_handler->getNodeTag(candidate_id) == visited_tag)
381+
continue;
382+
this->visited_nodes_handler->tagNode(candidate_id, visited_tag);
383+
char *currObj1 = (getDataByInternalId(candidate_id));
384+
385+
dist_t dist1 = fstdistfunc_(data_point, currObj1, dist_func_param_);
386+
if (top_candidates.size() < ef || lowerBound > dist1) {
387+
candidate_set.emplace(-dist1, candidate_id);
388+
#ifdef USE_SSE
389+
_mm_prefetch(getDataByInternalId(candidate_set.top().second), _MM_HINT_T0);
390+
#endif
391+
top_candidates.emplace(dist1, candidate_id);
392+
393+
if (top_candidates.size() > ef)
394+
top_candidates.pop();
395+
396+
if (!top_candidates.empty())
397+
lowerBound = top_candidates.top().first;
398+
}
399+
}
400+
return lowerBound;
401+
}
402+
340403
template <typename dist_t>
341404
candidatesMaxHeap<dist_t> HierarchicalNSW<dist_t>::searchLayer(tableint ep_id,
342405
const void *data_point, size_t layer,
@@ -366,50 +429,10 @@ candidatesMaxHeap<dist_t> HierarchicalNSW<dist_t>::searchLayer(tableint ep_id,
366429
}
367430
candidate_set.pop();
368431

369-
tableint curNodeNum = curr_el_pair.second;
370-
#ifdef ENABLE_PARALLELIZATION
371-
std::unique_lock<std::mutex> lock(link_list_locks_[curNodeNum]);
372-
#endif
373-
linklistsizeint *node_ll = get_linklist_at_level(curNodeNum, layer);
374-
size_t links_num = getListCount(node_ll);
375-
auto *node_links = (tableint *)(node_ll + 1);
376-
#ifdef USE_SSE
377-
_mm_prefetch((char *)(visited_nodes_handler->getElementsTags() + *(node_ll + 1)),
378-
_MM_HINT_T0);
379-
_mm_prefetch((char *)(visited_nodes_handler->getElementsTags() + *(node_ll + 1) + 64),
380-
_MM_HINT_T0);
381-
_mm_prefetch(getDataByInternalId(*node_links), _MM_HINT_T0);
382-
_mm_prefetch(getDataByInternalId(*(node_links + 1)), _MM_HINT_T0);
383-
#endif
384-
385-
for (size_t j = 0; j < links_num; j++) {
386-
tableint candidate_id = *(node_links + j);
387-
#ifdef USE_SSE
388-
_mm_prefetch((char *)(visited_nodes_handler->getElementsTags() + *(node_links + j + 1)),
389-
_MM_HINT_T0);
390-
_mm_prefetch(getDataByInternalId(*(node_links + j + 1)), _MM_HINT_T0);
391-
#endif
392-
if (this->visited_nodes_handler->getNodeTag(candidate_id) == visited_tag)
393-
continue;
394-
this->visited_nodes_handler->tagNode(candidate_id, visited_tag);
395-
char *currObj1 = (getDataByInternalId(candidate_id));
396-
397-
dist_t dist1 = fstdistfunc_(data_point, currObj1, dist_func_param_);
398-
if (top_candidates.size() < ef || lowerBound > dist1) {
399-
candidate_set.emplace(-dist1, candidate_id);
400-
#ifdef USE_SSE
401-
_mm_prefetch(getDataByInternalId(candidate_set.top().second), _MM_HINT_T0);
402-
#endif
403-
top_candidates.emplace(dist1, candidate_id);
404-
405-
if (top_candidates.size() > ef)
406-
top_candidates.pop();
407-
408-
if (!top_candidates.empty())
409-
lowerBound = top_candidates.top().first;
410-
}
411-
}
432+
lowerBound = processCandidate(curr_el_pair.second, data_point, layer, ef, visited_tag,
433+
top_candidates, candidate_set, lowerBound);
412434
}
435+
413436
#ifdef ENABLE_PARALLELIZATION
414437
visited_nodes_handler_pool->returnVisitedNodesHandlerToPool(this->visited_nodes_handler);
415438
#endif
@@ -1034,7 +1057,8 @@ void HierarchicalNSW<dist_t>::addPoint(const void *data_point, const labeltype l
10341057
}
10351058

10361059
template <typename dist_t>
1037-
tableint HierarchicalNSW<dist_t>::searchBottomLayerEP(const void *query_data) const {
1060+
tableint HierarchicalNSW<dist_t>::searchBottomLayerEP(const void *query_data, void *timeoutCtx,
1061+
VecSimQueryResult_Code *rc) const {
10381062

10391063
if (cur_element_count == 0) {
10401064
return entrypoint_node_;
@@ -1045,6 +1069,10 @@ tableint HierarchicalNSW<dist_t>::searchBottomLayerEP(const void *query_data) co
10451069
for (size_t level = maxlevel_; level > 0; level--) {
10461070
bool changed = true;
10471071
while (changed) {
1072+
if (__builtin_expect(VecSimIndex::timeoutCallback(timeoutCtx), 0)) {
1073+
*rc = VecSim_QueryResult_TimedOut;
1074+
return HNSW_INVALID_ID;
1075+
}
10481076
changed = false;
10491077
linklistsizeint *node_ll = get_linklist(currObj, level);
10501078
unsigned short links_count = getListCount(node_ll);
@@ -1064,30 +1092,81 @@ tableint HierarchicalNSW<dist_t>::searchBottomLayerEP(const void *query_data) co
10641092
}
10651093
}
10661094
}
1095+
*rc = VecSim_QueryResult_OK;
10671096
return currObj;
10681097
}
10691098

10701099
template <typename dist_t>
1071-
vecsim_stl::max_priority_queue<pair<dist_t, labeltype>>
1072-
HierarchicalNSW<dist_t>::searchKnn(const void *query_data, size_t k) const {
1100+
candidatesLabelsMaxHeap<dist_t>
1101+
HierarchicalNSW<dist_t>::searchBottomLayer_WithTimeout(tableint ep_id, const void *data_point,
1102+
size_t ef, size_t k, void *timeoutCtx,
1103+
VecSimQueryResult_Code *rc) const {
1104+
candidatesLabelsMaxHeap<dist_t> results(this->allocator);
1105+
1106+
#ifdef ENABLE_PARALLELIZATION
1107+
this->visited_nodes_handler =
1108+
this->visited_nodes_handler_pool->getAvailableVisitedNodesHandler();
1109+
#endif
10731110

1074-
vecsim_stl::max_priority_queue<std::pair<dist_t, labeltype>> result(this->allocator);
1075-
if (cur_element_count == 0)
1076-
return result;
1111+
tag_t visited_tag = this->visited_nodes_handler->getFreshTag();
10771112

1078-
tableint bottom_layer_ep = searchBottomLayerEP(query_data);
1079-
vecsim_stl::max_priority_queue<pair<dist_t, tableint>> top_candidates =
1080-
searchLayer(bottom_layer_ep, query_data, 0, std::max(ef_, k));
1113+
candidatesMaxHeap<dist_t> top_candidates(this->allocator);
1114+
candidatesMaxHeap<dist_t> candidate_set(this->allocator);
1115+
1116+
dist_t dist = fstdistfunc_(data_point, getDataByInternalId(ep_id), dist_func_param_);
1117+
dist_t lowerBound = dist;
1118+
top_candidates.emplace(dist, ep_id);
1119+
candidate_set.emplace(-dist, ep_id);
1120+
1121+
this->visited_nodes_handler->tagNode(ep_id, visited_tag);
1122+
1123+
while (!candidate_set.empty()) {
1124+
std::pair<dist_t, tableint> curr_el_pair = candidate_set.top();
1125+
if ((-curr_el_pair.first) > lowerBound) {
1126+
break;
1127+
}
1128+
if (__builtin_expect(VecSimIndex::timeoutCallback(timeoutCtx), 0)) {
1129+
*rc = VecSim_QueryResult_TimedOut;
1130+
return results;
1131+
}
1132+
candidate_set.pop();
10811133

1134+
lowerBound = processCandidate(curr_el_pair.second, data_point, 0, ef, visited_tag,
1135+
top_candidates, candidate_set, lowerBound);
1136+
}
1137+
#ifdef ENABLE_PARALLELIZATION
1138+
visited_nodes_handler_pool->returnVisitedNodesHandlerToPool(this->visited_nodes_handler);
1139+
#endif
10821140
while (top_candidates.size() > k) {
10831141
top_candidates.pop();
10841142
}
10851143
while (top_candidates.size() > 0) {
1086-
std::pair<dist_t, tableint> rez = top_candidates.top();
1087-
result.push(std::pair<dist_t, labeltype>(rez.first, getExternalLabel(rez.second)));
1144+
auto &res = top_candidates.top();
1145+
results.emplace(res.first, getExternalLabel(res.second));
10881146
top_candidates.pop();
10891147
}
1090-
return result;
1148+
*rc = VecSim_QueryResult_OK;
1149+
return results;
1150+
}
1151+
1152+
template <typename dist_t>
1153+
candidatesLabelsMaxHeap<dist_t>
1154+
HierarchicalNSW<dist_t>::searchKnn(const void *query_data, size_t k, void *timeoutCtx,
1155+
VecSimQueryResult_Code *rc) const {
1156+
1157+
if (cur_element_count == 0) {
1158+
*rc = VecSim_QueryResult_OK;
1159+
return candidatesLabelsMaxHeap<dist_t>(this->allocator);
1160+
}
1161+
1162+
tableint bottom_layer_ep = searchBottomLayerEP(query_data, timeoutCtx, rc);
1163+
if (VecSim_OK != *rc) {
1164+
return candidatesLabelsMaxHeap<dist_t>(this->allocator);
1165+
}
1166+
candidatesLabelsMaxHeap<dist_t> results = searchBottomLayer_WithTimeout(
1167+
bottom_layer_ep, query_data, std::max(ef_, k), k, timeoutCtx, rc);
1168+
1169+
return results;
10911170
}
10921171

10931172
} // namespace hnswlib

0 commit comments

Comments
 (0)