Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit c88cf9b

Browse files
author
Bogdan Drutu
authored
Update all exporters to use the new helper TraceExporter. (#488)
* Update all exporters to use the new helper TraceExporter. * Add todo for the service name span attribute.
1 parent 834b3c4 commit c88cf9b

File tree

8 files changed

+68
-57
lines changed

8 files changed

+68
-57
lines changed

exporter/awsexporter/aws_xray.go

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/census-instrumentation/opencensus-service/consumer"
3030
"github.com/census-instrumentation/opencensus-service/data"
31+
"github.com/census-instrumentation/opencensus-service/exporter/exporterhelper"
3132
"github.com/census-instrumentation/opencensus-service/exporter/exporterwrapper"
3233
)
3334

@@ -57,8 +58,6 @@ type awsXRayExporter struct {
5758
defaultOptions []xray.Option
5859
}
5960

60-
var _ consumer.TraceConsumer = (*awsXRayExporter)(nil)
61-
6261
// AWSXRayTraceExportersFromViper unmarshals the viper and returns an consumer.TraceConsumer targeting
6362
// AWS X-Ray according to the configuration settings.
6463
func AWSXRayTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps []consumer.MetricsConsumer, doneFns []func() error, err error) {
@@ -84,7 +83,17 @@ func AWSXRayTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsume
8483
defaultServiceName: xc.DefaultServiceName,
8584
}
8685

87-
tps = append(tps, axe)
86+
axte, err := exporterhelper.NewTraceExporter(
87+
"aws_xray",
88+
axe.PushTraceData,
89+
exporterhelper.WithSpanName("ocservice.exporter.AwsXray.ConsumeTraceData"),
90+
exporterhelper.WithRecordMetrics(true),
91+
)
92+
if err != nil {
93+
return nil, nil, nil, err
94+
}
95+
96+
tps = append(tps, axte)
8897
doneFns = append(doneFns, func() error {
8998
axe.Flush()
9099
return nil
@@ -146,25 +155,13 @@ func transformConfigToXRayOptions(axrCfg *awsXRayConfig) (xopts []xray.Option, e
146155

147156
// ExportSpans is the method that translates OpenCensus-Proto Traces into AWS X-Ray spans.
148157
// It uniquely maintains
149-
func (axe *awsXRayExporter) ConsumeTraceData(ctx context.Context, td data.TraceData) (xerr error) {
150-
ctx, span := trace.StartSpan(ctx,
151-
"opencensus.service.exporter.aws_xray.ExportSpans",
152-
trace.WithSampler(trace.NeverSample()))
153-
154-
defer func() {
155-
if xerr != nil && span.IsRecordingEvents() {
156-
span.SetStatus(trace.Status{
157-
Code: int32(trace.StatusCodeUnknown),
158-
Message: xerr.Error(),
159-
})
160-
}
161-
span.End()
162-
}()
163-
158+
func (axe *awsXRayExporter) PushTraceData(ctx context.Context, td data.TraceData) (int, error) {
164159
serviceName := td.Node.GetServiceInfo().GetName()
165160
if serviceName == "" {
166161
serviceName = axe.defaultServiceName
167162
}
163+
// TODO: Consider to do this in the exporterhelper.
164+
span := trace.FromContext(ctx)
168165
if span.IsRecordingEvents() {
169166
span.Annotate([]trace.Attribute{
170167
trace.StringAttribute("service_name", serviceName),
@@ -173,7 +170,7 @@ func (axe *awsXRayExporter) ConsumeTraceData(ctx context.Context, td data.TraceD
173170

174171
exp, err := axe.getOrMakeExporterByServiceName(serviceName)
175172
if err != nil {
176-
return err
173+
return len(td.Spans), err
177174
}
178175
return exporterwrapper.PushOcProtoSpansToOCTraceExporter(exp, td)
179176
}

exporter/datadogexporter/datadog.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,15 @@ func DatadogTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsume
7171
return nil
7272
})
7373

74+
dgte, err := exporterwrapper.NewExporterWrapper("datadog", "ocservice.exporter.DataDog.ConsumeTraceData", de)
75+
if err != nil {
76+
return nil, nil, nil, err
77+
}
78+
7479
// TODO: Examine the Datadog exporter to see
7580
// if trace.ExportSpan was constraining and if perhaps the
7681
// upload can use the context and information from the Node.
77-
tps = append(tps, exporterwrapper.NewExporterWrapper("datadog", de))
82+
tps = append(tps, dgte)
7883

7984
// TODO: (@odeke-em, @songya23) implement ExportMetrics for Datadog.
8085
// mes = append(mes, oexp)

exporter/exporterwrapper/exporterwrapper.go

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ import (
2727
"go.opencensus.io/trace"
2828

2929
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
30-
"github.com/census-instrumentation/opencensus-service/consumer"
3130
"github.com/census-instrumentation/opencensus-service/data"
31+
"github.com/census-instrumentation/opencensus-service/exporter"
32+
"github.com/census-instrumentation/opencensus-service/exporter/exporterhelper"
3233
"github.com/census-instrumentation/opencensus-service/internal"
3334
spandatatranslator "github.com/census-instrumentation/opencensus-service/translator/trace/spandata"
3435
)
@@ -41,42 +42,22 @@ import (
4142
// by various vendors and contributors. Eventually the goal is to
4243
// get those exporters converted to directly receive
4344
// OpenCensus Proto TraceData.
44-
func NewExporterWrapper(exporterName string, ocExporter trace.Exporter) consumer.TraceConsumer {
45-
return &ocExporterWrapper{spanName: "opencensus.service.exporter." + exporterName + ".ExportTrace", ocExporter: ocExporter}
46-
}
47-
48-
type ocExporterWrapper struct {
49-
spanName string
50-
ocExporter trace.Exporter
51-
}
52-
53-
var _ consumer.TraceConsumer = (*ocExporterWrapper)(nil)
54-
55-
func (octew *ocExporterWrapper) ConsumeTraceData(ctx context.Context, td data.TraceData) (aerr error) {
56-
ctx, span := trace.StartSpan(ctx,
57-
octew.spanName, trace.WithSampler(trace.NeverSample()))
58-
59-
if span.IsRecordingEvents() {
60-
span.Annotate([]trace.Attribute{
61-
trace.Int64Attribute("n_spans", int64(len(td.Spans))),
62-
}, "")
63-
}
64-
65-
defer func() {
66-
if aerr != nil && span.IsRecordingEvents() {
67-
span.SetStatus(trace.Status{Code: trace.StatusCodeInternal, Message: aerr.Error()})
68-
}
69-
span.End()
70-
}()
71-
72-
return PushOcProtoSpansToOCTraceExporter(octew.ocExporter, td)
45+
func NewExporterWrapper(exporterName string, spanName string, ocExporter trace.Exporter) (exporter.TraceExporter, error) {
46+
return exporterhelper.NewTraceExporter(
47+
exporterName,
48+
func(ctx context.Context, td data.TraceData) (int, error) {
49+
return PushOcProtoSpansToOCTraceExporter(ocExporter, td)
50+
},
51+
exporterhelper.WithSpanName(spanName),
52+
exporterhelper.WithRecordMetrics(true),
53+
)
7354
}
7455

7556
// TODO: Remove PushOcProtoSpansToOCTraceExporter after aws-xray is changed to ExporterWrapper.
7657

7758
// PushOcProtoSpansToOCTraceExporter pushes TraceData to the given trace.Exporter by converting the
7859
// protos to trace.SpanData.
79-
func PushOcProtoSpansToOCTraceExporter(ocExporter trace.Exporter, td data.TraceData) error {
60+
func PushOcProtoSpansToOCTraceExporter(ocExporter trace.Exporter, td data.TraceData) (int, error) {
8061
var errs []error
8162
var goodSpans []*tracepb.Span
8263
for _, span := range td.Spans {
@@ -89,5 +70,5 @@ func PushOcProtoSpansToOCTraceExporter(ocExporter trace.Exporter, td data.TraceD
8970
}
9071
}
9172

92-
return internal.CombineErrors(errs)
73+
return len(td.Spans) - len(goodSpans), internal.CombineErrors(errs)
9374
}

exporter/honeycombexporter/honeycomb.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,12 @@ func HoneycombTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsu
4747

4848
rawExp := honeycomb.NewExporter(hc.WriteKey, hc.DatasetName)
4949

50-
tps = append(tps, exporterwrapper.NewExporterWrapper("honeycomb", rawExp))
50+
hcte, err := exporterwrapper.NewExporterWrapper("honeycomb", "ocservice.exporter.HoneyComb.ConsumeTraceData", rawExp)
51+
if err != nil {
52+
return nil, nil, nil, err
53+
}
54+
55+
tps = append(tps, hcte)
5156
doneFns = append(doneFns, func() error {
5257
rawExp.Close()
5358
return nil

exporter/jaegerexporter/jaeger.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,14 @@ func JaegerExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps
6161
je.Flush()
6262
return nil
6363
})
64+
65+
jte, err := exporterwrapper.NewExporterWrapper("jaeger", "ocservice.exporter.Jaeger.ConsumeTraceData", je)
66+
if err != nil {
67+
return nil, nil, nil, err
68+
}
6469
// TODO: Examine "contrib.go.opencensus.io/exporter/jaeger" to see
6570
// if trace.ExportSpan was constraining and if perhaps the Jaeger
6671
// upload can use the context and information from the Node.
67-
tps = append(tps, exporterwrapper.NewExporterWrapper("jaeger", je))
72+
tps = append(tps, jte)
6873
return
6974
}

exporter/kafkaexporter/kafka.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ func KafkaExportersFromViper(v *viper.Viper) (tps []consumer.TraceConsumer, mps
5353
return nil, nil, nil, fmt.Errorf("Cannot configure Kafka Trace exporter: %v", kerr)
5454
}
5555

56-
tps = append(tps, exporterwrapper.NewExporterWrapper("kafka", kde))
56+
kte, err := exporterwrapper.NewExporterWrapper("kafka", "ocservice.exporter.Kafka.ConsumeTraceData", kde)
57+
if err != nil {
58+
return nil, nil, nil, err
59+
}
60+
61+
tps = append(tps, kte)
5762
doneFns = append(doneFns, func() error {
5863
kde.Flush()
5964
return nil

exporter/opencensusexporter/opencensus.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,15 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCons
118118
}
119119

120120
oce := &ocagentExporter{exporters: exporters}
121-
oexp, _ := exporterhelper.NewTraceExporter("oc_trace", oce.PushTraceData, exporterhelper.WithRecordMetrics(true))
121+
oexp, err := exporterhelper.NewTraceExporter(
122+
"oc_trace",
123+
oce.PushTraceData,
124+
exporterhelper.WithSpanName("ocservice.exporter.OpenCensus.ConsumeTraceData"),
125+
exporterhelper.WithRecordMetrics(true))
126+
127+
if err != nil {
128+
return nil, nil, nil, err
129+
}
122130

123131
tps = append(tps, oexp)
124132

exporter/stackdriverexporter/stackdriver.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,16 @@ func StackdriverTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCon
8888
exporter: sde,
8989
}
9090

91+
sdte, err := exporterwrapper.NewExporterWrapper("stackdriver_trace", "ocservice.exporter.Stackdriver.ConsumeTraceData", sde)
92+
if err != nil {
93+
return nil, nil, nil, err
94+
}
95+
9196
// TODO: Examine "contrib.go.opencensus.io/exporter/stackdriver" to see
9297
// if trace.ExportSpan was constraining and if perhaps the Stackdriver
9398
// upload can use the context and information from the Node.
9499
if sc.EnableTracing {
95-
tps = append(tps, exporterwrapper.NewExporterWrapper("stackdriver", sde))
100+
tps = append(tps, sdte)
96101
}
97102

98103
if sc.EnableMetrics {

0 commit comments

Comments
 (0)