Skip to content

Commit 3678b46

Browse files
authored
Introduced worker registry for better graceful coordinator shutdown (#20)
1 parent ef85286 commit 3678b46

12 files changed

+179
-14
lines changed

prod/native/libcommon/code/AgentGlobals.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,13 @@ AgentGlobals::AgentGlobals(std::shared_ptr<LoggerInterface> logger,
5959
opAmp_(std::make_shared<opentelemetry::php::transport::OpAmp>(logger_, config_, httpTransportAsync_, resourceDetector_)),
6060
sharedMemory_(std::make_shared<opentelemetry::php::SharedMemoryState>()),
6161
requestScope_(std::make_shared<opentelemetry::php::RequestScope>(logger_, bridge_, sapi_, sharedMemory_, dependencyAutoLoaderGuard_, inferredSpans_, config_, [hs = hooksStorage_]() { hs->clear(); }, [this]() { return getPeriodicTaskExecutor();}, [this]() { return coordinatorConfigProvider_->triggerUpdateIfChanged(); })),
62-
messagesDispatcher_(std::make_shared<opentelemetry::php::coordinator::CoordinatorMessagesDispatcher>(logger_, httpTransportAsync_)),
62+
workerRegistry_(std::make_shared<opentelemetry::php::coordinator::WorkerRegistry>(logger_)),
63+
messagesDispatcher_(std::make_shared<opentelemetry::php::coordinator::CoordinatorMessagesDispatcher>(logger_, httpTransportAsync_, workerRegistry_)),
6364
coordinatorConfigProvider_(std::make_shared<opentelemetry::php::coordinator::CoordinatorConfigurationProvider>(logger_, opAmp_)),
64-
coordinatorProcess_(std::make_shared<opentelemetry::php::coordinator::CoordinatorProcess>(logger_, messagesDispatcher_, coordinatorConfigProvider_))
65+
coordinatorProcess_(std::make_shared<opentelemetry::php::coordinator::CoordinatorProcess>(logger_, messagesDispatcher_, coordinatorConfigProvider_, workerRegistry_))
6566
{
66-
forkableRegistry_->registerForkable(httpTransportAsync_);
67-
forkableRegistry_->registerForkable(opAmp_);
67+
// forkableRegistry_->registerForkable(httpTransportAsync_);
68+
// forkableRegistry_->registerForkable(opAmp_);
6869

6970
configManager_->attachLogger(logger_);
7071

prod/native/libcommon/code/AgentGlobals.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "PhpSapi.h"
44
#include "Logger.h"
5+
#include "coordinator/WorkerRegistry.h"
56
#include <functional>
67
#include <memory>
78

@@ -39,6 +40,7 @@ namespace coordinator {
3940
class CoordinatorMessagesDispatcher;
4041
class CoordinatorProcess;
4142
class CoordinatorConfigurationProvider;
43+
class WorkerRegistry;
4244
} // namespace coordinator
4345
namespace transport {
4446
class CurlSender;
@@ -83,6 +85,7 @@ class AgentGlobals {
8385
std::shared_ptr<opentelemetry::php::transport::OpAmp> opAmp_;
8486
std::shared_ptr<SharedMemoryState> sharedMemory_;
8587
std::shared_ptr<RequestScope> requestScope_;
88+
std::shared_ptr<coordinator::WorkerRegistry> workerRegistry_;
8689
std::shared_ptr<coordinator::CoordinatorMessagesDispatcher> messagesDispatcher_;
8790
std::shared_ptr<coordinator::CoordinatorConfigurationProvider> coordinatorConfigProvider_;
8891
std::shared_ptr<coordinator::CoordinatorProcess> coordinatorProcess_;

prod/native/libcommon/code/coordinator/CoordinatorConfigurationProvider.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include "transport/OpAmp.h"
55

66
#include <memory>
7-
#include <span>
87
#include <string>
98
#include <utility>
109

prod/native/libcommon/code/coordinator/CoordinatorMessagesDispatcher.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "CoordinatorMessagesDispatcher.h"
2+
#include "LoggerInterface.h"
23
#include "coordinator/proto/CoordinatorCommands.pb.h"
34

45
namespace opentelemetry::php::coordinator {
@@ -60,6 +61,17 @@ void CoordinatorMessagesDispatcher::processRecievedMessage(const std::span<const
6061
httpTransport_->enqueue(p.endpoint_hash(), buf);
6162
break;
6263
}
64+
case coordinator::CoordinatorCommand::WORKER_STARTED: {
65+
ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorMessagesDispatcher: Worker started");
66+
workerRegistry_->registerWorker(command.worker_started().process_id(), command.worker_started().parent_process_id());
67+
break;
68+
}
69+
case coordinator::CoordinatorCommand::WORKER_IS_GOING_TO_SHUTDOWN: {
70+
ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorMessagesDispatcher: Worker pid: {} ppid: {} is going to shutdown", command.worker_is_going_to_shutdown().process_id(), command.worker_is_going_to_shutdown().parent_process_id());
71+
workerRegistry_->unregisterWorker(command.worker_is_going_to_shutdown().process_id());
72+
break;
73+
}
74+
6375
default:
6476
ELOG_WARNING(logger_, COORDINATOR, "CoordinatorMessagesDispatcher: Unknown CoordinatorCommand type={}", static_cast<int>(command.type()));
6577
break;

prod/native/libcommon/code/coordinator/CoordinatorMessagesDispatcher.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include "LoggerInterface.h"
66
#include "transport/HttpTransportAsyncInterface.h"
7+
#include "WorkerRegistry.h"
78

89
#include <memory>
910
#include <span>
@@ -14,7 +15,7 @@ namespace opentelemetry::php::coordinator {
1415

1516
class CoordinatorMessagesDispatcher {
1617
public:
17-
CoordinatorMessagesDispatcher(std::shared_ptr<LoggerInterface> logger, std::shared_ptr<transport::HttpTransportAsyncInterface> httpTransport) : logger_(std::move(logger)), httpTransport_(std::move(httpTransport)) {
18+
CoordinatorMessagesDispatcher(std::shared_ptr<LoggerInterface> logger, std::shared_ptr<transport::HttpTransportAsyncInterface> httpTransport, std::shared_ptr<WorkerRegistry> workerRegistry) : logger_(std::move(logger)), httpTransport_(std::move(httpTransport)), workerRegistry_(std::move(workerRegistry)) {
1819
}
1920

2021
~CoordinatorMessagesDispatcher() = default;
@@ -24,6 +25,7 @@ class CoordinatorMessagesDispatcher {
2425
private:
2526
std::shared_ptr<LoggerInterface> logger_;
2627
std::shared_ptr<transport::HttpTransportAsyncInterface> httpTransport_;
28+
std::shared_ptr<WorkerRegistry> workerRegistry_;
2729
};
2830

2931
}

prod/native/libcommon/code/coordinator/CoordinatorProcess.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ void CoordinatorProcess::coordinatorLoop() {
1313
unsigned int priority = 0;
1414

1515
try {
16-
if (commandQueue_->timed_receive(buffer, maxMqPayloadSize, receivedSize, priority, std::chrono::steady_clock::now() + std::chrono::milliseconds(10))) {
16+
if (commandQueue_->timed_receive(buffer, maxMqPayloadSize, receivedSize, priority, std::chrono::steady_clock::now() + std::chrono::milliseconds(100))) {
1717
processor_.processReceivedChunk(reinterpret_cast<const CoordinatorPayload *>(buffer), receivedSize);
1818
}
1919
} catch (std::exception &ex) {
@@ -28,10 +28,16 @@ void CoordinatorProcess::setupPeriodicTasks() {
2828
periodicTaskExecutor_ = std::make_unique<PeriodicTaskExecutor>(std::vector<PeriodicTaskExecutor::task_t>{[this](PeriodicTaskExecutor::time_point_t now) {
2929
// Check parent process is alive
3030
if (getppid() != parentProcessId_) {
31-
ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess: parent process has exited, shutting down coordinator process");
32-
working_ = false;
31+
ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess: parent process has exited. Checking if workers are still alive.");
32+
workerRegistry_->verifyWorkersAlive();
33+
if (workerRegistry_->getWorkerCount() > 0) {
34+
ELOG_DEBUG(logger_, COORDINATOR, "CoordinatorProcess: there are still {} alive workers, continuing work", workerRegistry_->getWorkerCount());
35+
} else {
36+
working_ = false;
37+
}
3338
}
3439

40+
3541
static auto lastCleanupTime = std::chrono::steady_clock::now();
3642
if (now - lastCleanupTime >= cleanUpLostMessagesInterval) {
3743
processor_.cleanupAbandonedMessages(now, std::chrono::seconds(10));

prod/native/libcommon/code/coordinator/CoordinatorProcess.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212

1313
#include <atomic>
1414
#include <chrono>
15-
#include <functional>
1615
#include <memory>
1716
#include <string>
1817

1918
#include "CoordinatorConfigurationProvider.h"
19+
#include "coordinator/WorkerRegistrar.h"
20+
#include "coordinator/WorkerRegistry.h"
2021

2122
namespace opentelemetry::php::coordinator {
2223

@@ -29,17 +30,19 @@ constexpr static std::chrono::minutes cleanUpLostMessagesInterval(1);
2930
class CoordinatorProcess : public boost::noncopyable, public ForkableInterface {
3031

3132
public:
32-
CoordinatorProcess(std::shared_ptr<LoggerInterface> logger, std::shared_ptr<CoordinatorMessagesDispatcher> messagesDispatcher, std::shared_ptr<CoordinatorConfigurationProvider> configProvider) : logger_(std::move(logger)), messagesDispatcher_(std::move(messagesDispatcher)), configProvider_(std::move(configProvider)) {
33+
CoordinatorProcess(std::shared_ptr<LoggerInterface> logger, std::shared_ptr<CoordinatorMessagesDispatcher> messagesDispatcher, std::shared_ptr<CoordinatorConfigurationProvider> configProvider, std::shared_ptr<WorkerRegistry> workerRegistry) : logger_(std::move(logger)), messagesDispatcher_(std::move(messagesDispatcher)), configProvider_(std::move(configProvider)), workerRegistry_(std::move(workerRegistry)) {
3334
}
3435
~CoordinatorProcess() {
3536
}
3637

3738
void prefork() final {
3839
periodicTaskExecutor_->prefork();
40+
workerRegistrar_->prefork();
3941
}
4042

4143
void postfork([[maybe_unused]] bool child) final {
4244
periodicTaskExecutor_->postfork(child);
45+
workerRegistrar_->postfork(child);
4346
}
4447

4548
// returns true in scope of forked CoordinatorProcess
@@ -83,6 +86,8 @@ class CoordinatorProcess : public boost::noncopyable, public ForkableInterface {
8386
CoordinatorTelemetrySignalsSender coordinatorSender_{logger_, [this](const std::string &payload) { return processor_.sendPayload(payload); }};
8487
std::shared_ptr<CoordinatorMessagesDispatcher> messagesDispatcher_;
8588
std::shared_ptr<CoordinatorConfigurationProvider> configProvider_;
89+
std::unique_ptr<WorkerRegistrar> workerRegistrar_{std::make_unique<WorkerRegistrar>(logger_, [this](const std::string &payload) { return processor_.sendPayload(payload); })};
90+
std::shared_ptr<WorkerRegistry> workerRegistry_;
8691

8792
int processId_ = 0;
8893
int parentProcessId_ = 0;

prod/native/libcommon/code/coordinator/CoordinatorTelemetrySignalsSender.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ class CoordinatorTelemetrySignalsSender : public transport::HttpTransportAsyncIn
2020

2121
~CoordinatorTelemetrySignalsSender() = default;
2222

23-
void initializeConnection(std::string endpointUrl, std::size_t endpointHash, std::string contentType, enpointHeaders_t const &endpointHeaders, std::chrono::milliseconds timeout, std::size_t maxRetries, std::chrono::milliseconds retryDelay, opentelemetry::php::transport::HttpEndpointSSLOptions sslOptions);
24-
void enqueue(std::size_t endpointHash, std::span<std::byte> payload, responseCallback_t callback = {});
25-
void updateRetryDelay(size_t endpointHash, std::chrono::milliseconds retryDelay) {
23+
void initializeConnection(std::string endpointUrl, std::size_t endpointHash, std::string contentType, enpointHeaders_t const &endpointHeaders, std::chrono::milliseconds timeout, std::size_t maxRetries, std::chrono::milliseconds retryDelay, opentelemetry::php::transport::HttpEndpointSSLOptions sslOptions) override;
24+
void enqueue(std::size_t endpointHash, std::span<std::byte> payload, responseCallback_t callback = {}) override;
25+
void updateRetryDelay(size_t endpointHash, std::chrono::milliseconds retryDelay) override {
2626
}
2727

2828
private:
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#include "WorkerRegistrar.h"
2+
3+
#include "os/OsUtils.h"
4+
#include "coordinator/proto/CoordinatorCommands.pb.h"
5+
6+
namespace opentelemetry::php::coordinator {
7+
8+
void WorkerRegistrar::registerWorker() {
9+
coordinator::WorkerStartedCommand command;
10+
command.set_process_id(osutils::getCurrentProcessId());
11+
command.set_parent_process_id(osutils::getParentProcessId());
12+
13+
coordinator::CoordinatorCommand coordCommand;
14+
coordCommand.set_type(coordinator::CoordinatorCommand::WORKER_STARTED);
15+
*coordCommand.mutable_worker_started() = command;
16+
17+
if (!sendPayload_(coordCommand.SerializeAsString())) {
18+
ELOG_DEBUG(logger_, COORDINATOR, "WorkerRegistrar: failed to send worker registration message to coordinator process");
19+
} else {
20+
ELOG_DEBUG(logger_, COORDINATOR, "WorkerRegistrar: sent worker registration message to coordinator process");
21+
}
22+
}
23+
24+
void WorkerRegistrar::unregisterWorker() {
25+
coordinator::WorkerIsGoingToShutdownCommand command;
26+
command.set_process_id(osutils::getCurrentProcessId());
27+
command.set_parent_process_id(osutils::getParentProcessId());
28+
29+
coordinator::CoordinatorCommand coordCommand;
30+
coordCommand.set_type(coordinator::CoordinatorCommand::WORKER_IS_GOING_TO_SHUTDOWN);
31+
*coordCommand.mutable_worker_is_going_to_shutdown() = command;
32+
33+
if (!sendPayload_(coordCommand.SerializeAsString())) {
34+
ELOG_DEBUG(logger_, COORDINATOR, "WorkerRegistrar: failed to send worker unregister message");
35+
} else {
36+
ELOG_DEBUG(logger_, COORDINATOR, "WorkerRegistrar: sent worker unregistration message to coordinator process");
37+
}
38+
}
39+
40+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#pragma once
2+
3+
4+
#include "LoggerInterface.h"
5+
#include "ForkableInterface.h"
6+
#include <functional>
7+
#include <memory>
8+
9+
namespace opentelemetry::php::coordinator {
10+
11+
class WorkerRegistrar : public ForkableInterface {
12+
public:
13+
using sendPayload_t = std::function<bool(std::string const &payload)>;
14+
15+
WorkerRegistrar(std::shared_ptr<LoggerInterface> logger, sendPayload_t sendPayload) : logger_(std::move(logger)), sendPayload_(std::move(sendPayload)) {
16+
}
17+
18+
~WorkerRegistrar() {
19+
unregisterWorker();
20+
}
21+
22+
void prefork() final {
23+
}
24+
25+
void postfork([[maybe_unused]] bool child) final {
26+
if (child) {
27+
registerWorker();
28+
}
29+
}
30+
31+
private:
32+
void registerWorker();
33+
void unregisterWorker();
34+
35+
std::shared_ptr<LoggerInterface> logger_;
36+
sendPayload_t sendPayload_;
37+
38+
};
39+
40+
}

0 commit comments

Comments
 (0)