Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 92fed7f

Browse files
author
Paulo Janotti
authored
Make all receivers take "next" on creation (#477)
* Make all receivers take "next" on creation This is required in order to move the code to use factories for the receivers. The "next" parameter will be passed to the factory and then to the receiver constructor, this way Start[Trace|Metrics]Reception doesn't need the next parameter anymore. Since typically the Start* methods launch goroutines took the opportunity to add a channel to report asynchronous errors. Opportunistic change: renamed WrapWithSpanSink and related variables. Remaing items from previous commit * Improve test coverage for zipkin-scribe Improving code coverage for zipkin-scribe that went a tad down with the initial PR. * Improve a little code coverage zipkin receiver * Fix test broken during rebase
1 parent b9f473c commit 92fed7f

File tree

22 files changed

+291
-133
lines changed

22 files changed

+291
-133
lines changed

cmd/ocagent/main.go

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func runOCAgent() {
122122
commonMetricsSink := multiconsumer.NewMetricsProcessor(metricsExporters)
123123

124124
// Add other receivers here as they are implemented
125-
ocReceiverDoneFn, err := runOCReceiver(logger, &agentConfig, commonSpanSink, commonMetricsSink)
125+
ocReceiverDoneFn, err := runOCReceiver(logger, &agentConfig, commonSpanSink, commonMetricsSink, asyncErrorChan)
126126
if err != nil {
127127
log.Fatal(err)
128128
}
@@ -139,15 +139,15 @@ func runOCAgent() {
139139
// If the Zipkin receiver is enabled, then run it
140140
if agentConfig.ZipkinReceiverEnabled() {
141141
zipkinReceiverAddr := agentConfig.ZipkinReceiverAddress()
142-
zipkinReceiverDoneFn, err := runZipkinReceiver(zipkinReceiverAddr, commonSpanSink)
142+
zipkinReceiverDoneFn, err := runZipkinReceiver(zipkinReceiverAddr, commonSpanSink, asyncErrorChan)
143143
if err != nil {
144144
log.Fatal(err)
145145
}
146146
closeFns = append(closeFns, zipkinReceiverDoneFn)
147147
}
148148

149149
if agentConfig.ZipkinScribeReceiverEnabled() {
150-
zipkinScribeDoneFn, err := runZipkinScribeReceiver(agentConfig.ZipkinScribeConfig(), commonSpanSink)
150+
zipkinScribeDoneFn, err := runZipkinScribeReceiver(agentConfig.ZipkinScribeConfig(), commonSpanSink, asyncErrorChan)
151151
if err != nil {
152152
log.Fatal(err)
153153
}
@@ -156,7 +156,7 @@ func runOCAgent() {
156156

157157
if agentConfig.JaegerReceiverEnabled() {
158158
collectorHTTPPort, collectorThriftPort := agentConfig.JaegerReceiverPorts()
159-
jaegerDoneFn, err := runJaegerReceiver(collectorThriftPort, collectorHTTPPort, commonSpanSink)
159+
jaegerDoneFn, err := runJaegerReceiver(collectorThriftPort, collectorHTTPPort, commonSpanSink, asyncErrorChan)
160160
if err != nil {
161161
log.Fatal(err)
162162
}
@@ -165,7 +165,7 @@ func runOCAgent() {
165165

166166
// If the Prometheus receiver is enabled, then run it.
167167
if agentConfig.PrometheusReceiverEnabled() {
168-
promDoneFn, err := runPrometheusReceiver(viperCfg, commonMetricsSink)
168+
promDoneFn, err := runPrometheusReceiver(viperCfg, commonMetricsSink, asyncErrorChan)
169169
if err != nil {
170170
log.Fatal(err)
171171
}
@@ -214,14 +214,16 @@ func runZPages(port int) func() error {
214214
return srv.Close
215215
}
216216

217-
func runOCReceiver(logger *zap.Logger, acfg *config.Config, tdp consumer.TraceConsumer, mdp consumer.MetricsConsumer) (doneFn func() error, err error) {
217+
func runOCReceiver(logger *zap.Logger, acfg *config.Config, tc consumer.TraceConsumer, mc consumer.MetricsConsumer, asyncErrorChan chan<- error) (doneFn func() error, err error) {
218218
tlsCredsOption, hasTLSCreds, err := acfg.OpenCensusReceiverTLSCredentialsServerOption()
219219
if err != nil {
220220
return nil, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err)
221221
}
222222
addr := acfg.OpenCensusReceiverAddress()
223223
corsOrigins := acfg.OpenCensusReceiverCorsAllowedOrigins()
224224
ocr, err := opencensusreceiver.New(addr,
225+
tc,
226+
mc,
225227
tlsCredsOption,
226228
opencensusreceiver.WithCorsOrigins(corsOrigins))
227229

@@ -242,19 +244,19 @@ func runOCReceiver(logger *zap.Logger, acfg *config.Config, tdp consumer.TraceCo
242244

243245
switch {
244246
case acfg.CanRunOpenCensusTraceReceiver() && acfg.CanRunOpenCensusMetricsReceiver():
245-
if err := ocr.Start(ctx, tdp, mdp); err != nil {
247+
if err := ocr.Start(ctx); err != nil {
246248
return nil, fmt.Errorf("failed to start Trace and Metrics Receivers: %v", err)
247249
}
248250
log.Printf("Running OpenCensus Trace and Metrics receivers as a gRPC service at %q", addr)
249251

250252
case acfg.CanRunOpenCensusTraceReceiver():
251-
if err := ocr.StartTraceReception(ctx, tdp); err != nil {
253+
if err := ocr.StartTraceReception(ctx, asyncErrorChan); err != nil {
252254
return nil, fmt.Errorf("failed to start TraceReceiver: %v", err)
253255
}
254256
log.Printf("Running OpenCensus Trace receiver as a gRPC service at %q", addr)
255257

256258
case acfg.CanRunOpenCensusMetricsReceiver():
257-
if err := ocr.StartMetricsReception(ctx, mdp); err != nil {
259+
if err := ocr.StartMetricsReception(ctx, asyncErrorChan); err != nil {
258260
return nil, fmt.Errorf("failed to start MetricsReceiver: %v", err)
259261
}
260262
log.Printf("Running OpenCensus Metrics receiver as a gRPC service at %q", addr)
@@ -271,19 +273,20 @@ func runOCReceiver(logger *zap.Logger, acfg *config.Config, tdp consumer.TraceCo
271273
return doneFn, nil
272274
}
273275

274-
func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, next consumer.TraceConsumer) (doneFn func() error, err error) {
275-
jtr, err := jaegerreceiver.New(context.Background(), &jaegerreceiver.Configuration{
276+
func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, next consumer.TraceConsumer, asyncErrorChan chan<- error) (doneFn func() error, err error) {
277+
config := &jaegerreceiver.Configuration{
276278
CollectorThriftPort: collectorThriftPort,
277279
CollectorHTTPPort: collectorHTTPPort,
278280

279281
// TODO: (@odeke-em, @pjanotti) send a change
280282
// to dynamically retrieve the Jaeger Agent's ports
281283
// and not use their defaults of 5778, 6831, 6832
282-
})
284+
}
285+
jtr, err := jaegerreceiver.New(context.Background(), config, next)
283286
if err != nil {
284287
return nil, fmt.Errorf("failed to create new Jaeger receiver: %v", err)
285288
}
286-
if err := jtr.StartTraceReception(context.Background(), next); err != nil {
289+
if err := jtr.StartTraceReception(context.Background(), asyncErrorChan); err != nil {
287290
return nil, fmt.Errorf("failed to start Jaeger receiver: %v", err)
288291
}
289292
doneFn = func() error {
@@ -293,13 +296,13 @@ func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, next consumer
293296
return doneFn, nil
294297
}
295298

296-
func runZipkinReceiver(addr string, next consumer.TraceConsumer) (doneFn func() error, err error) {
297-
zi, err := zipkinreceiver.New(addr)
299+
func runZipkinReceiver(addr string, next consumer.TraceConsumer, asyncErrorChan chan<- error) (doneFn func() error, err error) {
300+
zi, err := zipkinreceiver.New(addr, next)
298301
if err != nil {
299302
return nil, fmt.Errorf("failed to create the Zipkin receiver: %v", err)
300303
}
301304

302-
if err := zi.StartTraceReception(context.Background(), next); err != nil {
305+
if err := zi.StartTraceReception(context.Background(), asyncErrorChan); err != nil {
303306
return nil, fmt.Errorf("cannot start Zipkin receiver with address %q: %v", addr, err)
304307
}
305308
doneFn = func() error {
@@ -309,13 +312,13 @@ func runZipkinReceiver(addr string, next consumer.TraceConsumer) (doneFn func()
309312
return doneFn, nil
310313
}
311314

312-
func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, next consumer.TraceConsumer) (doneFn func() error, err error) {
313-
zs, err := zipkinscribereceiver.NewReceiver(config.Address, config.Port, config.Category)
315+
func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, next consumer.TraceConsumer, asyncErrorChan chan<- error) (doneFn func() error, err error) {
316+
zs, err := zipkinscribereceiver.NewReceiver(config.Address, config.Port, config.Category, next)
314317
if err != nil {
315318
return nil, fmt.Errorf("failed to create the Zipkin Scribe receiver: %v", err)
316319
}
317320

318-
if err := zs.StartTraceReception(context.Background(), next); err != nil {
321+
if err := zs.StartTraceReception(context.Background(), asyncErrorChan); err != nil {
319322
return nil, fmt.Errorf("cannot start Zipkin Scribe receiver with %v: %v", config, err)
320323
}
321324
doneFn = func() error {
@@ -325,12 +328,12 @@ func runZipkinScribeReceiver(config *config.ScribeReceiverConfig, next consumer.
325328
return doneFn, nil
326329
}
327330

328-
func runPrometheusReceiver(v *viper.Viper, next consumer.MetricsConsumer) (doneFn func() error, err error) {
329-
pmr, err := prometheusreceiver.New(v.Sub("receivers.prometheus"))
331+
func runPrometheusReceiver(v *viper.Viper, next consumer.MetricsConsumer, asyncErrorChan chan<- error) (doneFn func() error, err error) {
332+
pmr, err := prometheusreceiver.New(v.Sub("receivers.prometheus"), next)
330333
if err != nil {
331334
return nil, err
332335
}
333-
if err := pmr.StartMetricsReception(context.Background(), next); err != nil {
336+
if err := pmr.StartMetricsReception(context.Background(), asyncErrorChan); err != nil {
334337
return nil, err
335338
}
336339
doneFn = func() error {

cmd/occollector/app/collector/collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (app *Application) execute() {
8989
var closeFns []func()
9090
app.processor, closeFns = startProcessor(app.v, app.logger)
9191

92-
app.receivers = createReceivers(app.v, app.logger, app.processor)
92+
app.receivers = createReceivers(app.v, app.logger, app.processor, asyncErrorChannel)
9393

9494
err = initTelemetry(asyncErrorChannel, app.v, app.logger)
9595
if err != nil {

cmd/occollector/app/collector/receivers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ import (
3030
"github.com/census-instrumentation/opencensus-service/receiver"
3131
)
3232

33-
func createReceivers(v *viper.Viper, logger *zap.Logger, traceConsumers consumer.TraceConsumer) []receiver.TraceReceiver {
33+
func createReceivers(v *viper.Viper, logger *zap.Logger, traceConsumers consumer.TraceConsumer, asyncErrorChan chan<- error) []receiver.TraceReceiver {
3434
var someReceiverEnabled bool
3535
receivers := []struct {
36-
runFn func(*zap.Logger, *viper.Viper, consumer.TraceConsumer) (receiver.TraceReceiver, error)
36+
runFn func(*zap.Logger, *viper.Viper, consumer.TraceConsumer, chan<- error) (receiver.TraceReceiver, error)
3737
enabled bool
3838
}{
3939
{jaegerreceiver.Start, builder.JaegerReceiverEnabled(v)},
@@ -45,7 +45,7 @@ func createReceivers(v *viper.Viper, logger *zap.Logger, traceConsumers consumer
4545
var startedTraceReceivers []receiver.TraceReceiver
4646
for _, receiver := range receivers {
4747
if receiver.enabled {
48-
rec, err := receiver.runFn(logger, v, traceConsumers)
48+
rec, err := receiver.runFn(logger, v, traceConsumers, asyncErrorChan)
4949
if err != nil {
5050
// TODO: (@pjanotti) better shutdown, for now just try to stop any started receiver before terminating.
5151
for _, startedTraceReceiver := range startedTraceReceivers {

exporter/zipkinexporter/zipkin_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,13 @@ zipkin:
168168
tes[0].(*zipkinExporter).reporter = mzr
169169

170170
// Run the Zipkin receiver to "receive spans upload from a client application"
171-
zi, err := zipkinreceiver.New(":0")
171+
zexp := multiconsumer.NewTraceProcessor(tes)
172+
zi, err := zipkinreceiver.New(":0", zexp)
172173
if err != nil {
173174
t.Fatalf("Failed to create a new Zipkin receiver: %v", err)
174175
}
175176

176-
zexp := multiconsumer.NewTraceProcessor(tes)
177-
if err := zi.StartTraceReception(context.Background(), zexp); err != nil {
177+
if err := zi.StartTraceReception(context.Background(), nil); err != nil {
178178
t.Fatalf("Failed to start trace reception: %v", err)
179179
}
180180
defer zi.StopTraceReception(context.Background())

internal/collector/jaeger/receiver.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,23 @@ import (
3535
)
3636

3737
// Start starts the Jaeger receiver endpoint.
38-
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error) {
38+
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, asyncErrorChan chan<- error) (receiver.TraceReceiver, error) {
3939
rOpts, err := builder.NewDefaultJaegerReceiverCfg().InitFromViper(v)
4040
if err != nil {
4141
return nil, err
4242
}
4343

4444
ctx := context.Background()
45-
jtr, err := jaegerreceiver.New(ctx, &jaegerreceiver.Configuration{
45+
config := &jaegerreceiver.Configuration{
4646
CollectorThriftPort: rOpts.ThriftTChannelPort,
4747
CollectorHTTPPort: rOpts.ThriftHTTPPort,
48-
})
48+
}
49+
jtr, err := jaegerreceiver.New(ctx, config, traceConsumer)
4950
if err != nil {
5051
return nil, err
5152
}
5253

53-
if err := jtr.StartTraceReception(ctx, traceConsumer); err != nil {
54+
if err := jtr.StartTraceReception(ctx, asyncErrorChan); err != nil {
5455
return nil, err
5556
}
5657

internal/collector/opencensus/receiver.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
)
3232

3333
// Start starts the OpenCensus receiver endpoint.
34-
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error) {
34+
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, asyncErrorChan chan<- error) (receiver.TraceReceiver, error) {
3535
rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
3636
if err != nil {
3737
return nil, err
@@ -43,12 +43,12 @@ func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsu
4343
}
4444

4545
addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10)
46-
ocr, err := opencensusreceiver.New(addr, tlsCredsOption)
46+
ocr, err := opencensusreceiver.New(addr, traceConsumer, nil, tlsCredsOption)
4747
if err != nil {
4848
return nil, fmt.Errorf("Failed to create the OpenCensus trace receiver: %v", err)
4949
}
5050

51-
if err := ocr.StartTraceReception(context.Background(), traceConsumer); err != nil {
51+
if err := ocr.StartTraceReception(context.Background(), asyncErrorChan); err != nil {
5252
return nil, fmt.Errorf("Cannot bind Opencensus receiver to address %q: %v", addr, err)
5353
}
5454

internal/collector/zipkin/receiver.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,19 @@ import (
3131
)
3232

3333
// Start starts the Zipkin receiver endpoint.
34-
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error) {
34+
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, asyncErrorChan chan<- error) (receiver.TraceReceiver, error) {
3535
rOpts, err := builder.NewDefaultZipkinReceiverCfg().InitFromViper(v)
3636
if err != nil {
3737
return nil, err
3838
}
3939

4040
addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10)
41-
zi, err := zipkinreceiver.New(addr)
41+
zi, err := zipkinreceiver.New(addr, traceConsumer)
4242
if err != nil {
4343
return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err)
4444
}
4545

46-
if err := zi.StartTraceReception(context.Background(), traceConsumer); err != nil {
46+
if err := zi.StartTraceReception(context.Background(), asyncErrorChan); err != nil {
4747
return nil, fmt.Errorf("Cannot start Zipkin receiver to address %q: %v", addr, err)
4848
}
4949

internal/collector/zipkin/scribe/receiver.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,18 @@ import (
3030
)
3131

3232
// Start starts the Zipkin Scribe receiver endpoint.
33-
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error) {
33+
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, asyncErrorChan chan<- error) (receiver.TraceReceiver, error) {
3434
rOpts, err := builder.NewDefaultZipkinScribeReceiverCfg().InitFromViper(v)
3535
if err != nil {
3636
return nil, err
3737
}
3838

39-
sr, err := zipkinscribereceiver.NewReceiver(rOpts.Address, rOpts.Port, rOpts.Category)
39+
sr, err := zipkinscribereceiver.NewReceiver(rOpts.Address, rOpts.Port, rOpts.Category, traceConsumer)
4040
if err != nil {
4141
return nil, fmt.Errorf("Failed to create the Zipkin Scribe receiver: %v", err)
4242
}
4343

44-
if err := sr.StartTraceReception(context.Background(), traceConsumer); err != nil {
44+
if err := sr.StartTraceReception(context.Background(), asyncErrorChan); err != nil {
4545
return nil, fmt.Errorf("Cannot start Zipkin Scribe receiver %+v: %v", rOpts, err)
4646
}
4747

receiver/end_to_end_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@ func Example_endToEnd() {
3333
// This is what the cmd/ocagent code would look like this.
3434
// A trace receiver as per the trace receiver
3535
// configs that have been parsed.
36-
tr, err := opencensusreceiver.New("localhost:55678")
36+
lte, err := loggingexporter.NewTraceExporter(zap.NewNop())
37+
if err != nil {
38+
log.Fatalf("Failed to create logging exporter: %v", err)
39+
}
40+
41+
tr, err := opencensusreceiver.New("localhost:55678", lte, nil)
3742
if err != nil {
3843
log.Fatalf("Failed to create trace receiver: %v", err)
3944
}
@@ -43,9 +48,8 @@ func Example_endToEnd() {
4348

4449
// Once we have the span receiver which will connect to the
4550
// various exporter pipeline i.e. *tracepb.Span->OpenCensus.SpanData
46-
lsr, _ := loggingexporter.NewTraceExporter(zap.NewNop())
4751
for _, tr := range trl {
48-
if err := tr.StartTraceReception(context.Background(), lsr); err != nil {
52+
if err := tr.StartTraceReception(context.Background(), nil); err != nil {
4953
log.Fatalf("Failed to start trace receiver: %v", err)
5054
}
5155
}

receiver/factorytemplate/factorytemplate_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ func (mr *mockReceiver) TraceSource() string {
531531
return "mockReceiver"
532532
}
533533

534-
func (mr *mockReceiver) StartTraceReception(ctx context.Context, nextConsumer consumer.TraceConsumer) error {
534+
func (mr *mockReceiver) StartTraceReception(ctx context.Context, asyncErrChan chan<- error) error {
535535
if mr == nil {
536536
panic("mockReceiver is nil")
537537
}
@@ -552,7 +552,7 @@ func (mr *mockReceiver) MetricsSource() string {
552552
return "mockReceiver"
553553
}
554554

555-
func (mr *mockReceiver) StartMetricsReception(ctx context.Context, nextConsumer consumer.MetricsConsumer) error {
555+
func (mr *mockReceiver) StartMetricsReception(ctx context.Context, asyncErrChan chan<- error) error {
556556
if mr == nil {
557557
panic("mockReceiver is nil")
558558
}

0 commit comments

Comments
 (0)