Skip to content
This repository was archived by the owner on Nov 7, 2022. It is now read-only.

Commit 65c89d8

Browse files
author
Paulo Janotti
authored
Add grpc server configs to OC receiver (#560)
* Add grpc server configs to OC receiver This will be only applied to the collector since it is assumed that unlike the agent it is typically located behind some loadbalancer or proxy where these settings are more useful. * Updates to README and k8s file
1 parent cd2c92d commit 65c89d8

File tree

9 files changed

+294
-18
lines changed

9 files changed

+294
-18
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,10 @@ The collector also serves as a control plane for agents/clients by supplying
276276
them updated configuration (e.g. trace sampling policies), and reporting
277277
agent/client health information/inventory metadata to downstream exporters.
278278

279+
### <a name="receivers-configuration"></a> Receivers Configuration
280+
281+
For detailed information about configuring receivers for the collector refer to the [receivers README.md](receiver/README.md).
282+
279283
### <a name="global-attributes"></a> Global Attributes
280284

281285
The collector also takes some global configurations that modify its behavior for all receivers / exporters.
@@ -398,7 +402,7 @@ Sample configuration file:
398402
log-level: DEBUG
399403
400404
receivers:
401-
opencensus: {} # Runs OpenCensus receiver with default configuration (default behavior)
405+
opencensus: {} # Runs OpenCensus receiver with default configuration (default behavior).
402406
403407
queued-exporters:
404408
jaeger-sender-test: # A friendly name for the exporter

cmd/occollector/app/builder/builder.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"flag"
1919
"fmt"
2020
"strings"
21+
"time"
2122

2223
"github.com/census-instrumentation/opencensus-service/internal/config"
2324
"github.com/spf13/viper"
@@ -106,6 +107,37 @@ type OpenCensusReceiverCfg struct {
106107

107108
// TLSCredentials is a (cert_file, key_file) configuration.
108109
TLSCredentials *config.TLSCredentials `mapstructure:"tls_credentials"`
110+
111+
// Keepalive anchor for all the settings related to keepalive.
112+
Keepalive *serverParametersAndEnforcementPolicy `mapstructure:"keepalive,omitempty"`
113+
114+
// MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server.
115+
MaxRecvMsgSizeMiB uint64 `mapstructure:"max-recv-msg-size-mib"`
116+
117+
// MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport.
118+
MaxConcurrentStreams uint32 `mapstructure:"max-concurrent-streams"`
119+
}
120+
121+
type serverParametersAndEnforcementPolicy struct {
122+
ServerParameters *keepaliveServerParameters `mapstructure:"server-parameters,omitempty"`
123+
EnforcementPolicy *keepaliveEnforcementPolicy `mapstructure:"enforcement-policy,omitempty"`
124+
}
125+
126+
// keepaliveServerParameters allow configuration of the keepalive.ServerParameters.
127+
// See https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters for details.
128+
type keepaliveServerParameters struct {
129+
MaxConnectionIdle time.Duration `mapstructure:"max-connection-idle,omitempty"`
130+
MaxConnectionAge time.Duration `mapstructure:"max-connection-age,omitempty"`
131+
MaxConnectionAgeGrace time.Duration `mapstructure:"max-connection-age-grace,omitempty"`
132+
Time time.Duration `mapstructure:"time,omitempty"`
133+
Timeout time.Duration `mapstructure:"timeout,omitempty"`
134+
}
135+
136+
// keepaliveEnforcementPolicy allow configuration of the keepalive.EnforcementPolicy.
137+
// See https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy for details.
138+
type keepaliveEnforcementPolicy struct {
139+
MinTime time.Duration `mapstructure:"min-time,omitempty"`
140+
PermitWithoutStream bool `mapstructure:"permit-without-stream,omitempty"`
109141
}
110142

111143
// OpenCensusReceiverEnabled checks if the OpenCensus receiver is enabled, via a command-line flag, environment

cmd/occollector/app/builder/builder_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,36 @@ func TestTailSamplingConfig(t *testing.T) {
212212
}
213213
}
214214

215+
func TestOpencensusReceiverKeepaliveSettings(t *testing.T) {
216+
v, err := loadViperFromFile("./testdata/oc_keepalive_config.yaml")
217+
if err != nil {
218+
t.Fatalf("Failed to load viper from test file: %v", err)
219+
}
220+
221+
wCfg := NewDefaultOpenCensusReceiverCfg()
222+
wCfg.Keepalive = &serverParametersAndEnforcementPolicy{
223+
ServerParameters: &keepaliveServerParameters{
224+
Time: 30 * time.Second,
225+
Timeout: 5 * time.Second,
226+
},
227+
EnforcementPolicy: &keepaliveEnforcementPolicy{
228+
MinTime: 10 * time.Second,
229+
PermitWithoutStream: true,
230+
},
231+
}
232+
233+
gCfg, err := NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
234+
if err != nil {
235+
t.Fatalf("got '%v', want nil", err)
236+
}
237+
if !reflect.DeepEqual(*gCfg.Keepalive.ServerParameters, *wCfg.Keepalive.ServerParameters) {
238+
t.Fatalf("Wanted ServerParameters %+v but got %+v", *wCfg.Keepalive.ServerParameters, *gCfg.Keepalive.ServerParameters)
239+
}
240+
if !reflect.DeepEqual(*gCfg.Keepalive.EnforcementPolicy, *wCfg.Keepalive.EnforcementPolicy) {
241+
t.Fatalf("Wanted EnforcementPolicy %+v but got %+v", *wCfg.Keepalive.EnforcementPolicy, *gCfg.Keepalive.EnforcementPolicy)
242+
}
243+
}
244+
215245
func loadViperFromFile(file string) (*viper.Viper, error) {
216246
v := viper.New()
217247
v.SetConfigFile(file)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
receivers:
2+
opencensus:
3+
keepalive:
4+
server-parameters:
5+
time: 30s
6+
timeout: 5s
7+
enforcement-policy:
8+
min-time: 10s
9+
permit-without-stream: true

example/k8s.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@ metadata:
7070
data:
7171
oc-collector-config: |
7272
receivers:
73-
opencensus: {}
73+
opencensus:
74+
# keepalive settings can help load balancing, see receiver/README.md for more info.
75+
keepalive:
76+
server-parameters:
77+
max-connection-age: 120s
78+
max-connection-age-grace: 30s
7479
jaeger: {}
7580
zipkin: {}
7681
# Can only use one exporter

internal/collector/opencensus/.nocover

Lines changed: 0 additions & 1 deletion
This file was deleted.

internal/collector/opencensus/receiver.go

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323

2424
"github.com/spf13/viper"
2525
"go.uber.org/zap"
26+
"google.golang.org/grpc"
27+
"google.golang.org/grpc/keepalive"
2628

2729
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
2830
"github.com/census-instrumentation/opencensus-service/consumer"
@@ -32,18 +34,12 @@ import (
3234

3335
// Start starts the OpenCensus receiver endpoint.
3436
func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsumer, asyncErrorChan chan<- error) (receiver.TraceReceiver, error) {
35-
rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
37+
addr, opts, zapFields, err := receiverOptions(v)
3638
if err != nil {
3739
return nil, err
3840
}
3941

40-
tlsCredsOption, hasTLSCreds, err := rOpts.TLSCredentials.ToOpenCensusReceiverServerOption()
41-
if err != nil {
42-
return nil, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err)
43-
}
44-
45-
addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10)
46-
ocr, err := opencensusreceiver.New(addr, traceConsumer, nil, tlsCredsOption)
42+
ocr, err := opencensusreceiver.New(addr, traceConsumer, nil, opts...)
4743
if err != nil {
4844
return nil, fmt.Errorf("Failed to create the OpenCensus trace receiver: %v", err)
4945
}
@@ -52,15 +48,69 @@ func Start(logger *zap.Logger, v *viper.Viper, traceConsumer consumer.TraceConsu
5248
return nil, fmt.Errorf("Cannot bind Opencensus receiver to address %q: %v", addr, err)
5349
}
5450

51+
logger.Info("OpenCensus receiver is running.", zapFields...)
52+
53+
return ocr, nil
54+
}
55+
56+
func receiverOptions(v *viper.Viper) (addr string, opts []opencensusreceiver.Option, zapFields []zap.Field, err error) {
57+
rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v)
58+
if err != nil {
59+
return addr, opts, zapFields, err
60+
}
61+
62+
tlsCredsOption, hasTLSCreds, err := rOpts.TLSCredentials.ToOpenCensusReceiverServerOption()
63+
if err != nil {
64+
return addr, opts, zapFields, fmt.Errorf("OpenCensus receiver TLS Credentials: %v", err)
65+
}
5566
if hasTLSCreds {
67+
opts = append(opts, tlsCredsOption)
5668
tlsCreds := rOpts.TLSCredentials
57-
logger.Info("OpenCensus receiver is running.",
58-
zap.Int("port", rOpts.Port),
59-
zap.String("cert_file", tlsCreds.CertFile),
60-
zap.String("key_file", tlsCreds.KeyFile))
61-
} else {
62-
logger.Info("OpenCensus receiver is running.", zap.Int("port", rOpts.Port))
69+
zapFields = append(zapFields, zap.String("cert_file", tlsCreds.CertFile), zap.String("key_file", tlsCreds.KeyFile))
6370
}
6471

65-
return ocr, nil
72+
grpcServerOptions, zapFields := grpcServerOptions(rOpts, zapFields)
73+
if len(grpcServerOptions) > 0 {
74+
opts = append(opts, opencensusreceiver.WithGRPCServerOptions(grpcServerOptions...))
75+
}
76+
77+
addr = ":" + strconv.FormatInt(int64(rOpts.Port), 10)
78+
zapFields = append(zapFields, zap.Int("port", rOpts.Port))
79+
80+
return addr, opts, zapFields, err
81+
}
82+
83+
func grpcServerOptions(rOpts *builder.OpenCensusReceiverCfg, zapFields []zap.Field) ([]grpc.ServerOption, []zap.Field) {
84+
var grpcServerOptions []grpc.ServerOption
85+
if rOpts.MaxRecvMsgSizeMiB > 0 {
86+
grpcServerOptions = append(grpcServerOptions, grpc.MaxRecvMsgSize(int(rOpts.MaxRecvMsgSizeMiB*1024*1024)))
87+
zapFields = append(zapFields, zap.Uint64("max-recv-msg-size-mib", rOpts.MaxRecvMsgSizeMiB))
88+
}
89+
if rOpts.MaxConcurrentStreams > 0 {
90+
grpcServerOptions = append(grpcServerOptions, grpc.MaxConcurrentStreams(rOpts.MaxConcurrentStreams))
91+
zapFields = append(zapFields, zap.Uint32("max-concurrent-streams", rOpts.MaxConcurrentStreams))
92+
}
93+
if rOpts.Keepalive != nil {
94+
if rOpts.Keepalive.ServerParameters != nil {
95+
svrParams := rOpts.Keepalive.ServerParameters
96+
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
97+
MaxConnectionIdle: svrParams.MaxConnectionIdle,
98+
MaxConnectionAge: svrParams.MaxConnectionAge,
99+
MaxConnectionAgeGrace: svrParams.MaxConnectionAgeGrace,
100+
Time: svrParams.Time,
101+
Timeout: svrParams.Timeout,
102+
}))
103+
zapFields = append(zapFields, zap.Any("keepalive.server-parameters", rOpts.Keepalive.ServerParameters))
104+
}
105+
if rOpts.Keepalive.EnforcementPolicy != nil {
106+
enfPol := rOpts.Keepalive.EnforcementPolicy
107+
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
108+
MinTime: enfPol.MinTime,
109+
PermitWithoutStream: enfPol.PermitWithoutStream,
110+
}))
111+
zapFields = append(zapFields, zap.Any("keepalive.enforcement-policy", rOpts.Keepalive.EnforcementPolicy))
112+
}
113+
}
114+
115+
return grpcServerOptions, zapFields
66116
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2019, OpenCensus Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package ocreceiver wraps the functionality to start the end-point that
16+
// receives data directly in the OpenCensus format.
17+
package ocreceiver
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"github.com/spf13/viper"
24+
"go.uber.org/zap"
25+
26+
"github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder"
27+
"github.com/census-instrumentation/opencensus-service/processor/processortest"
28+
"github.com/census-instrumentation/opencensus-service/receiver/opencensusreceiver"
29+
)
30+
31+
func TestStart(t *testing.T) {
32+
tests := []struct {
33+
name string
34+
viperFn func() *viper.Viper
35+
wantErr bool
36+
}{
37+
{
38+
name: "default_config",
39+
viperFn: func() *viper.Viper {
40+
v := viper.New()
41+
v.Set("receivers.opencensus.{}", nil)
42+
return v
43+
},
44+
},
45+
{
46+
name: "invalid_port",
47+
viperFn: func() *viper.Viper {
48+
v := viper.New()
49+
v.Set("receivers.opencensus.port", -1)
50+
return v
51+
},
52+
wantErr: true,
53+
},
54+
{
55+
name: "missing_tls_files",
56+
viperFn: func() *viper.Viper {
57+
v := viper.New()
58+
v.Set("receivers.opencensus.tls_credentials.cert_file", "foo")
59+
return v
60+
},
61+
wantErr: true,
62+
},
63+
{
64+
name: "grpc_settings",
65+
viperFn: func() *viper.Viper {
66+
v := viper.New()
67+
v.Set("receivers.opencensus.port", 55678)
68+
v.Set("receivers.opencensus.max-recv-msg-size-mib", 32)
69+
v.Set("receivers.opencensus.max-concurrent-streams", 64)
70+
v.Set("receivers.opencensus.keepalive.server-parameters.max-connection-age", 180*time.Second)
71+
v.Set("receivers.opencensus.keepalive.server-parameters.max-connection-age-grace", 10*time.Second)
72+
v.Set("receivers.opencensus.keepalive.enforcement-policy.min-time", 60*time.Second)
73+
v.Set("receivers.opencensus.keepalive.enforcement-policy.permit-without-stream", true)
74+
return v
75+
},
76+
},
77+
}
78+
for _, tt := range tests {
79+
t.Run(tt.name, func(t *testing.T) {
80+
// Enforce that all configurations are actually recognized.
81+
v := tt.viperFn()
82+
rOpts := builder.OpenCensusReceiverCfg{}
83+
if err := v.Sub("receivers.opencensus").UnmarshalExact(&rOpts); err != nil {
84+
t.Errorf("UnmarshalExact error: %v", err)
85+
return
86+
}
87+
nopProcessor := processortest.NewNopTraceProcessor(nil)
88+
asyncErrChan := make(chan error, 1)
89+
got, err := Start(zap.NewNop(), v, nopProcessor, asyncErrChan)
90+
if (err != nil) != tt.wantErr {
91+
t.Errorf("Start() error = %v, wantErr %v", err, tt.wantErr)
92+
return
93+
}
94+
if got != nil {
95+
// TODO: (@pjanotti) current StopTraceReception, stop the whole receiver.
96+
// See https://github.com/census-instrumentation/opencensus-service/issues/559
97+
got.(*opencensusreceiver.Receiver).Stop()
98+
}
99+
})
100+
}
101+
}

receiver/README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,52 @@ using `--receive-oc-trace=false`. On the Collector only the port can be configur
4949
receivers:
5050
opencensus:
5151
port: 55678
52+
53+
# Settings below are only available on collector.
54+
55+
# Changes the maximum msg size that can be received (default is 4MiB).
56+
# See https://godoc.org/google.golang.org/grpc#MaxRecvMsgSize for more information.
57+
max-recv-msg-size-mib: 32
58+
59+
# Limits the maximum number of concurrent streams for each receiver transport (default is 100).
60+
# See https://godoc.org/google.golang.org/grpc#MaxConcurrentStreams for more information.
61+
max-concurrent-streams: 20
62+
63+
# Controls the keepalive settings, typically used to help scenarios in which the senders have
64+
# load-balancers or proxies between them and the collectors.
65+
keepalive:
66+
67+
# This section controls the https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters.
68+
# These are typically used to help load balancers by periodically terminating connections, or keeping
69+
# connections alive (preventing RSTs by proxies) when needed for bursts of data following periods of
70+
# inactivity.
71+
server-parameters:
72+
# max-connection-idle is the amount of time after which an idle connection would be closed,
73+
# the default is infinity.
74+
max-connection-idle: 90s
75+
# max-connection-age is the maximum amount of time a connection may exist before it is closed,
76+
# the default is infinity.
77+
max-connection-age: 180s
78+
# max-connection-age-grace is an additive period after max-connection-age for which the connection
79+
# will be forcibly closed. The default is infinity.
80+
max-connection-age-grace: 10s
81+
# time is a duration for which, if the server doesn't see any activity it pings the client to see
82+
# if the transport is still alive. The default is 2 hours.
83+
time: 30s
84+
# timeout is the wait time after a ping that the server waits for the response before closing the
85+
# connection. The default is 20 seconds.
86+
timeout: 5s
87+
88+
# This section controls the https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy.
89+
# It is used to set keepalive enforcement policy on the server-side. Server will close connection
90+
# with a client that violates this policy.
91+
enforcement-policy:
92+
# min-time is the minimum amount of time a client should wait before sending a keepalive ping.
93+
# The default value is 5 minutes.
94+
min-time: 10s
95+
# permit-without-stream if true, server allows keepalive pings even when there are no active
96+
# streams(RPCs). The default is false.
97+
permit-without-stream: true
5298
```
5399

54100
## Jaeger

0 commit comments

Comments
 (0)