Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3a46c22
Added arrow as third party dependency
Sujan242 Mar 14, 2025
2063b5d
Added build with arrow tables functionality
Sujan242 Mar 17, 2025
e1aa5ac
updated stress tests to include attributes
Sujan242 Mar 17, 2025
c0aece3
renamed duplicate function
Sujan242 Mar 17, 2025
147dbe1
added libarrow as conda dependency
Sujan242 Mar 17, 2025
29e4614
updated data structure for storing tables
Sujan242 Mar 18, 2025
fbedcac
Merge pull request #1 from Sujan242/build-with-attributes
Sujan242 Mar 18, 2025
b8012db
modified gitignore
Mar 23, 2025
8281295
index partition bug fix
Sujan242 Apr 1, 2025
3db5f45
wip
Apr 1, 2025
35dfe11
Merge branch 'main' of https://github.com/Sujan242/quake
amithbhat1 Apr 1, 2025
318bb8c
written code for add & remove
amithbhat1 Apr 2, 2025
3bc3e5e
compile works
amithbhat1 Apr 2, 2025
bf953a2
index partition bug fix
Sujan242 Apr 5, 2025
e70db43
handled null attributes table
Sujan242 Apr 5, 2025
1d8a5b5
Adding some search things
Apr 4, 2025
f7eb486
Older tests working fine with filter search
Apr 5, 2025
f12d21e
Tests for search added
Apr 5, 2025
ac50b7f
fix remove logical bug - attr table can be NULL
amithbhat1 Apr 5, 2025
1288863
allow attr_table to be null - add vector
amithbhat1 Apr 5, 2025
711e414
fixed tests
amithbhat1 Apr 5, 2025
d530d46
Merge pull request #3 from Sujan242/attr-manip
amithbhat1 Apr 5, 2025
332ea7a
Merge branch 'main' into hybrid-query
amithbhat1 Apr 5, 2025
68706a6
fixed conflicts
Sujan242 Apr 5, 2025
f6305c9
added conda to quake_env
Sujan242 Apr 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/build_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jobs:
git config --global --add safe.directory '*'
eval "$(conda shell.bash hook)"
conda activate quake-env
conda install libarrow-all=19.0.1 -c conda-forge
mkdir -p build
cd build
cmake -DCMAKE_BUILD_TYPE=${{ env.BUILD_TYPE }} \
Expand All @@ -46,6 +47,7 @@ jobs:
git config --global --add safe.directory '*'
eval "$(conda shell.bash hook)"
conda activate quake-env
conda install libarrow-all=19.0.1 -c conda-forge
pip install --no-use-pep517 .
pip install pytest
python -m pytest test/python
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
__pycache__
__pycache__
.vscode
build/
quake.egg-info/
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
url = https://github.com/pybind/pybind11.git
[submodule "src/cpp/third_party/concurrentqueue"]
path = src/cpp/third_party/concurrentqueue
url = https://github.com/cameron314/concurrentqueue.git
url = https://github.com/cameron314/concurrentqueue.git
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ endif()
# Find Required Packages
# ---------------------------------------------------------------
find_package(Torch REQUIRED)
find_package(Arrow REQUIRED) # find_package name is case-sensitive: the Arrow CMake package exports 'Arrow'
find_package(Python3 COMPONENTS Development Interpreter REQUIRED)

message(STATUS "Torch include dir: ${TORCH_INCLUDE_DIRS}")
message(STATUS "Torch libraries: ${TORCH_LIBRARIES}")
message(STATUS "Python include dir: ${Python3_INCLUDE_DIRS}")
message(STATUS "Arrow include dir: ${ARROW_INCLUDE_DIR}")

set(PYTHON_INCLUDE_DIR ${Python3_INCLUDE_DIRS})

Expand All @@ -115,6 +117,7 @@ target_include_directories(${PROJECT_NAME}
${TORCH_INCLUDE_DIRS}
${project_INCLUDE_DIR}
${project_THIRD_PARTY_DIR}/concurrentqueue/
${ARROW_INCLUDE_DIR}
faiss
)

Expand Down Expand Up @@ -150,6 +153,7 @@ else()
endif()

target_link_libraries(${PROJECT_NAME} PUBLIC ${LINK_LIBS})
target_link_libraries(${PROJECT_NAME} PUBLIC Arrow::arrow_shared)

IF(CMAKE_BUILD_TYPE MATCHES Debug AND QUAKE_USE_TSAN)
message("Using thread sanitizer")
Expand Down
1 change: 1 addition & 0 deletions environments/ubuntu-latest/conda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies:
- faiss-cpu
- matplotlib
- pytest
- libarrow-all=19.0.1
- pip
- pip:
- sphinx
Expand Down
4 changes: 3 additions & 1 deletion src/cpp/include/clustering.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ shared_ptr<Clustering> kmeans(Tensor vectors,
int n_clusters,
MetricType metric_type,
int niter = 5,
Tensor initial_centroids = Tensor());
std::shared_ptr<arrow::Table> attributes_table = nullptr,
Tensor initial_centroids = Tensor()
);


/**
Expand Down
16 changes: 16 additions & 0 deletions src/cpp/include/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
#include <pthread.h>
#include <ctime>

#include <arrow/api.h>
#include <arrow/array.h>
#include <arrow/table.h>
#include <arrow/type.h>
#include <arrow/chunked_array.h>

#ifdef QUAKE_USE_NUMA
#include <numa.h>
#include <numaif.h>
Expand Down Expand Up @@ -81,6 +87,7 @@ constexpr bool DEFAULT_PRECOMPUTED = true; ///< Default flag to us
constexpr float DEFAULT_INITIAL_SEARCH_FRACTION = 0.02f; ///< Default initial fraction of partitions to search.
constexpr float DEFAULT_RECOMPUTE_THRESHOLD = 0.001f; ///< Default threshold to trigger recomputation of search parameters.
constexpr int DEFAULT_APS_FLUSH_PERIOD_US = 100; ///< Default period (in microseconds) for flushing the APS buffer.
constexpr int DEFAULT_PRICE_THRESHOLD = INT_MAX;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the price threshold in case no value is given by the user. Will modify this while making the filtering column name agnostic


// Default constants for maintenance policy parameters
constexpr const char* DEFAULT_MAINTENANCE_POLICY = "query_cost"; ///< Default maintenance policy type.
Expand Down Expand Up @@ -164,6 +171,12 @@ inline string metric_type_to_str(faiss::MetricType metric) {
}
}

enum class FilteringType {
PRE_FILTERING,
POST_FILTERING,
IN_FILTERING
};

/**
* @brief Parameters for the search operation
*/
Expand All @@ -178,6 +191,8 @@ struct SearchParams {
float recompute_threshold = DEFAULT_RECOMPUTE_THRESHOLD;
float initial_search_fraction = DEFAULT_INITIAL_SEARCH_FRACTION;
int aps_flush_period_us = DEFAULT_APS_FLUSH_PERIOD_US;
int price_threshold = DEFAULT_PRICE_THRESHOLD;
FilteringType filteringType = FilteringType::IN_FILTERING;

SearchParams() = default;
};
Expand Down Expand Up @@ -250,6 +265,7 @@ struct Clustering {
Tensor partition_ids;
vector<Tensor> vectors;
vector<Tensor> vector_ids;
vector<shared_ptr<arrow::Table>> attributes_tables;

int64_t ntotal() const {
int64_t n = 0;
Expand Down
24 changes: 23 additions & 1 deletion src/cpp/include/dynamic_inverted_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,36 @@ namespace faiss {
* @param n_entry Number of entries to add.
* @param ids Pointer to the vector IDs.
* @param codes Pointer to the encoded vectors.
* @param attributes_table Arrow table holding per-vector attributes (may be null).
* @return Number of entries added.
* @throws std::runtime_error if the partition does not exist.
*/
size_t add_entries(
size_t list_no,
size_t n_entry,
const idx_t *ids,
const uint8_t *codes) override;
const uint8_t *codes,
std::shared_ptr<arrow::Table> attributes_table
);

/**
* @brief Append new entries (codes and IDs) to a partition.
*
* @param list_no Partition number.
* @param n_entry Number of entries to add.
* @param ids Pointer to the vector IDs.
* @param codes Pointer to the encoded vectors.
* Overload without an attributes table; attributes are treated as absent (null table).
* @return Number of entries added.
* @throws std::runtime_error if the partition does not exist.
*/
size_t add_entries(
size_t list_no,
size_t n_entry,
const idx_t *ids,
const uint8_t *codes
) ;


/**
* @brief Update existing entries in a partition.
Expand Down
12 changes: 11 additions & 1 deletion src/cpp/include/index_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class IndexPartition {

uint8_t* codes_ = nullptr; ///< Pointer to the encoded vectors (raw memory block)
idx_t* ids_ = nullptr; ///< Pointer to the vector IDs
std::shared_ptr<arrow::Table> attributes_table_ = {};

std::unordered_map<idx_t, int64_t> id_to_index_; ///< Map of vector ID to index

Expand Down Expand Up @@ -88,7 +89,7 @@ class IndexPartition {
* @param new_ids Pointer to the new vector IDs.
* @param new_codes Pointer to the new encoded vectors.
*/
void append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes);
void append(int64_t n_entry, const idx_t* new_ids, const uint8_t* new_codes, std::shared_ptr<arrow::Table> attributes_table=nullptr);

/**
* @brief Update existing entries in place.
Expand All @@ -111,6 +112,15 @@ class IndexPartition {
*/
void remove(int64_t index);

/**
* @brief Remove the associated attribute of an entry from the partition. Used in conjunction with the remove(index) function.
*
* Removes the attribute by performing masking & filtering
*
* @param index Index of the vector to remove.
*/
void removeAttribute(int64_t index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this function needed? seems unnecessary


/**
* @brief Resize the partition.
*
Expand Down
28 changes: 22 additions & 6 deletions src/cpp/include/list_scanning.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ class TypedTopKBuffer {
partitions_scanned_.fetch_add(1, std::memory_order_relaxed);
}

void remove(int rejected_index) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid modifying the topkbuffer class. Just make sure that elements you add to the buffer pass the filter (in the case of pre-filtering)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this for post-filtering case. So after we get topk buffer from one partition, we need to remove whatever doesn't pass the filter. This function serves that purpose

topk_[rejected_index] = topk_[--curr_offset_];
}

DistanceType flush() {
std::lock_guard<std::recursive_mutex> buffer_lock(buffer_mutex_);
if (curr_offset_ > k_) {
Expand Down Expand Up @@ -280,11 +284,22 @@ inline void scan_list_with_ids_l2(const float *query_vec,
const int64_t *list_ids,
int list_size,
int d,
TopkBuffer &buffer) {
TopkBuffer &buffer,
bool* bitmap = nullptr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch to a vector so we avoid memory leaks

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

const float *vec = list_vecs;
for (int l = 0; l < list_size; l++) {
buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]);
vec += d;

if (bitmap == nullptr) {
for (int l = 0; l < list_size; l++) {
buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]);
vec += d;
}
} else {
for (int l = 0; l < list_size; l++) {
if (bitmap[l]) {
buffer.add(sqrt(faiss::fvec_L2sqr(query_vec, vec, d)), list_ids[l]);
}
vec += d;
}
}
}

Expand All @@ -295,7 +310,8 @@ inline void scan_list(const float *query_vec,
int list_size,
int d,
TopkBuffer &buffer,
faiss::MetricType metric = faiss::METRIC_L2) {
faiss::MetricType metric = faiss::METRIC_L2,
bool* bitmap = nullptr) {
// Dispatch based on metric type and whether list_ids is provided.
if (metric == faiss::METRIC_INNER_PRODUCT) {
if (list_ids == nullptr)
Expand All @@ -306,7 +322,7 @@ inline void scan_list(const float *query_vec,
if (list_ids == nullptr)
scan_list_no_ids_l2(query_vec, list_vecs, list_size, d, buffer);
else
scan_list_with_ids_l2(query_vec, list_vecs, list_ids, list_size, d, buffer);
scan_list_with_ids_l2(query_vec, list_vecs, list_ids, list_size, d, buffer, bitmap);
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/cpp/include/partition_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <common.h>
#include <dynamic_inverted_list.h>
#include <arrow/api.h>

class QuakeIndex;

Expand Down Expand Up @@ -56,7 +57,15 @@ class PartitionManager {
* @param assignments Tensor of shape [num_vectors] containing partition IDs. If not provided, vectors are assigned using the parent index.
* @return Timing information for the operation.
*/
shared_ptr<ModifyTimingInfo> add(const Tensor &vectors, const Tensor &vector_ids, const Tensor &assignments = Tensor(), bool check_uniques = true);
shared_ptr<ModifyTimingInfo> add(const Tensor &vectors, const Tensor &vector_ids, const Tensor &assignments = Tensor(), bool check_uniques = true,std::shared_ptr<arrow::Table> attributes_table = {});

/**
* @brief Filter the appropriate row from the attribute table
* @param table Arrow table for the attributes.
* @param vector_id Vector_id by which we are filtering.
* @return Table containing only the row pertaining to the vector_id
*/
std::shared_ptr<arrow::Table> filterRowById(std::shared_ptr<arrow::Table> table, int64_t vector_id);

/**
* @brief Remove vectors by ID from the index.
Expand Down
5 changes: 3 additions & 2 deletions src/cpp/include/quake_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class QuakeIndex {
* @param build_params Parameters for building the index.
* @return Timing information for the build.
*/
shared_ptr<BuildTimingInfo> build(Tensor x, Tensor ids, shared_ptr<IndexBuildParams> build_params);
shared_ptr<BuildTimingInfo> build(Tensor x, Tensor ids, shared_ptr<IndexBuildParams> build_params, std::shared_ptr<arrow::Table> attributes_table = nullptr);

/**
* @brief Search for vectors in the index.
Expand All @@ -73,9 +73,10 @@ class QuakeIndex {
* @brief Add vectors to the index.
* @param x Tensor of shape [num_vectors, dimension].
* @param ids Tensor of shape [num_vectors].
* @param attributes_table Associated attribute table (one row per vector_id); may be null.
* @return Timing information for the add operation.
*/
shared_ptr<ModifyTimingInfo> add(Tensor x, Tensor ids);
shared_ptr<ModifyTimingInfo> add(Tensor x, Tensor ids, std::shared_ptr<arrow::Table> attributes_table = {});

/**
* @brief Remove vectors from the index.
Expand Down
62 changes: 58 additions & 4 deletions src/cpp/src/clustering.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
// - Use descriptive variable names

#include "clustering.h"
#include <faiss/IndexFlat.h>
#include "faiss/Clustering.h"
#include "index_partition.h"
#include <list_scanning.h>
#include <faiss/IndexFlat.h>
#include "faiss/Clustering.h"
#include <arrow/compute/api_vector.h>
#include <arrow/api.h>
#include <arrow/compute/api.h>

shared_ptr<Clustering> kmeans(Tensor vectors,
Tensor ids,
int n_clusters,
MetricType metric_type,
int niter,
std::shared_ptr<arrow::Table> attributes_table,
Tensor /* initial_centroids */) {
// Ensure enough vectors are available and sizes match.
assert(vectors.size(0) >= n_clusters * 2);
Expand Down Expand Up @@ -54,17 +58,67 @@ shared_ptr<Clustering> kmeans(Tensor vectors,
// Partition vectors and ids by cluster.
vector<Tensor> cluster_vectors(n_clusters);
vector<Tensor> cluster_ids(n_clusters);
vector<shared_ptr<arrow::Table>> cluster_attributes_tables(n_clusters);

for (int i = 0; i < n_clusters; i++) {
cluster_vectors[i] = vectors.index({assignments == i});
cluster_ids[i] = ids.index({assignments == i});
auto mask = (assignments == i);

// List of vectors present in the cluster i
cluster_vectors[i] = vectors.index({mask});
// List of vectorIds present in the cluster i
cluster_ids[i] = ids.index({mask});

if(attributes_table == nullptr) {
cluster_attributes_tables[i] = nullptr;
continue;
}

auto cluster_ids_tensor = cluster_ids[i]; // Assuming this is a tensor with IDs
std::vector<int64_t> cluster_ids_vec(cluster_ids_tensor.data<int64_t>(),
cluster_ids_tensor.data<int64_t>() + cluster_ids_tensor.numel());

// Convert to Arrow Array
arrow::Int64Builder id_builder;
id_builder.AppendValues(cluster_ids_vec);
std::shared_ptr<arrow::Array> cluster_ids_array;
id_builder.Finish(&cluster_ids_array);

// Get the "id" column from the attributes table
std::shared_ptr<arrow::ChunkedArray> id_column = attributes_table->GetColumnByName("id");

auto lookup_options = std::make_shared<arrow::compute::SetLookupOptions>(cluster_ids_array);
// Apply set lookup to filter rows
auto result = arrow::compute::CallFunction(
"index_in",
{id_column->chunk(0)},
lookup_options.get()
);

auto index_array = std::static_pointer_cast<arrow::Int32Array>(result->make_array());

auto mask_result = arrow::compute::CallFunction(
"not_equal",
{index_array, arrow::MakeScalar(-1)}
);

// Convert result to a Boolean mask
auto mask_table = std::static_pointer_cast<arrow::BooleanArray>(mask_result->make_array());

// Filter the table using the mask
auto filtered_table_result = arrow::compute::Filter(attributes_table, mask_table);

cluster_attributes_tables[i] = filtered_table_result->table();
}


Tensor partition_ids = torch::arange(n_clusters, torch::kInt64);

shared_ptr<Clustering> clustering = std::make_shared<Clustering>();
clustering->centroids = centroids;
clustering->partition_ids = partition_ids;
clustering->vectors = cluster_vectors;
clustering->vector_ids = cluster_ids;
clustering->attributes_tables = cluster_attributes_tables;

delete index_ptr;
return clustering;
Expand Down
Loading