Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit 548627b

Browse files
isturdyg-easy
authored andcommitted
Delay creation of exporter threads until a handler is registered, and and do not include the time it takes to export in the export interval. (#20)
1 parent f50981b commit 548627b

4 files changed

Lines changed: 47 additions & 14 deletions

File tree

opencensus/stats/internal/stats_exporter.cc

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <thread> // NOLINT
1818

19+
#include "absl/memory/memory.h"
1920
#include "absl/synchronization/mutex.h"
2021
#include "absl/time/clock.h"
2122
#include "absl/time/time.h"
@@ -41,9 +42,15 @@ class StatsExporterImpl {
4142
views_.erase(std::string(name));
4243
}
4344

45+
// Adds a handler, which cannot be subsequently removed (except by
46+
// ClearHandlersForTesting()). The background thread is started when the
47+
// first handler is registered.
4448
void RegisterHandler(std::unique_ptr<StatsExporter::Handler> handler) {
4549
absl::MutexLock l(&mu_);
4650
handlers_.push_back(std::move(handler));
51+
if (!thread_started_) {
52+
StartExportThread();
53+
}
4754
}
4855

4956
void Export() {
@@ -59,7 +66,7 @@ class StatsExporterImpl {
5966
}
6067

6168
private:
62-
StatsExporterImpl() : t_(&StatsExporterImpl::RunWorkerLoop, this) {}
69+
StatsExporterImpl() {}
6370

6471
void SendToHandlers(const ViewDescriptor& descriptor, const ViewData& data)
6572
SHARED_LOCKS_REQUIRED(mu_) {
@@ -68,10 +75,20 @@ class StatsExporterImpl {
6875
}
6976
}
7077

78+
void StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(mu_) {
79+
t_ = std::thread(&StatsExporterImpl::RunWorkerLoop, this);
80+
thread_started_ = true;
81+
}
82+
7183
// Loops forever, calling Export() every export_interval_.
7284
void RunWorkerLoop() {
85+
absl::Time next_export_time = absl::Now() + export_interval_;
7386
while (true) {
74-
absl::SleepFor(export_interval_);
87+
// SleepFor() returns immediately when given a negative duration.
88+
absl::SleepFor(next_export_time - absl::Now());
89+
// In case the last export took longer than the export interval, we
90+
// calculate the next time from now.
91+
next_export_time = absl::Now() + export_interval_;
7592
Export();
7693
}
7794
}
@@ -83,7 +100,9 @@ class StatsExporterImpl {
83100
std::vector<std::unique_ptr<StatsExporter::Handler>> handlers_
84101
GUARDED_BY(mu_);
85102
std::unordered_map<std::string, std::unique_ptr<View>> views_ GUARDED_BY(mu_);
86-
std::thread t_;
103+
104+
bool thread_started_ GUARDED_BY(mu_) = false;
105+
std::thread t_ GUARDED_BY(mu_);
87106
};
88107

89108
void StatsExporter::AddView(const ViewDescriptor& view) {

opencensus/stats/internal/stats_exporter_test.cc

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,19 @@ class MockExporter : public StatsExporter::Handler {
5858
std::vector<ViewDescriptor> actual_descriptors_;
5959
};
6060

61-
constexpr char kMeasureId[] = "first_measure_id";
61+
constexpr char kMeasureId[] = "test_measure_id";
62+
63+
MeasureDouble TestMeasure() {
64+
static MeasureDouble measure =
65+
MeasureRegistry::RegisterDouble(kMeasureId, "ops", "");
66+
return measure;
67+
}
6268

6369
class StatsExporterTest : public ::testing::Test {
6470
protected:
6571
void SetUp() {
72+
// Access the measure to ensure it has been registered.
73+
TestMeasure();
6674
descriptor1_.set_name("id1");
6775
descriptor1_.set_measure(kMeasureId);
6876
descriptor1_.set_aggregation(Aggregation::Count());
@@ -87,9 +95,6 @@ class StatsExporterTest : public ::testing::Test {
8795

8896
static void Export() { StatsExporter::ExportForTesting(); }
8997

90-
MeasureDouble first_measure_ = MeasureRegistry::RegisterDouble(
91-
kMeasureId, "ops", "Usage of resource 1.");
92-
9398
ViewDescriptor descriptor1_;
9499
ViewDescriptor descriptor1_edited_;
95100
ViewDescriptor descriptor2_;
@@ -128,7 +133,7 @@ TEST_F(StatsExporterTest, MultipleExporters) {
128133
TEST_F(StatsExporterTest, TimedExport) {
129134
MockExporter::Register({descriptor1_});
130135
StatsExporter::AddView(descriptor1_);
131-
absl::SleepFor(absl::Seconds(15));
136+
absl::SleepFor(absl::Seconds(11));
132137
}
133138

134139
} // namespace stats

opencensus/trace/internal/span_exporter_impl.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ SpanExporterImpl* SpanExporterImpl::Get() {
3535
// Create detached worker thread
3636
SpanExporterImpl::SpanExporterImpl(uint32_t buffer_size,
3737
absl::Duration interval)
38-
: buffer_size_(buffer_size),
39-
interval_(interval),
40-
size_(0),
41-
t_(&SpanExporterImpl::RunWorkerLoop, this) {}
38+
: buffer_size_(buffer_size), interval_(interval), size_(0) {}
4239

4340
void SpanExporterImpl::RegisterHandler(
4441
std::unique_ptr<SpanExporter::Handler> handler) {
4542
absl::MutexLock l(&handler_mu_);
4643
handlers_.emplace_back(std::move(handler));
44+
if (!thread_started_) {
45+
StartExportThread();
46+
}
4747
}
4848

4949
void SpanExporterImpl::AddSpan(
@@ -53,23 +53,30 @@ void SpanExporterImpl::AddSpan(
5353
size_.fetch_add(1, std::memory_order_acq_rel);
5454
}
5555

56+
void SpanExporterImpl::StartExportThread() {
57+
t_ = std::thread(&SpanExporterImpl::RunWorkerLoop, this);
58+
thread_started_ = true;
59+
}
60+
5661
void SpanExporterImpl::RunWorkerLoop() {
5762
std::vector<opencensus::trace::exporter::SpanData> span_data_;
5863
std::vector<std::shared_ptr<opencensus::trace::SpanImpl>> spans_copy_;
5964
// Thread loops forever.
6065
// TODO: Add in shutdown mechanism.
66+
absl::Time next_forced_export_time = absl::Now() + interval_;
6167
while (true) {
6268
{
6369
absl::MutexLock l(&span_mu_);
6470
// Wait until batch is full or interval time has been exceeded.
65-
span_mu_.AwaitWithTimeout(
71+
span_mu_.AwaitWithDeadline(
6672
absl::Condition(
6773
+[](SpanExporterImpl* ptr) {
6874
return (ptr->size_.load(std::memory_order_acquire) >=
6975
ptr->buffer_size_);
7076
},
7177
this),
72-
interval_);
78+
next_forced_export_time);
79+
next_forced_export_time = absl::Now() + interval_;
7380
if (spans_.empty()) {
7481
continue;
7582
}

opencensus/trace/internal/span_exporter_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class SpanExporterImpl {
6565
SpanExporterImpl& operator=(SpanExporterImpl&&) = delete;
6666
friend class Span;
6767

68+
void StartExportThread() EXCLUSIVE_LOCKS_REQUIRED(handler_mu_);
6869
void RunWorkerLoop();
6970
// Calls all registered handlers and exports the spans contained in span_data.
7071
void Export(const std::vector<SpanData>& span_data);
@@ -83,6 +84,7 @@ class SpanExporterImpl {
8384
GUARDED_BY(span_mu_);
8485
std::vector<std::unique_ptr<SpanExporter::Handler>> handlers_
8586
GUARDED_BY(handler_mu_);
87+
bool thread_started_ GUARDED_BY(handler_mu_) = false;
8688
std::thread t_;
8789
};
8890

0 commit comments

Comments
 (0)