diff --git a/doc/usage.md b/doc/usage.md index f387db4d..df103e9f 100644 --- a/doc/usage.md +++ b/doc/usage.md @@ -10,7 +10,7 @@ _libmultiprocess_ is a library and code generator that allows calling C++ class The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient` and `mp::ProxyServer` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets. -The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object)` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller. +The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object[, max_connections])` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller. ## Example diff --git a/doc/versions.md b/doc/versions.md index 3cfa28e3..67b37651 100644 --- a/doc/versions.md +++ b/doc/versions.md @@ -7,8 +7,18 @@ Library versions are tracked with simple Versioning policy is described in the [version.h](../include/mp/version.h) include. -## v11 +## v12 - Current unstable version. +- Adds an optional per-listener `max_connections` parameter to `ListenConnections()` + so servers can stop accepting new connections when a local connection cap is reached, + and resume accepting after existing connections disconnect. + +## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0) +- Tolerates unexpected exceptions in event loop `post()` callbacks. +- Tolerates exceptions from remote destroy during cleanup in `ProxyClient`. +- Supports primitive `std::optional` struct fields in the code generator (`mpgen`). +- Adds `TypeName()` and improves debug log coverage for Proxy object lifecycle. +- Updates build compatibility with recent Nix and CMake versions. ## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0) - Increases spawn test timeout to avoid spurious failures. diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 092ea42e..03aa9f62 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -13,7 +13,9 @@ #include #include +#include #include +#include #include #include #include @@ -25,6 +27,7 @@ namespace mp { struct ThreadContext; +struct Listener; struct InvokeContext { @@ -356,6 +359,9 @@ class EventLoop //! Hook called on the worker thread just before returning results. std::function testing_hook_async_request_done; + + //! Hook called on the server thread when the client has connected. + std::function testing_hook_connected; }; //! Single element task queue used to handle recursive capnp calls. (If the @@ -831,8 +837,8 @@ std::unique_ptr> ConnectStream(EventLoop& loop, int f //! handles requests from the stream by calling the init object. Embed the //! ProxyServer in a Connection object that is stored and erased if //! disconnected. This should be called from the event loop thread. -template -void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init) +template +void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init, OnDisconnect&& on_disconnect) { loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) { // Disable deleter so proxy server object doesn't attempt to delete the @@ -842,23 +848,50 @@ void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init }); auto it = loop.m_incoming_connections.begin(); MP_LOG(loop, Log::Info) << "IPC server: socket connected."; - it->onDisconnect([&loop, it] { + if (loop.testing_hook_connected) loop.testing_hook_connected(); + it->onDisconnect([&loop, it, on_disconnect = std::forward(on_disconnect)]() mutable { MP_LOG(loop, Log::Info) << "IPC server: socket disconnected."; loop.m_incoming_connections.erase(it); + on_disconnect(); }); } -//! Given connection receiver and an init object, handle incoming connections by -//! calling _Serve, to create ProxyServer objects and forward requests to the -//! init object. +struct Listener +{ + explicit Listener(kj::Own&& receiver, std::optional max_connections) + : m_receiver(kj::mv(receiver)), m_max_connections(max_connections) {} + + bool atCapacity() const + { + return m_max_connections && m_active_connections >= *m_max_connections; + } + + //! Handle incoming connections by calling _Serve, to create ProxyServer + //! objects and forward requests to the init object. + template + void listen(EventLoop& loop, InitImpl& init, const std::shared_ptr& self); + + kj::Own m_receiver; + std::optional m_max_connections; + size_t m_active_connections{0}; +}; + template -void _Listen(EventLoop& loop, kj::Own&& listener, InitImpl& init) +void Listener::listen(EventLoop& loop, InitImpl& init, const std::shared_ptr& self) { - auto* ptr = listener.get(); - loop.m_task_set->add(ptr->accept().then( - [&loop, &init, listener = kj::mv(listener)](kj::Own&& stream) mutable { - _Serve(loop, kj::mv(stream), init); - _Listen(loop, kj::mv(listener), init); + if (atCapacity()) return; + + auto* receiver = m_receiver.get(); + loop.m_task_set->add(receiver->accept().then( + [&loop, &init, self](kj::Own&& stream) { + ++self->m_active_connections; + _Serve(loop, kj::mv(stream), init, [&loop, &init, self] { + const bool resume_accept{self->atCapacity()}; + assert(self->m_active_connections > 0); + --self->m_active_connections; + if (resume_accept) self->listen(loop, init, self); + }); + self->listen(loop, init, self); })); } @@ -868,18 +901,22 @@ template void ServeStream(EventLoop& loop, int fd, InitImpl& init) { _Serve( - loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init); + loop, + loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), + init, + [] {}); } //! Given listening socket file descriptor and an init object, handle incoming //! connections and requests by calling methods on the Init object. template -void ListenConnections(EventLoop& loop, int fd, InitImpl& init) +void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional max_connections = std::nullopt) { loop.sync([&]() { - _Listen(loop, + auto listener{std::make_shared( loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), - init); + max_connections)}; + listener->listen(loop, init, listener); }); } diff --git a/include/mp/version.h b/include/mp/version.h index 423ed460..4587a288 100644 --- a/include/mp/version.h +++ b/include/mp/version.h @@ -24,7 +24,7 @@ //! pointing at the prior merge commit. The /doc/versions.md file should also be //! updated, noting any significant or incompatible changes made since the //! previous version. -#define MP_MAJOR_VERSION 11 +#define MP_MAJOR_VERSION 12 //! Minor version number. Should be incremented in stable branches after //! backporting changes. The /doc/versions.md file should also be updated to diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 1f21ba44..13246293 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -26,6 +26,7 @@ if(BUILD_TESTING AND TARGET CapnProto::kj-test) ${MP_PROXY_HDRS} mp/test/foo-types.h mp/test/foo.h + mp/test/listen_tests.cpp mp/test/spawn_tests.cpp mp/test/test.cpp ) diff --git a/test/mp/test/listen_tests.cpp b/test/mp/test/listen_tests.cpp new file mode 100644 index 00000000..654271b5 --- /dev/null +++ b/test/mp/test/listen_tests.cpp @@ -0,0 +1,276 @@ +// Copyright (c) The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // IWYU pragma: keep +#include +#include +#include +#include +#include +#include + +namespace mp { +namespace test { +namespace { + +class UnixListener +{ +public: + UnixListener() + { + char dir_template[] = "/tmp/mptest-listener-XXXXXX"; + char* dir = mkdtemp(dir_template); + KJ_REQUIRE(dir != nullptr); + m_dir = dir; + m_path = m_dir + "/socket"; + + m_fd = socket(AF_UNIX, SOCK_STREAM, 0); + KJ_REQUIRE(m_fd >= 0); + + sockaddr_un addr{}; + addr.sun_family = AF_UNIX; + KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path)); + std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1); + KJ_REQUIRE(bind(m_fd, reinterpret_cast(&addr), sizeof(addr)) == 0); + KJ_REQUIRE(listen(m_fd, SOMAXCONN) == 0); + } + + ~UnixListener() + { + if (m_fd >= 0) close(m_fd); + if (!m_path.empty()) unlink(m_path.c_str()); + if (!m_dir.empty()) rmdir(m_dir.c_str()); + } + + int release() + { + int fd = m_fd; + m_fd = -1; + return fd; + } + + int Connect() const + { + int fd = socket(AF_UNIX, SOCK_STREAM, 0); + KJ_REQUIRE(fd >= 0); + + sockaddr_un addr{}; + addr.sun_family = AF_UNIX; + KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path)); + std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1); + KJ_REQUIRE(connect(fd, reinterpret_cast(&addr), sizeof(addr)) == 0); + return fd; + } + +private: + int m_fd{-1}; + std::string m_dir; + std::string m_path; +}; + +class ClientSetup +{ +public: + explicit ClientSetup(int fd) + : thread([this, fd] { + EventLoop loop("mptest-client", [](mp::LogMessage log) { + KJ_LOG(INFO, log.level, log.message); + if (log.level == mp::Log::Raise) throw std::runtime_error(log.message); + }); + client_promise.set_value(ConnectStream(loop, fd)); + loop.loop(); + }) + { + client = client_promise.get_future().get(); + } + + ~ClientSetup() + { + client.reset(); + thread.join(); + } + + std::promise>> client_promise; + std::unique_ptr> client; + + //! Thread variable should be after other struct members so the thread does + //! not start until the other members are initialized. + std::thread thread; +}; + +class ListenSetup +{ +public: + explicit ListenSetup(std::optional max_connections = std::nullopt) + : thread([this, max_connections] { + EventLoop loop("mptest-server", [this](mp::LogMessage log) { + KJ_LOG(INFO, log.level, log.message); + if (log.level == mp::Log::Raise) throw std::runtime_error(log.message); + if (log.message.find("IPC server: socket disconnected.") != std::string::npos) { + std::lock_guard lock(counter_mutex); + ++disconnected_count; + counter_cv.notify_all(); + } + }); + loop.testing_hook_connected = [&] { + std::lock_guard lock(counter_mutex); + ++connected_count; + counter_cv.notify_all(); + }; + m_loop_ref.emplace(loop); + FooImplementation foo; + ListenConnections(loop, listener.release(), foo, max_connections); + ready_promise.set_value(); + loop.loop(); + }) + { + ready_promise.get_future().get(); + } + + ~ListenSetup() + { + m_loop_ref.reset(); + thread.join(); + } + + size_t ConnectedCount() + { + std::lock_guard lock(counter_mutex); + return connected_count; + } + + void WaitForConnectedCount(size_t expected_count) + { + std::unique_lock lock(counter_mutex); + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + const bool matched = counter_cv.wait_until(lock, deadline, [&] { + return connected_count >= expected_count; + }); + KJ_REQUIRE(matched); + } + + void WaitForDisconnectedCount(size_t expected_count) + { + std::unique_lock lock(counter_mutex); + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); + const bool matched = counter_cv.wait_until(lock, deadline, [&] { + return disconnected_count >= expected_count; + }); + KJ_REQUIRE(matched); + } + + UnixListener listener; + std::promise ready_promise; + std::optional m_loop_ref; + std::mutex counter_mutex; + std::condition_variable counter_cv; + size_t connected_count{0}; + size_t disconnected_count{0}; + //! Thread variable should be after other struct members so the thread does + //! not start until the other members are initialized. + std::thread thread; +}; + +KJ_TEST("ListenConnections accepts incoming connections") +{ + ListenSetup setup; + auto client = std::make_unique(setup.listener.Connect()); + + setup.WaitForConnectedCount(1); + KJ_EXPECT(client->client->add(1, 2) == 3); +} + +KJ_TEST("ListenConnections enforces a local connection limit") +{ + ListenSetup setup(/*max_connections=*/1); + + auto client1 = std::make_unique(setup.listener.Connect()); + setup.WaitForConnectedCount(1); + KJ_EXPECT(client1->client->add(1, 2) == 3); + + auto client2 = std::make_unique(setup.listener.Connect()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + KJ_EXPECT(setup.ConnectedCount() == 1); + + client1.reset(); + setup.WaitForDisconnectedCount(1); + setup.WaitForConnectedCount(2); + + KJ_EXPECT(client2->client->add(2, 3) == 5); + + client2.reset(); + setup.WaitForDisconnectedCount(2); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + auto client3 = std::make_unique(setup.listener.Connect()); + setup.WaitForConnectedCount(3); + KJ_EXPECT(client3->client->add(3, 4) == 7); +} + +KJ_TEST("ListenConnections keeps capped listeners alive before reaching the limit") +{ + ListenSetup setup(/*max_connections=*/2); + + auto client1 = std::make_unique(setup.listener.Connect()); + setup.WaitForConnectedCount(1); + KJ_EXPECT(client1->client->add(1, 2) == 3); + + client1.reset(); + setup.WaitForDisconnectedCount(1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + auto client2 = std::make_unique(setup.listener.Connect()); + setup.WaitForConnectedCount(2); + KJ_EXPECT(client2->client->add(2, 3) == 5); +} + +KJ_TEST("ListenConnections accepts parallel connections") +{ + ListenSetup setup(/*max_connections=*/2); + + auto client1 = std::make_unique(setup.listener.Connect()); + auto client2 = std::make_unique(setup.listener.Connect()); + + setup.WaitForConnectedCount(2); + + KJ_EXPECT(client1->client->add(1, 2) == 3); + KJ_EXPECT(client2->client->add(2, 3) == 5); + + auto client3 = std::make_unique(setup.listener.Connect()); + + // Without this sync, ConnectedCount() == 2 might pass even if + // max_connections was not enforced because the event loop has not accepted + // client3 yet. + (**setup.m_loop_ref).sync([] {}); + KJ_EXPECT(setup.ConnectedCount() == 2); + + client1.reset(); + setup.WaitForDisconnectedCount(1); + setup.WaitForConnectedCount(3); + + KJ_EXPECT(client3->client->add(3, 4) == 7); +} + +} // namespace +} // namespace test +} // namespace mp