Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 6521526

Browse files
author
Paulo Janotti
authored
Add in-memory tail-sampling prototype (#372)
* Add in-memory tail-sampling prototype This brings the tail-sampling prototype to the collector. Overview of this prototype at https://omnition.io/docs/opencensus-tail-based-sampling-demo Design at https://omnition.io/docs/opencensus-tail-based-sampling Goals for the prototype: * Allow early experimentation * Collect information to guide future design decisions Tail sampling is implemented as a SpanProcessor that holds traces in memory until the configured decision wait time is reached. Each trace is then evaluated against the configured sample policy and it is sampled or not. After the decision the trace still stays in memory for some time, so late arriving spans can be properly handled, either by discarding them, in case of not sampled traces, or forwarded to the proper exporters, if the trace was sampled. Clean-up and more tests Remove demo file * PR Feedback 00 * Move tail sampling processor to its own package
1 parent a35a5fa commit 6521526

17 files changed

Lines changed: 1579 additions & 10 deletions

File tree

cmd/occollector/app/builder/builder.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ const (
3030
zipkinScribeEntry = "zipkin-scribe"
3131

3232
// flags
33-
configCfg = "config"
34-
jaegerReceiverFlg = "receive-jaeger"
35-
ocReceiverFlg = "receive-oc-trace"
36-
zipkinReceiverFlg = "receive-zipkin"
37-
zipkinScribeReceiverFlg = "receive-zipkin-scribe"
38-
debugProcessorFlg = "debug-processor"
33+
configCfg = "config"
34+
jaegerReceiverFlg = "receive-jaeger"
35+
ocReceiverFlg = "receive-oc-trace"
36+
zipkinReceiverFlg = "receive-zipkin"
37+
zipkinScribeReceiverFlg = "receive-zipkin-scribe"
38+
debugProcessorFlg = "debug-processor"
39+
useTailSamplingAlwaysSample = "tail-sampling-always-sample"
3940
)
4041

4142
// Flags adds flags related to basic building of the collector application to the given flagset.
@@ -50,6 +51,8 @@ func Flags(flags *flag.FlagSet) {
5051
flags.Bool(zipkinScribeReceiverFlg, false,
5152
fmt.Sprintf("Flag to run the Zipkin Scribe receiver, default settings: %+v", *NewDefaultZipkinScribeReceiverCfg()))
5253
flags.Bool(debugProcessorFlg, false, "Flag to add a debug processor (combine with log level DEBUG to log incoming spans)")
54+
flags.Bool(useTailSamplingAlwaysSample, false, "Flag to use a tail-based sampling processor with an always sample policy, "+
55+
"unless tail sampling setting is present on configuration file.")
5356
}
5457

5558
// GetConfigFile gets the config file from the config file flag.
@@ -62,6 +65,11 @@ func DebugProcessorEnabled(v *viper.Viper) bool {
6265
return v.GetBool(debugProcessorFlg)
6366
}
6467

68+
// DebugTailSamplingEnabled returns true if the debug processor is enabled, and false otherwise
69+
func DebugTailSamplingEnabled(v *viper.Viper) bool {
70+
return v.GetBool(useTailSamplingAlwaysSample)
71+
}
72+
6573
// JaegerReceiverCfg holds configuration for Jaeger receivers.
6674
type JaegerReceiverCfg struct {
6775
// ThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests

cmd/occollector/app/builder/builder_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
package builder
1616

1717
import (
18+
"encoding/json"
1819
"reflect"
20+
"sort"
1921
"testing"
2022
"time"
2123

@@ -126,6 +128,88 @@ func TestMultiAndQueuedSpanProcessorConfig(t *testing.T) {
126128
}
127129
}
128130

131+
func TestTailSamplingPoliciesConfiguration(t *testing.T) {
132+
v, err := loadViperFromFile("./testdata/sampling_config.yaml")
133+
if err != nil {
134+
t.Fatalf("Failed to load viper from test file: %v", err)
135+
}
136+
137+
wCfg := NewDefaultSamplingCfg()
138+
if wCfg.Mode != NoSampling {
139+
t.Fatalf("Default SamplingCfg Mode should be NoSampling")
140+
}
141+
wCfg.Mode = TailSampling
142+
wCfg.Policies = []*PolicyCfg{
143+
{
144+
Name: "string-tag-filter1",
145+
Type: StringTagFilter,
146+
Exporters: []string{"jaeger1"},
147+
Configuration: &StringTagFilterCfg{
148+
Tag: "test",
149+
Values: []string{"value 1", "value 2"},
150+
},
151+
},
152+
{
153+
Name: "numeric-tag-filter2",
154+
Type: NumericTagFilter,
155+
Exporters: []string{"jaeger2"},
156+
Configuration: &NumericTagFilterCfg{
157+
Tag: "http.status_code",
158+
MinValue: 400,
159+
MaxValue: 999,
160+
},
161+
},
162+
{
163+
Name: "string-tag-filter3",
164+
Type: StringTagFilter,
165+
Exporters: []string{"jaeger3"},
166+
Configuration: &StringTagFilterCfg{
167+
Tag: "test.different",
168+
Values: []string{"key 1", "key 2"},
169+
},
170+
},
171+
{
172+
Name: "numeric-tag-filter4",
173+
Type: NumericTagFilter,
174+
Exporters: []string{"jaeger4", "jaeger5"},
175+
Configuration: &NumericTagFilterCfg{
176+
Tag: "http.status_code",
177+
MinValue: 400,
178+
MaxValue: 999,
179+
},
180+
},
181+
}
182+
183+
gCfg := NewDefaultSamplingCfg().InitFromViper(v)
184+
sort.Slice(gCfg.Policies, func(i, j int) bool {
185+
if len(gCfg.Policies[i].Exporters) == len(gCfg.Policies[j].Exporters) {
186+
return gCfg.Policies[i].Exporters[0] < gCfg.Policies[j].Exporters[0]
187+
}
188+
return len(gCfg.Policies[i].Exporters) < len(gCfg.Policies[j].Exporters)
189+
})
190+
191+
if !reflect.DeepEqual(gCfg, wCfg) {
192+
gb, _ := json.MarshalIndent(gCfg, "", " ")
193+
t.Fatalf("Wanted %+v but got %+v\ngot json:\n%s", *wCfg, *gCfg, string(gb))
194+
}
195+
}
196+
197+
func TestTailSamplingConfig(t *testing.T) {
198+
v, err := loadViperFromFile("./testdata/sampling_config.yaml")
199+
if err != nil {
200+
t.Fatalf("Failed to load viper from test file: %v", err)
201+
}
202+
203+
wCfg := NewDefaultTailBasedCfg()
204+
wCfg.DecisionWait = 31 * time.Second
205+
wCfg.NumTraces = 20001
206+
207+
gCfg := NewDefaultTailBasedCfg().InitFromViper(v)
208+
if !reflect.DeepEqual(gCfg, wCfg) {
209+
t.Fatalf("Wanted %+v but got %+v", *wCfg, *gCfg)
210+
}
211+
}
212+
129213
func loadViperFromFile(file string) (*viper.Viper, error) {
130214
v := viper.New()
131215
v.SetConfigFile(file)
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package builder
16+
17+
import (
18+
"time"
19+
20+
"github.com/spf13/viper"
21+
)
22+
23+
const (
24+
modeTag = "mode"
25+
policiesTag = "policies"
26+
samplingTag = "sampling"
27+
)
28+
29+
// Mode indicates the sampling mode
30+
type Mode string
31+
32+
const (
33+
// NoSampling mode is the default and means that all data arriving at the collector
34+
// is passed ahead.
35+
NoSampling Mode = "no-sampling"
36+
// TailSampling is the mode in which trace data is temporarily retained until an evaluation
37+
// if the trace should be sampled is performed.
38+
TailSampling Mode = "tail"
39+
)
40+
41+
// PolicyType indicates the type of sampling policy.
42+
type PolicyType string
43+
44+
const (
45+
// AlwaysSample samples all traces, typically used for debugging.
46+
AlwaysSample PolicyType = "always-sample"
47+
// NumericTagFilter sample traces that have a given numberic tag in a specified
48+
// range, e.g.: tag "http.status_code" >= 399 and <= 999.
49+
NumericTagFilter PolicyType = "numeric-tag-filter"
50+
// StringTagFilter sample traces that a tag, of type string, matching
51+
// one of the listed values.
52+
StringTagFilter PolicyType = "string-tag-filter"
53+
// RateLimiting allows all traces until the specified limits are satisfied.
54+
RateLimiting PolicyType = "rate-limiting"
55+
)
56+
57+
// PolicyCfg holds the common configuration to all policies.
58+
type PolicyCfg struct {
59+
// Name given to the instance of the policy to make easy to identify it in metrics and logs.
60+
Name string
61+
// Type of the policy this will be used to match the proper configuration of the policy.
62+
Type PolicyType
63+
// Exporters hold the name of the exporters that the policy evaluator uses to make decisions
64+
// about whether or not sending the traces.
65+
Exporters []string
66+
// Configuration holds the settings specific to the policy.
67+
Configuration interface{}
68+
}
69+
70+
// NumericTagFilterCfg holds the configurable settings to create a numeric tag filter
71+
// sampling policy evaluator.
72+
type NumericTagFilterCfg struct {
73+
// Tag that the filter is going to be matching against.
74+
Tag string `mapstructure:"tag"`
75+
// MinValue is the minimum value of the tag to be considered a match.
76+
MinValue int64 `mapstructure:"min-value"`
77+
// MaxValue is the maximum value of the tag to be considered a match.
78+
MaxValue int64 `mapstructure:"max-value"`
79+
}
80+
81+
// StringTagFilterCfg holds the configurable settings to create a string tag filter
82+
// sampling policy evaluator.
83+
type StringTagFilterCfg struct {
84+
// Tag that the filter is going to be matching against.
85+
Tag string `mapstructure:"tag"`
86+
// Values is the set of tag values that if any is equal to the actual tag valueto be considered a match.
87+
Values []string `mapstructure:"values"`
88+
}
89+
90+
// RateLimitingCfg holds the configurable settings to create a string tag filter
91+
// sampling policy evaluator.
92+
type RateLimitingCfg struct {
93+
// SpansPerSecond limit to the number of spans per second
94+
SpansPerSecond int64 `mapstructure:"spans-per-second"`
95+
}
96+
97+
// SamplingCfg holds the sampling configuration.
98+
type SamplingCfg struct {
99+
// Mode specifies the sampling mode to be used.
100+
Mode Mode `mapstructure:"mode"`
101+
// Policies contains the list of policies to be used by sampling.
102+
Policies []*PolicyCfg `mapstructure:"policies"`
103+
}
104+
105+
// NewDefaultSamplingCfg creates a SamplingCfg with the default values.
106+
func NewDefaultSamplingCfg() *SamplingCfg {
107+
return &SamplingCfg{
108+
Mode: NoSampling,
109+
}
110+
}
111+
112+
// InitFromViper initializes SamplingCfg with properties from viper.
113+
func (sCfg *SamplingCfg) InitFromViper(v *viper.Viper) *SamplingCfg {
114+
sv := v.Sub(samplingTag)
115+
if sv == nil {
116+
return sCfg
117+
}
118+
119+
sCfg.Mode = Mode(sv.GetString(modeTag))
120+
121+
pv := sv.Sub(policiesTag)
122+
if pv == nil {
123+
return sCfg
124+
}
125+
126+
for policyName := range sv.GetStringMap(policiesTag) {
127+
polSub := pv.Sub(policyName)
128+
polCfg := &PolicyCfg{}
129+
polCfg.Name = policyName
130+
polCfg.Type = PolicyType(polSub.GetString("policy"))
131+
polCfg.Exporters = polSub.GetStringSlice("exporters")
132+
133+
cfgSub := polSub.Sub("configuration")
134+
if cfgSub != nil {
135+
// As the number of polices grow this likely should be in a map.
136+
var cfg interface{}
137+
switch polCfg.Type {
138+
case NumericTagFilter:
139+
numTagFilterCfg := &NumericTagFilterCfg{}
140+
cfg = numTagFilterCfg
141+
case StringTagFilter:
142+
strTagFilterCfg := &StringTagFilterCfg{}
143+
cfg = strTagFilterCfg
144+
case RateLimiting:
145+
rateLimitingCfg := &RateLimitingCfg{}
146+
cfg = rateLimitingCfg
147+
}
148+
cfgSub.Unmarshal(cfg)
149+
polCfg.Configuration = cfg
150+
}
151+
152+
sCfg.Policies = append(sCfg.Policies, polCfg)
153+
}
154+
return sCfg
155+
}
156+
157+
// TailBasedCfg holds the configuration for tail-based sampling.
158+
type TailBasedCfg struct {
159+
// DecisionWait is the desired wait time from the arrival of the first span of
160+
// trace until the decision about sampling it or not is evaluated.
161+
DecisionWait time.Duration `mapstructure:"decision-wait"`
162+
// NumTraces is the number of traces kept on memory. Typically most of the data
163+
// of a trace is released after a sampling decision is taken.
164+
NumTraces uint64 `mapstructure:"num-traces"`
165+
}
166+
167+
// NewDefaultTailBasedCfg creates a TailBasedCfg with the default values.
168+
func NewDefaultTailBasedCfg() *TailBasedCfg {
169+
return &TailBasedCfg{
170+
DecisionWait: 30 * time.Second,
171+
NumTraces: 50000,
172+
}
173+
}
174+
175+
// InitFromViper initializes TailBasedCfg with properties from viper.
176+
func (tCfg *TailBasedCfg) InitFromViper(v *viper.Viper) *TailBasedCfg {
177+
tv := v.Sub(samplingTag)
178+
if tv == nil {
179+
return tCfg
180+
}
181+
if tv == nil || tv.GetString(modeTag) != string(TailSampling) {
182+
return tCfg
183+
}
184+
185+
tv.Unmarshal(tCfg)
186+
return tCfg
187+
}

cmd/occollector/app/builder/testdata/queued_exporters.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
queued-exporters:
1+
queued-exporters:
22
proc-tchannel:
33
num-workers: 13
44
queue-size: 1300

0 commit comments

Comments
 (0)