Skip to content

Commit bf74280

Browse files
committed
CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot
This is inefficient and involves unfair scheduling. The latter implies possible bad surprises regarding waiting durations on busy nodes. Instead, use AsioConditionVariable#Wait() if there are no free slots. It's notified by others' CpuBoundWork#~CpuBoundWork() once finished.
1 parent b899611 commit bf74280

File tree

2 files changed

+60
-13
lines changed

2 files changed

+60
-13
lines changed

lib/base/io-engine.cpp

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,61 @@
1616

1717
using namespace icinga;
1818

19-
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&)
19+
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
2020
: m_Done(false)
2121
{
2222
auto& ioEngine (IoEngine::Get());
23+
auto& sem (ioEngine.m_CpuBoundSemaphore);
24+
std::unique_lock<std::mutex> lock (sem.Mutex);
2325

24-
for (;;) {
25-
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
26+
if (sem.FreeSlots) {
27+
--sem.FreeSlots;
28+
return;
29+
}
2630

27-
if (availableSlots < 1) {
28-
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
29-
IoEngine::YieldCurrentCoroutine(yc);
30-
continue;
31-
}
31+
AsioConditionVariable cv (ioEngine.GetIoContext());
32+
33+
sem.Waiting.emplace(&strand, &cv);
34+
lock.unlock();
3235

33-
break;
36+
try {
37+
cv.Wait(yc);
38+
} catch (...) {
39+
Done();
40+
throw;
3441
}
3542
}
3643

44+
class SetAsioCV
45+
{
46+
public:
47+
inline SetAsioCV(AsioConditionVariable& cv) : m_CV(&cv)
48+
{
49+
}
50+
51+
inline void operator()()
52+
{
53+
m_CV->Set();
54+
}
55+
56+
private:
57+
AsioConditionVariable* m_CV;
58+
};
59+
3760
void CpuBoundWork::Done()
3861
{
3962
if (!m_Done) {
40-
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
63+
auto& sem (IoEngine::Get().m_CpuBoundSemaphore);
64+
std::unique_lock<std::mutex> lock (sem.Mutex);
65+
66+
if (sem.Waiting.empty()) {
67+
++sem.FreeSlots;
68+
} else {
69+
auto next (sem.Waiting.front());
70+
sem.Waiting.pop();
71+
72+
boost::asio::post(*next.first, SetAsioCV(*next.second));
73+
}
4174

4275
m_Done = true;
4376
}
@@ -58,7 +91,11 @@ boost::asio::io_context& IoEngine::GetIoContext()
5891
IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
5992
{
6093
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
61-
m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
94+
95+
{
96+
std::unique_lock<std::mutex> lock (m_CpuBoundSemaphore.Mutex);
97+
m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u;
98+
}
6299

63100
for (auto& thread : m_Threads) {
64101
thread = std::thread(&IoEngine::RunEventLoop, this);

lib/base/io-engine.hpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
#include "base/logger.hpp"
99
#include "base/shared-object.hpp"
1010
#include <atomic>
11+
#include <cstdint>
1112
#include <exception>
1213
#include <memory>
14+
#include <mutex>
15+
#include <queue>
1316
#include <thread>
1417
#include <utility>
1518
#include <vector>
@@ -31,7 +34,7 @@ namespace icinga
3134
class CpuBoundWork
3235
{
3336
public:
34-
CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&);
37+
CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand);
3538
CpuBoundWork(const CpuBoundWork&) = delete;
3639
CpuBoundWork(CpuBoundWork&&) = delete;
3740
CpuBoundWork& operator=(const CpuBoundWork&) = delete;
@@ -48,6 +51,8 @@ class CpuBoundWork
4851
bool m_Done;
4952
};
5053

54+
class AsioConditionVariable;
55+
5156
/**
5257
* Async I/O engine
5358
*
@@ -120,7 +125,12 @@ class IoEngine
120125
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
121126
std::vector<std::thread> m_Threads;
122127
boost::asio::deadline_timer m_AlreadyExpiredTimer;
123-
std::atomic_int_fast32_t m_CpuBoundSemaphore;
128+
129+
struct {
130+
std::mutex Mutex;
131+
uint_fast32_t FreeSlots;
132+
std::queue<std::pair<boost::asio::io_context::strand*, AsioConditionVariable*>> Waiting;
133+
} m_CpuBoundSemaphore;
124134
};
125135

126136
class TerminateIoThread : public std::exception

0 commit comments

Comments
 (0)