Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 7b57179

Browse files
dinoolivasongy23
authored and committed
Add garbage collection support to the Prometheus metrics adjuster (#613)
* Add cleanup of job map for jobs that have gone away * Add timeseries-level gc to the metrics_adjuster * Fix typo in comment * Update job and ts GC strategy to a standard mark-and-sweep approach * Expand comments on GC alternatives * Update code to use 'defer Unlock' where possible * Add comments about double-checking synchronized structures to verify that gc() is necessary
1 parent 6b475fc commit 7b57179

File tree

3 files changed

+279
-67
lines changed

3 files changed

+279
-67
lines changed

receiver/prometheusreceiver/internal/metrics_adjuster.go

Lines changed: 172 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,99 @@ import (
66
"github.com/golang/protobuf/ptypes/wrappers"
77
"go.uber.org/zap"
88
"strings"
9+
"sync"
10+
"time"
911
)
1012

13+
// Notes on garbage collection (gc):
14+
//
15+
// Job-level gc:
16+
// The Prometheus receiver will likely execute in a long running service whose lifetime may exceed
17+
// the lifetimes of many of the jobs that it is collecting from. In order to keep the JobsMap from
18+
// leaking memory for entries of no-longer existing jobs, the JobsMap needs to remove entries that
19+
// haven't been accessed for a long period of time.
20+
//
21+
// Timeseries-level gc:
22+
// Some jobs that the Prometheus receiver is collecting from may export timeseries based on metrics
23+
// from other jobs (e.g. cAdvisor). In order to keep the timeseriesMap from leaking memory for entries
24+
// of no-longer existing jobs, the timeseriesMap for each job needs to remove entries that haven't
25+
// been accessed for a long period of time.
26+
//
27+
// The gc strategy uses a standard mark-and-sweep approach - each time a timeseriesMap is accessed,
28+
// it is marked. Similarly, each time a timeseriesinfo is accessed, it is also marked.
29+
//
30+
// At the end of each JobsMap.get(), if the last time the JobsMap was gc'd exceeds the 'gcInterval',
31+
// the JobsMap is locked and any timeseriesMaps that are unmarked are removed from the JobsMap
32+
// otherwise the timeseriesMap is gc'd
33+
//
34+
// The gc for the timeseriesMap is straightforward - the map is locked and, for each timeseriesinfo
35+
// in the map, if it has not been marked, it is removed otherwise it is unmarked.
36+
//
37+
// Alternative Strategies
38+
// 1. If the job-level gc doesn't run often enough, or runs too often, a separate go routine can
39+
// be spawned at JobMap creation time that gc's at periodic intervals. This approach potentially
40+
// adds more contention and latency to each scrape so the current approach is used. Note that
41+
// the go routine will need to be cancelled upon StopMetricsReception().
42+
// 2. If the gc of each timeseriesMap during the gc of the JobsMap causes too much contention,
43+
// the gc of timeseriesMaps can be moved to the end of MetricsAdjuster().AdjustMetrics(). This
44+
// approach requires adding 'lastGC' Time and (potentially) a gcInterval duration to
45+
// timeseriesMap so the current approach is used instead.
46+
47+
// timeseriesinfo contains the information necessary to adjust from the initial point and to detect
48+
// resets.
1149
type timeseriesinfo struct {
50+
mark bool
1251
initial *metricspb.TimeSeries
1352
previous *metricspb.TimeSeries
1453
}
1554

16-
type metricsInstanceMap map[string]*timeseriesinfo
17-
18-
func newMetricsInstanceMap() *metricsInstanceMap {
19-
mim := metricsInstanceMap(make(map[string]*timeseriesinfo))
20-
return &mim
55+
// timeseriesMap maps from a timeseries instance (metric * label values) to the timeseries info for
56+
// the instance.
57+
type timeseriesMap struct {
58+
sync.RWMutex
59+
mark bool
60+
tsiMap map[string]*timeseriesinfo
2161
}
2262

23-
func (mim *metricsInstanceMap) get(metric *metricspb.Metric, values []*metricspb.LabelValue) *timeseriesinfo {
63+
// Get the timeseriesinfo for the timeseries associated with the metric and label values.
64+
func (tsm *timeseriesMap) get(
65+
metric *metricspb.Metric, values []*metricspb.LabelValue) *timeseriesinfo {
2466
name := metric.GetMetricDescriptor().GetName()
25-
sig := getSignature(name, values)
26-
tsi, ok := (*mim)[sig]
67+
sig := getTimeseriesSignature(name, values)
68+
tsi, ok := tsm.tsiMap[sig]
2769
if !ok {
2870
tsi = &timeseriesinfo{}
29-
(*mim)[sig] = tsi
71+
tsm.tsiMap[sig] = tsi
3072
}
73+
tsm.mark = true
74+
tsi.mark = true
3175
return tsi
3276
}
3377

34-
// create a unique signature consisting of a metric's name and label values
35-
func getSignature(name string, values []*metricspb.LabelValue) string {
78+
// Remove timeseries that have aged out.
79+
func (tsm *timeseriesMap) gc() {
80+
tsm.Lock()
81+
defer tsm.Unlock()
82+
// this shouldn't happen under the current gc() strategy
83+
if !tsm.mark {
84+
return
85+
}
86+
for ts, tsi := range tsm.tsiMap {
87+
if !tsi.mark {
88+
delete(tsm.tsiMap, ts)
89+
} else {
90+
tsi.mark = false
91+
}
92+
}
93+
tsm.mark = false
94+
}
95+
96+
func newTimeseriesMap() *timeseriesMap {
97+
return &timeseriesMap{mark: true, tsiMap: map[string]*timeseriesinfo{}}
98+
}
99+
100+
// Create a unique timeseries signature consisting of the metric name and label values.
101+
func getTimeseriesSignature(name string, values []*metricspb.LabelValue) string {
36102
labelValues := make([]string, 0, len(values))
37103
for _, label := range values {
38104
if label.GetValue() != "" {
@@ -42,46 +108,86 @@ func getSignature(name string, values []*metricspb.LabelValue) string {
42108
return fmt.Sprintf("%s,%s", name, strings.Join(labelValues, ","))
43109
}
44110

45-
// JobsMap maps from a job instance to a map of metric instances for the job.
46-
type JobsMap map[string]*metricsInstanceMap
111+
// JobsMap maps from a job instance to a map of timeseries instances for the job.
112+
type JobsMap struct {
113+
sync.RWMutex
114+
gcInterval time.Duration
115+
lastGC time.Time
116+
jobsMap map[string]*timeseriesMap
117+
}
47118

48119
// NewJobsMap creates a new (empty) JobsMap.
49-
func NewJobsMap() *JobsMap {
50-
jm := JobsMap(make(map[string]*metricsInstanceMap))
51-
return &jm
120+
func NewJobsMap(gcInterval time.Duration) *JobsMap {
121+
return &JobsMap{gcInterval: gcInterval, lastGC: time.Now(), jobsMap: make(map[string]*timeseriesMap)}
52122
}
53123

54-
func (jm *JobsMap) get(job, instance string) *metricsInstanceMap {
124+
// Remove jobs and timeseries that have aged out.
125+
func (jm *JobsMap) gc() {
126+
jm.Lock()
127+
defer jm.Unlock()
128+
// once the structure is locked, confirm that gc() is still necessary
129+
if time.Now().Sub(jm.lastGC) > jm.gcInterval {
130+
for sig, tsm := range jm.jobsMap {
131+
if !tsm.mark {
132+
delete(jm.jobsMap, sig)
133+
} else {
134+
tsm.gc()
135+
}
136+
}
137+
jm.lastGC = time.Now()
138+
}
139+
}
140+
141+
func (jm *JobsMap) maybeGC() {
142+
// speculatively check if gc() is necessary, recheck once the structure is locked
143+
if time.Now().Sub(jm.lastGC) > jm.gcInterval {
144+
go jm.gc()
145+
}
146+
}
147+
148+
func (jm *JobsMap) get(job, instance string) *timeseriesMap {
55149
sig := job + ":" + instance
56-
metricsMap, ok := (*jm)[sig]
57-
if !ok {
58-
metricsMap = newMetricsInstanceMap()
59-
(*jm)[sig] = metricsMap
150+
jm.RLock()
151+
tsm, ok := jm.jobsMap[sig]
152+
jm.RUnlock()
153+
defer jm.maybeGC()
154+
if ok {
155+
return tsm
156+
}
157+
jm.Lock()
158+
defer jm.Unlock()
159+
tsm2, ok2 := jm.jobsMap[sig]
160+
if ok2 {
161+
return tsm2
60162
}
61-
return metricsMap
163+
tsm2 = newTimeseriesMap()
164+
jm.jobsMap[sig] = tsm2
165+
return tsm2
62166
}
63167

64168
// MetricsAdjuster takes a map from a metric instance to the initial point in the metrics instance
65169
// and provides AdjustMetrics, which takes a sequence of metrics and adjust their values based on
66170
// the initial points.
67171
type MetricsAdjuster struct {
68-
metricsMap *metricsInstanceMap
69-
logger *zap.SugaredLogger
172+
tsm *timeseriesMap
173+
logger *zap.SugaredLogger
70174
}
71175

72176
// NewMetricsAdjuster is a constructor for MetricsAdjuster.
73-
func NewMetricsAdjuster(metricsMap *metricsInstanceMap, logger *zap.SugaredLogger) *MetricsAdjuster {
177+
func NewMetricsAdjuster(tsm *timeseriesMap, logger *zap.SugaredLogger) *MetricsAdjuster {
74178
return &MetricsAdjuster{
75-
metricsMap: metricsMap,
76-
logger: logger,
179+
tsm: tsm,
180+
logger: logger,
77181
}
78182
}
79183

80-
// AdjustMetrics takes a sequence of metrics and adjust their values based on the initial points in the
81-
// metricsMap. If the metric is the first point in the timeseries, or the timeseries has been reset, it is
82-
// removed from the sequence and added to the the metricsMap.
184+
// AdjustMetrics takes a sequence of metrics and adjust their values based on the initial and
185+
// previous points in the timeseriesMap. If the metric is the first point in the timeseries, or the
186+
// timeseries has been reset, it is removed from the sequence and added to the timeseriesMap.
83187
func (ma *MetricsAdjuster) AdjustMetrics(metrics []*metricspb.Metric) []*metricspb.Metric {
84188
var adjusted = make([]*metricspb.Metric, 0, len(metrics))
189+
ma.tsm.Lock()
190+
defer ma.tsm.Unlock()
85191
for _, metric := range metrics {
86192
if ma.adjustMetric(metric) {
87193
adjusted = append(adjusted, metric)
@@ -90,7 +196,9 @@ func (ma *MetricsAdjuster) AdjustMetrics(metrics []*metricspb.Metric) []*metrics
90196
return adjusted
91197
}
92198

93-
// returns true if at least one of the metric's timeseries was adjusted and false if all of the timeseries are an initial occurence or a reset.
199+
// Returns true if at least one of the metric's timeseries was adjusted and false if all of the
200+
// timeseries are an initial occurrence or a reset.
201+
//
94202
// Types of metrics returned supported by prometheus:
95203
// - MetricDescriptor_GAUGE_DOUBLE
96204
// - MetricDescriptor_GAUGE_DISTRIBUTION
@@ -107,17 +215,19 @@ func (ma *MetricsAdjuster) adjustMetric(metric *metricspb.Metric) bool {
107215
}
108216
}
109217

110-
// Returns true if at least one of the metric's timeseries was adjusted and false if all of the timeseries are an initial occurence or a reset.
218+
// Returns true if at least one of the metric's timeseries was adjusted and false if all of the
219+
// timeseries are an initial occurrence or a reset.
111220
func (ma *MetricsAdjuster) adjustMetricTimeseries(metric *metricspb.Metric) bool {
112221
filtered := make([]*metricspb.TimeSeries, 0, len(metric.GetTimeseries()))
113222
for _, current := range metric.GetTimeseries() {
114-
tsi := ma.metricsMap.get(metric, current.GetLabelValues())
223+
tsi := ma.tsm.get(metric, current.GetLabelValues())
115224
if tsi.initial == nil {
116225
// initial timeseries
117226
tsi.initial = current
118227
tsi.previous = current
119228
} else {
120-
if ma.adjustTimeseries(metric.MetricDescriptor.Type, current, tsi.initial, tsi.previous) {
229+
if ma.adjustTimeseries(metric.MetricDescriptor.Type, current, tsi.initial,
230+
tsi.previous) {
121231
tsi.previous = current
122232
filtered = append(filtered, current)
123233
} else {
@@ -131,29 +241,38 @@ func (ma *MetricsAdjuster) adjustMetricTimeseries(metric *metricspb.Metric) bool
131241
return len(filtered) > 0
132242
}
133243

134-
// returns true if 'current' was adjusted and false if 'current' is an the initial occurence or a reset of the timeseries.
135-
func (ma *MetricsAdjuster) adjustTimeseries(metricType metricspb.MetricDescriptor_Type, current, initial, previous *metricspb.TimeSeries) bool {
136-
if !ma.adjustPoints(metricType, current.GetPoints(), initial.GetPoints(), previous.GetPoints()) {
244+
// Returns true if 'current' was adjusted and false if 'current' is the initial occurrence or a
245+
// reset of the timeseries.
246+
func (ma *MetricsAdjuster) adjustTimeseries(metricType metricspb.MetricDescriptor_Type,
247+
current, initial, previous *metricspb.TimeSeries) bool {
248+
if !ma.adjustPoints(
249+
metricType, current.GetPoints(), initial.GetPoints(), previous.GetPoints()) {
137250
return false
138251
}
139252
current.StartTimestamp = initial.StartTimestamp
140253
return true
141254
}
142255

143-
func (ma *MetricsAdjuster) adjustPoints(metricType metricspb.MetricDescriptor_Type, current, initial, previous []*metricspb.Point) bool {
256+
func (ma *MetricsAdjuster) adjustPoints(metricType metricspb.MetricDescriptor_Type,
257+
current, initial, previous []*metricspb.Point) bool {
144258
if len(current) != 1 || len(initial) != 1 || len(current) != 1 {
145-
ma.logger.Infof("len(current): %v, len(initial): %v, len(previous): %v should all be 1", len(current), len(initial), len(previous))
259+
ma.logger.Infof(
260+
"len(current): %v, len(initial): %v, len(previous): %v should all be 1",
261+
len(current), len(initial), len(previous))
146262
return true
147263
}
148264
return ma.adjustPoint(metricType, current[0], initial[0], previous[0])
149265
}
150266

151-
// Note: There is an important, subtle point here. When a new timeseries or a reset is detected, current and initial are the same object.
152-
// When initial == previous, the previous value/count/sum are all the initial value. When initial != previous, the previous value/count/sum has
153-
// been adjusted wrt the initial value so both they must be combined to find the actual previous value/count/sum. This happens because the
154-
// timeseries are updated in-place - if new copies of the timeseries were created instead, previous could be used directly but this would
155-
// mean reallocating all of the metrics.
156-
func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Type, current, initial, previous *metricspb.Point) bool {
267+
// Note: There is an important, subtle point here. When a new timeseries or a reset is detected,
268+
// current and initial are the same object. When initial == previous, the previous value/count/sum
269+
// are all the initial value. When initial != previous, the previous value/count/sum has been
270+
// adjusted wrt the initial value so both they must be combined to find the actual previous
271+
// value/count/sum. This happens because the timeseries are updated in-place - if new copies of the
272+
// timeseries were created instead, previous could be used directly but this would mean reallocating
273+
// all of the metrics.
274+
func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Type,
275+
current, initial, previous *metricspb.Point) bool {
157276
switch metricType {
158277
case metricspb.MetricDescriptor_CUMULATIVE_DOUBLE:
159278
currentValue := current.GetDoubleValue()
@@ -166,7 +285,8 @@ func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Typ
166285
// reset detected
167286
return false
168287
}
169-
current.Value = &metricspb.Point_DoubleValue{DoubleValue: currentValue - initialValue}
288+
current.Value =
289+
&metricspb.Point_DoubleValue{DoubleValue: currentValue - initialValue}
170290
case metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION:
171291
// note: sum of squared deviation not currently supported
172292
currentDist := current.GetDistributionValue()
@@ -200,8 +320,10 @@ func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Typ
200320
// reset detected
201321
return false
202322
}
203-
current.GetSummaryValue().Count = &wrappers.Int64Value{Value: currentCount - initialCount}
204-
current.GetSummaryValue().Sum = &wrappers.DoubleValue{Value: currentSum - initialSum}
323+
current.GetSummaryValue().Count =
324+
&wrappers.Int64Value{Value: currentCount - initialCount}
325+
current.GetSummaryValue().Sum =
326+
&wrappers.DoubleValue{Value: currentSum - initialSum}
205327
default:
206328
// this shouldn't happen
207329
ma.logger.Infof("adjust unexpect point type %v, skipping ...", metricType.String())
@@ -212,7 +334,8 @@ func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Typ
212334
func (ma *MetricsAdjuster) adjustBuckets(current, initial []*metricspb.DistributionValue_Bucket) {
213335
if len(current) != len(initial) {
214336
// this shouldn't happen
215-
ma.logger.Infof("len(current buckets): %v != len(initial buckets): %v", len(current), len(initial))
337+
ma.logger.Infof("len(current buckets): %v != len(initial buckets): %v",
338+
len(current), len(initial))
216339
}
217340
for i := 0; i < len(current); i++ {
218341
current[i].Count -= initial[i].Count

0 commit comments

Comments
 (0)