|
| 1 | +package internal |
| 2 | + |
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" |
| 6 | + "github.com/golang/protobuf/ptypes/wrappers" |
| 7 | + "go.uber.org/zap" |
| 8 | + "strings" |
| 9 | +) |
| 10 | + |
// timeseriesinfo tracks the state of a single timeseries instance: the first
// point observed since the last detected reset, and the most recent point.
// Both are needed to adjust cumulative values and to detect resets (see
// MetricsAdjuster.adjustPoint).
type timeseriesinfo struct {
	initial  *metricspb.TimeSeries // first point seen since the last reset; nil until first observation
	previous *metricspb.TimeSeries // most recently accepted (already adjusted) point
}
| 15 | + |
// metricsInstanceMap maps a timeseries signature (metric name plus label
// values, see getSignature) to the tracking info for that timeseries.
type metricsInstanceMap map[string]*timeseriesinfo
| 17 | + |
| 18 | +func newMetricsInstanceMap() *metricsInstanceMap { |
| 19 | + mim := metricsInstanceMap(make(map[string]*timeseriesinfo)) |
| 20 | + return &mim |
| 21 | +} |
| 22 | + |
| 23 | +func (mim *metricsInstanceMap) get(metric *metricspb.Metric, values []*metricspb.LabelValue) *timeseriesinfo { |
| 24 | + name := metric.GetMetricDescriptor().GetName() |
| 25 | + sig := getSignature(name, values) |
| 26 | + tsi, ok := (*mim)[sig] |
| 27 | + if !ok { |
| 28 | + tsi = ×eriesinfo{} |
| 29 | + (*mim)[sig] = tsi |
| 30 | + } |
| 31 | + return tsi |
| 32 | +} |
| 33 | + |
| 34 | +// create a unique signature consisting of a metric's name and label values |
| 35 | +func getSignature(name string, values []*metricspb.LabelValue) string { |
| 36 | + labelValues := make([]string, 0, len(values)) |
| 37 | + for _, label := range values { |
| 38 | + if label.GetValue() != "" { |
| 39 | + labelValues = append(labelValues, label.GetValue()) |
| 40 | + } |
| 41 | + } |
| 42 | + return fmt.Sprintf("%s,%s", name, strings.Join(labelValues, ",")) |
| 43 | +} |
| 44 | + |
// JobsMap maps from a job instance (keyed by the "job:instance" signature
// built in get) to a map of metric timeseries instances for the job.
type JobsMap map[string]*metricsInstanceMap
| 47 | + |
| 48 | +// NewJobsMap creates a new (empty) JobsMap. |
| 49 | +func NewJobsMap() *JobsMap { |
| 50 | + jm := JobsMap(make(map[string]*metricsInstanceMap)) |
| 51 | + return &jm |
| 52 | +} |
| 53 | + |
| 54 | +func (jm *JobsMap) get(job, instance string) *metricsInstanceMap { |
| 55 | + sig := job + ":" + instance |
| 56 | + metricsMap, ok := (*jm)[sig] |
| 57 | + if !ok { |
| 58 | + metricsMap = newMetricsInstanceMap() |
| 59 | + (*jm)[sig] = metricsMap |
| 60 | + } |
| 61 | + return metricsMap |
| 62 | +} |
| 63 | + |
// MetricsAdjuster takes a map from a metric instance to the initial point in the metrics instance
// and provides AdjustMetrics, which takes a sequence of metrics and adjusts their values based on
// the initial points.
type MetricsAdjuster struct {
	metricsMap *metricsInstanceMap // per-timeseries tracking state (initial/previous points)
	logger     *zap.SugaredLogger  // used to report unexpected metric shapes
}
| 71 | + |
| 72 | +// NewMetricsAdjuster is a constructor for MetricsAdjuster. |
| 73 | +func NewMetricsAdjuster(metricsMap *metricsInstanceMap, logger *zap.SugaredLogger) *MetricsAdjuster { |
| 74 | + return &MetricsAdjuster{ |
| 75 | + metricsMap: metricsMap, |
| 76 | + logger: logger, |
| 77 | + } |
| 78 | +} |
| 79 | + |
| 80 | +// AdjustMetrics takes a sequence of metrics and adjust their values based on the initial points in the |
| 81 | +// metricsMap. If the metric is the first point in the timeseries, or the timeseries has been reset, it is |
| 82 | +// removed from the sequence and added to the the metricsMap. |
| 83 | +func (ma *MetricsAdjuster) AdjustMetrics(metrics []*metricspb.Metric) []*metricspb.Metric { |
| 84 | + var adjusted = make([]*metricspb.Metric, 0, len(metrics)) |
| 85 | + for _, metric := range metrics { |
| 86 | + if ma.adjustMetric(metric) { |
| 87 | + adjusted = append(adjusted, metric) |
| 88 | + } |
| 89 | + } |
| 90 | + return adjusted |
| 91 | +} |
| 92 | + |
| 93 | +// returns true if at least one of the metric's timeseries was adjusted and false if all of the timeseries are an initial occurence or a reset. |
| 94 | +// Types of metrics returned supported by prometheus: |
| 95 | +// - MetricDescriptor_GAUGE_DOUBLE |
| 96 | +// - MetricDescriptor_GAUGE_DISTRIBUTION |
| 97 | +// - MetricDescriptor_CUMULATIVE_DOUBLE |
| 98 | +// - MetricDescriptor_CUMULATIVE_DISTRIBUTION |
| 99 | +// - MetricDescriptor_SUMMARY |
| 100 | +func (ma *MetricsAdjuster) adjustMetric(metric *metricspb.Metric) bool { |
| 101 | + switch metric.MetricDescriptor.Type { |
| 102 | + case metricspb.MetricDescriptor_GAUGE_DOUBLE, metricspb.MetricDescriptor_GAUGE_DISTRIBUTION: |
| 103 | + // gauges don't need to be adjusted so no additional processing is necessary |
| 104 | + return true |
| 105 | + default: |
| 106 | + return ma.adjustMetricTimeseries(metric) |
| 107 | + } |
| 108 | +} |
| 109 | + |
| 110 | +// Returns true if at least one of the metric's timeseries was adjusted and false if all of the timeseries are an initial occurence or a reset. |
| 111 | +func (ma *MetricsAdjuster) adjustMetricTimeseries(metric *metricspb.Metric) bool { |
| 112 | + filtered := make([]*metricspb.TimeSeries, 0, len(metric.GetTimeseries())) |
| 113 | + for _, current := range metric.GetTimeseries() { |
| 114 | + tsi := ma.metricsMap.get(metric, current.GetLabelValues()) |
| 115 | + if tsi.initial == nil { |
| 116 | + // initial timeseries |
| 117 | + tsi.initial = current |
| 118 | + tsi.previous = current |
| 119 | + } else { |
| 120 | + if ma.adjustTimeseries(metric.MetricDescriptor.Type, current, tsi.initial, tsi.previous) { |
| 121 | + tsi.previous = current |
| 122 | + filtered = append(filtered, current) |
| 123 | + } else { |
| 124 | + // reset timeseries |
| 125 | + tsi.initial = current |
| 126 | + tsi.previous = current |
| 127 | + } |
| 128 | + } |
| 129 | + } |
| 130 | + metric.Timeseries = filtered |
| 131 | + return len(filtered) > 0 |
| 132 | +} |
| 133 | + |
| 134 | +// returns true if 'current' was adjusted and false if 'current' is an the initial occurence or a reset of the timeseries. |
| 135 | +func (ma *MetricsAdjuster) adjustTimeseries(metricType metricspb.MetricDescriptor_Type, current, initial, previous *metricspb.TimeSeries) bool { |
| 136 | + if !ma.adjustPoints(metricType, current.GetPoints(), initial.GetPoints(), previous.GetPoints()) { |
| 137 | + return false |
| 138 | + } |
| 139 | + current.StartTimestamp = initial.StartTimestamp |
| 140 | + return true |
| 141 | +} |
| 142 | + |
| 143 | +func (ma *MetricsAdjuster) adjustPoints(metricType metricspb.MetricDescriptor_Type, current, initial, previous []*metricspb.Point) bool { |
| 144 | + if len(current) != 1 || len(initial) != 1 || len(current) != 1 { |
| 145 | + ma.logger.Infof("len(current): %v, len(initial): %v, len(previous): %v should all be 1", len(current), len(initial), len(previous)) |
| 146 | + return true |
| 147 | + } |
| 148 | + return ma.adjustPoint(metricType, current[0], initial[0], previous[0]) |
| 149 | +} |
| 150 | + |
| 151 | +// Note: There is an important, subtle point here. When a new timeseries or a reset is detected, current and initial are the same object. |
| 152 | +// When initial == previous, the previous value/count/sum are all the initial value. When initial != previous, the previous value/count/sum has |
| 153 | +// been adjusted wrt the initial value so both they must be combined to find the actual previous value/count/sum. This happens because the |
| 154 | +// timeseries are updated in-place - if new copies of the timeseries were created instead, previous could be used directly but this would |
| 155 | +// mean reallocating all of the metrics. |
| 156 | +func (ma *MetricsAdjuster) adjustPoint(metricType metricspb.MetricDescriptor_Type, current, initial, previous *metricspb.Point) bool { |
| 157 | + switch metricType { |
| 158 | + case metricspb.MetricDescriptor_CUMULATIVE_DOUBLE: |
| 159 | + currentValue := current.GetDoubleValue() |
| 160 | + initialValue := initial.GetDoubleValue() |
| 161 | + previousValue := initialValue |
| 162 | + if initial != previous { |
| 163 | + previousValue += previous.GetDoubleValue() |
| 164 | + } |
| 165 | + if currentValue < previousValue { |
| 166 | + // reset detected |
| 167 | + return false |
| 168 | + } |
| 169 | + current.Value = &metricspb.Point_DoubleValue{DoubleValue: currentValue - initialValue} |
| 170 | + case metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION: |
| 171 | + // note: sum of squared deviation not currently supported |
| 172 | + currentDist := current.GetDistributionValue() |
| 173 | + initialDist := initial.GetDistributionValue() |
| 174 | + previousCount := initialDist.Count |
| 175 | + previousSum := initialDist.Sum |
| 176 | + if initial != previous { |
| 177 | + previousCount += previous.GetDistributionValue().Count |
| 178 | + previousSum += previous.GetDistributionValue().Sum |
| 179 | + } |
| 180 | + if currentDist.Count < previousCount || currentDist.Sum < previousSum { |
| 181 | + // reset detected |
| 182 | + return false |
| 183 | + } |
| 184 | + currentDist.Count -= initialDist.Count |
| 185 | + currentDist.Sum -= initialDist.Sum |
| 186 | + ma.adjustBuckets(currentDist.Buckets, initialDist.Buckets) |
| 187 | + case metricspb.MetricDescriptor_SUMMARY: |
| 188 | + // note: for summary, we don't adjust the snapshot |
| 189 | + currentCount := current.GetSummaryValue().Count.GetValue() |
| 190 | + currentSum := current.GetSummaryValue().Sum.GetValue() |
| 191 | + initialCount := initial.GetSummaryValue().Count.GetValue() |
| 192 | + initialSum := initial.GetSummaryValue().Sum.GetValue() |
| 193 | + previousCount := initialCount |
| 194 | + previousSum := initialSum |
| 195 | + if initial != previous { |
| 196 | + previousCount += previous.GetSummaryValue().Count.GetValue() |
| 197 | + previousSum += previous.GetSummaryValue().Sum.GetValue() |
| 198 | + } |
| 199 | + if currentCount < previousCount || currentSum < previousSum { |
| 200 | + // reset detected |
| 201 | + return false |
| 202 | + } |
| 203 | + current.GetSummaryValue().Count = &wrappers.Int64Value{Value: currentCount - initialCount} |
| 204 | + current.GetSummaryValue().Sum = &wrappers.DoubleValue{Value: currentSum - initialSum} |
| 205 | + default: |
| 206 | + // this shouldn't happen |
| 207 | + ma.logger.Infof("adjust unexpect point type %v, skipping ...", metricType.String()) |
| 208 | + } |
| 209 | + return true |
| 210 | +} |
| 211 | + |
| 212 | +func (ma *MetricsAdjuster) adjustBuckets(current, initial []*metricspb.DistributionValue_Bucket) { |
| 213 | + if len(current) != len(initial) { |
| 214 | + // this shouldn't happen |
| 215 | + ma.logger.Infof("len(current buckets): %v != len(initial buckets): %v", len(current), len(initial)) |
| 216 | + } |
| 217 | + for i := 0; i < len(current); i++ { |
| 218 | + current[i].Count -= initial[i].Count |
| 219 | + } |
| 220 | +} |
0 commit comments