Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit af30f77

Browse files
authored
Record a Start Time Per Time Series within a View (#1220)
1 parent 8e242ed commit af30f77

12 files changed

Lines changed: 182 additions & 149 deletions

plugin/ocgrpc/client_stats_handler_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,9 @@ func TestClientDefaultCollections(t *testing.T) {
318318
t.Errorf("%q: RetrieveData(%q) = %v", tc.label, wantData.v().Name, err)
319319
continue
320320
}
321+
for i := range gotRows {
322+
view.ClearStart(gotRows[i].Data)
323+
}
321324

322325
for _, gotRow := range gotRows {
323326
if !containsRow(wantData.rows, gotRow) {

plugin/ocgrpc/server_stats_handler_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,10 @@ func TestServerDefaultCollections(t *testing.T) {
305305
continue
306306
}
307307

308+
for i := range gotRows {
309+
view.ClearStart(gotRows[i].Data)
310+
}
311+
308312
for _, gotRow := range gotRows {
309313
if !containsRow(wantData.rows, gotRow) {
310314
t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow)

plugin/ochttp/route_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ func TestWithRouteTag(t *testing.T) {
5353
view.Unregister(v) // trigger exporting
5454

5555
got := e.rowsForView("request_total")
56+
for i := range got {
57+
view.ClearStart(got[i].Data)
58+
}
5659
want := []*view.Row{
5760
{Data: &view.CountData{Value: 1}, Tags: []tag.Tag{{Key: ochttp.KeyServerRoute, Value: "/a/"}}},
5861
}
@@ -90,6 +93,9 @@ func TestSetRoute(t *testing.T) {
9093
view.Unregister(v) // trigger exporting
9194

9295
got := e.rowsForView("request_total")
96+
for i := range got {
97+
view.ClearStart(got[i].Data)
98+
}
9399
want := []*view.Row{
94100
{Data: &view.CountData{Value: 1}, Tags: []tag.Tag{{Key: ochttp.KeyServerRoute, Value: "/a/"}}},
95101
}

stats/view/aggregation.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package view
1717

18+
import "time"
19+
1820
// AggType represents the type of aggregation function used on a View.
1921
type AggType int
2022

@@ -45,20 +47,20 @@ type Aggregation struct {
4547
Type AggType // Type is the AggType of this Aggregation.
4648
Buckets []float64 // Buckets are the bucket endpoints if this Aggregation represents a distribution, see Distribution.
4749

48-
newData func() AggregationData
50+
newData func(time.Time) AggregationData
4951
}
5052

5153
var (
5254
aggCount = &Aggregation{
5355
Type: AggTypeCount,
54-
newData: func() AggregationData {
55-
return &CountData{}
56+
newData: func(t time.Time) AggregationData {
57+
return &CountData{Start: t}
5658
},
5759
}
5860
aggSum = &Aggregation{
5961
Type: AggTypeSum,
60-
newData: func() AggregationData {
61-
return &SumData{}
62+
newData: func(t time.Time) AggregationData {
63+
return &SumData{Start: t}
6264
},
6365
}
6466
)
@@ -103,8 +105,8 @@ func Distribution(bounds ...float64) *Aggregation {
103105
Type: AggTypeDistribution,
104106
Buckets: bounds,
105107
}
106-
agg.newData = func() AggregationData {
107-
return newDistributionData(agg)
108+
agg.newData = func(t time.Time) AggregationData {
109+
return newDistributionData(agg, t)
108110
}
109111
return agg
110112
}
@@ -114,7 +116,7 @@ func Distribution(bounds ...float64) *Aggregation {
114116
func LastValue() *Aggregation {
115117
return &Aggregation{
116118
Type: AggTypeLastValue,
117-
newData: func() AggregationData {
119+
newData: func(_ time.Time) AggregationData {
118120
return &LastValueData{}
119121
},
120122
}

stats/view/aggregation_data.go

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type AggregationData interface {
3131
clone() AggregationData
3232
equal(other AggregationData) bool
3333
toPoint(t metricdata.Type, time time.Time) metricdata.Point
34+
StartTime() time.Time
3435
}
3536

3637
const epsilon = 1e-9
@@ -40,6 +41,7 @@ const epsilon = 1e-9
4041
//
4142
// Most users won't directly access count data.
4243
type CountData struct {
44+
Start time.Time
4345
Value int64
4446
}
4547

@@ -50,7 +52,7 @@ func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time)
5052
}
5153

5254
func (a *CountData) clone() AggregationData {
53-
return &CountData{Value: a.Value}
55+
return &CountData{Value: a.Value, Start: a.Start}
5456
}
5557

5658
func (a *CountData) equal(other AggregationData) bool {
@@ -59,7 +61,7 @@ func (a *CountData) equal(other AggregationData) bool {
5961
return false
6062
}
6163

62-
return a.Value == a2.Value
64+
return a.Start.Equal(a2.Start) && a.Value == a2.Value
6365
}
6466

6567
func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
@@ -71,11 +73,17 @@ func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.
7173
}
7274
}
7375

76+
// StartTime returns the start time of the data being aggregated by CountData.
77+
func (a *CountData) StartTime() time.Time {
78+
return a.Start
79+
}
80+
7481
// SumData is the aggregated data for the Sum aggregation.
7582
// A sum aggregation processes data and sums up the recordings.
7683
//
7784
// Most users won't directly access sum data.
7885
type SumData struct {
86+
Start time.Time
7987
Value float64
8088
}
8189

@@ -86,15 +94,15 @@ func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
8694
}
8795

8896
func (a *SumData) clone() AggregationData {
89-
return &SumData{Value: a.Value}
97+
return &SumData{Value: a.Value, Start: a.Start}
9098
}
9199

92100
func (a *SumData) equal(other AggregationData) bool {
93101
a2, ok := other.(*SumData)
94102
if !ok {
95103
return false
96104
}
97-
return math.Pow(a.Value-a2.Value, 2) < epsilon
105+
return a.Start.Equal(a2.Start) && math.Pow(a.Value-a2.Value, 2) < epsilon
98106
}
99107

100108
func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
@@ -108,6 +116,11 @@ func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Po
108116
}
109117
}
110118

119+
// StartTime returns the start time of the data being aggregated by SumData.
120+
func (a *SumData) StartTime() time.Time {
121+
return a.Start
122+
}
123+
111124
// DistributionData is the aggregated data for the
112125
// Distribution aggregation.
113126
//
@@ -126,16 +139,18 @@ type DistributionData struct {
126139
// an exemplar for the associated bucket, or nil.
127140
ExemplarsPerBucket []*metricdata.Exemplar
128141
bounds []float64 // histogram distribution of the values
142+
Start time.Time
129143
}
130144

131-
func newDistributionData(agg *Aggregation) *DistributionData {
145+
func newDistributionData(agg *Aggregation, t time.Time) *DistributionData {
132146
bucketCount := len(agg.Buckets) + 1
133147
return &DistributionData{
134148
CountPerBucket: make([]int64, bucketCount),
135149
ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
136150
bounds: agg.Buckets,
137151
Min: math.MaxFloat64,
138152
Max: math.SmallestNonzeroFloat64,
153+
Start: t,
139154
}
140155
}
141156

@@ -226,7 +241,11 @@ func (a *DistributionData) equal(other AggregationData) bool {
226241
return false
227242
}
228243
}
229-
return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
244+
return a.Start.Equal(a2.Start) &&
245+
a.Count == a2.Count &&
246+
a.Min == a2.Min &&
247+
a.Max == a2.Max &&
248+
math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
230249
}
231250

232251
func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
@@ -256,6 +275,11 @@ func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metr
256275
}
257276
}
258277

278+
// StartTime returns the start time of the data being aggregated by DistributionData.
279+
func (a *DistributionData) StartTime() time.Time {
280+
return a.Start
281+
}
282+
259283
// LastValueData returns the last value recorded for LastValue aggregation.
260284
type LastValueData struct {
261285
Value float64
@@ -291,3 +315,22 @@ func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricd
291315
panic("unsupported metricdata.Type")
292316
}
293317
}
318+
319+
// StartTime returns an empty time value as start time is not recorded when using last value
320+
// aggregation.
321+
func (l *LastValueData) StartTime() time.Time {
322+
return time.Time{}
323+
}
324+
325+
// ClearStart clears the Start field from data if present. Useful for testing in cases where the
326+
// start time will be nondeterministic.
327+
func ClearStart(data AggregationData) {
328+
switch data := data.(type) {
329+
case *CountData:
330+
data.Start = time.Time{}
331+
case *SumData:
332+
data.Start = time.Time{}
333+
case *DistributionData:
334+
data.Start = time.Time{}
335+
}
336+
}

stats/view/aggregation_data_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestDataClone(t *testing.T) {
2929
agg := &Aggregation{
3030
Buckets: []float64{1, 2, 3, 4},
3131
}
32-
dist := newDistributionData(agg)
32+
dist := newDistributionData(agg, time.Time{})
3333
dist.Count = 7
3434
dist.Max = 11
3535
dist.Min = 1
@@ -72,7 +72,7 @@ func TestDistributionData_addSample(t *testing.T) {
7272
agg := &Aggregation{
7373
Buckets: []float64{1, 2},
7474
}
75-
dd := newDistributionData(agg)
75+
dd := newDistributionData(agg, time.Time{})
7676
attachments1 := map[string]interface{}{"key1": "value1"}
7777
t1 := time.Now()
7878
dd.addSample(0.5, attachments1, t1)

stats/view/collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type collector struct {
3535
func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) {
3636
aggregator, ok := c.signatures[s]
3737
if !ok {
38-
aggregator = c.a.newData()
38+
aggregator = c.a.newData(t)
3939
c.signatures[s] = aggregator
4040
}
4141
aggregator.addSample(v, attachments, t)

0 commit comments

Comments
 (0)