Skip to content

Commit ac0cf25

Browse files
committed
[lldb] Update JSONTransport to use MainLoop for reading.
This updates JSONTransport to use a MainLoop for reading messages. This also allows us to read in larger chunks than we did previously. With the event driven reading operations we can read in chunks and store the contents in an internal buffer. Separately we can parse the buffer and split the contents up into messages. Our previous version approach would read a byte at a time, which is less efficient.
1 parent 4b6e54a commit ac0cf25

File tree

11 files changed

+443
-334
lines changed

11 files changed

+443
-334
lines changed

lldb/include/lldb/Host/JSONTransport.h

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
#ifndef LLDB_HOST_JSONTRANSPORT_H
1414
#define LLDB_HOST_JSONTRANSPORT_H
1515

16+
#include "lldb/Host/MainLoopBase.h"
1617
#include "lldb/lldb-forward.h"
1718
#include "llvm/ADT/StringRef.h"
1819
#include "llvm/Support/Error.h"
1920
#include "llvm/Support/FormatVariadic.h"
2021
#include "llvm/Support/JSON.h"
21-
#include <chrono>
22+
#include <string>
2223
#include <system_error>
24+
#include <vector>
2325

2426
namespace lldb_private {
2527

@@ -68,6 +70,10 @@ class TransportInvalidError : public llvm::ErrorInfo<TransportInvalidError> {
6870
/// A transport class that uses JSON for communication.
6971
class JSONTransport {
7072
public:
73+
using ReadHandleUP = MainLoopBase::ReadHandleUP;
74+
template <typename T>
75+
using Callback = std::function<void(MainLoopBase &, llvm::Expected<T>)>;
76+
7177
JSONTransport(lldb::IOObjectSP input, lldb::IOObjectSP output);
7278
virtual ~JSONTransport() = default;
7379

@@ -83,24 +89,59 @@ class JSONTransport {
8389
return WriteImpl(message);
8490
}
8591

86-
/// Reads the next message from the input stream.
92+
/// Registers the transport with the MainLoop.
8793
template <typename T>
88-
llvm::Expected<T> Read(const std::chrono::microseconds &timeout) {
89-
llvm::Expected<std::string> message = ReadImpl(timeout);
90-
if (!message)
91-
return message.takeError();
92-
return llvm::json::parse<T>(/*JSON=*/*message);
94+
llvm::Expected<ReadHandleUP> RegisterReadObject(MainLoopBase &loop,
95+
Callback<T> callback) {
96+
Status error;
97+
ReadHandleUP handle = loop.RegisterReadObject(
98+
m_input,
99+
[&](MainLoopBase &loop) {
100+
char buf[1024];
101+
size_t len = sizeof(buf);
102+
do {
103+
if (llvm::Error error = m_input->Read(buf, len).takeError()) {
104+
callback(loop, std::move(error));
105+
return;
106+
}
107+
108+
if (len == 0) // EOF
109+
break;
110+
111+
m_buffer.append(std::string(buf, len));
112+
} while (len == sizeof(buf));
113+
114+
llvm::Expected<std::vector<std::string>> messages = Parse();
115+
if (llvm::Error error = messages.takeError()) {
116+
callback(loop, std::move(error));
117+
return;
118+
}
119+
120+
for (const auto &message : *messages)
121+
if constexpr (std::is_same<T, std::string>::value)
122+
callback(loop, message);
123+
else
124+
callback(loop, llvm::json::parse<T>(message));
125+
126+
// On EOF, request termination after handling all the messages.
127+
if (len == 0)
128+
callback(loop, llvm::make_error<TransportEOFError>());
129+
},
130+
error);
131+
if (error.Fail())
132+
return error.takeError();
133+
return handle;
93134
}
94135

95136
protected:
96137
virtual void Log(llvm::StringRef message);
97138

98139
virtual llvm::Error WriteImpl(const std::string &message) = 0;
99-
virtual llvm::Expected<std::string>
100-
ReadImpl(const std::chrono::microseconds &timeout) = 0;
140+
virtual llvm::Expected<std::vector<std::string>> Parse() = 0;
101141

102142
lldb::IOObjectSP m_input;
103143
lldb::IOObjectSP m_output;
144+
std::string m_buffer;
104145
};
105146

106147
/// A transport class for JSON with a HTTP header.
@@ -111,14 +152,12 @@ class HTTPDelimitedJSONTransport : public JSONTransport {
111152
virtual ~HTTPDelimitedJSONTransport() = default;
112153

113154
protected:
114-
virtual llvm::Error WriteImpl(const std::string &message) override;
115-
virtual llvm::Expected<std::string>
116-
ReadImpl(const std::chrono::microseconds &timeout) override;
117-
118-
// FIXME: Support any header.
119-
static constexpr llvm::StringLiteral kHeaderContentLength =
120-
"Content-Length: ";
121-
static constexpr llvm::StringLiteral kHeaderSeparator = "\r\n\r\n";
155+
llvm::Error WriteImpl(const std::string &message) override;
156+
llvm::Expected<std::vector<std::string>> Parse() override;
157+
158+
static constexpr llvm::StringLiteral kHeaderContentLength = "Content-Length";
159+
static constexpr llvm::StringLiteral kHeaderFieldSeparator = ":";
160+
static constexpr llvm::StringLiteral kHeaderSeparator = "\r\n";
122161
};
123162

124163
/// A transport class for JSON RPC.
@@ -129,9 +168,8 @@ class JSONRPCTransport : public JSONTransport {
129168
virtual ~JSONRPCTransport() = default;
130169

131170
protected:
132-
virtual llvm::Error WriteImpl(const std::string &message) override;
133-
virtual llvm::Expected<std::string>
134-
ReadImpl(const std::chrono::microseconds &timeout) override;
171+
llvm::Error WriteImpl(const std::string &message) override;
172+
llvm::Expected<std::vector<std::string>> Parse() override;
135173

136174
static constexpr llvm::StringLiteral kMessageSeparator = "\n";
137175
};

lldb/source/Host/common/JSONTransport.cpp

Lines changed: 67 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -7,127 +7,77 @@
77
//===----------------------------------------------------------------------===//
88

99
#include "lldb/Host/JSONTransport.h"
10-
#include "lldb/Utility/IOObject.h"
1110
#include "lldb/Utility/LLDBLog.h"
1211
#include "lldb/Utility/Log.h"
13-
#include "lldb/Utility/SelectHelper.h"
1412
#include "lldb/Utility/Status.h"
1513
#include "lldb/lldb-forward.h"
1614
#include "llvm/ADT/StringExtras.h"
1715
#include "llvm/ADT/StringRef.h"
1816
#include "llvm/Support/Error.h"
1917
#include "llvm/Support/raw_ostream.h"
20-
#include <optional>
2118
#include <string>
2219
#include <utility>
2320

2421
using namespace llvm;
2522
using namespace lldb;
2623
using namespace lldb_private;
2724

28-
/// ReadFull attempts to read the specified number of bytes. If EOF is
29-
/// encountered, an empty string is returned.
30-
static Expected<std::string>
31-
ReadFull(IOObject &descriptor, size_t length,
32-
std::optional<std::chrono::microseconds> timeout = std::nullopt) {
33-
if (!descriptor.IsValid())
34-
return llvm::make_error<TransportInvalidError>();
35-
36-
bool timeout_supported = true;
37-
// FIXME: SelectHelper does not work with NativeFile on Win32.
38-
#if _WIN32
39-
timeout_supported = descriptor.GetFdType() == IOObject::eFDTypeSocket;
40-
#endif
41-
42-
if (timeout && timeout_supported) {
43-
SelectHelper sh;
44-
sh.SetTimeout(*timeout);
45-
sh.FDSetRead(
46-
reinterpret_cast<lldb::socket_t>(descriptor.GetWaitableHandle()));
47-
Status status = sh.Select();
48-
if (status.Fail()) {
49-
// Convert timeouts into a specific error.
50-
if (status.GetType() == lldb::eErrorTypePOSIX &&
51-
status.GetError() == ETIMEDOUT)
52-
return make_error<TransportTimeoutError>();
53-
return status.takeError();
54-
}
55-
}
56-
57-
std::string data;
58-
data.resize(length);
59-
Status status = descriptor.Read(data.data(), length);
60-
if (status.Fail())
61-
return status.takeError();
62-
63-
// Read returns '' on EOF.
64-
if (length == 0)
65-
return make_error<TransportEOFError>();
66-
67-
// Return the actual number of bytes read.
68-
return data.substr(0, length);
69-
}
70-
71-
static Expected<std::string>
72-
ReadUntil(IOObject &descriptor, StringRef delimiter,
73-
std::optional<std::chrono::microseconds> timeout = std::nullopt) {
74-
std::string buffer;
75-
buffer.reserve(delimiter.size() + 1);
76-
while (!llvm::StringRef(buffer).ends_with(delimiter)) {
77-
Expected<std::string> next =
78-
ReadFull(descriptor, buffer.empty() ? delimiter.size() : 1, timeout);
79-
if (auto Err = next.takeError())
80-
return std::move(Err);
81-
buffer += *next;
82-
}
83-
return buffer.substr(0, buffer.size() - delimiter.size());
84-
}
85-
8625
JSONTransport::JSONTransport(IOObjectSP input, IOObjectSP output)
8726
: m_input(std::move(input)), m_output(std::move(output)) {}
8827

8928
void JSONTransport::Log(llvm::StringRef message) {
9029
LLDB_LOG(GetLog(LLDBLog::Host), "{0}", message);
9130
}
9231

93-
Expected<std::string>
94-
HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) {
95-
if (!m_input || !m_input->IsValid())
96-
return llvm::make_error<TransportInvalidError>();
32+
Expected<std::vector<std::string>> HTTPDelimitedJSONTransport::Parse() {
33+
if (m_buffer.empty())
34+
return std::vector<std::string>{};
35+
36+
std::vector<std::string> messages;
37+
llvm::StringRef buf = m_buffer;
38+
size_t content_length = 0, end_of_last_message = 0, cursor = 0;
39+
do {
40+
auto idx = buf.find(kHeaderSeparator, cursor);
41+
if (idx == StringRef::npos)
42+
break;
43+
44+
auto header = buf.slice(cursor, idx);
45+
cursor = idx + kHeaderSeparator.size();
46+
47+
// An empty line separates the headers from the message body.
48+
if (header.empty()) {
49+
// Not enough data, wait for the next chunk to arrive.
50+
if (content_length + cursor > buf.size())
51+
break;
52+
53+
std::string body = buf.substr(cursor, content_length).str();
54+
end_of_last_message = cursor + content_length;
55+
cursor += content_length;
56+
Log(llvm::formatv("--> {0}", body).str());
57+
messages.push_back(body);
58+
content_length = 0;
59+
continue;
60+
}
61+
62+
// HTTP Headers are `<field-name>: [<field-value>]`.
63+
if (!header.contains(kHeaderFieldSeparator))
64+
return make_error<StringError>("malformed content header",
65+
inconvertibleErrorCode());
66+
67+
auto [name, value] = header.split(kHeaderFieldSeparator);
68+
if (name.lower() == kHeaderContentLength.lower()) {
69+
value = value.trim();
70+
if (value.trim().consumeInteger(10, content_length))
71+
return make_error<StringError>(
72+
formatv("invalid content length: {0}", value).str(),
73+
inconvertibleErrorCode());
74+
}
75+
} while (cursor < buf.size());
9776

98-
IOObject *input = m_input.get();
99-
Expected<std::string> message_header =
100-
ReadFull(*input, kHeaderContentLength.size(), timeout);
101-
if (!message_header)
102-
return message_header.takeError();
103-
if (*message_header != kHeaderContentLength)
104-
return createStringError(formatv("expected '{0}' and got '{1}'",
105-
kHeaderContentLength, *message_header)
106-
.str());
107-
108-
Expected<std::string> raw_length = ReadUntil(*input, kHeaderSeparator);
109-
if (!raw_length)
110-
return handleErrors(raw_length.takeError(),
111-
[&](const TransportEOFError &E) -> llvm::Error {
112-
return createStringError(
113-
"unexpected EOF while reading header separator");
114-
});
115-
116-
size_t length;
117-
if (!to_integer(*raw_length, length))
118-
return createStringError(
119-
formatv("invalid content length {0}", *raw_length).str());
120-
121-
Expected<std::string> raw_json = ReadFull(*input, length);
122-
if (!raw_json)
123-
return handleErrors(
124-
raw_json.takeError(), [&](const TransportEOFError &E) -> llvm::Error {
125-
return createStringError("unexpected EOF while reading JSON");
126-
});
127-
128-
Log(llvm::formatv("--> {0}", *raw_json).str());
129-
130-
return raw_json;
77+
// Store the remainder of the buffer for the next read callback.
78+
m_buffer = buf.substr(end_of_last_message);
79+
80+
return messages;
13181
}
13282

13383
Error HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) {
@@ -138,25 +88,29 @@ Error HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) {
13888

13989
std::string Output;
14090
raw_string_ostream OS(Output);
141-
OS << kHeaderContentLength << message.length() << kHeaderSeparator << message;
91+
OS << kHeaderContentLength << kHeaderFieldSeparator << ' ' << message.length()
92+
<< kHeaderSeparator << kHeaderSeparator << message;
14293
size_t num_bytes = Output.size();
14394
return m_output->Write(Output.data(), num_bytes).takeError();
14495
}
14596

146-
Expected<std::string>
147-
JSONRPCTransport::ReadImpl(const std::chrono::microseconds &timeout) {
148-
if (!m_input || !m_input->IsValid())
149-
return make_error<TransportInvalidError>();
150-
151-
IOObject *input = m_input.get();
152-
Expected<std::string> raw_json =
153-
ReadUntil(*input, kMessageSeparator, timeout);
154-
if (!raw_json)
155-
return raw_json.takeError();
156-
157-
Log(llvm::formatv("--> {0}", *raw_json).str());
158-
159-
return *raw_json;
97+
Expected<std::vector<std::string>> JSONRPCTransport::Parse() {
98+
std::vector<std::string> messages;
99+
StringRef buf = m_buffer;
100+
do {
101+
size_t idx = buf.find(kMessageSeparator);
102+
if (idx == StringRef::npos)
103+
break;
104+
std::string raw_json = buf.substr(0, idx).str();
105+
buf = buf.substr(idx + 1);
106+
Log(llvm::formatv("--> {0}", raw_json).str());
107+
messages.push_back(raw_json);
108+
} while (!buf.empty());
109+
110+
// Store the remainder of the buffer for the next read callback.
111+
m_buffer = buf.str();
112+
113+
return messages;
160114
}
161115

162116
Error JSONRPCTransport::WriteImpl(const std::string &message) {

lldb/test/API/tools/lldb-dap/io/TestDAP_io.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,18 +48,18 @@ def test_invalid_header(self):
4848
lldb-dap handles invalid message headers.
4949
"""
5050
process = self.launch()
51-
process.stdin.write(b"not the corret message header")
51+
process.stdin.write(b"not the correct message header")
5252
process.stdin.close()
53-
self.assertEqual(process.wait(timeout=5.0), 1)
53+
self.assertEqual(process.wait(timeout=5.0), 0)
5454

5555
def test_partial_header(self):
5656
"""
57-
lldb-dap handles parital message headers.
57+
lldb-dap handles partial message headers.
5858
"""
5959
process = self.launch()
6060
process.stdin.write(b"Content-Length: ")
6161
process.stdin.close()
62-
self.assertEqual(process.wait(timeout=5.0), 1)
62+
self.assertEqual(process.wait(timeout=5.0), 0)
6363

6464
def test_incorrect_content_length(self):
6565
"""
@@ -68,7 +68,7 @@ def test_incorrect_content_length(self):
6868
process = self.launch()
6969
process.stdin.write(b"Content-Length: abc")
7070
process.stdin.close()
71-
self.assertEqual(process.wait(timeout=5.0), 1)
71+
self.assertEqual(process.wait(timeout=5.0), 0)
7272

7373
def test_partial_content_length(self):
7474
"""
@@ -77,4 +77,4 @@ def test_partial_content_length(self):
7777
process = self.launch()
7878
process.stdin.write(b"Content-Length: 10\r\n\r\n{")
7979
process.stdin.close()
80-
self.assertEqual(process.wait(timeout=5.0), 1)
80+
self.assertEqual(process.wait(timeout=5.0), 0)

0 commit comments

Comments
 (0)