Skip to content

Commit 92c67f2

Browse files
committed
Implement scalable counter
Implementation is based on chapter 5.2.2 of Paul E. McKenney (2017), "Is Parallel Programming Hard, And, If So, What Can You Do About It?"
1 parent 9d5a687 commit 92c67f2

File tree

6 files changed

+142
-12
lines changed

6 files changed

+142
-12
lines changed

core/benchmarks/counter_bench.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,26 @@ static void BM_Counter_Increment(benchmark::State& state) {
1010
BuildCounter().Name("benchmark_counter").Help("").Register(registry);
1111
auto& counter = counter_family.Add({});
1212

13-
while (state.KeepRunning()) counter.Increment();
13+
while (state.KeepRunning()) {
14+
counter.Increment();
15+
}
1416
}
1517
BENCHMARK(BM_Counter_Increment);
1618

19+
class BM_Counter : public benchmark::Fixture {
20+
protected:
21+
BM_Counter() { this->ThreadPerCpu(); }
22+
23+
prometheus::Counter counter{};
24+
};
25+
26+
BENCHMARK_F(BM_Counter, ConcurrentIncrement)
27+
(benchmark::State& state) {
28+
for (auto _ : state) {
29+
counter.Increment();
30+
}
31+
}
32+
1733
static void BM_Counter_Collect(benchmark::State& state) {
1834
using prometheus::BuildCounter;
1935
using prometheus::Counter;

core/benchmarks/gauge_bench.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,20 @@ static void BM_Gauge_Increment(benchmark::State& state) {
1414
}
1515
BENCHMARK(BM_Gauge_Increment);
1616

17+
class BM_Gauge : public benchmark::Fixture {
18+
protected:
19+
BM_Gauge() { this->ThreadPerCpu(); }
20+
21+
prometheus::Gauge gauge{};
22+
};
23+
24+
BENCHMARK_F(BM_Gauge, ConcurrentIncrement)
25+
(benchmark::State& state) {
26+
for (auto _ : state) {
27+
gauge.Increment();
28+
}
29+
}
30+
1731
static void BM_Gauge_Decrement(benchmark::State& state) {
1832
using prometheus::BuildGauge;
1933
using prometheus::Gauge;

core/include/prometheus/counter.h

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#pragma once
22

3+
#include <array>
4+
#include <atomic>
5+
#include <exception>
6+
37
#include "prometheus/client_metric.h"
4-
#include "prometheus/gauge.h"
58
#include "prometheus/metric_type.h"
69

710
namespace prometheus {
@@ -17,7 +20,17 @@ namespace prometheus {
1720
/// - errors
1821
///
1922
/// Do not use a counter to expose a value that can decrease - instead use a
20-
/// Gauge.
23+
/// Gauge. If an montonically increasing counter is applicable a counter shall
24+
/// be prefered to a Gauge because of a better update performance.
25+
///
26+
/// The implementation exhibits a performance which is near a sequential
27+
/// implementation and scales linearly with increasing number of updater threads
28+
/// in a multi-threaded environment invoking Increment(). However, this
29+
/// excellent update-side scalability comes at read-side expense invoking
30+
/// Collect(). Increment() can therefor be used in the fast-path of the code,
31+
/// where the count is updated extremely frequently. The Collect() function on
32+
/// the other hand shall read the counter at a low sample rate, e.g., in the
33+
/// order of milliseconds.
2134
///
2235
/// The class is thread-safe. No concurrent call to any API of this type causes
2336
/// a data race.
@@ -29,20 +42,55 @@ class Counter {
2942
Counter() = default;
3043

3144
/// \brief Increment the counter by 1.
32-
void Increment();
45+
void Increment() { IncrementUnchecked(1.0); }
3346

3447
/// \brief Increment the counter by a given amount.
3548
///
3649
/// The counter will not change if the given amount is negative.
37-
void Increment(double);
50+
void Increment(const double value) {
51+
if (value < 0.0) {
52+
return;
53+
}
54+
IncrementUnchecked(value);
55+
}
3856

3957
/// \brief Get the current value of the counter.
4058
double Value() const;
4159

4260
ClientMetric Collect() const;
4361

4462
private:
45-
Gauge gauge_{0.0};
63+
int ThreadId() {
64+
thread_local int id{-1};
65+
66+
if (id == -1) {
67+
id = AssignThreadId();
68+
}
69+
return id;
70+
}
71+
72+
int AssignThreadId() {
73+
const int id{count_.fetch_add(1)};
74+
75+
if (id >= per_thread_counter_.size()) {
76+
std::terminate();
77+
}
78+
79+
return id;
80+
}
81+
82+
void IncrementUnchecked(const double v) {
83+
CacheLine& c = per_thread_counter_[ThreadId()];
84+
const double new_value{c.v.load(std::memory_order_relaxed) + v};
85+
c.v.store(new_value, std::memory_order_relaxed);
86+
}
87+
88+
struct alignas(128) CacheLine {
89+
std::atomic<double> v{0.0};
90+
};
91+
92+
std::atomic<int> count_{0};
93+
std::array<CacheLine, 256> per_thread_counter_{};
4694
};
4795

4896
} // namespace prometheus

core/include/prometheus/gauge.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ namespace prometheus {
1717
/// memory usage, but also "counts" that can go up and down, like the number of
1818
/// running processes.
1919
///
20-
/// The class is thread-safe. No concurrent call to any API of this type causes a data race.
20+
/// If an montonically increasing counter is applicable a Counter shall be
21+
/// prefered to a Gauge because of a better update performance.
22+
///
23+
/// The class is thread-safe. No concurrent call to any API of this type causes
24+
/// a data race.
2125
class Gauge {
2226
public:
2327
static const MetricType metric_type{MetricType::Gauge};

core/src/counter.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
#include "prometheus/counter.h"
22

3-
namespace prometheus {
4-
5-
void Counter::Increment() { gauge_.Increment(); }
3+
#include <numeric>
64

7-
void Counter::Increment(const double val) { gauge_.Increment(val); }
5+
namespace prometheus {
86

9-
double Counter::Value() const { return gauge_.Value(); }
7+
double Counter::Value() const {
8+
return std::accumulate(
9+
std::begin(per_thread_counter_), std::end(per_thread_counter_), 0.0,
10+
[](const double a, const CacheLine& b) { return a + b.v; });
11+
}
1012

1113
ClientMetric Counter::Collect() const {
1214
ClientMetric metric;

core/tests/counter_test.cc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
#include <gmock/gmock.h>
44

5+
#include <thread>
6+
#include <vector>
7+
8+
#include <prometheus/counter.h>
9+
510
namespace prometheus {
611
namespace {
712

@@ -37,5 +42,46 @@ TEST(CounterTest, inc_negative_value) {
3742
EXPECT_EQ(counter.Value(), 5.0);
3843
}
3944

45+
TEST(CounterTest, concurrent_writes) {
46+
Counter counter;
47+
std::vector<std::thread> threads(std::thread::hardware_concurrency());
48+
49+
for (auto& thread : threads) {
50+
thread = std::thread{[&counter]() {
51+
for (int i{0}; i < 100000; ++i) {
52+
counter.Increment();
53+
}
54+
}};
55+
}
56+
57+
for (auto& thread : threads) {
58+
thread.join();
59+
}
60+
61+
EXPECT_EQ(100000 * threads.size(), counter.Value());
62+
}
63+
64+
TEST(CounterTest, concurrent_read_write) {
65+
Counter counter;
66+
std::vector<double> values;
67+
values.reserve(100000);
68+
69+
std::thread reader{[&counter, &values]() {
70+
for (int i{0}; i < 100000; ++i) {
71+
values.push_back(counter.Value());
72+
}
73+
}};
74+
std::thread writer{[&counter]() {
75+
for (int i{0}; i < 100000; ++i) {
76+
counter.Increment();
77+
}
78+
}};
79+
80+
reader.join();
81+
writer.join();
82+
83+
EXPECT_TRUE(std::is_sorted(std::begin(values), std::end(values)));
84+
}
85+
4086
} // namespace
4187
} // namespace prometheus

0 commit comments

Comments
 (0)