Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ _libmultiprocess_ is a library and code generator that allows calling C++ class

The `*.capnp` data definition files are consumed by the _libmultiprocess_ code generator and each `X.capnp` file generates `X.capnp.c++`, `X.capnp.h`, `X.capnp.proxy-client.c++`, `X.capnp.proxy-server.c++`, `X.capnp.proxy-types.c++`, `X.capnp.proxy-types.h`, and `X.capnp.proxy.h` output files. The generated files include `mp::ProxyClient<Interface>` and `mp::ProxyServer<Interface>` class specializations for all the interfaces in the `.capnp` files. These allow methods on C++ objects in one process to be called from other processes over IPC sockets.

The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object)` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.
The `ProxyServer` objects help translate IPC requests from a socket to method calls on a local object. The `ProxyServer` objects are just used internally by the `mp::ServeStream(loop, socket, wrapped_object)` and `mp::ListenConnections(loop, socket, wrapped_object[, max_connections])` functions, and aren't exposed externally. The `ProxyClient` classes are exposed, and returned from the `mp::ConnectStream(loop, socket)` function and meant to be used directly. The classes implement methods described in `.capnp` definitions, and whenever any method is called, a request with the method arguments is sent over the associated IPC connection, and the corresponding `wrapped_object` method on the other end of the connection is called, with the `ProxyClient` method blocking until it returns and forwarding back any return value to the `ProxyClient` method caller.

## Example

Expand Down
12 changes: 11 additions & 1 deletion doc/versions.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,18 @@ Library versions are tracked with simple
Versioning policy is described in the [version.h](../include/mp/version.h)
include.

## v11
## v12

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In e5eba30 doc/version: Bump version 11 > 12: can you add an explanation to the commit message why this needs a version bump?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, updated the commit message to explain why the version bump is needed in 188793e

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #269 (comment)

Done, updated the commit message to explain why the version bump is needed in 188793e

It does seem nice to bump the version number so client code in bitcoin core or elsewhere could potentially decide whether to call ListenConnections with the extra connection limit parameter, or omit it if not supported.

But I'd tweak the commit message to drop "before bumping the version to v12" because when v11 and v11.0 tags are created they will need to point to last merge commit before this PR. So they won't actually include these release notes. (This is confusing and one reason I want versions.md to move to a separate branch in #287 and #288).

- Current unstable version.
- Adds an optional per-listener `max_connections` parameter to `ListenConnections()`
so servers can stop accepting new connections when a local connection cap is reached,
and resume accepting after existing connections disconnect.

## [v11.0](https://github.com/bitcoin-core/libmultiprocess/commits/v11.0)
- Tolerates unexpected exceptions in event loop `post()` callbacks.
- Tolerates exceptions from remote destroy during cleanup in `ProxyClient`.
- Supports primitive `std::optional` struct fields in the code generator (`mpgen`).
- Adds `TypeName()` and improves debug log coverage for Proxy object lifecycle.
- Updates build compatibility with recent Nix and CMake versions.

## [v10.0](https://github.com/bitcoin-core/libmultiprocess/commits/v10.0)
- Increases spawn test timeout to avoid spurious failures.
Expand Down
69 changes: 53 additions & 16 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
#include <capnp/rpc-twoparty.h>

#include <assert.h>
#include <algorithm>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <kj/function.h>
#include <map>
Expand All @@ -25,6 +27,7 @@

namespace mp {
struct ThreadContext;
struct Listener;

struct InvokeContext
{
Expand Down Expand Up @@ -356,6 +359,9 @@ class EventLoop

//! Hook called on the worker thread just before returning results.
std::function<void()> testing_hook_async_request_done;

//! Hook called on the server thread when the client has connected.
std::function<void()> testing_hook_connected;
};

//! Single element task queue used to handle recursive capnp calls. (If the
Expand Down Expand Up @@ -831,8 +837,8 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
//! handles requests from the stream by calling the init object. Embed the
//! ProxyServer in a Connection object that is stored and erased if
//! disconnected. This should be called from the event loop thread.
template <typename InitInterface, typename InitImpl>
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
template <typename InitInterface, typename InitImpl, typename OnDisconnect>
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init, OnDisconnect&& on_disconnect)
{
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
// Disable deleter so proxy server object doesn't attempt to delete the
Expand All @@ -842,23 +848,50 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
});
auto it = loop.m_incoming_connections.begin();
MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
it->onDisconnect([&loop, it] {
if (loop.testing_hook_connected) loop.testing_hook_connected();
it->onDisconnect([&loop, it, on_disconnect = std::forward<OnDisconnect>(on_disconnect)]() mutable {
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
on_disconnect();
});
}

//! Given connection receiver and an init object, handle incoming connections by
//! calling _Serve, to create ProxyServer objects and forward requests to the
//! init object.
struct Listener
{
explicit Listener(kj::Own<kj::ConnectionReceiver>&& receiver, std::optional<size_t> max_connections)
: m_receiver(kj::mv(receiver)), m_max_connections(max_connections) {}

bool atCapacity() const
{
return m_max_connections && m_active_connections >= *m_max_connections;
}

//! Handle incoming connections by calling _Serve, to create ProxyServer
//! objects and forward requests to the init object.
template <typename InitInterface, typename InitImpl>
void listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<Listener>& self);

kj::Own<kj::ConnectionReceiver> m_receiver;
std::optional<size_t> m_max_connections;
size_t m_active_connections{0};
};

template <typename InitInterface, typename InitImpl>
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
void Listener::listen(EventLoop& loop, InitImpl& init, const std::shared_ptr<Listener>& self)
{
auto* ptr = listener.get();
loop.m_task_set->add(ptr->accept().then(
[&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
_Serve<InitInterface>(loop, kj::mv(stream), init);
_Listen<InitInterface>(loop, kj::mv(listener), init);
if (atCapacity()) return;

auto* receiver = m_receiver.get();
loop.m_task_set->add(receiver->accept().then(
[&loop, &init, self](kj::Own<kj::AsyncIoStream>&& stream) {
++self->m_active_connections;
_Serve<InitInterface>(loop, kj::mv(stream), init, [&loop, &init, self] {
const bool resume_accept{self->atCapacity()};
assert(self->m_active_connections > 0);
--self->m_active_connections;
if (resume_accept) self->listen<InitInterface>(loop, init, self);
});
self->listen<InitInterface>(loop, init, self);
}));
}

Expand All @@ -868,18 +901,22 @@ template <typename InitInterface, typename InitImpl>
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
{
_Serve<InitInterface>(
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
loop,
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init,
[] {});
}

//! Given listening socket file descriptor and an init object, handle incoming
//! connections and requests by calling methods on the Init object.
template <typename InitInterface, typename InitImpl>
void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
void ListenConnections(EventLoop& loop, int fd, InitImpl& init, std::optional<size_t> max_connections = std::nullopt)
{
loop.sync([&]() {
_Listen<InitInterface>(loop,
auto listener{std::make_shared<Listener>(
loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
max_connections)};
listener->listen<InitInterface>(loop, init, listener);
});
}

Expand Down
2 changes: 1 addition & 1 deletion include/mp/version.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! pointing at the prior merge commit. The /doc/versions.md file should also be
//! updated, noting any significant or incompatible changes made since the
//! previous version.
#define MP_MAJOR_VERSION 11
#define MP_MAJOR_VERSION 12

//! Minor version number. Should be incremented in stable branches after
//! backporting changes. The /doc/versions.md file should also be updated to
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ if(BUILD_TESTING AND TARGET CapnProto::kj-test)
${MP_PROXY_HDRS}
mp/test/foo-types.h
mp/test/foo.h
mp/test/listen_tests.cpp
mp/test/spawn_tests.cpp
mp/test/test.cpp
)
Expand Down
Loading
Loading