Skip to content

Commit 54ffb1b

Browse files
authored
Add Comprehensive Tests for Time Slice Mutex Metrics (#237)
* Add Comprehensive Tests for Time Slice Mutex Metrics

  Summary

  This PR adds comprehensive test coverage for the new Time Slice Mutex metrics system, ensuring all 7 metrics are properly validated across multiple layers of the codebase.

  Changes Made

  New Test Files
  - **vmsdk/testing/mrmw_mutex_test.cc**: Added 6 comprehensive test cases for direct mutex lock validation
  - **Enhanced testing/ft_search_test.cc**: Added metrics validation to existing FTSearchTest
  - **Enhanced testing/index_schema_test.cc**: Added upsert/delete metrics validation to IndexSchemaSubscriptionTest

  Metrics Coverage

  All 7 Time Slice Mutex metrics are now fully tested:

  | Metric | Test Coverage | Validation Method |
  |--------|---------------|-------------------|
  | `time_slice_read_periods` | Mutex + Integration | Direct lock operations + search commands |
  | `time_slice_read_time` | Mutex + Integration | Microsecond timing validation |
  | `time_slice_queries` | Integration | Search command execution count |
  | `time_slice_write_periods` | Mutex | Direct write lock operations |
  | `time_slice_write_time` | Mutex | Microsecond timing validation |
  | `time_slice_upserts` | Index Operations | Add/modify keyspace notifications |
  | `time_slice_deletes` | Index Operations | Remove keyspace notifications |

  Test Architecture

  **Low-Level Mutex Tests (vmsdk/testing/mrmw_mutex_test.cc)**:
  - `ReaderLockMetrics` - Single reader lock validation
  - `WriterLockMetrics` - Single writer lock validation
  - `MultipleLockMetrics` - Sequential lock accumulation (3R + 2W)
  - `ConcurrentMetrics` - Thread-safe concurrent access (10R + 5W)
  - `GlobalStatisticsVerification` - Multi-instance global aggregation
  - `TimingAccuracy` - Microsecond precision validation (±50μs bounds)

  **Integration Tests**:
  - `FTSearchTest` - Real search commands with 100 vector operations
  - `IndexSchemaSubscriptionTest` - 13 test cases covering add/modify/remove scenarios

  Key Features
  - **Multi-layer validation**: From direct mutex operations to high-level commands
  - **Thread safety**: Concurrent access validation with atomic counters
  - **Timing precision**: Microsecond-level accuracy testing
  - **Operation-specific**: Different validation paths for success/failure/skip scenarios
  - **Global aggregation**: Multiple mutex instances contributing to shared statistics

  Testing
  - All existing tests continue to pass
  - New tests provide comprehensive coverage for metrics accuracy
  - Thread safety validated under concurrent load
  - Timing measurements verified within reasonable bounds

  Files Changed
  - `vmsdk/testing/mrmw_mutex_test.cc` (new file)
  - `testing/ft_search_test.cc` (enhanced)
  - `testing/index_schema_test.cc` (enhanced)

  This ensures the new "Time Slice Mutex" info section will have reliable, well-tested metrics for monitoring system performance and lock contention.

  Signed-off-by: yulazariy <[email protected]>

* refactor index_schema_test to make more readable

  Signed-off-by: yulazariy <[email protected]>

* Refactor TimeSlicedMRMWStats so we can have faster access

  Signed-off-by: yulazariy <[email protected]>

---------

Signed-off-by: yulazariy <[email protected]>
1 parent 20ceefa commit 54ffb1b

File tree

9 files changed

+354
-33
lines changed

9 files changed

+354
-33
lines changed

src/index_schema.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,13 +414,17 @@ void IndexSchema::ProcessAttributeMutation(
414414
if (index->IsTracked(key)) {
415415
auto res = index->ModifyRecord(key, data_view);
416416
TrackResults(ctx, res, "Modify", stats_.subscription_modify);
417+
if (res.ok() && res.value()) {
418+
++Metrics::GetStats().time_slice_upserts;
419+
}
417420
return;
418421
}
419422
bool was_tracked = IsTrackedByAnyIndex(key);
420423
auto res = index->AddRecord(key, data_view);
421424
TrackResults(ctx, res, "Add", stats_.subscription_add);
422425

423426
if (res.ok() && res.value()) {
427+
++Metrics::GetStats().time_slice_upserts;
424428
// Increment the hash key count if it wasn't tracked and we successfully
425429
// added it to the index.
426430
if (!was_tracked) {
@@ -451,6 +455,7 @@ void IndexSchema::ProcessAttributeMutation(
451455
auto res = index->RemoveRecord(key, deletion_type);
452456
TrackResults(ctx, res, "Remove", stats_.subscription_remove);
453457
if (res.ok() && res.value()) {
458+
++Metrics::GetStats().time_slice_deletes;
454459
// Reduce the hash key count if nothing is tracking the key anymore.
455460
if (!IsTrackedByAnyIndex(key)) {
456461
--stats_.document_cnt;

src/metrics.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ class Metrics {
115115
coordinator_server_search_index_partition_success_latency{
116116
absl::ToInt64Nanoseconds(absl::Nanoseconds(1)),
117117
absl::ToInt64Nanoseconds(absl::Seconds(1)), LATENCY_PRECISION};
118+
// Time Slice Mutex metrics
119+
std::atomic<uint64_t> time_slice_read_periods{0};
120+
std::atomic<uint64_t> time_slice_read_time{0}; // microseconds, cumulative
121+
std::atomic<uint64_t> time_slice_queries{0};
122+
std::atomic<uint64_t> time_slice_write_periods{0};
123+
std::atomic<uint64_t> time_slice_write_time{0}; // microseconds, cumulative
124+
std::atomic<uint64_t> time_slice_upserts{0};
125+
std::atomic<uint64_t> time_slice_deletes{0};
118126
};
119127
static Stats& GetStats() { return GetInstance().stats_; }
120128

src/query/search.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,8 @@ absl::StatusOr<std::deque<indexes::Neighbor>> Search(
358358
auto vector_index = dynamic_cast<indexes::VectorBase *>(index.get());
359359
auto &time_sliced_mutex = parameters.index_schema->GetTimeSlicedMutex();
360360
vmsdk::ReaderMutexLock lock(&time_sliced_mutex);
361+
++Metrics::GetStats().time_slice_queries;
362+
361363
if (!parameters.filter_parse_results.root_predicate) {
362364
return MaybeAddIndexedContent(PerformVectorSearch(vector_index, parameters),
363365
parameters);

src/valkey_search.cc

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,48 @@ static vmsdk::info_field::Integer ingest_total_failures(
202202
return Metrics::GetStats().ingest_total_failures;
203203
}));
204204

205+
static vmsdk::info_field::Integer time_slice_read_periods(
206+
"time_slice_mutex", "time_slice_read_periods",
207+
vmsdk::info_field::IntegerBuilder().App().Computed([]() -> long long {
208+
return vmsdk::GetGlobalTimeSlicedMRMWStats().read_periods;
209+
}));
210+
211+
static vmsdk::info_field::Integer time_slice_read_time(
212+
"time_slice_mutex", "time_slice_read_time",
213+
vmsdk::info_field::IntegerBuilder().App().Units(vmsdk::info_field::Units::kMicroSeconds).Computed([]() -> long long {
214+
return vmsdk::GetGlobalTimeSlicedMRMWStats().read_time_microseconds;
215+
}));
216+
217+
static vmsdk::info_field::Integer time_slice_write_periods(
218+
"time_slice_mutex", "time_slice_write_periods",
219+
vmsdk::info_field::IntegerBuilder().App().Computed([]() -> long long {
220+
return vmsdk::GetGlobalTimeSlicedMRMWStats().write_periods;
221+
}));
222+
223+
static vmsdk::info_field::Integer time_slice_write_time(
224+
"time_slice_mutex", "time_slice_write_time",
225+
vmsdk::info_field::IntegerBuilder().App().Units(vmsdk::info_field::Units::kMicroSeconds).Computed([]() -> long long {
226+
return vmsdk::GetGlobalTimeSlicedMRMWStats().write_time_microseconds;
227+
}));
228+
229+
static vmsdk::info_field::Integer time_slice_queries(
230+
"time_slice_mutex", "time_slice_queries",
231+
vmsdk::info_field::IntegerBuilder().App().Computed([]() -> long long {
232+
return Metrics::GetStats().time_slice_queries;
233+
}));
234+
235+
static vmsdk::info_field::Integer time_slice_upserts(
236+
"time_slice_mutex", "time_slice_upserts",
237+
vmsdk::info_field::IntegerBuilder().App().Computed([]() -> long long {
238+
return Metrics::GetStats().time_slice_upserts;
239+
}));
240+
241+
static vmsdk::info_field::Integer time_slice_deletes(
242+
"time_slice_mutex", "time_slice_deletes",
243+
vmsdk::info_field::IntegerBuilder().App().Computed([]() -> long long {
244+
return Metrics::GetStats().time_slice_deletes;
245+
}));
246+
205247
static vmsdk::info_field::Integer query_queue_size(
206248
"thread-pool", "query_queue_size",
207249
vmsdk::info_field::IntegerBuilder().App().Computed([]() -> long long {

testing/ft_search_test.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "src/coordinator/coordinator.pb.h"
3636
#include "src/coordinator/util.h"
3737
#include "src/indexes/vector_base.h"
38+
#include "src/metrics.h"
3839
#include "src/query/search.h"
3940
#include "src/schema_manager.h"
4041
#include "src/utils/string_interning.h"
@@ -47,6 +48,7 @@
4748
#include "vmsdk/src/testing_infra/module.h"
4849
#include "vmsdk/src/testing_infra/utils.h"
4950
#include "vmsdk/src/thread_pool.h"
51+
#include "vmsdk/src/time_sliced_mrmw_mutex.h"
5052
#include "vmsdk/src/valkey_module_api/valkey_module.h"
5153

5254
namespace valkey_search {
@@ -503,7 +505,7 @@ TEST_P(FTSearchTest, FTSearchTests) {
503505
delete[] ids;
504506
});
505507
for (size_t i = 0; i < node_ids.size(); ++i) {
506-
auto node_id = node_ids[i];
508+
const auto& node_id = node_ids[i];
507509
EXPECT_CALL(
508510
*kMockValkeyModule,
509511
GetClusterNodeInfo(testing::_, testing::StrEq(node_id), testing::_,
@@ -563,6 +565,14 @@ TEST_P(FTSearchTest, FTSearchTests) {
563565
.Times(::testing::AnyNumber());
564566
auto vectors = DeterministicallyGenerateVectors(100, dimensions, 10.0);
565567
AddVectors(vectors);
568+
569+
// Capture initial metrics before starting searches
570+
auto& stats = Metrics::GetStats();
571+
auto& mrmw_stats = vmsdk::GetGlobalTimeSlicedMRMWStats();
572+
uint64_t initial_queries = stats.time_slice_queries;
573+
uint64_t initial_read_periods = mrmw_stats.read_periods;
574+
uint64_t initial_read_time = mrmw_stats.read_time_microseconds;
575+
566576
RE2 reply_regex(R"(\*3\r\n:1\r\n\+\d+\r\n\*2\r\n\+score\r\n\+.*\r\n)");
567577
uint64_t i = 0;
568578
for (auto &vector : vectors) {
@@ -616,6 +626,11 @@ TEST_P(FTSearchTest, FTSearchTests) {
616626
TestValkeyModule_FreeString(&fake_ctx_, cmd_arg);
617627
}
618628
}
629+
630+
// Verify that metrics were incremented correctly
631+
EXPECT_EQ(stats.time_slice_queries, initial_queries + vectors.size());
632+
EXPECT_GT(mrmw_stats.read_periods, initial_read_periods);
633+
EXPECT_GT(mrmw_stats.read_time_microseconds, initial_read_time);
619634
}
620635

621636
INSTANTIATE_TEST_SUITE_P(

testing/index_schema_test.cc

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "src/indexes/vector_flat.h"
3535
#include "src/indexes/vector_hnsw.h"
3636
#include "src/keyspace_event_manager.h"
37+
#include "src/metrics.h"
3738
#include "src/schema_manager.h"
3839
#include "src/utils/string_interning.h"
3940
#include "testing/common.h"
@@ -82,7 +83,17 @@ struct IndexSchemaSubscriptionTestCase {
8283
};
8384

8485
class IndexSchemaSubscriptionTest
85-
: public ValkeySearchTestWithParam<IndexSchemaSubscriptionTestCase> {};
86+
: public ValkeySearchTestWithParam<IndexSchemaSubscriptionTestCase> {
87+
protected:
88+
// Helper functions to check operation success/failure patterns
89+
static bool IsOperationSuccessful(const absl::optional<absl::StatusOr<bool>>& result) {
90+
return result.has_value() && result.value().ok() && result.value().value();
91+
}
92+
93+
static bool IsOperationFailed(const absl::optional<absl::StatusOr<bool>>& result) {
94+
return result.has_value() && !result.value().ok();
95+
}
96+
};
8697

8798
TEST_P(IndexSchemaSubscriptionTest, OnKeyspaceNotificationTest) {
8899
const IndexSchemaSubscriptionTestCase &test_case = GetParam();
@@ -193,6 +204,12 @@ TEST_P(IndexSchemaSubscriptionTest, OnKeyspaceNotificationTest) {
193204
.skipped_cnt =
194205
index_schema->GetStats().subscription_modify.skipped_cnt};
195206
uint32_t document_cnt = index_schema->GetStats().document_cnt;
207+
208+
// Capture initial Time Slice Mutex metrics
209+
auto& global_stats = Metrics::GetStats();
210+
uint64_t initial_upserts = global_stats.time_slice_upserts;
211+
uint64_t initial_deletes = global_stats.time_slice_deletes;
212+
196213
index_schema->OnKeyspaceNotification(&fake_ctx, VALKEYMODULE_NOTIFY_HASH,
197214
"event", key_valkey_str.get());
198215
if (use_thread_pool) {
@@ -220,51 +237,60 @@ TEST_P(IndexSchemaSubscriptionTest, OnKeyspaceNotificationTest) {
220237
EXPECT_EQ(index_schema->GetStats().document_cnt - document_cnt,
221238
test_case.expected_document_cnt_delta);
222239

223-
// Check field type metrics based on test case's index_type and success/failure
224-
if (test_case.expect_index_add_w_result.has_value() &&
225-
test_case.expect_index_add_w_result.value().ok() &&
226-
test_case.expect_index_add_w_result.value().value()) {
227-
// For successful additions, check the appropriate field type metric
240+
// Determine operation success/failure states using helper functions
241+
bool successful_add = IsOperationSuccessful(test_case.expect_index_add_w_result);
242+
bool successful_modify = IsOperationSuccessful(test_case.expect_index_modify_w_result);
243+
bool successful_remove = IsOperationSuccessful(test_case.expect_index_remove_w_result);
244+
245+
bool failed_operation = IsOperationFailed(test_case.expect_index_add_w_result) ||
246+
IsOperationFailed(test_case.expect_index_modify_w_result) ||
247+
IsOperationFailed(test_case.expect_index_remove_w_result);
248+
249+
bool successful_upsert = successful_add || successful_modify;
250+
bool is_hash_operation = !test_case.open_key_fail &&
251+
test_case.open_key_type == VALKEYMODULE_KEYTYPE_HASH &&
252+
test_case.valkey_hash_data.has_value();
253+
254+
// Check field type metrics for successful operations with document count increase
255+
if (successful_upsert && test_case.expected_document_cnt_delta > 0) {
228256
switch (test_case.index_type) {
229257
case indexes::IndexerType::kVector:
230-
// Only check if document count increased (real vector field indexed)
231-
if (test_case.expected_document_cnt_delta > 0) {
232-
EXPECT_GT(metrics.ingest_field_vector, initial_field_vector);
233-
}
258+
EXPECT_GT(metrics.ingest_field_vector, initial_field_vector);
234259
break;
235260
case indexes::IndexerType::kNumeric:
236-
if (test_case.expected_document_cnt_delta > 0) {
237-
EXPECT_GT(metrics.ingest_field_numeric, initial_field_numeric);
238-
}
261+
EXPECT_GT(metrics.ingest_field_numeric, initial_field_numeric);
239262
break;
240263
case indexes::IndexerType::kTag:
241-
if (test_case.expected_document_cnt_delta > 0) {
242-
EXPECT_GT(metrics.ingest_field_tag, initial_field_tag);
243-
}
264+
EXPECT_GT(metrics.ingest_field_tag, initial_field_tag);
244265
break;
245266
default:
246267
break;
247268
}
248269
}
249270

250-
// Check for failure metrics
251-
if ((test_case.expect_index_add_w_result.has_value() &&
252-
!test_case.expect_index_add_w_result.value().ok()) ||
253-
(test_case.expect_index_modify_w_result.has_value() &&
254-
!test_case.expect_index_modify_w_result.value().ok()) ||
255-
(test_case.expect_index_remove_w_result.has_value() &&
256-
!test_case.expect_index_remove_w_result.value().ok())) {
257-
// For failures, verify the total failures metric increased
271+
// Check failure metrics
272+
if (failed_operation) {
258273
EXPECT_GT(metrics.ingest_total_failures, initial_total_failures);
259274
}
260275

261-
// Check for hash keys metrics
262-
if (!test_case.open_key_fail && test_case.open_key_type == VALKEYMODULE_KEYTYPE_HASH) {
263-
if (test_case.valkey_hash_data.has_value()) {
264-
EXPECT_GT(metrics.ingest_hash_keys, initial_hash_keys);
265-
}
276+
// Check hash keys metrics
277+
if (is_hash_operation) {
278+
EXPECT_GT(metrics.ingest_hash_keys, initial_hash_keys);
266279
}
267-
}
280+
281+
// Verify Time Slice Mutex metrics
282+
if (successful_upsert) {
283+
EXPECT_EQ(global_stats.time_slice_upserts, initial_upserts + 1);
284+
EXPECT_EQ(global_stats.time_slice_deletes, initial_deletes);
285+
} else if (successful_remove) {
286+
EXPECT_EQ(global_stats.time_slice_deletes, initial_deletes + 1);
287+
EXPECT_EQ(global_stats.time_slice_upserts, initial_upserts);
288+
} else {
289+
// No successful operation expected
290+
EXPECT_EQ(global_stats.time_slice_upserts, initial_upserts);
291+
EXPECT_EQ(global_stats.time_slice_deletes, initial_deletes);
292+
}
293+
}
268294
}
269295

270296
INSTANTIATE_TEST_SUITE_P(

vmsdk/src/time_sliced_mrmw_mutex.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
namespace vmsdk {
2020

21+
// Define the global statistics variable
22+
TimeSlicedMRMWStats global_stats;
23+
2124
void ReaderMutexLock::SetMayProlong() {
2225
if (may_prolong_) {
2326
return;

vmsdk/src/time_sliced_mrmw_mutex.h

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,21 @@
1818

1919
namespace vmsdk {
2020

21+
struct TimeSlicedMRMWStats {
22+
std::atomic<uint64_t> read_periods{0};
23+
std::atomic<uint64_t> read_time_microseconds{0}; // cumulative
24+
std::atomic<uint64_t> write_periods{0};
25+
std::atomic<uint64_t> write_time_microseconds{0}; // cumulative
26+
};
27+
28+
// Declaration of the global statistics object (defined in
// time_sliced_mrmw_mutex.cc)
29+
extern TimeSlicedMRMWStats global_stats;
30+
31+
// Global statistics accessor function
32+
inline const TimeSlicedMRMWStats& GetGlobalTimeSlicedMRMWStats() {
33+
return global_stats;
34+
}
35+
2136
struct MRMWMutexOptions {
2237
absl::Duration read_quota_duration;
2338
absl::Duration read_switch_grace_period;
@@ -124,6 +139,8 @@ class ABSL_SCOPED_LOCKABLE ReaderMutexLock {
124139
explicit ReaderMutexLock(TimeSlicedMRMWMutex* mutex, bool may_prolong = false)
125140
ABSL_SHARED_LOCK_FUNCTION(mutex)
126141
: mutex_(mutex), may_prolong_(may_prolong) {
142+
++global_stats.read_periods;
143+
timer_.Reset();
127144
mutex->ReaderLock(may_prolong_);
128145
}
129146

@@ -132,18 +149,25 @@ class ABSL_SCOPED_LOCKABLE ReaderMutexLock {
132149
ReaderMutexLock& operator=(const ReaderMutexLock&) = delete;
133150
ReaderMutexLock& operator=(ReaderMutexLock&&) = delete;
134151
void SetMayProlong();
135-
~ReaderMutexLock() ABSL_UNLOCK_FUNCTION() { mutex_->Unlock(may_prolong_); }
152+
~ReaderMutexLock() ABSL_UNLOCK_FUNCTION() {
153+
mutex_->Unlock(may_prolong_);
154+
global_stats.read_time_microseconds +=
155+
absl::ToInt64Microseconds(timer_.Duration());
156+
}
136157

137158
private:
138159
TimeSlicedMRMWMutex* const mutex_;
139160
bool may_prolong_;
161+
vmsdk::StopWatch timer_;
140162
};
141163

142164
class ABSL_SCOPED_LOCKABLE WriterMutexLock {
143165
public:
144166
explicit WriterMutexLock(TimeSlicedMRMWMutex* mutex, bool may_prolong = false)
145167
ABSL_SHARED_LOCK_FUNCTION(mutex)
146168
: mutex_(mutex), may_prolong_(may_prolong) {
169+
++global_stats.write_periods;
170+
timer_.Reset();
147171
mutex->WriterLock(may_prolong_);
148172
}
149173

@@ -152,11 +176,16 @@ class ABSL_SCOPED_LOCKABLE WriterMutexLock {
152176
WriterMutexLock& operator=(const WriterMutexLock&) = delete;
153177
WriterMutexLock& operator=(WriterMutexLock&&) = delete;
154178
void SetMayProlong();
155-
~WriterMutexLock() ABSL_UNLOCK_FUNCTION() { mutex_->Unlock(may_prolong_); }
179+
~WriterMutexLock() ABSL_UNLOCK_FUNCTION() {
180+
mutex_->Unlock(may_prolong_);
181+
global_stats.write_time_microseconds +=
182+
absl::ToInt64Microseconds(timer_.Duration());
183+
}
156184

157185
private:
158186
TimeSlicedMRMWMutex* const mutex_;
159187
bool may_prolong_;
188+
vmsdk::StopWatch timer_;
160189
};
161190

162191
} // namespace vmsdk

0 commit comments

Comments
 (0)