Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit 4e6891f

Browse files
author
Ian Sturdy
authored
Add the DeltaProducer frontend. (#111)
1 parent bb60b50 commit 4e6891f

6 files changed

Lines changed: 292 additions & 0 deletions

File tree

opencensus/stats/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ cc_library(
5757
"internal/aggregation.cc",
5858
"internal/aggregation_window.cc",
5959
"internal/bucket_boundaries.cc",
60+
"internal/delta_producer.cc",
6061
"internal/distribution.cc",
6162
"internal/measure.cc",
6263
"internal/measure_data.cc",
@@ -77,6 +78,7 @@ cc_library(
7778
"bucket_boundaries.h",
7879
"distribution.h",
7980
"internal/aggregation_window.h",
81+
"internal/delta_producer.h",
8082
"internal/measure_data.h",
8183
"internal/measure_registry_impl.h",
8284
"internal/set_aggregation_window.h",
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright 2018, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "opencensus/stats/internal/delta_producer.h"
16+
17+
#include <algorithm>
18+
#include <cstdint>
19+
#include <memory>
20+
#include <vector>
21+
22+
#include "absl/synchronization/mutex.h"
23+
#include "absl/time/clock.h"
24+
#include "absl/time/time.h"
25+
#include "opencensus/stats/bucket_boundaries.h"
26+
#include "opencensus/stats/internal/measure_data.h"
27+
#include "opencensus/stats/internal/measure_registry_impl.h"
28+
#include "opencensus/stats/internal/stats_manager.h"
29+
30+
namespace opencensus {
31+
namespace stats {
32+
33+
void Delta::Record(std::initializer_list<Measurement> measurements,
34+
TagSet tags) {
35+
auto it = delta_.find(tags);
36+
if (it == delta_.end()) {
37+
it = delta_.emplace_hint(it, std::piecewise_construct,
38+
std::make_tuple(std::move(tags)),
39+
std::make_tuple(std::vector<MeasureData>()));
40+
it->second.reserve(registered_boundaries_.size());
41+
for (const auto& boundaries_for_measure : registered_boundaries_) {
42+
it->second.emplace_back(boundaries_for_measure);
43+
}
44+
}
45+
for (const auto& measurement : measurements) {
46+
const uint64_t index = MeasureRegistryImpl::IdToIndex(measurement.id_);
47+
ABSL_ASSERT(index < registered_boundaries_.size());
48+
switch (MeasureRegistryImpl::IdToType(measurement.id_)) {
49+
case MeasureDescriptor::Type::kDouble:
50+
it->second[index].Add(measurement.value_double_);
51+
break;
52+
case MeasureDescriptor::Type::kInt64:
53+
it->second[index].Add(measurement.value_int_);
54+
break;
55+
}
56+
}
57+
}
58+
59+
void Delta::clear() {
60+
registered_boundaries_.clear();
61+
delta_.clear();
62+
}
63+
64+
void Delta::SwapAndReset(
65+
std::vector<std::vector<BucketBoundaries>>& registered_boundaries,
66+
Delta* other) {
67+
registered_boundaries_.swap(other->registered_boundaries_);
68+
delta_.swap(other->delta_);
69+
delta_.clear();
70+
registered_boundaries_ = registered_boundaries;
71+
}
72+
73+
DeltaProducer* DeltaProducer::Get() {
74+
static DeltaProducer* global_delta_producer = new DeltaProducer;
75+
return global_delta_producer;
76+
}
77+
78+
void DeltaProducer::AddMeasure() {
79+
delta_mu_.Lock();
80+
absl::MutexLock harvester_lock(&harvester_mu_);
81+
registered_boundaries_.push_back({});
82+
SwapDeltas();
83+
delta_mu_.Unlock();
84+
ConsumeLastDelta();
85+
}
86+
87+
void DeltaProducer::AddBoundaries(uint64_t index,
88+
const BucketBoundaries& boundaries) {
89+
delta_mu_.Lock();
90+
auto& measure_boundaries = registered_boundaries_[index];
91+
if (std::find(measure_boundaries.begin(), measure_boundaries.end(),
92+
boundaries) == measure_boundaries.end()) {
93+
absl::MutexLock harvester_lock(&harvester_mu_);
94+
measure_boundaries.push_back(boundaries);
95+
SwapDeltas();
96+
delta_mu_.Unlock();
97+
ConsumeLastDelta();
98+
} else {
99+
delta_mu_.Unlock();
100+
}
101+
}
102+
103+
void DeltaProducer::Record(std::initializer_list<Measurement> measurements,
104+
TagSet tags) {
105+
absl::MutexLock l(&delta_mu_);
106+
active_delta_.Record(measurements, std::move(tags));
107+
}
108+
109+
void DeltaProducer::Flush() {
110+
delta_mu_.Lock();
111+
absl::MutexLock harvester_lock(&harvester_mu_);
112+
SwapDeltas();
113+
delta_mu_.Unlock();
114+
ConsumeLastDelta();
115+
}
116+
117+
DeltaProducer::DeltaProducer()
118+
: harvester_thread_(&DeltaProducer::RunHarvesterLoop, this) {}
119+
120+
void DeltaProducer::SwapDeltas() {
121+
ABSL_ASSERT(last_delta_.delta().empty() && "Last delta was not consumed.");
122+
active_delta_.SwapAndReset(registered_boundaries_, &last_delta_);
123+
}
124+
125+
void DeltaProducer::ConsumeLastDelta() {
126+
// TODO: implement.
127+
last_delta_.clear();
128+
}
129+
130+
void DeltaProducer::RunHarvesterLoop() {
131+
absl::Time next_harvest_time = absl::Now() + harvest_interval_;
132+
while (true) {
133+
const absl::Time now = absl::Now();
134+
absl::SleepFor(next_harvest_time - now);
135+
// Account for the possibility that the last harvest took longer than
136+
// harvest_interval_ and we are already past next_harvest_time.
137+
next_harvest_time = std::max(next_harvest_time, now) + harvest_interval_;
138+
Flush();
139+
}
140+
}
141+
142+
void ExperimentalDeltaProducerRecord(
143+
std::initializer_list<Measurement> measurements, TagSet tags) {
144+
DeltaProducer::Get()->Record(measurements, std::move(tags));
145+
}
146+
147+
} // namespace stats
148+
} // namespace opencensus
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2018, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef OPENCENSUS_STATS_INTERNAL_DELTA_PRODUCER_H_
16+
#define OPENCENSUS_STATS_INTERNAL_DELTA_PRODUCER_H_
17+
18+
#include <cstdint>
19+
#include <memory>
20+
#include <thread>
21+
#include <unordered_map>
22+
#include <vector>
23+
24+
#include "absl/synchronization/mutex.h"
25+
#include "absl/time/time.h"
26+
#include "opencensus/stats/bucket_boundaries.h"
27+
#include "opencensus/stats/distribution.h"
28+
#include "opencensus/stats/internal/measure_data.h"
29+
#include "opencensus/stats/measure.h"
30+
#include "opencensus/stats/tag_set.h"
31+
32+
namespace opencensus {
33+
namespace stats {
34+
35+
// Delta is thread-compatible.
36+
class Delta final {
37+
public:
38+
void Record(std::initializer_list<Measurement> measurements, TagSet tags);
39+
40+
// Swaps registered_boundaries_ and delta_ with *other, clears delta_, and
41+
// updates registered_boundaries_.
42+
void SwapAndReset(
43+
std::vector<std::vector<BucketBoundaries>>& registered_boundaries,
44+
Delta* other);
45+
46+
// Clears registered_boundaries_ and delta_.
47+
void clear();
48+
49+
const std::unordered_map<TagSet, std::vector<MeasureData>, TagSet::Hash>&
50+
delta() const {
51+
return delta_;
52+
}
53+
54+
private:
55+
// A copy of registered_boundaries_ in the DeltaProducer as of when the
56+
// delta was started.
57+
std::vector<std::vector<BucketBoundaries>> registered_boundaries_;
58+
59+
// The actual data. Each MeasureData[] contains one element for each
60+
// registered measure.
61+
std::unordered_map<TagSet, std::vector<MeasureData>, TagSet::Hash> delta_;
62+
};
63+
64+
// DeltaProducer is thread-safe.
65+
class DeltaProducer final {
66+
public:
67+
// Returns a pointer to the singleton DeltaProducer.
68+
static DeltaProducer* Get();
69+
70+
// Adds a new Measure.
71+
void AddMeasure();
72+
73+
// Adds a new BucketBoundaries for the measure 'index' if it does not already
74+
// exist.
75+
void AddBoundaries(uint64_t index, const BucketBoundaries& boundaries);
76+
77+
void Record(std::initializer_list<Measurement> measurements, TagSet tags)
78+
LOCKS_EXCLUDED(delta_mu_);
79+
80+
// Flushes the active delta and blocks until it is harvested.
81+
void Flush() LOCKS_EXCLUDED(delta_mu_, harvester_mu_);
82+
83+
private:
84+
DeltaProducer();
85+
86+
// Flushing has two stages: swapping active_delta_ to last_delta_ and
87+
// consuming last_delta_. Callers should release delta_mu_ before calling
88+
// ConsumeLastDelta so that Record() is blocked for as little time as
89+
// possible. SwapDeltas should never be called without then calling
90+
// ConsumeLastDelta--otherwise the delta will be lost.
91+
void SwapDeltas() EXCLUSIVE_LOCKS_REQUIRED(delta_mu_, harvester_mu_);
92+
void ConsumeLastDelta() EXCLUSIVE_LOCKS_REQUIRED(harvester_mu_)
93+
LOCKS_EXCLUDED(delta_mu_);
94+
95+
// Loops flushing the active delta (calling SwapDeltas and ConsumeLastDelta())
96+
// every harvest_interval_.
97+
void RunHarvesterLoop();
98+
99+
const absl::Duration harvest_interval_ = absl::Seconds(5);
100+
101+
// Guards the active delta and its configuration. Anything that changes the
102+
// delta configuration (e.g. adding a measure or BucketBoundaries) must
103+
// acquire delta_mu_, update configuration, and call SwapDeltas() before
104+
// releasing delta_mu_ to prevent Record() from accessing the delta with
105+
// mismatched configuration.
106+
mutable absl::Mutex delta_mu_;
107+
108+
// The BucketBoundaries of each registered view with Distribution aggregation,
109+
// by measure. Array indices in the outer array correspond to measure indices.
110+
std::vector<std::vector<BucketBoundaries>> registered_boundaries_
111+
GUARDED_BY(delta_mu_);
112+
Delta active_delta_ GUARDED_BY(delta_mu_);
113+
114+
// Guards the last_delta_; acquired by the main thread when triggering a
115+
// flush.
116+
mutable absl::Mutex harvester_mu_ ACQUIRED_AFTER(delta_mu_);
117+
// TODO: consider making this a lockless queue to avoid blocking the main
118+
// thread when calling a flush during harvesting.
119+
Delta last_delta_ GUARDED_BY(harvester_mu_);
120+
std::thread harvester_thread_ GUARDED_BY(harvester_mu_);
121+
};
122+
123+
// TODO: Replace Record with this when ready.
124+
void ExperimentalDeltaProducerRecord(
125+
std::initializer_list<Measurement> measurements, TagSet tags = TagSet({}));
126+
127+
} // namespace stats
128+
} // namespace opencensus
129+
130+
#endif // OPENCENSUS_STATS_INTERNAL_DELTA_PRODUCER_H_

opencensus/stats/internal/measure_registry_impl.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <iostream>
1818

19+
#include "opencensus/stats/internal/delta_producer.h"
1920
#include "opencensus/stats/internal/stats_manager.h"
2021

2122
namespace opencensus {
@@ -47,6 +48,7 @@ MeasureDouble MeasureRegistryImpl::RegisterDouble(
4748
name, units, description, MeasureDescriptor::Type::kDouble)));
4849
if (measure.IsValid()) {
4950
StatsManager::Get()->AddMeasure(measure);
51+
DeltaProducer::Get()->AddMeasure();
5052
}
5153
return measure;
5254
}
@@ -58,6 +60,7 @@ MeasureInt MeasureRegistryImpl::RegisterInt(absl::string_view name,
5860
name, units, description, MeasureDescriptor::Type::kInt64)));
5961
if (measure.IsValid()) {
6062
StatsManager::Get()->AddMeasure(measure);
63+
DeltaProducer::Get()->AddMeasure();
6164
}
6265
return measure;
6366
}

opencensus/stats/internal/stats_manager.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
#include "absl/base/macros.h"
2121
#include "absl/memory/memory.h"
2222
#include "absl/time/time.h"
23+
#include "opencensus/stats/aggregation.h"
24+
#include "opencensus/stats/bucket_boundaries.h"
25+
#include "opencensus/stats/internal/delta_producer.h"
26+
#include "opencensus/stats/view_descriptor.h"
2327

2428
namespace opencensus {
2529
namespace stats {
@@ -178,6 +182,10 @@ StatsManager::ViewInformation* StatsManager::AddConsumer(
178182
return nullptr;
179183
}
180184
const uint64_t index = MeasureRegistryImpl::IdToIndex(descriptor.measure_id_);
185+
if (descriptor.aggregation().type() == Aggregation::Type::kDistribution) {
186+
DeltaProducer::Get()->AddBoundaries(
187+
index, descriptor.aggregation().bucket_boundaries());
188+
}
181189
return measures_[index].AddConsumer(descriptor);
182190
}
183191

opencensus/stats/measure.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class Measurement final {
7070

7171
private:
7272
friend class StatsManager;
73+
friend class Delta;
7374

7475
const uint64_t id_;
7576
union {

0 commit comments

Comments
 (0)