Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 7045421

Browse files
author
Bogdan Drutu
authored
Remove Receiver Trace and Metrics sinks. (#424)
1 parent 344c1fa commit 7045421

19 files changed

Lines changed: 128 additions & 177 deletions

File tree

cmd/ocagent/main.go

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,11 @@ import (
3131
"go.uber.org/zap"
3232
"go.uber.org/zap/zapcore"
3333

34-
"github.com/census-instrumentation/opencensus-service/data"
3534
"github.com/census-instrumentation/opencensus-service/exporter"
3635
"github.com/census-instrumentation/opencensus-service/internal"
3736
"github.com/census-instrumentation/opencensus-service/internal/config"
3837
"github.com/census-instrumentation/opencensus-service/internal/config/viperutils"
39-
"github.com/census-instrumentation/opencensus-service/receiver"
38+
"github.com/census-instrumentation/opencensus-service/processor"
4039
"github.com/census-instrumentation/opencensus-service/receiver/jaegerreceiver"
4140
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver"
4241
"github.com/census-instrumentation/opencensus-service/receiver/prometheusreceiver"
@@ -55,14 +54,6 @@ func main() {
5554
}
5655
}
5756

58-
type noopMetricsSink int
59-
60-
var _ receiver.MetricsReceiverSink = (*noopMetricsSink)(nil)
61-
62-
func (nms *noopMetricsSink) ReceiveMetricsData(ctx context.Context, metricsdata data.MetricsData) (*receiver.MetricsReceiverAcknowledgement, error) {
63-
return &receiver.MetricsReceiverAcknowledgement{}, nil
64-
}
65-
6657
func runOCAgent() {
6758
yamlBlob, err := ioutil.ReadFile(configYAMLFile)
6859
if err != nil {
@@ -189,7 +180,7 @@ func runZPages(port int) func() error {
189180
return srv.Close
190181
}
191182

192-
func runOCReceiver(logger *zap.Logger, acfg *config.Config, sr receiver.TraceReceiverSink, mr receiver.MetricsReceiverSink) (doneFn func() error, err error) {
183+
func runOCReceiver(logger *zap.Logger, acfg *config.Config, tdp processor.TraceDataProcessor, mdp processor.MetricsDataProcessor) (doneFn func() error, err error) {
193184
tlsCredsOption, hasTLSCreds, err := acfg.OpenCensusReceiverTLSCredentialsServerOption()
194185
if err != nil {
195186
return nil, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err)
@@ -217,19 +208,19 @@ func runOCReceiver(logger *zap.Logger, acfg *config.Config, sr receiver.TraceRec
217208

218209
switch {
219210
case acfg.CanRunOpenCensusTraceReceiver() && acfg.CanRunOpenCensusMetricsReceiver():
220-
if err := ocr.Start(ctx, sr, mr); err != nil {
211+
if err := ocr.Start(ctx, tdp, mdp); err != nil {
221212
return nil, fmt.Errorf("Failed to start Trace and Metrics Receivers: %v", err)
222213
}
223214
log.Printf("Running OpenCensus Trace and Metrics receivers as a gRPC service at %q", addr)
224215

225216
case acfg.CanRunOpenCensusTraceReceiver():
226-
if err := ocr.StartTraceReception(ctx, sr); err != nil {
217+
if err := ocr.StartTraceReception(ctx, tdp); err != nil {
227218
return nil, fmt.Errorf("Failed to start TraceReceiver: %v", err)
228219
}
229220
log.Printf("Running OpenCensus Trace receiver as a gRPC service at %q", addr)
230221

231222
case acfg.CanRunOpenCensusMetricsReceiver():
232-
if err := ocr.StartMetricsReception(ctx, mr); err != nil {
223+
if err := ocr.StartMetricsReception(ctx, mdp); err != nil {
233224
return nil, fmt.Errorf("Failed to start MetricsReceiver: %v", err)
234225
}
235226
log.Printf("Running OpenCensus Metrics receiver as a gRPC service at %q", addr)
@@ -246,7 +237,7 @@ func runOCReceiver(logger *zap.Logger, acfg *config.Config, sr receiver.TraceRec
246237
return doneFn, nil
247238
}
248239

249-
func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, sr receiver.TraceReceiverSink) (doneFn func() error, err error) {
240+
func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, next processor.TraceDataProcessor) (doneFn func() error, err error) {
250241
jtr, err := jaegerreceiver.New(context.Background(), &jaegerreceiver.Configuration{
251242
CollectorThriftPort: collectorThriftPort,
252243
CollectorHTTPPort: collectorHTTPPort,
@@ -258,7 +249,7 @@ func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, sr receiver.T
258249
if err != nil {
259250
return nil, fmt.Errorf("Failed to create new Jaeger receiver: %v", err)
260251
}
261-
if err := jtr.StartTraceReception(context.Background(), sr); err != nil {
252+
if err := jtr.StartTraceReception(context.Background(), next); err != nil {
262253
return nil, fmt.Errorf("Failed to start Jaeger receiver: %v", err)
263254
}
264255
doneFn = func() error {
@@ -268,13 +259,13 @@ func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, sr receiver.T
268259
return doneFn, nil
269260
}
270261

271-
func runZipkinReceiver(addr string, sr receiver.TraceReceiverSink) (doneFn func() error, err error) {
262+
func runZipkinReceiver(addr string, next processor.TraceDataProcessor) (doneFn func() error, err error) {
272263
zi, err := zipkinreceiver.New(addr)
273264
if err != nil {
274265
return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err)
275266
}
276267

277-
if err := zi.StartTraceReception(context.Background(), sr); err != nil {
268+
if err := zi.StartTraceReception(context.Background(), next); err != nil {
278269
return nil, fmt.Errorf("Cannot start Zipkin receiver with address %q: %v", addr, err)
279270
}
280271
doneFn = func() error {
@@ -284,13 +275,13 @@ func runZipkinReceiver(addr string, sr receiver.TraceReceiverSink) (doneFn func(
284275
return doneFn, nil
285276
}
286277

287-
func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, sr receiver.TraceReceiverSink) (doneFn func() error, err error) {
278+
func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, next processor.TraceDataProcessor) (doneFn func() error, err error) {
288279
zs, err := scribe.NewReceiver(config.Address, config.Port, config.Category)
289280
if err != nil {
290281
return nil, fmt.Errorf("Failed to create the Zipkin Scribe receiver: %v", err)
291282
}
292283

293-
if err := zs.StartTraceReception(context.Background(), sr); err != nil {
284+
if err := zs.StartTraceReception(context.Background(), next); err != nil {
294285
return nil, fmt.Errorf("Cannot start Zipkin Scribe receiver with %v: %v", config, err)
295286
}
296287
doneFn = func() error {
@@ -300,12 +291,12 @@ func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, sr receiver.Tr
300291
return doneFn, nil
301292
}
302293

303-
func runPrometheusReceiver(promConfig *prometheusreceiver.Configuration, mr receiver.MetricsReceiverSink) (doneFn func() error, err error) {
294+
func runPrometheusReceiver(promConfig *prometheusreceiver.Configuration, next processor.MetricsDataProcessor) (doneFn func() error, err error) {
304295
pmr, err := prometheusreceiver.New(promConfig)
305296
if err != nil {
306297
return nil, err
307298
}
308-
if err := pmr.StartMetricsReception(context.Background(), mr); err != nil {
299+
if err := pmr.StartMetricsReception(context.Background(), next); err != nil {
309300
return nil, err
310301
}
311302
doneFn = func() error {

exporter/metrics_exporter.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
"context"
1919

2020
"github.com/census-instrumentation/opencensus-service/data"
21-
"github.com/census-instrumentation/opencensus-service/receiver"
21+
"github.com/census-instrumentation/opencensus-service/processor"
2222
)
2323

2424
// MetricsExporter is an interface that receives data.MetricsData, converts it as needed, and
@@ -30,13 +30,13 @@ type MetricsExporter interface {
3030
}
3131

3232
// MultiMetricsExporters wraps multiple metrics exporters in a single one.
33-
func MultiMetricsExporters(mes ...MetricsExporter) receiver.MetricsReceiverSink {
33+
func MultiMetricsExporters(mes ...MetricsExporter) processor.MetricsDataProcessor {
3434
return metricsExporters(mes)
3535
}
3636

3737
type metricsExporters []MetricsExporter
3838

39-
var _ receiver.MetricsReceiverSink = (*metricsExporters)(nil)
39+
var _ processor.MetricsDataProcessor = (*metricsExporters)(nil)
4040

4141
// ExportMetricsData exports the MetricsData to all exporters wrapped by the current one.
4242
func (mes metricsExporters) ExportMetricsData(ctx context.Context, md data.MetricsData) error {
@@ -46,15 +46,12 @@ func (mes metricsExporters) ExportMetricsData(ctx context.Context, md data.Metri
4646
return nil
4747
}
4848

49-
// ReceiveTraceData receives the span data in the protobuf format, translates it, and forwards the transformed
49+
// ProcessMetricsData receives the span data in the protobuf format, translates it, and forwards the transformed
5050
// span data to all trace exporters wrapped by the current one.
51-
func (mes metricsExporters) ReceiveMetricsData(ctx context.Context, md data.MetricsData) (*receiver.MetricsReceiverAcknowledgement, error) {
51+
func (mes metricsExporters) ProcessMetricsData(ctx context.Context, md data.MetricsData) error {
5252
for _, me := range mes {
5353
_ = me.ExportMetricsData(ctx, md)
5454
}
5555

56-
ack := &receiver.MetricsReceiverAcknowledgement{
57-
SavedMetrics: uint64(len(md.Metrics)),
58-
}
59-
return ack, nil
56+
return nil
6057
}

exporter/trace_exporter.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
"context"
1919

2020
"github.com/census-instrumentation/opencensus-service/data"
21-
"github.com/census-instrumentation/opencensus-service/receiver"
21+
"github.com/census-instrumentation/opencensus-service/processor"
2222
)
2323

2424
// TraceExporter is a interface that receives OpenCensus data, converts it as needed, and
@@ -34,7 +34,7 @@ type TraceExporter interface {
3434
// transforms it to OpenCensus in memory data and sends it to the exporter.
3535
type TraceExporterSink interface {
3636
TraceExporter
37-
receiver.TraceReceiverSink
37+
processor.TraceDataProcessor
3838
}
3939

4040
// MultiTraceExporters wraps multiple trace exporters in a single one.
@@ -44,6 +44,9 @@ func MultiTraceExporters(tes ...TraceExporter) TraceExporterSink {
4444

4545
type traceExporters []TraceExporter
4646

47+
var _ TraceExporter = (*traceExporters)(nil)
48+
var _ processor.TraceDataProcessor = (*traceExporters)(nil)
49+
4750
// ExportSpans exports the span data to all trace exporters wrapped by the current one.
4851
func (tes traceExporters) ExportSpans(ctx context.Context, td data.TraceData) error {
4952
for _, te := range tes {
@@ -52,15 +55,12 @@ func (tes traceExporters) ExportSpans(ctx context.Context, td data.TraceData) er
5255
return nil
5356
}
5457

55-
// ReceiveTraceData receives the span data in the protobuf format, translates it, and forwards the transformed
58+
// ProcessTraceData receives the span data in the protobuf format, translates it, and forwards the transformed
5659
// span data to all trace exporters wrapped by the current one.
57-
func (tes traceExporters) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
60+
func (tes traceExporters) ProcessTraceData(ctx context.Context, td data.TraceData) error {
5861
for _, te := range tes {
5962
_ = te.ExportSpans(ctx, td)
6063
}
6164

62-
ack := &receiver.TraceReceiverAcknowledgement{
63-
SavedSpans: uint64(len(td.Spans)),
64-
}
65-
return ack, nil
65+
return nil
6666
}

internal/collector/processor/exporter_processor.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ func NewTraceExporterProcessor(traceExporters ...exporter.TraceExporter) SpanPro
3434
}
3535

3636
func (sp *exporterSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) {
37-
ack, err := sp.tes.ReceiveTraceData(context.Background(), data.TraceData{Node: batch.Node, Resource: batch.Resource, Spans: batch.Spans})
37+
err := sp.tes.ProcessTraceData(context.Background(), data.TraceData{Node: batch.Node, Resource: batch.Resource, Spans: batch.Spans})
3838
if err != nil {
39-
return ack.DroppedSpans, err
39+
// TODO: determine if the number of dropped spans is needed because it was wrong anyway.
40+
return 0, err
4041
}
4142
return 0, nil
4243
}

internal/collector/processor/processor_to_sink.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,31 @@ import (
1919

2020
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2121
"github.com/census-instrumentation/opencensus-service/data"
22-
"github.com/census-instrumentation/opencensus-service/receiver"
22+
"github.com/census-instrumentation/opencensus-service/processor"
2323
)
2424

2525
type protoProcessorSink struct {
2626
sourceFormat string
2727
protoProcessor SpanProcessor
2828
}
2929

30-
var _ (receiver.TraceReceiverSink) = (*protoProcessorSink)(nil)
30+
var _ (processor.TraceDataProcessor) = (*protoProcessorSink)(nil)
3131

3232
// WrapWithSpanSink wraps a processor to be used as a span sink by receivers.
33-
func WrapWithSpanSink(format string, p SpanProcessor) receiver.TraceReceiverSink {
33+
func WrapWithSpanSink(format string, p SpanProcessor) processor.TraceDataProcessor {
3434
return &protoProcessorSink{
3535
sourceFormat: format,
3636
protoProcessor: p,
3737
}
3838
}
3939

40-
func (ps *protoProcessorSink) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
40+
func (ps *protoProcessorSink) ProcessTraceData(ctx context.Context, td data.TraceData) error {
4141
batch := &agenttracepb.ExportTraceServiceRequest{
4242
Node: td.Node,
4343
Spans: td.Spans,
4444
}
4545

46-
failures, err := ps.protoProcessor.ProcessSpans(batch, ps.sourceFormat)
46+
_, err := ps.protoProcessor.ProcessSpans(batch, ps.sourceFormat)
4747

48-
ack := &receiver.TraceReceiverAcknowledgement{
49-
SavedSpans: uint64(len(batch.Spans)) - failures,
50-
DroppedSpans: failures,
51-
}
52-
53-
return ack, err
48+
return err
5449
}

receiver/end_to_end_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.opencensus.io/trace"
2525

2626
"github.com/census-instrumentation/opencensus-service/data"
27+
"github.com/census-instrumentation/opencensus-service/processor"
2728
"github.com/census-instrumentation/opencensus-service/receiver"
2829
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver"
2930
)
@@ -92,11 +93,11 @@ func Example_endToEnd() {
9293

9394
type logSpanSink int
9495

95-
var _ receiver.TraceReceiverSink = (*logSpanSink)(nil)
96+
var _ processor.TraceDataProcessor = (*logSpanSink)(nil)
9697

97-
func (lsr *logSpanSink) ReceiveTraceData(ctx context.Context, td data.TraceData) (*receiver.TraceReceiverAcknowledgement, error) {
98+
func (lsr *logSpanSink) ProcessTraceData(ctx context.Context, td data.TraceData) error {
9899
spansBlob, _ := json.MarshalIndent(td.Spans, " ", " ")
99100
log.Printf("\n****\nNode: %#v\nSpans: %s\n****\n", td.Node, spansBlob)
100101

101-
return &receiver.TraceReceiverAcknowledgement{SavedSpans: uint64(len(td.Spans))}, nil
102+
return nil
102103
}

receiver/jaegerreceiver/trace_receiver.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939

4040
"github.com/census-instrumentation/opencensus-service/data"
4141
"github.com/census-instrumentation/opencensus-service/internal"
42+
"github.com/census-instrumentation/opencensus-service/processor"
4243
"github.com/census-instrumentation/opencensus-service/receiver"
4344
jaegertranslator "github.com/census-instrumentation/opencensus-service/translator/trace/jaeger"
4445
)
@@ -60,7 +61,7 @@ type jReceiver struct {
6061
// mu protects the fields of this type
6162
mu sync.Mutex
6263

63-
spanSink receiver.TraceReceiverSink
64+
nextProcessor processor.TraceDataProcessor
6465

6566
startOnce sync.Once
6667
stopOnce sync.Once
@@ -159,7 +160,7 @@ func (jr *jReceiver) agentBinaryThriftAddr() string {
159160
return fmt.Sprintf(":%d", port)
160161
}
161162

162-
func (jr *jReceiver) StartTraceReception(ctx context.Context, spanSink receiver.TraceReceiverSink) error {
163+
func (jr *jReceiver) StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error {
163164
jr.mu.Lock()
164165
defer jr.mu.Unlock()
165166

@@ -175,8 +176,8 @@ func (jr *jReceiver) StartTraceReception(ctx context.Context, spanSink receiver.
175176
return
176177
}
177178

178-
// Finally set the spanSink, since we never encountered an error.
179-
jr.spanSink = spanSink
179+
// Finally set the nextProcessor, since we never encountered an error.
180+
jr.nextProcessor = nextProcessor
180181

181182
err = nil
182183
})
@@ -236,7 +237,7 @@ func (jr *jReceiver) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch)
236237

237238
if err == nil && octrace != nil {
238239
ok = true
239-
jr.spanSink.ReceiveTraceData(ctx, data.TraceData{Node: octrace.Node, Spans: octrace.Spans})
240+
jr.nextProcessor.ProcessTraceData(ctx, data.TraceData{Node: octrace.Node, Spans: octrace.Spans})
240241
// We MUST unconditionally record metrics from this reception.
241242
spansMetricsFn(octrace.Node, octrace.Spans)
242243
}
@@ -268,7 +269,7 @@ func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error {
268269

269270
ctx := context.Background()
270271
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(ctx, "jaeger-agent")
271-
_, err = jr.spanSink.ReceiveTraceData(ctx, data.TraceData{Node: octrace.Node, Spans: octrace.Spans})
272+
err = jr.nextProcessor.ProcessTraceData(ctx, data.TraceData{Node: octrace.Node, Spans: octrace.Spans})
272273
// We MUST unconditionally record metrics from this reception.
273274
spansMetricsFn(octrace.Node, octrace.Spans)
274275

receiver/opencensusreceiver/ocmetrics/opencensus.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,22 @@ import (
2929
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
3030
"github.com/census-instrumentation/opencensus-service/data"
3131
"github.com/census-instrumentation/opencensus-service/internal"
32-
"github.com/census-instrumentation/opencensus-service/receiver"
32+
"github.com/census-instrumentation/opencensus-service/processor"
3333
)
3434

3535
// Receiver is the type used to handle metrics from OpenCensus exporters.
3636
type Receiver struct {
37-
metricSink receiver.MetricsReceiverSink
37+
nextProcessor processor.MetricsDataProcessor
3838
metricBufferPeriod time.Duration
3939
metricBufferCount int
4040
}
4141

4242
// New creates a new ocmetrics.Receiver reference.
43-
func New(sr receiver.MetricsReceiverSink, opts ...Option) (*Receiver, error) {
44-
if sr == nil {
45-
return nil, errors.New("needs a non-nil receiver.MetricsReceiverSink")
43+
func New(nextProcessor processor.MetricsDataProcessor, opts ...Option) (*Receiver, error) {
44+
if nextProcessor == nil {
45+
return nil, errors.New("needs a non-nil processor.MetricsDataProcessor")
4646
}
47-
ocr := &Receiver{metricSink: sr}
47+
ocr := &Receiver{nextProcessor: nextProcessor}
4848
for _, opt := range opts {
4949
opt.WithReceiver(ocr)
5050
}
@@ -142,7 +142,7 @@ func (ocr *Receiver) batchMetricExporting(longLivedRPCCtx context.Context, paylo
142142

143143
nMetrics := int64(0)
144144
for _, md := range mds {
145-
ocr.metricSink.ReceiveMetricsData(ctx, *md)
145+
ocr.nextProcessor.ProcessMetricsData(ctx, *md)
146146
nMetrics += int64(len(md.Metrics))
147147
}
148148

0 commit comments

Comments
 (0)