diff --git a/doc/19-technical-concepts.md b/doc/19-technical-concepts.md
index d268ea7cf3..cc443bab92 100644
--- a/doc/19-technical-concepts.md
+++ b/doc/19-technical-concepts.md
@@ -1148,21 +1148,6 @@ hidden in Boost ASIO, Beast, Coroutine and Context libraries.
#### Data Exchange: Coroutines and I/O Engine
-Light-weight and fast operations such as connection handling or TLS handshakes
-are performed in the default `IoBoundWorkSlot` pool inside the I/O engine.
-
-The I/O engine has another pool available: `CpuBoundWork`.
-
-This is used for processing CPU intensive tasks, such as handling a HTTP request.
-Depending on the available CPU cores, this is limited to `std::thread::hardware_concurrency() * 3u / 2u`.
-
-```
-1 core * 3 / 2 = 1
-2 cores * 3 / 2 = 3
-8 cores * 3 / 2 = 12
-16 cores * 3 / 2 = 24
-```
-
The I/O engine itself is used with all network I/O in Icinga, not only the cluster
and the REST API. Features such as Graphite, InfluxDB, etc. also consume its functionality.
diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp
index 0792be5cc1..c04b9b7876 100644
--- a/lib/base/io-engine.cpp
+++ b/lib/base/io-engine.cpp
@@ -16,63 +16,6 @@
using namespace icinga;
-CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
- : m_Done(false)
-{
- auto& ioEngine (IoEngine::Get());
-
- for (;;) {
- auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
-
- if (availableSlots < 1) {
- ioEngine.m_CpuBoundSemaphore.fetch_add(1);
- IoEngine::YieldCurrentCoroutine(yc);
- continue;
- }
-
- break;
- }
-}
-
-CpuBoundWork::~CpuBoundWork()
-{
- if (!m_Done) {
- IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
- }
-}
-
-void CpuBoundWork::Done()
-{
- if (!m_Done) {
- IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
-
- m_Done = true;
- }
-}
-
-IoBoundWorkSlot::IoBoundWorkSlot(boost::asio::yield_context yc)
- : yc(yc)
-{
- IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
-}
-
-IoBoundWorkSlot::~IoBoundWorkSlot()
-{
- auto& ioEngine (IoEngine::Get());
-
- for (;;) {
- auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
-
- if (availableSlots < 1) {
- ioEngine.m_CpuBoundSemaphore.fetch_add(1);
- IoEngine::YieldCurrentCoroutine(yc);
- continue;
- }
-
- break;
- }
-}
-
LazyInit<std::unique_ptr<IoEngine>> IoEngine::m_Instance ([]() { return std::unique_ptr<IoEngine>(new IoEngine()); });
IoEngine& IoEngine::Get()
@@ -85,10 +28,14 @@ boost::asio::io_context& IoEngine::GetIoContext()
return m_IoContext;
}
-IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
+IoEngine::IoEngine()
+ : m_IoContext(),
+ m_KeepAlive(boost::asio::make_work_guard(m_IoContext)),
+ m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)),
+ m_AlreadyExpiredTimer(m_IoContext),
+ m_SlowSlotsAvailable(Configuration::Concurrency)
{
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
- m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
for (auto& thread : m_Threads) {
thread = std::thread(&IoEngine::RunEventLoop, this);
@@ -124,6 +71,32 @@ void IoEngine::RunEventLoop()
}
}
+/**
+ * Try to acquire a slot for a slow operation. This is intended to limit the number of concurrent slow operations. In
+ * case no slot is returned, the caller should reject the operation (for example by sending an HTTP error) to prevent an
+ * overload situation.
+ *
+ * @return A RAII-style object representing the slot. operator bool() can be used to check if the operation was
+ * successful and the caller now owns a slot. Its destructor automatically releases the slot.
+ */
+IoEngine::SlowSlot IoEngine::TryAcquireSlowSlot()
+{
+ // This is basically an ad-hoc (partial) semaphore implementation.
+ // TODO(C++20): Use std::counting_semaphore instead.
+
+ std::unique_lock lock(m_SlowSlotsMutex);
+ if (m_SlowSlotsAvailable > 0) {
+ m_SlowSlotsAvailable--;
+ lock.unlock();
+
+ return std::make_unique<Defer>([this] {
+ std::unique_lock lock(m_SlowSlotsMutex);
+ m_SlowSlotsAvailable++;
+ });
+ }
+ return {};
+}
+
AsioEvent::AsioEvent(boost::asio::io_context& io, bool init)
: m_Timer(io)
{
diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp
index 0883d7810e..42fa148711 100644
--- a/lib/base/io-engine.hpp
+++ b/lib/base/io-engine.hpp
@@ -5,6 +5,7 @@
#include "base/atomic.hpp"
#include "base/debug.hpp"
+#include "base/defer.hpp"
#include "base/exception.hpp"
#include "base/lazy-init.hpp"
#include "base/logger.hpp"
@@ -29,46 +30,6 @@
namespace icinga
{
-/**
- * Scope lock for CPU-bound work done in an I/O thread
- *
- * @ingroup base
- */
-class CpuBoundWork
-{
-public:
- CpuBoundWork(boost::asio::yield_context yc);
- CpuBoundWork(const CpuBoundWork&) = delete;
- CpuBoundWork(CpuBoundWork&&) = delete;
- CpuBoundWork& operator=(const CpuBoundWork&) = delete;
- CpuBoundWork& operator=(CpuBoundWork&&) = delete;
- ~CpuBoundWork();
-
- void Done();
-
-private:
- bool m_Done;
-};
-
-/**
- * Scope break for CPU-bound work done in an I/O thread
- *
- * @ingroup base
- */
-class IoBoundWorkSlot
-{
-public:
- IoBoundWorkSlot(boost::asio::yield_context yc);
- IoBoundWorkSlot(const IoBoundWorkSlot&) = delete;
- IoBoundWorkSlot(IoBoundWorkSlot&&) = delete;
- IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete;
- IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete;
- ~IoBoundWorkSlot();
-
-private:
- boost::asio::yield_context yc;
-};
-
/**
* Async I/O engine
*
@@ -76,9 +37,6 @@ class IoBoundWorkSlot
*/
class IoEngine
{
- friend CpuBoundWork;
- friend IoBoundWorkSlot;
-
public:
IoEngine(const IoEngine&) = delete;
IoEngine(IoEngine&&) = delete;
@@ -139,6 +97,9 @@ class IoEngine
Get().m_AlreadyExpiredTimer.async_wait(yc);
}
+ using SlowSlot = std::unique_ptr<Defer>;
+ SlowSlot TryAcquireSlowSlot();
+
private:
IoEngine();
@@ -150,7 +111,8 @@ class IoEngine
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
std::vector<std::thread> m_Threads;
boost::asio::deadline_timer m_AlreadyExpiredTimer;
- std::atomic_int_fast32_t m_CpuBoundSemaphore;
+ std::mutex m_SlowSlotsMutex;
+ int m_SlowSlotsAvailable;
};
class TerminateIoThread : public std::exception
diff --git a/lib/remote/eventshandler.cpp b/lib/remote/eventshandler.cpp
index 1b7798c04a..8a2d592638 100644
--- a/lib/remote/eventshandler.cpp
+++ b/lib/remote/eventshandler.cpp
@@ -100,8 +100,6 @@ bool EventsHandler::HandleRequest(
EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery);
- IoBoundWorkSlot dontLockTheIoThread (yc);
-
response.result(http::status::ok);
response.set(http::field::content_type, "application/json");
response.StartStreaming(true);
diff --git a/lib/remote/httpmessage.cpp b/lib/remote/httpmessage.cpp
index 18e5a30164..cf750d493a 100644
--- a/lib/remote/httpmessage.cpp
+++ b/lib/remote/httpmessage.cpp
@@ -129,6 +129,10 @@ void HttpResponse::Flush(boost::asio::yield_context yc)
prepare_payload();
}
+ // If this request currently owns a slow request slot, release it as it must not be held while sending to the client
+ // (otherwise, waiting for a slow client to read its data would block the slot).
+ m_SlowSlot.reset();
+
m_SerializationStarted = true;
if (!m_Serializer.is_header_done()) {
@@ -194,3 +198,19 @@ JsonEncoder HttpResponse::GetJsonEncoder(bool pretty)
{
return JsonEncoder{std::make_shared(*this), pretty};
}
+
+/**
+ * Tries to acquire a slow slot using ApiListener::TryAcquireSlowSlot(). If this was successful, that slot will be
+ * owned by this HttpResponse object and is automatically released when the next write operation is performed using
+ * Flush() or any other method that calls it.
+ *
+ * In case no ApiListener is configured (only relevant for tests), no limiting of concurrent requests takes place and
+ * this method always returns true to allow callers to just check the return value to determine whether to continue.
+ *
+ * @return Whether the operation was successful and the handler can continue.
+ */
+bool HttpResponse::TryAcquireSlowSlot()
+{
+ m_SlowSlot = IoEngine::Get().TryAcquireSlowSlot();
+ return bool(m_SlowSlot);
+}
diff --git a/lib/remote/httpmessage.hpp b/lib/remote/httpmessage.hpp
index 10d00fd498..1aeb516a1b 100644
--- a/lib/remote/httpmessage.hpp
+++ b/lib/remote/httpmessage.hpp
@@ -5,6 +5,7 @@
#include "base/dictionary.hpp"
#include "base/json.hpp"
#include "base/tlsstream.hpp"
+#include "remote/apilistener.hpp"
#include "remote/apiuser.hpp"
#include "remote/httpserverconnection.hpp"
#include "remote/url.hpp"
@@ -269,6 +270,8 @@ class HttpResponse : public boost::beast::http::response;
Serializer m_Serializer{*this};
@@ -276,6 +279,7 @@ class HttpResponse : public boost::beast::http::response::Ptr m_Stream;
+ IoEngine::SlowSlot m_SlowSlot;
};
} // namespace icinga
diff --git a/lib/remote/httpserverconnection.cpp b/lib/remote/httpserverconnection.cpp
index 39fa2d79de..380f303548 100644
--- a/lib/remote/httpserverconnection.cpp
+++ b/lib/remote/httpserverconnection.cpp
@@ -416,15 +416,37 @@ void ProcessRequest(
HttpRequest& request,
HttpResponse& response,
const WaitGroup::Ptr& waitGroup,
- std::chrono::steady_clock::duration& cpuBoundWorkTime,
boost::asio::yield_context& yc
)
{
try {
- // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
- auto start (std::chrono::steady_clock::now());
- CpuBoundWork handlingRequest (yc);
- cpuBoundWorkTime = std::chrono::steady_clock::now() - start;
+ /* Place some restrictions on the total number of HTTP requests handled concurrently to prevent HTTP requests
+ * from hogging the entire coroutine thread pool by running too many requests handlers at once that don't
+ * regularly yield, starving other coroutines.
+ *
+ * We need to consider two types of handlers here:
+ *
+ * 1. Those performing a more or less expensive operation and then returning the whole response at once.
+ * Not too many of such handlers should run concurrently.
+ * 2. Those already streaming the response while they are running, for example using chunked transfer encoding.
+ * For these, we assume that they will frequently yield to other coroutines, in particular when writing parts
+ * of the response to the client, or in case of EventsHandler also when waiting for new events.
+ *
+ * The following approach handles both of these automatically: we acquire one of a limited number of slots for
+ * each request and release it automatically the first time anything (either the full response after the handler
+ * finished or the first chunk from within the handler) is written using the response object. This means that
+ * we don't have to handle acquiring or releasing that slot inside individual handlers.
+ *
+ * Overall, this is more or less a safeguard preventing long-running HTTP handlers from taking down the entire
+ * Icinga 2 process by blocking the execution of JSON-RPC message handlers. In general, (new) HTTP handlers
+ * shouldn't rely on this behavior but rather ensure that they are quick or at least yield regularly.
+ */
+ if (!response.TryAcquireSlowSlot()) {
+ HttpUtility::SendJsonError(response, request.Params(), 503,
+ "Too many requests already in progress, please try again later.");
+ response.Flush(yc);
+ return;
+ }
HttpHandler::ProcessRequest(waitGroup, request, response, yc);
response.body().Finish();
@@ -497,14 +519,9 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
<< ", user: " << (request.User() ? request.User()->GetName() : "")
<< ", agent: " << request[http::field::user_agent]; //operator[] - Returns the value for a field, or "" if it does not exist.
- ch::steady_clock::duration cpuBoundWorkTime(0);
- Defer addRespCode ([&response, start, &logMsg, &cpuBoundWorkTime]() {
- logMsg << ", status: " << response.result() << ")";
- if (cpuBoundWorkTime >= ch::seconds(1)) {
- logMsg << " waited " << ch::duration_cast(cpuBoundWorkTime).count() << "ms on semaphore and";
- }
-
- logMsg << " took total " << ch::duration_cast(ch::steady_clock::now() - start).count() << "ms.";
+ Defer addRespCode ([&response, start, &logMsg]() {
+ logMsg << ", status: " << response.result() << ")" << " took total "
+ << ch::duration_cast(ch::steady_clock::now() - start).count() << "ms.";
});
if (!HandleAccessControl(request, response, yc)) {
@@ -525,7 +542,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
m_Seen = ch::steady_clock::time_point::max();
- ProcessRequest(request, response, m_WaitGroup, cpuBoundWorkTime, yc);
+ ProcessRequest(request, response, m_WaitGroup, yc);
if (!request.keep_alive() || !m_ConnectionReusable) {
break;
diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp
index 0dab1ed5f8..21b1845c61 100644
--- a/lib/remote/jsonrpcconnection.cpp
+++ b/lib/remote/jsonrpcconnection.cpp
@@ -87,15 +87,9 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
}
String rpcMethod("UNKNOWN");
- ch::steady_clock::duration cpuBoundDuration(0);
auto start (ch::steady_clock::now());
try {
- CpuBoundWork handleMessage (yc);
-
- // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
- cpuBoundDuration = ch::steady_clock::now() - start;
-
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
if (String method = message->Get("method"); !method.IsEmpty()) {
rpcMethod = std::move(method);
@@ -112,23 +106,14 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection");
msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity
- << "' (took total " << toMilliseconds(total) << "ms";
-
- if (cpuBoundDuration >= ch::seconds(1)) {
- msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
- }
- msg << ").";
+ << "' (took total " << toMilliseconds(total) << "ms" << ").";
} catch (const std::exception& ex) {
auto total = ch::steady_clock::now() - start;
Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection");
msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '"
- << m_Identity << "' (took total " << toMilliseconds(total) << "ms";
-
- if (cpuBoundDuration >= ch::seconds(1)) {
- msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
- }
- msg << "): " << DiagnosticInformation(ex);
+ << m_Identity << "' (took total " << toMilliseconds(total) << "ms" << "): "
+ << DiagnosticInformation(ex);
break;
}