@@ -27,12 +27,12 @@ import (
2727 "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/sender"
2828 "github.com/census-instrumentation/opencensus-service/consumer"
2929 "github.com/census-instrumentation/opencensus-service/exporter/loggingexporter"
30- "github.com/census-instrumentation/opencensus-service/internal/collector/processor"
3130 "github.com/census-instrumentation/opencensus-service/internal/collector/processor/nodebatcher"
3231 "github.com/census-instrumentation/opencensus-service/internal/collector/processor/queued"
3332 "github.com/census-instrumentation/opencensus-service/internal/collector/processor/tailsampling"
3433 "github.com/census-instrumentation/opencensus-service/internal/collector/sampling"
3534 "github.com/census-instrumentation/opencensus-service/internal/config"
35+ "github.com/census-instrumentation/opencensus-service/processor/addattributesprocessor"
3636 "github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
3737)
3838
@@ -161,7 +161,7 @@ func buildQueuedSpanProcessor(
161161 ),
162162 )
163163 }
164- return doneFns , processor . NewMultiSpanProcessor (queuedConsumers ), nil
164+ return doneFns , multiconsumer . NewTraceProcessor (queuedConsumers ), nil
165165}
166166
167167func buildSamplingProcessor (cfg * builder.SamplingCfg , nameToTraceConsumer map [string ]consumer.TraceConsumer , v * viper.Viper , logger * zap.Logger ) (consumer.TraceConsumer , error ) {
@@ -209,7 +209,7 @@ func buildSamplingProcessor(cfg *builder.SamplingCfg, nameToTraceConsumer map[st
209209 case numPolicyProcessors == 1 :
210210 policy .Destination = policyProcessors [0 ]
211211 case numPolicyProcessors > 1 :
212- policy .Destination = processor . NewMultiSpanProcessor (policyProcessors )
212+ policy .Destination = multiconsumer . NewTraceProcessor (policyProcessors )
213213 default :
214214 return nil , fmt .Errorf ("no exporters for sampling policy %q" , polCfg .Name )
215215 }
@@ -292,7 +292,7 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer,
292292 {
293293 Name : "tail-always-sampling" ,
294294 Evaluator : sampling .NewAlwaysSample (),
295- Destination : processor . NewMultiSpanProcessor (traceConsumers ),
295+ Destination : multiconsumer . NewTraceProcessor (traceConsumers ),
296296 },
297297 }
298298 var err error
@@ -310,20 +310,18 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer,
310310 }
311311
312312 // Wraps processors in a single one to be connected to all enabled receivers.
313- var processorOptions []processor.MultiProcessorOption
314313 if multiProcessorCfg .Global != nil && multiProcessorCfg .Global .Attributes != nil {
315314 logger .Info (
316315 "Found global attributes config" ,
317316 zap .Bool ("overwrite" , multiProcessorCfg .Global .Attributes .Overwrite ),
318317 zap .Any ("values" , multiProcessorCfg .Global .Attributes .Values ),
319318 )
320- processorOptions = append (
321- processorOptions ,
322- processor .WithAddAttributes (
323- multiProcessorCfg .Global .Attributes .Values ,
324- multiProcessorCfg .Global .Attributes .Overwrite ,
325- ),
319+ tp , _ := addattributesprocessor .NewTraceProcessor (
320+ multiconsumer .NewTraceProcessor (traceConsumers ),
321+ addattributesprocessor .WithAttributes (multiProcessorCfg .Global .Attributes .Values ),
322+ addattributesprocessor .WithOverwrite (multiProcessorCfg .Global .Attributes .Overwrite ),
326323 )
324+ return tp , closeFns
327325 }
328- return processor . NewMultiSpanProcessor (traceConsumers , processorOptions ... ), closeFns
326+ return multiconsumer . NewTraceProcessor (traceConsumers ), closeFns
329327}
0 commit comments