From d82b7e402642c65c8fc0e526945f54baf51a7772 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 25 Sep 2025 09:34:28 +0200 Subject: [PATCH 1/8] tests: fix min severity doesn't get updated If the logger is started with `Activate()` before `SetActive()`, it won't log anything, as the logger updates the "min severity" value of loggers only when starting them, and if they're not active at that point, they will just be ignored, so the min severity remains at info. --- test/base-testloggerfixture.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/base-testloggerfixture.hpp b/test/base-testloggerfixture.hpp index 69c073b02a..c5a4f63e24 100644 --- a/test/base-testloggerfixture.hpp +++ b/test/base-testloggerfixture.hpp @@ -88,8 +88,8 @@ struct TestLoggerFixture TestLoggerFixture() { testLogger->SetSeverity(testLogger->SeverityToString(LogDebug)); - testLogger->Activate(true); testLogger->SetActive(true); + testLogger->Activate(true); } ~TestLoggerFixture() From 5fa68e6867270280642d869fed4b481b69922abf Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 8 Sep 2025 11:24:21 +0200 Subject: [PATCH 2/8] Make timeouts in JsonRpcConnection configurable --- lib/base/tlsstream.hpp | 11 +++--- lib/remote/jsonrpcconnection-heartbeat.cpp | 7 +++- lib/remote/jsonrpcconnection.cpp | 41 ++++++++++++++++------ lib/remote/jsonrpcconnection.hpp | 11 ++++-- 4 files changed, 51 insertions(+), 19 deletions(-) diff --git a/lib/base/tlsstream.hpp b/lib/base/tlsstream.hpp index 9eed8d3b11..bc347a36ac 100644 --- a/lib/base/tlsstream.hpp +++ b/lib/base/tlsstream.hpp @@ -28,32 +28,31 @@ class SeenStream : public ARS { public: template - SeenStream(Args&&... args) : ARS(std::forward(args)...) + explicit SeenStream(Args&&... args) : ARS(std::forward(args)...), m_Seen(nullptr) { - m_Seen.store(nullptr); } template auto async_read_some(Args&&... args) -> decltype(((ARS*)nullptr)->async_read_some(std::forward(args)...)) { { - auto seen (m_Seen.load()); + auto* seen (m_Seen.load()); if (seen) { - *seen = Utility::GetTime(); + *seen = std::chrono::steady_clock::now(); } } return ((ARS*)this)->async_read_some(std::forward(args)...); } - inline void SetSeen(double* seen) + void SetSeen(std::chrono::steady_clock::time_point* seen) { m_Seen.store(seen); } private: - std::atomic m_Seen; + std::atomic m_Seen; }; struct UnbufferedAsioTlsStreamParams diff --git a/lib/remote/jsonrpcconnection-heartbeat.cpp b/lib/remote/jsonrpcconnection-heartbeat.cpp index 9b83c13632..4722bb3204 100644 --- a/lib/remote/jsonrpcconnection-heartbeat.cpp +++ b/lib/remote/jsonrpcconnection-heartbeat.cpp @@ -21,12 +21,17 @@ REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler); * cluster connection alive when there isn't much going on. */ +void JsonRpcConnection::SetHeartbeatInterval(std::chrono::milliseconds interval) +{ + m_HeartbeatInterval = interval; +} + void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc) { boost::system::error_code ec; for (;;) { - m_HeartbeatTimer.expires_from_now(boost::posix_time::seconds(20)); + m_HeartbeatTimer.expires_after(m_HeartbeatInterval); m_HeartbeatTimer.async_wait(yc[ec]); if (m_ShuttingDown) { diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 0dab1ed5f8..6e1c4dc518 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -38,7 +38,7 @@ JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const Stri JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), - m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io), + m_Timestamp(Utility::GetTime()), m_Seen(std::chrono::steady_clock::now()), m_IoStrand(io), m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), m_WaitGroup(waitGroup), m_CheckLivenessTimer(io), m_HeartbeatTimer(io) { @@ -81,7 +81,7 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) break; } - m_Seen = Utility::GetTime(); + m_Seen = std::chrono::steady_clock::now(); if (m_Endpoint) { m_Endpoint->AddMessageReceived(jsonString.GetLength()); } @@ -236,6 +236,11 @@ void JsonRpcConnection::SendRawMessage(const String& message) }); } +void JsonRpcConnection::SetLivenessTimeout(std::chrono::milliseconds timeout) +{ + m_LivenessTimeout = timeout; +} + void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) { if (m_ShuttingDown) { @@ -411,31 +416,47 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) * leaking the connection. Therefore close it after a timeout. */ - m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10)); + auto anonymousTimeout = m_LivenessTimeout / 6; + m_CheckLivenessTimer.expires_after(anonymousTimeout); m_CheckLivenessTimer.async_wait(yc[ec]); if (m_ShuttingDown) { return; } - auto remote (m_Stream->lowest_layer().remote_endpoint()); + { + auto remote(m_Stream->lowest_layer().remote_endpoint()); - Log(LogInformation, "JsonRpcConnection") - << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after 10 seconds."; + auto msg = Log(LogInformation, "JsonRpcConnection"); + msg << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after "; + if (anonymousTimeout > 1s) { + msg << anonymousTimeout.count() / 1000 << " seconds."; + } else { + msg << anonymousTimeout.count() << " milliseconds"; + } + } Disconnect(); } else { for (;;) { - m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30)); + m_CheckLivenessTimer.expires_after(m_LivenessTimeout / 2); m_CheckLivenessTimer.async_wait(yc[ec]); if (m_ShuttingDown) { break; } - if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) { - Log(LogInformation, "JsonRpcConnection") - << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds."; + if (m_Seen + m_LivenessTimeout < std::chrono::steady_clock::now() && + (!m_Endpoint || !m_Endpoint->GetSyncing())) { + { + auto msg = Log(LogInformation, "JsonRpcConnection"); + msg << "No messages for identity '" << m_Identity << "' have been received in the last "; + if (m_LivenessTimeout > 1s) { + msg << m_LivenessTimeout.count() / 1000 << " seconds."; + } else { + msg << m_LivenessTimeout.count() << " milliseconds"; + } + } Disconnect(); break; diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index df846527ae..d88101100a 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -20,6 +20,8 @@ namespace icinga { +using namespace std::chrono_literals; + enum ClientRole { ClientInbound, @@ -61,6 +63,9 @@ class JsonRpcConnection final : public Object void SendMessage(const Dictionary::Ptr& request); void SendRawMessage(const String& request); + void SetLivenessTimeout(std::chrono::milliseconds timeout); + void SetHeartbeatInterval(std::chrono::milliseconds interval); + static Value HeartbeatAPIHandler(const intrusive_ptr& origin, const Dictionary::Ptr& params); static double GetWorkQueueRate(); @@ -74,14 +79,16 @@ class JsonRpcConnection final : public Object Shared::Ptr m_Stream; ConnectionRole m_Role; double m_Timestamp; - double m_Seen; + std::chrono::steady_clock::time_point m_Seen; boost::asio::io_context::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; AsioEvent m_OutgoingMessagesQueued; AsioEvent m_WriterDone; Atomic m_ShuttingDown; WaitGroup::Ptr m_WaitGroup; - boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; + std::chrono::milliseconds m_LivenessTimeout{60s}; + std::chrono::milliseconds m_HeartbeatInterval{20s}; + boost::asio::steady_timer m_CheckLivenessTimer, m_HeartbeatTimer; JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io); From 4bd51fec17ab1e4940f06d3bc23d047247e0aa80 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 15 Sep 2025 12:16:00 +0200 Subject: [PATCH 3/8] Cleanup non-existing method from JsonRpcConnection --- lib/remote/jsonrpcconnection.hpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index d88101100a..84abf61c67 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -102,8 +102,6 @@ class JsonRpcConnection final : public Object void MessageHandler(const Dictionary::Ptr& message); - void CertificateRequestResponseHandler(const Dictionary::Ptr& message); - void SendMessageInternal(const Dictionary::Ptr& request); }; From 5f3a8c631741cb66a93b890df6c1d07a5bd9aa38 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Mon, 29 Sep 2025 11:00:27 +0200 Subject: [PATCH 4/8] Add Assert-Macros for the TestLogger Also add a Clear() function to clear existing log content. --- test/base-testloggerfixture.hpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/base-testloggerfixture.hpp b/test/base-testloggerfixture.hpp index c5a4f63e24..1a424d50ba 100644 --- a/test/base-testloggerfixture.hpp +++ b/test/base-testloggerfixture.hpp @@ -10,6 +10,12 @@ #include #include +#define CHECK_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(ExpectLogPattern(pattern, timeout)) +#define REQUIRE_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(ExpectLogPattern(pattern, timeout)) + +#define CHECK_NO_LOG_MESSAGE(pattern, timeout) BOOST_CHECK(!ExpectLogPattern(pattern, timeout)) +#define REQUIRE_NO_LOG_MESSAGE(pattern, timeout) BOOST_REQUIRE(!ExpectLogPattern(pattern, timeout)) + namespace icinga { class TestLogger : public Logger @@ -52,6 +58,13 @@ class TestLogger : public Logger return ret; } + void Clear() + { + std::lock_guard lock(m_Mutex); + m_Expects.clear(); + m_LogEntries.clear(); + } + private: void ProcessLogEntry(const LogEntry& entry) override { From f0e81912b7fbf2a0e0c308f749fbcbbd1acbed86 Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Tue, 9 Sep 2025 11:59:00 +0200 Subject: [PATCH 5/8] Add header providing timed asserts for unit tests _WITHIN asserts if the given condition becomes true within the given timeout. _EDGE_WITHIN aserts if the given condition becomes true no sooner than time 1 but not after time 2. _DONE_WITHIN asserts the execution time of the given expression is under the given duration. _DONE_BETWEEN asserts the execution time of the given expression is between the given durations. --- test/test-timedasserts.hpp | 134 +++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 test/test-timedasserts.hpp diff --git a/test/test-timedasserts.hpp b/test/test-timedasserts.hpp new file mode 100644 index 0000000000..743b3a6b18 --- /dev/null +++ b/test/test-timedasserts.hpp @@ -0,0 +1,134 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#pragma once + +#include +#include +#include +#include + +#define ASSERT_CONDITION_WITHIN_TIMEOUT(condition, timeout, level) \ + /* NOLINTNEXTLINE */ \ + do { \ + /* NOLINTNEXTLINE */ \ + auto pred = [&, this]() { return static_cast(condition); }; \ + BOOST_##level(AssertWithTimeout(pred, timeout, #condition)); \ + } while (0) + +#define REQUIRE_WITHIN(condition, timeout) ASSERT_CONDITION_WITHIN_TIMEOUT(condition, timeout, REQUIRE) +#define CHECK_WITHIN(condition, timeout) ASSERT_CONDITION_WITHIN_TIMEOUT(condition, timeout, CHECK) +#define WARN_WITHIN(condition, timeout) ASSERT_CONDITION_WITHIN_TIMEOUT(condition, timeout, WARN) + +#define ASSERT_CONDITION_EDGE_WITHIN_TIMEOUT(cond, time1, time2, level) \ + /* NOLINTNEXTLINE */ \ + do { \ + /* NOLINTNEXTLINE */ \ + auto pred = [&, this]() { return static_cast(cond); }; \ + BOOST_##level(AssertEdgeWithinTimeout(pred, time1, time2, #cond)); \ + } while (0) + +#define REQUIRE_EDGE_WITHIN(cond, time1, time2) ASSERT_CONDITION_EDGE_WITHIN_TIMEOUT(cond, time1, time2, REQUIRE) +#define CHECK_EDGE_WITHIN(cond, time1, time2) ASSERT_CONDITION_EDGE_WITHIN_TIMEOUT(cond, time1, time2, CHECK) +#define WARN_EDGE_WITHIN(cond, time1, time2) ASSERT_CONDITION_EDGE_WITHIN_TIMEOUT(cond, time1, time2, WARN) + +#define ASSERT_DONE_WITHIN_TIMEOUT(expr, time1, time2, level) \ + /* NOLINTNEXTLINE */ \ + do { \ + /* NOLINTNEXTLINE */ \ + auto task = [&, this]() { expr; }; \ + BOOST_##level(AssertDoneWithin(task, time1, time2, #expr)); \ + } while (0) + +#define REQUIRE_DONE_BETWEEN(expr, time1, time2) ASSERT_DONE_WITHIN_TIMEOUT(expr, time1, time2, REQUIRE) +#define CHECK_DONE_BETWEEN(expr, time1, time2) ASSERT_DONE_WITHIN_TIMEOUT(expr, time1, time2, CHECK) +#define WARN_DONE_BETWEEN(expr, time1, time2) ASSERT_DONE_WITHIN_TIMEOUT(expr, time1, time2, WARN) + +#define REQUIRE_DONE_WITHIN(expr, time1) ASSERT_DONE_WITHIN_TIMEOUT(expr, 0s, time1, REQUIRE) +#define CHECK_DONE_WITHIN(expr, time1) ASSERT_DONE_WITHIN_TIMEOUT(expr, 0s, time1, CHECK) +#define WARN_DONE_WITHIN(expr, time1) ASSERT_DONE_WITHIN_TIMEOUT(expr, 0s, time1, WARN) + +namespace icinga { + +using namespace std::chrono_literals; + +/** + * Assert that the predicate `fn` will switch from false to true in the given time window + * + * @param fn The predicate to check + * @param timeout The duration in which the predicate is expected to return true + * @param cond A string representing the condition for use in error messages + * + * @return a boost assertion result. + */ +static boost::test_tools::assertion_result AssertWithTimeout( + const std::function& fn, + const std::chrono::duration& timeout, + std::string_view cond +) +{ + std::size_t iterations = timeout / 1ms; + auto stepDur = timeout / iterations; + for (std::size_t i = 0; i < iterations && !fn(); i++) { + std::this_thread::sleep_for(stepDur); + } + boost::test_tools::assertion_result retVal{fn()}; + retVal.message() << "Condition (" << cond << ") not true within " << timeout.count() << "s"; + return retVal; +} + +/** + * Assert that the predicate `fn` will switch from false to true in the given time window + * + * @param fn The predicate to check + * @param falseUntil The duration for which the predicate is expected to be false + * @param trueWithin The duration in which the predicate is expected to return true + * @param cond A string representing the condition for use in error messages + * + * @return a boost assertion result. + */ +static boost::test_tools::assertion_result AssertEdgeWithinTimeout( + const std::function& fn, + const std::chrono::duration& falseUntil, + const std::chrono::duration& trueWithin, + std::string_view cond +) +{ + std::size_t iterations = falseUntil / 1ms; + auto stepDur = falseUntil / iterations; + for (std::size_t i = 0; i < iterations && !fn(); i++) { + std::this_thread::sleep_for(stepDur); + } + if (fn()) { + boost::test_tools::assertion_result retVal{false}; + retVal.message() << "Condition (" << cond << ") was true before " << falseUntil.count() << "s"; + return retVal; + } + return AssertWithTimeout(fn, trueWithin, cond); +} + +/** + * Assert that the given function takes a duration between lower and upper to complete. + * + * @param fn The function to execute + * @param lower the lower bound to compare the duration against + * @param upper the upper bound to compare the duration against + * + * @return a boost assertion result. + */ +template +static boost::test_tools::assertion_result AssertDoneWithin( + const std::function& fn, + const std::chrono::duration& lower, + const std::chrono::duration& upper, + std::string_view fnString +) +{ + auto start = std::chrono::steady_clock::now(); + fn(); + auto duration = std::chrono::steady_clock::now() - start; + boost::test_tools::assertion_result retVal{duration > lower && duration < upper}; + retVal.message() << fnString << " took " << std::chrono::duration(duration).count() << "s"; + return retVal; +} + +} // namespace icinga From 51d2a8560413e75dcb21ccc59d577a96f6ece54b Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 1 Oct 2025 12:37:43 +0200 Subject: [PATCH 6/8] Add unit-tests for JsonRpcConnection --- test/CMakeLists.txt | 1 + test/remote-jsonrpcconnection.cpp | 427 ++++++++++++++++++++++++++++++ 2 files changed, 428 insertions(+) create mode 100644 test/remote-jsonrpcconnection.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c317d4ebe4..4161c5603c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -122,6 +122,7 @@ set(base_test_SOURCES remote-configpackageutility.cpp remote-httpserverconnection.cpp remote-httpmessage.cpp + remote-jsonrpcconnection.cpp remote-url.cpp ${base_OBJS} $ diff --git a/test/remote-jsonrpcconnection.cpp b/test/remote-jsonrpcconnection.cpp new file mode 100644 index 0000000000..b3bee763ce --- /dev/null +++ b/test/remote-jsonrpcconnection.cpp @@ -0,0 +1,427 @@ +/* Icinga 2 | (c) 2025 Icinga GmbH | GPLv2+ */ + +#include + +#include "base/json.hpp" +#include "remote/apifunction.hpp" +#include "remote/jsonrpc.hpp" +#include "remote/jsonrpcconnection.hpp" +#include "test/base-testloggerfixture.hpp" +#include "test/base-tlsstream-fixture.hpp" +#include "test/icingaapplication-fixture.hpp" +#include "test/test-ctest.hpp" +#include "test/test-timedasserts.hpp" +#include + +using namespace icinga; + +class JsonRpcConnectionFixture : public TlsStreamFixture, public TestLoggerFixture +{ +public: + JsonRpcConnectionFixture() + { + ScriptGlobal::Set("NodeName", "server"); + ApiListener::Ptr listener = new ApiListener; + listener->OnConfigLoaded(); + } + + JsonRpcConnection::Ptr ConnectEndpoint( + const Shared::Ptr& stream, + const String& identity, + bool authenticated = true, + bool deferStart = false + ) + { + Zone::Ptr zone = new Zone; + zone->SetName(identity); + zone->Register(); + + Endpoint::Ptr endpoint = new Endpoint; + endpoint->SetName(identity); + endpoint->SetZoneName(identity); + endpoint->Register(); + + StoppableWaitGroup::Ptr wg = new StoppableWaitGroup; + JsonRpcConnection::Ptr conn = new JsonRpcConnection(wg, identity, authenticated, stream, RoleClient); + if (!deferStart) { + conn->Start(); + } + + endpoint->AddClient(conn); + m_Connections[conn] = std::move(wg); + + return conn; + } + + using ConnectionPair = std::pair; + + ConnectionPair ConnectEndpointPair( + const String& nameA, + const String& nameB, + bool authenticated = true, + bool deferStart = false + ) + { + auto aToB = ConnectEndpoint(client, nameB, authenticated, deferStart); + auto bToA = ConnectEndpoint(server, nameA, authenticated, deferStart); + return std::make_pair(std::move(aToB), std::move(bToA)); + } + + void JoinWaitgroup(const JsonRpcConnection::Ptr& conn) { m_Connections[conn]->Join(); } + +private: + std::map m_Connections; +}; + +class TestApiHandler +{ +public: + struct Message + { + MessageOrigin::Ptr origin; + Dictionary::Ptr params; + }; + + using TestFn = std::function; + + static void RegisterTestFn(std::string handle, TestFn fn) { m_TestFns[std::move(handle)] = std::move(fn); } + + static Value TestApiFunction(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) + { + auto it = m_TestFns.find(String(params->Get("test")).GetData()); + if (it != m_TestFns.end()) { + return it->second(Message{origin, params}); + } + return Empty; + } + +private: + static inline std::unordered_map m_TestFns; +}; + +REGISTER_APIFUNCTION(Test, test, &TestApiHandler::TestApiFunction); + +BOOST_FIXTURE_TEST_SUITE(remote_jsonrpcconnection, JsonRpcConnectionFixture, + *CTestProperties("FIXTURES_REQUIRED ssl_certs") + *boost::unit_test::label("cluster")) + +BOOST_AUTO_TEST_CASE(construction) +{ + auto test = ConnectEndpoint(server, "test"); + BOOST_REQUIRE_EQUAL(test->GetEndpoint()->GetName(), "test"); + BOOST_REQUIRE_EQUAL(test->GetIdentity(), "test"); + BOOST_REQUIRE_CLOSE(test->GetTimestamp(), Utility::GetTime(), 1); + BOOST_REQUIRE(test->IsAuthenticated()); +} + +BOOST_AUTO_TEST_CASE(send_message) +{ + auto [aToB, bToA] = ConnectEndpointPair("a", "b"); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "params", new Dictionary{{"test", "test"}} } + }); + + std::promise msgPromise; + TestApiHandler::RegisterTestFn("test", [&](TestApiHandler::Message msg) { + msgPromise.set_value(std::move(msg)); + return Empty; + }); + + // Test sending a regular message from a->b + aToB->SendMessage(message); + + auto msgFuture = msgPromise.get_future(); + BOOST_REQUIRE(msgFuture.wait_for(1s) == std::future_status::ready); + TestApiHandler::Message msg; + BOOST_CHECK_NO_THROW(msg = msgFuture.get()); + BOOST_REQUIRE_EQUAL(msg.origin->FromClient, bToA); + BOOST_REQUIRE_EQUAL(msg.params->Get("test"), "test"); + + aToB->Disconnect(); + CHECK_WITHIN(!client->lowest_layer().is_open() && !server->lowest_layer().is_open(), 1s); + + // Sending messages after disconnect should result in an exception + BOOST_REQUIRE_THROW(aToB->SendMessage(message), std::runtime_error); +} + +BOOST_AUTO_TEST_CASE(send_raw_message) +{ + auto [aToB, bToA] = ConnectEndpointPair("a", "b"); + + auto message = JsonEncode(new Dictionary{{ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "params", new Dictionary{{"test", "test"}} } + }}); + + std::promise msgPromise; + TestApiHandler::RegisterTestFn("test", [&](TestApiHandler::Message msg) { + msgPromise.set_value(std::move(msg)); + return Empty; + }); + + // Test sending a raw message from b->a + bToA->SendRawMessage(message); + + auto msgFuture = msgPromise.get_future(); + BOOST_REQUIRE(msgFuture.wait_for(1s) == std::future_status::ready); + TestApiHandler::Message msg; + BOOST_CHECK_NO_THROW(msg = msgFuture.get()); + BOOST_REQUIRE_EQUAL(msg.origin->FromClient, aToB); + BOOST_REQUIRE_EQUAL(msg.params->Get("test"), "test"); + + aToB->Disconnect(); + CHECK_WITHIN(!client->lowest_layer().is_open() && !server->lowest_layer().is_open(), 1s); + + // Sending messages after disconnect should result in an exception + BOOST_REQUIRE_THROW(bToA->SendRawMessage(message), std::runtime_error); +} + +BOOST_AUTO_TEST_CASE(old_message) +{ + auto conn = ConnectEndpoint(server, "test"); + BOOST_REQUIRE_EQUAL(conn->GetEndpoint()->GetRemoteLogPosition(), Timestamp{}); + + auto ts = Utility::GetTime(); + Dictionary::Ptr message = new Dictionary{{ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "ts", ts }, + { "params", new Dictionary{{"test", "test"}} } + }}; + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + CHECK_LOG_MESSAGE("Received 'test::Test' message from identity 'test'.", 1s); + BOOST_CHECK_EQUAL(conn->GetEndpoint()->GetRemoteLogPosition(), ts); + testLogger->Clear(); + + message->Set("ts", Timestamp{}); + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + CHECK_LOG_MESSAGE("Processed JSON-RPC 'test::Test' message for identity 'test'.*", 1s); + CHECK_NO_LOG_MESSAGE("Received 'test::Test' message from identity 'test'.", 0s); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(message_result) +{ + auto conn = ConnectEndpoint(server, "test"); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "id", "test1" }, + { "params", new Dictionary{{"test", "test"}} } + }); + + TestApiHandler::RegisterTestFn("test", [&](const TestApiHandler::Message&) { return "Success"; }); + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + auto msg = JsonRpc::DecodeMessage(JsonRpc::ReadMessage(client)); + BOOST_CHECK_EQUAL(msg->Get("id"), "test1"); + BOOST_CHECK_EQUAL(msg->Get("result"), "Success"); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(message_result_noparams) +{ + auto conn = ConnectEndpoint(server, "test"); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "id", "test1" } + }); + + std::atomic_bool handlerCalled = false; + TestApiHandler::RegisterTestFn("test", [&](const TestApiHandler::Message&) { + handlerCalled = true; + return Empty; + }); + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + // Ensure the message has been processed. + REQUIRE_LOG_MESSAGE("Processed JSON-RPC 'test::Test' message for identity 'test'.*", 1s); + + // The handler must not have been called since our message doesn't have any parameters. + BOOST_REQUIRE(!handlerCalled); + + auto msg = JsonRpc::DecodeMessage(JsonRpc::ReadMessage(client)); + BOOST_REQUIRE_EQUAL(msg->Get("id"), "test1"); + BOOST_REQUIRE_EQUAL(msg->Get("result"), Empty); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(call_to_nonexistant_function) +{ + auto conn = ConnectEndpoint(server, "test"); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::FooBar" }, + { "params", new Dictionary{} } + }); + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + CHECK_LOG_MESSAGE("Processed JSON-RPC 'test::FooBar' message for identity 'test'.*", 1s); + CHECK_LOG_MESSAGE("Call to non-existent function.*", 0s); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(heartbeat_message) +{ + auto conn = ConnectEndpoint(server, "test", false, true); + conn->SetHeartbeatInterval(500ms); + conn->Start(); + + String msgString; + CHECK_DONE_BETWEEN(msgString = JsonRpc::ReadMessage(client), 500ms, 520ms); + + Dictionary::Ptr msg; + BOOST_REQUIRE_NO_THROW(msg = JsonRpc::DecodeMessage(msgString)); + BOOST_CHECK_EQUAL(msg->Get("method"), "event::Heartbeat"); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(anonymous_disconnect) +{ + auto [a, b] = ConnectEndpointPair("a", "b", false, true); + BOOST_CHECK(!a->IsAuthenticated()); + BOOST_CHECK(!b->IsAuthenticated()); + + // Actual timeout we test here is this value divided by 6, so 50ms. + a->SetLivenessTimeout(300ms); + a->Start(); + b->SetLivenessTimeout(300ms); + b->Start(); + + CHECK_EDGE_WITHIN(!client->lowest_layer().is_open() && !server->lowest_layer().is_open(), 45ms, 60ms); +} + +BOOST_AUTO_TEST_CASE(liveness_disconnect) +{ + auto [a, b] = ConnectEndpointPair("a", "b", true, true); + BOOST_CHECK(a->IsAuthenticated()); + BOOST_CHECK(b->IsAuthenticated()); + + a->SetLivenessTimeout(50ms); + a->Start(); + b->SetLivenessTimeout(50ms); + b->Start(); + + CHECK_EDGE_WITHIN(!client->lowest_layer().is_open() && !server->lowest_layer().is_open(), 45ms, 60ms); +} + +/* TODO: This test currently completes successfully, but only because of a scheduling + * detail of when the coroutine is spawned vs. when WriteOutgoingMessages() resumes. + * Ideally at some point we would want to rethink consistency during shutdown, wait + * for ACKs to some important messages and use the waitgroup to wait until the reply + * comes in. So currently this is disabled, because we can't reliably assume that + * remaining messages will get sent, but also because it doesn't test functionality + * we're using in any meaningful way at the moment. + */ +BOOST_AUTO_TEST_CASE(wg_join_during_send, *boost::unit_test::disabled{}) +{ + auto conn = ConnectEndpoint(server, "test"); + BOOST_CHECK(conn->IsAuthenticated()); + + std::promise beforeWgJoinPromise; + TestApiHandler::RegisterTestFn("test", [&](const TestApiHandler::Message&) { + beforeWgJoinPromise.set_value(); + return "Response"; + }); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "id", "wgTest1" }, + { "params", new Dictionary{{"test", "test"}} } + }); + + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + // Wait until the API-function is running, then join the WaitGroup. + BOOST_REQUIRE(beforeWgJoinPromise.get_future().wait_for(1s) == std::future_status::ready); + JoinWaitgroup(conn); + + // We still need to receive a response, even though the waitgroup is joined. + String messageRaw; + BOOST_REQUIRE_NO_THROW(messageRaw = JsonRpc::ReadMessage(client)); + BOOST_REQUIRE_NO_THROW(message = JsonRpc::DecodeMessage(messageRaw)); + BOOST_CHECK_EQUAL(message->Get("id"), "wgTest1"); + BOOST_CHECK_EQUAL(message->Get("result"), "Response"); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_CASE(send_after_wg_join) +{ + auto conn = ConnectEndpoint(server, "test"); + + BOOST_CHECK(conn->IsAuthenticated()); + + std::atomic_bool handlerRan = false; + TestApiHandler::RegisterTestFn("test", [&](const TestApiHandler::Message&) { + handlerRan = true; + return Empty; + }); + + Dictionary::Ptr message = new Dictionary({ + { "jsonrpc", "2.0" }, + { "method", "test::Test" }, + { "id", "wgTest1" }, + { "params", new Dictionary{{"test", "test"}} } + }); + + // Join the wait-group even before sending the message. + JoinWaitgroup(conn); + + // The message should be received without error, but it should not be processed. + BOOST_REQUIRE_NO_THROW(JsonRpc::SendMessage(client, message)); + client->flush(); + + // Wait until the message has been processed. + CHECK_LOG_MESSAGE("Processed JSON-RPC 'test::Test' message for identity 'test'.*", 1s); + + // Verify that the handler hasn't run. + BOOST_CHECK(!handlerRan); + + // Since the message has not been processed, JsonRpcConnection also shouldn't have sent anything. + BOOST_CHECK_EQUAL(Endpoint::GetByName("test")->GetBytesSentPerSecond(), 0); + + conn->Disconnect(); + BOOST_CHECK(Shutdown(client)); + CHECK_WITHIN(!server->lowest_layer().is_open(), 1s); +} + +BOOST_AUTO_TEST_SUITE_END() From 3468bb1a52e611581fa17b3892b3c2764c16d58e Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 1 Oct 2025 15:17:43 +0200 Subject: [PATCH 7/8] Why is it always Windows? --- lib/remote/jsonrpcconnection.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 6e1c4dc518..5fc0432b95 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -419,6 +419,9 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) auto anonymousTimeout = m_LivenessTimeout / 6; m_CheckLivenessTimer.expires_after(anonymousTimeout); m_CheckLivenessTimer.async_wait(yc[ec]); + if (ec) { + Log(LogCritical, "JsonRpcConnection") << "Error waiting for Liveness timer: " << ec.message(); + } if (m_ShuttingDown) { return; @@ -441,6 +444,9 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) for (;;) { m_CheckLivenessTimer.expires_after(m_LivenessTimeout / 2); m_CheckLivenessTimer.async_wait(yc[ec]); + if (ec) { + Log(LogCritical, "JsonRpcConnection") << "Error waiting for Liveness timer: " << ec.message(); + } if (m_ShuttingDown) { break; From 20ea6d93f9adcbf4ebc7a983ec7f2f4a810363af Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Wed, 1 Oct 2025 15:18:37 +0200 Subject: [PATCH 8/8] And only Windows? --- .github/workflows/container-image.yml | 1 - .github/workflows/linux.yml | 1 - 2 files changed, 2 deletions(-) diff --git a/.github/workflows/container-image.yml b/.github/workflows/container-image.yml index 18985d65b8..61be0afb99 100644 --- a/.github/workflows/container-image.yml +++ b/.github/workflows/container-image.yml @@ -4,7 +4,6 @@ on: push: branches: - master - pull_request: {} release: types: - published diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index ac09815deb..386eb2a201 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -5,7 +5,6 @@ on: branches: - master - 'support/*' - pull_request: {} concurrency: group: linux-${{ github.event_name == 'push' && github.sha || github.ref }}