Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 3e7d344

Browse files
author
Bogdan Drutu
authored
Add source format to TraceData and change ProcessSpan to accept context. (#489)
1 parent c88cf9b commit 3e7d344

File tree

16 files changed

+115
-79
lines changed

16 files changed

+115
-79
lines changed

cmd/occollector/app/sender/jaeger_thrift_http_sender.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package sender
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"fmt"
2021
"io"
2122
"io/ioutil"
@@ -26,6 +27,7 @@ import (
2627
"go.uber.org/zap"
2728

2829
"github.com/census-instrumentation/opencensus-service/data"
30+
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
2931
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
3032
)
3133

@@ -41,6 +43,8 @@ type JaegerThriftHTTPSender struct {
4143
logger *zap.Logger
4244
}
4345

46+
var _ processor.SpanProcessor = (*JaegerThriftHTTPSender)(nil)
47+
4448
// HTTPOption sets a parameter for the HttpCollector
4549
type HTTPOption func(s *JaegerThriftHTTPSender)
4650

@@ -80,7 +84,7 @@ func NewJaegerThriftHTTPSender(
8084
}
8185

8286
// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
83-
func (s *JaegerThriftHTTPSender) ProcessSpans(td data.TraceData, spanFormat string) error {
87+
func (s *JaegerThriftHTTPSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
8488
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
8589
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
8690
if err != nil {

cmd/occollector/app/sender/jaeger_thrift_tchannel_sender.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package sender
1616

1717
import (
18+
"context"
19+
1820
"go.uber.org/zap"
1921

2022
reporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
@@ -31,7 +33,7 @@ type JaegerThriftTChannelSender struct {
3133
reporter reporter.Reporter
3234
}
3335

34-
var _ processor.SpanProcessor = (*JaegerThriftHTTPSender)(nil)
36+
var _ processor.SpanProcessor = (*JaegerThriftTChannelSender)(nil)
3537

3638
// NewJaegerThriftTChannelSender creates new TChannel-based sender.
3739
func NewJaegerThriftTChannelSender(
@@ -45,7 +47,7 @@ func NewJaegerThriftTChannelSender(
4547
}
4648

4749
// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
48-
func (s *JaegerThriftTChannelSender) ProcessSpans(td data.TraceData, spanFormat string) error {
50+
func (s *JaegerThriftTChannelSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
4951
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
5052
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
5153
if err != nil {

data/data.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ type MetricsData struct {
3030

3131
// TraceData is a struct that groups proto spans with a unique node and a resource.
3232
type TraceData struct {
33-
Node *commonpb.Node
34-
Resource *resourcepb.Resource
35-
Spans []*tracepb.Span
33+
Node *commonpb.Node
34+
Resource *resourcepb.Resource
35+
Spans []*tracepb.Span
36+
SourceFormat string
3637
}

internal/collector/processor/exporter_processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ func NewTraceExporterProcessor(traceExporters ...consumer.TraceConsumer) SpanPro
3434
return &exporterSpanProcessor{tp: multiconsumer.NewTraceProcessor(traceExporters)}
3535
}
3636

37-
func (sp *exporterSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) error {
38-
err := sp.tp.ConsumeTraceData(context.Background(), td)
37+
func (sp *exporterSpanProcessor) ProcessSpans(ctx context.Context, td data.TraceData) error {
38+
err := sp.tp.ConsumeTraceData(ctx, td)
3939
if err != nil {
4040
return err
4141
}

internal/collector/processor/multi_processor.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package processor
1616

1717
import (
18+
"context"
19+
1820
"github.com/census-instrumentation/opencensus-service/data"
1921
"github.com/census-instrumentation/opencensus-service/internal"
2022

@@ -25,7 +27,7 @@ import (
2527

2628
// MultiProcessorOption represents options that can be applied to a MultiSpanProcessor.
2729
type MultiProcessorOption func(*multiSpanProcessor)
28-
type preProcessFn func(data.TraceData, string)
30+
type preProcessFn func(context.Context, data.TraceData)
2931

3032
// MultiSpanProcessor enables processing on multiple processors.
3133
// For each incoming span batch, it calls ProcessSpans method on each span
@@ -59,7 +61,7 @@ func WithPreProcessFn(preProcFn preProcessFn) MultiProcessorOption {
5961
// in each ExportTraceServiceRequest.
6062
func WithAddAttributes(attributes map[string]interface{}, overwrite bool) MultiProcessorOption {
6163
return WithPreProcessFn(
62-
func(td data.TraceData, spanFormat string) {
64+
func(ctx context.Context, td data.TraceData) {
6365
if len(attributes) == 0 {
6466
return
6567
}
@@ -107,13 +109,13 @@ func WithAddAttributes(attributes map[string]interface{}, overwrite bool) MultiP
107109
}
108110

109111
// ProcessSpans implements the SpanProcessor interface
110-
func (msp *multiSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) error {
112+
func (msp *multiSpanProcessor) ProcessSpans(ctx context.Context, td data.TraceData) error {
111113
for _, preProcessFn := range msp.preProcessFns {
112-
preProcessFn(td, spanFormat)
114+
preProcessFn(ctx, td)
113115
}
114116
var errors []error
115117
for _, sp := range msp.processors {
116-
err := sp.ProcessSpans(td, spanFormat)
118+
err := sp.ProcessSpans(ctx, td)
117119
if err != nil {
118120
errors = append(errors, err)
119121
}

internal/collector/processor/multi_processor_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package processor
1616

1717
import (
18+
"context"
1819
"fmt"
1920
"sync/atomic"
2021
"testing"
@@ -37,7 +38,7 @@ func TestMultiSpanProcessorMultiplexing(t *testing.T) {
3738
var wantSpansCount = 0
3839
for i := 0; i < 2; i++ {
3940
wantSpansCount += len(td.Spans)
40-
tt.ProcessSpans(td, "test")
41+
tt.ProcessSpans(context.Background(), td)
4142
}
4243

4344
for _, p := range processors {
@@ -66,7 +67,7 @@ func TestMultiSpanProcessorWhenOneErrors(t *testing.T) {
6667

6768
var wantSpansCount = 0
6869
for i := 0; i < 2; i++ {
69-
err := tt.ProcessSpans(td, "test")
70+
err := tt.ProcessSpans(context.Background(), td)
7071
if err == nil {
7172
t.Errorf("Wanted error got nil")
7273
return
@@ -91,7 +92,7 @@ func TestMultiSpanProcessorWithPreProcessFn(t *testing.T) {
9192
}
9293

9394
calledFnCount := int32(0)
94-
testPreProcessFn := func(data.TraceData, string) {
95+
testPreProcessFn := func(context.Context, data.TraceData) {
9596
atomic.AddInt32(&calledFnCount, 1)
9697
}
9798

@@ -104,7 +105,7 @@ func TestMultiSpanProcessorWithPreProcessFn(t *testing.T) {
104105
batchCount := 2
105106
for i := 0; i < batchCount; i++ {
106107
wantSpansCount += len(batch.Spans)
107-
tt.ProcessSpans(batch, "test")
108+
tt.ProcessSpans(context.Background(), batch)
108109
}
109110

110111
for _, p := range processors {
@@ -160,7 +161,7 @@ func multiSpanProcessorWithAddAttributesTestHelper(t *testing.T, overwrite bool)
160161

161162
spans := make([]*tracepb.Span, 0, len(td.Spans)*2)
162163
for i := 0; i < 2; i++ {
163-
tt.ProcessSpans(td, "test")
164+
tt.ProcessSpans(context.Background(), td)
164165
spans = append(spans, td.Spans...)
165166
}
166167

@@ -197,7 +198,7 @@ type mockSpanProcessor struct {
197198

198199
var _ SpanProcessor = &mockSpanProcessor{}
199200

200-
func (p *mockSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) error {
201+
func (p *mockSpanProcessor) ProcessSpans(ctx context.Context, td data.TraceData) error {
201202
batchSize := len(td.Spans)
202203
p.TotalSpans += batchSize
203204
if p.MustFail {

internal/collector/processor/nodebatcher/node_batcher.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"sync"
2323
"time"
2424

25+
"github.com/census-instrumentation/opencensus-service/observability"
26+
2527
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
2628
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
2729
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
@@ -102,9 +104,9 @@ func NewBatcher(name string, logger *zap.Logger, sender processor.SpanProcessor,
102104

103105
// ProcessSpans implements batcher as a SpanProcessor and takes the provided spans and adds them to
104106
// batches
105-
func (b *batcher) ProcessSpans(td data.TraceData, spanFormat string) error {
106-
bucketID := b.genBucketID(td.Node, td.Resource, spanFormat)
107-
bucket := b.getOrAddBucket(bucketID, td.Node, td.Resource, spanFormat)
107+
func (b *batcher) ProcessSpans(ctx context.Context, td data.TraceData) error {
108+
bucketID := b.genBucketID(td.Node, td.Resource, td.SourceFormat)
109+
bucket := b.getOrAddBucket(bucketID, td.Node, td.Resource, td.SourceFormat)
108110
bucket.add(td.Spans)
109111
return nil
110112
}
@@ -219,18 +221,19 @@ func (nb *nodeBatch) sendItems(
219221
tdItems = append(tdItems, items...)
220222
}
221223
td := data.TraceData{
222-
Node: nb.node,
223-
Resource: nb.resource,
224-
Spans: tdItems,
224+
Node: nb.node,
225+
Resource: nb.resource,
226+
Spans: tdItems,
227+
SourceFormat: nb.format,
225228
}
226-
227229
statsTags := processor.StatsTagsForBatch(
228230
nb.parent.name, processor.ServiceNameForNode(nb.node), nb.format,
229231
)
230232
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
231233

232234
// TODO: This process should be done in an async way, perhaps with a channel + goroutine worker(s)
233-
_ = nb.parent.sender.ProcessSpans(td, nb.format)
235+
ctx := observability.ContextWithReceiverName(context.Background(), nb.format)
236+
_ = nb.parent.sender.ProcessSpans(ctx, td)
234237
}
235238

236239
func (nb *nodeBatch) getAndReset() ([][]*tracepb.Span, uint32) {

internal/collector/processor/nodebatcher/node_batcher_test.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package nodebatcher
1616

1717
import (
18+
"context"
1819
"fmt"
1920
"testing"
2021
"time"
@@ -149,9 +150,10 @@ func TestConcurrentNodeAdds(t *testing.T) {
149150
Node: &commonpb.Node{
150151
ServiceInfo: &commonpb.ServiceInfo{Name: fmt.Sprintf("svc-%d", requestNum)},
151152
},
152-
Spans: spans,
153+
Spans: spans,
154+
SourceFormat: "oc_trace",
153155
}
154-
batcher.ProcessSpans(td, "oc")
156+
go batcher.ProcessSpans(context.Background(), td)
155157
}
156158

157159
err := <-waitForCn
@@ -196,23 +198,24 @@ func TestBucketRemove(t *testing.T) {
196198
Node: &commonpb.Node{
197199
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
198200
},
199-
Spans: spans,
201+
Spans: spans,
202+
SourceFormat: "oc_trace",
200203
}
201-
batcher.ProcessSpans(request, "oc")
204+
batcher.ProcessSpans(context.Background(), request)
202205

203206
err := <-waitForCn
204207
if err != nil {
205208
t.Errorf("failed to wait for sender %s", err)
206209
}
207210

208-
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc")) == nil {
211+
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc_trace")) == nil {
209212
t.Errorf("Bucket should exist but does not.")
210213
}
211214

212215
// Doesn't seem to be a great way to test this without waiting
213216
<-time.After(2 * time.Duration(removeAfterTicks) * tickTime)
214217

215-
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc")) != nil {
218+
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc_trace")) != nil {
216219
t.Errorf("Bucket should be deleted but is not.")
217220
}
218221
}
@@ -246,16 +249,17 @@ func TestBucketTickerStop(t *testing.T) {
246249
Node: &commonpb.Node{
247250
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
248251
},
249-
Spans: spans,
252+
Spans: spans,
253+
SourceFormat: "oc_trace",
250254
}
251-
batcher.ProcessSpans(request, "oc")
255+
batcher.ProcessSpans(context.Background(), request)
252256

253257
err := <-waitForCn
254258
if err == nil {
255259
t.Errorf("Unexpectedly received spans")
256260
}
257261

258-
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc")) == nil {
262+
if batcher.getBucket(batcher.genBucketID(request.Node, nil, "oc_trace")) == nil {
259263
t.Errorf("Bucket should not be deleted but is.")
260264
}
261265
}
@@ -275,9 +279,10 @@ func TestConcurrentBatchAdds(t *testing.T) {
275279
Node: &commonpb.Node{
276280
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
277281
},
278-
Spans: spans,
282+
Spans: spans,
283+
SourceFormat: "oc_trace",
279284
}
280-
batcher.ProcessSpans(request, "oc")
285+
go batcher.ProcessSpans(context.Background(), request)
281286
}
282287

283288
err := <-waitForCn
@@ -310,14 +315,15 @@ func BenchmarkConcurrentBatchAdds(b *testing.B) {
310315
Node: &commonpb.Node{
311316
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
312317
},
313-
Spans: spans,
318+
Spans: spans,
319+
SourceFormat: "oc_trace",
314320
}
315321
requests = append(requests, request)
316322

317323
b.Run("v1", func(b *testing.B) {
318324
for i := 0; i < b.N; i++ {
319325
for _, td := range requests {
320-
_ = batcher.ProcessSpans(td, "oc")
326+
_ = batcher.ProcessSpans(context.Background(), td)
321327
}
322328
}
323329
})
@@ -335,7 +341,7 @@ func newNopSender() *nopSender {
335341
return &nopSender{}
336342
}
337343

338-
func (ts *nopSender) ProcessSpans(td data.TraceData, spanFormat string) error {
344+
func (ts *nopSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
339345
return nil
340346
}
341347

@@ -353,7 +359,7 @@ func newTestSender() *testSender {
353359
}
354360
}
355361

356-
func (ts *testSender) ProcessSpans(td data.TraceData, spanFormat string) error {
362+
func (ts *testSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
357363
ts.reqChan <- td
358364
return nil
359365
}

internal/collector/processor/processor.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package processor
1616

1717
import (
18+
"context"
19+
1820
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
1921
"go.opencensus.io/stats"
2022
"go.opencensus.io/stats/view"
@@ -27,7 +29,7 @@ import (
2729
// SpanProcessor handles batches of spans converted to OpenCensus proto format.
2830
type SpanProcessor interface {
2931
// ProcessSpans processes spans and return with the number of spans that failed and an error.
30-
ProcessSpans(td data.TraceData, spanFormat string) error
32+
ProcessSpans(ctx context.Context, td data.TraceData) error
3133
// TODO: (@pjanotti) For shutdown improvement, the interface needs a method to attempt that.
3234
}
3335

internal/collector/processor/processor_to_sink.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,7 @@ func WrapWithSpanSink(format string, p SpanProcessor) consumer.TraceConsumer {
3737
}
3838

3939
func (ps *protoProcessorSink) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
40-
return ps.protoProcessor.ProcessSpans(td, ps.sourceFormat)
40+
// For the moment ensure that source format is set here before we change receivers to set this.
41+
td.SourceFormat = ps.sourceFormat
42+
return ps.protoProcessor.ProcessSpans(ctx, td)
4143
}

0 commit comments

Comments
 (0)