Skip to content

Commit a8d011f

Browse files
committed
[WIP] CheckerComponent#Stop(): wait for own Checkable#ProcessCheckResult() to finish
1 parent a3ceb67 commit a8d011f

File tree

4 files changed

+115
-2
lines changed

4 files changed

+115
-2
lines changed

lib/checker/checkercomponent.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ void CheckerComponent::OnConfigLoaded()
6060
void CheckerComponent::Start(bool runtimeCreated)
6161
{
6262
ObjectImpl<CheckerComponent>::Start(runtimeCreated);
63+
CheckResultProducerComponent::Start();
6364

6465
Log(LogInformation, "CheckerComponent")
6566
<< "'" << GetName() << "' started.";
@@ -81,6 +82,7 @@ void CheckerComponent::Stop(bool runtimeRemoved)
8182
m_CV.notify_all();
8283
}
8384

85+
CheckResultProducerComponent::Stop();
8486
m_ResultTimer->Stop(true);
8587
m_Thread.join();
8688

@@ -244,7 +246,7 @@ void CheckerComponent::ExecuteCheckHelper(const Checkable::Ptr& checkable)
244246
cr->SetExecutionStart(now);
245247
cr->SetExecutionEnd(now);
246248

247-
checkable->ProcessCheckResult(cr);
249+
checkable->ProcessCheckResult(cr, this);
248250

249251
Log(LogCritical, "checker", output);
250252
}

lib/checker/checkercomponent.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ struct CheckableNextCheckExtractor
4646
/**
4747
* @ingroup checker
4848
*/
49-
class CheckerComponent final : public ObjectImpl<CheckerComponent>
49+
class CheckerComponent final : public ObjectImpl<CheckerComponent>, public CheckResultProducerComponent
5050
{
5151
public:
5252
DECLARE_OBJECT(CheckerComponent);

lib/icinga/checkresult.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,58 @@ bool UnitTestCRP::try_lock_shared() noexcept
4141
void UnitTestCRP::unlock_shared() noexcept
4242
{
4343
}
44+
45+
bool CheckResultProducerComponent::try_lock_shared() noexcept
46+
{
47+
auto state (ModifyState(
48+
[](auto current) { return current.InstanceIsActive; },
49+
[](auto& desired) { ++desired.ProcessingCheckResults; }
50+
));
51+
52+
return state.InstanceIsActive;
53+
}
54+
55+
void CheckResultProducerComponent::unlock_shared() noexcept
56+
{
57+
std::unique_lock lock (m_Mutex, std::defer_lock);
58+
59+
auto state (ModifyState([&lock](auto& desired) {
60+
--desired.ProcessingCheckResults;
61+
62+
if (!desired.ProcessingCheckResults && !desired.InstanceIsActive && !lock) {
63+
lock.lock();
64+
}
65+
}));
66+
67+
if (!state.ProcessingCheckResults && !state.InstanceIsActive) {
68+
m_CV.notify_all();
69+
}
70+
}
71+
72+
/**
73+
* Allow processing check results.
74+
*/
75+
void CheckResultProducerComponent::Start()
76+
{
77+
ModifyState([](auto& desired) { desired.InstanceIsActive = 1; });
78+
}
79+
80+
/**
81+
* Disallow processing new check results, wait for all currently processed ones to finish.
82+
*/
83+
void CheckResultProducerComponent::Stop()
84+
{
85+
std::unique_lock lock (m_Mutex, std::defer_lock);
86+
87+
auto state (ModifyState([&lock](auto& desired) {
88+
desired.InstanceIsActive = 0;
89+
90+
if (desired.ProcessingCheckResults && !lock) {
91+
lock.lock();
92+
}
93+
}));
94+
95+
if (state.ProcessingCheckResults) {
96+
m_CV.wait(lock, [this] { return !m_State.load().ProcessingCheckResults; });
97+
}
98+
}

lib/icinga/checkresult.hpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
#ifndef CHECKRESULT_H
44
#define CHECKRESULT_H
55

6+
#include "base/atomic.hpp"
67
#include "icinga/i2-icinga.hpp"
78
#include "icinga/checkresult-ti.hpp"
9+
#include <condition_variable>
10+
#include <cstdint>
11+
#include <mutex>
812

913
namespace icinga
1014
{
@@ -54,6 +58,58 @@ class CheckResult final : public ObjectImpl<CheckResult>
5458
double CalculateLatency() const;
5559
};
5660

61+
class CheckResultProducerComponent : public CheckResultProducer
62+
{
63+
public:
64+
bool try_lock_shared() noexcept override;
65+
void unlock_shared() noexcept override;
66+
67+
protected:
68+
void Start();
69+
void Stop();
70+
71+
private:
72+
struct State
73+
{
74+
uint32_t InstanceIsActive = 0;
75+
uint32_t ProcessingCheckResults = 0;
76+
};
77+
78+
Atomic<State> m_State {State{}};
79+
std::mutex m_Mutex;
80+
std::condition_variable m_CV;
81+
82+
/**
83+
* Load m_State into x and if cond(x), pass x to mod by reference and try to store x back.
84+
* If m_State has changed in the meantime, repeat the process.
85+
*
86+
* @return The (not) updated m_State.
87+
*/
88+
template<class C, class M>
89+
State ModifyState(const C& cond, const M& mod)
90+
{
91+
auto expected (m_State.load());
92+
decltype(expected) desired;
93+
94+
do {
95+
if (!cond(expected)) {
96+
return expected;
97+
}
98+
99+
desired = expected;
100+
mod(desired);
101+
} while (!m_State.compare_exchange_weak(expected, desired));
102+
103+
return desired;
104+
}
105+
106+
template<class M>
107+
State ModifyState(const M& mod)
108+
{
109+
return ModifyState([](auto) { return true; }, mod);
110+
}
111+
};
112+
57113
}
58114

59115
#endif /* CHECKRESULT_H */

0 commit comments

Comments
 (0)