Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 0747a83

Browse files
Build exporters based on new configuration (#569)
- Introduced a new unisvc binary and make target. Unisvc is planned to be the implementation of the unified agent and collector that uses the new configuration format. - Refactored existing Application.execute() code to make it more readable and also reusable. The functionality is not changed. Reviews: please check this carefully! - Introduced new Application.executeUnified() function that will call all new agent/collector execution logic that is different from old logic. Application.executeUnified() is currently partial implementation, which only builds the exporters. I plan to add building of processor pipelines and receivers in future PRs. - Introduced ExportersBuilder which builds runtime exporters based on provided configuration. - Added ability for App telemetry shutdown. This is required to be able to run multiple tests that involve App start and shutdown. Previously there was only one test - TestApplication_Start - and it was not correctly cleaning up telemetry resources at the end, making impossible to add more tests against App.
1 parent 00fa8ee commit 0747a83

File tree

20 files changed

+1016
-36
lines changed

20 files changed

+1016
-36
lines changed

Makefile

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ agent:
9292
collector:
9393
GO111MODULE=on CGO_ENABLED=0 go build -o ./bin/occollector_$(GOOS) $(BUILD_INFO) ./cmd/occollector
9494

95+
.PHONY: unisvc
96+
unisvc:
97+
GO111MODULE=on CGO_ENABLED=0 go build -o ./bin/unisvc_$(GOOS) $(BUILD_INFO) ./cmd/unisvc
98+
9599
.PHONY: docker-component # Not intended to be used directly
96100
docker-component: check-component
97101
GOOS=linux $(MAKE) $(COMPONENT)
@@ -113,9 +117,12 @@ docker-agent:
113117
docker-collector:
114118
COMPONENT=collector $(MAKE) docker-component
115119

120+
.PHONY: docker-unisvc
121+
docker-unisvc:
122+
COMPONENT=unisvc $(MAKE) docker-component
116123

117124
.PHONY: binaries
118-
binaries: agent collector
125+
binaries: agent collector unisvc
119126

120127
.PHONY: binaries-all-sys
121128
binaries-all-sys:
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package builder
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"go.uber.org/zap"
22+
23+
"github.com/census-instrumentation/opencensus-service/consumer"
24+
"github.com/census-instrumentation/opencensus-service/data"
25+
"github.com/census-instrumentation/opencensus-service/exporter"
26+
"github.com/census-instrumentation/opencensus-service/internal"
27+
"github.com/census-instrumentation/opencensus-service/internal/configmodels"
28+
"github.com/census-instrumentation/opencensus-service/internal/factories"
29+
)
30+
31+
// exporterImpl is a running exporter that is built based on a config. It can have
32+
// a trace and/or a metrics consumer and have a stop function.
33+
type exporterImpl struct {
34+
tc consumer.TraceConsumer
35+
mc consumer.MetricsConsumer
36+
stop func() error
37+
}
38+
39+
// Check that exporterImpl implements Exporter interface.
40+
var _ exporter.Exporter = (*exporterImpl)(nil)
41+
42+
// ConsumeTraceData receives data.TraceData for processing by the TraceConsumer.
43+
func (exp *exporterImpl) ConsumeTraceData(ctx context.Context, td data.TraceData) error {
44+
return exp.tc.ConsumeTraceData(ctx, td)
45+
}
46+
47+
// ConsumeMetricsData receives data.MetricsData for processing by the MetricsConsumer.
48+
func (exp *exporterImpl) ConsumeMetricsData(ctx context.Context, md data.MetricsData) error {
49+
return exp.mc.ConsumeMetricsData(ctx, md)
50+
}
51+
52+
// TraceExportFormat is unneeded, we need to get rid of it after we cleanup
53+
// exporter.TraceExporter interface.
54+
func (exp *exporterImpl) TraceExportFormat() string {
55+
return ""
56+
}
57+
58+
// MetricsExportFormat is unneeded, we need to get rid of it after we cleanup
59+
// exporter.MetricsExporter interface.
60+
func (exp *exporterImpl) MetricsExportFormat() string {
61+
return ""
62+
}
63+
64+
// Stop the exporter.
65+
func (exp *exporterImpl) Stop() error {
66+
return exp.stop()
67+
}
68+
69+
// Exporters is a map of exporters created from exporter configs.
70+
type Exporters map[configmodels.Exporter]*exporterImpl
71+
72+
// StopAll stops all exporters.
73+
func (exps Exporters) StopAll() {
74+
for _, exp := range exps {
75+
exp.Stop()
76+
}
77+
}
78+
79+
type dataTypeRequirement struct {
80+
// Pipeline that requires the data type.
81+
requiredBy *configmodels.Pipeline
82+
}
83+
84+
// Map of data type requirements.
85+
type dataTypeRequirements map[configmodels.DataType]dataTypeRequirement
86+
87+
// Data type requirements for all exporters.
88+
type exportersRequiredDataTypes map[configmodels.Exporter]dataTypeRequirements
89+
90+
// ExportersBuilder builds exporters from config.
91+
type ExportersBuilder struct {
92+
logger *zap.Logger
93+
config *configmodels.ConfigV2
94+
}
95+
96+
// NewExportersBuilder creates a new ExportersBuilder. Call Build() on the returned value.
97+
func NewExportersBuilder(logger *zap.Logger, config *configmodels.ConfigV2) *ExportersBuilder {
98+
return &ExportersBuilder{logger, config}
99+
}
100+
101+
// Build exporters from config.
102+
func (eb *ExportersBuilder) Build() (Exporters, error) {
103+
exporters := make(Exporters)
104+
105+
// We need to calculate required input data types for each exporter so that we know
106+
// which data type must be started for each exporter.
107+
exporterInputDataTypes := eb.calcExportersRequiredDataTypes()
108+
109+
// Build exporters based on configuration and required input data types.
110+
for _, cfg := range eb.config.Exporters {
111+
exp, err := eb.buildExporter(cfg, exporterInputDataTypes)
112+
if err != nil {
113+
return nil, err
114+
}
115+
exporters[cfg] = exp
116+
}
117+
118+
return exporters, nil
119+
}
120+
121+
func (eb *ExportersBuilder) calcExportersRequiredDataTypes() exportersRequiredDataTypes {
122+
123+
// Go over all pipelines. The data type of the pipeline defines what data type
124+
// each exporter is expected to receive. Collect all required types for each
125+
// exporter.
126+
//
127+
// We also remember the last pipeline that requested the particular data type.
128+
// This is only needed for logging purposes in error cases when we need to
129+
// print that a particular exporter does not support the data type required for
130+
// a particular pipeline.
131+
132+
result := make(exportersRequiredDataTypes)
133+
134+
// Iterate over pipelines.
135+
for _, pipeline := range eb.config.Pipelines {
136+
// Iterate over all exporters for this pipeline.
137+
for _, expName := range pipeline.Exporters {
138+
// Find the exporter config by name.
139+
exporter := eb.config.Exporters[expName]
140+
141+
// Create the data type requirement for the exporter if it does not exist.
142+
if result[exporter] == nil {
143+
result[exporter] = make(dataTypeRequirements)
144+
}
145+
146+
// Remember that this data type is required for the exporter and also which
147+
// pipeline the requirement is coming from.
148+
result[exporter][pipeline.InputType] = dataTypeRequirement{pipeline}
149+
}
150+
}
151+
return result
152+
}
153+
154+
// combineStopFunc combines 2 functions and returns one function
155+
// that can be called and which in turn will call both functions
156+
// and then combine any errors that the 2 functions return.
157+
// Safe to use if any of the 2 functions are nil. If both functions
158+
// are nil then returns nil.
159+
func combineStopFunc(f1, f2 factories.StopFunc) factories.StopFunc {
160+
if f1 == nil {
161+
return f2
162+
}
163+
if f2 == nil {
164+
return f1
165+
}
166+
167+
return func() error {
168+
var errs []error
169+
if err := f1(); err != nil {
170+
errs = append(errs, err)
171+
}
172+
if err := f2(); err != nil {
173+
errs = append(errs, err)
174+
}
175+
return internal.CombineErrors(errs)
176+
}
177+
}
178+
179+
func (eb *ExportersBuilder) buildExporter(
180+
config configmodels.Exporter,
181+
exportersInputDataTypes exportersRequiredDataTypes,
182+
) (*exporterImpl, error) {
183+
184+
factory := factories.GetExporterFactory(config.Type())
185+
186+
exporter := &exporterImpl{}
187+
188+
inputDataTypes := exportersInputDataTypes[config]
189+
if inputDataTypes == nil {
190+
// No data types where requested for this exporter. This can only happen
191+
// if there are no pipelines associated with the exporter.
192+
eb.logger.Warn("Exporter " + config.Name() +
193+
" is not associated with any pipeline and will not export data.")
194+
return exporter, nil
195+
}
196+
197+
if requirement, ok := inputDataTypes[configmodels.TracesDataType]; ok {
198+
// Traces data type is required. Create a trace exporter based on config.
199+
tc, stopFunc, err := factory.CreateTraceExporter(config)
200+
if err != nil {
201+
if err == factories.ErrDataTypeIsNotSupported {
202+
// Could not create because this exporter does not support this data type.
203+
return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.TracesDataType)
204+
}
205+
return nil, fmt.Errorf("error creating %q exporter: %v", config.Name(), err)
206+
}
207+
208+
exporter.tc = tc
209+
exporter.stop = stopFunc
210+
}
211+
212+
if requirement, ok := inputDataTypes[configmodels.MetricsDataType]; ok {
213+
// Metrics data type is required. Create a trace exporter based on config.
214+
mc, stopFunc, err := factory.CreateMetricsExporter(config)
215+
if err != nil {
216+
if err == factories.ErrDataTypeIsNotSupported {
217+
// Could not create because this exporter does not support this data type.
218+
return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.MetricsDataType)
219+
}
220+
return nil, fmt.Errorf("error creating %q exporter: %v", config.Name(), err)
221+
}
222+
223+
exporter.mc = mc
224+
exporter.stop = combineStopFunc(exporter.stop, stopFunc)
225+
}
226+
227+
return exporter, nil
228+
}
229+
230+
func typeMismatchErr(
231+
config configmodels.Exporter,
232+
requiredByPipeline *configmodels.Pipeline,
233+
dataType configmodels.DataType,
234+
) error {
235+
return fmt.Errorf(
236+
"pipeline %q produces %q to exporter %s which does not support %q "+
237+
"telemetry data. exporter will be detached from pipeline",
238+
requiredByPipeline.Name, dataType.GetDataTypeStr(),
239+
config.Name(), dataType.GetDataTypeStr(),
240+
)
241+
}

0 commit comments

Comments
 (0)