diff --git a/Makefile b/Makefile index 55c39d647..7bc37e69f 100644 --- a/Makefile +++ b/Makefile @@ -186,8 +186,9 @@ valgrind: #---------------------------------------------------------------------------------------------- flow_test: + $(SHOW)poetry build -vv $(SHOW)poetry install - $(SHOW)poetry run pytest tests/flow -v -s + $(SHOW)poetry run pytest tests/flow -vv -s .PHONY: flow_test diff --git a/src/VecSim/algorithms/hnsw/hnsw.h b/src/VecSim/algorithms/hnsw/hnsw.h index a0e4e204a..e3f866d06 100644 --- a/src/VecSim/algorithms/hnsw/hnsw.h +++ b/src/VecSim/algorithms/hnsw/hnsw.h @@ -1307,19 +1307,33 @@ HNSWIndex::safeCollectAllNodeIncomingNeighbors(idType node_i template void HNSWIndex::resizeIndexInternal(size_t new_max_elements) { + std::cout<< "resize element_levels_" << std::endl; + element_levels_.resize(new_max_elements); + std::cout<< "resize element_levels_" << std::endl; + element_levels_.shrink_to_fit(); + std::cout<< "resize LabelLookup" << std::endl; + resizeLabelLookup(new_max_elements); + std::cout<< "resize visited_nodes_handler_pool" << std::endl; + visited_nodes_handler_pool.resize(new_max_elements); + std::cout<< "resize element_neighbors_locks_" << std::endl; + vecsim_stl::vector(new_max_elements, this->allocator) .swap(element_neighbors_locks_); // Reallocate base layer + std::cout<< "resize data_level0_memory from " << (*(((size_t *)data_level0_memory_) - 1))<< " to" << (new_max_elements * size_data_per_element_) <allocator->reallocate( data_level0_memory_, new_max_elements * size_data_per_element_); if (data_level0_memory_new == nullptr) throw std::runtime_error("Not enough memory: resizeIndex failed to allocate base layer"); data_level0_memory_ = data_level0_memory_new; + std::cout<< "linkLists_new " << (*(((size_t *)linkLists_) - 1))<< " to" << (sizeof(void *) * new_max_elements) <allocator->reallocate(linkLists_, sizeof(void *) * new_max_elements); @@ -1328,6 +1342,8 @@ void HNSWIndex::resizeIndexInternal(size_t new_max_elements) linkLists_ = linkLists_new; max_elements_ = new_max_elements; + std::cout<< "after resize hnsw allocation size = " << this->getAllocationSize() < @@ -1640,8 +1656,11 @@ HNSWIndex::~HNSWIndex() { */ template void HNSWIndex::increaseCapacity() { + std::cout<< "increase capacity current hnsw allocation size = " << this->getAllocationSize() <blockSize - max_elements_ % this->blockSize; resizeIndexInternal(max_elements_ + vectors_to_add); + std::cout<< "after resize hnsw allocation size = " << this->getAllocationSize() < @@ -1707,6 +1726,10 @@ void HNSWIndex::removeAndSwap(idType internalId) { size_t extra_space_to_free = max_elements_ % this->blockSize; // Remove one block from the capacity. 
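    // Arithmetic illustration only (numbers are not from the codebase): with blockSize = 1024
    // and max_elements_ = 3078, increaseCapacity() above adds 1024 - 3078 % 1024 = 1018 slots
    // (growing the capacity to 4096), while the remainder 3078 % 1024 = 6 is what this line
    // computes as extra_space_to_free, i.e. the slots past the last full block boundary.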
+ std::cout<< "resize down from removeAndSwap interanl id = " << internalId < { inline void setLastSearchMode(VecSearchMode mode) override { return this->backendIndex->setLastSearchMode(mode); } + +#ifdef BUILD_TESTS + void getDataByLabel(labelType label, std::vector> &vectors_output) const; +#endif }; /** @@ -224,7 +235,9 @@ void TieredHNSWIndex::executeSwapJob(HNSWSwapJob *job, for (auto &job_it : idToRepairJobs.at(job->deleted_id)) { job_it->node_id = INVALID_JOB_ID; for (auto &swap_job_it : job_it->associatedSwapJobs) { - swap_job_it->atomicDecreasePendingJobsNum(); + if (swap_job_it->atomicDecreasePendingJobsNum() == 0) { + readySwapJobs++; + } } } idToRepairJobs.erase(job->deleted_id); @@ -257,12 +270,10 @@ HNSWIndex *TieredHNSWIndex::getHNSWIndex } template -void TieredHNSWIndex::executeReadySwapJobs() { - // If swapJobs size is equal or larger than a threshold, go over the swap jobs and execute every - // job for which all of its pending repair jobs were executed (otherwise finish and return). - if (idToSwapJob.size() < this->pendingSwapJobsThreshold) { - return; - } +void TieredHNSWIndex::executeReadySwapJobsIMP() { + + std::cout << "executing " << readySwapJobs << " swap hobs" << std::endl; + std::cout << "there are " << this->getHNSWIndex()->getNumMarkedDeleted() << " marked delted vectors" << std::endl; // Execute swap jobs - acquire hnsw write lock. this->mainIndexGuard.lock(); @@ -279,7 +290,19 @@ void TieredHNSWIndex::executeReadySwapJobs() { for (idType id : idsToRemove) { idToSwapJob.erase(id); } + readySwapJobs-= idsToRemove.size(); this->mainIndexGuard.unlock(); + +} +template +void TieredHNSWIndex::executeReadySwapJobs() { + // If swapJobs size is equal or larger than a threshold, go over the swap jobs and execute every + // job for which all of its pending repair jobs were executed (otherwise finish and return). + if (readySwapJobs < this->pendingSwapJobsThreshold) { + return; + } + + executeReadySwapJobsIMP(); } template @@ -331,6 +354,9 @@ int TieredHNSWIndex::deleteLabelFromHNSW(labelType label) { } } swap_job->setRepairJobsNum(incoming_edges.size()); + if (incoming_edges.size() == 0) { + readySwapJobs++; + } this->idToRepairJobsGuard.unlock(); this->submitJobs(repair_jobs); @@ -378,7 +404,10 @@ void TieredHNSWIndex::insertVectorToHNSW( // Check if resizing is still required (another thread might have done it in the meantime // while we release the shared lock). if (hnsw_index->indexCapacity() == hnsw_index->indexSize()) { + std::cout<< "current tiered total allocation size = " << this->getAllocationSize() <increaseCapacity(); + std::cout<< "after hnsw resize tiered total allocation size = " << this->getAllocationSize() <::executeRepairJob(HNSWRepairJob *job) { repair_jobs.pop_back(); } for (auto &it : job->associatedSwapJobs) { - it->atomicDecreasePendingJobsNum(); + if (it->atomicDecreasePendingJobsNum() == 0) { + readySwapJobs++; + } } this->idToRepairJobsGuard.unlock(); @@ -524,7 +555,7 @@ TieredHNSWIndex::TieredHNSWIndex(HNSWIndex allocator) : VecSimTieredIndex(hnsw_index, bf_index, tiered_index_params, allocator), labelToInsertJobs(this->allocator), idToRepairJobs(this->allocator), - idToSwapJob(this->allocator) { + idToSwapJob(this->allocator), readySwapJobs(0) { // If the param for swapJobThreshold is 0 use the default value, if it exceeds the maximum // allowed, use the maximum value. 
this->pendingSwapJobsThreshold = @@ -649,6 +680,7 @@ int TieredHNSWIndex::addVector(const void *blob, labelType l } // Apply ready swap jobs if number of deleted vectors reached the threshold (under exclusive // lock of the main index guard). + this->executeReadySwapJobs(); // Insert job to the queue and signal the workers' updater. @@ -990,3 +1022,12 @@ void TieredHNSWIndex::TieredHNSW_BatchIterator::filter_irrel // Update number of results (pop the tail) array_pop_back_n(rl.results, end - cur_end); } + + +#ifdef BUILD_TESTS +template +void TieredHNSWIndex::getDataByLabel( + labelType label, std::vector> &vectors_output) const { + this->getHNSWIndex()->getDataByLabel(label, vectors_output); +} +#endif diff --git a/src/VecSim/vec_sim_index.h b/src/VecSim/vec_sim_index.h index 0b2ff0238..7416fe1c3 100644 --- a/src/VecSim/vec_sim_index.h +++ b/src/VecSim/vec_sim_index.h @@ -133,6 +133,15 @@ struct VecSimIndexAbstract : public VecSimIndexInterface { delete[] buf; } } +#ifdef BUILD_TESTS + // Set new log context to be sent to the log callback. + // Returns the previous logctx. + inline void *setLogCtx(void *new_logCtx) { + void *prev_logCtx = this->logCallbackCtx; + this->logCallbackCtx = new_logCtx; + return prev_logCtx; + } +#endif // Adds all common info to the info iterator, besides the block size. void addCommonInfoToIterator(VecSimInfoIterator *infoIterator, const CommonInfo &info) const { diff --git a/src/VecSim/vec_sim_interface.cpp b/src/VecSim/vec_sim_interface.cpp index b1953b023..1b953cc50 100644 --- a/src/VecSim/vec_sim_interface.cpp +++ b/src/VecSim/vec_sim_interface.cpp @@ -14,3 +14,11 @@ void Vecsim_Log(void *ctx, const char *message) { std::cout << message << std::e timeoutCallbackFunction VecSimIndexInterface::timeoutCallback = [](void *ctx) { return 0; }; logCallbackFunction VecSimIndexInterface::logCallback = Vecsim_Log; VecSimWriteMode VecSimIndexInterface::asyncWriteMode = VecSim_WriteAsync; + +#ifdef BUILD_TESTS +static inline void Vecsim_Log_DO_NOTHING(void *ctx, const char *message) {} + +void VecSimIndexInterface::resetLogCallbackFunction() { + VecSimIndexInterface::logCallback = Vecsim_Log_DO_NOTHING; +} +#endif diff --git a/src/VecSim/vec_sim_interface.h b/src/VecSim/vec_sim_interface.h index ad5d28e89..8a7c2eca1 100644 --- a/src/VecSim/vec_sim_interface.h +++ b/src/VecSim/vec_sim_interface.h @@ -226,6 +226,10 @@ struct VecSimIndexInterface : public VecsimBaseObject { VecSimIndexInterface::logCallback = callback; } +#ifdef BUILD_TESTS + static void resetLogCallbackFunction(); +#endif + /** * @brief Allow 3rd party to set the write mode for tiered index - async insert/delete using * background jobs, or insert/delete inplace. diff --git a/src/VecSim/vec_sim_tiered_index.h b/src/VecSim/vec_sim_tiered_index.h index 3aa3cefdc..fdd369256 100644 --- a/src/VecSim/vec_sim_tiered_index.h +++ b/src/VecSim/vec_sim_tiered_index.h @@ -99,7 +99,9 @@ class VecSimTieredIndex : public VecSimIndexInterface { // Return the current state of the global write mode (async/in-place). 
static VecSimWriteMode getWriteMode() { return VecSimIndexInterface::asyncWriteMode; } - +#ifdef BUILD_TESTS + inline VecSimIndexAbstract *getFlatbufferIndex() const { return this->frontendIndex; } +#endif private: virtual int addVectorWrapper(const void *blob, labelType label, void *auxiliaryCtx) override { // Will be used only if a processing stage is needed @@ -141,7 +143,9 @@ VecSimQueryResult_List VecSimTieredIndex::topKQuery(const void *queryBlob, size_t k, VecSimQueryParams *queryParams) const { this->flatIndexGuard.lock_shared(); - +#ifdef BUILD_TESTS + this->getFlatbufferIndex()->log(""); +#endif // If the flat buffer is empty, we can simply query the main index. if (this->frontendIndex->indexSize() == 0) { // Release the flat lock and acquire the main lock. diff --git a/src/python_bindings/bindings.cpp b/src/python_bindings/bindings.cpp index c2ab25d80..ef8b5319c 100644 --- a/src/python_bindings/bindings.cpp +++ b/src/python_bindings/bindings.cpp @@ -16,8 +16,10 @@ #include #include #include +#include "tiered_index_mock.h" namespace py = pybind11; +using namespace tiered_index_mock; // Helper function that iterates query results and wrap them in python numpy object - // a tuple of two 2D arrays: (labels, distances) @@ -174,6 +176,13 @@ class PyVecSimIndex { size_t indexSize() { return VecSimIndex_IndexSize(index.get()); } + size_t indexMemory() { return this->index->getAllocationSize(); } + + double getGetDistanceFrom(size_t id, const py::object &input) { + py::array query(input); + return this->index->getDistanceFrom(id, (const char *)query.data(0)); + } + PyBatchIterator createBatchIterator(const py::object &input, VecSimQueryParams *query_params) { py::array query(input); return PyBatchIterator( @@ -360,6 +369,139 @@ class PyHNSWLibIndex : public PyVecSimIndex { } }; +template +struct KNNLogCtx { + VecSimIndexAbstract *flat_index; + size_t curr_flat_size; + KNNLogCtx() : flat_index(nullptr), curr_flat_size(0) {} +}; + +class PyTIEREDIndex : public PyVecSimIndex { +private: + VecSimIndexAbstract *getFlatBuffer() { + return reinterpret_cast *>(this->index.get()) + ->getFlatbufferIndex(); + } + +protected: + JobQueue jobQueue; // External queue that holds the jobs. + IndexExtCtx jobQueueCtx; // External context to be sent to the submit callback. + SubmitCB submitCb; // A callback that submits an array of jobs into a given jobQueue. + size_t flatBufferLimit; // Maximum size allowed for the flat buffer. If flat buffer is full, use + // in-place insertion. 
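    // run_thread (below) signals the mock worker threads whether to keep running, while
    // executions_status keeps one busy bit per worker that is set while a job is executing,
    // so WaitForIndex() can distinguish an empty job queue from jobs that were already popped
    // but are still running.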
+ bool run_thread; + std::bitset executions_status; + + KNNLogCtx knnLogCtx; + + TieredIndexParams TieredIndexParams_Init() { + TieredIndexParams ret = { + .jobQueue = &this->jobQueue, + .jobQueueCtx = &this->jobQueueCtx, + .submitCb = this->submitCb, + .flatBufferLimit = this->flatBufferLimit, + }; + + return ret; + } + +public: + explicit PyTIEREDIndex(size_t BufferLimit = 1000) + : submitCb(submit_callback), + flatBufferLimit(BufferLimit), run_thread(true) { + + for (size_t i = 0; i < THREAD_POOL_SIZE; i++) { + ThreadParams params(run_thread, executions_status, i, jobQueue); + thread_pool.emplace_back(thread_main_loop, params); + } + + ResetLogCB(); + } + + virtual ~PyTIEREDIndex() = 0; + + void WaitForIndex(size_t waiting_duration = 10) { + bool keep_wating = true; + while (keep_wating) { + std::this_thread::sleep_for(std::chrono::milliseconds(waiting_duration)); + std::unique_lock lock(queue_guard); + if (jobQueue.empty()) { + while (true) { + if (executions_status.count() == 0) { + keep_wating = false; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(waiting_duration)); + } + } + } + } + + static void log_flat_buffer_size(void *ctx, const char *msg) { + if(ctx) { + auto *knnLogCtx = reinterpret_cast *>(ctx); + knnLogCtx->curr_flat_size = knnLogCtx->flat_index->indexLabelCount(); + } + } + void SetKNNLogCtx() { + knnLogCtx.flat_index = getFlatBuffer(); + knnLogCtx.curr_flat_size = 0; + knnLogCtx.flat_index->setLogCtx(&knnLogCtx); + this->index->setLogCallbackFunction(log_flat_buffer_size); + } + size_t getFlatIndexSize(const char *mode = "None") { + if (!strcmp(mode, "insert_and_knn")) { + return knnLogCtx.curr_flat_size; + } + + return getFlatBuffer()->indexLabelCount(); + } + + void ResetLogCB() { this->index->resetLogCallbackFunction(); } + static size_t GetThreadsNum() { return THREAD_POOL_SIZE; } + + size_t getBufferLimit() { return flatBufferLimit; } +}; + +PyTIEREDIndex::~PyTIEREDIndex() { + thread_pool_terminate(jobQueue, run_thread); + ResetLogCB(); +} +class PyTIERED_HNSWIndex : public PyTIEREDIndex { +public: + explicit PyTIERED_HNSWIndex(const HNSWParams &hnsw_params, + const TieredHNSWParams &tiered_hnsw_params) { + + // Create primaryIndexParams and specific params for hnsw tiered index. + VecSimParams primary_index_params = {.algo = VecSimAlgo_HNSWLIB, .hnswParams = hnsw_params}; + + // create TieredIndexParams + TieredIndexParams tiered_params = TieredIndexParams_Init(); + + tiered_params.primaryIndexParams = &primary_index_params; + tiered_params.specificParams.tieredHnswParams = tiered_hnsw_params; + + // create VecSimParams for TieredIndexParams + VecSimParams params = {.algo = VecSimAlgo_TIERED, .tieredParams = tiered_params}; + + this->index = std::shared_ptr(VecSimIndex_New(¶ms), VecSimIndex_Free); + // Set the created tiered index in the index external context. 
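    // The context holds a strong reference, while each job pushed through submit_callback only
    // wraps a weak_ptr to it (see RefManagedJob in tiered_index_mock.h); worker threads upgrade
    // that weak_ptr before executing a job, so pending jobs are silently skipped once the index
    // has been released.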
+ this->jobQueueCtx.index_strong_ref = this->index; + } + size_t HNSWLabelCount() { + return this->index->info().tieredInfo.backendCommonInfo.indexLabelCount; + } + + size_t HNSWMarkedDeleted() { + return this->index->info().tieredInfo.backendInfo.hnswInfo.numberOfMarkedDeletedNodes; + } + + void executeReadySwapJobs() { + reinterpret_cast *>(this->index.get()) + ->executeReadySwapJobsIMP(); + } +}; + class PyBFIndex : public PyVecSimIndex { public: explicit PyBFIndex(const BFParams &bf_params) { @@ -413,6 +555,10 @@ PYBIND11_MODULE(VecSim, m) { .def_readwrite("initialCapacity", &BFParams::initialCapacity) .def_readwrite("blockSize", &BFParams::blockSize); + py::class_(m, "TieredHNSWParams") + .def(py::init()) + .def_readwrite("swapJobThreshold", &TieredHNSWParams::swapJobThreshold); + py::class_(m, "VecSimParams") .def(py::init()) .def_readwrite("algo", &VecSimParams::algo) @@ -439,8 +585,11 @@ PYBIND11_MODULE(VecSim, m) { .def("range_query", &PyVecSimIndex::range, py::arg("vector"), py::arg("radius"), py::arg("query_param") = nullptr) .def("index_size", &PyVecSimIndex::indexSize) + .def("index_memory", &PyVecSimIndex::indexMemory) .def("create_batch_iterator", &PyVecSimIndex::createBatchIterator, py::arg("query_blob"), py::arg("query_param") = nullptr) + .def("get_distance_from", &PyVecSimIndex::getGetDistanceFrom, py::arg("label"), + py::arg("blob")) .def("get_vector", &PyVecSimIndex::getVector); py::class_(m, "HNSWIndex") @@ -460,6 +609,24 @@ PYBIND11_MODULE(VecSim, m) { .def("range_parallel", &PyHNSWLibIndex::searchRangeParallel, py::arg("queries"), py::arg("radius"), py::arg("query_param") = nullptr, py::arg("num_threads") = -1); + py::class_(m, "TIEREDIndex") + .def("wait_for_index", &PyTIERED_HNSWIndex::WaitForIndex, py::arg("waiting_duration") = 10) + .def("get_curr_bf_size", &PyTIERED_HNSWIndex::getFlatIndexSize, py::arg("mode") = "None") + .def("get_buffer_limit", &PyTIERED_HNSWIndex::getBufferLimit) + .def_static("get_threads_num", &PyTIEREDIndex::GetThreadsNum) + .def("reset_log", &PyTIERED_HNSWIndex::ResetLogCB) + .def("start_knn_log", &PyTIERED_HNSWIndex::SetKNNLogCtx); + + py::class_(m, "TIERED_HNSWIndex") + .def( + py::init([](const HNSWParams &hnsw_params, const TieredHNSWParams &tiered_hnsw_params) { + return new PyTIERED_HNSWIndex(hnsw_params, tiered_hnsw_params); + }), + py::arg("hnsw_params"), py::arg("tiered_hnsw_params")) + .def("hnsw_label_count", &PyTIERED_HNSWIndex::HNSWLabelCount) + .def("hnsw_marked_deleted", &PyTIERED_HNSWIndex::HNSWMarkedDeleted) + .def("execute_swap_jobs", &PyTIERED_HNSWIndex::executeReadySwapJobs); + py::class_(m, "BFIndex") .def(py::init([](const BFParams ¶ms) { return new PyBFIndex(params); }), py::arg("params")); diff --git a/src/python_bindings/tiered_index_mock.h b/src/python_bindings/tiered_index_mock.h new file mode 100644 index 000000000..eb426878f --- /dev/null +++ b/src/python_bindings/tiered_index_mock.h @@ -0,0 +1,134 @@ +/* + *Copyright Redis Ltd. 2021 - present + *Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or + *the Server Side Public License v1 (SSPLv1). 
+ */ + +#pragma once + +#include +#include +#include + +#include "VecSim/vec_sim.h" +#include "VecSim/algorithms/hnsw/hnsw_tiered.h" +#include "pybind11/pybind11.h" + +namespace tiered_index_mock { + +typedef struct RefManagedJob { + AsyncJob *job; + std::weak_ptr index_weak_ref; +} RefManagedJob; + +using JobQueue = std::queue; +int submit_callback(void *job_queue, void *index_ctx, AsyncJob **jobs, JobCallback *CBs, + JobCallback *freeCBs, size_t jobs_len); + +typedef struct IndexExtCtx { + std::shared_ptr index_strong_ref; + ~IndexExtCtx() { std::cout << "ctx dtor" << std::endl; } +} IndexExtCtx; + +static const size_t MAX_POOL_SIZE = 8; +static const size_t hardware_cpu = std::thread::hardware_concurrency(); +static const size_t THREAD_POOL_SIZE = MIN(MAX_POOL_SIZE, hardware_cpu); +extern std::vector thread_pool; +extern std::mutex queue_guard; +extern std::condition_variable queue_cond; + +void thread_pool_terminate(JobQueue &jobQ, bool &run_thread); + +class ThreadParams { +public: + bool &run_thread; + std::bitset &executions_status; + const unsigned int thread_index; + JobQueue &jobQ; + ThreadParams(bool &run_thread, std::bitset &executions_status, + const unsigned int thread_index, JobQueue &jobQ) + : run_thread(run_thread), executions_status(executions_status), thread_index(thread_index), + jobQ(jobQ) {} + + ThreadParams(const ThreadParams &other) = default; +}; + +void inline MarkExecuteInProcess(std::bitset &executions_status, + size_t thread_index) { + executions_status.set(thread_index); +} + +void inline MarkExecuteDone(std::bitset &executions_status, size_t thread_index) { + executions_status.reset(thread_index); +} +void thread_main_loop(ThreadParams params) { + while (params.run_thread) { + std::unique_lock lock(queue_guard); + // Wake up and acquire the lock (atomically) ONLY if the job queue is not empty at that + // point, or if the thread should not run anymore (and quit in that case). + queue_cond.wait(lock, [¶ms]() { return !(params.jobQ.empty()) || !params.run_thread; }); + if (!params.run_thread) + return; + auto managed_job = params.jobQ.front(); + MarkExecuteInProcess(params.executions_status, params.thread_index); + params.jobQ.pop(); + + lock.unlock(); + // Upgrade the index weak reference to a strong ref while we run the job over the index. + if (auto temp_ref = managed_job.index_weak_ref.lock()) { + managed_job.job->Execute(managed_job.job); + MarkExecuteDone(params.executions_status, params.thread_index); + } + } +} + +/* + * Mock callbacks for testing async tiered index. We use a simple std::queue to simulate the job + * queue. + */ + +std::mutex queue_guard; +std::condition_variable queue_cond; +std::vector thread_pool; + +int submit_callback(void *job_queue, void *index_ctx, AsyncJob **jobs, JobCallback *CBs, + JobCallback *freeCBs, size_t len) { + { + std::unique_lock lock(queue_guard); + for (size_t i = 0; i < len; i++) { + // Wrap the job with a struct that contains a weak reference to the related index. + auto owned_job = RefManagedJob{ + .job = jobs[i], + .index_weak_ref = reinterpret_cast(index_ctx)->index_strong_ref}; + static_cast(job_queue)->push(owned_job); + } + } + if (len == 1) { + queue_cond.notify_one(); + } else { + queue_cond.notify_all(); + } + return VecSim_OK; +} + +// Main loop for background worker threads that execute the jobs form the job queue. +// run_thread uses as a signal to the thread that indicates whether it should keep running or +// stop and terminate the thread. 
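// thread_pool_terminate() below performs the shutdown handshake: it polls every 10 ms until the
// job queue is drained, then clears run_thread under the queue lock, wakes every worker so it
// observes the flag and exits its loop, and finally joins and clears the thread pool.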
+ +void thread_pool_terminate(JobQueue &jobQ, bool &run_thread) { + // Check every 10 ms if queue is empty, and if so, terminate the threads loop. + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::unique_lock lock(queue_guard); + if (jobQ.empty()) { + run_thread = false; + queue_cond.notify_all(); + break; + } + } + for (size_t i = 0; i < THREAD_POOL_SIZE; i++) { + thread_pool[i].join(); + } + thread_pool.clear(); +} +} // namespace tiered_index_mock diff --git a/tests/flow/common.py b/tests/flow/common.py index e5cad51f7..3d95fc387 100644 --- a/tests/flow/common.py +++ b/tests/flow/common.py @@ -7,7 +7,23 @@ from scipy import spatial from numpy.testing import assert_allclose import time +import math +def create_hnsw_params(dim, num_elements, metric, data_type, ef_construction=200, m=16, ef_runtime=10, epsilon=0.01, + is_multi=False): + hnsw_params = HNSWParams() + + hnsw_params.dim = dim + hnsw_params.metric = metric + hnsw_params.type = data_type + hnsw_params.M = m + hnsw_params.efConstruction = ef_construction + hnsw_params.initialCapacity = num_elements + hnsw_params.efRuntime = ef_runtime + hnsw_params.epsilon = epsilon + hnsw_params.multi = is_multi + + return hnsw_params # Helper function for creating an index,uses the default HNSW parameters if not specified. def create_hnsw_index(dim, num_elements, metric, data_type, ef_construction=200, m=16, ef_runtime=10, epsilon=0.01, is_multi=False): @@ -24,3 +40,14 @@ def create_hnsw_index(dim, num_elements, metric, data_type, ef_construction=200, hnsw_params.multi = is_multi return HNSWIndex(hnsw_params) + +def bytes_to_mega(bytes, ndigits = 3): + return round(bytes/pow(10,6), ndigits) + +def round_(f_value, ndigits = 2): + return round(f_value, ndigits) + + +def round_ms(f_value, ndigits = 2): + return round(f_value * 1000, ndigits) + \ No newline at end of file diff --git a/tests/flow/test_bm_hnsw_tiered_dataset.py b/tests/flow/test_bm_hnsw_tiered_dataset.py new file mode 100644 index 000000000..9cc4b441e --- /dev/null +++ b/tests/flow/test_bm_hnsw_tiered_dataset.py @@ -0,0 +1,522 @@ +# Copyright Redis Ltd. 2021 - present +# Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or +# the Server Side Public License v1 (SSPLv1). +import concurrent +import math +import multiprocessing +import os +import time +from common import * +import h5py +from urllib.request import urlretrieve +import pickle +from enum import Enum + +from random import choice + +class CreationMode(Enum): + ONLY_PARAMS = 1 + CREATE_TIERED_INDEX = 2 + +def download(src, dst): + if not os.path.exists(dst): + print('downloading %s -> %s...' % (src, dst)) + urlretrieve(src, dst) + +# Download dataset from s3, save the file locally +def get_data_set(dataset_name): + hdf5_filename = os.path.join('%s.hdf5' % dataset_name) + print(f"download {hdf5_filename}") + url = 'https://s3.amazonaws.com/benchmarks.redislabs/vecsim/dbpedia/dbpedia-768.hdf5' + download(url, hdf5_filename) + return h5py.File(hdf5_filename, 'r') + +def load_data(dataset_name): + data = 0 + + np_data_file_path = os.path.join('np_train_%s.npy' % dataset_name) + + try: + print(f"try to load {np_data_file_path}") + data = np.load(np_data_file_path, allow_pickle = True) + print(f"yay! loaded ") + except: + print(f"failed to load {np_data_file_path}") + dataset = get_data_set(dataset_name) + data = np.array(dataset['train']) + np.save(np_data_file_path, data) + print(f"yay! 
generated") + + return data + +def load_queries(dataset_name): + queries = 0 + np_test_file_path = os.path.join('np_test_%s.npy' % dataset_name) + + try: + queries = np.load(np_test_file_path, allow_pickle = True) + print(f"yay! loaded ") + except: + hdf5_filename = os.path.join('%s.hdf5' % dataset_name) + dataset = h5py.File(hdf5_filename, 'r') + queries = np.array(dataset['test']) + np.save(np_test_file_path, queries) + print(f"yay! generated ") + + return queries + +# swap_job_threshold = 0 means use the default swap_job_threshold defined in hnsw_tiered.h +def create_tiered_hnsw_params(swap_job_threshold = 0): + tiered_hnsw_params = TieredHNSWParams() + tiered_hnsw_params.swapJobThreshold = swap_job_threshold + return tiered_hnsw_params + +class DBPediaIndexCtx: + def __init__(self, data_size = 0, initialCap = 0, M = 32, ef_c = 512, ef_r = 10, metric = VecSimMetric_Cosine, is_multi = False, data_type = VecSimType_FLOAT32, swap_job_threshold = 0, mode=CreationMode.ONLY_PARAMS): + self.M = M + self.efConstruction = ef_c + self.efRuntime = ef_r + + data = load_data("dbpedia-768") + self.num_elements = data_size if data_size != 0 else data.shape[0] + #self.initialCap = initialCap if initialCap != 0 else 2 * self.num_elements + self.initialCap = initialCap if initialCap != 0 else self.num_elements + + self.data = data[:self.num_elements] + self.dim = len(self.data[0]) + self.metric = metric + self.data_type = data_type + self.is_multi = is_multi + + self.hnsw_params = create_hnsw_params(dim=self.dim, + num_elements=self.initialCap, + metric=self.metric, + data_type=self.data_type, + ef_construction=ef_c, + m=M, + ef_runtime=ef_r, + is_multi=self.is_multi) + self.tiered_hnsw_params = create_tiered_hnsw_params(swap_job_threshold) + + assert isinstance(mode, CreationMode) + if mode == CreationMode.CREATE_TIERED_INDEX: + self.tiered_index = TIERED_HNSWIndex(self.hnsw_params, self.tiered_hnsw_params) + + def create_tiered(self): + return TIERED_HNSWIndex(self.hnsw_params, self.tiered_hnsw_params) + + def create_hnsw(self): + return HNSWIndex(self.hnsw_params) + + def init_and_populate_flat_index(self): + bfparams = BFParams() + bfparams.initialCapacity = self.num_elements + bfparams.dim =self.dim + bfparams.type =self.data_type + bfparams.metric =self.metric + bfparams.multi = self.is_multi + self.flat_index = BFIndex(bfparams) + + for i, vector in enumerate(self.data): + self.flat_index.add_vector(vector, i) + + return self.flat_index + + def init_and_populate_hnsw_index(self): + hnsw_index = HNSWIndex(self.hnsw_params) + + for i, vector in enumerate(self.data): + hnsw_index.add_vector(vector, i) + self.hnsw_index = hnsw_index + return hnsw_index + + def populate_index(self, index): + start = time.time() + duration = 0 + for label, vector in enumerate(self.data): + start_add = time.time() + index.add_vector(vector, label) + duration += time.time() - start_add + if label % 1000 == 0: + print(f"time passes= {duration}") + end = time.time() + return (start, duration, end) + + def generate_random_vectors(self, num_vectors): + vectors = 0 + np_file_path = os.path.join(f'np_{num_vectors}vec_dim{self.dim}.npy') + + try: + vectors = np.load(np_file_path, allow_pickle = True) + print(f"yay! loaded ") + except: + rng = np.random.default_rng(seed=47) + vectors = np.float32(rng.random((num_vectors, self.dim))) + np.save(np_file_path, vectors) + print(f"yay! 
generated ") + + return vectors + + def insert_in_batch(self, index, data, data_first_idx, batch_size, first_label): + duration = 0 + data_last_idx = data_first_idx + batch_size + for i, vector in enumerate(data[data_first_idx:data_last_idx]): + label = i + first_label + start_add = time.time() + index.add_vector(vector, label) + duration += time.time() - start_add + end = time.time() + return (duration, end) + + def generate_queries(self, num_queries): + self.rng = np.random.default_rng(seed=47) + + queries = self.rng.random((num_queries, self.dim)) + return np.float32(queries) if self.data_type == VecSimType_FLOAT32 else queries + + def generate_query_from_ds(self): + return choice(self.data) + + +def create_dbpedia(): + indices_ctx = DBPediaIndexCtx(data_size= 1000000) + + threads_num = TIEREDIndex.get_threads_num() + print(f"thread num = {threads_num}") + data = indices_ctx.data + num_elements = indices_ctx.num_elements + def create_parallel(): + index = indices_ctx.create_hnsw() + print(f"Insert {num_elements} vectors to parallel index") + start = time.time() + index.add_vector_parallel(data, np.array(range(num_elements)), threads_num) + dur = time.time() - start + print(f"Insert {num_elements} vectors to parallel index took {dur} s") + + create_parallel() + def create_tiered(): + index = indices_ctx.create_tiered() + + print(f"Insert {num_elements} vectors to tiered index") + print(f"flat buffer limit = {index.get_buffer_limit()}") + start = time.time() + for i, vector in enumerate(data): + index.add_vector(vector, i) + bf_dur = time.time() - start + + print(f''' insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + index.wait_for_index() + dur = time.time() - start + + assert index.hnsw_label_count() == num_elements + + # Measure insertion to tiered index + + print(f"Insert {num_elements} vectors to tiered index took {dur} s") + + # Measure total memory of the tiered index + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + print(f"Start tiered hnsw creation") + create_tiered() + +def create_dbpedia_graph(): + indices_ctx = DBPediaIndexCtx() + + threads_num = TIEREDIndex.get_threads_num() + print(f"thread num = {threads_num}") + dbpeida_data = indices_ctx.data + num_elements = indices_ctx.num_elements + + batches_num_per_ds = 10 + batch_size = int(num_elements / batches_num_per_ds) + + + def create_tiered(): + index = indices_ctx.create_tiered() + flat_buffer_limit = index.get_buffer_limit() + print(f"flat buffer limit = {flat_buffer_limit}") + assert flat_buffer_limit > batch_size + + #first insert dbpedia in batches + for batch in range(batches_num_per_ds): + print(f"Insert {batch_size} vectors from dbpedia to tiered index") + first_label = batch * batch_size + + #insert in batches of batch size + bf_time, start_wait = indices_ctx.insert_in_batch(index, dbpeida_data, data_first_idx= first_label, batch_size=batch_size, first_label = first_label) + print(f''' insert to bf took {bf_time}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + + # measure time until wait for index for each batch + index.wait_for_index() + dur = time.time() - start_wait + assert index.hnsw_label_count() == (batch + 1) * batch_size + total_time = bf_time + dur + print(f"Batch number {batch} : Insert {batch_size} vectors to tiered index took {total_time} s") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + + #Next insert the random vactors + for batch in 
range(batches_num_per_ds): + print(f"Insert {batch_size} random vectors to tiered index") + data_first_idx = batch * batch_size + first_label = num_elements + data_first_idx + + #insert in batches of batch size + bf_time, start_wait = indices_ctx.insert_in_batch(index, dbpeida_data, data_first_idx= data_first_idx, + batch_size=batch_size, + first_label = first_label) + print(f''' insert to bf took {bf_time}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + + # measure time until wait for index for each batch + index.wait_for_index() + dur = time.time() - start_wait + assert index.hnsw_label_count() == num_elements + (batch + 1 ) * batch_size + total_time = bf_time + dur + print(f"Batch number {batch} : Insert {batch_size} vectors to tiered index took {total_time} s") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + print(f"Start tiered hnsw creation") + create_tiered() + def create_hnsw(): + index = indices_ctx.create_hnsw() + + #first insert dbpedia in batches + for batch in range(batches_num_per_ds): + print(f"Insert {batch_size} vectors from dbpedia to sync hnsw index") + first_label = batch * batch_size + + #insert in batches of batch size + batch_time, _ = indices_ctx.insert_in_batch(index, dbpeida_data, data_first_idx= first_label, batch_size=batch_size, first_label = first_label) + + assert index.index_size() == (batch + 1) * batch_size + print(f"Batch number {batch} : Insert {batch_size} vectors to tiered index took {batch_time} s") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + #first insert dbpedia in batches + for batch in range(batches_num_per_ds): + print(f"Insert {batch_size} vectors from dbpedia to sync hnsw index") + data_first_idx = batch * batch_size + first_label = num_elements + data_first_idx + + #insert in batches of batch size + batch_time, _ = indices_ctx.insert_in_batch(index, dbpeida_data, data_first_idx= data_first_idx, batch_size=batch_size, first_label = first_label) + + assert index.index_size() == num_elements + (batch + 1) * batch_size + print(f"Batch number {batch} : Insert {batch_size} vectors to tiered index took {batch_time} s") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + # print(f"dbpedia vectors = {dbpeida_data[0:4].shape[0]}") + # print(f"vectors = {vectors[0]}") + print(f"Start hnsw creation") + + create_hnsw() + + +def insert_delete_reinsert(): + indices_ctx = DBPediaIndexCtx(data_size = 1000000,mode=CreationMode.CREATE_TIERED_INDEX) + index = indices_ctx.tiered_index + threads_num = TIEREDIndex.get_threads_num() + print(f"thread num = {threads_num}") + data = indices_ctx.data + num_elements = indices_ctx.num_elements + + # compute ground truth + k = 10 + query_data = indices_ctx.generate_query_from_ds() + + bf_index = indices_ctx.init_and_populate_flat_index() + bf_labels, _ = bf_index.knn_query(query_data, k) + + + print(f"flat buffer limit = {index.get_buffer_limit()}") + start = time.time() + for i, vector in enumerate(data): + index.add_vector(vector, i) + bf_dur = time.time() - start + + print(f''' insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + index.wait_for_index() + dur = time.time() - start + + query_start = time.time() + tiered_labels, _ = index.knn_query(query_data, k) + query_dur = time.time() - query_start + + print(f"query time = {round_ms(query_dur)} ms") + + curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) + curr_recall = 
float(curr_correct)/k + print(f"curr recall after firsdt insertion= {curr_recall}") + + # Delete half of the index. + for i in range(0, num_elements, 2): + index.delete_vector(i) + assert index.hnsw_label_count() == (num_elements / 2) + index.wait_for_index() + + #reinsert the deleted vectors + start = time.time() + for i in range(0, num_elements, 2): + vector = data[i] + index.add_vector(vector, i) + bf_dur = time.time() - start + + print(f''' insert to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + index.wait_for_index() + dur = time.time() - start + assert index.hnsw_label_count() == (num_elements) + print(f''' reinsert to the hnsw took insert to bf took {dur}, current hnsw size is {index.hnsw_label_count()}")''') + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + + + + print(f"total memory of bf index = {bf_index.index_memory()/pow(10,9)} GB") + + query_start = time.time() + tiered_labels, _ = index.knn_query(query_data, k) + query_dur = time.time() - query_start + + print(f"query time = {round_ms(query_dur)} ms") + + curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) + curr_recall = float(curr_correct)/k + print(f"curr recall = {curr_recall}") + + +def insert_and_update(): + indices_ctx = DBPediaIndexCtx(mode=CreationMode.CREATE_TIERED_INDEX) + index = indices_ctx.tiered_index + threads_num = TIEREDIndex.get_threads_num() + print(f"thread num = {threads_num}") + data = indices_ctx.data + num_elements = indices_ctx.num_elements + + print(f"flat buffer limit = {index.get_buffer_limit()}") + start = time.time() + for i, vector in enumerate(data): + index.add_vector(vector, i) + bf_dur = time.time() - start + + print(f''' insert {num_elements} vecs to bf took {bf_dur}, current hnsw size is {index.hnsw_label_count()}") + wait for index\n''') + index.wait_for_index() + dur = time.time() - start + # Measure insertion to tiered index + + print(f"Insert {num_elements} vectors to tiered index took {dur} s") + + # Measure total memory of the tiered index + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + assert index.get_curr_bf_size() == 0 + + def search_insert(is_multi: bool, num_per_label = 1): + + #choose random vector from the data base and perform query on it + query_data = indices_ctx.generate_query_from_ds() + # query_data = indices_ctx.generate_queries(num_queries=1) + k = 10 + # Calculate ground truth results + bf_index = indices_ctx.init_and_populate_flat_index() + bf_labels, _ = bf_index.knn_query(query_data, k) + + assert bf_index.index_size() == num_elements + def query(): + query_start = time.time() + tiered_labels, _ = index.knn_query(query_data, k) + query_dur = time.time() - query_start + + print(f"query time = {round_ms(query_dur)} ms") + + curr_correct = len(np.intersect1d(tiered_labels[0], bf_labels[0])) + curr_recall = float(curr_correct)/k + print(f"curr recall = {curr_recall}") + + return query_dur, curr_correct + # config knn log + index.start_knn_log() + + + # query before any changes + print("query before overriding") + _, _ = query() + assert index.get_curr_bf_size(mode = 'insert_and_knn') == 0 + + # Start background insertion to the tiered index. 
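        # populate_index() re-adds every vector under its existing label (i.e. an update), and
        # while the worker threads drain the flat buffer the loop below issues a KNN query every
        # ~5 s, accumulating recall against the brute-force ground truth and sampling the
        # flat-buffer size through the KNN log hook (get_curr_bf_size(mode='insert_and_knn')).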
+ print(f"start overriding") + index_start, bf_dur, _ = indices_ctx.populate_index(index) + print(f"bf size is:" ) + bf_size = index.hnsw_label_count() + print(f"{bf_size}") + print(f"current hnsw size is {index.hnsw_label_count()}") + print(f"insert to bf took {bf_dur}") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + correct = 0 + searches_number = 0 + + # run knn query every 1 s. + total_tiered_search_time = 0 + bf_curr_size = num_elements + while bf_curr_size != 0: + query_dur, curr_correct = query() + # For each run get the current hnsw size and the query time. + total_tiered_search_time += query_dur + bf_curr_size = index.get_curr_bf_size(mode = 'insert_and_knn') + + print(f"bf size = {bf_curr_size}") + correct += curr_correct + + time.sleep(5) + searches_number += 1 + + index.reset_log() + + # HNSW labels count updates before the job is done, so we need to wait for the queue to be empty. + index.wait_for_index(1) + index_dur = time.time() - index_start + assert index.get_curr_bf_size() == 0 + assert index.hnsw_label_count() == num_elements + + print(f"indexing during search in tiered took {round_(index_dur)} s, all repair jobs are done") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + # Measure recall. + recall = float(correct)/(k*searches_number) + print("Average recall is:", round_(recall, 3)) + print("tiered query per seconds: ", round_(searches_number/total_tiered_search_time)) + + #execute swap jobs and execute query + swap_start = time.time() + + index.execute_swap_jobs() + + swap_dur = time.time() - swap_start + print(f"swap jobs took = {round_ms(swap_dur)} ms") + + assert index.hnsw_marked_deleted() == 0 + print("query after swap execution") + print(f"total memory of tiered index = {index.index_memory()/pow(10,9)} GB") + + + query_dur, curr_correct = query() + + search_insert(is_multi=False) + + +def test_main(): + print("Test creation") + # create_dbpedia() + # create_dbpedia_graph() + print(f"\nStart insert & search test") + # search_insert(is_multi=False) + insert_and_update() + #or sanity + #insert_delete_reinsert() + +