Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 344c1fa

Browse files
author
Bogdan Drutu
authored
Add unified interface for receiver/exporter/spanprocessor. (#423)
* Add unified interface for receiver/exporter/spanprocessor. * Add a noop processor and simplify debug processor. * Add a noop processor tests.
1 parent f18a918 commit 344c1fa

7 files changed

Lines changed: 465 additions & 0 deletions

File tree

processor/debug_processor.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processor
16+
17+
import (
18+
"context"
19+
20+
"github.com/census-instrumentation/opencensus-service/data"
21+
"go.uber.org/zap"
22+
)
23+
24+
// A debug processor that does not sends the data to any destination but logs debugging messages.
25+
type debugProcessor struct{ logger *zap.Logger }
26+
27+
var _ TraceDataProcessor = (*debugProcessor)(nil)
28+
var _ MetricsDataProcessor = (*debugProcessor)(nil)
29+
30+
func (sp *debugProcessor) ProcessTraceData(ctx context.Context, td data.TraceData) error {
31+
sp.logger.Debug("debugTraceDataProcessor", zap.Int("#spans", len(td.Spans)))
32+
return nil
33+
}
34+
35+
func (sp *debugProcessor) ProcessMetricsData(ctx context.Context, md data.MetricsData) error {
36+
sp.logger.Debug("debugMetricsDataProcessor", zap.Int("#metrics", len(md.Metrics)))
37+
return nil
38+
}
39+
40+
// NewDebugTraceDataProcessor creates an TraceDataProcessor that just drops the received data and logs debugging messages.
41+
func NewDebugTraceDataProcessor(logger *zap.Logger) TraceDataProcessor {
42+
return &debugProcessor{logger: logger}
43+
}
44+
45+
// NewDebugMetricsDataProcessor creates an MetricsDataProcessor that just drops the received data and logs debugging messages.
46+
func NewDebugMetricsDataProcessor(logger *zap.Logger) MetricsDataProcessor {
47+
return &debugProcessor{logger: logger}
48+
}

processor/debug_processor_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package processor
15+
16+
import (
17+
"context"
18+
"testing"
19+
20+
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
21+
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
22+
"github.com/census-instrumentation/opencensus-service/data"
23+
"go.uber.org/zap/zaptest"
24+
)
25+
26+
func TestDebugTraceDataProcessorNoErrors(t *testing.T) {
27+
dtdp := NewDebugTraceDataProcessor(zaptest.NewLogger(t))
28+
td := data.TraceData{
29+
Spans: make([]*tracepb.Span, 7),
30+
}
31+
if err := dtdp.ProcessTraceData(context.Background(), td); err != nil {
32+
t.Errorf("Wanted nil got error")
33+
return
34+
}
35+
}
36+
37+
func TestDebugMetricsDataProcessorNoErrors(t *testing.T) {
38+
dmdp := NewDebugMetricsDataProcessor(zaptest.NewLogger(t))
39+
md := data.MetricsData{
40+
Metrics: make([]*metricspb.Metric, 7),
41+
}
42+
if err := dmdp.ProcessMetricsData(context.Background(), md); err != nil {
43+
t.Errorf("Wanted nil got error")
44+
return
45+
}
46+
}

processor/multi_processor.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processor
16+
17+
import (
18+
"context"
19+
20+
"github.com/census-instrumentation/opencensus-service/data"
21+
"github.com/census-instrumentation/opencensus-service/internal"
22+
)
23+
24+
// NewMultiMetricsDataProcessor wraps multiple metrics exporters in a single one.
25+
func NewMultiMetricsDataProcessor(mdps []MetricsDataProcessor) MetricsDataProcessor {
26+
return metricsDataProcessors(mdps)
27+
}
28+
29+
type metricsDataProcessors []MetricsDataProcessor
30+
31+
var _ MetricsDataProcessor = (*metricsDataProcessors)(nil)
32+
33+
// ExportMetricsData exports the MetricsData to all exporters wrapped by the current one.
34+
func (mdps metricsDataProcessors) ProcessMetricsData(ctx context.Context, md data.MetricsData) error {
35+
var errs []error
36+
for _, mdp := range mdps {
37+
if err := mdp.ProcessMetricsData(ctx, md); err != nil {
38+
errs = append(errs, err)
39+
}
40+
}
41+
return internal.CombineErrors(errs)
42+
}
43+
44+
// NewMultiTraceDataProcessor wraps multiple trace exporters in a single one.
45+
func NewMultiTraceDataProcessor(tdps []TraceDataProcessor) TraceDataProcessor {
46+
return traceDataProcessors(tdps)
47+
}
48+
49+
type traceDataProcessors []TraceDataProcessor
50+
51+
var _ TraceDataProcessor = (*traceDataProcessors)(nil)
52+
53+
// ExportSpans exports the span data to all trace exporters wrapped by the current one.
54+
func (tdps traceDataProcessors) ProcessTraceData(ctx context.Context, td data.TraceData) error {
55+
var errs []error
56+
for _, tdp := range tdps {
57+
if err := tdp.ProcessTraceData(ctx, td); err != nil {
58+
errs = append(errs, err)
59+
}
60+
}
61+
return internal.CombineErrors(errs)
62+
}

processor/multi_processor_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package processor
15+
16+
import (
17+
"context"
18+
"fmt"
19+
"testing"
20+
21+
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
22+
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
23+
"github.com/census-instrumentation/opencensus-service/data"
24+
)
25+
26+
func TestMultiTraceDataProcessorMultiplexing(t *testing.T) {
27+
processors := make([]TraceDataProcessor, 3)
28+
for i := range processors {
29+
processors[i] = &mockTraceDataProcessor{}
30+
}
31+
32+
mtdp := NewMultiTraceDataProcessor(processors)
33+
td := data.TraceData{
34+
Spans: make([]*tracepb.Span, 7),
35+
}
36+
37+
var wantSpansCount = 0
38+
for i := 0; i < 2; i++ {
39+
wantSpansCount += len(td.Spans)
40+
err := mtdp.ProcessTraceData(context.Background(), td)
41+
if err != nil {
42+
t.Errorf("Wanted nil got error")
43+
return
44+
}
45+
}
46+
47+
for _, p := range processors {
48+
m := p.(*mockTraceDataProcessor)
49+
if m.TotalSpans != wantSpansCount {
50+
t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans)
51+
return
52+
}
53+
}
54+
}
55+
56+
func TestMultiTraceDataProcessorWhenOneErrors(t *testing.T) {
57+
processors := make([]TraceDataProcessor, 3)
58+
for i := range processors {
59+
processors[i] = &mockTraceDataProcessor{}
60+
}
61+
62+
// Make one processor return error
63+
processors[1].(*mockTraceDataProcessor).MustFail = true
64+
65+
mtdp := NewMultiTraceDataProcessor(processors)
66+
td := data.TraceData{
67+
Spans: make([]*tracepb.Span, 5),
68+
}
69+
70+
var wantSpansCount = 0
71+
for i := 0; i < 2; i++ {
72+
wantSpansCount += len(td.Spans)
73+
err := mtdp.ProcessTraceData(context.Background(), td)
74+
if err == nil {
75+
t.Errorf("Wanted error got nil")
76+
return
77+
}
78+
}
79+
80+
for _, p := range processors {
81+
m := p.(*mockTraceDataProcessor)
82+
if m.TotalSpans != wantSpansCount {
83+
t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans)
84+
return
85+
}
86+
}
87+
}
88+
89+
func TestMultiMetricsDataProcessorMultiplexing(t *testing.T) {
90+
processors := make([]MetricsDataProcessor, 3)
91+
for i := range processors {
92+
processors[i] = &mockMetricsDataProcessor{}
93+
}
94+
95+
mmdp := NewMultiMetricsDataProcessor(processors)
96+
md := data.MetricsData{
97+
Metrics: make([]*metricspb.Metric, 7),
98+
}
99+
100+
var wantMetricsCount = 0
101+
for i := 0; i < 2; i++ {
102+
wantMetricsCount += len(md.Metrics)
103+
err := mmdp.ProcessMetricsData(context.Background(), md)
104+
if err != nil {
105+
t.Errorf("Wanted nil got error")
106+
return
107+
}
108+
}
109+
110+
for _, p := range processors {
111+
m := p.(*mockMetricsDataProcessor)
112+
if m.TotalMetrics != wantMetricsCount {
113+
t.Errorf("Wanted %d metrics for every processor but got %d", wantMetricsCount, m.TotalMetrics)
114+
return
115+
}
116+
}
117+
}
118+
119+
func TestMultiMetricsDataProcessorWhenOneErrors(t *testing.T) {
120+
processors := make([]MetricsDataProcessor, 3)
121+
for i := range processors {
122+
processors[i] = &mockMetricsDataProcessor{}
123+
}
124+
125+
// Make one processor return error
126+
processors[1].(*mockMetricsDataProcessor).MustFail = true
127+
128+
mmdp := NewMultiMetricsDataProcessor(processors)
129+
md := data.MetricsData{
130+
Metrics: make([]*metricspb.Metric, 5),
131+
}
132+
133+
var wantMetricsCount = 0
134+
for i := 0; i < 2; i++ {
135+
wantMetricsCount += len(md.Metrics)
136+
err := mmdp.ProcessMetricsData(context.Background(), md)
137+
if err == nil {
138+
t.Errorf("Wanted error got nil")
139+
return
140+
}
141+
}
142+
143+
for _, p := range processors {
144+
m := p.(*mockMetricsDataProcessor)
145+
if m.TotalMetrics != wantMetricsCount {
146+
t.Errorf("Wanted %d metrics for every processor but got %d", wantMetricsCount, m.TotalMetrics)
147+
return
148+
}
149+
}
150+
}
151+
152+
type mockTraceDataProcessor struct {
153+
TotalSpans int
154+
MustFail bool
155+
}
156+
157+
var _ TraceDataProcessor = &mockTraceDataProcessor{}
158+
159+
func (p *mockTraceDataProcessor) ProcessTraceData(ctx context.Context, td data.TraceData) error {
160+
p.TotalSpans += len(td.Spans)
161+
if p.MustFail {
162+
return fmt.Errorf("this processor must fail")
163+
}
164+
165+
return nil
166+
}
167+
168+
type mockMetricsDataProcessor struct {
169+
TotalMetrics int
170+
MustFail bool
171+
}
172+
173+
var _ MetricsDataProcessor = &mockMetricsDataProcessor{}
174+
175+
func (p *mockMetricsDataProcessor) ProcessMetricsData(ctx context.Context, td data.MetricsData) error {
176+
p.TotalMetrics += len(td.Metrics)
177+
if p.MustFail {
178+
return fmt.Errorf("this processor must fail")
179+
}
180+
181+
return nil
182+
}

processor/processor.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package processor
16+
17+
import (
18+
"context"
19+
20+
"github.com/census-instrumentation/opencensus-service/data"
21+
)
22+
23+
// MetricsDataProcessor is an interface that receives data.MetricsData, process it as needed, and
24+
// sends it to the next processing node if any or to the destination.
25+
//
26+
// ProcessMetricsData receives data.MetricsData for processing by the MetricsDataProcessor.
27+
type MetricsDataProcessor interface {
28+
ProcessMetricsData(ctx context.Context, md data.MetricsData) error
29+
}
30+
31+
// TraceDataProcessor is an interface that receives data.TraceData, process it as needed, and
32+
// sends it to the next processing node if any or to the destination.
33+
//
34+
// ProcessTraceData receives data.TraceData for processing by the TraceDataProcessor.
35+
type TraceDataProcessor interface {
36+
ProcessTraceData(ctx context.Context, td data.TraceData) error
37+
}

0 commit comments

Comments
 (0)