16
16
#include < thread>
17
17
#include < VecSim/algorithms/hnsw/hnsw_single.h>
18
18
#include < VecSim/algorithms/brute_force/brute_force_single.h>
19
+ #include " tiered_index_mock.h"
19
20
20
21
namespace py = pybind11;
22
+ using namespace tiered_index_mock ;
21
23
22
24
// Helper function that iterates query results and wrap them in python numpy object -
23
25
// a tuple of two 2D arrays: (labels, distances)
@@ -174,6 +176,13 @@ class PyVecSimIndex {
174
176
175
177
size_t indexSize () { return VecSimIndex_IndexSize (index.get ()); }
176
178
179
+ size_t indexMemory () { return this ->index ->getAllocationSize (); }
180
+
181
+ double getGetDistanceFrom (size_t id, const py::object &input) {
182
+ py::array query (input);
183
+ return this ->index ->getDistanceFrom (id, (const char *)query.data (0 ));
184
+ }
185
+
177
186
PyBatchIterator createBatchIterator (const py::object &input, VecSimQueryParams *query_params) {
178
187
py::array query (input);
179
188
return PyBatchIterator (
@@ -372,6 +381,94 @@ class PyHNSWLibIndex : public PyVecSimIndex {
372
381
}
373
382
};
374
383
384
+ class PyTIEREDIndex : public PyVecSimIndex {
385
+
386
+ protected:
387
+ JobQueue jobQueue; // External queue that holds the jobs.
388
+ IndexExtCtx jobQueueCtx; // External context to be sent to the submit callback.
389
+ SubmitCB submitCb; // A callback that submits an array of jobs into a given jobQueue.
390
+ size_t memoryCtx; // External context that stores the index memory consumption.
391
+ UpdateMemoryCB UpdateMemCb; // A callback that updates the memoryCtx
392
+ // with a given memory (number).
393
+ size_t flatBufferLimit; // Maximum size allowed for the flat buffer. If flat buffer is full, use
394
+ // in-place insertion.
395
+ bool run_thread;
396
+ std::bitset<MAX_POOL_SIZE> executions_status;
397
+
398
+ TieredIndexParams TieredIndexParams_Init () {
399
+ TieredIndexParams ret = {
400
+ .jobQueue = &this ->jobQueue ,
401
+ .jobQueueCtx = &this ->jobQueueCtx ,
402
+ .submitCb = this ->submitCb ,
403
+ .memoryCtx = &this ->memoryCtx ,
404
+ .UpdateMemCb = this ->UpdateMemCb ,
405
+ .flatBufferLimit = this ->flatBufferLimit ,
406
+ };
407
+
408
+ return ret;
409
+ }
410
+
411
+ public:
412
+ explicit PyTIEREDIndex (size_t BufferLimit = 20000000 )
413
+ : submitCb(submit_callback), memoryCtx(0 ), UpdateMemCb(update_mem_callback), flatBufferLimit(BufferLimit),
414
+ run_thread(true ) {
415
+
416
+ for (size_t i = 0 ; i < THREAD_POOL_SIZE; i++) {
417
+ ThreadParams params (run_thread, executions_status, i, jobQueue);
418
+ thread_pool.emplace_back (thread_main_loop, params);
419
+ }
420
+ }
421
+
422
+ virtual ~PyTIEREDIndex () = 0 ;
423
+
424
+ void WaitForIndex (size_t waiting_duration = 10 ) {
425
+ bool keep_wating = true ;
426
+ while (keep_wating) {
427
+ std::this_thread::sleep_for (std::chrono::milliseconds (waiting_duration));
428
+ std::unique_lock<std::mutex> lock (queue_guard);
429
+ if (jobQueue.empty ()) {
430
+ while (true ) {
431
+ if (executions_status.count () == 0 ) {
432
+ keep_wating = false ;
433
+ break ;
434
+ }
435
+ std::this_thread::sleep_for (std::chrono::milliseconds (waiting_duration));
436
+ }
437
+ }
438
+ }
439
+ }
440
+
441
+
442
+ static size_t GetThreadsNum () { return THREAD_POOL_SIZE; }
443
+
444
+ size_t getBufferLimit () {return flatBufferLimit; }
445
+ };
446
+
447
+ PyTIEREDIndex::~PyTIEREDIndex () { thread_pool_terminate (jobQueue, run_thread); }
448
+ class PyTIERED_HNSWIndex : public PyTIEREDIndex {
449
+ public:
450
+ explicit PyTIERED_HNSWIndex (const HNSWParams &hnsw_params,
451
+ const TieredHNSWParams &tiered_hnsw_params) {
452
+
453
+ // Create primaryIndexParams and specific params for hnsw tiered index.
454
+ VecSimParams primary_index_params = {.algo = VecSimAlgo_HNSWLIB, .hnswParams = hnsw_params};
455
+
456
+ // create TieredIndexParams
457
+ TieredIndexParams tiered_params = TieredIndexParams_Init ();
458
+
459
+ tiered_params.primaryIndexParams = &primary_index_params;
460
+ tiered_params.specificParams .tieredHnswParams = tiered_hnsw_params;
461
+
462
+ // create VecSimParams for TieredIndexParams
463
+ VecSimParams params = {.algo = VecSimAlgo_TIERED, .tieredParams = tiered_params};
464
+
465
+ this ->index = std::shared_ptr<VecSimIndex>(VecSimIndex_New (¶ms), VecSimIndex_Free);
466
+ // Set the created tiered index in the index external context.
467
+ this ->jobQueueCtx .index_strong_ref = this ->index ;
468
+ }
469
+ size_t HNSWLabelCount () { return this ->index ->info ().hnswInfo .indexLabelCount ; }
470
+ };
471
+
375
472
class PyBFIndex : public PyVecSimIndex {
376
473
public:
377
474
explicit PyBFIndex (const BFParams &bf_params) {
@@ -425,6 +522,10 @@ PYBIND11_MODULE(VecSim, m) {
425
522
.def_readwrite (" initialCapacity" , &BFParams::initialCapacity)
426
523
.def_readwrite (" blockSize" , &BFParams::blockSize);
427
524
525
+ py::class_<TieredHNSWParams>(m, " TieredHNSWParams" )
526
+ .def (py::init ())
527
+ .def_readwrite (" swapJobThreshold" , &TieredHNSWParams::swapJobThreshold);
528
+
428
529
py::class_<VecSimParams>(m, " VecSimParams" )
429
530
.def (py::init ())
430
531
.def_readwrite (" algo" , &VecSimParams::algo)
@@ -451,8 +552,11 @@ PYBIND11_MODULE(VecSim, m) {
451
552
.def (" range_query" , &PyVecSimIndex::range, py::arg (" vector" ), py::arg (" radius" ),
452
553
py::arg (" query_param" ) = nullptr )
453
554
.def (" index_size" , &PyVecSimIndex::indexSize)
555
+ .def (" index_memory" , &PyVecSimIndex::indexMemory)
454
556
.def (" create_batch_iterator" , &PyVecSimIndex::createBatchIterator, py::arg (" query_blob" ),
455
557
py::arg (" query_param" ) = nullptr )
558
+ .def (" get_distance_from" , &PyVecSimIndex::getGetDistanceFrom, py::arg (" label" ),
559
+ py::arg (" blob" ))
456
560
.def (" get_vector" , &PyVecSimIndex::getVector);
457
561
458
562
py::class_<PyHNSWLibIndex, PyVecSimIndex>(m, " HNSWIndex" )
@@ -472,6 +576,19 @@ PYBIND11_MODULE(VecSim, m) {
472
576
.def (" range_parallel" , &PyHNSWLibIndex::searchRangeParallel, py::arg (" queries" ),
473
577
py::arg (" radius" ), py::arg (" query_param" ) = nullptr , py::arg (" num_threads" ) = -1 );
474
578
579
+ py::class_<PyTIEREDIndex, PyVecSimIndex>(m, " TIEREDIndex" )
580
+ .def (" wait_for_index" , &PyTIERED_HNSWIndex::WaitForIndex, py::arg (" waiting_duration" ) = 10 )
581
+ .def (" get_buffer_limit" , &PyTIERED_HNSWIndex::getBufferLimit)
582
+ .def_static (" get_threads_num" , &PyTIEREDIndex::GetThreadsNum);
583
+
584
+ py::class_<PyTIERED_HNSWIndex, PyTIEREDIndex>(m, " TIERED_HNSWIndex" )
585
+ .def (
586
+ py::init ([](const HNSWParams &hnsw_params, const TieredHNSWParams &tiered_hnsw_params) {
587
+ return new PyTIERED_HNSWIndex (hnsw_params, tiered_hnsw_params);
588
+ }),
589
+ py::arg (" hnsw_params" ), py::arg (" tiered_hnsw_params" ))
590
+ .def (" hnsw_label_count" , &PyTIERED_HNSWIndex::HNSWLabelCount);
591
+
475
592
py::class_<PyBFIndex, PyVecSimIndex>(m, " BFIndex" )
476
593
.def (py::init ([](const BFParams ¶ms) { return new PyBFIndex (params); }),
477
594
py::arg (" params" ));
0 commit comments