
Commit bfead9b

flow tests of tiered with dbpedia to run bm (including flat buffer limit)

1 parent: 9abd6b4

File tree: 4 files changed, +578 −0 lines

src/python_bindings/bindings.cpp

Lines changed: 117 additions & 0 deletions
@@ -16,8 +16,10 @@
 #include <thread>
 #include <VecSim/algorithms/hnsw/hnsw_single.h>
 #include <VecSim/algorithms/brute_force/brute_force_single.h>
+#include "tiered_index_mock.h"

 namespace py = pybind11;
+using namespace tiered_index_mock;

 // Helper function that iterates query results and wraps them in a python numpy object -
 // a tuple of two 2D arrays: (labels, distances)
@@ -174,6 +176,13 @@ class PyVecSimIndex {

     size_t indexSize() { return VecSimIndex_IndexSize(index.get()); }

+    size_t indexMemory() { return this->index->getAllocationSize(); }
+
+    double getGetDistanceFrom(size_t id, const py::object &input) {
+        py::array query(input);
+        return this->index->getDistanceFrom(id, (const char *)query.data(0));
+    }
+
     PyBatchIterator createBatchIterator(const py::object &input, VecSimQueryParams *query_params) {
         py::array query(input);
         return PyBatchIterator(
@@ -360,6 +369,94 @@ class PyHNSWLibIndex : public PyVecSimIndex {
     }
 };

+class PyTIEREDIndex : public PyVecSimIndex {
+
+protected:
+    JobQueue jobQueue;          // External queue that holds the jobs.
+    IndexExtCtx jobQueueCtx;    // External context to be sent to the submit callback.
+    SubmitCB submitCb;          // A callback that submits an array of jobs into a given jobQueue.
+    size_t memoryCtx;           // External context that stores the index memory consumption.
+    UpdateMemoryCB UpdateMemCb; // A callback that updates the memoryCtx
+                                // with a given memory (number).
+    size_t flatBufferLimit;     // Maximum size allowed for the flat buffer. If the flat buffer is
+                                // full, insertion is done in-place.
+    bool run_thread;
+    std::bitset<MAX_POOL_SIZE> executions_status;
+
+    TieredIndexParams TieredIndexParams_Init() {
+        TieredIndexParams ret = {
+            .jobQueue = &this->jobQueue,
+            .jobQueueCtx = &this->jobQueueCtx,
+            .submitCb = this->submitCb,
+            .memoryCtx = &this->memoryCtx,
+            .UpdateMemCb = this->UpdateMemCb,
+            .flatBufferLimit = this->flatBufferLimit,
+        };
+        return ret;
+    }
+
+public:
+    explicit PyTIEREDIndex(size_t BufferLimit = 20000000)
+        : submitCb(submit_callback), memoryCtx(0), UpdateMemCb(update_mem_callback),
+          flatBufferLimit(BufferLimit), run_thread(true) {
+
+        for (size_t i = 0; i < THREAD_POOL_SIZE; i++) {
+            ThreadParams params(run_thread, executions_status, i, jobQueue);
+            thread_pool.emplace_back(thread_main_loop, params);
+        }
+    }
+
+    virtual ~PyTIEREDIndex() = 0;
+
+    void WaitForIndex(size_t waiting_duration = 10) {
+        bool keep_waiting = true;
+        while (keep_waiting) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(waiting_duration));
+            std::unique_lock<std::mutex> lock(queue_guard);
+            if (jobQueue.empty()) {
+                // The queue is drained; wait until the in-flight jobs finish as well.
+                while (true) {
+                    if (executions_status.count() == 0) {
+                        keep_waiting = false;
+                        break;
+                    }
+                    std::this_thread::sleep_for(std::chrono::milliseconds(waiting_duration));
+                }
+            }
+        }
+    }
+
+    static size_t GetThreadsNum() { return THREAD_POOL_SIZE; }
+
+    size_t getBufferLimit() { return flatBufferLimit; }
+};
+
+PyTIEREDIndex::~PyTIEREDIndex() { thread_pool_terminate(jobQueue, run_thread); }
+
+class PyTIERED_HNSWIndex : public PyTIEREDIndex {
+public:
+    explicit PyTIERED_HNSWIndex(const HNSWParams &hnsw_params,
+                                const TieredHNSWParams &tiered_hnsw_params) {
+
+        // Create primaryIndexParams and specific params for the hnsw tiered index.
+        VecSimParams primary_index_params = {.algo = VecSimAlgo_HNSWLIB, .hnswParams = hnsw_params};
+
+        // Create the TieredIndexParams.
+        TieredIndexParams tiered_params = TieredIndexParams_Init();
+        tiered_params.primaryIndexParams = &primary_index_params;
+        tiered_params.specificParams.tieredHnswParams = tiered_hnsw_params;
+
+        // Create the VecSimParams that wrap the TieredIndexParams.
+        VecSimParams params = {.algo = VecSimAlgo_TIERED, .tieredParams = tiered_params};
+
+        this->index = std::shared_ptr<VecSimIndex>(VecSimIndex_New(&params), VecSimIndex_Free);
+        // Set the created tiered index in the index external context.
+        this->jobQueueCtx.index_strong_ref = this->index;
+    }
+
+    size_t HNSWLabelCount() { return this->index->info().hnswInfo.indexLabelCount; }
+};
+
 class PyBFIndex : public PyVecSimIndex {
 public:
     explicit PyBFIndex(const BFParams &bf_params) {
@@ -413,6 +510,10 @@ PYBIND11_MODULE(VecSim, m) {
         .def_readwrite("initialCapacity", &BFParams::initialCapacity)
         .def_readwrite("blockSize", &BFParams::blockSize);

+    py::class_<TieredHNSWParams>(m, "TieredHNSWParams")
+        .def(py::init())
+        .def_readwrite("swapJobThreshold", &TieredHNSWParams::swapJobThreshold);
+
     py::class_<VecSimParams>(m, "VecSimParams")
         .def(py::init())
         .def_readwrite("algo", &VecSimParams::algo)
@@ -439,8 +540,11 @@ PYBIND11_MODULE(VecSim, m) {
         .def("range_query", &PyVecSimIndex::range, py::arg("vector"), py::arg("radius"),
              py::arg("query_param") = nullptr)
         .def("index_size", &PyVecSimIndex::indexSize)
+        .def("index_memory", &PyVecSimIndex::indexMemory)
         .def("create_batch_iterator", &PyVecSimIndex::createBatchIterator, py::arg("query_blob"),
              py::arg("query_param") = nullptr)
+        .def("get_distance_from", &PyVecSimIndex::getGetDistanceFrom, py::arg("label"),
+             py::arg("blob"))
         .def("get_vector", &PyVecSimIndex::getVector);

     py::class_<PyHNSWLibIndex, PyVecSimIndex>(m, "HNSWIndex")
@@ -460,6 +564,19 @@ PYBIND11_MODULE(VecSim, m) {
         .def("range_parallel", &PyHNSWLibIndex::searchRangeParallel, py::arg("queries"),
              py::arg("radius"), py::arg("query_param") = nullptr, py::arg("num_threads") = -1);

+    py::class_<PyTIEREDIndex, PyVecSimIndex>(m, "TIEREDIndex")
+        .def("wait_for_index", &PyTIERED_HNSWIndex::WaitForIndex, py::arg("waiting_duration") = 10)
+        .def("get_buffer_limit", &PyTIERED_HNSWIndex::getBufferLimit)
+        .def_static("get_threads_num", &PyTIEREDIndex::GetThreadsNum);
+
+    py::class_<PyTIERED_HNSWIndex, PyTIEREDIndex>(m, "TIERED_HNSWIndex")
+        .def(
+            py::init([](const HNSWParams &hnsw_params, const TieredHNSWParams &tiered_hnsw_params) {
+                return new PyTIERED_HNSWIndex(hnsw_params, tiered_hnsw_params);
+            }),
+            py::arg("hnsw_params"), py::arg("tiered_hnsw_params"))
+        .def("hnsw_label_count", &PyTIERED_HNSWIndex::HNSWLabelCount);
+
     py::class_<PyBFIndex, PyVecSimIndex>(m, "BFIndex")
         .def(py::init([](const BFParams &params) { return new PyBFIndex(params); }),
             py::arg("params"));
src/python_bindings/tiered_index_mock.h (new file)

Lines changed: 152 additions & 0 deletions
/*
 * Copyright Redis Ltd. 2021 - present
 * Licensed under your choice of the Redis Source Available License 2.0 (RSALv2) or
 * the Server Side Public License v1 (SSPLv1).
 */

#pragma once

#include <thread>
#include <condition_variable>
#include <bitset>
#include <queue>
#include <atomic>
#include <iostream>

#include "VecSim/vec_sim.h"
#include "VecSim/algorithms/hnsw/hnsw_tiered.h"
#include "pybind11/pybind11.h"

namespace tiered_index_mock {

typedef struct RefManagedJob {
    AsyncJob *job;
    std::weak_ptr<VecSimIndex> index_weak_ref;
} RefManagedJob;

struct SearchJobMock : public AsyncJob {
    void *query; // The query vector. Ownership is passed to the job in the constructor.
    size_t k;    // The number of results to return.
    size_t n;    // The number of vectors in the index (might be useful for the mock).
    size_t dim;  // The dimension of the vectors in the index (might be useful for the mock).
    std::atomic_int &successful_searches; // A reference to a shared counter that counts the
                                          // number of successful searches.
    SearchJobMock(std::shared_ptr<VecSimAllocator> allocator, JobCallback searchCB,
                  VecSimIndex *index_, void *query_, size_t k_, size_t n_, size_t dim_,
                  std::atomic_int &successful_searches_)
        : AsyncJob(allocator, HNSW_SEARCH_JOB, searchCB, index_), query(query_), k(k_), n(n_),
          dim(dim_), successful_searches(successful_searches_) {}
    ~SearchJobMock() { this->allocator->free_allocation(query); }
};

using JobQueue = std::queue<RefManagedJob>;
int submit_callback(void *job_queue, AsyncJob **jobs, size_t len, void *index_ctx);
int update_mem_callback(void *mem_ctx, size_t mem);

typedef struct IndexExtCtx {
    std::shared_ptr<VecSimIndex> index_strong_ref;
    ~IndexExtCtx() { std::cout << "ctx dtor" << std::endl; }
} IndexExtCtx;

static const size_t MAX_POOL_SIZE = 16;
static const size_t THREAD_POOL_SIZE = MIN(MAX_POOL_SIZE, std::thread::hardware_concurrency());
extern std::vector<std::thread> thread_pool;
extern std::mutex queue_guard;
extern std::condition_variable queue_cond;

void thread_pool_terminate(JobQueue &jobQ, bool &run_thread);

class ThreadParams {
public:
    bool &run_thread;
    std::bitset<MAX_POOL_SIZE> &executions_status;
    const unsigned int thread_index;
    JobQueue &jobQ;
    ThreadParams(bool &run_thread, std::bitset<MAX_POOL_SIZE> &executions_status,
                 const unsigned int thread_index, JobQueue &jobQ)
        : run_thread(run_thread), executions_status(executions_status), thread_index(thread_index),
          jobQ(jobQ) {}

    ThreadParams(const ThreadParams &other) = default;
};

inline void MarkExecuteInProcess(std::bitset<MAX_POOL_SIZE> &executions_status,
                                 size_t thread_index) {
    executions_status.set(thread_index);
}

inline void MarkExecuteDone(std::bitset<MAX_POOL_SIZE> &executions_status, size_t thread_index) {
    executions_status.reset(thread_index);
}

// Main loop for the background worker threads that execute jobs from the job queue.
// run_thread is used as a signal that tells each thread whether it should keep running
// or stop and terminate.
void thread_main_loop(ThreadParams params) {
    while (params.run_thread) {
        std::unique_lock<std::mutex> lock(queue_guard);
        // Wake up and acquire the lock (atomically) ONLY if the job queue is not empty at that
        // point, or if the thread should not run anymore (and quit in that case).
        queue_cond.wait(lock, [&params]() { return !(params.jobQ.empty()) || !params.run_thread; });
        if (!params.run_thread)
            return;
        auto managed_job = params.jobQ.front();
        MarkExecuteInProcess(params.executions_status, params.thread_index);
        params.jobQ.pop();

        lock.unlock();
        // Upgrade the index weak reference to a strong ref while we run the job over the index.
        if (auto temp_ref = managed_job.index_weak_ref.lock()) {
            managed_job.job->Execute(managed_job.job);
            MarkExecuteDone(params.executions_status, params.thread_index);
        }
    }
}

/*
 * Mock callbacks for testing the async tiered index. We use a simple std::queue to simulate
 * the job queue.
 */

std::mutex queue_guard;
std::condition_variable queue_cond;
std::vector<std::thread> thread_pool;

int submit_callback(void *job_queue, AsyncJob **jobs, size_t len, void *index_ctx) {
    {
        std::unique_lock<std::mutex> lock(queue_guard);
        for (size_t i = 0; i < len; i++) {
            // Wrap the job with a struct that contains a weak reference to the related index.
            auto owned_job = RefManagedJob{
                .job = jobs[i],
                .index_weak_ref = reinterpret_cast<IndexExtCtx *>(index_ctx)->index_strong_ref};
            static_cast<JobQueue *>(job_queue)->push(owned_job);
        }
    }
    if (len == 1) {
        queue_cond.notify_one();
    } else {
        queue_cond.notify_all();
    }
    return VecSim_OK;
}

int update_mem_callback(void *mem_ctx, size_t mem) {
    *(size_t *)mem_ctx = mem;
    return VecSim_OK;
}

void thread_pool_terminate(JobQueue &jobQ, bool &run_thread) {
    // Check every 10 ms whether the queue is empty, and if so, terminate the threads' loops.
    while (true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::unique_lock<std::mutex> lock(queue_guard);
        if (jobQ.empty()) {
            run_thread = false;
            queue_cond.notify_all();
            break;
        }
    }
    for (size_t i = 0; i < THREAD_POOL_SIZE; i++) {
        thread_pool[i].join();
    }
    thread_pool.clear();
}
} // namespace tiered_index_mock
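
The synchronization contract this mock gives the tests: submit_callback only enqueues jobs and wakes workers, each worker sets its executions_status bit while a job is in flight, and WaitForIndex returns only once the queue is empty and every bit is clear. A hedged sketch of what a flow test can therefore rely on (index and data as in the previous snippet):

from VecSim import TIEREDIndex

n_threads = TIEREDIndex.get_threads_num()  # THREAD_POOL_SIZE above
assert n_threads <= 16                     # MIN(MAX_POOL_SIZE, hardware_concurrency())

index.wait_for_index(waiting_duration=5)   # poll the queue every 5 ms
# After this point no job is queued or in flight, so the counters are stable.
assert index.hnsw_label_count() == n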

tests/flow/common.py

Lines changed: 18 additions & 0 deletions
@@ -7,7 +7,23 @@
 from scipy import spatial
 from numpy.testing import assert_allclose
 import time
+import math

+# Helper function for creating HNSW params; uses the default HNSW parameters if not specified.
+def create_hnsw_params(dim, num_elements, metric, data_type, ef_construction=200, m=16,
+                       ef_runtime=10, epsilon=0.01, is_multi=False):
+    hnsw_params = HNSWParams()
+
+    hnsw_params.dim = dim
+    hnsw_params.metric = metric
+    hnsw_params.type = data_type
+    hnsw_params.M = m
+    hnsw_params.efConstruction = ef_construction
+    hnsw_params.initialCapacity = num_elements
+    hnsw_params.efRuntime = ef_runtime
+    hnsw_params.epsilon = epsilon
+    hnsw_params.multi = is_multi
+
+    return hnsw_params
 # Helper function for creating an index; uses the default HNSW parameters if not specified.
 def create_hnsw_index(dim, num_elements, metric, data_type, ef_construction=200, m=16, ef_runtime=10, epsilon=0.01,
                       is_multi=False):
@@ -24,3 +40,5 @@ def create_hnsw_index(dim, num_elements, metric, data_type, ef_construction=200,
     hnsw_params.multi = is_multi

     return HNSWIndex(hnsw_params)
+
+
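Splitting the parameter construction out of create_hnsw_index lets the tiered flow tests reuse the same defaults while building the index through the new TIERED_HNSWIndex binding. A sketch of the intended call pattern (constants and imports as in the existing tests):

from common import create_hnsw_params, create_hnsw_index

hnsw_params = create_hnsw_params(dim=128, num_elements=100000,
                                 metric=VecSimMetric_Cosine,
                                 data_type=VecSimType_FLOAT32)
tiered_index = TIERED_HNSWIndex(hnsw_params, TieredHNSWParams())

# The original helper still covers the non-tiered case:
hnsw_index = create_hnsw_index(128, 100000, VecSimMetric_Cosine, VecSimType_FLOAT32)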