Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ valgrind:
#----------------------------------------------------------------------------------------------

flow_test:
$(SHOW)poetry build -vv
$(SHOW)poetry install
$(SHOW)poetry run pytest tests/flow -v -s
$(SHOW)poetry run pytest tests/flow -vv -s

.PHONY: flow_test

Expand Down
23 changes: 23 additions & 0 deletions src/VecSim/algorithms/hnsw/hnsw.h
Original file line number Diff line number Diff line change
Expand Up @@ -1307,19 +1307,33 @@ HNSWIndex<DataType, DistType>::safeCollectAllNodeIncomingNeighbors(idType node_i

template <typename DataType, typename DistType>
void HNSWIndex<DataType, DistType>::resizeIndexInternal(size_t new_max_elements) {
std::cout<< "resize element_levels_" << std::endl;

element_levels_.resize(new_max_elements);
std::cout<< "resize element_levels_" << std::endl;

element_levels_.shrink_to_fit();
std::cout<< "resize LabelLookup" << std::endl;

resizeLabelLookup(new_max_elements);
std::cout<< "resize visited_nodes_handler_pool" << std::endl;

visited_nodes_handler_pool.resize(new_max_elements);
std::cout<< "resize element_neighbors_locks_" << std::endl;

vecsim_stl::vector<std::mutex>(new_max_elements, this->allocator)
.swap(element_neighbors_locks_);
// Reallocate base layer
std::cout<< "resize data_level0_memory from " << (*(((size_t *)data_level0_memory_) - 1))<< " to" << (new_max_elements * size_data_per_element_) <<std::endl;

char *data_level0_memory_new = (char *)this->allocator->reallocate(
data_level0_memory_, new_max_elements * size_data_per_element_);
if (data_level0_memory_new == nullptr)
throw std::runtime_error("Not enough memory: resizeIndex failed to allocate base layer");
data_level0_memory_ = data_level0_memory_new;

std::cout<< "linkLists_new " << (*(((size_t *)linkLists_) - 1))<< " to" << (sizeof(void *) * new_max_elements) <<std::endl;

// Reallocate all other layers
char **linkLists_new =
(char **)this->allocator->reallocate(linkLists_, sizeof(void *) * new_max_elements);
Expand All @@ -1328,6 +1342,8 @@ void HNSWIndex<DataType, DistType>::resizeIndexInternal(size_t new_max_elements)
linkLists_ = linkLists_new;

max_elements_ = new_max_elements;
std::cout<< "after resize hnsw allocation size = " << this->getAllocationSize() <<std::endl;

}

template <typename DataType, typename DistType>
Expand Down Expand Up @@ -1640,8 +1656,11 @@ HNSWIndex<DataType, DistType>::~HNSWIndex() {
*/
template <typename DataType, typename DistType>
void HNSWIndex<DataType, DistType>::increaseCapacity() {
std::cout<< "increase capacity current hnsw allocation size = " << this->getAllocationSize() <<std::endl;
size_t vectors_to_add = this->blockSize - max_elements_ % this->blockSize;
resizeIndexInternal(max_elements_ + vectors_to_add);
std::cout<< "after resize hnsw allocation size = " << this->getAllocationSize() <<std::endl;

}

template <typename DataType, typename DistType>
Expand Down Expand Up @@ -1707,6 +1726,10 @@ void HNSWIndex<DataType, DistType>::removeAndSwap(idType internalId) {
size_t extra_space_to_free = max_elements_ % this->blockSize;

// Remove one block from the capacity.
std::cout<< "resize down from removeAndSwap interanl id = " << internalId <<std::endl;
std::cout<< "current hnsw allocation size = " << this->getAllocationSize() <<std::endl;
std::cout<< "current hnsw allocation size = " << this->getAllocationSize() <<std::endl;

this->resizeIndexInternal(max_elements_ - this->blockSize - extra_space_to_free);
}
}
Expand Down
67 changes: 54 additions & 13 deletions src/VecSim/algorithms/hnsw/hnsw_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ struct HNSWSwapJob : public VecsimBaseObject {
HNSWSwapJob(std::shared_ptr<VecSimAllocator> allocator, idType deletedId)
: VecsimBaseObject(allocator), deleted_id(deletedId), pending_repair_jobs_counter(0) {}
void setRepairJobsNum(long num_repair_jobs) { pending_repair_jobs_counter = num_repair_jobs; }
void atomicDecreasePendingJobsNum() {
pending_repair_jobs_counter--;
int atomicDecreasePendingJobsNum() {
int ret = --pending_repair_jobs_counter;
assert(pending_repair_jobs_counter >= 0);
return ret;
}
};

Expand Down Expand Up @@ -74,6 +75,7 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> {
// vectors reached this limit, we apply swap jobs *only for vectors that has no more pending
// repair jobs*, and are ready to be removed from the graph.
size_t pendingSwapJobsThreshold;
size_t readySwapJobs;

// Protect the both idToRepairJobs lookup and the pending_repair_jobs_counter for the
// associated swap jobs.
Expand All @@ -85,9 +87,14 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> {
// To be executed synchronously upon deleting a vector, doesn't require a wrapper. Main HNSW
// lock is assumed to be held exclusive here.
void executeSwapJob(HNSWSwapJob *job, vecsim_stl::vector<idType> &idsToRemove);

#ifdef BUILD_TESTS
public:
#endif
void executeReadySwapJobsIMP();
void executeReadySwapJobs();

#ifdef BUILD_TESTS
private:
#endif
// Wrappers static functions to be sent as callbacks upon creating the jobs (since members
// functions cannot serve as callback, this serve as the "gateway" to the appropriate index).
static void executeInsertJobWrapper(AsyncJob *job);
Expand Down Expand Up @@ -190,6 +197,10 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> {
inline void setLastSearchMode(VecSearchMode mode) override {
return this->backendIndex->setLastSearchMode(mode);
}

#ifdef BUILD_TESTS
void getDataByLabel(labelType label, std::vector<std::vector<DataType>> &vectors_output) const;
#endif
};

/**
Expand Down Expand Up @@ -224,7 +235,9 @@ void TieredHNSWIndex<DataType, DistType>::executeSwapJob(HNSWSwapJob *job,
for (auto &job_it : idToRepairJobs.at(job->deleted_id)) {
job_it->node_id = INVALID_JOB_ID;
for (auto &swap_job_it : job_it->associatedSwapJobs) {
swap_job_it->atomicDecreasePendingJobsNum();
if (swap_job_it->atomicDecreasePendingJobsNum() == 0) {
readySwapJobs++;
}
}
}
idToRepairJobs.erase(job->deleted_id);
Expand Down Expand Up @@ -257,12 +270,10 @@ HNSWIndex<DataType, DistType> *TieredHNSWIndex<DataType, DistType>::getHNSWIndex
}

template <typename DataType, typename DistType>
void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobs() {
// If swapJobs size is equal or larger than a threshold, go over the swap jobs and execute every
// job for which all of its pending repair jobs were executed (otherwise finish and return).
if (idToSwapJob.size() < this->pendingSwapJobsThreshold) {
return;
}
void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobsIMP() {

std::cout << "executing " << readySwapJobs << " swap hobs" << std::endl;
std::cout << "there are " << this->getHNSWIndex()->getNumMarkedDeleted() << " marked delted vectors" << std::endl;
// Execute swap jobs - acquire hnsw write lock.
this->mainIndexGuard.lock();

Expand All @@ -279,7 +290,19 @@ void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobs() {
for (idType id : idsToRemove) {
idToSwapJob.erase(id);
}
readySwapJobs-= idsToRemove.size();
this->mainIndexGuard.unlock();

}
template <typename DataType, typename DistType>
void TieredHNSWIndex<DataType, DistType>::executeReadySwapJobs() {
// If swapJobs size is equal or larger than a threshold, go over the swap jobs and execute every
// job for which all of its pending repair jobs were executed (otherwise finish and return).
if (readySwapJobs < this->pendingSwapJobsThreshold) {
return;
}

executeReadySwapJobsIMP();
}

template <typename DataType, typename DistType>
Expand Down Expand Up @@ -331,6 +354,9 @@ int TieredHNSWIndex<DataType, DistType>::deleteLabelFromHNSW(labelType label) {
}
}
swap_job->setRepairJobsNum(incoming_edges.size());
if (incoming_edges.size() == 0) {
readySwapJobs++;
}
this->idToRepairJobsGuard.unlock();

this->submitJobs(repair_jobs);
Expand Down Expand Up @@ -378,7 +404,10 @@ void TieredHNSWIndex<DataType, DistType>::insertVectorToHNSW(
// Check if resizing is still required (another thread might have done it in the meantime
// while we release the shared lock).
if (hnsw_index->indexCapacity() == hnsw_index->indexSize()) {
std::cout<< "current tiered total allocation size = " << this->getAllocationSize() <<std::endl;
hnsw_index->increaseCapacity();
std::cout<< "after hnsw resize tiered total allocation size = " << this->getAllocationSize() <<std::endl;

}
// Hold the index data lock while we store the new element. If the new node's max level is
// higher than the current one, hold the lock through the entire insertion to ensure that
Expand Down Expand Up @@ -506,7 +535,9 @@ void TieredHNSWIndex<DataType, DistType>::executeRepairJob(HNSWRepairJob *job) {
repair_jobs.pop_back();
}
for (auto &it : job->associatedSwapJobs) {
it->atomicDecreasePendingJobsNum();
if (it->atomicDecreasePendingJobsNum() == 0) {
readySwapJobs++;
}
}
this->idToRepairJobsGuard.unlock();

Expand All @@ -524,7 +555,7 @@ TieredHNSWIndex<DataType, DistType>::TieredHNSWIndex(HNSWIndex<DataType, DistTyp
std::shared_ptr<VecSimAllocator> allocator)
: VecSimTieredIndex<DataType, DistType>(hnsw_index, bf_index, tiered_index_params, allocator),
labelToInsertJobs(this->allocator), idToRepairJobs(this->allocator),
idToSwapJob(this->allocator) {
idToSwapJob(this->allocator), readySwapJobs(0) {
// If the param for swapJobThreshold is 0 use the default value, if it exceeds the maximum
// allowed, use the maximum value.
this->pendingSwapJobsThreshold =
Expand Down Expand Up @@ -649,6 +680,7 @@ int TieredHNSWIndex<DataType, DistType>::addVector(const void *blob, labelType l
}
// Apply ready swap jobs if number of deleted vectors reached the threshold (under exclusive
// lock of the main index guard).

this->executeReadySwapJobs();

// Insert job to the queue and signal the workers' updater.
Expand Down Expand Up @@ -990,3 +1022,12 @@ void TieredHNSWIndex<DataType, DistType>::TieredHNSW_BatchIterator::filter_irrel
// Update number of results (pop the tail)
array_pop_back_n(rl.results, end - cur_end);
}


#ifdef BUILD_TESTS
template <typename DataType, typename DistType>
void TieredHNSWIndex<DataType, DistType>::getDataByLabel(
labelType label, std::vector<std::vector<DataType>> &vectors_output) const {
this->getHNSWIndex()->getDataByLabel(label, vectors_output);
}
#endif
9 changes: 9 additions & 0 deletions src/VecSim/vec_sim_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
delete[] buf;
}
}
#ifdef BUILD_TESTS
// Set new log context to be sent to the log callback.
// Returns the previous logctx.
inline void *setLogCtx(void *new_logCtx) {
void *prev_logCtx = this->logCallbackCtx;
this->logCallbackCtx = new_logCtx;
return prev_logCtx;
}
#endif

// Adds all common info to the info iterator, besides the block size.
void addCommonInfoToIterator(VecSimInfoIterator *infoIterator, const CommonInfo &info) const {
Expand Down
8 changes: 8 additions & 0 deletions src/VecSim/vec_sim_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,11 @@ void Vecsim_Log(void *ctx, const char *message) { std::cout << message << std::e
timeoutCallbackFunction VecSimIndexInterface::timeoutCallback = [](void *ctx) { return 0; };
logCallbackFunction VecSimIndexInterface::logCallback = Vecsim_Log;
VecSimWriteMode VecSimIndexInterface::asyncWriteMode = VecSim_WriteAsync;

#ifdef BUILD_TESTS
static inline void Vecsim_Log_DO_NOTHING(void *ctx, const char *message) {}

void VecSimIndexInterface::resetLogCallbackFunction() {
VecSimIndexInterface::logCallback = Vecsim_Log_DO_NOTHING;
}
#endif
4 changes: 4 additions & 0 deletions src/VecSim/vec_sim_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ struct VecSimIndexInterface : public VecsimBaseObject {
VecSimIndexInterface::logCallback = callback;
}

#ifdef BUILD_TESTS
static void resetLogCallbackFunction();
#endif

/**
* @brief Allow 3rd party to set the write mode for tiered index - async insert/delete using
* background jobs, or insert/delete inplace.
Expand Down
8 changes: 6 additions & 2 deletions src/VecSim/vec_sim_tiered_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ class VecSimTieredIndex : public VecSimIndexInterface {

// Return the current state of the global write mode (async/in-place).
static VecSimWriteMode getWriteMode() { return VecSimIndexInterface::asyncWriteMode; }

#ifdef BUILD_TESTS
inline VecSimIndexAbstract<DistType> *getFlatbufferIndex() const { return this->frontendIndex; }
#endif
private:
virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override {
// Will be used only if a processing stage is needed
Expand Down Expand Up @@ -141,7 +143,9 @@ VecSimQueryResult_List
VecSimTieredIndex<DataType, DistType>::topKQuery(const void *queryBlob, size_t k,
VecSimQueryParams *queryParams) const {
this->flatIndexGuard.lock_shared();

#ifdef BUILD_TESTS
this->getFlatbufferIndex()->log("");
#endif
// If the flat buffer is empty, we can simply query the main index.
if (this->frontendIndex->indexSize() == 0) {
// Release the flat lock and acquire the main lock.
Expand Down
Loading