Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 2603c50

Browse files
author
Steven Karis
authored
Improve oc-receiver and oc-exporter span throughput (#395)
* Improve oc-receiver and oc-exporter span throughput OC collector reception and export of spans did not scale up. This change removes the `bundler` from the oc span receiver in favor of go-routines, and additionally adds a worker count to the oc-exporter. * fix lint * PR comments * PR comments * Finish removal of unused options * Update cached test
1 parent 0dbf0b3 commit 2603c50

6 files changed

Lines changed: 86 additions & 114 deletions

File tree

exporter/exporterparser/opencensus.go

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package exporterparser
1717
import (
1818
"context"
1919
"fmt"
20+
"sync/atomic"
2021

2122
"contrib.go.opencensus.io/exporter/ocagent"
2223
"github.com/spf13/viper"
@@ -34,12 +35,18 @@ type opencensusConfig struct {
3435
Compression string `mapstructure:"compression,omitempty"`
3536
Headers map[string]string `mapstructure:"headers,omitempty"`
3637
// TODO: add insecure, service name options.
38+
NumWorkers int `mapstructure:"num-workers,omitempty"`
3739
}
3840

3941
type ocagentExporter struct {
40-
exporter *ocagent.Exporter
42+
counter uint32
43+
exporters []*ocagent.Exporter
4144
}
4245

46+
const (
47+
defaultNumWorkers int = 2
48+
)
49+
4350
var _ exporter.TraceExporter = (*ocagentExporter)(nil)
4451

4552
// OpenCensusTraceExportersFromViper unmarshals the viper and returns an exporter.TraceExporter targeting
@@ -57,43 +64,55 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tes []exporter.TraceExpo
5764
}
5865

5966
if ocac.Endpoint == "" {
60-
return nil, nil, nil, fmt.Errorf("OpenCensus config requires an Endpoint")
67+
return nil, nil, nil, fmt.Errorf("openCensus config requires an Endpoint")
6168
}
6269

6370
opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint), ocagent.WithInsecure()}
6471
if ocac.Compression != "" {
6572
if compressionKey := grpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != compression.Unsupported {
6673
opts = append(opts, ocagent.UseCompressor(compressionKey))
6774
} else {
68-
return nil, nil, nil, fmt.Errorf("Unsupported compression type: %s", ocac.Compression)
75+
return nil, nil, nil, fmt.Errorf("unsupported compression type: %s", ocac.Compression)
6976
}
7077
}
7178
if len(ocac.Headers) > 0 {
7279
opts = append(opts, ocagent.WithHeaders(ocac.Headers))
7380
}
7481

75-
sde, serr := ocagent.NewExporter(opts...)
76-
if serr != nil {
77-
return nil, nil, nil, fmt.Errorf("Cannot configure OpenCensus Trace exporter: %v", serr)
82+
numWorkers := defaultNumWorkers
83+
if ocac.NumWorkers > 0 {
84+
numWorkers = ocac.NumWorkers
85+
}
86+
87+
exporters := make([]*ocagent.Exporter, 0, numWorkers)
88+
for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ {
89+
exporter, serr := ocagent.NewExporter(opts...)
90+
if serr != nil {
91+
return nil, nil, nil, fmt.Errorf("cannot configure OpenCensus Trace exporter: %v", serr)
92+
}
93+
exporters = append(exporters, exporter)
94+
doneFns = append(doneFns, func() error {
95+
exporter.Flush()
96+
return nil
97+
})
7898
}
7999

80-
oexp := &ocagentExporter{exporter: sde}
100+
oexp := &ocagentExporter{exporters: exporters}
81101
tes = append(tes, oexp)
82102

83103
// TODO: (@odeke-em, @songya23) implement ExportMetrics for OpenCensus.
84104
// mes = append(mes, oexp)
85-
doneFns = append(doneFns, func() error {
86-
sde.Flush()
87-
return nil
88-
})
89105
return tes, mes, doneFns, nil
90106
}
91107

92-
func (sde *ocagentExporter) ExportSpans(ctx context.Context, td data.TraceData) error {
93-
err := sde.exporter.ExportTraceServiceRequest(
108+
func (oce *ocagentExporter) ExportSpans(ctx context.Context, td data.TraceData) error {
109+
// Get an exporter worker round-robin
110+
exporter := oce.exporters[atomic.AddUint32(&oce.counter, 1)%uint32(len(oce.exporters))]
111+
err := exporter.ExportTraceServiceRequest(
94112
&agenttracepb.ExportTraceServiceRequest{
95-
Spans: td.Spans,
96-
Node: td.Node,
113+
Spans: td.Spans,
114+
Resource: td.Resource,
115+
Node: td.Node,
97116
},
98117
)
99118
if err != nil {

receiver/opencensus/octrace/observability_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
3434
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
3535
"github.com/census-instrumentation/opencensus-service/internal"
36-
"github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace"
3736
)
3837

3938
// Ensure that if we add a metrics exporter that our target metrics
@@ -48,7 +47,7 @@ func TestEnsureRecordedMetrics(t *testing.T) {
4847

4948
sappender := newSpanAppender()
5049

51-
_, port, doneFn := ocReceiverOnGRPCServer(t, sappender, octrace.WithSpanBufferPeriod(2*time.Millisecond))
50+
_, port, doneFn := ocReceiverOnGRPCServer(t, sappender)
5251
defer doneFn()
5352

5453
// Now the opencensus-agent exporter.
@@ -119,7 +118,7 @@ func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) {
119118
)
120119
sappender := newSpanAppender()
121120

122-
_, port, doneFn := ocReceiverOnGRPCServer(t, sappender, octrace.WithSpanBufferPeriod(2*time.Millisecond))
121+
_, port, doneFn := ocReceiverOnGRPCServer(t, sappender)
123122
defer doneFn()
124123

125124
// Now the opencensus-agent exporter.
@@ -191,7 +190,7 @@ func TestExportSpanLinkingMaintainsParentLink(t *testing.T) {
191190

192191
spanSink := newSpanAppender()
193192
spansBufferPeriod := 10 * time.Millisecond
194-
_, port, doneFn := ocReceiverOnGRPCServer(t, spanSink, octrace.WithSpanBufferPeriod(spansBufferPeriod))
193+
_, port, doneFn := ocReceiverOnGRPCServer(t, spanSink)
195194
defer doneFn()
196195

197196
traceSvcClient, traceSvcDoneFn, err := makeTraceServiceClient(port)
@@ -201,7 +200,7 @@ func TestExportSpanLinkingMaintainsParentLink(t *testing.T) {
201200
defer traceSvcDoneFn()
202201

203202
n := 5
204-
for i := 0; i <= n; i++ {
203+
for i := 0; i < n; i++ {
205204
sl := []*tracepb.Span{{TraceId: []byte("abcdefghijklmnop"), SpanId: []byte{byte(i + 1), 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}}}
206205
_ = traceSvcClient.Send(&agenttracepb.ExportTraceServiceRequest{Spans: sl, Node: &commonpb.Node{}})
207206
}
@@ -225,7 +224,7 @@ func TestExportSpanLinkingMaintainsParentLink(t *testing.T) {
225224
}
226225

227226
gotSpanData := ocSpansSaver.spanData[:]
228-
if g, w := len(gotSpanData), 2; g != w {
227+
if g, w := len(gotSpanData), n+1; g != w {
229228
blob, _ := json.MarshalIndent(gotSpanData, " ", " ")
230229
t.Fatalf("Spandata count: Got %d Want %d\n\nData: %s", g, w, blob)
231230
}
@@ -235,7 +234,8 @@ func TestExportSpanLinkingMaintainsParentLink(t *testing.T) {
235234
t.Fatalf("Links count: Got %d Want %d\nGotSpanData: %#v", g, w, receiverSpanData)
236235
}
237236

238-
rpcSpanData := gotSpanData[1]
237+
// The rpc span is always last in the list
238+
rpcSpanData := gotSpanData[len(gotSpanData)-1]
239239

240240
// Ensure that the link matches up exactly!
241241
wantLink := trace.Link{

receiver/opencensus/octrace/opencensus.go

Lines changed: 18 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ import (
1818
"context"
1919
"errors"
2020
"io"
21-
"time"
22-
23-
"google.golang.org/api/support/bundler"
2421

2522
"go.opencensus.io/trace"
2623

@@ -35,9 +32,7 @@ import (
3532

3633
// Receiver is the type used to handle spans from OpenCensus exporters.
3734
type Receiver struct {
38-
spanSink receiver.TraceReceiverSink
39-
spanBufferPeriod time.Duration
40-
spanBufferCount int
35+
spanSink receiver.TraceReceiverSink
4136
}
4237

4338
// New creates a new opencensus.Receiver reference.
@@ -69,25 +64,8 @@ const receiverName = "opencensus_trace"
6964
// Export is the gRPC method that receives streamed traces from
7065
// OpenCensus-traceproto compatible libraries/applications.
7166
func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
72-
// The bundler will receive batches of spans i.e. []*tracepb.Span
7367
// We need to ensure that it propagates the receiver name as a tag
7468
ctxWithReceiverName := internal.ContextWithReceiverName(tes.Context(), receiverName)
75-
traceBundler := bundler.NewBundler((*data.TraceData)(nil), func(payload interface{}) {
76-
oci.batchSpanExporting(ctxWithReceiverName, payload)
77-
})
78-
79-
spanBufferPeriod := oci.spanBufferPeriod
80-
if spanBufferPeriod <= 0 {
81-
spanBufferPeriod = 2 * time.Second // Arbitrary value
82-
}
83-
spanBufferCount := oci.spanBufferCount
84-
if spanBufferCount <= 0 {
85-
// TODO: (@odeke-em) provide an option to disable any buffering
86-
spanBufferCount = 50 // Arbitrary value
87-
}
88-
89-
traceBundler.DelayThreshold = spanBufferPeriod
90-
traceBundler.BundleCountThreshold = spanBufferCount
9169

9270
// The first message MUST have a non-nil Node.
9371
recv, err := tes.Recv()
@@ -100,19 +78,6 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
10078
return errTraceExportProtocolViolation
10179
}
10280

103-
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), receiverName)
104-
105-
processReceivedSpans := func(ni *commonpb.Node, resource *resourcepb.Resource, spans []*tracepb.Span) {
106-
// Firstly, we'll add them to the bundler.
107-
if len(spans) > 0 {
108-
bundlerPayload := &data.TraceData{Node: ni, Resource: resource, Spans: spans}
109-
traceBundler.Add(bundlerPayload, len(bundlerPayload.Spans))
110-
}
111-
112-
// We MUST unconditionally record metrics from this reception.
113-
spansMetricsFn(ni, spans)
114-
}
115-
11681
var lastNonNilNode *commonpb.Node
11782
var resource *resourcepb.Resource
11883
// Now that we've got the first message with a Node, we can start to receive streamed up spans.
@@ -128,7 +93,7 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
12893
resource = recv.Resource
12994
}
13095

131-
processReceivedSpans(lastNonNilNode, resource, recv.Spans)
96+
go oci.export(ctxWithReceiverName, tes, lastNonNilNode, resource, recv.Spans)
13297

13398
recv, err = tes.Recv()
13499
if err != nil {
@@ -142,9 +107,19 @@ func (oci *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
142107
}
143108
}
144109

145-
func (oci *Receiver) batchSpanExporting(longLivedRPCCtx context.Context, payload interface{}) {
146-
tracedata := payload.([]*data.TraceData)
147-
if len(tracedata) == 0 {
110+
func (oci *Receiver) export(
111+
longLivedCtx context.Context,
112+
tes agenttracepb.TraceService_ExportServer,
113+
node *commonpb.Node,
114+
resource *resourcepb.Resource,
115+
spans []*tracepb.Span,
116+
) {
117+
tracedata := data.TraceData{Node: node, Resource: resource, Spans: spans}
118+
// We MUST unconditionally record metrics from this reception.
119+
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), receiverName)
120+
spansMetricsFn(tracedata.Node, tracedata.Spans)
121+
122+
if len(tracedata.Spans) == 0 {
148123
return
149124
}
150125

@@ -157,15 +132,11 @@ func (oci *Receiver) batchSpanExporting(longLivedRPCCtx context.Context, payload
157132
// spansAndNode list unfurling then send spans grouped per node
158133

159134
// If the starting RPC has a parent span, then add it as a parent link.
160-
internal.SetParentLink(longLivedRPCCtx, span)
135+
internal.SetParentLink(longLivedCtx, span)
161136

162-
nSpans := int64(0)
163-
for _, td := range tracedata {
164-
oci.spanSink.ReceiveTraceData(ctx, *td)
165-
nSpans += int64(len(td.Spans))
166-
}
137+
oci.spanSink.ReceiveTraceData(ctx, tracedata)
167138

168139
span.Annotate([]trace.Attribute{
169-
trace.Int64Attribute("num_spans", nSpans),
140+
trace.Int64Attribute("num_spans", int64(len(tracedata.Spans))),
170141
}, "")
171142
}

receiver/opencensus/octrace/opencensus_test.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func TestReceiver_endToEnd(t *testing.T) {
4949

5050
sappender := newSpanAppender()
5151

52-
_, port, doneFn := ocReceiverOnGRPCServer(t, sappender, octrace.WithSpanBufferPeriod(100*time.Millisecond))
52+
_, port, doneFn := ocReceiverOnGRPCServer(t, sappender)
5353
defer doneFn()
5454

5555
// Now the opencensus-agent exporter.
@@ -172,7 +172,7 @@ func TestReceiver_endToEnd(t *testing.T) {
172172
func TestExportMultiplexing(t *testing.T) {
173173
spanSink := newSpanAppender()
174174

175-
_, port, doneFn := ocReceiverOnGRPCServer(t, spanSink, octrace.WithSpanBufferPeriod(90*time.Millisecond))
175+
_, port, doneFn := ocReceiverOnGRPCServer(t, spanSink)
176176
defer doneFn()
177177

178178
traceClient, traceClientDoneFn, err := makeTraceServiceClient(port)
@@ -279,10 +279,27 @@ func TestExportMultiplexing(t *testing.T) {
279279
nodeToKey(node2): append(sL2, append(sLn2a, sLn2b...)...),
280280
}
281281

282-
gotBlob, _ := json.Marshal(resultsMapping)
283-
wantBlob, _ := json.Marshal(wantContents)
284-
if !bytes.Equal(gotBlob, wantBlob) {
285-
t.Errorf("Unequal serialization results\nGot:\n\t%s\nWant:\n\t%s\n", gotBlob, wantBlob)
282+
for nodeKey, wantSpans := range wantContents {
283+
gotSpans, ok := resultsMapping[nodeKey]
284+
if !ok {
285+
t.Errorf("Wanted to find a node that was not found for key: %s", nodeKey)
286+
}
287+
if len(gotSpans) != len(wantSpans) {
288+
t.Errorf("Unequal number of spans for nodeKey: %s", nodeKey)
289+
}
290+
for _, wantSpan := range wantSpans {
291+
found := false
292+
for _, gotSpan := range gotSpans {
293+
wantStr, _ := json.Marshal(wantSpan)
294+
gotStr, _ := json.Marshal(gotSpan)
295+
if bytes.Equal(wantStr, gotStr) {
296+
found = true
297+
}
298+
}
299+
if !found {
300+
t.Errorf("Unequal span serialization\nGot:\n\t%s\nWant:\n\t%s\n", gotSpans, wantSpans)
301+
}
302+
}
286303
}
287304
}
288305

@@ -291,7 +308,7 @@ func TestExportMultiplexing(t *testing.T) {
291308
func TestExportProtocolViolations_nodelessFirstMessage(t *testing.T) {
292309
spanSink := newSpanAppender()
293310

294-
_, port, doneFn := ocReceiverOnGRPCServer(t, spanSink, octrace.WithSpanBufferPeriod(90*time.Millisecond))
311+
_, port, doneFn := ocReceiverOnGRPCServer(t, spanSink)
295312
defer doneFn()
296313

297314
traceClient, traceClientDoneFn, err := makeTraceServiceClient(port)
@@ -359,7 +376,7 @@ func TestExportProtocolViolations_nodelessFirstMessage(t *testing.T) {
359376
func TestExportProtocolConformation_spansInFirstMessage(t *testing.T) {
360377
spanSink := newSpanAppender()
361378

362-
_, port, doneFn := ocReceiverOnGRPCServer(t, spanSink, octrace.WithSpanBufferPeriod(70*time.Millisecond))
379+
_, port, doneFn := ocReceiverOnGRPCServer(t, spanSink)
363380
defer doneFn()
364381

365382
traceClient, traceClientDoneFn, err := makeTraceServiceClient(port)

receiver/opencensus/octrace/options.go

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,43 +14,9 @@
1414

1515
package octrace
1616

17-
import "time"
18-
1917
// Option interface defines for configuration settings to be applied to receivers.
2018
//
2119
// WithReceiver applies the configuration to the given receiver.
2220
type Option interface {
2321
WithReceiver(*Receiver)
2422
}
25-
26-
type spanBufferPeriod struct {
27-
period time.Duration
28-
}
29-
30-
var _ Option = (*spanBufferPeriod)(nil)
31-
32-
func (sfd *spanBufferPeriod) WithReceiver(oci *Receiver) {
33-
oci.spanBufferPeriod = sfd.period
34-
}
35-
36-
// WithSpanBufferPeriod is an option that allows one to configure
37-
// the period that spans are buffered for before the Receiver
38-
// sends them to its TraceReceiverSink.
39-
func WithSpanBufferPeriod(period time.Duration) Option {
40-
return &spanBufferPeriod{period: period}
41-
}
42-
43-
type spanBufferCount int
44-
45-
var _ Option = (*spanBufferCount)(nil)
46-
47-
func (spc spanBufferCount) WithReceiver(oci *Receiver) {
48-
oci.spanBufferCount = int(spc)
49-
}
50-
51-
// WithSpanBufferCount is an option that allows one to configure
52-
// the number of spans that are buffered before the Receiver
53-
// send them to its TraceReceiverSink.
54-
func WithSpanBufferCount(count int) Option {
55-
return spanBufferCount(count)
56-
}

0 commit comments

Comments
 (0)