15 changes: 0 additions & 15 deletions doc/19-technical-concepts.md
@@ -1148,21 +1148,6 @@ hidden in Boost ASIO, Beast, Coroutine and Context libraries.

#### Data Exchange: Coroutines and I/O Engine <a id="technical-concepts-tls-network-io-connection-data-exchange-coroutines"></a>

Light-weight and fast operations such as connection handling or TLS handshakes
are performed in the default `IoBoundWorkSlot` pool inside the I/O engine.

The I/O engine has another pool available: `CpuBoundWork`.

This is used for processing CPU intensive tasks, such as handling a HTTP request.
Depending on the available CPU cores, this is limited to `std::thread::hardware_concurrency() * 3u / 2u`.

```
1 core * 3 / 2 = 1
2 cores * 3 / 2 = 3
8 cores * 3 / 2 = 12
16 cores * 3 / 2 = 24
```

The I/O engine itself is used with all network I/O in Icinga, not only the cluster
and the REST API. Features such as Graphite, InfluxDB, etc. also consume its functionality.

91 changes: 32 additions & 59 deletions lib/base/io-engine.cpp
@@ -16,63 +16,6 @@

using namespace icinga;

CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
: m_Done(false)
{
auto& ioEngine (IoEngine::Get());

for (;;) {
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));

if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
IoEngine::YieldCurrentCoroutine(yc);
continue;
}

break;
}
}

CpuBoundWork::~CpuBoundWork()
{
if (!m_Done) {
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
}
}

void CpuBoundWork::Done()
{
if (!m_Done) {
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);

m_Done = true;
}
}

IoBoundWorkSlot::IoBoundWorkSlot(boost::asio::yield_context yc)
: yc(yc)
{
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
}

IoBoundWorkSlot::~IoBoundWorkSlot()
{
auto& ioEngine (IoEngine::Get());

for (;;) {
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));

if (availableSlots < 1) {
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
IoEngine::YieldCurrentCoroutine(yc);
continue;
}

break;
}
}

LazyInit<std::unique_ptr<IoEngine>> IoEngine::m_Instance ([]() { return std::unique_ptr<IoEngine>(new IoEngine()); });

IoEngine& IoEngine::Get()
@@ -85,10 +28,14 @@ boost::asio::io_context& IoEngine::GetIoContext()
return m_IoContext;
}

IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
IoEngine::IoEngine()
: m_IoContext(),
m_KeepAlive(boost::asio::make_work_guard(m_IoContext)),
m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)),
m_AlreadyExpiredTimer(m_IoContext),
m_SlowSlotsAvailable(Configuration::Concurrency)
{
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);

for (auto& thread : m_Threads) {
thread = std::thread(&IoEngine::RunEventLoop, this);
@@ -124,6 +71,32 @@ void IoEngine::RunEventLoop()
}
}

/**
* Try to acquire a slot for a slow operation. This is intended to limit the number of concurrent slow operations. In
* case no slot is returned, the caller should reject the operation (for example by sending an HTTP error) to prevent an
* overload situation.
*
* @return A RAII-style object representing the slot. operator bool() can be used to check if the operation was
* successful and the caller now owns a slot. Its destructor automatically releases the slot.
*/
IoEngine::SlowSlot IoEngine::TryAcquireSlowSlot()
{
// This is basically an ad-hoc (partial) semaphore implementation.
// TODO(C++20): Use std::counting_semaphore instead.

std::unique_lock lock(m_SlowSlotsMutex);
if (m_SlowSlotsAvailable > 0) {
m_SlowSlotsAvailable--;
lock.unlock();

return std::make_unique<Defer>([this] {
Review comment (Member):
Seems a good reason to make Defer movable.
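For illustration, a minimal sketch of what a movable deferral helper could look like, assuming it simply wraps a `std::function<void ()>` that fires on destruction (the real `base/defer.hpp` may differ); with something like this, `TryAcquireSlowSlot()` could return the guard by value instead of wrapping it in a `std::unique_ptr<Defer>`:

```
#include <functional>
#include <utility>

// Hypothetical stand-in for base/defer.hpp's Defer, extended with move support.
class MovableDefer
{
public:
	explicit MovableDefer(std::function<void ()> func) : m_Func(std::move(func)) { }

	// Moving transfers the callback; the moved-from object must not fire it again.
	MovableDefer(MovableDefer&& other) noexcept : m_Func(std::move(other.m_Func))
	{
		other.m_Func = nullptr;
	}

	MovableDefer(const MovableDefer&) = delete;
	MovableDefer& operator=(const MovableDefer&) = delete;
	MovableDefer& operator=(MovableDefer&&) = delete;

	~MovableDefer()
	{
		if (m_Func) {
			m_Func();
		}
	}

private:
	std::function<void ()> m_Func;
};
```

Move construction alone is enough to return such a guard by value from the acquire function.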

std::unique_lock lock(m_SlowSlotsMutex);
m_SlowSlotsAvailable++;
Review comment (Member):
Again, my semaphore from #9990 seems pretty fast. Especially a release could be just an atomic subtraction.
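For illustration only (this is neither the #9990 implementation nor part of this PR): a mutex-free try-acquire/release pair built on a single atomic counter, where releasing a slot is just one atomic operation:

```
#include <atomic>

// Illustrative counting semaphore with a non-blocking acquire.
class TrySemaphore
{
public:
	explicit TrySemaphore(int slots) : m_Available(slots) { }

	bool TryAcquire()
	{
		int available = m_Available.load(std::memory_order_relaxed);

		// CAS loop: only decrement while a slot is actually available.
		while (available > 0) {
			if (m_Available.compare_exchange_weak(available, available - 1,
				std::memory_order_acquire, std::memory_order_relaxed)) {
				return true;
			}
		}

		return false;
	}

	void Release()
	{
		m_Available.fetch_add(1, std::memory_order_release);
	}

private:
	std::atomic<int> m_Available;
};
```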

});
}
return {};
}
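A minimal usage sketch of this API; `DoExpensiveWork()` and `SendServiceUnavailable()` are hypothetical placeholders, not functions from this codebase:

```
// Hypothetical caller: only run the expensive operation if a slow slot is available.
void HandleSlowOperation()
{
	if (auto slot = IoEngine::Get().TryAcquireSlowSlot()) {
		DoExpensiveWork(); // runs while the slot is held
		// the slot is released automatically once `slot` goes out of scope
	} else {
		SendServiceUnavailable(); // shed load instead of queueing up more work
	}
}
```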

AsioEvent::AsioEvent(boost::asio::io_context& io, bool init)
: m_Timer(io)
{
50 changes: 6 additions & 44 deletions lib/base/io-engine.hpp
@@ -5,6 +5,7 @@

#include "base/atomic.hpp"
#include "base/debug.hpp"
#include "base/defer.hpp"
#include "base/exception.hpp"
#include "base/lazy-init.hpp"
#include "base/logger.hpp"
@@ -29,56 +30,13 @@
namespace icinga
{

/**
* Scope lock for CPU-bound work done in an I/O thread
*
* @ingroup base
*/
class CpuBoundWork
{
public:
CpuBoundWork(boost::asio::yield_context yc);
CpuBoundWork(const CpuBoundWork&) = delete;
CpuBoundWork(CpuBoundWork&&) = delete;
CpuBoundWork& operator=(const CpuBoundWork&) = delete;
CpuBoundWork& operator=(CpuBoundWork&&) = delete;
~CpuBoundWork();

void Done();

private:
bool m_Done;
};

/**
* Scope break for CPU-bound work done in an I/O thread
*
* @ingroup base
*/
class IoBoundWorkSlot
{
public:
IoBoundWorkSlot(boost::asio::yield_context yc);
IoBoundWorkSlot(const IoBoundWorkSlot&) = delete;
IoBoundWorkSlot(IoBoundWorkSlot&&) = delete;
IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete;
IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete;
~IoBoundWorkSlot();

private:
boost::asio::yield_context yc;
};

/**
* Async I/O engine
*
* @ingroup base
*/
class IoEngine
{
friend CpuBoundWork;
friend IoBoundWorkSlot;

public:
IoEngine(const IoEngine&) = delete;
IoEngine(IoEngine&&) = delete;
@@ -139,6 +97,9 @@ class IoEngine
Get().m_AlreadyExpiredTimer.async_wait(yc);
}

using SlowSlot = std::unique_ptr<Defer>;
SlowSlot TryAcquireSlowSlot();

private:
IoEngine();

@@ -150,7 +111,8 @@
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
std::vector<std::thread> m_Threads;
boost::asio::deadline_timer m_AlreadyExpiredTimer;
std::atomic_int_fast32_t m_CpuBoundSemaphore;
std::mutex m_SlowSlotsMutex;
int m_SlowSlotsAvailable;
};

class TerminateIoThread : public std::exception
2 changes: 0 additions & 2 deletions lib/remote/eventshandler.cpp
@@ -100,8 +100,6 @@ bool EventsHandler::HandleRequest(

EventsSubscriber subscriber (std::move(eventTypes), HttpUtility::GetLastParameter(params, "filter"), l_ApiQuery);

IoBoundWorkSlot dontLockTheIoThread (yc);

response.result(http::status::ok);
response.set(http::field::content_type, "application/json");
response.StartStreaming(true);
20 changes: 20 additions & 0 deletions lib/remote/httpmessage.cpp
@@ -129,6 +129,10 @@ void HttpResponse::Flush(boost::asio::yield_context yc)
prepare_payload();
}

// If this request currently owns a slow request slot, release it as it must not be held while sending to the client
// (otherwise, waiting for a slow client to read its data would block the slot).
m_SlowSlot.reset();

m_SerializationStarted = true;

if (!m_Serializer.is_header_done()) {
@@ -194,3 +198,19 @@ JsonEncoder HttpResponse::GetJsonEncoder(bool pretty)
{
return JsonEncoder{std::make_shared<HttpResponseJsonWriter>(*this), pretty};
}

/**
* Tries to acquire a slow slot using IoEngine::TryAcquireSlowSlot(). If this was successful, that slot will be
* owned by this HttpResponse object and is automatically released when the next write operation is performed using
* Flush() or any other method that calls it.
*
* In case no ApiListener is configured (only relevant for tests), no limiting of concurrent requests takes place and
* this method always returns true to allow callers to just check the return value to determine whether to continue.
*
* @return Whether the operation was successful and the handler can continue.
*/
bool HttpResponse::TryAcquireSlowSlot()
{
m_SlowSlot = IoEngine::Get().TryAcquireSlowSlot();
return bool(m_SlowSlot);
}
4 changes: 4 additions & 0 deletions lib/remote/httpmessage.hpp
@@ -5,6 +5,7 @@
#include "base/dictionary.hpp"
#include "base/json.hpp"
#include "base/tlsstream.hpp"
#include "remote/apilistener.hpp"
#include "remote/apiuser.hpp"
#include "remote/httpserverconnection.hpp"
#include "remote/url.hpp"
@@ -269,13 +270,16 @@ class HttpResponse : public boost::beast::http::response<SerializableBody<boost:

JsonEncoder GetJsonEncoder(bool pretty = false);

bool TryAcquireSlowSlot();

private:
using Serializer = boost::beast::http::response_serializer<HttpResponse::body_type>;
Serializer m_Serializer{*this};
bool m_SerializationStarted = false;

HttpServerConnection::Ptr m_Server;
Shared<AsioTlsStream>::Ptr m_Stream;
IoEngine::SlowSlot m_SlowSlot;
};

} // namespace icinga
45 changes: 31 additions & 14 deletions lib/remote/httpserverconnection.cpp
@@ -416,15 +416,37 @@ void ProcessRequest(
HttpRequest& request,
HttpResponse& response,
const WaitGroup::Ptr& waitGroup,
std::chrono::steady_clock::duration& cpuBoundWorkTime,
boost::asio::yield_context& yc
)
{
try {
// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
auto start (std::chrono::steady_clock::now());
CpuBoundWork handlingRequest (yc);
cpuBoundWorkTime = std::chrono::steady_clock::now() - start;
/* Place some restrictions on the total number of HTTP requests handled concurrently to prevent HTTP requests
* from hogging the entire coroutine thread pool by running too many request handlers at once that don't
* regularly yield, starving other coroutines.
*
* We need to consider two types of handlers here:
*
* 1. Those performing a more or less expensive operation and then returning the whole response at once.
* Not too many of such handlers should run concurrently.
* 2. Those already streaming the response while they are running, for example using chunked transfer encoding.
* For these, we assume that they will frequently yield to other coroutines, in particular when writing parts
* of the response to the client, or in case of EventsHandler also when waiting for new events.
*
* The following approach handles both of these automatically: we acquire one of a limited number of slots for
* each request and release it automatically the first time anything (either the full response after the handler
* finished or the first chunk from within the handler) is written using the response object. This means that
* we don't have to handle acquiring or releasing that slot inside individual handlers.
*
* Overall, this is more or less a safeguard preventing long-running HTTP handlers from taking down the entire
* Icinga 2 process by blocking the execution of JSON-RPC message handlers. In general, (new) HTTP handlers
* shouldn't rely on this behavior but rather ensure that they are quick or at least yield regularly.
*/
if (!response.TryAcquireSlowSlot()) {
HttpUtility::SendJsonError(response, request.Params(), 503,
"Too many requests already in progress, please try again later.");
Review comment (Member), quoting the PR description:

> HTTP requests are now rejected with a 503 Service Unavailable error if there's no slot available. This means that if there is too much load on an Icinga 2 instance from HTTP requests, this shows in error messages instead of more and more waiting requests accumulating and increased response times.

This indeed seems clever.

But at least waiting some time before giving up looks clever AND smart to me: https://www.haproxy.com/blog/protect-servers-with-haproxy-connection-limits-and-queues#:~:text=a%20client%20will%20wait%20for%20up%20to%2030%20seconds%20in%20the%20queue,%20after%20which%20HAProxy%20returns%20a%20503%20Service%20Unavailable%20response%20to%20them

I consider even the current behavior (requests accumulating and increased response times) better than the 503 approach. It handles rush hours more smoothly.

While on HTTP status codes! This PR lets 1 ApiUser DoS the cluster now, I'd expect a 429 before even running into 503 and/or waiting.

Review comment (Member):
💡

At least I'd introduce a new permission making an ApiUser wait as long as necessary, preventing 503.

This is especially useful for critical components which consume the API.

Review comment (Contributor, Author):

> This PR lets 1 ApiUser DoS the cluster now

This PR doesn't increase the time an ApiUser can block a slot, so wasn't this the same before? It's just hidden on the server side by waiting instead of retrying on the client side. If you send another request, you get another chance at obtaining that slot, just like you do when the server waits for you.

> I'd expect a 429 before even running into 503 and/or waiting.

That's a whole other issue. Regardless of what we do to CpuBoundWork, we can consider doing some rate-limiting, like per ApiUser. That could avoid some overload situations, but not all (if there are just many different users putting load on the server).
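To illustrate the "wait some time before giving up" idea discussed above (not part of this PR, and assuming `IoEngine::YieldCurrentCoroutine()` remains available): a sketch that retries the slot acquisition until a deadline, yielding to other coroutines between attempts, and only falls back to the 503 (or 429) path afterwards.

```
#include <chrono>

// Sketch only: bounded wait for a slow slot, similar to the removed CpuBoundWork
// constructor's retry loop but with a deadline instead of waiting indefinitely.
IoEngine::SlowSlot TryAcquireSlowSlotFor(std::chrono::steady_clock::duration timeout,
	boost::asio::yield_context yc)
{
	auto deadline (std::chrono::steady_clock::now() + timeout);

	for (;;) {
		auto slot (IoEngine::Get().TryAcquireSlowSlot());

		if (slot || std::chrono::steady_clock::now() >= deadline) {
			return slot; // an empty slot after the deadline -> caller rejects the request
		}

		// Let other coroutines run before retrying.
		IoEngine::YieldCurrentCoroutine(yc);
	}
}
```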

response.Flush(yc);
return;
}

HttpHandler::ProcessRequest(waitGroup, request, response, yc);
response.body().Finish();
@@ -497,14 +519,9 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
<< ", user: " << (request.User() ? request.User()->GetName() : "<unauthenticated>")
<< ", agent: " << request[http::field::user_agent]; //operator[] - Returns the value for a field, or "" if it does not exist.

ch::steady_clock::duration cpuBoundWorkTime(0);
Defer addRespCode ([&response, start, &logMsg, &cpuBoundWorkTime]() {
logMsg << ", status: " << response.result() << ")";
if (cpuBoundWorkTime >= ch::seconds(1)) {
logMsg << " waited " << ch::duration_cast<ch::milliseconds>(cpuBoundWorkTime).count() << "ms on semaphore and";
}

logMsg << " took total " << ch::duration_cast<ch::milliseconds>(ch::steady_clock::now() - start).count() << "ms.";
Defer addRespCode ([&response, start, &logMsg]() {
logMsg << ", status: " << response.result() << ")" << " took total "
<< ch::duration_cast<ch::milliseconds>(ch::steady_clock::now() - start).count() << "ms.";
});

if (!HandleAccessControl(request, response, yc)) {
Expand All @@ -525,7 +542,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)

m_Seen = ch::steady_clock::time_point::max();

ProcessRequest(request, response, m_WaitGroup, cpuBoundWorkTime, yc);
ProcessRequest(request, response, m_WaitGroup, yc);

if (!request.keep_alive() || !m_ConnectionReusable) {
break;
21 changes: 3 additions & 18 deletions lib/remote/jsonrpcconnection.cpp
@@ -87,15 +87,9 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
}

String rpcMethod("UNKNOWN");
ch::steady_clock::duration cpuBoundDuration(0);
auto start (ch::steady_clock::now());

try {
CpuBoundWork handleMessage (yc);

// Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
cpuBoundDuration = ch::steady_clock::now() - start;

Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
if (String method = message->Get("method"); !method.IsEmpty()) {
rpcMethod = std::move(method);
Expand All @@ -112,23 +106,14 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)

Log msg(total >= ch::seconds(5) ? LogWarning : LogDebug, "JsonRpcConnection");
msg << "Processed JSON-RPC '" << rpcMethod << "' message for identity '" << m_Identity
<< "' (took total " << toMilliseconds(total) << "ms";

if (cpuBoundDuration >= ch::seconds(1)) {
msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
}
msg << ").";
<< "' (took total " << toMilliseconds(total) << "ms" << ").";
} catch (const std::exception& ex) {
auto total = ch::steady_clock::now() - start;

Log msg(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection");
msg << "Error while processing JSON-RPC '" << rpcMethod << "' message for identity '"
<< m_Identity << "' (took total " << toMilliseconds(total) << "ms";

if (cpuBoundDuration >= ch::seconds(1)) {
msg << ", waited " << toMilliseconds(cpuBoundDuration) << "ms on semaphore";
}
msg << "): " << DiagnosticInformation(ex);
<< m_Identity << "' (took total " << toMilliseconds(total) << "ms" << "): "
<< DiagnosticInformation(ex);

break;
}