Skip to content

Commit dafc44c

Browse files
authored
Improved dependency ownership in coordinator and globals (#21) (#23)
1 parent 3678b46 commit dafc44c

32 files changed

+421
-463
lines changed

prod/native/extension/code/InternalFunctionInstrumentation.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
#pragma once
22

33
#include "AutoZval.h"
4-
#include "PhpBridgeInterface.h"
54
#include "LoggerInterface.h"
6-
#include "InternalFunctionInstrumentationStorage.h"
75
#include "InstrumentedFunctionHooksStorage.h"
86
#include <string_view>
97
#include <Zend/zend_observer.h>

prod/native/extension/code/ModuleEntry.cpp

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,22 @@
1111
#include "ModuleInit.h"
1212

1313
#include "AutoZval.h"
14-
#include "os/OsUtils.h"
1514
#include "config/OptionValueProvider.h"
16-
17-
#include "CallOnScopeExit.h"
18-
#include "ConfigurationManager.h"
15+
#include "coordinator/CoordinatorSharedDataQueue.h"
16+
#include "coordinator/CoordinatorProcess.h"
17+
#include "CommonUtils.h"
18+
#include "os/OsUtils.h"
1919
#include "InferredSpans.h"
20-
#include "InstrumentedFunctionHooksStorage.h"
2120
#include "InternalFunctionInstrumentation.h"
2221
#include "Logger.h"
2322
#include "ModuleInfo.h"
2423
#include "ModuleFunctions.h"
25-
#include "PeriodicTaskExecutor.h"
2624
#include "PhpBridge.h"
2725
#include "PhpBridgeInterface.h"
2826
#include "RequestScope.h"
29-
#include "SharedMemoryState.h"
30-
#include "transport/OpAmp.h"
27+
#include "ResourceDetector.h"
28+
#include "SigSegvHandler.h"
29+
#include "VendorCustomizationsInterface.h"
3130

3231
ZEND_DECLARE_MODULE_GLOBALS( opentelemetry_distro )
3332

@@ -56,22 +55,79 @@ PHP_MINFO_FUNCTION(opentelemetry_distro) {
5655
opentelemetry::php::printPhpInfo(zend_module);
5756
}
5857

58+
namespace opentelemetry::php {
59+
60+
void forkCoordinatorProcess(std::shared_ptr<opentelemetry::php::LoggerInterface> logger, std::function<void(opentelemetry::php::ConfigurationSnapshot const &)> loggerConfigUpdateFunc, std::shared_ptr<coordinator::CoordinatorSharedDataQueue> shareDataQueue, std::shared_ptr<opentelemetry::php::config::OptionValueProvider> optionValueProvider, std::shared_ptr<coordinator::CoordinatorConfigurationProvider> coordinatorConfigProvider, std::shared_ptr<PhpBridgeInterface> phpBridge) {
61+
auto parentProcessId = getpid();
62+
auto processId = fork();
63+
if (processId < 0) {
64+
if (logger) {
65+
ELOG_DEBUG(logger, COORDINATOR, "CoordinatorProcess: fork() failed: {} ({})", strerror(errno), errno);
66+
}
67+
} else if (processId == 0) {
68+
ELOG_DEBUG(logger, COORDINATOR, "CoordinatorProcess starting collector process");
69+
registerSigSegvHandler(logger.get());
70+
71+
auto resourceDetector = std::make_shared<opentelemetry::php::ResourceDetector>(std::move(phpBridge));
72+
73+
opentelemetry::php::coordinator::CoordinatorProcess(parentProcessId, processId, logger, loggerConfigUpdateFunc, ::getVendorCustomizations ? ::getVendorCustomizations() : nullptr, optionValueProvider, std::move(shareDataQueue), coordinatorConfigProvider, std::move(resourceDetector)).start();
74+
ELOG_DEBUG(logger, COORDINATOR, "CoordinatorProcess: collector process is going to finish");
75+
std::exit(0);
76+
} else {
77+
if (logger) {
78+
ELOG_DEBUG(logger, COORDINATOR, "CoordinatorProcess parent process continues initialization");
79+
}
80+
}
81+
}
82+
83+
} // namespace opentelemetry::php
5984

6085
static PHP_GINIT_FUNCTION(opentelemetry_distro) {
6186
//TODO for ZTS logger must be initialized in MINIT! (share fd between threads) - different lifecycle
6287

63-
//TODO store in globals and allow watch for config change (change of level)
6488
auto logSinkStdErr = std::make_shared<opentelemetry::php::LoggerSinkStdErr>();
6589
auto logSinkSysLog = std::make_shared<opentelemetry::php::LoggerSinkSysLog>();
6690
auto logSinkFile = std::make_shared<opentelemetry::php::LoggerSinkFile>();
6791

6892
auto logger = std::make_shared<opentelemetry::php::Logger>(std::vector<std::shared_ptr<opentelemetry::php::LoggerSinkInterface>>{logSinkStdErr, logSinkSysLog, logSinkFile});
6993

94+
auto loggerConfigUpdateFunc = [logger = logger, stderrsink = logSinkStdErr, syslogsink = logSinkSysLog, filesink = logSinkFile](opentelemetry::php::ConfigurationSnapshot const &cfg) {
95+
stderrsink->setLevel(cfg.log_level_stderr);
96+
syslogsink->setLevel(cfg.log_level_syslog);
97+
if (filesink) {
98+
if (cfg.log_file.empty()) {
99+
filesink->setLevel(LogLevel::logLevel_off);
100+
} else {
101+
filesink->setLevel(cfg.log_level_file);
102+
filesink->reopen(opentelemetry::utils::getParameterizedString(cfg.log_file));
103+
}
104+
}
105+
106+
logger->setLogFeatures(opentelemetry::utils::parseLogFeatures(logger, cfg.log_features));
107+
};
108+
70109
ELOGF_DEBUG(logger, MODULE, "%s: GINIT called; parent PID: %d", __FUNCTION__, static_cast<int>(opentelemetry::osutils::getParentProcessId()));
71110
opentelemetry_distro_globals->globals = nullptr;
72111

112+
auto shareDataQueue = std::make_shared<opentelemetry::php::coordinator::CoordinatorSharedDataQueue>(logger);
113+
auto coordinatorConfigProvider = std::make_shared<opentelemetry::php::coordinator::CoordinatorConfigurationProvider>(logger);
114+
73115
auto phpBridge = std::make_shared<opentelemetry::php::PhpBridge>(logger);
74116

117+
auto optionValueProvider = std::make_shared<opentelemetry::php::config::OptionValueProvider>([](std::string_view iniName) -> std::optional<std::string> {
118+
auto val = cfg_get_entry(iniName.data(), iniName.length());
119+
120+
opentelemetry::php::AutoZval autoZval(val);
121+
auto optStringView = autoZval.getOptStringView();
122+
if (!optStringView.has_value()) {
123+
return std::nullopt;
124+
}
125+
126+
return std::string(*optStringView);
127+
});
128+
129+
forkCoordinatorProcess(logger, loggerConfigUpdateFunc, shareDataQueue, optionValueProvider, coordinatorConfigProvider, phpBridge);
130+
75131
auto hooksStorage = std::make_shared<opentelemetry::php::InstrumentedFunctionHooksStorage_t>();
76132

77133
auto inferredSpans = std::make_shared<opentelemetry::php::InferredSpans>([interruptFlag = reinterpret_cast<void *>(&EG(vm_interrupt))]() {
@@ -85,19 +141,7 @@ static PHP_GINIT_FUNCTION(opentelemetry_distro) {
85141
});
86142

87143
try {
88-
auto optionValueProvider = std::make_shared<opentelemetry::php::config::OptionValueProvider>([](std::string_view iniName) -> std::optional<std::string> {
89-
auto val = cfg_get_entry(iniName.data(), iniName.length());
90-
91-
opentelemetry::php::AutoZval autoZval(val);
92-
auto optStringView = autoZval.getOptStringView();
93-
if (!optStringView.has_value()) {
94-
return std::nullopt;
95-
}
96-
97-
return std::string(*optStringView);
98-
});
99-
100-
opentelemetry_distro_globals->globals = new opentelemetry::php::AgentGlobals(logger, std::move(logSinkStdErr), std::move(logSinkSysLog), std::move(logSinkFile), std::move(phpBridge), std::move(hooksStorage), std::move(inferredSpans), std::move(optionValueProvider));
144+
opentelemetry_distro_globals->globals = new opentelemetry::php::AgentGlobals(logger, loggerConfigUpdateFunc, std::move(phpBridge), std::move(hooksStorage), std::move(inferredSpans), std::move(shareDataQueue), std::move(coordinatorConfigProvider), std::move(optionValueProvider));
101145
} catch (std::exception const &e) {
102146
ELOGF_CRITICAL(logger, MODULE, "Unable to allocate AgentGlobals. '%s'", e.what());
103147
}

prod/native/extension/code/ModuleFunctions.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11

22
#include "ModuleFunctions.h"
3+
#include "ConfigurationStorage.h"
34
#include "LoggerInterface.h"
45
#include "LogFeature.h"
56
#include "ModuleGlobals.h"
@@ -220,7 +221,7 @@ PHP_FUNCTION(initialize) {
220221
ZEND_HASH_FOREACH_END();
221222

222223
opentelemetry::php::transport::HttpEndpointSSLOptions sslOptions = getSSLOptionsForSignalsEndpoint(std::string_view(ZSTR_VAL(endpoint), ZSTR_LEN(endpoint)));
223-
OTEL_GL(coordinatorProcess_)->getCoordinatorSender().initializeConnection(std::string(ZSTR_VAL(endpoint), ZSTR_LEN(endpoint)), ZSTR_HASH(endpoint), std::string(ZSTR_VAL(contentType), ZSTR_LEN(contentType)), endpointHeaders, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration<double>(timeout)), static_cast<std::size_t>(maxRetries), std::chrono::milliseconds(retryDelay), sslOptions);
224+
OTEL_GL(httpTransportAsync_)->initializeConnection(std::string(ZSTR_VAL(endpoint), ZSTR_LEN(endpoint)), ZSTR_HASH(endpoint), std::string(ZSTR_VAL(contentType), ZSTR_LEN(contentType)), endpointHeaders, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::duration<double>(timeout)), static_cast<std::size_t>(maxRetries), std::chrono::milliseconds(retryDelay), sslOptions);
224225
}
225226

226227
ZEND_BEGIN_ARG_INFO_EX(enqueue_arginfo, 0, 0, 2)
@@ -236,7 +237,7 @@ PHP_FUNCTION(enqueue) {
236237
Z_PARAM_STR(payload)
237238
ZEND_PARSE_PARAMETERS_END();
238239

239-
OTEL_GL(coordinatorProcess_)->getCoordinatorSender().enqueue(ZSTR_HASH(endpoint), std::span<std::byte>(reinterpret_cast<std::byte *>(ZSTR_VAL(payload)), ZSTR_LEN(payload)));
240+
OTEL_GL(httpTransportAsync_)->enqueue(ZSTR_HASH(endpoint), std::span<std::byte>(reinterpret_cast<std::byte *>(ZSTR_VAL(payload)), ZSTR_LEN(payload)));
240241
}
241242

242243
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(set_object_property_value_arginfo, 0, 3, _IS_BOOL, 0)

prod/native/extension/code/ModuleInit.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@
55
#include "otel_distro_version.h"
66
#include "CommonUtils.h"
77
#include "ConfigurationManager.h"
8+
#include "ConfigurationStorage.h"
89
#include "ConfigurationSnapshot.h"
10+
#include "coordinator/CoordinatorConfigurationProvider.h"
911
#include "ForkHandler.h"
1012
#include "Hooking.h"
1113
#include "InternalFunctionInstrumentation.h"
1214
#include "ModuleIniEntries.h"
1315
#include "ModuleGlobals.h"
1416
#include "SigSegvHandler.h"
1517
#include "os/OsUtils.h"
16-
#include "coordinator/CoordinatorProcess.h"
1718
#include "VendorCustomizationsInterface.h"
1819

1920
#include <curl/curl.h>
@@ -73,11 +74,6 @@ void moduleInit(int moduleType, int moduleNumber) {
7374
return;
7475
}
7576

76-
if (globals->coordinatorProcess_->start()) {
77-
delete globals;
78-
std::exit(0);
79-
}
80-
8177
// add config update watcher in worker process
8278
globals->coordinatorConfigProvider_->addConfigUpdateWatcher([globals](opentelemetry::php::coordinator::CoordinatorConfigurationProvider::configFiles_t const &cfgFiles) {
8379
ELOG_DEBUG(globals->logger_, COORDINATOR, "Received config update with {} files. Updating dynamic config and global config storage", cfgFiles.size());

prod/native/extension/code/SigSegvHandler.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
#include <signal.h>
99

10+
namespace opentelemetry::php {
11+
1012
namespace signalHandlerData {
1113
typedef void (*OsSignalHandler)(int);
1214
static OsSignalHandler oldSigSegvHandler = nullptr;
@@ -48,4 +50,6 @@ void unregisterSigSegvHandler() {
4850
signal(SIGSEGV, signalHandlerData::oldSigSegvHandler);
4951
signalHandlerData::oldSigSegvHandler = nullptr;
5052
}
51-
}
53+
}
54+
55+
} // namespace opentelemetry::php

prod/native/extension/code/SigSegvHandler.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,7 @@
22

33
#include "LoggerInterface.h"
44

5+
namespace opentelemetry::php {
56
void registerSigSegvHandler(opentelemetry::php::LoggerInterface *logger);
6-
void unregisterSigSegvHandler();
7+
void unregisterSigSegvHandler();
8+
} // namespace opentelemetry::php

prod/native/libcommon/code/AgentGlobals.cpp

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,77 +19,59 @@
1919
#include "coordinator/CoordinatorProcess.h"
2020
#include "coordinator/CoordinatorMessagesDispatcher.h"
2121
#include "coordinator/CoordinatorConfigurationProvider.h"
22+
#include "coordinator/CoordinatorSharedDataQueue.h"
23+
#include "coordinator/CoordinatorTelemetrySignalsSender.h"
24+
#include "coordinator/WorkerRegistrar.h"
2225
#include "transport/HttpTransportAsync.h"
2326
#include "transport/OpAmp.h"
2427
#include "DependencyAutoLoaderGuard.h"
2528
#include "VendorCustomizationsInterface.h"
29+
#include <memory>
2630
#include <signal.h>
2731

2832
namespace opentelemetry::php {
2933
// clang-format off
3034

3135
AgentGlobals::AgentGlobals(std::shared_ptr<LoggerInterface> logger,
32-
std::shared_ptr<LoggerSinkInterface> logSinkStdErr,
33-
std::shared_ptr<LoggerSinkInterface> logSinkSysLog,
34-
std::shared_ptr<LoggerSinkFile> logSinkFile,
36+
std::function<void(opentelemetry::php::ConfigurationSnapshot const &)> loggerConfigUpdateFunc,
3537
std::shared_ptr<PhpBridgeInterface> bridge,
3638
std::shared_ptr<InstrumentedFunctionHooksStorageInterface> hooksStorage,
3739
std::shared_ptr<InferredSpans> inferredSpans,
40+
std::shared_ptr<coordinator::CoordinatorSharedDataQueue> sharedDataQueue,
41+
std::shared_ptr<coordinator::CoordinatorConfigurationProvider> sharedCoordinatorConfigProvider,
3842
std::shared_ptr<config::OptionValueProviderInterface> defaultOptionValueProvider) :
3943
vendorCustomizations_(::getVendorCustomizations ? ::getVendorCustomizations() : nullptr),
4044
forkableRegistry_(std::make_shared<ForkableRegistry>()),
41-
configManager_(std::make_shared<ConfigurationManager>(
45+
configManager_(std::make_shared<ConfigurationManager>(logger,
4246
std::make_shared<config::PrioritizedOptionValueProviderChain>(std::initializer_list<std::pair<int, std::shared_ptr<config::OptionValueProviderInterface>>>{
4347
{0, defaultOptionValueProvider},
4448
vendorCustomizations_ ? vendorCustomizations_->getOptionValueProvider() : std::pair<int, std::shared_ptr<opentelemetry::php::config::OptionValueProviderInterface>>{0, nullptr} // create dummy pair if vendor customizations or its option provider is not available, to avoid checks in PrioritizedOptionValueProviderChain
4549
}))),
4650
config_(std::make_shared<opentelemetry::php::ConfigurationStorage>([this](ConfigurationSnapshot &cfg) { return configManager_->updateIfChanged(cfg); })),
4751
logger_(std::move(logger)),
48-
logSinkStdErr_(std::move(logSinkStdErr)),
49-
logSinkSysLog_(std::move(logSinkSysLog)),
50-
logSinkFile_(std::move(logSinkFile)),
5152
bridge_(std::move(bridge)),
53+
sharedMemory_(std::make_shared<opentelemetry::php::SharedMemoryState>()),
54+
coordinatorConfigProvider_(std::move(sharedCoordinatorConfigProvider)),
55+
processor_(std::make_shared<opentelemetry::php::coordinator::ChunkedMessageProcessor>(logger_, sharedDataQueue, [](const std::span<const std::byte> data) { })),
56+
httpTransportAsync_(std::make_shared<opentelemetry::php::coordinator::CoordinatorTelemetrySignalsSender>(logger_, [this](std::string const &payload) { return processor_->sendPayload(payload); })),
5257
dependencyAutoLoaderGuard_(std::make_shared<DependencyAutoLoaderGuard>(bridge_, logger_)),
5358
hooksStorage_(std::move(hooksStorage)),
5459
sapi_(std::make_shared<opentelemetry::php::PhpSapi>(bridge_->getPhpSapiName())),
5560
inferredSpans_(std::move(inferredSpans)),
5661
periodicTaskExecutor_(),
57-
httpTransportAsync_(std::make_shared<opentelemetry::php::transport::HttpTransportAsync<>>(logger_, config_)),
58-
resourceDetector_(std::make_shared<opentelemetry::php::ResourceDetector>(bridge_)),
59-
opAmp_(std::make_shared<opentelemetry::php::transport::OpAmp>(logger_, config_, httpTransportAsync_, resourceDetector_)),
60-
sharedMemory_(std::make_shared<opentelemetry::php::SharedMemoryState>()),
6162
requestScope_(std::make_shared<opentelemetry::php::RequestScope>(logger_, bridge_, sapi_, sharedMemory_, dependencyAutoLoaderGuard_, inferredSpans_, config_, [hs = hooksStorage_]() { hs->clear(); }, [this]() { return getPeriodicTaskExecutor();}, [this]() { return coordinatorConfigProvider_->triggerUpdateIfChanged(); })),
62-
workerRegistry_(std::make_shared<opentelemetry::php::coordinator::WorkerRegistry>(logger_)),
63-
messagesDispatcher_(std::make_shared<opentelemetry::php::coordinator::CoordinatorMessagesDispatcher>(logger_, httpTransportAsync_, workerRegistry_)),
64-
coordinatorConfigProvider_(std::make_shared<opentelemetry::php::coordinator::CoordinatorConfigurationProvider>(logger_, opAmp_)),
65-
coordinatorProcess_(std::make_shared<opentelemetry::php::coordinator::CoordinatorProcess>(logger_, messagesDispatcher_, coordinatorConfigProvider_, workerRegistry_))
63+
workerRegistrar_(std::make_shared<opentelemetry::php::coordinator::WorkerRegistrar>(logger_, [this](const std::string &payload) { return processor_->sendPayload(payload); }))
6664
{
67-
// forkableRegistry_->registerForkable(httpTransportAsync_);
68-
// forkableRegistry_->registerForkable(opAmp_);
65+
forkableRegistry_->registerForkable(workerRegistrar_);
6966

70-
configManager_->attachLogger(logger_);
71-
72-
config_->addConfigUpdateWatcher([logger = logger_, stderrsink = logSinkStdErr_, syslogsink = logSinkSysLog_, filesink = logSinkFile_](ConfigurationSnapshot const &cfg) {
73-
stderrsink->setLevel(cfg.log_level_stderr);
74-
syslogsink->setLevel(cfg.log_level_syslog);
75-
if (filesink) {
76-
if (cfg.log_file.empty()) {
77-
filesink->setLevel(LogLevel::logLevel_off);
78-
} else {
79-
filesink->setLevel(cfg.log_level_file);
80-
filesink->reopen(utils::getParameterizedString(cfg.log_file));
81-
}
82-
}
83-
84-
logger->setLogFeatures(utils::parseLogFeatures(logger, cfg.log_features));
85-
});
86-
}
67+
config_->addConfigUpdateWatcher(loggerConfigUpdateFunc);
68+
}
8769

8870

8971
AgentGlobals::~AgentGlobals() {
9072
ELOG_DEBUG(logger_, MODULE, "AgentGlobals shutdown");
9173
config_->removeAllConfigUpdateWatchers();
92-
opAmp_->removeAllConfigUpdateWatchers();
74+
forkableRegistry_->clear();
9375
}
9476

9577
std::shared_ptr<PeriodicTaskExecutor> AgentGlobals::getPeriodicTaskExecutor() {

0 commit comments

Comments
 (0)