Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 77741c8

Browse files
author
Steven Karis
authored
Allow all exporters to be batched and queued (#376)
* Add batching to all exporters Testing Done: unit tests
1 parent 5a51f7c commit 77741c8

17 files changed

Lines changed: 190 additions & 176 deletions

File tree

cmd/ocagent/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/census-instrumentation/opencensus-service/exporter"
3636
"github.com/census-instrumentation/opencensus-service/internal"
3737
"github.com/census-instrumentation/opencensus-service/internal/config"
38+
"github.com/census-instrumentation/opencensus-service/internal/config/viperutils"
3839
"github.com/census-instrumentation/opencensus-service/receiver"
3940
"github.com/census-instrumentation/opencensus-service/receiver/jaeger"
4041
"github.com/census-instrumentation/opencensus-service/receiver/opencensus"
@@ -87,7 +88,12 @@ func runOCAgent() {
8788
log.Fatalf("Could not instantiate logger: %v", err)
8889
}
8990

90-
traceExporters, metricsExporters, closeFns, err := config.ExportersFromYAMLConfig(logger, yamlBlob)
91+
// TODO(skaris): move the rest of the configs to use viper
92+
v, err := viperutils.ViperFromYAMLBytes([]byte(yamlBlob))
93+
if err != nil {
94+
log.Fatalf("Config: failed to create viper from YAML: %v", err)
95+
}
96+
traceExporters, metricsExporters, closeFns, err := config.ExportersFromViperConfig(logger, v)
9197
if err != nil {
9298
log.Fatalf("Config: failed to create exporters from YAML: %v", err)
9399
}

cmd/occollector/app/builder/builder_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func TestMultiAndQueuedSpanProcessorConfig(t *testing.T) {
9898
DiscoveryMinPeers: 7,
9999
DiscoveryConnCheckTimeout: time.Second * 7,
100100
}
101+
fst.RawConfig = v.Sub(queuedExportersConfigKey).Sub("proc-tchannel")
101102
snd := NewDefaultQueuedSpanProcessorCfg()
102103
snd.Name = "proc-http"
103104
snd.RetryOnFailure = false
@@ -108,6 +109,7 @@ func TestMultiAndQueuedSpanProcessorConfig(t *testing.T) {
108109
Headers: map[string]string{"x-header-key": "00000000-0000-0000-0000-000000000001"},
109110
Timeout: time.Second * 5,
110111
}
112+
snd.RawConfig = v.Sub(queuedExportersConfigKey).Sub("proc-http")
111113

112114
wCfg := &MultiSpanProcessorCfg{
113115
Processors: []*QueuedSpanProcessorCfg{fst, snd},

cmd/occollector/app/builder/processor_builder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ type QueuedSpanProcessorCfg struct {
108108
SenderConfig interface{}
109109
// BatchingConfig sets config parameters related to batching
110110
BatchingConfig BatchingConfig `mapstructure:"batching"`
111+
RawConfig *viper.Viper
111112
}
112113

113114
// AttributesCfg holds configuration for attributes that can be added to all spans
@@ -155,6 +156,7 @@ func (qOpts *QueuedSpanProcessorCfg) InitFromViper(v *viper.Viper) *QueuedSpanPr
155156
}
156157
qOpts.SenderConfig = thsOpts
157158
}
159+
qOpts.RawConfig = v
158160
return qOpts
159161
}
160162

cmd/occollector/app/collector/processors.go

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package collector
1616

1717
import (
1818
"fmt"
19-
"io/ioutil"
2019
"os"
2120
"time"
2221

@@ -38,18 +37,7 @@ import (
3837

3938
func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []exporter.TraceExporter, []exporter.MetricsExporter) {
4039
// TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility.
41-
cfg := builder.GetConfigFile(v)
42-
if cfg == "" {
43-
logger.Info("No config file, exporters can be only configured via the file.")
44-
return nil, nil, nil
45-
}
46-
47-
cfgBlob, err := ioutil.ReadFile(cfg)
48-
if err != nil {
49-
logger.Fatal("Cannot read config file for exporters", zap.Error(err))
50-
}
51-
52-
traceExporters, metricsExporters, doneFns, err := config.ExportersFromYAMLConfig(logger, cfgBlob)
40+
traceExporters, metricsExporters, doneFns, err := config.ExportersFromViperConfig(logger, v)
5341
if err != nil {
5442
logger.Fatal("Failed to create config for exporters", zap.Error(err))
5543
}
@@ -101,9 +89,23 @@ func buildQueuedSpanProcessor(
10189
sender.HTTPTimeout(thriftHTTPSenderOpts.Timeout),
10290
)
10391
}
92+
doneFns, traceExporters, _ := createExporters(opts.RawConfig, logger)
10493

105-
if spanSender == nil {
106-
logger.Fatal("Unrecognized sender type or no exporters configured", zap.String("SenderType", string(opts.SenderType)))
94+
if spanSender == nil && len(traceExporters) == 0 {
95+
if opts.SenderType != "" {
96+
logger.Fatal("Unrecognized sender type", zap.String("SenderType", string(opts.SenderType)))
97+
}
98+
logger.Fatal("No senders or exporters configured.")
99+
}
100+
101+
allSendersAndExporters := make([]processor.SpanProcessor, 0, 1+len(traceExporters))
102+
if spanSender != nil {
103+
allSendersAndExporters = append(allSendersAndExporters, spanSender)
104+
}
105+
for _, traceExporter := range traceExporters {
106+
allSendersAndExporters = append(
107+
allSendersAndExporters, processor.NewTraceExporterProcessor(traceExporter),
108+
)
107109
}
108110

109111
var batchingOptions []nodebatcher.Option
@@ -134,19 +136,25 @@ func buildQueuedSpanProcessor(
134136
}
135137
}
136138

137-
// build queued span processor with underlying sender
138-
queuedSpanProcessor = queued.NewQueuedSpanProcessor(
139-
spanSender,
140-
queued.Options.WithLogger(logger),
141-
queued.Options.WithName(opts.Name),
142-
queued.Options.WithNumWorkers(opts.NumWorkers),
143-
queued.Options.WithQueueSize(opts.QueueSize),
144-
queued.Options.WithRetryOnProcessingFailures(opts.RetryOnFailure),
145-
queued.Options.WithBackoffDelay(opts.BackoffDelay),
146-
queued.Options.WithBatching(opts.BatchingConfig.Enable),
147-
queued.Options.WithBatchingOptions(batchingOptions...),
148-
)
149-
return nil, queuedSpanProcessor, nil
139+
queuedProcessors := make([]processor.SpanProcessor, 0, len(allSendersAndExporters))
140+
for _, senderOrExporter := range allSendersAndExporters {
141+
// build queued span processor with underlying sender
142+
queuedProcessors = append(
143+
queuedProcessors,
144+
queued.NewQueuedSpanProcessor(
145+
senderOrExporter,
146+
queued.Options.WithLogger(logger),
147+
queued.Options.WithName(opts.Name),
148+
queued.Options.WithNumWorkers(opts.NumWorkers),
149+
queued.Options.WithQueueSize(opts.QueueSize),
150+
queued.Options.WithRetryOnProcessingFailures(opts.RetryOnFailure),
151+
queued.Options.WithBackoffDelay(opts.BackoffDelay),
152+
queued.Options.WithBatching(opts.BatchingConfig.Enable),
153+
queued.Options.WithBatchingOptions(batchingOptions...),
154+
),
155+
)
156+
}
157+
return doneFns, processor.NewMultiSpanProcessor(queuedProcessors), nil
150158
}
151159

152160
func buildSamplingProcessor(cfg *builder.SamplingCfg, nameToSpanProcessor map[string]processor.SpanProcessor, v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor, error) {

exporter/exporterparser/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ queued-exporters:
8181
collector-endpoint: "https://ingest.omnition.io"
8282
headers: { "x-omnition-api-key": "00000000-0000-0000-0000-000000000001" }
8383
timeout: 5s
84+
# Non-sender exporters can now also be used by setting the exporters section in queued-exporters.
85+
exporters:
86+
opencensus:
87+
endpoint: "127.0.0.1:55566"
88+
compression: "gzip"
8489
my-org-jaeger: # A second processor with its own configuration options
8590
num-workers: 2
8691
queue-size: 100

exporter/exporterparser/datadog.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,50 +18,46 @@ import (
1818
"context"
1919

2020
datadog "github.com/DataDog/opencensus-go-exporter-datadog"
21+
"github.com/spf13/viper"
2122

2223
"github.com/census-instrumentation/opencensus-service/data"
2324
"github.com/census-instrumentation/opencensus-service/exporter"
2425
)
2526

2627
type datadogConfig struct {
2728
// Namespace specifies the namespaces to which metric keys are appended.
28-
Namespace string `yaml:"namespace,omitempty"`
29+
Namespace string `mapstructure:"namespace,omitempty"`
2930

3031
// TraceAddr specifies the host[:port] address of the Datadog Trace Agent.
3132
// It defaults to localhost:8126.
32-
TraceAddr string `yaml:"trace_addr,omitempty"`
33+
TraceAddr string `mapstructure:"trace_addr,omitempty"`
3334

3435
// MetricsAddr specifies the host[:port] address for DogStatsD. It defaults
3536
// to localhost:8125.
36-
MetricsAddr string `yaml:"metrics_addr,omitempty"`
37+
MetricsAddr string `mapstructure:"metrics_addr,omitempty"`
3738

3839
// Tags specifies a set of global tags to attach to each metric.
39-
Tags []string `yaml:"tags,omitempty"`
40+
Tags []string `mapstructure:"tags,omitempty"`
4041

41-
EnableTracing bool `yaml:"enable_tracing,omitempty"`
42-
EnableMetrics bool `yaml:"enable_metrics,omitempty"`
42+
EnableTracing bool `mapstructure:"enable_tracing,omitempty"`
43+
EnableMetrics bool `mapstructure:"enable_metrics,omitempty"`
4344
}
4445

4546
type datadogExporter struct {
4647
exporter *datadog.Exporter
4748
}
4849

49-
// DatadogTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
50+
// DatadogTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
5051
// Datadog according to the configuration settings.
51-
func DatadogTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
52+
func DatadogTraceExportersFromViper(v *viper.Viper) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
5253
var cfg struct {
53-
Exporters *struct {
54-
Datadog *datadogConfig `yaml:"datadog"`
55-
} `yaml:"exporters"`
54+
Datadog *datadogConfig `mapstructure:"datadog,omitempty"`
5655
}
57-
if err := yamlUnmarshal(config, &cfg); err != nil {
56+
if err := v.Unmarshal(&cfg); err != nil {
5857
return nil, nil, nil, err
5958
}
60-
if cfg.Exporters == nil {
61-
return nil, nil, nil, nil
62-
}
6359

64-
dc := cfg.Exporters.Datadog
60+
dc := cfg.Datadog
6561
if dc == nil {
6662
return nil, nil, nil, nil
6763
}

exporter/exporterparser/exparser.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
// Package exporterparser provides support for parsing and creating the
16-
// respective exporters given a YAML configuration payload.
16+
// respective exporters given a viper configuration.
1717
// For now it currently only provides statically imported OpenCensus
1818
// exporters like:
1919
// * Stackdriver Tracing and Monitoring
@@ -23,10 +23,8 @@ package exporterparser
2323

2424
import (
2525
"context"
26-
"fmt"
2726

2827
"go.opencensus.io/trace"
29-
yaml "gopkg.in/yaml.v2"
3028

3129
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
3230
"github.com/census-instrumentation/opencensus-service/data"
@@ -53,10 +51,3 @@ func exportSpans(ctx context.Context, exporterName string, te trace.Exporter, td
5351

5452
return internal.CombineErrors(errs)
5553
}
56-
57-
func yamlUnmarshal(yamlBlob []byte, dest interface{}) error {
58-
if err := yaml.Unmarshal(yamlBlob, dest); err != nil {
59-
return fmt.Errorf("Cannot YAML unmarshal data: %v", err)
60-
}
61-
return nil
62-
}

exporter/exporterparser/jaeger.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,35 @@ package exporterparser
1717
import (
1818
"context"
1919

20+
"github.com/spf13/viper"
21+
"go.opencensus.io/exporter/jaeger"
22+
2023
"github.com/census-instrumentation/opencensus-service/data"
2124
"github.com/census-instrumentation/opencensus-service/exporter"
22-
"go.opencensus.io/exporter/jaeger"
2325
)
2426

2527
// Slight modified version of go/src/go.opencensus.io/exporter/jaeger/jaeger.go
2628
type jaegerConfig struct {
27-
CollectorEndpoint string `yaml:"collector_endpoint,omitempty"`
28-
Username string `yaml:"username,omitempty"`
29-
Password string `yaml:"password,omitempty"`
30-
ServiceName string `yaml:"service_name,omitempty"`
29+
CollectorEndpoint string `mapstructure:"collector_endpoint,omitempty"`
30+
Username string `mapstructure:"username,omitempty"`
31+
Password string `mapstructure:"password,omitempty"`
32+
ServiceName string `mapstructure:"service_name,omitempty"`
3133
}
3234

3335
type jaegerExporter struct {
3436
exporter *jaeger.Exporter
3537
}
3638

37-
// JaegerExportersFromYAML parses the yaml bytes and returns exporter.TraceExporters targeting
39+
// JaegerExportersFromViper unmarshals the viper and returns exporter.TraceExporters targeting
3840
// Jaeger according to the configuration settings.
39-
func JaegerExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
41+
func JaegerExportersFromViper(v *viper.Viper) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
4042
var cfg struct {
41-
Exporters *struct {
42-
Jaeger *jaegerConfig `yaml:"jaeger"`
43-
} `yaml:"exporters"`
43+
Jaeger *jaegerConfig `mapstructure:"jaeger"`
4444
}
45-
if err := yamlUnmarshal(config, &cfg); err != nil {
45+
if err := v.Unmarshal(&cfg); err != nil {
4646
return nil, nil, nil, err
4747
}
48-
if cfg.Exporters == nil {
49-
return nil, nil, nil, nil
50-
}
51-
jc := cfg.Exporters.Jaeger
48+
jc := cfg.Jaeger
5249
if jc == nil {
5350
return nil, nil, nil, nil
5451
}

exporter/exporterparser/kafka.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@ import (
1818
"context"
1919
"fmt"
2020

21+
"github.com/spf13/viper"
2122
"github.com/yancl/opencensus-go-exporter-kafka"
2223

2324
"github.com/census-instrumentation/opencensus-service/data"
2425
"github.com/census-instrumentation/opencensus-service/exporter"
2526
)
2627

2728
type kafkaConfig struct {
28-
Brokers []string `yaml:"brokers,omitempty"`
29-
Topic string `yaml:"topic,omitempty"`
29+
Brokers []string `mapstructure:"brokers,omitempty"`
30+
Topic string `mapstructure:"topic,omitempty"`
3031
}
3132

3233
type kafkaExporter struct {
@@ -35,22 +36,17 @@ type kafkaExporter struct {
3536

3637
var _ exporter.TraceExporter = (*kafkaExporter)(nil)
3738

38-
// KafkaExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
39+
// KafkaExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
3940
// Kafka according to the configuration settings.
40-
func KafkaExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
41+
func KafkaExportersFromViper(v *viper.Viper) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
4142
var cfg struct {
42-
Exporters *struct {
43-
Kafka *kafkaConfig `yaml:"kafka"`
44-
} `yaml:"exporters"`
43+
Kafka *kafkaConfig `mapstructure:"kafka"`
4544
}
4645

47-
if err := yamlUnmarshal(config, &cfg); err != nil {
46+
if err := v.Unmarshal(&cfg); err != nil {
4847
return nil, nil, nil, err
4948
}
50-
if cfg.Exporters == nil {
51-
return nil, nil, nil, nil
52-
}
53-
kc := cfg.Exporters.Kafka
49+
kc := cfg.Kafka
5450
if kc == nil {
5551
return nil, nil, nil, nil
5652
}

exporter/exporterparser/opencensus.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020

2121
"contrib.go.opencensus.io/exporter/ocagent"
22+
"github.com/spf13/viper"
2223

2324
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2425
"github.com/census-instrumentation/opencensus-service/data"
@@ -29,8 +30,8 @@ import (
2930
)
3031

3132
type opencensusConfig struct {
32-
Endpoint string `yaml:"endpoint,omitempty"`
33-
Compression string `yaml:"compression,omitempty"`
33+
Endpoint string `mapstructure:"endpoint,omitempty"`
34+
Compression string `mapstructure:"compression,omitempty"`
3435
// TODO: add insecure, service name options.
3536
}
3637

@@ -40,21 +41,16 @@ type ocagentExporter struct {
4041

4142
var _ exporter.TraceExporter = (*ocagentExporter)(nil)
4243

43-
// OpenCensusTraceExportersFromYAML parses the yaml bytes and returns an exporter.TraceExporter targeting
44+
// OpenCensusTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
4445
// OpenCensus Agent/Collector according to the configuration settings.
45-
func OpenCensusTraceExportersFromYAML(config []byte) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
46+
func OpenCensusTraceExportersFromViper(v *viper.Viper) (tes []exporter.TraceExporter, mes []exporter.MetricsExporter, doneFns []func() error, err error) {
4647
var cfg struct {
47-
Exporters *struct {
48-
OpenCensus *opencensusConfig `yaml:"opencensus"`
49-
} `yaml:"exporters"`
48+
OpenCensus *opencensusConfig `mapstructure:"opencensus"`
5049
}
51-
if err := yamlUnmarshal(config, &cfg); err != nil {
50+
if err := v.Unmarshal(&cfg); err != nil {
5251
return nil, nil, nil, err
5352
}
54-
if cfg.Exporters == nil {
55-
return nil, nil, nil, nil
56-
}
57-
ocac := cfg.Exporters.OpenCensus
53+
ocac := cfg.OpenCensus
5854
if ocac == nil {
5955
return nil, nil, nil, nil
6056
}

0 commit comments

Comments
 (0)