Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit baec3ed

Browse files
author
Bogdan Drutu
authored
Add the source name to the receiver interface. (#435)
1 parent 25ecb99 commit baec3ed

7 files changed

Lines changed: 60 additions & 13 deletions

File tree

cmd/occollector/app/collector/receivers.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,13 @@ import (
3333
func createReceivers(v *viper.Viper, logger *zap.Logger, spanProcessor processor.SpanProcessor) []receiver.TraceReceiver {
3434
var someReceiverEnabled bool
3535
receivers := []struct {
36-
name string
3736
runFn func(*zap.Logger, *viper.Viper, processor.SpanProcessor) (receiver.TraceReceiver, error)
3837
enabled bool
3938
}{
40-
{"Jaeger", jaegerreceiver.Start, builder.JaegerReceiverEnabled(v)},
41-
{"OpenCensus", ocreceiver.Start, builder.OpenCensusReceiverEnabled(v)},
42-
{"Zipkin", zipkinreceiver.Start, builder.ZipkinReceiverEnabled(v)},
43-
{"Zipkin-Scribe", zipkinscribereceiver.Start, builder.ZipkinScribeReceiverEnabled(v)},
39+
{jaegerreceiver.Start, builder.JaegerReceiverEnabled(v)},
40+
{ocreceiver.Start, builder.OpenCensusReceiverEnabled(v)},
41+
{zipkinreceiver.Start, builder.ZipkinReceiverEnabled(v)},
42+
{zipkinscribereceiver.Start, builder.ZipkinScribeReceiverEnabled(v)},
4443
}
4544

4645
var startedTraceReceivers []receiver.TraceReceiver
@@ -52,7 +51,7 @@ func createReceivers(v *viper.Viper, logger *zap.Logger, spanProcessor processor
5251
for _, startedTraceReceiver := range startedTraceReceivers {
5352
startedTraceReceiver.StopTraceReception(context.Background())
5453
}
55-
logger.Fatal("Cannot run receiver for "+receiver.name, zap.Error(err))
54+
logger.Fatal("Cannot run receiver for "+rec.TraceSource(), zap.Error(err))
5655
}
5756
startedTraceReceivers = append(startedTraceReceivers, rec)
5857
someReceiverEnabled = true

receiver/jaegerreceiver/trace_receiver.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ const (
8989
defaultZipkinThriftUDPPort = 5775
9090
defaultCompactThriftUDPPort = 6831
9191
defaultBinaryThriftUDPPort = 6832
92+
93+
traceSource string = "Jaeger"
9294
)
9395

9496
// New creates a TraceReceiver that receives traffic as a collector with both Thrift and HTTP transports.
@@ -160,6 +162,10 @@ func (jr *jReceiver) agentBinaryThriftAddr() string {
160162
return fmt.Sprintf(":%d", port)
161163
}
162164

165+
func (jr *jReceiver) TraceSource() string {
166+
return traceSource
167+
}
168+
163169
func (jr *jReceiver) StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error {
164170
jr.mu.Lock()
165171
defer jr.mu.Unlock()

receiver/opencensusreceiver/opencensus.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ var (
6464
errAlreadyStopped = errors.New("already stopped")
6565
)
6666

67+
const source string = "OpenCensus"
68+
6769
// New just creates the OpenCensus receiver services. It is the caller's
6870
// responsibility to invoke the respective Start*Reception methods as well
6971
// as the various Stop*Reception methods or simply Stop to end it.
@@ -87,6 +89,11 @@ func New(addr string, opts ...Option) (*Receiver, error) {
8789
return ocr, nil
8890
}
8991

92+
// TraceSource returns the name of the trace data source.
93+
func (ocr *Receiver) TraceSource() string {
94+
return source
95+
}
96+
9097
// StartTraceReception exclusively runs the Trace receiver on the gRPC server.
9198
// To start both Trace and Metrics receivers/services, please use Start.
9299
func (ocr *Receiver) StartTraceReception(ctx context.Context, ts processor.TraceDataProcessor) error {
@@ -111,6 +118,11 @@ func (ocr *Receiver) registerTraceDataProcessor(ts processor.TraceDataProcessor)
111118
return err
112119
}
113120

121+
// MetricsSource returns the name of the metrics data source.
122+
func (ocr *Receiver) MetricsSource() string {
123+
return source
124+
}
125+
114126
// StartMetricsReception exclusively runs the Metrics receiver on the gRPC server.
115127
// To start both Trace and Metrics receivers/services, please use Start.
116128
func (ocr *Receiver) StartMetricsReception(ctx context.Context, ms processor.MetricsDataProcessor) error {

receiver/prometheusreceiver/metrics_receiver.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ var (
5959
errNilScrapeConfig = errors.New("expecting a non-nil ScrapeConfig")
6060
)
6161

62+
const metricsSource string = "Prometheus"
63+
64+
// MetricsSource returns the name of the metrics data source.
65+
func (pr *Preceiver) MetricsSource() string {
66+
return metricsSource
67+
}
68+
6269
// StartMetricsReception is the method that starts Prometheus scraping and it
6370
// is controlled by having previously defined a Configuration using perhaps New.
6471
func (pr *Preceiver) StartMetricsReception(ctx context.Context, nextProcessor processor.MetricsDataProcessor) error {

receiver/receiver.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,28 +24,37 @@ import (
2424
// A TraceReceiver is an "arbitrary data"-to-"trace proto span" converter.
2525
// Its purpose is to translate data from the wild into trace proto accompanied
2626
// by a *commonpb.Node to uniquely identify where that data comes from.
27-
// TraceReceiver feeds a TraceReceiverSink with data.
27+
// TraceReceiver feeds a processor.TraceDataProcessor with data.
2828
//
2929
// For example it could be Zipkin data source which translates
3030
// Zipkin spans into *tracepb.Span-s.
31-
//
32-
// StartTraceReception tells the receiver to start its processing.
33-
//
34-
// StopTraceReception tells the receiver that should stop reception,
35-
// giving it a chance to perform any necessary clean-up.
3631
type TraceReceiver interface {
32+
// TraceSource returns the name of the trace data source.
33+
TraceSource() string
34+
35+
// StartTraceReception tells the receiver to start its processing.
3736
StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error
37+
38+
// StopTraceReception tells the receiver that should stop reception,
39+
// giving it a chance to perform any necessary clean-up.
3840
StopTraceReception(ctx context.Context) error
3941
}
4042

4143
// A MetricsReceiver is an "arbitrary data"-to-"metric proto" converter.
4244
// Its purpose is to translate data from the wild into metric proto accompanied
4345
// by a *commonpb.Node to uniquely identify where that data comes from.
44-
// MetricsReceiver feeds a MetricsReceiverSink with data.
46+
// MetricsReceiver feeds a processor.MetricsDataProcessor with data.
4547
//
4648
// For example it could be Prometheus data source which translates
4749
// Prometheus metrics into *metricpb.Metric-s.
4850
type MetricsReceiver interface {
51+
// MetricsSource returns the name of the metrics data source.
52+
MetricsSource() string
53+
54+
// StartMetricsReception tells the receiver to start its processing.
4955
StartMetricsReception(ctx context.Context, nextProcessor processor.MetricsDataProcessor) error
56+
57+
// StopMetricsReception tells the receiver that should stop reception,
58+
// giving it a chance to perform any necessary clean-up.
5059
StopMetricsReception(ctx context.Context) error
5160
}

receiver/zipkinreceiver/scribe/scribe_receiver.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ func NewReceiver(addr string, port uint16, category string) (receiver.TraceRecei
6565
return r, nil
6666
}
6767

68+
const traceSource string = "Zipkin-Scribe"
69+
70+
// TraceSource returns the name of the trace data source.
71+
func (r *scribeReceiver) TraceSource() string {
72+
return traceSource
73+
}
74+
6875
func (r *scribeReceiver) StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error {
6976
r.Lock()
7077
defer r.Unlock()

receiver/zipkinreceiver/trace_receiver.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ var (
8484
errAlreadyStopped = errors.New("already stopped")
8585
)
8686

87+
const traceSource string = "Zipkin"
88+
89+
// TraceSource returns the name of the trace data source.
90+
func (zr *ZipkinReceiver) TraceSource() string {
91+
return traceSource
92+
}
93+
8794
// StartTraceReception spins up the receiver's HTTP server and makes the receiver start its processing.
8895
func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, nextProcessor processor.TraceDataProcessor) error {
8996
zr.mu.Lock()

0 commit comments

Comments
 (0)