Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 85ab293

Browse files
author
Steven Karis
authored
Add batching to queued sender implementations (#361)
Adds batching (as a feature of queuing) to the current queued sender implementations. Testing Done: unit tests
1 parent 667502b commit 85ab293

12 files changed

Lines changed: 1146 additions & 81 deletions

File tree

cmd/occollector/app/builder/processor_builder.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ const (
3232
InvalidSenderType = "invalid"
3333
)
3434

35+
const (
36+
queuedExportersConfigKey = "queued-exporters"
37+
)
38+
3539
// JaegerThriftTChannelSenderCfg holds configuration for Jaeger Thrift TChannel sender
3640
type JaegerThriftTChannelSenderCfg struct {
3741
CollectorHostPorts []string `mapstructure:"collector-host-ports"`
@@ -63,6 +67,30 @@ func NewJaegerThriftHTTPSenderCfg() *JaegerThriftHTTPSenderCfg {
6367
return opts
6468
}
6569

70+
// BatchingConfig contains configuration around the queueing batching.
71+
// It contains some advanced configurations, which should not be used
72+
// by a typical user, but are provided as advanced features to increase
73+
// scalability.
74+
type BatchingConfig struct {
75+
// Enable marks batching as enabled or not
76+
Enable bool `mapstructure:"enable"`
77+
// Timeout sets the time after which a batch will be sent regardless of size
78+
Timeout *time.Duration `mapstructure:"timeout,omitempty"`
79+
// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
80+
SendBatchSize *int `mapstructure:"send-batch-size,omitempty"`
81+
82+
// NumTickers sets the number of tickers to use to divide the work of looping
83+
// over batch buckets. This is an advanced configuration option.
84+
NumTickers int `mapstructure:"num-tickers,omitempty"`
85+
// TickTime sets time interval at which the tickers tick. This is an advanced
86+
// configuration option.
87+
TickTime *time.Duration `mapstructure:"tick-time,omitempty"`
88+
// RemoveAfterTicks is the number of ticks that must pass without a span arriving
89+
// from a node after which the batcher for that node will be deleted. This is an
90+
// advanced configuration option.
91+
RemoveAfterTicks *int `mapstructure:"remove-after-ticks,omitempty"`
92+
}
93+
6694
// QueuedSpanProcessorCfg holds configuration for the queued span processor
6795
type QueuedSpanProcessorCfg struct {
6896
// Name is the friendly name of the processor
@@ -78,6 +106,8 @@ type QueuedSpanProcessorCfg struct {
78106
// SenderType indicates the type of sender to instantiate
79107
SenderType SenderType `mapstructure:"sender-type"`
80108
SenderConfig interface{}
109+
// BatchingConfig sets config parameters related to batching
110+
BatchingConfig BatchingConfig `mapstructure:"batching"`
81111
}
82112

83113
// AttributesCfg holds configuration for attributes that can be added to all spans
@@ -144,10 +174,9 @@ func NewDefaultMultiSpanProcessorCfg() *MultiSpanProcessorCfg {
144174

145175
// InitFromViper initializes MultiSpanProcessorCfg with properties from viper
146176
func (mOpts *MultiSpanProcessorCfg) InitFromViper(v *viper.Viper) *MultiSpanProcessorCfg {
147-
const baseKey = "queued-exporters"
148-
procsv := v.Sub(baseKey)
177+
procsv := v.Sub(queuedExportersConfigKey)
149178
if procsv != nil {
150-
for procName := range v.GetStringMap(baseKey) {
179+
for procName := range v.GetStringMap(queuedExportersConfigKey) {
151180
procv := procsv.Sub(procName)
152181
procOpts := NewDefaultQueuedSpanProcessorCfg()
153182
procOpts.Name = procName

cmd/occollector/app/collector/processors.go

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ import (
2727
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/sender"
2828
"github.com/census-instrumentation/opencensus-service/exporter"
2929
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
30+
"github.com/census-instrumentation/opencensus-service/internal/collector/processor/nodebatcher"
31+
"github.com/census-instrumentation/opencensus-service/internal/collector/processor/queued"
3032
"github.com/census-instrumentation/opencensus-service/internal/config"
3133
)
3234

3335
func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []exporter.TraceExporter, []exporter.MetricsExporter) {
3436
// TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility.
35-
3637
cfg := builder.GetConfigFile(v)
3738
if cfg == "" {
3839
logger.Info("No config file, exporters can be only configured via the file.")
@@ -63,7 +64,9 @@ func createExporters(v *viper.Viper, logger *zap.Logger) ([]func(), []exporter.T
6364
return wrappedDoneFns, traceExporters, metricsExporters
6465
}
6566

66-
func buildQueuedSpanProcessor(logger *zap.Logger, opts *builder.QueuedSpanProcessorCfg) (processor.SpanProcessor, error) {
67+
func buildQueuedSpanProcessor(
68+
logger *zap.Logger, opts *builder.QueuedSpanProcessorCfg,
69+
) (closeFns []func(), queuedSpanProcessor processor.SpanProcessor, err error) {
6770
logger.Info("Constructing queue processor with name", zap.String("name", opts.Name))
6871

6972
// build span batch sender from configured options
@@ -80,7 +83,7 @@ func buildQueuedSpanProcessor(logger *zap.Logger, opts *builder.QueuedSpanProces
8083
tchreporter, err := tchrepbuilder.CreateReporter(metrics.NullFactory, logger)
8184
if err != nil {
8285
logger.Fatal("Cannot create tchannel reporter.", zap.Error(err))
83-
return nil, err
86+
return nil, nil, err
8487
}
8588
spanSender = sender.NewJaegerThriftTChannelSender(tchreporter, logger)
8689
case builder.ThriftHTTPSenderType:
@@ -93,21 +96,53 @@ func buildQueuedSpanProcessor(logger *zap.Logger, opts *builder.QueuedSpanProces
9396
logger,
9497
sender.HTTPTimeout(thriftHTTPSenderOpts.Timeout),
9598
)
96-
default:
97-
logger.Fatal("Unrecognized sender type configured")
99+
}
100+
101+
if spanSender == nil {
102+
logger.Fatal("Unrecognized sender type or no exporters configured", zap.String("SenderType", string(opts.SenderType)))
103+
}
104+
105+
var batchingOptions []nodebatcher.Option
106+
if opts.BatchingConfig.Enable {
107+
cfg := opts.BatchingConfig
108+
if cfg.Timeout != nil {
109+
batchingOptions = append(batchingOptions, nodebatcher.WithTimeout(*cfg.Timeout))
110+
}
111+
if cfg.NumTickers > 0 {
112+
batchingOptions = append(
113+
batchingOptions, nodebatcher.WithNumTickers(cfg.NumTickers),
114+
)
115+
}
116+
if cfg.TickTime != nil {
117+
batchingOptions = append(
118+
batchingOptions, nodebatcher.WithTickTime(*cfg.TickTime),
119+
)
120+
}
121+
if cfg.SendBatchSize != nil {
122+
batchingOptions = append(
123+
batchingOptions, nodebatcher.WithSendBatchSize(*cfg.SendBatchSize),
124+
)
125+
}
126+
if cfg.RemoveAfterTicks != nil {
127+
batchingOptions = append(
128+
batchingOptions, nodebatcher.WithRemoveAfterTicks(*cfg.RemoveAfterTicks),
129+
)
130+
}
98131
}
99132

100133
// build queued span processor with underlying sender
101-
queuedSpanProcessor := processor.NewQueuedSpanProcessor(
134+
queuedSpanProcessor = queued.NewQueuedSpanProcessor(
102135
spanSender,
103-
processor.Options.WithLogger(logger),
104-
processor.Options.WithName(opts.Name),
105-
processor.Options.WithNumWorkers(opts.NumWorkers),
106-
processor.Options.WithQueueSize(opts.QueueSize),
107-
processor.Options.WithRetryOnProcessingFailures(opts.RetryOnFailure),
108-
processor.Options.WithBackoffDelay(opts.BackoffDelay),
136+
queued.Options.WithLogger(logger),
137+
queued.Options.WithName(opts.Name),
138+
queued.Options.WithNumWorkers(opts.NumWorkers),
139+
queued.Options.WithQueueSize(opts.QueueSize),
140+
queued.Options.WithRetryOnProcessingFailures(opts.RetryOnFailure),
141+
queued.Options.WithBackoffDelay(opts.BackoffDelay),
142+
queued.Options.WithBatching(opts.BatchingConfig.Enable),
143+
queued.Options.WithBatchingOptions(batchingOptions...),
109144
)
110-
return queuedSpanProcessor, nil
145+
return nil, queuedSpanProcessor, nil
111146
}
112147

113148
func startProcessor(v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor, []func()) {
@@ -136,12 +171,13 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (processor.SpanProcessor
136171
multiProcessorCfg := builder.NewDefaultMultiSpanProcessorCfg().InitFromViper(v)
137172
for _, queuedJaegerProcessorCfg := range multiProcessorCfg.Processors {
138173
logger.Info("Queued Jaeger Sender Enabled")
139-
queuedJaegerProcessor, err := buildQueuedSpanProcessor(logger, queuedJaegerProcessorCfg)
174+
doneFns, queuedJaegerProcessor, err := buildQueuedSpanProcessor(logger, queuedJaegerProcessorCfg)
140175
if err != nil {
141176
logger.Error("Failed to build the queued span processor", zap.Error(err))
142177
os.Exit(1)
143178
}
144179
spanProcessors = append(spanProcessors, queuedJaegerProcessor)
180+
closeFns = append(closeFns, doneFns...)
145181
}
146182

147183
if len(spanProcessors) == 0 {

cmd/occollector/app/collector/telemetry.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727

2828
"github.com/census-instrumentation/opencensus-service/internal"
2929
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
30+
"github.com/census-instrumentation/opencensus-service/internal/collector/processor/nodebatcher"
31+
"github.com/census-instrumentation/opencensus-service/internal/collector/processor/queued"
3032
"github.com/census-instrumentation/opencensus-service/internal/collector/telemetry"
3133
)
3234

@@ -54,7 +56,8 @@ func initTelemetry(asyncErrorChannel chan<- error, v *viper.Viper, logger *zap.L
5456
port := v.GetInt(metricsPortCfg)
5557

5658
views := processor.MetricViews(level)
57-
views = append(views, processor.QueuedProcessorMetricViews(level)...)
59+
views = append(views, queued.MetricViews(level)...)
60+
views = append(views, nodebatcher.MetricViews(level)...)
5861
views = append(views, internal.AllViews...)
5962
processMetricsViews := telemetry.NewProcessMetricsViews()
6063
views = append(views, processMetricsViews.Views()...)

internal/collector/processor/idbatcher/id_batcher_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@ func BenchmarkConcurrentEnqueue(b *testing.B) {
6868
ticker := time.NewTicker(100 * time.Millisecond)
6969
defer ticker.Stop()
7070
var ticked int32
71+
var received int32
7172
go func() {
7273
for range ticker.C {
73-
batcher.CloseCurrentAndTakeFirstBatch()
74+
batch, _ := batcher.CloseCurrentAndTakeFirstBatch()
7475
atomic.AddInt32(&ticked, 1)
76+
atomic.AddInt32(&received, int32(len(batch)))
7577
}
7678
}()
7779

@@ -82,9 +84,6 @@ func BenchmarkConcurrentEnqueue(b *testing.B) {
8284
batcher.AddToCurrentBatch(ids[0])
8385
}
8486
})
85-
86-
closedBatches := atomic.LoadInt32(&ticked)
87-
b.Logf("Closed %d batches", closedBatches)
8887
}
8988

9089
func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchChannelSize uint64) {
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package nodebatcher
16+
17+
import (
18+
"go.opencensus.io/stats"
19+
"go.opencensus.io/stats/view"
20+
"go.opencensus.io/tag"
21+
22+
"github.com/census-instrumentation/opencensus-service/internal/collector/processor"
23+
"github.com/census-instrumentation/opencensus-service/internal/collector/telemetry"
24+
)
25+
26+
var (
27+
statBatchSize = stats.Int64("batch_size", "Size of batches sent from the batcher (in span)", stats.UnitDimensionless)
28+
statNodesAddedToBatches = stats.Int64("nodes_added_to_batches", "Count of nodes that are being batched.", stats.UnitDimensionless)
29+
statNodesRemovedFromBatches = stats.Int64("nodes_removed_from_batches", "Number of nodes that have been removed from batching.", stats.UnitDimensionless)
30+
31+
statBatchSizeTriggerSend = stats.Int64("batch_size_trigger_send", "Number of times the batch was sent due to a size trigger", stats.UnitDimensionless)
32+
statTimeoutTriggerSend = stats.Int64("timeout_trigger_send", "Number of times the batch was sent due to a timeout trigger", stats.UnitDimensionless)
33+
statBatchOnDeadNode = stats.Int64("removed_node_send", "Number of times the batch was sent due to spans being added for a no longer active node", stats.UnitDimensionless)
34+
)
35+
36+
// MetricViews returns the metrics views related to batching
37+
func MetricViews(level telemetry.Level) []*view.View {
38+
if level == telemetry.None {
39+
return nil
40+
}
41+
42+
tagKeys := processor.MetricTagKeys(level)
43+
if tagKeys == nil {
44+
return nil
45+
}
46+
47+
exporterTagKeys := []tag.Key{processor.TagExporterNameKey}
48+
49+
batchSizeAggregation := view.Distribution(10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 10000, 20000, 30000, 50000, 100000)
50+
51+
batchSizeView := &view.View{
52+
Name: statBatchSize.Name(),
53+
Measure: statBatchSize,
54+
Description: statBatchSize.Description(),
55+
TagKeys: exporterTagKeys,
56+
Aggregation: batchSizeAggregation,
57+
}
58+
59+
nodesAddedToBatchesView := &view.View{
60+
Name: statNodesAddedToBatches.Name(),
61+
Measure: statNodesAddedToBatches,
62+
Description: statNodesAddedToBatches.Description(),
63+
TagKeys: exporterTagKeys,
64+
Aggregation: view.Count(),
65+
}
66+
67+
nodesRemovedFromBatchesView := &view.View{
68+
Name: statNodesRemovedFromBatches.Name(),
69+
Measure: statNodesRemovedFromBatches,
70+
Description: statNodesRemovedFromBatches.Description(),
71+
TagKeys: exporterTagKeys,
72+
Aggregation: view.Count(),
73+
}
74+
75+
countBatchSizeTriggerSendView := &view.View{
76+
Name: statBatchSizeTriggerSend.Name(),
77+
Measure: statBatchSizeTriggerSend,
78+
Description: statBatchSizeTriggerSend.Description(),
79+
TagKeys: tagKeys,
80+
Aggregation: view.Count(),
81+
}
82+
83+
countTimeoutTriggerSendView := &view.View{
84+
Name: statTimeoutTriggerSend.Name(),
85+
Measure: statTimeoutTriggerSend,
86+
Description: statTimeoutTriggerSend.Description(),
87+
TagKeys: tagKeys,
88+
Aggregation: view.Count(),
89+
}
90+
91+
countBatchOnDeadNode := &view.View{
92+
Name: statBatchOnDeadNode.Name(),
93+
Measure: statBatchOnDeadNode,
94+
Description: statBatchOnDeadNode.Description(),
95+
TagKeys: tagKeys,
96+
Aggregation: view.Count(),
97+
}
98+
99+
return []*view.View{
100+
batchSizeView,
101+
nodesAddedToBatchesView,
102+
nodesRemovedFromBatchesView,
103+
countBatchSizeTriggerSendView,
104+
countTimeoutTriggerSendView,
105+
countBatchOnDeadNode,
106+
}
107+
}

0 commit comments

Comments
 (0)