Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 0dc6881

Browse files
author
Steven Karis
authored
Add compression to opencensus receiver/exporter (#347)
Add compression to opencensus receiver/exporter and enable the exporter in the collector
1 parent 22bfaed commit 0dc6881

10 files changed

Lines changed: 171 additions & 61 deletions

File tree

cmd/ocagent/main.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828

2929
"go.opencensus.io/stats/view"
3030
"go.opencensus.io/zpages"
31+
"go.uber.org/zap"
32+
"go.uber.org/zap/zapcore"
3133

3234
"github.com/census-instrumentation/opencensus-service/data"
3335
"github.com/census-instrumentation/opencensus-service/exporter"
@@ -75,7 +77,15 @@ func runOCAgent() {
7577
log.Fatalf("Configuration logical error: %v", err)
7678
}
7779

78-
traceExporters, metricsExporters, closeFns, err := config.ExportersFromYAMLConfig(yamlBlob)
80+
// TODO: don't hardcode info level logging
81+
conf := zap.NewProductionConfig()
82+
conf.Level.SetLevel(zapcore.InfoLevel)
83+
logger, err := conf.Build()
84+
if err != nil {
85+
log.Fatalf("Could not instantiate logger: %v", err)
86+
}
87+
88+
traceExporters, metricsExporters, closeFns, err := config.ExportersFromYAMLConfig(logger, yamlBlob)
7989
if err != nil {
8090
log.Fatalf("Config: failed to create exporters from YAML: %v", err)
8191
}

cmd/occollector/app/collector/processors.go

Lines changed: 17 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -26,75 +26,41 @@ import (
2626
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
2727
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/sender"
2828
"github.com/census-instrumentation/opencensus-service/exporter"
29-
"github.com/census-instrumentation/opencensus-service/exporter/exporterparser"
3029
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
30+
"github.com/census-instrumentation/opencensus-service/internal/config"
3131
)
3232

33-
func createExporters(v *viper.Viper, logger *zap.Logger) (doneFns []func(), traceExporters []exporter.TraceExporter, metricsExporters []exporter.MetricsExporter) {
33+
func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []exporter.TraceExporter, []exporter.MetricsExporter) {
3434
// TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility.
35-
parseFns := []struct {
36-
name string
37-
fn func([]byte) ([]exporter.TraceExporter, []exporter.MetricsExporter, []func() error, error)
38-
}{
39-
{name: "datadog", fn: exporterparser.DatadogTraceExportersFromYAML},
40-
{name: "stackdriver", fn: exporterparser.StackdriverTraceExportersFromYAML},
41-
{name: "zipkin", fn: exporterparser.ZipkinExportersFromYAML},
42-
{name: "jaeger", fn: exporterparser.JaegerExportersFromYAML},
43-
{name: "kafka", fn: exporterparser.KafkaExportersFromYAML},
44-
}
4535

46-
config := builder.GetConfigFile(v)
47-
if config == "" {
36+
cfg := builder.GetConfigFile(v)
37+
if cfg == "" {
4838
logger.Info("No config file, exporters can be only configured via the file.")
49-
return
39+
return nil, nil, nil
5040
}
5141

52-
cfgBlob, err := ioutil.ReadFile(config)
42+
cfgBlob, err := ioutil.ReadFile(cfg)
5343
if err != nil {
5444
logger.Fatal("Cannot read config file for exporters", zap.Error(err))
5545
}
5646

57-
for _, cfg := range parseFns {
58-
tes, mes, tesDoneFns, err := cfg.fn(cfgBlob)
59-
if err != nil {
60-
logger.Fatal("Failed to create config for exporter", zap.String("exporter", cfg.name), zap.Error(err))
61-
}
62-
63-
var anyTraceExporterEnabled, anyMetricsExporterEnabled bool
47+
traceExporters, metricsExporters, doneFns, err := config.ExportersFromYAMLConfig(logger, cfgBlob)
48+
if err != nil {
49+
logger.Fatal("Failed to create config for exporters", zap.Error(err))
50+
}
6451

65-
for _, te := range tes {
66-
if te != nil {
67-
traceExporters = append(traceExporters, te)
68-
anyTraceExporterEnabled = true
52+
wrappedDoneFns := make([]func(), 0, len(doneFns))
53+
for _, doneFn := range doneFns {
54+
wrapperFn := func() {
55+
if err := doneFn(); err != nil {
56+
logger.Warn("Error when closing exporters", zap.Error(err))
6957
}
7058
}
71-
for _, me := range mes {
72-
if me != nil {
73-
metricsExporters = append(metricsExporters, me)
74-
anyMetricsExporterEnabled = true
75-
}
76-
}
77-
for _, tesDoneFn := range tesDoneFns {
78-
if tesDoneFn != nil {
79-
wrapperFn := func() {
80-
if err := tesDoneFn(); err != nil {
81-
logger.Warn("Error when closing exporter", zap.String("exporter", cfg.name), zap.Error(err))
82-
}
83-
}
84-
doneFns = append(doneFns, wrapperFn)
85-
}
86-
}
87-
88-
if anyTraceExporterEnabled {
89-
logger.Info("Trace Exporter enabled", zap.String("exporter", cfg.name))
90-
}
91-
if anyMetricsExporterEnabled {
92-
logger.Info("Metrices Exporter enabled", zap.String("exporter", cfg.name))
93-
}
9459

60+
wrappedDoneFns = append(wrappedDoneFns, wrapperFn)
9561
}
9662

97-
return doneFns, traceExporters, metricsExporters
63+
return wrappedDoneFns, traceExporters, metricsExporters
9864
}
9965

10066
func buildQueuedSpanProcessor(logger *zap.Logger, opts *builder.QueuedSpanProcessorCfg) (processor.SpanProcessor, error) {

exporter/exporterparser/opencensus.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@ import (
2424
"github.com/census-instrumentation/opencensus-service/data"
2525
"github.com/census-instrumentation/opencensus-service/exporter"
2626
"github.com/census-instrumentation/opencensus-service/internal"
27+
"github.com/census-instrumentation/opencensus-service/internal/compression"
28+
"github.com/census-instrumentation/opencensus-service/internal/compression/grpc"
2729
)
2830

2931
type opencensusConfig struct {
30-
Endpoint string `yaml:"endpoint,omitempty"`
32+
Endpoint string `yaml:"endpoint,omitempty"`
33+
Compression string `yaml:"compression,omitempty"`
3134
// TODO: add insecure, service name options.
3235
}
3336

@@ -60,7 +63,16 @@ func OpenCensusTraceExportersFromYAML(config []byte) (tes []exporter.TraceExport
6063
return nil, nil, nil, fmt.Errorf("OpenCensus config requires an Endpoint")
6164
}
6265

63-
sde, serr := ocagent.NewExporter(ocagent.WithAddress(ocac.Endpoint), ocagent.WithInsecure())
66+
opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint), ocagent.WithInsecure()}
67+
if ocac.Compression != "" {
68+
if compressionKey := grpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != compression.Unsupported {
69+
opts = append(opts, ocagent.UseCompressor(compressionKey))
70+
} else {
71+
return nil, nil, nil, fmt.Errorf("Unsupported compression type: %s", ocac.Compression)
72+
}
73+
}
74+
75+
sde, serr := ocagent.NewExporter(opts...)
6476
if serr != nil {
6577
return nil, nil, nil, fmt.Errorf("Cannot configure OpenCensus Trace exporter: %v", serr)
6678
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module github.com/census-instrumentation/opencensus-service
22

33
require (
44
cloud.google.com/go v0.32.0 // indirect
5-
contrib.go.opencensus.io/exporter/ocagent v0.4.2
5+
contrib.go.opencensus.io/exporter/ocagent v0.4.3
66
contrib.go.opencensus.io/exporter/stackdriver v0.9.1
77
git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88 // indirect
88
github.com/BurntSushi/toml v0.3.1 // indirect

internal/compression/grpc/grpc.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpc
16+
17+
import (
18+
"strings"
19+
20+
"google.golang.org/grpc/encoding/gzip"
21+
22+
"github.com/census-instrumentation/opencensus-service/internal/compression"
23+
)
24+
25+
var (
26+
// Map of opencensus compression types to grpc registered compression types
27+
grpcCompressionKeyMap = map[string]string{
28+
compression.Gzip: gzip.Name,
29+
}
30+
)
31+
32+
// GetGRPCCompressionKey returns the grpc registered compression key if the
33+
// passed in compression key is supported, and Unsupported otherwise
34+
func GetGRPCCompressionKey(compressionType string) string {
35+
compressionKey := strings.ToLower(compressionType)
36+
if encodingKey, ok := grpcCompressionKeyMap[compressionKey]; ok {
37+
return encodingKey
38+
}
39+
return compression.Unsupported
40+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpc
16+
17+
import (
18+
"testing"
19+
20+
"github.com/census-instrumentation/opencensus-service/internal/compression"
21+
)
22+
23+
func TestGetGRPCCompressionKey(t *testing.T) {
24+
if GetGRPCCompressionKey("gzip") != compression.Gzip {
25+
t.Error("gzip is marked as supported but returned unsupported")
26+
}
27+
28+
if GetGRPCCompressionKey("Gzip") != compression.Gzip {
29+
t.Error("Capitalization of Gzip should not matter")
30+
}
31+
32+
if GetGRPCCompressionKey("badType") != compression.Unsupported {
33+
t.Error("badType is not supported but was returned as supported")
34+
}
35+
}

internal/compression/grpc/gzip.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpc
16+
17+
import _ "google.golang.org/grpc/encoding/gzip" // import the gzip package with auto-registers the gzip grpc compressor

internal/compression/keys.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package compression
16+
17+
// Compression keys for supported compression types within opencensus collector
18+
const (
19+
Unsupported = ""
20+
Gzip = "gzip"
21+
)

internal/config/config.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"reflect"
2222
"strings"
2323

24+
"go.uber.org/zap"
2425
yaml "gopkg.in/yaml.v2"
2526

2627
"github.com/census-instrumentation/opencensus-service/exporter"
@@ -299,7 +300,7 @@ func eqLocalHost(host string) bool {
299300
// + kafka
300301
// + opencensus
301302
// + prometheus
302-
func ExportersFromYAMLConfig(config []byte) (traceExporters []exporter.TraceExporter, metricsExporters []exporter.MetricsExporter, doneFns []func() error, err error) {
303+
func ExportersFromYAMLConfig(logger *zap.Logger, config []byte) ([]exporter.TraceExporter, []exporter.MetricsExporter, []func() error, error) {
303304
parseFns := []struct {
304305
name string
305306
fn func([]byte) ([]exporter.TraceExporter, []exporter.MetricsExporter, []func() error, error)
@@ -313,28 +314,35 @@ func ExportersFromYAMLConfig(config []byte) (traceExporters []exporter.TraceExpo
313314
{name: "prometheus", fn: exporterparser.PrometheusExportersFromYAML},
314315
}
315316

317+
var traceExporters []exporter.TraceExporter
318+
var metricsExporters []exporter.MetricsExporter
319+
var doneFns []func() error
316320
for _, cfg := range parseFns {
317-
tes, mes, tesDoneFns, terr := cfg.fn(config)
321+
tes, mes, tesDoneFns, err := cfg.fn(config)
318322
if err != nil {
319-
err = fmt.Errorf("Failed to create config for %q: %v", cfg.name, terr)
320-
return
323+
err = fmt.Errorf("Failed to create config for %q: %v", cfg.name, err)
324+
return nil, nil, nil, err
321325
}
322326

323327
for _, te := range tes {
324328
if te != nil {
325329
traceExporters = append(traceExporters, te)
330+
logger.Info("Trace Exporter enabled", zap.String("exporter", cfg.name))
326331
}
327332
}
328333

329334
for _, me := range mes {
330335
if me != nil {
331336
metricsExporters = append(metricsExporters, me)
337+
logger.Info("Metrices Exporter enabled", zap.String("exporter", cfg.name))
332338
}
333339
}
334340

335341
for _, doneFn := range tesDoneFns {
336-
doneFns = append(doneFns, doneFn)
342+
if doneFn != nil {
343+
doneFns = append(doneFns, doneFn)
344+
}
337345
}
338346
}
339-
return
347+
return traceExporters, metricsExporters, doneFns, nil
340348
}

receiver/receiver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"github.com/census-instrumentation/opencensus-service/data"
2121
"github.com/census-instrumentation/opencensus-service/internal"
22+
_ "github.com/census-instrumentation/opencensus-service/internal/compression/grpc" // load in supported grpc compression encodings
2223
)
2324

2425
// A TraceReceiver is an "arbitrary data"-to-"trace proto span" converter.

0 commit comments

Comments
 (0)