@@ -24,35 +24,64 @@ import (
2424 commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
2525 agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2626 resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
27- tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
2827 "github.com/census-instrumentation/opencensus-service/data"
2928 "github.com/census-instrumentation/opencensus-service/internal"
3029 "github.com/census-instrumentation/opencensus-service/receiver"
3130)
3231
32+ const (
33+ defaultNumWorkers = 4
34+
35+ messageChannelSize = 64
36+ )
37+
3338// Receiver is the type used to handle spans from OpenCensus exporters.
3439type Receiver struct {
35- spanSink receiver.TraceReceiverSink
40+ spanSink receiver.TraceReceiverSink
41+ numWorkers int
42+ workers []* receiverWorker
43+ messageChan chan * traceDataWithCtx
44+ }
45+
46+ type traceDataWithCtx struct {
47+ data * data.TraceData
48+ ctx context.Context
3649}
3750
3851// New creates a new opencensus.Receiver reference.
3952func New (sr receiver.TraceReceiverSink , opts ... Option ) (* Receiver , error ) {
4053 if sr == nil {
4154 return nil , errors .New ("needs a non-nil receiver.TraceReceiverSink" )
4255 }
43- oci := & Receiver {spanSink : sr }
56+
57+ messageChan := make (chan * traceDataWithCtx , messageChannelSize )
58+ ocr := & Receiver {
59+ spanSink : sr ,
60+ numWorkers : defaultNumWorkers ,
61+ messageChan : messageChan ,
62+ }
4463 for _ , opt := range opts {
45- opt .WithReceiver (oci )
64+ opt (ocr )
65+ }
66+
67+ // Setup and startup worker pool
68+ workers := make ([]* receiverWorker , 0 , ocr .numWorkers )
69+ for index := 0 ; index < ocr .numWorkers ; index ++ {
70+ worker := newReceiverWorker (ocr )
71+ go worker .listenOn (messageChan )
72+ workers = append (workers , worker )
4673 }
47- return oci , nil
74+ ocr .workers = workers
75+
76+ return ocr , nil
4877}
4978
5079var _ agenttracepb.TraceServiceServer = (* Receiver )(nil )
5180
5281var errUnimplemented = errors .New ("unimplemented" )
5382
5483// Config handles configuration messages.
55- func (oci * Receiver ) Config (tcs agenttracepb.TraceService_ConfigServer ) error {
84+ func (ocr * Receiver ) Config (tcs agenttracepb.TraceService_ConfigServer ) error {
5685 // TODO: Implement when we define the config receiver/sender.
5786 return errUnimplemented
5887}
@@ -63,9 +92,10 @@ const receiverName = "opencensus_trace"
6392
6493// Export is the gRPC method that receives streamed traces from
6594// OpenCensus-traceproto compatible libraries/applications.
66- func (oci * Receiver ) Export (tes agenttracepb.TraceService_ExportServer ) error {
95+ func (ocr * Receiver ) Export (tes agenttracepb.TraceService_ExportServer ) error {
6796 // We need to ensure that it propagates the receiver name as a tag
6897 ctxWithReceiverName := internal .ContextWithReceiverName (tes .Context (), receiverName )
98+ spansMetricsFn := internal .NewReceivedSpansRecorderStreaming (tes .Context (), receiverName )
6999
70100 // The first message MUST have a non-nil Node.
71101 recv , err := tes .Recv ()
@@ -93,7 +123,15 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
93123 resource = recv .Resource
94124 }
95125
96- go oci .export (ctxWithReceiverName , tes , lastNonNilNode , resource , recv .Spans )
126+ td := & data.TraceData {
127+ Node : lastNonNilNode ,
128+ Resource : resource ,
129+ Spans : recv .Spans ,
130+ }
131+
132+ ocr .messageChan <- & traceDataWithCtx {data : td , ctx : ctxWithReceiverName }
133+
134+ spansMetricsFn (td .Node , td .Spans )
97135
98136 recv , err = tes .Recv ()
99137 if err != nil {
@@ -107,17 +145,45 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
107145 }
108146}
109147
110- func (oci * Receiver ) export (
111- longLivedCtx context.Context ,
112- tes agenttracepb.TraceService_ExportServer ,
113- node * commonpb.Node ,
114- resource * resourcepb.Resource ,
115- spans []* tracepb.Span ,
116- ) {
117- tracedata := data.TraceData {Node : node , Resource : resource , Spans : spans }
118- // We MUST unconditionally record metrics from this reception.
119- spansMetricsFn := internal .NewReceivedSpansRecorderStreaming (tes .Context (), receiverName )
120- spansMetricsFn (tracedata .Node , tracedata .Spans )
148+ // Stop the receiver and its workers
149+ func (ocr * Receiver ) Stop () {
150+ for _ , worker := range ocr .workers {
151+ worker .stopListening ()
152+ }
153+ }
154+
155+ type receiverWorker struct {
156+ receiver * Receiver
157+ tes agenttracepb.TraceService_ExportServer
158+ cancel chan struct {}
159+ }
160+
161+ func newReceiverWorker (receiver * Receiver ) * receiverWorker {
162+ return & receiverWorker {
163+ receiver : receiver ,
164+ cancel : make (chan struct {}),
165+ }
166+ }
167+
168+ func (rw * receiverWorker ) listenOn (cn <- chan * traceDataWithCtx ) {
169+ for {
170+ select {
171+ case tdWithCtx := <- cn :
172+ rw .export (tdWithCtx .ctx , tdWithCtx .data )
173+ case <- rw .cancel :
174+ return
175+ }
176+ }
177+ }
178+
179+ func (rw * receiverWorker ) stopListening () {
180+ close (rw .cancel )
181+ }
182+
183+ func (rw * receiverWorker ) export (longLivedCtx context.Context , tracedata * data.TraceData ) {
184+ if tracedata == nil {
185+ return
186+ }
121187
122188 if len (tracedata .Spans ) == 0 {
123189 return
@@ -134,7 +200,7 @@ func (oci *Receiver) export(
134200 // If the starting RPC has a parent span, then add it as a parent link.
135201 internal .SetParentLink (longLivedCtx , span )
136202
137- oci . spanSink .ReceiveTraceData (ctx , tracedata )
203+ rw . receiver . spanSink .ReceiveTraceData (ctx , * tracedata )
138204
139205 span .Annotate ([]trace.Attribute {
140206 trace .Int64Attribute ("num_spans" , int64 (len (tracedata .Spans ))),
0 commit comments