Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 9317ffe

Browse files
Build pipeline processors based on new configuration (#576)
- Build pipeline processors and plug them into exporters. - Added tests to verify that single exporter and multiple exporter pipelines (fan out) work correctly. - Minor cleanup in exporters_builder.go - Fixed opencensus receiver TestCreateReceiver to find available port for testing (instead of fixed port which would fail if already listened).
1 parent bf83eb9 commit 9317ffe

File tree

17 files changed

+426
-51
lines changed

17 files changed

+426
-51
lines changed

cmd/occollector/app/builder/exporters_builder.go

Lines changed: 6 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,59 +15,31 @@
1515
package builder
1616

1717
import (
18-
"context"
1918
"fmt"
2019

2120
"go.uber.org/zap"
2221

2322
"github.com/census-instrumentation/opencensus-service/consumer"
24-
"github.com/census-instrumentation/opencensus-service/data"
25-
"github.com/census-instrumentation/opencensus-service/exporter"
2623
"github.com/census-instrumentation/opencensus-service/internal"
2724
"github.com/census-instrumentation/opencensus-service/internal/configmodels"
2825
"github.com/census-instrumentation/opencensus-service/internal/factories"
2926
)
3027

31-
// exporterImpl is a running exporter that is built based on a config. It can have
28+
// builtExporter is an exporter that is built based on a config. It can have
3229
// a trace and/or a metrics consumer and have a stop function.
33-
type exporterImpl struct {
30+
type builtExporter struct {
3431
tc consumer.TraceConsumer
3532
mc consumer.MetricsConsumer
3633
stop func() error
3734
}
3835

39-
// Check that exporterImpl implements Exporter interface.
40-
var _ exporter.Exporter = (*exporterImpl)(nil)
41-
42-
// ConsumeTraceData receives data.TraceData for processing by the TraceConsumer.
43-
func (exp *exporterImpl) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
44-
return exp.tc.ConsumeTraceData(ctx, td)
45-
}
46-
47-
// ConsumeMetricsData receives data.MetricsData for processing by the MetricsConsumer.
48-
func (exp *exporterImpl) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
49-
return exp.mc.ConsumeMetricsData(ctx, md)
50-
}
51-
52-
// TraceExportFormat is unneeded, we need to get rid of it after we cleanup
53-
// exporter.TraceExporter interface.
54-
func (exp *exporterImpl) TraceExportFormat() string {
55-
return ""
56-
}
57-
58-
// MetricsExportFormat is unneeded, we need to get rid of it after we cleanup
59-
// exporter.MetricsExporter interface.
60-
func (exp *exporterImpl) MetricsExportFormat() string {
61-
return ""
62-
}
63-
6436
// Stop the exporter.
65-
func (exp *exporterImpl) Stop() error {
37+
func (exp *builtExporter) Stop() error {
6638
return exp.stop()
6739
}
6840

6941
// Exporters is a map of exporters created from exporter configs.
70-
type Exporters map[configmodels.Exporter]*exporterImpl
42+
type Exporters map[configmodels.Exporter]*builtExporter
7143

7244
// StopAll stops all exporters.
7345
func (exps Exporters) StopAll() {
@@ -179,11 +151,11 @@ func combineStopFunc(f1, f2 factories.StopFunc) factories.StopFunc {
179151
func (eb *ExportersBuilder) buildExporter(
180152
config configmodels.Exporter,
181153
exportersInputDataTypes exportersRequiredDataTypes,
182-
) (*exporterImpl, error) {
154+
) (*builtExporter, error) {
183155

184156
factory := factories.GetExporterFactory(config.Type())
185157

186-
exporter := &exporterImpl{}
158+
exporter := &builtExporter{}
187159

188160
inputDataTypes := exportersInputDataTypes[config]
189161
if inputDataTypes == nil {

cmd/occollector/app/builder/exporters_builder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func TestExportersBuilder_StopAll(t *testing.T) {
9494
exporters := make(Exporters)
9595
expCfg := &configmodels.ExporterSettings{}
9696
stopCalled := false
97-
exporters[expCfg] = &exporterImpl{
97+
exporters[expCfg] = &builtExporter{
9898
stop: func() error {
9999
stopCalled = true
100100
return nil
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package builder
16+
17+
import (
18+
"fmt"
19+
20+
"go.uber.org/zap"
21+
22+
"github.com/census-instrumentation/opencensus-service/consumer"
23+
"github.com/census-instrumentation/opencensus-service/internal/configmodels"
24+
"github.com/census-instrumentation/opencensus-service/internal/factories"
25+
"github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
26+
)
27+
28+
// builtProcessor is a processor that is built based on a config.
29+
// It can have a trace and/or a metrics consumer.
30+
type builtProcessor struct {
31+
tc consumer.TraceConsumer
32+
mc consumer.MetricsConsumer
33+
}
34+
35+
// PipelineProcessors is a map of entry-point processors created from pipeline configs.
36+
// Each element of the map points to the first processor of the pipeline.
37+
type PipelineProcessors map[*configmodels.Pipeline]*builtProcessor
38+
39+
// PipelinesBuilder builds pipelines from config.
40+
type PipelinesBuilder struct {
41+
logger *zap.Logger
42+
config *configmodels.ConfigV2
43+
exporters Exporters
44+
}
45+
46+
// NewPipelinesBuilder creates a new PipelinesBuilder. Requires exporters to be already
47+
// built via ExportersBuilder. Call Build() on the returned value.
48+
func NewPipelinesBuilder(
49+
logger *zap.Logger,
50+
config *configmodels.ConfigV2,
51+
exporters Exporters,
52+
) *PipelinesBuilder {
53+
return &PipelinesBuilder{logger, config, exporters}
54+
}
55+
56+
// Build pipeline processors from config.
57+
func (eb *PipelinesBuilder) Build() (PipelineProcessors, error) {
58+
pipelineProcessors := make(PipelineProcessors)
59+
60+
for _, pipeline := range eb.config.Pipelines {
61+
firstProcessor, err := eb.buildPipeline(pipeline)
62+
if err != nil {
63+
return nil, err
64+
}
65+
pipelineProcessors[pipeline] = firstProcessor
66+
}
67+
68+
return pipelineProcessors, nil
69+
}
70+
71+
// Builds a pipeline of processors. Returns the first processor in the pipeline.
72+
// The last processor in the pipeline will be plugged to fan out the data into exporters
73+
// that are configured for this pipeline.
74+
func (eb *PipelinesBuilder) buildPipeline(
75+
pipelineCfg *configmodels.Pipeline,
76+
) (*builtProcessor, error) {
77+
78+
// Build the pipeline backwards.
79+
80+
// First create a consumer junction point that fans out the data to all exporters.
81+
var tc consumer.TraceConsumer
82+
var mc consumer.MetricsConsumer
83+
84+
switch pipelineCfg.InputType {
85+
case configmodels.TracesDataType:
86+
tc = eb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters)
87+
case configmodels.MetricsDataType:
88+
mc = eb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters)
89+
}
90+
91+
// Now build the processors backwards, starting from the last one.
92+
// The last processor points to consumer which fans out to exporters, then
93+
// the processor itself becomes a consumer for the one that precedes it in
94+
// in the pipeline and so on.
95+
for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- {
96+
procName := pipelineCfg.Processors[i]
97+
procCfg := eb.config.Processors[procName]
98+
99+
factory := factories.GetProcessorFactory(procCfg.Type())
100+
101+
// This processor must point to the next consumer and then
102+
// it becomes the next for the previous one (previous in the pipeline,
103+
// which we will build in the next loop iteration).
104+
var err error
105+
switch pipelineCfg.InputType {
106+
case configmodels.TracesDataType:
107+
tc, err = factory.CreateTraceProcessor(tc, procCfg)
108+
case configmodels.MetricsDataType:
109+
mc, err = factory.CreateMetricsProcessor(mc, procCfg)
110+
}
111+
112+
if err != nil {
113+
return nil, fmt.Errorf("error creating processor %q in pipeline %q: %v",
114+
procName, pipelineCfg.Name, err)
115+
}
116+
}
117+
118+
return &builtProcessor{tc, mc}, nil
119+
}
120+
121+
// Converts the list of exporter names to a list of corresponding builtExporters.
122+
func (eb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter {
123+
var result []*builtExporter
124+
for _, name := range exporterNames {
125+
exporter := eb.exporters[eb.config.Exporters[name]]
126+
result = append(result, exporter)
127+
}
128+
129+
return result
130+
}
131+
132+
func (eb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumer {
133+
builtExporters := eb.getBuiltExportersByNames(exporterNames)
134+
135+
// Optimize for the case when there is only one exporter, no need to create junction point.
136+
if len(builtExporters) == 1 {
137+
return builtExporters[0].tc
138+
}
139+
140+
var exporters []consumer.TraceConsumer
141+
for _, builtExp := range builtExporters {
142+
exporters = append(exporters, builtExp.tc)
143+
}
144+
145+
// Create a junction point that fans out to all exporters.
146+
return multiconsumer.NewTraceProcessor(exporters)
147+
}
148+
149+
func (eb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
150+
builtExporters := eb.getBuiltExportersByNames(exporterNames)
151+
152+
// Optimize for the case when there is only one exporter, no need to create junction point.
153+
if len(builtExporters) == 1 {
154+
return builtExporters[0].mc
155+
}
156+
157+
var exporters []consumer.MetricsConsumer
158+
for _, builtExp := range builtExporters {
159+
exporters = append(exporters, builtExp.mc)
160+
}
161+
162+
// Create a junction point that fans out to all exporters.
163+
return multiconsumer.NewMetricsProcessor(exporters)
164+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package builder
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"go.uber.org/zap"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
26+
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
27+
28+
"github.com/census-instrumentation/opencensus-service/data"
29+
"github.com/census-instrumentation/opencensus-service/internal/configmodels"
30+
"github.com/census-instrumentation/opencensus-service/internal/configv2"
31+
"github.com/census-instrumentation/opencensus-service/processor/addattributesprocessor"
32+
)
33+
34+
// Ensure attributes processor is registered.
35+
var _ = addattributesprocessor.ConfigV2{}
36+
37+
// Register test factories used in the pipelines_builder.yaml test config.
38+
var _ = configv2.RegisterTestFactories()
39+
40+
func TestPipelinesBuilder_Build(t *testing.T) {
41+
tests := []struct {
42+
name string
43+
pipelineName string
44+
exporterNames []string
45+
}{
46+
{
47+
name: "one-exporter",
48+
pipelineName: "traces",
49+
exporterNames: []string{"exampleexporter"},
50+
},
51+
{
52+
name: "multi-exporter",
53+
pipelineName: "traces/2",
54+
exporterNames: []string{"exampleexporter", "exampleexporter/2"},
55+
},
56+
}
57+
58+
for _, test := range tests {
59+
t.Run(test.name, func(t *testing.T) {
60+
testPipeline(t, test.pipelineName, test.exporterNames)
61+
})
62+
}
63+
}
64+
65+
func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
66+
// Load the config
67+
config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml")
68+
require.Nil(t, err)
69+
70+
// Build the pipeline
71+
allExporters, err := NewExportersBuilder(zap.NewNop(), config).Build()
72+
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), config, allExporters).Build()
73+
74+
assert.NoError(t, err)
75+
require.NotNil(t, pipelineProcessors)
76+
77+
processor := pipelineProcessors[config.Pipelines[pipelineName]]
78+
79+
// Ensure pipeline has its fields correctly populated.
80+
require.NotNil(t, processor)
81+
assert.NotNil(t, processor.tc)
82+
assert.Nil(t, processor.mc)
83+
84+
// Compose the list of created exporters.
85+
var exporters []*builtExporter
86+
for _, name := range exporterNames {
87+
// Ensure exporter is created.
88+
exp := allExporters[config.Exporters[name]]
89+
require.NotNil(t, exp)
90+
exporters = append(exporters, exp)
91+
}
92+
93+
// Send TraceData via processor and verify that all exporters of the pipeline receive it.
94+
95+
// First check that there are no traces in the exporters yet.
96+
var exporterConsumers []*configv2.ExampleExporterConsumer
97+
for _, exporter := range exporters {
98+
consumer := exporter.tc.(*configv2.ExampleExporterConsumer)
99+
exporterConsumers = append(exporterConsumers, consumer)
100+
require.Equal(t, len(consumer.Traces), 0)
101+
}
102+
103+
// Send one trace.
104+
name := tracepb.TruncatableString{Value: "testspanname"}
105+
traceData := data.TraceData{
106+
SourceFormat: "test-source-format",
107+
Spans: []*tracepb.Span{
108+
{Name: &name},
109+
},
110+
}
111+
processor.tc.ConsumeTraceData(context.Background(), traceData)
112+
113+
// Now verify received data.
114+
for _, consumer := range exporterConsumers {
115+
// Check that the trace is received by exporter.
116+
require.Equal(t, 1, len(consumer.Traces))
117+
assert.Equal(t, traceData, consumer.Traces[0])
118+
119+
// Check that the span was processed by "attributes" processor and an
120+
// attribute was added.
121+
assert.Equal(t, int64(12345),
122+
consumer.Traces[0].Spans[0].Attributes.AttributeMap["attr1"].GetIntValue())
123+
}
124+
}
125+
126+
func TestPipelinesBuilder_Error(t *testing.T) {
127+
config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml")
128+
require.Nil(t, err)
129+
130+
// Corrupt the pipeline, change data type to metrics. We have to forcedly do it here
131+
// since there is no way to have such config loaded by LoadConfigFile, it would not
132+
// pass validation. We are doing this to test failure mode of PipelinesBuilder.
133+
pipeline := config.Pipelines["traces"]
134+
pipeline.InputType = configmodels.MetricsDataType
135+
136+
exporters, err := NewExportersBuilder(zap.NewNop(), config).Build()
137+
138+
// This should fail because "attributes" processor defined in the config does
139+
// not support metrics data type.
140+
_, err = NewPipelinesBuilder(zap.NewNop(), config, exporters).Build()
141+
142+
assert.NotNil(t, err)
143+
}

0 commit comments

Comments
 (0)