Skip to content

Commit bf66633

Browse files
committed
Implement scalable counter
Implementation is based on chapter 5.2.2 of Paul E. McKenney (2017), "Is Parallel Programming Hard, And, If So, What Can You Do About It?"
1 parent a18dd96 commit bf66633

File tree

5 files changed

+120
-12
lines changed

5 files changed

+120
-12
lines changed

core/include/prometheus/counter.h

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,55 @@
11
#pragma once
22

3+
#include <array>
34
#include <atomic>
5+
#include <exception>
46

57
#include "prometheus/client_metric.h"
6-
#include "prometheus/gauge.h"
78

89
namespace prometheus {
910

1011
class Counter {
1112
public:
1213
static const MetricType metric_type = MetricType::Counter;
1314

14-
void Increment();
15-
void Increment(double);
15+
void Increment() { Increment(1.0); }
16+
17+
void Increment(const double v) {
18+
CacheLine& c = per_thread_counter_[ThreadId()];
19+
const double new_value{c.v.load(std::memory_order_relaxed) + v};
20+
c.v.store(new_value, std::memory_order_relaxed);
21+
}
22+
1623
double Value() const;
1724

18-
ClientMetric Collect();
25+
ClientMetric Collect() const;
1926

2027
private:
21-
Gauge gauge_;
28+
int ThreadId() {
29+
thread_local int id{-1};
30+
31+
if (id == -1) {
32+
id = AssignThreadId();
33+
}
34+
return id;
35+
}
36+
37+
int AssignThreadId() {
38+
const int id{count_.fetch_add(1)};
39+
40+
if (id >= per_thread_counter_.size()) {
41+
std::terminate();
42+
}
43+
44+
return id;
45+
}
46+
47+
struct alignas(128) CacheLine {
48+
std::atomic<double> v{0.0};
49+
};
50+
51+
std::atomic<int> count_{0};
52+
std::array<CacheLine, 256> per_thread_counter_{};
2253
};
2354

2455
} // namespace prometheus

core/src/counter.cc

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1-
#include "prometheus/counter.h"
21

3-
namespace prometheus {
2+
#include "prometheus/counter.h"
43

5-
void Counter::Increment() { gauge_.Increment(); }
4+
#include <algorithm>
65

7-
void Counter::Increment(double val) { gauge_.Increment(val); }
6+
namespace prometheus {
87

9-
double Counter::Value() const { return gauge_.Value(); }
8+
double Counter::Value() const {
9+
return std::accumulate(
10+
std::begin(per_thread_counter_), std::end(per_thread_counter_), 0.0,
11+
[](const double a, const CacheLine& b) { return a + b.v; });
12+
}
1013

11-
ClientMetric Counter::Collect() {
14+
ClientMetric Counter::Collect() const {
1215
ClientMetric metric;
1316
metric.counter.value = Value();
1417
return metric;

core/tests/benchmark/counter_bench.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,26 @@ static void BM_Counter_Increment(benchmark::State& state) {
1010
BuildCounter().Name("benchmark_counter").Help("").Register(registry);
1111
auto& counter = counter_family.Add({});
1212

13-
while (state.KeepRunning()) counter.Increment();
13+
while (state.KeepRunning()) {
14+
counter.Increment();
15+
}
1416
}
1517
BENCHMARK(BM_Counter_Increment);
1618

19+
class BM_Counter : public benchmark::Fixture {
20+
protected:
21+
BM_Counter() { this->ThreadPerCpu(); }
22+
23+
prometheus::Counter counter{};
24+
};
25+
26+
BENCHMARK_F(BM_Counter, ConcurrentIncrement)
27+
(benchmark::State& state) {
28+
for (auto _ : state) {
29+
counter.Increment();
30+
}
31+
}
32+
1733
static void BM_Counter_Collect(benchmark::State& state) {
1834
using prometheus::BuildCounter;
1935
using prometheus::Counter;

core/tests/benchmark/gauge_bench.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,20 @@ static void BM_Gauge_Increment(benchmark::State& state) {
1414
}
1515
BENCHMARK(BM_Gauge_Increment);
1616

17+
class BM_Gauge : public benchmark::Fixture {
18+
protected:
19+
BM_Gauge() { this->ThreadPerCpu(); }
20+
21+
prometheus::Gauge gauge{};
22+
};
23+
24+
BENCHMARK_F(BM_Gauge, ConcurrentIncrement)
25+
(benchmark::State& state) {
26+
for (auto _ : state) {
27+
gauge.Increment();
28+
}
29+
}
30+
1731
static void BM_Gauge_Decrement(benchmark::State& state) {
1832
using prometheus::BuildGauge;
1933
using prometheus::Gauge;

core/tests/counter_test.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include "gmock/gmock.h"
22

3+
#include <thread>
4+
#include <vector>
5+
36
#include <prometheus/counter.h>
47

58
using namespace testing;
@@ -31,3 +34,44 @@ TEST_F(CounterTest, inc_multiple) {
3134
counter.Increment(5);
3235
EXPECT_EQ(counter.Value(), 7.0);
3336
}
37+
38+
TEST_F(CounterTest, concurrent_writes) {
39+
Counter counter;
40+
std::vector<std::thread> threads(std::thread::hardware_concurrency());
41+
42+
for (auto& thread : threads) {
43+
thread = std::thread{[&counter]() {
44+
for (int i = {0}; i < 100000; ++i) {
45+
counter.Increment();
46+
}
47+
}};
48+
}
49+
50+
for (auto& thread : threads) {
51+
thread.join();
52+
}
53+
54+
EXPECT_EQ(100000 * threads.size(), counter.Value());
55+
}
56+
57+
TEST_F(CounterTest, concurrent_read_write) {
58+
Counter counter;
59+
std::vector<double> values;
60+
values.reserve(100000);
61+
62+
std::thread reader{[&counter, &values]() {
63+
for (int i = {0}; i < 100000; ++i) {
64+
values.push_back(counter.Value());
65+
}
66+
}};
67+
std::thread writer{[&counter]() {
68+
for (int i = {0}; i < 100000; ++i) {
69+
counter.Increment();
70+
}
71+
}};
72+
73+
reader.join();
74+
writer.join();
75+
76+
EXPECT_TRUE(std::is_sorted(std::begin(values), std::end(values)));
77+
}

0 commit comments

Comments
 (0)