Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 953218b

Browse files
author
Bogdan Drutu
authored
Remove SpanProcessor and replace it with TraceConsumer. (#491)
1 parent 3e7d344 commit 953218b

File tree

20 files changed

+127
-176
lines changed

20 files changed

+127
-176
lines changed

cmd/occollector/app/collector/collector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"go.uber.org/zap"
3030

3131
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
32-
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
32+
"github.com/census-instrumentation/opencensus-service/consumer"
3333
"github.com/census-instrumentation/opencensus-service/internal/config/viperutils"
3434
"github.com/census-instrumentation/opencensus-service/internal/pprofserver"
3535
"github.com/census-instrumentation/opencensus-service/receiver"
@@ -45,7 +45,7 @@ type Application struct {
4545
v *viper.Viper
4646
logger *zap.Logger
4747
healthCheck *healthcheck.HealthCheck
48-
processor processor.SpanProcessor
48+
processor consumer.TraceConsumer
4949
receivers []receiver.TraceReceiver
5050
}
5151

cmd/occollector/app/collector/processors.go

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/census-instrumentation/opencensus-service/internal/collector/processor/tailsampling"
3535
"github.com/census-instrumentation/opencensus-service/internal/collector/sampling"
3636
"github.com/census-instrumentation/opencensus-service/internal/config"
37+
"github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
3738
)
3839

3940
func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []consumer.TraceConsumer, []consumer.MetricsConsumer) {
@@ -59,11 +60,11 @@ func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []consumer.T
5960

6061
func buildQueuedSpanProcessor(
6162
logger *zap.Logger, opts *builder.QueuedSpanProcessorCfg,
62-
) (closeFns []func(), queuedSpanProcessor processor.SpanProcessor, err error) {
63+
) (closeFns []func(), queuedSpanProcessor consumer.TraceConsumer, err error) {
6364
logger.Info("Constructing queue processor with name", zap.String("name", opts.Name))
6465

6566
// build span batch sender from configured options
66-
var spanSender processor.SpanProcessor
67+
var spanSender consumer.TraceConsumer
6768
switch opts.SenderType {
6869
case builder.ThriftTChannelSenderType:
6970
logger.Info("Initializing thrift-tChannel sender")
@@ -99,14 +100,12 @@ func buildQueuedSpanProcessor(
99100
logger.Fatal("No senders or exporters configured.")
100101
}
101102

102-
allSendersAndExporters := make([]processor.SpanProcessor, 0, 1+len(traceExporters))
103+
allSendersAndExporters := make([]consumer.TraceConsumer, 0, 1+len(traceExporters))
103104
if spanSender != nil {
104105
allSendersAndExporters = append(allSendersAndExporters, spanSender)
105106
}
106107
for _, traceExporter := range traceExporters {
107-
allSendersAndExporters = append(
108-
allSendersAndExporters, processor.NewTraceExporterProcessor(traceExporter),
109-
)
108+
allSendersAndExporters = append(allSendersAndExporters, traceExporter)
110109
}
111110

112111
var batchingOptions []nodebatcher.Option
@@ -137,11 +136,11 @@ func buildQueuedSpanProcessor(
137136
}
138137
}
139138

140-
queuedProcessors := make([]processor.SpanProcessor, 0, len(allSendersAndExporters))
139+
queuedConsumers := make([]consumer.TraceConsumer, 0, len(allSendersAndExporters))
141140
for _, senderOrExporter := range allSendersAndExporters {
142141
// build queued span processor with underlying sender
143-
queuedProcessors = append(
144-
queuedProcessors,
142+
queuedConsumers = append(
143+
queuedConsumers,
145144
queued.NewQueuedSpanProcessor(
146145
senderOrExporter,
147146
queued.Options.WithLogger(logger),
@@ -155,10 +154,10 @@ func buildQueuedSpanProcessor(
155154
),
156155
)
157156
}
158-
return doneFns, processor.NewMultiSpanProcessor(queuedProcessors), nil
157+
return doneFns, processor.NewMultiSpanProcessor(queuedConsumers), nil
159158
}
160159

161-
func buildSamplingProcessor(cfg *builder.SamplingCfg, nameToSpanProcessor map[string]processor.SpanProcessor, v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor, error) {
160+
func buildSamplingProcessor(cfg *builder.SamplingCfg, nameToTraceConsumer map[string]consumer.TraceConsumer, v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer, error) {
162161
var policies []*tailsampling.Policy
163162
seenExporter := make(map[string]bool)
164163
for _, polCfg := range cfg.Policies {
@@ -183,14 +182,14 @@ func buildSamplingProcessor(cfg *builder.SamplingCfg, nameToSpanProcessor map[st
183182
return nil, fmt.Errorf("unknown sampling policy %s", polCfg.Name)
184183
}
185184

186-
var policyProcessors []processor.SpanProcessor
185+
var policyProcessors []consumer.TraceConsumer
187186
for _, exporter := range polCfg.Exporters {
188187
if _, ok := seenExporter[exporter]; ok {
189188
return nil, fmt.Errorf("multiple sampling polices pointing to exporter %q", exporter)
190189
}
191190
seenExporter[exporter] = true
192191

193-
policyProcessor, ok := nameToSpanProcessor[exporter]
192+
policyProcessor, ok := nameToTraceConsumer[exporter]
194193
if !ok {
195194
return nil, fmt.Errorf("invalid exporter %q for sampling policy %q", exporter, polCfg.Name)
196195
}
@@ -225,12 +224,12 @@ func buildSamplingProcessor(cfg *builder.SamplingCfg, nameToSpanProcessor map[st
225224
return tailSamplingProcessor, err
226225
}
227226

228-
func startProcessor(v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor, []func()) {
227+
func startProcessor(v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer, []func()) {
229228
// Build pipeline from its end: 1st exporters, the OC-proto queue processor, and
230229
// finally the receivers.
231230
var closeFns []func()
232-
var spanProcessors []processor.SpanProcessor
233-
nameToSpanProcessor := make(map[string]processor.SpanProcessor)
231+
var traceConsumers []consumer.TraceConsumer
232+
nameToTraceConsumer := make(map[string]consumer.TraceConsumer)
234233
exportersCloseFns, traceExporters, metricsExporters := createExporters(v, logger)
235234
closeFns = append(closeFns, exportersCloseFns...)
236235
if len(traceExporters) > 0 {
@@ -239,20 +238,19 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor
239238
// TODO: (@pjanotti) we should avoid this step in the long run, its an extra hop just to re-use
240239
// the exporters: this can lose node information and it is not ideal for performance and delegates
241240
// the retry/buffering to the exporters (that are designed to run within the tracing process).
242-
traceExpProc := processor.NewTraceExporterProcessor(traceExporters...)
243-
nameToSpanProcessor["exporters"] = traceExpProc
244-
spanProcessors = append(spanProcessors, traceExpProc)
241+
traceExpProc := multiconsumer.NewTraceProcessor(traceExporters)
242+
nameToTraceConsumer["exporters"] = traceExpProc
243+
traceConsumers = append(traceConsumers, traceExpProc)
245244
}
246245

247246
// TODO: (@pjanotti) make use of metrics exporters
248247
_ = metricsExporters
249248

250249
if builder.LoggingExporterEnabled(v) {
251-
tle, _ := loggingexporter.NewTraceExporter(logger)
252-
dbgProc := processor.NewTraceExporterProcessor(tle)
250+
dbgProc, _ := loggingexporter.NewTraceExporter(logger)
253251
// TODO: Add this to the exporters list and avoid treating it specially. Don't know all the implications.
254-
nameToSpanProcessor["debug"] = dbgProc
255-
spanProcessors = append(spanProcessors, dbgProc)
252+
nameToTraceConsumer["debug"] = dbgProc
253+
traceConsumers = append(traceConsumers, dbgProc)
256254
}
257255

258256
multiProcessorCfg := builder.NewDefaultMultiSpanProcessorCfg().InitFromViper(v)
@@ -263,21 +261,21 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor
263261
logger.Error("Failed to build the queued span processor", zap.Error(err))
264262
os.Exit(1)
265263
}
266-
nameToSpanProcessor[queuedJaegerProcessorCfg.Name] = queuedJaegerProcessor
267-
spanProcessors = append(spanProcessors, queuedJaegerProcessor)
264+
nameToTraceConsumer[queuedJaegerProcessorCfg.Name] = queuedJaegerProcessor
265+
traceConsumers = append(traceConsumers, queuedJaegerProcessor)
268266
closeFns = append(closeFns, doneFns...)
269267
}
270268

271-
if len(spanProcessors) == 0 {
269+
if len(traceConsumers) == 0 {
272270
logger.Warn("Nothing to do: no processor was enabled. Shutting down.")
273271
os.Exit(1)
274272
}
275273

276-
var tailSamplingProcessor processor.SpanProcessor
274+
var tailSamplingProcessor consumer.TraceConsumer
277275
samplingProcessorCfg := builder.NewDefaultSamplingCfg().InitFromViper(v)
278276
if samplingProcessorCfg.Mode == builder.TailSampling {
279277
var err error
280-
tailSamplingProcessor, err = buildSamplingProcessor(samplingProcessorCfg, nameToSpanProcessor, v, logger)
278+
tailSamplingProcessor, err = buildSamplingProcessor(samplingProcessorCfg, nameToTraceConsumer, v, logger)
281279
if err != nil {
282280
logger.Error("Falied to build the sampling processor", zap.Error(err))
283281
os.Exit(1)
@@ -287,7 +285,7 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor
287285
{
288286
Name: "tail-always-sampling",
289287
Evaluator: sampling.NewAlwaysSample(),
290-
Destination: processor.NewMultiSpanProcessor(spanProcessors),
288+
Destination: processor.NewMultiSpanProcessor(traceConsumers),
291289
},
292290
}
293291
var err error
@@ -301,7 +299,7 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor
301299

302300
if tailSamplingProcessor != nil {
303301
// SpanProcessors are going to go all via the tail sampling processor.
304-
spanProcessors = []processor.SpanProcessor{tailSamplingProcessor}
302+
traceConsumers = []consumer.TraceConsumer{tailSamplingProcessor}
305303
}
306304

307305
// Wraps processors in a single one to be connected to all enabled receivers.
@@ -320,5 +318,5 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor
320318
),
321319
)
322320
}
323-
return processor.NewMultiSpanProcessor(spanProcessors, processorOptions...), closeFns
321+
return processor.NewMultiSpanProcessor(traceConsumers, processorOptions...), closeFns
324322
}

cmd/occollector/app/collector/receivers.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,18 @@ import (
2222
"go.uber.org/zap"
2323

2424
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
25+
"github.com/census-instrumentation/opencensus-service/consumer"
2526
jaegerreceiver "github.com/census-instrumentation/opencensus-service/internal/collector/jaeger"
2627
ocreceiver "github.com/census-instrumentation/opencensus-service/internal/collector/opencensus"
27-
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
2828
zipkinreceiver "github.com/census-instrumentation/opencensus-service/internal/collector/zipkin"
2929
zipkinscribereceiver "github.com/census-instrumentation/opencensus-service/internal/collector/zipkin/scribe"
3030
"github.com/census-instrumentation/opencensus-service/receiver"
3131
)
3232

33-
func createReceivers(v *viper.Viper, logger *zap.Logger, spanProcessor processor.SpanProcessor) []receiver.TraceReceiver {
33+
func createReceivers(v *viper.Viper, logger *zap.Logger, traceConsumers consumer.TraceConsumer) []receiver.TraceReceiver {
3434
var someReceiverEnabled bool
3535
receivers := []struct {
36-
runFn func(*zap.Logger, *viper.Viper, processor.SpanProcessor) (receiver.TraceReceiver, error)
36+
runFn func(*zap.Logger, *viper.Viper, consumer.TraceConsumer) (receiver.TraceReceiver, error)
3737
enabled bool
3838
}{
3939
{jaegerreceiver.Start, builder.JaegerReceiverEnabled(v)},
@@ -45,7 +45,7 @@ func createReceivers(v *viper.Viper, logger *zap.Logger, spanProcessor processor
4545
var startedTraceReceivers []receiver.TraceReceiver
4646
for _, receiver := range receivers {
4747
if receiver.enabled {
48-
rec, err := receiver.runFn(logger, v, spanProcessor)
48+
rec, err := receiver.runFn(logger, v, traceConsumers)
4949
if err != nil {
5050
// TODO: (@pjanotti) better shutdown, for now just try to stop any started receiver before terminating.
5151
for _, startedTraceReceiver := range startedTraceReceivers {

cmd/occollector/app/sender/jaeger_thrift_http_sender.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626
"github.com/apache/thrift/lib/go/thrift"
2727
"go.uber.org/zap"
2828

29+
"github.com/census-instrumentation/opencensus-service/consumer"
2930
"github.com/census-instrumentation/opencensus-service/data"
30-
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
3131
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
3232
)
3333

@@ -43,7 +43,7 @@ type JaegerThriftHTTPSender struct {
4343
logger *zap.Logger
4444
}
4545

46-
var _ processor.SpanProcessor = (*JaegerThriftHTTPSender)(nil)
46+
var _ consumer.TraceConsumer = (*JaegerThriftHTTPSender)(nil)
4747

4848
// HTTPOption sets a parameter for the HttpCollector
4949
type HTTPOption func(s *JaegerThriftHTTPSender)
@@ -83,8 +83,8 @@ func NewJaegerThriftHTTPSender(
8383
return s
8484
}
8585

86-
// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
87-
func (s *JaegerThriftHTTPSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
86+
// ConsumeTraceData sends the received data to the configured Jaeger Thrift end-point.
87+
func (s *JaegerThriftHTTPSender) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
8888
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
8989
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
9090
if err != nil {

cmd/occollector/app/sender/jaeger_thrift_tchannel_sender.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121

2222
reporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
2323

24+
"github.com/census-instrumentation/opencensus-service/consumer"
2425
"github.com/census-instrumentation/opencensus-service/data"
25-
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
2626
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
2727
)
2828

@@ -33,7 +33,7 @@ type JaegerThriftTChannelSender struct {
3333
reporter reporter.Reporter
3434
}
3535

36-
var _ processor.SpanProcessor = (*JaegerThriftTChannelSender)(nil)
36+
var _ consumer.TraceConsumer = (*JaegerThriftTChannelSender)(nil)
3737

3838
// NewJaegerThriftTChannelSender creates new TChannel-based sender.
3939
func NewJaegerThriftTChannelSender(
@@ -46,8 +46,8 @@ func NewJaegerThriftTChannelSender(
4646
}
4747
}
4848

49-
// ProcessSpans sends the received data to the configured Jaeger Thrift end-point.
50-
func (s *JaegerThriftTChannelSender) ProcessSpans(ctx context.Context, td data.TraceData) error {
49+
// ConsumeTraceData sends the received data to the configured Jaeger Thrift end-point.
50+
func (s *JaegerThriftTChannelSender) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
5151
// TODO: (@pjanotti) In case of failure the translation to Jaeger Thrift is going to be remade, cache it somehow.
5252
tBatch, err := jaegertranslator.OCProtoToJaegerThrift(td)
5353
if err != nil {

internal/collector/jaeger/receiver.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,14 @@ import (
2929
"go.uber.org/zap"
3030

3131
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
32+
"github.com/census-instrumentation/opencensus-service/consumer"
3233
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
3334
"github.com/census-instrumentation/opencensus-service/receiver"
3435
"github.com/census-instrumentation/opencensus-service/receiver/jaegerreceiver"
3536
)
3637

3738
// Start starts the Jaeger receiver endpoint.
38-
func Start(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (receiver.TraceReceiver, error) {
39+
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error) {
3940
rOpts, err := builder.NewDefaultJaegerReceiverCfg().InitFromViper(v)
4041
if err != nil {
4142
return nil, err
@@ -50,7 +51,7 @@ func Start(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor)
5051
return nil, err
5152
}
5253

53-
ss := processor.WrapWithSpanSink("jaeger", spanProc)
54+
ss := processor.WithSourceName("jaeger", traceConsumer)
5455
if err := jtr.StartTraceReception(ctx, ss); err != nil {
5556
return nil, err
5657
}

internal/collector/opencensus/receiver.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ import (
2525
"go.uber.org/zap"
2626

2727
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
28+
"github.com/census-instrumentation/opencensus-service/consumer"
2829
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
2930
"github.com/census-instrumentation/opencensus-service/receiver"
3031
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver"
3132
)
3233

3334
// Start starts the OpenCensus receiver endpoint.
34-
func Start(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (receiver.TraceReceiver, error) {
35+
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error) {
3536
rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
3637
if err != nil {
3738
return nil, err
@@ -47,7 +48,7 @@ func Start(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor)
4748
if err != nil {
4849
return nil, fmt.Errorf("Failed to create the OpenCensus trace receiver: %v", err)
4950
}
50-
ss := processor.WrapWithSpanSink("oc", spanProc)
51+
ss := processor.WithSourceName("oc_trace", traceConsumer)
5152
if err := ocr.StartTraceReception(context.Background(), ss); err != nil {
5253
return nil, fmt.Errorf("Cannot bind Opencensus receiver to address %q: %v", addr, err)
5354
}

internal/collector/processor/exporter_processor.go

Lines changed: 0 additions & 43 deletions
This file was deleted.

0 commit comments

Comments
 (0)