@@ -60,6 +60,8 @@ void JsonRpcConnection::Start()
60
60
61
61
void JsonRpcConnection::HandleIncomingMessages (boost::asio::yield_context yc)
62
62
{
63
+ namespace ch = std::chrono;
64
+
63
65
m_Stream->next_layer ().SetSeen (&m_Seen);
64
66
65
67
for (;;) {
@@ -78,9 +80,36 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
78
80
m_Seen = Utility::GetTime ();
79
81
80
82
try {
83
+ auto start (ch::steady_clock::now ());
84
+ ch::steady_clock::duration cpuBoundDuration;
85
+
86
+ String rpcMethod;
87
+
88
+ Log statsLog (LogDebug, " JsonRpcConnection" );
89
+
90
+ Defer addLogStats ([this , &statsLog, &rpcMethod, &start, &cpuBoundDuration]() {
91
+ statsLog << " Processing JSON-RPC '" << rpcMethod << " ' message for identity '" << m_Identity << " '" ;
92
+
93
+ if (ch::duration_cast<ch::seconds>(cpuBoundDuration).count ()) {
94
+ statsLog << " waited '" << ch::duration_cast<ch::milliseconds>(cpuBoundDuration).count () << " ms' on semaphore and" ;
95
+ }
96
+
97
+ auto duration = ch::steady_clock::now () - start;
98
+ if (duration >= ch::seconds (5 )) {
99
+ // Processing that RPC message seems to take an unexpectedly long time,
100
+ // so promote the log entry from debug to warning.
101
+ statsLog.SetSeverity (LogWarning);
102
+ }
103
+
104
+ statsLog << " took total " << ch::duration_cast<ch::milliseconds>(duration).count () << " ms." ;
105
+ });
106
+
81
107
CpuBoundWork handleMessage (yc);
82
108
83
- MessageHandler (message);
109
+ // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
110
+ cpuBoundDuration = ch::steady_clock::now () - start;
111
+
112
+ MessageHandler (message, rpcMethod);
84
113
85
114
l_TaskStats.InsertValue (Utility::GetTime (), 1 );
86
115
} catch (const std::exception& ex) {
@@ -245,9 +274,15 @@ void JsonRpcConnection::Disconnect()
245
274
});
246
275
}
247
276
248
- void JsonRpcConnection::MessageHandler (const String& jsonString)
277
+ void JsonRpcConnection::MessageHandler (const String& jsonString, String& rpcMethod )
249
278
{
250
279
Dictionary::Ptr message = JsonRpc::DecodeMessage (jsonString);
280
+ Defer setRpcMethod ([&message, &rpcMethod]() {
281
+ rpcMethod = message->Get (" method" );
282
+ if (rpcMethod.IsEmpty ()) {
283
+ rpcMethod = " UNKNOWN" ;
284
+ }
285
+ });
251
286
252
287
if (m_Endpoint && message->Contains (" ts" )) {
253
288
double ts = message->Get (" ts" );
0 commit comments