@@ -16,48 +16,75 @@ package opencensusexporter
1616
1717import (
1818 "context"
19- "errors "
19+ "crypto/x509 "
2020 "fmt"
21- "sync/atomic"
21+ "sync"
22+ "time"
2223
2324 "contrib.go.opencensus.io/exporter/ocagent"
2425 agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
2526 "github.com/spf13/viper"
27+ "google.golang.org/grpc"
2628 "google.golang.org/grpc/credentials"
29+ "google.golang.org/grpc/keepalive"
2730
2831 "github.com/census-instrumentation/opencensus-service/consumer"
2932 "github.com/census-instrumentation/opencensus-service/data"
3033 "github.com/census-instrumentation/opencensus-service/exporter/exporterhelper"
34+ "github.com/census-instrumentation/opencensus-service/internal"
3135 "github.com/census-instrumentation/opencensus-service/internal/compression"
32- "github.com/census-instrumentation/opencensus-service/internal/compression/grpc"
36+ compressiongrpc "github.com/census-instrumentation/opencensus-service/internal/compression/grpc"
3337)
3438
35- type opencensusConfig struct {
36- Endpoint string `mapstructure:"endpoint,omitempty"`
37- Compression string `mapstructure:"compression,omitempty"`
38- Headers map [string ]string `mapstructure:"headers,omitempty"`
39- NumWorkers int `mapstructure:"num-workers,omitempty"`
40- CertPemFile string `mapstructure:"cert-pem-file,omitempty"`
39+ // keepaliveConfig exposes the keepalive.ClientParameters to be used by the exporter.
40+ // Refer to the original data-structure for the meaning of each parameter.
41+ type keepaliveConfig struct {
42+ Time time.Duration `mapstructure:"time,omitempty"`
43+ Timeout time.Duration `mapstructure:"timeout,omitempty"`
44+ PermitWithoutStream bool `mapstructure:"permit-without-stream,omitempty"`
45+ }
4146
42- // TODO: add insecure, service name options.
47+ type opencensusConfig struct {
48+ Endpoint string `mapstructure:"endpoint,omitempty"`
49+ Compression string `mapstructure:"compression,omitempty"`
50+ Headers map [string ]string `mapstructure:"headers,omitempty"`
51+ NumWorkers int `mapstructure:"num-workers,omitempty"`
52+ CertPemFile string `mapstructure:"cert-pem-file,omitempty"`
53+ UseSecure bool `mapstructure:"secure,omitempty"`
54+ ReconnectionDelay time.Duration `mapstructure:"reconnection-delay,omitempty"`
55+ KeepaliveParameters * keepaliveConfig `mapstructure:"keepalive,omitempty"`
56+ // TODO: service name options.
4357}
4458
4559type ocagentExporter struct {
4660 counter uint32
47- exporters []* ocagent.Exporter
61+ exporters chan * ocagent.Exporter
62+ }
63+
64+ type ocTraceExporterErrorCode int
65+ type ocTraceExporterError struct {
66+ code ocTraceExporterErrorCode
67+ msg string
68+ }
69+
70+ var _ error = (* ocTraceExporterError )(nil )
71+
72+ func (e * ocTraceExporterError ) Error () string {
73+ return e .msg
4874}
4975
5076const (
5177 defaultNumWorkers int = 2
52- )
5378
54- var (
55- // ErrEndpointRequired indicates that this exporter was not provided with an endpoint in its config.
56- ErrEndpointRequired = errors .New ("OpenCensus exporter config requires an Endpoint" )
57- // ErrUnsupportedCompressionType indicates that this exporter was provided with a compression protocol it does not support.
58- ErrUnsupportedCompressionType = errors .New ("OpenCensus exporter unsupported compression type" )
59- // ErrUnableToGetTLSCreds indicates that this exporter could not read the provided TLS credentials.
60- ErrUnableToGetTLSCreds = errors .New ("OpenCensus exporter unable to read TLS credentials" )
79+ _ ocTraceExporterErrorCode = iota // skip 0
80+ // errEndpointRequired indicates that this exporter was not provided with an endpoint in its config.
81+ errEndpointRequired
82+ // errUnsupportedCompressionType indicates that this exporter was provided with a compression protocol it does not support.
83+ errUnsupportedCompressionType
84+ // errUnableToGetTLSCreds indicates that this exporter could not read the provided TLS credentials.
85+ errUnableToGetTLSCreds
86+ // errAlreadyStopped indicates that the exporter was already stopped.
87+ errAlreadyStopped
6188)
6289
6390// OpenCensusTraceExportersFromViper unmarshals the viper and returns an consumer.TraceConsumer targeting
@@ -75,49 +102,75 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCons
75102 }
76103
77104 if ocac .Endpoint == "" {
78- return nil , nil , nil , ErrEndpointRequired
105+ return nil , nil , nil , & ocTraceExporterError {
106+ code : errEndpointRequired ,
107+ msg : "OpenCensus exporter config requires an Endpoint" ,
108+ }
79109 }
80110
81111 opts := []ocagent.ExporterOption {ocagent .WithAddress (ocac .Endpoint )}
82112 if ocac .Compression != "" {
83- if compressionKey := grpc .GetGRPCCompressionKey (ocac .Compression ); compressionKey != compression .Unsupported {
113+ if compressionKey := compressiongrpc .GetGRPCCompressionKey (ocac .Compression ); compressionKey != compression .Unsupported {
84114 opts = append (opts , ocagent .UseCompressor (compressionKey ))
85115 } else {
86- return nil , nil , nil , ErrUnsupportedCompressionType
116+ return nil , nil , nil , & ocTraceExporterError {
117+ code : errUnsupportedCompressionType ,
118+ msg : fmt .Sprintf ("OpenCensus exporter unsupported compression type %q" , ocac .Compression ),
119+ }
87120 }
88121 }
89122 if ocac .CertPemFile != "" {
90123 creds , err := credentials .NewClientTLSFromFile (ocac .CertPemFile , "" )
91124 if err != nil {
92- return nil , nil , nil , ErrUnableToGetTLSCreds
125+ return nil , nil , nil , & ocTraceExporterError {
126+ code : errUnableToGetTLSCreds ,
127+ msg : fmt .Sprintf ("OpenCensus exporter unable to read TLS credentials from pem file %q: %v" , ocac .CertPemFile , err ),
128+ }
129+ }
130+ opts = append (opts , ocagent .WithTLSCredentials (creds ))
131+ } else if ocac .UseSecure {
132+ certPool , err := x509 .SystemCertPool ()
133+ if err != nil {
134+ return nil , nil , nil , & ocTraceExporterError {
135+ code : errUnableToGetTLSCreds ,
136+ msg : fmt .Sprintf (
137+ "OpenCensus exporter unable to read certificates from system pool: %v" , err ),
138+ }
93139 }
140+ creds := credentials .NewClientTLSFromCert (certPool , "" )
94141 opts = append (opts , ocagent .WithTLSCredentials (creds ))
95142 } else {
96143 opts = append (opts , ocagent .WithInsecure ())
97144 }
98145 if len (ocac .Headers ) > 0 {
99146 opts = append (opts , ocagent .WithHeaders (ocac .Headers ))
100147 }
148+ if ocac .ReconnectionDelay > 0 {
149+ opts = append (opts , ocagent .WithReconnectionPeriod (ocac .ReconnectionDelay ))
150+ }
151+ if ocac .KeepaliveParameters != nil {
152+ opts = append (opts , ocagent .WithGRPCDialOption (grpc .WithKeepaliveParams (keepalive.ClientParameters {
153+ Time : ocac .KeepaliveParameters .Time ,
154+ Timeout : ocac .KeepaliveParameters .Timeout ,
155+ PermitWithoutStream : ocac .KeepaliveParameters .PermitWithoutStream ,
156+ })))
157+ }
101158
102159 numWorkers := defaultNumWorkers
103160 if ocac .NumWorkers > 0 {
104161 numWorkers = ocac .NumWorkers
105162 }
106163
107- exporters := make ([] * ocagent.Exporter , 0 , numWorkers )
164+ exportersChan := make (chan * ocagent.Exporter , numWorkers )
108165 for exporterIndex := 0 ; exporterIndex < numWorkers ; exporterIndex ++ {
109166 exporter , serr := ocagent .NewExporter (opts ... )
110167 if serr != nil {
111168 return nil , nil , nil , fmt .Errorf ("cannot configure OpenCensus Trace exporter: %v" , serr )
112169 }
113- exporters = append (exporters , exporter )
114- doneFns = append (doneFns , func () error {
115- exporter .Flush ()
116- return nil
117- })
170+ exportersChan <- exporter
118171 }
119172
120- oce := & ocagentExporter {exporters : exporters }
173+ oce := & ocagentExporter {exporters : exportersChan }
121174 oexp , err := exporterhelper .NewTraceExporter (
122175 "oc_trace" ,
123176 oce .PushTraceData ,
@@ -129,22 +182,61 @@ func OpenCensusTraceExportersFromViper(v *viper.Viper) (tps []consumer.TraceCons
129182 }
130183
131184 tps = append (tps , oexp )
185+ doneFns = append (doneFns , oce .stop )
132186
133187 // TODO: (@odeke-em, @songya23) implement ExportMetrics for OpenCensus.
134188 // mps = append(mps, oexp)
135189 return
136190}
137191
192+ func (oce * ocagentExporter ) stop () error {
193+ wg := & sync.WaitGroup {}
194+ var errors []error
195+ var errorsMu sync.Mutex
196+ visitedCnt := 0
197+ for currExporter := range oce .exporters {
198+ wg .Add (1 )
199+ go func (exporter * ocagent.Exporter ) {
200+ defer wg .Done ()
201+ err := exporter .Stop ()
202+ if err != nil {
203+ errorsMu .Lock ()
204+ errors = append (errors , err )
205+ errorsMu .Unlock ()
206+ }
207+ }(currExporter )
208+ visitedCnt ++
209+ if visitedCnt == cap (oce .exporters ) {
210+ // Visited and started Stop on all exporters, just wait for the stop to finish.
211+ break
212+ }
213+ }
214+
215+ wg .Wait ()
216+ close (oce .exporters )
217+
218+ return internal .CombineErrors (errors )
219+ }
220+
138221func (oce * ocagentExporter ) PushTraceData (ctx context.Context , td data.TraceData ) (int , error ) {
139- // Get an exporter worker round-robin
140- exporter := oce .exporters [atomic .AddUint32 (& oce .counter , 1 )% uint32 (len (oce .exporters ))]
222+ // Get first available exporter.
223+ exporter , ok := <- oce .exporters
224+ if ! ok {
225+ err := & ocTraceExporterError {
226+ code : errAlreadyStopped ,
227+ msg : fmt .Sprintf ("OpenCensus exporter was already stopped." ),
228+ }
229+ return len (td .Spans ), err
230+ }
231+
141232 err := exporter .ExportTraceServiceRequest (
142233 & agenttracepb.ExportTraceServiceRequest {
143234 Spans : td .Spans ,
144235 Resource : td .Resource ,
145236 Node : td .Node ,
146237 },
147238 )
239+ oce .exporters <- exporter
148240 if err != nil {
149241 return len (td .Spans ), err
150242 }
0 commit comments