Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit c54ee82

Browse files
author
Bogdan Drutu
authored
Update collector code to use [Trace|Metrics]Data. (#431)
1 parent bad985a commit c54ee82

34 files changed

Lines changed: 253 additions & 283 deletions

cmd/occollector/app/sender/jaeger_thrift_http_sender.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"github.com/apache/thrift/lib/go/thrift"
2626
"go.uber.org/zap"
2727

28-
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
28+
"github.com/census-instrumentation/opencensus-service/data"
2929
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
3030
)
3131

@@ -80,15 +80,11 @@ func NewJaegerThriftHTTPSender(
8080
}
8181

8282
// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
83-
func (s *JaegerThriftHTTPSender) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
83+
func (s *JaegerThriftHTTPSender) ProcessSpans(td data.TraceData, spanFormat string) (uint64, error) {
8484
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
85-
if batch == nil {
86-
return 0, fmt.Errorf("Jaeger sender received nil batch")
87-
}
88-
89-
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(batch)
85+
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
9086
if err != nil {
91-
return uint64(len(batch.Spans)), err
87+
return uint64(len(td.Spans)), err
9288
}
9389

9490
mSpans := tBatch.Spans

cmd/occollector/app/sender/jaeger_thrift_tchannel_sender.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
reporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
2121

22-
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
22+
"github.com/census-instrumentation/opencensus-service/data"
2323
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
2424
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
2525
)
@@ -45,9 +45,9 @@ func NewJaegerThriftTChannelSender(
4545
}
4646

4747
// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
48-
func (s *JaegerThriftTChannelSender) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
48+
func (s *JaegerThriftTChannelSender) ProcessSpans(td data.TraceData, spanFormat string) (uint64, error) {
4949
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
50-
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(batch)
50+
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
5151
if err != nil {
5252
return uint64(len(tBatch.Spans)), err
5353
}

internal/collector/processor/exporter_processor.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package processor
1717
import (
1818
"context"
1919

20-
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2120
"github.com/census-instrumentation/opencensus-service/data"
2221
"github.com/census-instrumentation/opencensus-service/processor"
2322
)
@@ -33,8 +32,8 @@ func NewTraceExporterProcessor(traceExporters ...processor.TraceDataProcessor) S
3332
return &exporterSpanProcessor{tdp: processor.NewMultiTraceDataProcessor(traceExporters)}
3433
}
3534

36-
func (sp *exporterSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
37-
err := sp.tdp.ProcessTraceData(context.Background(), data.TraceData{Node: batch.Node, Resource: batch.Resource, Spans: batch.Spans})
35+
func (sp *exporterSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) (uint64, error) {
36+
err := sp.tdp.ProcessTraceData(context.Background(), td)
3837
if err != nil {
3938
// TODO: determine if the number of dropped spans is needed because it was wrong anyway.
4039
return 0, err

internal/collector/processor/multi_processor.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,17 @@
1515
package processor
1616

1717
import (
18+
"github.com/census-instrumentation/opencensus-service/data"
1819
"github.com/census-instrumentation/opencensus-service/internal"
1920

2021
"github.com/spf13/cast"
2122

22-
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2323
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
2424
)
2525

2626
// MultiProcessorOption represents options that can be applied to a MultiSpanProcessor.
2727
type MultiProcessorOption func(*multiSpanProcessor)
28-
type preProcessFn func(*agenttracepb.ExportTraceServiceRequest, string)
28+
type preProcessFn func(data.TraceData, string)
2929

3030
// MultiSpanProcessor enables processing on multiple processors.
3131
// For each incoming span batch, it calls ProcessSpans method on each span
@@ -59,11 +59,11 @@ func WithPreProcessFn(preProcFn preProcessFn) MultiProcessorOption {
5959
// in each ExportTraceServiceRequest.
6060
func WithAddAttributes(attributes map[string]interface{}, overwrite bool) MultiProcessorOption {
6161
return WithPreProcessFn(
62-
func(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) {
62+
func(td data.TraceData, spanFormat string) {
6363
if len(attributes) == 0 {
6464
return
6565
}
66-
for _, span := range batch.Spans {
66+
for _, span := range td.Spans {
6767
if span == nil {
6868
// We will not create nil spans with just attributes on them
6969
continue
@@ -107,14 +107,14 @@ func WithAddAttributes(attributes map[string]interface{}, overwrite bool) MultiP
107107
}
108108

109109
// ProcessSpans implements the SpanProcessor interface
110-
func (msp *multiSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
110+
func (msp *multiSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) (uint64, error) {
111111
for _, preProcessFn := range msp.preProcessFns {
112-
preProcessFn(batch, spanFormat)
112+
preProcessFn(td, spanFormat)
113113
}
114114
var maxFailures uint64
115115
var errors []error
116116
for _, sp := range msp.processors {
117-
failures, err := sp.ProcessSpans(batch, spanFormat)
117+
failures, err := sp.ProcessSpans(td, spanFormat)
118118
if err != nil {
119119
errors = append(errors, err)
120120
}

internal/collector/processor/multi_processor_test.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import (
1919
"sync/atomic"
2020
"testing"
2121

22-
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2322
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
23+
"github.com/census-instrumentation/opencensus-service/data"
2424
)
2525

2626
func TestMultiSpanProcessorMultiplexing(t *testing.T) {
@@ -30,14 +30,14 @@ func TestMultiSpanProcessorMultiplexing(t *testing.T) {
3030
}
3131

3232
tt := NewMultiSpanProcessor(processors)
33-
batch := &agenttracepb.ExportTraceServiceRequest{
33+
td := data.TraceData{
3434
Spans: make([]*tracepb.Span, 7),
3535
}
3636

3737
var wantSpansCount = 0
3838
for i := 0; i < 2; i++ {
39-
wantSpansCount += len(batch.Spans)
40-
tt.ProcessSpans(batch, "test")
39+
wantSpansCount += len(td.Spans)
40+
tt.ProcessSpans(td, "test")
4141
}
4242

4343
for _, p := range processors {
@@ -65,14 +65,14 @@ func TestMultiSpanProcessorSomeNotOk(t *testing.T) {
6565
for i := range spans {
6666
spans[i] = &tracepb.Span{}
6767
}
68-
batch := &agenttracepb.ExportTraceServiceRequest{
68+
td := data.TraceData{
6969
Spans: spans,
7070
}
7171

7272
var wantSpansCount = 0
7373
for i := 0; i < 2; i++ {
74-
failures, _ := tt.ProcessSpans(batch, "test")
75-
batchSize := len(batch.Spans)
74+
failures, _ := tt.ProcessSpans(td, "test")
75+
batchSize := len(td.Spans)
7676
wantSpansCount += batchSize
7777
if wantFailures != failures {
7878
t.Errorf("Wanted %d failures but got %d", wantFailures, failures)
@@ -99,18 +99,18 @@ func TestMultiSpanProcessorWhenOneErrors(t *testing.T) {
9999
m.MustFail = true
100100

101101
tt := NewMultiSpanProcessor(processors)
102-
batch := &agenttracepb.ExportTraceServiceRequest{
102+
td := data.TraceData{
103103
Spans: make([]*tracepb.Span, 5),
104104
}
105105

106106
var wantSpansCount = 0
107107
for i := 0; i < 2; i++ {
108-
failures, err := tt.ProcessSpans(batch, "test")
108+
failures, err := tt.ProcessSpans(td, "test")
109109
if err == nil {
110110
t.Errorf("Wanted error got nil")
111111
return
112112
}
113-
batchSize := len(batch.Spans)
113+
batchSize := len(td.Spans)
114114
wantSpansCount += batchSize
115115
if failures != uint64(batchSize) {
116116
t.Errorf("Wanted all spans to fail, got a different value.")
@@ -133,12 +133,12 @@ func TestMultiSpanProcessorWithPreProcessFn(t *testing.T) {
133133
}
134134

135135
calledFnCount := int32(0)
136-
testPreProcessFn := func(*agenttracepb.ExportTraceServiceRequest, string) {
136+
testPreProcessFn := func(data.TraceData, string) {
137137
atomic.AddInt32(&calledFnCount, 1)
138138
}
139139

140140
tt := NewMultiSpanProcessor(processors, WithPreProcessFn(testPreProcessFn))
141-
batch := &agenttracepb.ExportTraceServiceRequest{
141+
batch := data.TraceData{
142142
Spans: make([]*tracepb.Span, 7),
143143
}
144144

@@ -187,9 +187,9 @@ func multiSpanProcessorWithAddAttributesTestHelper(t *testing.T, overwrite bool)
187187
}, overwrite),
188188
)
189189

190-
batch := &agenttracepb.ExportTraceServiceRequest{}
190+
td := data.TraceData{}
191191
for i := 0; i < 7; i++ {
192-
batch.Spans = append(batch.Spans, &tracepb.Span{
192+
td.Spans = append(td.Spans, &tracepb.Span{
193193
Attributes: &tracepb.Span_Attributes{
194194
AttributeMap: map[string]*tracepb.AttributeValue{
195195
"some_int": {
@@ -200,10 +200,10 @@ func multiSpanProcessorWithAddAttributesTestHelper(t *testing.T, overwrite bool)
200200
})
201201
}
202202

203-
spans := make([]*tracepb.Span, 0, len(batch.Spans)*2)
203+
spans := make([]*tracepb.Span, 0, len(td.Spans)*2)
204204
for i := 0; i < 2; i++ {
205-
tt.ProcessSpans(batch, "test")
206-
spans = append(spans, batch.Spans...)
205+
tt.ProcessSpans(td, "test")
206+
spans = append(spans, td.Spans...)
207207
}
208208

209209
expectedSomeIntValue := int64(4567)
@@ -240,8 +240,8 @@ type mockSpanProcessor struct {
240240

241241
var _ SpanProcessor = &mockSpanProcessor{}
242242

243-
func (p *mockSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
244-
batchSize := len(batch.Spans)
243+
func (p *mockSpanProcessor) ProcessSpans(td data.TraceData, spanFormat string) (uint64, error) {
244+
batchSize := len(td.Spans)
245245
p.TotalSpans += batchSize
246246
if p.MustFail {
247247
return uint64(batchSize), fmt.Errorf("this processor must fail")

internal/collector/processor/nodebatcher/node_batcher.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,13 @@ import (
2626
"unsafe"
2727

2828
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
29-
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
3029
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
3130
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
3231
"github.com/golang/protobuf/proto"
3332
"go.opencensus.io/stats"
3433
"go.uber.org/zap"
3534

35+
"github.com/census-instrumentation/opencensus-service/data"
3636
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
3737
)
3838

@@ -107,10 +107,10 @@ func NewBatcher(name string, logger *zap.Logger, sender processor.SpanProcessor,
107107

108108
// ProcessSpans implements batcher as a SpanProcessor and takes the provided spans and adds them to
109109
// batches
110-
func (b *batcher) ProcessSpans(request *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
111-
bucketID := b.genBucketID(request.Node, request.Resource, spanFormat)
112-
bucket := b.getOrAddBucket(bucketID, request.Node, request.Resource, spanFormat)
113-
bucket.add(request.Spans)
110+
func (b *batcher) ProcessSpans(td data.TraceData, spanFormat string) (uint64, error) {
111+
bucketID := b.genBucketID(td.Node, td.Resource, spanFormat)
112+
bucket := b.getOrAddBucket(bucketID, td.Node, td.Resource, spanFormat)
113+
bucket.add(td.Spans)
114114
return 0, nil
115115
}
116116

@@ -270,12 +270,12 @@ func (nb *nodeBatcher) sendBatch(batch *batch) {
270270
if len(spans) == 0 {
271271
return
272272
}
273-
request := &agenttracepb.ExportTraceServiceRequest{
273+
td := data.TraceData{
274274
Node: nb.node,
275275
Resource: nb.resource,
276276
Spans: spans,
277277
}
278-
_, err := nb.parent.sender.ProcessSpans(request, nb.spanFormat)
278+
_, err := nb.parent.sender.ProcessSpans(td, nb.spanFormat)
279279
// Assumed that the next processor always handles a batch, and doesn't error
280280
if err != nil {
281281
nb.logger.Error(

internal/collector/processor/nodebatcher/node_batcher_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import (
2020
"time"
2121

2222
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
23-
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2423
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
2524
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
25+
"github.com/census-instrumentation/opencensus-service/data"
2626
"go.uber.org/zap"
2727
)
2828

@@ -144,13 +144,13 @@ func TestConcurrentNodeAdds(t *testing.T) {
144144
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
145145
spans = append(spans, &tracepb.Span{Name: getTestSpanName(requestNum, spanIndex)})
146146
}
147-
request := &agenttracepb.ExportTraceServiceRequest{
147+
td := data.TraceData{
148148
Node: &commonpb.Node{
149149
ServiceInfo: &commonpb.ServiceInfo{Name: fmt.Sprintf("svc-%d", requestNum)},
150150
},
151151
Spans: spans,
152152
}
153-
go batcher.ProcessSpans(request, "oc")
153+
go batcher.ProcessSpans(td, "oc")
154154
}
155155

156156
err := sender.waitFor(requestCount*spansPerRequest, 3*time.Second)
@@ -190,7 +190,7 @@ func TestBucketRemove(t *testing.T) {
190190
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
191191
spans = append(spans, &tracepb.Span{Name: getTestSpanName(0, spanIndex)})
192192
}
193-
request := &agenttracepb.ExportTraceServiceRequest{
193+
request := data.TraceData{
194194
Node: &commonpb.Node{
195195
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
196196
},
@@ -225,7 +225,7 @@ func TestConcurrentBatchAdds(t *testing.T) {
225225
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
226226
spans = append(spans, &tracepb.Span{Name: getTestSpanName(requestNum, spanIndex)})
227227
}
228-
request := &agenttracepb.ExportTraceServiceRequest{
228+
request := data.TraceData{
229229
Node: &commonpb.Node{
230230
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
231231
},
@@ -254,7 +254,7 @@ func TestConcurrentBatchAdds(t *testing.T) {
254254
func BenchmarkConcurrentBatchAdds(b *testing.B) {
255255
sender := newTestSender()
256256
batcher := NewBatcher("test", zap.NewNop(), sender).(*batcher)
257-
request := &agenttracepb.ExportTraceServiceRequest{
257+
request := data.TraceData{
258258
Node: &commonpb.Node{
259259
ServiceInfo: &commonpb.ServiceInfo{Name: "svc"},
260260
},
@@ -281,21 +281,21 @@ func getTestSpanName(requestNum, index int) *tracepb.TruncatableString {
281281
}
282282

283283
type testSender struct {
284-
reqChan chan *agenttracepb.ExportTraceServiceRequest
284+
reqChan chan data.TraceData
285285
batchesReceived int
286286
spansReceived int
287287
spansReceivedByName map[string]*tracepb.Span
288288
}
289289

290290
func newTestSender() *testSender {
291291
return &testSender{
292-
reqChan: make(chan *agenttracepb.ExportTraceServiceRequest, 100),
292+
reqChan: make(chan data.TraceData, 100),
293293
spansReceivedByName: make(map[string]*tracepb.Span),
294294
}
295295
}
296296

297-
func (ts *testSender) ProcessSpans(request *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
298-
ts.reqChan <- request
297+
func (ts *testSender) ProcessSpans(td data.TraceData, spanFormat string) (uint64, error) {
298+
ts.reqChan <- td
299299
return 0, nil
300300
}
301301

0 commit comments

Comments
 (0)