1414 * limitations under the License.
1515 */
1616
17- import { AggregationType , Bucket , DistributionData , logger , Logger , Measurement , MeasureType , StatsEventListener , View } from '@opencensus/core' ;
17+ import { AggregationType , Bucket , DistributionData , logger , Logger , Measurement , MeasureType , StatsEventListener , Tags , View } from '@opencensus/core' ;
1818import { auth , JWT } from 'google-auth-library' ;
1919import { google } from 'googleapis' ;
2020import * as path from 'path' ;
@@ -24,23 +24,41 @@ import {Distribution, LabelDescriptor, MetricDescriptor, MetricKind, Point, Stac
2424google . options ( { headers : { 'x-opencensus-outgoing-request' : 0x1 } } ) ;
2525const monitoring = google . monitoring ( 'v3' ) ;
2626
27+ const RECORD_SEPARATOR = String . fromCharCode ( 30 ) ;
28+ const UNIT_SEPARATOR = String . fromCharCode ( 31 ) ;
29+
2730/** Format and sends Stats to Stackdriver */
2831export class StackdriverStatsExporter implements StatsEventListener {
29- private delay : number ;
32+ private period : number ;
3033 private projectId : string ;
3134 private metricPrefix : string ;
35+ private viewToUpload :
36+ { [ key : string ] : { view : View ; tags : { [ key : string ] : Tags ; } } } ;
37+ private timer : NodeJS . Timer ;
3238 static readonly CUSTOM_OPENCENSUS_DOMAIN : string =
3339 'custom.googleapis.com/opencensus' ;
34- static readonly DELAY : number = 60000 ;
40+ static readonly PERIOD : number = 60000 ;
3541 logger : Logger ;
3642
3743 constructor ( options : StackdriverExporterOptions ) {
38- this . delay =
39- options . delay != null ? options . delay : StackdriverStatsExporter . DELAY ;
44+ this . period = options . period !== undefined ?
45+ options . period :
46+ StackdriverStatsExporter . PERIOD ;
4047 this . projectId = options . projectId ;
4148 this . metricPrefix = options . metricPrefix ||
4249 StackdriverStatsExporter . CUSTOM_OPENCENSUS_DOMAIN ;
4350 this . logger = options . logger || logger . logger ( ) ;
51+ this . viewToUpload = { } ;
52+
53+ this . timer = setInterval ( async ( ) => {
54+ try {
55+ await this . uploadViews ( ) ;
56+ } catch ( err ) {
57+ if ( typeof options . onMetricUploadError === 'function' ) {
58+ options . onMetricUploadError ( err ) ;
59+ }
60+ }
61+ } , this . period ) ;
4462 }
4563
4664 /**
@@ -69,14 +87,37 @@ export class StackdriverStatsExporter implements StatsEventListener {
6987 * @param views The views associated with the measure
7088 * @param measurement The measurement recorded
7189 */
72- async onRecord ( views : View [ ] , measurement : Measurement ) {
73- await new Promise ( resolve => {
74- setTimeout ( resolve , this . delay ) ;
75- } ) ;
90+ onRecord ( views : View [ ] , measurement : Measurement ) {
91+ for ( const view of views ) {
92+ if ( ! this . viewToUpload [ view . name ] ) {
93+ this . viewToUpload [ view . name ] = { view, tags : { } } ;
94+ }
95+ const tagsKey = this . encodeTags ( measurement . tags ) ;
96+ this . viewToUpload [ view . name ] . tags [ tagsKey ] = measurement . tags ;
97+ }
98+ }
7699
77- const timeSeries = views . map ( view => {
78- return this . createTimeSeriesData ( view , measurement ) ;
79- } ) ;
100+ /**
101+ * Clear the interval timer to stop uploading metrics. It should be called
102+ * whenever the exporter is not needed anymore.
103+ */
104+ close ( ) {
105+ clearInterval ( this . timer ) ;
106+ }
107+
108+ private uploadViews ( ) {
109+ const timeSeries : TimeSeries [ ] = [ ] ;
110+ for ( const name of Object . keys ( this . viewToUpload ) ) {
111+ const v = this . viewToUpload [ name ] ;
112+ for ( const tagsKey of Object . keys ( v . tags ) ) {
113+ timeSeries . push ( this . createTimeSeriesData ( v . view , v . tags [ tagsKey ] ) ) ;
114+ }
115+ }
116+ this . viewToUpload = { } ;
117+
118+ if ( timeSeries . length === 0 ) {
119+ return Promise . resolve ( ) ;
120+ }
80121
81122 return this . authorize ( ) . then ( authClient => {
82123 const request = {
@@ -94,6 +135,15 @@ export class StackdriverStatsExporter implements StatsEventListener {
94135 } ) ;
95136 }
96137
138+ private encodeTags ( tags : Tags ) : string {
139+ return Object . keys ( tags )
140+ . sort ( )
141+ . map ( tagKey => {
142+ return tagKey + UNIT_SEPARATOR + tags [ tagKey ] ;
143+ } )
144+ . join ( RECORD_SEPARATOR ) ;
145+ }
146+
97147 /**
98148 * Gets the Google Application Credentials from the environment variables
99149 * and authenticates the client.
@@ -122,31 +172,30 @@ export class StackdriverStatsExporter implements StatsEventListener {
122172 * @param view The view to get TimeSeries information from
123173 * @param measurement The measurement to get TimeSeries information from
124174 */
125- private createTimeSeriesData ( view : View , measurement : Measurement ) :
126- TimeSeries {
127- const aggregationData = view . getSnapshot ( measurement . tags ) ;
175+ private createTimeSeriesData ( view : View , tags : Tags ) : TimeSeries {
176+ const aggregationData = view . getSnapshot ( tags ) ;
128177
129178 const resourceLabels :
130179 { [ key : string ] : string } = { project_id : this . projectId } ;
131180
132- // For non Sum Aggregations, the end time should be the same as the start
181+ // For LAST_VALUE Aggregations, the end time should be the same as the start
133182 // time.
134183 const endTime = ( new Date ( aggregationData . timestamp ) ) . toISOString ( ) ;
135- const startTime = view . aggregation === AggregationType . SUM ?
184+ const startTime = view . aggregation !== AggregationType . LAST_VALUE ?
136185 ( new Date ( view . startTime ) ) . toISOString ( ) :
137186 endTime ;
138187
139188 let value ;
140- if ( view . measure . type === MeasureType . INT64 ) {
141- value = { int64Value : measurement . value . toString ( ) } ;
142- } else if ( aggregationData . type === AggregationType . DISTRIBUTION ) {
189+ if ( aggregationData . type === AggregationType . DISTRIBUTION ) {
143190 value = { distributionValue : this . createDistribution ( aggregationData ) } ;
191+ } else if ( view . measure . type === MeasureType . INT64 ) {
192+ value = { int64Value : aggregationData . value . toString ( ) } ;
144193 } else {
145- value = { doubleValue : measurement . value } ;
194+ value = { doubleValue : aggregationData . value } ;
146195 }
147196
148197 return {
149- metric : { type : this . getMetricType ( view . name ) , labels : measurement . tags } ,
198+ metric : { type : this . getMetricType ( view . name ) , labels : tags } ,
150199 resource : { type : 'global' , labels : resourceLabels } ,
151200 metricKind : this . createMetricKind ( view . aggregation ) ,
152201 valueType : this . createValueType ( view ) ,
@@ -241,7 +290,7 @@ export class StackdriverStatsExporter implements StatsEventListener {
241290 * from.
242291 */
243292 private createMetricKind ( aggregationType : AggregationType ) : MetricKind {
244- if ( aggregationType === AggregationType . SUM ) {
293+ if ( aggregationType !== AggregationType . LAST_VALUE ) {
245294 return MetricKind . CUMULATIVE ;
246295 }
247296 return MetricKind . GAUGE ;
0 commit comments