Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit 84d38db

Browse files
Allow creating additional View universes. (#1196)
* Allow creating additional View universes. * Add methods to extract stats.Option * Update with comments from @rghetia * Change record interface to include WithMeter option, per @rghetia * Update with feedback from @rghetia Signed-off-by: Evan Anderson <evan.k.anderson@gmail.com> * Add a benchmark for stats.WithMeter (but with no views registered) Signed-off-by: Evan Anderson <evan.k.anderson@gmail.com> * Stop the custom meter in test to prevent leaking goroutines.
1 parent a7631f6 commit 84d38db

File tree

7 files changed

+420
-46
lines changed

7 files changed

+420
-46
lines changed

stats/benchmark_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"testing"
2020

2121
"go.opencensus.io/stats"
22+
"go.opencensus.io/stats/view"
2223
_ "go.opencensus.io/stats/view" // enable collection
2324
"go.opencensus.io/tag"
2425
)
@@ -52,6 +53,22 @@ func BenchmarkRecord8(b *testing.B) {
5253
}
5354
}
5455

56+
func BenchmarkRecord8_WithRecorder(b *testing.B) {
57+
ctx := context.Background()
58+
meter := view.NewMeter()
59+
meter.Start()
60+
defer meter.Stop()
61+
b.ResetTimer()
62+
63+
for i := 0; i < b.N; i++ {
64+
// Note that this benchmark has one extra allocation for stats.WithRecorder.
65+
// If you cache the recorder option, this benchmark should be equally fast as BenchmarkRecord8
66+
stats.RecordWithOptions(ctx, stats.WithRecorder(meter), stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1)))
67+
}
68+
69+
b.StopTimer()
70+
}
71+
5572
func BenchmarkRecord8_Parallel(b *testing.B) {
5673
ctx := context.Background()
5774
b.ResetTimer()

stats/record.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,19 @@ func init() {
3131
}
3232
}
3333

34+
// Recorder provides an interface for exporting measurement information from
35+
// the static Record method by using the WithRecorder option.
36+
type Recorder interface {
37+
// Record records a set of measurements associated with the given tags and attachments.
38+
// The second argument is a `[]Measurement`.
39+
Record(*tag.Map, interface{}, map[string]interface{})
40+
}
41+
3442
type recordOptions struct {
3543
attachments metricdata.Attachments
3644
mutators []tag.Mutator
3745
measurements []Measurement
46+
recorder Recorder
3847
}
3948

4049
// WithAttachments applies provided exemplar attachments.
@@ -58,6 +67,14 @@ func WithMeasurements(measurements ...Measurement) Options {
5867
}
5968
}
6069

70+
// WithRecorder records the measurements to the specified `Recorder`, rather
71+
// than to the global metrics recorder.
72+
func WithRecorder(meter Recorder) Options {
73+
return func(ro *recordOptions) {
74+
ro.recorder = meter
75+
}
76+
}
77+
6178
// Options apply changes to recordOptions.
6279
type Options func(*recordOptions)
6380

@@ -93,6 +110,9 @@ func RecordWithOptions(ctx context.Context, ros ...Options) error {
93110
return nil
94111
}
95112
recorder := internal.DefaultRecorder
113+
if o.recorder != nil {
114+
recorder = o.recorder.Record
115+
}
96116
if recorder == nil {
97117
return nil
98118
}

stats/record_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func TestRecordWithAttachments(t *testing.T) {
5656
if err := view.Register(v); err != nil {
5757
log.Fatalf("Failed to register views: %v", err)
5858
}
59+
defer view.Unregister(v)
5960

6061
attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
6162
stats.RecordWithOptions(context.Background(), stats.WithAttachments(attachments), stats.WithMeasurements(m.M(12)))
@@ -93,3 +94,108 @@ func TestRecordWithAttachments(t *testing.T) {
9394
func cmpExemplar(got, want *metricdata.Exemplar) string {
9495
return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
9596
}
97+
98+
func TestRecordWithMeter(t *testing.T) {
99+
meter := view.NewMeter()
100+
meter.Start()
101+
defer meter.Stop()
102+
k1 := tag.MustNewKey("k1")
103+
k2 := tag.MustNewKey("k2")
104+
m1 := stats.Int64("TestResolveOptions/m1", "", stats.UnitDimensionless)
105+
m2 := stats.Int64("TestResolveOptions/m2", "", stats.UnitDimensionless)
106+
v := []*view.View{{
107+
Name: "test_view",
108+
TagKeys: []tag.Key{k1, k2},
109+
Measure: m1,
110+
Aggregation: view.Distribution(5, 10),
111+
}, {
112+
Name: "second_view",
113+
TagKeys: []tag.Key{k1},
114+
Measure: m2,
115+
Aggregation: view.Count(),
116+
}}
117+
meter.SetReportingPeriod(100 * time.Millisecond)
118+
if err := meter.Register(v...); err != nil {
119+
t.Fatalf("Failed to register view: %v", err)
120+
}
121+
defer meter.Unregister(v...)
122+
123+
attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}
124+
ctx, err := tag.New(context.Background(), tag.Insert(k1, "foo"), tag.Insert(k2, "foo"))
125+
if err != nil {
126+
t.Fatalf("Failed to set context: %v", err)
127+
}
128+
err = stats.RecordWithOptions(ctx,
129+
stats.WithTags(tag.Upsert(k1, "bar"), tag.Insert(k2, "bar")),
130+
stats.WithAttachments(attachments),
131+
stats.WithMeasurements(m1.M(12), m1.M(6), m2.M(5)),
132+
stats.WithRecorder(meter))
133+
if err != nil {
134+
t.Fatalf("Failed to resolve data point: %v", err)
135+
}
136+
137+
rows, err := meter.RetrieveData("test_view")
138+
if err != nil {
139+
t.Fatalf("Unable to retrieve data for test_view: %v", err)
140+
}
141+
if len(rows) != 1 {
142+
t.Fatalf("Expected one row, got %d rows: %+v", len(rows), rows)
143+
}
144+
if len(rows[0].Tags) != 2 {
145+
t.Errorf("Wrong number of tags %d: %v", len(rows[0].Tags), rows[0].Tags)
146+
}
147+
// k2 was Insert() ed, and shouldn't update the value that was in the supplied context.
148+
wantTags := []tag.Tag{{Key: k1, Value: "bar"}, {Key: k2, Value: "foo"}}
149+
for i, tag := range rows[0].Tags {
150+
if tag.Key != wantTags[i].Key {
151+
t.Errorf("Incorrect tag %d, want: %q, got: %q", i, wantTags[i].Key, tag.Key)
152+
}
153+
if tag.Value != wantTags[i].Value {
154+
t.Errorf("Incorrect tag for %s, want: %q, got: %v", tag.Key, wantTags[i].Value, tag.Value)
155+
}
156+
157+
}
158+
wantBuckets := []int64{0, 1, 1}
159+
gotBuckets := rows[0].Data.(*view.DistributionData)
160+
if !reflect.DeepEqual(gotBuckets.CountPerBucket, wantBuckets) {
161+
t.Fatalf("want buckets %v, got %v", wantBuckets, gotBuckets)
162+
}
163+
for i, e := range gotBuckets.ExemplarsPerBucket {
164+
if gotBuckets.CountPerBucket[i] == 0 {
165+
if e != nil {
166+
t.Errorf("Unexpected exemplar for bucket")
167+
}
168+
continue
169+
}
170+
// values from the metrics above
171+
exemplarValues := []float64{0, 6, 12}
172+
wantExemplar := &metricdata.Exemplar{Value: exemplarValues[i], Attachments: attachments}
173+
if diff := cmpExemplar(e, wantExemplar); diff != "" {
174+
t.Errorf("Bad exemplar for %d: %+v", i, diff)
175+
}
176+
}
177+
178+
rows2, err := meter.RetrieveData("second_view")
179+
if err != nil {
180+
t.Fatalf("Failed to read second_view: %v", err)
181+
}
182+
if len(rows2) != 1 {
183+
t.Fatalf("Expected one row, got %d rows: %v", len(rows2), rows2)
184+
}
185+
if len(rows2[0].Tags) != 1 {
186+
t.Errorf("Expected one tag, got %d tags: %v", len(rows2[0].Tags), rows2[0].Tags)
187+
}
188+
wantTags = []tag.Tag{{Key: k1, Value: "bar"}}
189+
for i, tag := range rows2[0].Tags {
190+
if wantTags[i].Key != tag.Key {
191+
t.Errorf("Wrong key for %d, want %q, got %q", i, wantTags[i].Key, tag.Key)
192+
}
193+
if wantTags[i].Value != tag.Value {
194+
t.Errorf("Wrong value for tag %s, want %q got %q", tag.Key, wantTags[i].Value, tag.Value)
195+
}
196+
}
197+
gotCount := rows2[0].Data.(*view.CountData)
198+
if gotCount.Value != 1 {
199+
t.Errorf("Wrong count for second_view, want %d, got %d", 1, gotCount.Value)
200+
}
201+
}

stats/view/benchmark_test.go

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,29 +46,15 @@ var (
4646
// BenchmarkRecordReqCommand benchmarks calling the internal recording machinery
4747
// directly.
4848
func BenchmarkRecordReqCommand(b *testing.B) {
49-
w := newWorker()
49+
w := NewMeter().(*worker)
5050

5151
register := &registerViewReq{views: []*View{view}, err: make(chan error, 1)}
5252
register.handleCommand(w)
5353
if err := <-register.err; err != nil {
5454
b.Fatal(err)
5555
}
5656

57-
const tagCount = 10
58-
ctxs := make([]context.Context, 0, tagCount)
59-
for i := 0; i < tagCount; i++ {
60-
ctx, _ := tag.New(context.Background(),
61-
tag.Upsert(k1, fmt.Sprintf("v%d", i)),
62-
tag.Upsert(k2, fmt.Sprintf("v%d", i)),
63-
tag.Upsert(k3, fmt.Sprintf("v%d", i)),
64-
tag.Upsert(k4, fmt.Sprintf("v%d", i)),
65-
tag.Upsert(k5, fmt.Sprintf("v%d", i)),
66-
tag.Upsert(k6, fmt.Sprintf("v%d", i)),
67-
tag.Upsert(k7, fmt.Sprintf("v%d", i)),
68-
tag.Upsert(k8, fmt.Sprintf("v%d", i)),
69-
)
70-
ctxs = append(ctxs, ctx)
71-
}
57+
ctxs := prepareContexts(10)
7258

7359
b.ReportAllocs()
7460
b.ResetTimer()
@@ -91,3 +77,41 @@ func BenchmarkRecordReqCommand(b *testing.B) {
9177
record.handleCommand(w)
9278
}
9379
}
80+
81+
func BenchmarkRecordViaStats(b *testing.B) {
82+
83+
meter := NewMeter()
84+
meter.Start()
85+
defer meter.Stop()
86+
meter.Register(view)
87+
defer meter.Unregister(view)
88+
89+
ctxs := prepareContexts(10)
90+
rec := stats.WithRecorder(meter)
91+
b.ReportAllocs()
92+
b.ResetTimer()
93+
94+
for i := 0; i < b.N; i++ {
95+
stats.RecordWithOptions(ctxs[i%len(ctxs)], rec, stats.WithMeasurements(m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1), m.M(1)))
96+
}
97+
98+
}
99+
100+
func prepareContexts(tagCount int) []context.Context {
101+
ctxs := make([]context.Context, 0, tagCount)
102+
for i := 0; i < tagCount; i++ {
103+
ctx, _ := tag.New(context.Background(),
104+
tag.Upsert(k1, fmt.Sprintf("v%d", i)),
105+
tag.Upsert(k2, fmt.Sprintf("v%d", i)),
106+
tag.Upsert(k3, fmt.Sprintf("v%d", i)),
107+
tag.Upsert(k4, fmt.Sprintf("v%d", i)),
108+
tag.Upsert(k5, fmt.Sprintf("v%d", i)),
109+
tag.Upsert(k6, fmt.Sprintf("v%d", i)),
110+
tag.Upsert(k7, fmt.Sprintf("v%d", i)),
111+
tag.Upsert(k8, fmt.Sprintf("v%d", i)),
112+
)
113+
ctxs = append(ctxs, ctx)
114+
}
115+
116+
return ctxs
117+
}

stats/view/export.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,6 @@
1414

1515
package view
1616

17-
import "sync"
18-
19-
var (
20-
exportersMu sync.RWMutex // guards exporters
21-
exporters = make(map[Exporter]struct{})
22-
)
23-
2417
// Exporter exports the collected records as view data.
2518
//
2619
// The ExportView method should return quickly; if an
@@ -43,16 +36,10 @@ type Exporter interface {
4336
//
4437
// Binaries can register exporters, libraries shouldn't register exporters.
4538
func RegisterExporter(e Exporter) {
46-
exportersMu.Lock()
47-
defer exportersMu.Unlock()
48-
49-
exporters[e] = struct{}{}
39+
defaultWorker.RegisterExporter(e)
5040
}
5141

5242
// UnregisterExporter unregisters an exporter.
5343
func UnregisterExporter(e Exporter) {
54-
exportersMu.Lock()
55-
defer exportersMu.Unlock()
56-
57-
delete(exporters, e)
44+
defaultWorker.UnregisterExporter(e)
5845
}

0 commit comments

Comments
 (0)