Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit f4c5591

Browse files
author
Bogdan Drutu
authored
Remove processor to sink. (#497)
1 parent b597ff0 commit f4c5591

File tree

11 files changed

+14
-59
lines changed

11 files changed

+14
-59
lines changed

internal/collector/jaeger/receiver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030

3131
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
3232
"github.com/census-instrumentation/opencensus-service/consumer"
33-
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
3433
"github.com/census-instrumentation/opencensus-service/receiver"
3534
"github.com/census-instrumentation/opencensus-service/receiver/jaegerreceiver"
3635
)
@@ -51,8 +50,7 @@ func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsu
5150
return nil, err
5251
}
5352

54-
ss := processor.WithSourceName("jaeger", traceConsumer)
55-
if err := jtr.StartTraceReception(ctx, ss); err != nil {
53+
if err := jtr.StartTraceReception(ctx, traceConsumer); err != nil {
5654
return nil, err
5755
}
5856

internal/collector/opencensus/receiver.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626

2727
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
2828
"github.com/census-instrumentation/opencensus-service/consumer"
29-
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
3029
"github.com/census-instrumentation/opencensus-service/receiver"
3130
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver"
3231
)
@@ -48,8 +47,8 @@ func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsu
4847
if err != nil {
4948
return nil, fmt.Errorf("Failed to create the OpenCensus trace receiver: %v", err)
5049
}
51-
ss := processor.WithSourceName("oc_trace", traceConsumer)
52-
if err := ocr.StartTraceReception(context.Background(), ss); err != nil {
50+
51+
if err := ocr.StartTraceReception(context.Background(), traceConsumer); err != nil {
5352
return nil, fmt.Errorf("Cannot bind Opencensus receiver to address %q: %v", addr, err)
5453
}
5554

internal/collector/processor/processor_to_sink.go

Lines changed: 0 additions & 43 deletions
This file was deleted.

internal/collector/zipkin/receiver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626

2727
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
2828
"github.com/census-instrumentation/opencensus-service/consumer"
29-
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
3029
"github.com/census-instrumentation/opencensus-service/receiver"
3130
"github.com/census-instrumentation/opencensus-service/receiver/zipkinreceiver"
3231
)
@@ -43,9 +42,8 @@ func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsu
4342
if err != nil {
4443
return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err)
4544
}
46-
ss := processor.WithSourceName("zipkin", traceConsumer)
4745

48-
if err := zi.StartTraceReception(context.Background(), ss); err != nil {
46+
if err := zi.StartTraceReception(context.Background(), traceConsumer); err != nil {
4947
return nil, fmt.Errorf("Cannot start Zipkin receiver to address %q: %v", addr, err)
5048
}
5149

internal/collector/zipkin/scribe/receiver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525

2626
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
2727
"github.com/census-instrumentation/opencensus-service/consumer"
28-
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
2928
"github.com/census-instrumentation/opencensus-service/receiver"
3029
"github.com/census-instrumentation/opencensus-service/receiver/zipkinreceiver/scribe"
3130
)
@@ -41,9 +40,8 @@ func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsu
4140
if err != nil {
4241
return nil, fmt.Errorf("Failed to create the Zipkin Scribe receiver: %v", err)
4342
}
44-
ss := processor.WithSourceName("zipkin-scribe", traceConsumer)
4543

46-
if err := sr.StartTraceReception(context.Background(), ss); err != nil {
44+
if err := sr.StartTraceReception(context.Background(), traceConsumer); err != nil {
4745
return nil, fmt.Errorf("Cannot start Zipkin Scribe receiver %+v: %v", rOpts, err)
4846
}
4947

receiver/jaegerreceiver/trace_receiver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ func (jr *jReceiver) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch)
249249

250250
if err == nil {
251251
ok = true
252+
td.SourceFormat = "jaeger"
252253
jr.nextConsumer.ConsumeTraceData(ctx, td)
253254
// We MUST unconditionally record metrics from this reception.
254255
observability.RecordTraceReceiverMetrics(ctxWithReceiverName, len(batch.Spans), len(batch.Spans)-len(td.Spans))

receiver/jaegerreceiver/trace_receiver_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ func TestReception(t *testing.T) {
140140
"int64": "10000000",
141141
},
142142
},
143-
144143
Spans: []*tracepb.Span{
145144
{
146145
TraceId: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x80},
@@ -209,6 +208,7 @@ func TestReception(t *testing.T) {
209208
},
210209
},
211210
},
211+
SourceFormat: "jaeger",
212212
},
213213
}
214214

receiver/opencensusreceiver/octrace/opencensus.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,10 @@ func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
123123
}
124124

125125
td := &data.TraceData{
126-
Node: lastNonNilNode,
127-
Resource: resource,
128-
Spans: recv.Spans,
126+
Node: lastNonNilNode,
127+
Resource: resource,
128+
Spans: recv.Spans,
129+
SourceFormat: "oc_trace",
129130
}
130131

131132
ocr.messageChan <- &traceDataWithCtx{data: td, ctx: ctxWithReceiverName}

receiver/zipkinreceiver/scribe/scribe_receiver.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ func (sc *scribeCollector) Log(messages []*scribe.LogEntry) (r scribe.ResultCode
174174

175175
tdsSize := 0
176176
for _, td := range tds {
177+
td.SourceFormat = "zipkin-scribe"
177178
sc.nextConsumer.ConsumeTraceData(sc.defaultCtx, td)
178179
tdsSize += len(td.Spans)
179180
}

receiver/zipkinreceiver/scribe/scribe_receiver_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,5 @@ var wantTraceData = data.TraceData{
264264
},
265265
},
266266
},
267+
SourceFormat: "zipkin-scribe",
267268
}

0 commit comments

Comments
 (0)