Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 05bcd95

Browse files
author
Bogdan Drutu
authored
For receivers and exporters record number of received and dropped spans. (#442)
* For receivers and exporters record number of received and dropped spans.
* Shorter tag names, unify tags between metrics/trace.
* Record when errors, add TODOs for better handling errors.
1 parent a51ae01 commit 05bcd95

8 files changed

Lines changed: 119 additions & 90 deletions

File tree

exporter/opencensusexporter/opencensus.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tdps []processor.TraceDa
105105
return
106106
}
107107

108+
const exporterTagValue = "oc_trace"
109+
108110
func (oce *ocagentExporter) ProcessTraceData(ctx context.Context, td data.TraceData) error {
109111
// Get an exporter worker round-robin
110112
exporter := oce.exporters[atomic.AddUint32(&oce.counter, 1)%uint32(len(oce.exporters))]
@@ -115,10 +117,14 @@ func (oce *ocagentExporter) ProcessTraceData(ctx context.Context, td data.TraceD
115117
Node: td.Node,
116118
},
117119
)
120+
ctxWithExporterName := internal.ContextWithExporterName(ctx, exporterTagValue)
118121
if err != nil {
122+
// TODO: If failed to send all maybe record a different metric. Failed to "Sent", but
123+
// this may not be accurate if we have retry outside of this exporter. Maybe the retry
124+
// processor should record these metrics. For the moment we assume no retry.
125+
internal.RecordTraceExporterMetrics(ctxWithExporterName, len(td.Spans), len(td.Spans))
119126
return err
120127
}
121-
nSpansCounter := internal.NewExportedSpansRecorder("ocagent")
122-
nSpansCounter(ctx, td.Node, td.Spans)
128+
internal.RecordTraceExporterMetrics(ctxWithExporterName, len(td.Spans), 0)
123129
return nil
124130
}

exporter/zipkinexporter/zipkin.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"go.opencensus.io/trace"
3131

3232
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
33-
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
3433
"github.com/census-instrumentation/opencensus-service/data"
3534
"github.com/census-instrumentation/opencensus-service/internal"
3635
"github.com/census-instrumentation/opencensus-service/processor"
@@ -204,7 +203,7 @@ func (ze *zipkinExporter) ProcessTraceData(ctx context.Context, td data.TraceDat
204203
span.End()
205204
}()
206205

207-
goodSpans := make([]*tracepb.Span, 0, len(td.Spans))
206+
goodSpans := 0
208207
for _, span := range td.Spans {
209208
sd, err := spandatatranslator.ProtoSpanToOCSpanData(span)
210209
if err != nil {
@@ -217,13 +216,12 @@ func (ze *zipkinExporter) ProcessTraceData(ctx context.Context, td data.TraceDat
217216
ze.mu.Lock()
218217
ze.reporter.Send(zs)
219218
ze.mu.Unlock()
220-
goodSpans = append(goodSpans, span)
219+
goodSpans++
221220
}
222221
}
223222

224223
// And finally record metrics on the number of exported spans.
225-
nSpansCounter := internal.NewExportedSpansRecorder("zipkin")
226-
nSpansCounter(ctx, td.Node, goodSpans)
224+
internal.RecordTraceExporterMetrics(internal.ContextWithExporterName(ctx, "zipkin"), len(td.Spans), len(td.Spans)-goodSpans)
227225

228226
return nil
229227
}

internal/observability.go

Lines changed: 61 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -28,80 +28,88 @@ import (
2828
"go.opencensus.io/stats/view"
2929
"go.opencensus.io/tag"
3030
"go.opencensus.io/trace"
31-
32-
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
33-
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
3431
)
3532

3633
var (
37-
tagKeyReceiverName, _ = tag.NewKey("opencensus_receiver")
38-
tagKeyExporterName, _ = tag.NewKey("opencensus_exporter")
39-
)
40-
41-
var mReceivedSpans = stats.Int64("oc.io/receiver/received_spans", "Counts the number of spans received by the receiver", "1")
34+
tagKeyReceiver, _ = tag.NewKey("oc_receiver")
35+
mReceiverReceivedSpans = stats.Int64("oc.io/receiver/received_spans", "Counts the number of spans received by the receiver", "1")
36+
mReceiverDroppedSpans = stats.Int64("oc.io/receiver/dropped_spans", "Counts the number of spans dropped by the receiver", "1")
4237

43-
var itemsDistribution = view.Distribution(
44-
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 25, 30, 35, 40, 45, 50, 60, 70, 80, 90,
45-
100, 150, 200, 250, 300, 450, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000,
38+
tagKeyExporter, _ = tag.NewKey("oc_exporter")
39+
mExporterReceivedSpans = stats.Int64("oc.io/exporter/received_spans", "Counts the number of spans received by the exporter", "1")
40+
mExporterDroppedSpans = stats.Int64("oc.io/exporter/dropped_spans", "Counts the number of spans dropped by the exporter", "1")
4641
)
4742

48-
// ViewReceivedSpansReceiver defines the view for the received spans metric.
49-
var ViewReceivedSpansReceiver = &view.View{
50-
Name: "oc.io/receiver/received_spans",
51-
Description: "The number of spans received by the receiver",
52-
Measure: mReceivedSpans,
53-
Aggregation: itemsDistribution,
54-
TagKeys: []tag.Key{tagKeyReceiverName},
43+
// ViewReceiverReceivedSpans defines the view for the receiver received spans metric.
44+
var ViewReceiverReceivedSpans = &view.View{
45+
Name: mReceiverReceivedSpans.Name(),
46+
Description: mReceiverReceivedSpans.Description(),
47+
Measure: mReceiverReceivedSpans,
48+
Aggregation: view.Sum(),
49+
TagKeys: []tag.Key{tagKeyReceiver},
5550
}
5651

57-
var mExportedSpans = stats.Int64("oc.io/receiver/exported_spans", "Counts the number of exported spans", "1")
52+
// ViewReceiverDroppedSpans defines the view for the receiver dropped spans metric.
53+
var ViewReceiverDroppedSpans = &view.View{
54+
Name: mReceiverDroppedSpans.Name(),
55+
Description: mReceiverDroppedSpans.Description(),
56+
Measure: mReceiverDroppedSpans,
57+
Aggregation: view.Sum(),
58+
TagKeys: []tag.Key{tagKeyReceiver},
59+
}
60+
61+
// ViewExporterReceivedSpans defines the view for the exporter received spans metric.
62+
var ViewExporterReceivedSpans = &view.View{
63+
Name: mExporterReceivedSpans.Name(),
64+
Description: mExporterReceivedSpans.Description(),
65+
Measure: mExporterReceivedSpans,
66+
Aggregation: view.Sum(),
67+
TagKeys: []tag.Key{tagKeyReceiver, tagKeyExporter},
68+
}
5869

59-
// ViewExportedSpans defines the view for exported spans metric.
60-
var ViewExportedSpans = &view.View{
61-
Name: "oc.io/receiver/exported_spans",
62-
Description: "Tracks the number of exported spans",
63-
Measure: mExportedSpans,
64-
Aggregation: itemsDistribution,
65-
TagKeys: []tag.Key{tagKeyExporterName},
70+
// ViewExporterDroppedSpans defines the view for the exporter dropped spans metric.
71+
var ViewExporterDroppedSpans = &view.View{
72+
Name: mExporterDroppedSpans.Name(),
73+
Description: mExporterDroppedSpans.Description(),
74+
Measure: mExporterDroppedSpans,
75+
Aggregation: view.Sum(),
76+
TagKeys: []tag.Key{tagKeyReceiver, tagKeyExporter},
6677
}
6778

6879
// AllViews has the views for the metrics provided by the agent.
6980
var AllViews = []*view.View{
70-
ViewReceivedSpansReceiver,
71-
ViewExportedSpans,
81+
ViewReceiverReceivedSpans,
82+
ViewReceiverDroppedSpans,
83+
ViewExporterReceivedSpans,
84+
ViewExporterDroppedSpans,
7285
}
7386

74-
// ContextWithReceiverName adds the tag "opencensus_receiver" and the name of the
75-
// receiver as the value, and returns the newly created context.
87+
// ContextWithReceiverName adds the tag "oc_receiver" and the name of the receiver as the value,
88+
// and returns the newly created context. For receivers that can receive multiple signals it is
89+
// recommended to encode the signal as suffix (e.g. "oc_trace" and "oc_metrics").
7690
func ContextWithReceiverName(ctx context.Context, receiverName string) context.Context {
77-
ctx, _ = tag.New(ctx, tag.Upsert(tagKeyReceiverName, receiverName))
91+
ctx, _ = tag.New(ctx, tag.Upsert(tagKeyReceiver, receiverName))
7892
return ctx
7993
}
8094

81-
// NewReceivedSpansRecorderStreaming creates a function that uses a context created
82-
// from the name of the receiver to record the number of the spans received
83-
// by the receiver.
84-
func NewReceivedSpansRecorderStreaming(lifetimeCtx context.Context, receiverName string) func(*commonpb.Node, []*tracepb.Span) {
85-
// We create and reuse this context because for streaming RPCs e.g. with gRPC
86-
// the context doesn't change, so it is more useful for avoid expensively adding
87-
// keys on each invocation. We can create the context once and then reuse it
88-
// when recording measurements.
89-
ctx := ContextWithReceiverName(lifetimeCtx, receiverName)
90-
91-
return func(ni *commonpb.Node, spans []*tracepb.Span) {
92-
// TODO: (@odeke-em) perhaps also record information from the node?
93-
stats.Record(ctx, mReceivedSpans.M(int64(len(spans))))
94-
}
95+
// RecordTraceReceiverMetrics records the number of the spans received and dropped by the receiver.
96+
// Use it with a context.Context generated using ContextWithReceiverName().
97+
func RecordTraceReceiverMetrics(ctxWithTraceReceiverName context.Context, receivedSpans int, droppedSpans int) {
98+
stats.Record(ctxWithTraceReceiverName, mReceiverReceivedSpans.M(int64(receivedSpans)), mReceiverDroppedSpans.M(int64(droppedSpans)))
9599
}
96100

97-
// NewExportedSpansRecorder creates a helper function that'll add the name of the
98-
// creating exporter as a tag value in the context that will be used to count the
99-
// the number of spans exported.
100-
func NewExportedSpansRecorder(exporterName string) func(context.Context, *commonpb.Node, []*tracepb.Span) {
101-
return func(ctx context.Context, ni *commonpb.Node, spans []*tracepb.Span) {
102-
ctx, _ = tag.New(ctx, tag.Upsert(tagKeyExporterName, exporterName))
103-
stats.Record(ctx, mExportedSpans.M(int64(len(spans))))
104-
}
101+
// ContextWithExporterName adds the tag "oc_exporter" and the name of the exporter as the value,
102+
// and returns the newly created context. For exporters that can export multiple signals it is
103+
// recommended to encode the signal as suffix (e.g. "oc_trace" and "oc_metrics").
104+
func ContextWithExporterName(ctx context.Context, exporterName string) context.Context {
105+
ctx, _ = tag.New(ctx, tag.Upsert(tagKeyExporter, exporterName))
106+
return ctx
107+
}
108+
109+
// RecordTraceExporterMetrics records the number of the spans received and dropped by the exporter.
110+
// Use it with a context.Context generated using ContextWithExporterName().
111+
func RecordTraceExporterMetrics(ctx context.Context, receivedSpans int, droppedSpans int) {
112+
stats.Record(ctx, mExporterReceivedSpans.M(int64(receivedSpans)), mExporterDroppedSpans.M(int64(droppedSpans)))
105113
}
106114

107115
// GRPCServerWithObservabilityEnabled creates a gRPC server that at a bare minimum has

receiver/jaegerreceiver/trace_receiver.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/uber/tchannel-go/thrift"
3838
"go.uber.org/zap"
3939

40-
"github.com/census-instrumentation/opencensus-service/data"
4140
"github.com/census-instrumentation/opencensus-service/internal"
4241
"github.com/census-instrumentation/opencensus-service/processor"
4342
"github.com/census-instrumentation/opencensus-service/receiver"
@@ -73,6 +72,8 @@ type jReceiver struct {
7372

7473
tchannel *tchannel.Channel
7574
collectorServer *http.Server
75+
76+
defaultAgentCtx context.Context
7677
}
7778

7879
const (
@@ -95,7 +96,10 @@ const (
9596

9697
// New creates a TraceReceiver that receives traffic as a collector with both Thrift and HTTP transports.
9798
func New(ctx context.Context, config *Configuration) (receiver.TraceReceiver, error) {
98-
return &jReceiver{config: config}, nil
99+
return &jReceiver{
100+
config: config,
101+
defaultAgentCtx: internal.ContextWithReceiverName(context.Background(), "jaeger-agent"),
102+
}, nil
99103
}
100104

101105
var _ receiver.TraceReceiver = (*jReceiver)(nil)
@@ -232,9 +236,11 @@ func (jr *jReceiver) stopTraceReceptionLocked(ctx context.Context) error {
232236
return err
233237
}
234238

239+
const collectorReceiverTagValue = "jaeger-collector"
240+
235241
func (jr *jReceiver) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
236242
jbsr := make([]*jaeger.BatchSubmitResponse, 0, len(batches))
237-
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(ctx, "jaeger-collector")
243+
ctxWithReceiverName := internal.ContextWithReceiverName(ctx, collectorReceiverTagValue)
238244

239245
for _, batch := range batches {
240246
td, err := jaegertranslator.ThriftBatchToOCProto(batch)
@@ -245,7 +251,7 @@ func (jr *jReceiver) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch)
245251
ok = true
246252
jr.nextProcessor.ProcessTraceData(ctx, td)
247253
// We MUST unconditionally record metrics from this reception.
248-
spansMetricsFn(td.Node, td.Spans)
254+
internal.RecordTraceReceiverMetrics(ctxWithReceiverName, len(batch.Spans), len(batch.Spans)-len(td.Spans))
249255
}
250256

251257
jbsr = append(jbsr, &jaeger.BatchSubmitResponse{
@@ -267,17 +273,14 @@ func (jr *jReceiver) EmitZipkinBatch(spans []*zipkincore.Span) error {
267273
// EmitBatch implements cmd/agent/reporter.Reporter and it forwards
268274
// Jaeger spans received by the Jaeger agent processor.
269275
func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error {
270-
octrace, err := jaegertranslator.ThriftBatchToOCProto(batch)
276+
td, err := jaegertranslator.ThriftBatchToOCProto(batch)
271277
if err != nil {
272-
// TODO: (@odeke-em) add this error for Jaeger observability metrics
278+
internal.RecordTraceReceiverMetrics(jr.defaultAgentCtx, len(batch.Spans), len(batch.Spans))
273279
return err
274280
}
275281

276-
ctx := context.Background()
277-
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(ctx, "jaeger-agent")
278-
err = jr.nextProcessor.ProcessTraceData(ctx, data.TraceData{Node: octrace.Node, Spans: octrace.Spans})
279-
// We MUST unconditionally record metrics from this reception.
280-
spansMetricsFn(octrace.Node, octrace.Spans)
282+
err = jr.nextProcessor.ProcessTraceData(jr.defaultAgentCtx, td)
283+
internal.RecordTraceReceiverMetrics(jr.defaultAgentCtx, len(batch.Spans), len(batch.Spans)-len(td.Spans))
281284

282285
return err
283286
}

receiver/opencensusreceiver/ocmetrics/opencensus.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ var _ agentmetricspb.MetricsServiceServer = (*Receiver)(nil)
5555

5656
var errMetricsExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
5757

58-
const receiverName = "opencensus_metrics"
58+
const receiverTagValue = "oc_metrics"
5959

6060
// Export is the gRPC method that receives streamed metrics from
6161
// OpenCensus-metricproto compatible libraries/applications.
6262
func (ocr *Receiver) Export(mes agentmetricspb.MetricsService_ExportServer) error {
6363
// The bundler will receive batches of metrics i.e. []*metricspb.Metric
6464
// We need to ensure that it propagates the receiver name as a tag
65-
ctxWithReceiverName := internal.ContextWithReceiverName(mes.Context(), receiverName)
65+
ctxWithReceiverName := internal.ContextWithReceiverName(mes.Context(), receiverTagValue)
6666
metricsBundler := bundler.NewBundler((*data.MetricsData)(nil), func(payload interface{}) {
6767
ocr.batchMetricExporting(ctxWithReceiverName, payload)
6868
})

receiver/opencensusreceiver/octrace/opencensus.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,13 @@ func (ocr *Receiver) Config(tcs agenttracepb.TraceService_ConfigServer) error {
8888

8989
var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
9090

91-
const receiverName = "opencensus_trace"
91+
const receiverTagValue = "oc_trace"
9292

9393
// Export is the gRPC method that receives streamed traces from
9494
// OpenCensus-traceproto compatible libraries/applications.
9595
func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
9696
// We need to ensure that it propagates the receiver name as a tag
97-
ctxWithReceiverName := internal.ContextWithReceiverName(tes.Context(), receiverName)
98-
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), receiverName)
97+
ctxWithReceiverName := internal.ContextWithReceiverName(tes.Context(), receiverTagValue)
9998

10099
// The first message MUST have a non-nil Node.
101100
recv, err := tes.Recv()
@@ -131,7 +130,7 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
131130

132131
ocr.messageChan <- &traceDataWithCtx{data: td, ctx: ctxWithReceiverName}
133132

134-
spansMetricsFn(td.Node, td.Spans)
133+
internal.RecordTraceReceiverMetrics(ctxWithReceiverName, len(td.Spans), 0)
135134

136135
recv, err = tes.Recv()
137136
if err != nil {

receiver/zipkinreceiver/scribe/scribe_receiver.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func NewReceiver(addr string, port uint16, category string) (receiver.TraceRecei
6060
category: category,
6161
msgDecoder: base64.StdEncoding.WithPadding('='),
6262
tBinProtocolFactory: thrift.NewTBinaryProtocolFactory(true, false),
63+
defaultCtx: internal.ContextWithReceiverName(context.Background(), "zipkin-scribe"),
6364
},
6465
}
6566
return r, nil
@@ -128,6 +129,7 @@ type scribeCollector struct {
128129
msgDecoder *base64.Encoding
129130
tBinProtocolFactory *thrift.TBinaryProtocolFactory
130131
nextProcessor processor.TraceDataProcessor
132+
defaultCtx context.Context
131133
}
132134

133135
var _ scribe.Scribe = (*scribeCollector)(nil)
@@ -138,18 +140,21 @@ func (sc *scribeCollector) Log(messages []*scribe.LogEntry) (r scribe.ResultCode
138140
for _, logEntry := range messages {
139141
if sc.category != logEntry.Category {
140142
// Not the specified category, do nothing
143+
// TODO: Is this an error? Should we count this as dropped Span?
141144
continue
142145
}
143146

144147
b, err := sc.msgDecoder.DecodeString(logEntry.Message)
145148
if err != nil {
149+
// TODO: Should we continue to read? What error should we record here?
146150
return scribe.ResultCode_OK, err
147151
}
148152

149153
r := bytes.NewReader(b)
150154
st := thrift.NewStreamTransportR(r)
151155
zs := &zipkincore.Span{}
152156
if err := zs.Read(sc.tBinProtocolFactory.GetProtocol(st)); err != nil {
157+
// TODO: Should we continue to read? What error should we record here?
153158
return scribe.ResultCode_OK, err
154159
}
155160

@@ -162,16 +167,18 @@ func (sc *scribeCollector) Log(messages []*scribe.LogEntry) (r scribe.ResultCode
162167

163168
tds, err := zipkintranslator.V1ThriftBatchToOCProto(zSpans)
164169
if err != nil {
170+
// If failed to convert, record all the received spans as dropped.
171+
internal.RecordTraceReceiverMetrics(sc.defaultCtx, len(zSpans), len(zSpans))
165172
return scribe.ResultCode_OK, err
166173
}
167174

168-
ctx := context.Background()
169-
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(ctx, "zipkin-scribe")
170-
175+
tdsSize := 0
171176
for _, td := range tds {
172-
sc.nextProcessor.ProcessTraceData(ctx, td)
173-
spansMetricsFn(td.Node, td.Spans)
177+
sc.nextProcessor.ProcessTraceData(sc.defaultCtx, td)
178+
tdsSize += len(td.Spans)
174179
}
175180

181+
internal.RecordTraceReceiverMetrics(sc.defaultCtx, len(zSpans), len(zSpans)-tdsSize)
182+
176183
return scribe.ResultCode_OK, nil
177184
}

0 commit comments

Comments (0)