From 696ad6138af43aabcb74d3d55121420ae4bca08a Mon Sep 17 00:00:00 2001 From: Jonas Devlieghere Date: Tue, 17 Jun 2025 14:38:44 -0500 Subject: [PATCH] [lldb] Support non-blocking reads in JSONRPCTransport Support non-blocking reads for JSONRPCTransport so we can implement a multiplexed reader using the MainLoop. Pavel pointed out in #143628 that the implementation there (which was using blocking reads) can easily to reading partial JSON RPC packets. --- lldb/include/lldb/Host/JSONTransport.h | 19 +++++++--- lldb/source/Host/common/JSONTransport.cpp | 42 +++++++++++++--------- lldb/unittests/Host/JSONTransportTest.cpp | 43 ++++++++++++++++++++--- 3 files changed, 79 insertions(+), 25 deletions(-) diff --git a/lldb/include/lldb/Host/JSONTransport.h b/lldb/include/lldb/Host/JSONTransport.h index 4087cdf2b42f7..36a67c929a1c6 100644 --- a/lldb/include/lldb/Host/JSONTransport.h +++ b/lldb/include/lldb/Host/JSONTransport.h @@ -85,7 +85,8 @@ class JSONTransport { /// Reads the next message from the input stream. template - llvm::Expected Read(const std::chrono::microseconds &timeout) { + llvm::Expected + Read(std::optional timeout = std::nullopt) { llvm::Expected message = ReadImpl(timeout); if (!message) return message.takeError(); @@ -97,10 +98,20 @@ class JSONTransport { virtual llvm::Error WriteImpl(const std::string &message) = 0; virtual llvm::Expected - ReadImpl(const std::chrono::microseconds &timeout) = 0; + ReadImpl(std::optional timeout) = 0; + + llvm::Expected + ReadFull(IOObject &descriptor, size_t length, + std::optional timeout) const; + + llvm::Expected + ReadUntil(IOObject &descriptor, llvm::StringRef delimiter, + std::optional timeout); lldb::IOObjectSP m_input; lldb::IOObjectSP m_output; + + std::string m_buffer; }; /// A transport class for JSON with a HTTP header. @@ -113,7 +124,7 @@ class HTTPDelimitedJSONTransport : public JSONTransport { protected: virtual llvm::Error WriteImpl(const std::string &message) override; virtual llvm::Expected - ReadImpl(const std::chrono::microseconds &timeout) override; + ReadImpl(std::optional timeout) override; // FIXME: Support any header. static constexpr llvm::StringLiteral kHeaderContentLength = @@ -131,7 +142,7 @@ class JSONRPCTransport : public JSONTransport { protected: virtual llvm::Error WriteImpl(const std::string &message) override; virtual llvm::Expected - ReadImpl(const std::chrono::microseconds &timeout) override; + ReadImpl(std::optional timeout) override; static constexpr llvm::StringLiteral kMessageSeparator = "\n"; }; diff --git a/lldb/source/Host/common/JSONTransport.cpp b/lldb/source/Host/common/JSONTransport.cpp index 1a0851d5c4365..0fae74fb87b68 100644 --- a/lldb/source/Host/common/JSONTransport.cpp +++ b/lldb/source/Host/common/JSONTransport.cpp @@ -27,9 +27,9 @@ using namespace lldb_private; /// ReadFull attempts to read the specified number of bytes. If EOF is /// encountered, an empty string is returned. -static Expected -ReadFull(IOObject &descriptor, size_t length, - std::optional timeout = std::nullopt) { +Expected JSONTransport::ReadFull( + IOObject &descriptor, size_t length, + std::optional timeout) const { if (!descriptor.IsValid()) return llvm::make_error(); @@ -67,19 +67,22 @@ ReadFull(IOObject &descriptor, size_t length, return data.substr(0, length); } -static Expected -ReadUntil(IOObject &descriptor, StringRef delimiter, - std::optional timeout = std::nullopt) { - std::string buffer; - buffer.reserve(delimiter.size() + 1); - while (!llvm::StringRef(buffer).ends_with(delimiter)) { +Expected +JSONTransport::ReadUntil(IOObject &descriptor, StringRef delimiter, + std::optional timeout) { + if (!timeout || *timeout != std::chrono::microseconds::zero()) { + m_buffer.clear(); + m_buffer.reserve(delimiter.size() + 1); + } + + while (!llvm::StringRef(m_buffer).ends_with(delimiter)) { Expected next = - ReadFull(descriptor, buffer.empty() ? delimiter.size() : 1, timeout); + ReadFull(descriptor, m_buffer.empty() ? delimiter.size() : 1, timeout); if (auto Err = next.takeError()) return std::move(Err); - buffer += *next; + m_buffer += *next; } - return buffer.substr(0, buffer.size() - delimiter.size()); + return m_buffer.substr(0, m_buffer.size() - delimiter.size()); } JSONTransport::JSONTransport(IOObjectSP input, IOObjectSP output) @@ -89,11 +92,15 @@ void JSONTransport::Log(llvm::StringRef message) { LLDB_LOG(GetLog(LLDBLog::Host), "{0}", message); } -Expected -HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) { +Expected HTTPDelimitedJSONTransport::ReadImpl( + std::optional timeout) { if (!m_input || !m_input->IsValid()) return llvm::make_error(); + if (timeout && *timeout == std::chrono::microseconds::zero()) + return llvm::createStringError( + "HTTPDelimitedJSONTransport does not support non-blocking reads"); + IOObject *input = m_input.get(); Expected message_header = ReadFull(*input, kHeaderContentLength.size(), timeout); @@ -104,7 +111,8 @@ HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) { kHeaderContentLength, *message_header) .str()); - Expected raw_length = ReadUntil(*input, kHeaderSeparator); + Expected raw_length = + ReadUntil(*input, kHeaderSeparator, timeout); if (!raw_length) return handleErrors(raw_length.takeError(), [&](const TransportEOFError &E) -> llvm::Error { @@ -117,7 +125,7 @@ HTTPDelimitedJSONTransport::ReadImpl(const std::chrono::microseconds &timeout) { return createStringError( formatv("invalid content length {0}", *raw_length).str()); - Expected raw_json = ReadFull(*input, length); + Expected raw_json = ReadFull(*input, length, timeout); if (!raw_json) return handleErrors( raw_json.takeError(), [&](const TransportEOFError &E) -> llvm::Error { @@ -143,7 +151,7 @@ Error HTTPDelimitedJSONTransport::WriteImpl(const std::string &message) { } Expected -JSONRPCTransport::ReadImpl(const std::chrono::microseconds &timeout) { +JSONRPCTransport::ReadImpl(std::optional timeout) { if (!m_input || !m_input->IsValid()) return make_error(); diff --git a/lldb/unittests/Host/JSONTransportTest.cpp b/lldb/unittests/Host/JSONTransportTest.cpp index 4621869887ac8..cc43d7d851cb1 100644 --- a/lldb/unittests/Host/JSONTransportTest.cpp +++ b/lldb/unittests/Host/JSONTransportTest.cpp @@ -16,7 +16,7 @@ using namespace lldb_private; namespace { template class JSONTransportTest : public PipeTest { protected: - std::unique_ptr transport; + std::unique_ptr transport; void SetUp() override { PipeTest::SetUp(); @@ -36,7 +36,13 @@ class HTTPDelimitedJSONTransportTest using JSONTransportTest::JSONTransportTest; }; -class JSONRPCTransportTest : public JSONTransportTest { +class TestJSONRPCTransport : public JSONRPCTransport { +public: + using JSONRPCTransport::JSONRPCTransport; + using JSONRPCTransport::WriteImpl; // For partial writes. +}; + +class JSONRPCTransportTest : public JSONTransportTest { public: using JSONTransportTest::JSONTransportTest; }; @@ -84,7 +90,6 @@ TEST_F(HTTPDelimitedJSONTransportTest, ReadWithEOF) { Failed()); } - TEST_F(HTTPDelimitedJSONTransportTest, InvalidTransport) { transport = std::make_unique(nullptr, nullptr); ASSERT_THAT_EXPECTED( @@ -142,13 +147,43 @@ TEST_F(JSONRPCTransportTest, Write) { } TEST_F(JSONRPCTransportTest, InvalidTransport) { - transport = std::make_unique(nullptr, nullptr); + transport = std::make_unique(nullptr, nullptr); ASSERT_THAT_EXPECTED( transport->Read(std::chrono::milliseconds(1)), Failed()); } #ifndef _WIN32 +TEST_F(HTTPDelimitedJSONTransportTest, NonBlockingRead) { + ASSERT_THAT_EXPECTED( + transport->Read(std::chrono::microseconds::zero()), + llvm::FailedWithMessage( + "HTTPDelimitedJSONTransport does not support non-blocking reads")); +} + +TEST_F(JSONRPCTransportTest, NonBlockingRead) { + llvm::StringRef head = R"({"str")"; + llvm::StringRef tail = R"(: "foo"})" + "\n"; + + ASSERT_THAT_EXPECTED(input.Write(head.data(), head.size()), Succeeded()); + ASSERT_THAT_EXPECTED( + transport->Read(std::chrono::microseconds::zero()), + Failed()); + + ASSERT_THAT_EXPECTED(input.Write(tail.data(), tail.size()), Succeeded()); + while (true) { + llvm::Expected result = + transport->Read(std::chrono::microseconds::zero()); + if (result.errorIsA()) { + llvm::consumeError(result.takeError()); + continue; + } + ASSERT_THAT_EXPECTED(result, HasValue(testing::FieldsAre(/*str=*/"foo"))); + break; + } +} + TEST_F(HTTPDelimitedJSONTransportTest, ReadWithTimeout) { ASSERT_THAT_EXPECTED( transport->Read(std::chrono::milliseconds(1)),