Switch to go modules

This commit is contained in:
Manuel Alejandro de Brito Fontes 2019-04-15 08:34:23 -04:00
parent 461954facb
commit 1720059244
No known key found for this signature in database
GPG key ID: 786136016A8BA02A
763 changed files with 24896 additions and 177398 deletions

View file

@ -17,8 +17,9 @@ package view
import (
"math"
"time"
"go.opencensus.io/exemplar"
"go.opencensus.io/metric/metricdata"
)
// AggregationData represents an aggregated value from a collection.
@ -26,9 +27,10 @@ import (
// Mosts users won't directly access aggregration data.
type AggregationData interface {
isAggregationData() bool
addSample(e *exemplar.Exemplar)
addSample(v float64, attachments map[string]interface{}, t time.Time)
clone() AggregationData
equal(other AggregationData) bool
toPoint(t metricdata.Type, time time.Time) metricdata.Point
}
const epsilon = 1e-9
@ -43,7 +45,7 @@ type CountData struct {
func (a *CountData) isAggregationData() bool { return true }
func (a *CountData) addSample(_ *exemplar.Exemplar) {
func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) {
a.Value = a.Value + 1
}
@ -60,6 +62,15 @@ func (a *CountData) equal(other AggregationData) bool {
return a.Value == a2.Value
}
func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
switch metricType {
case metricdata.TypeCumulativeInt64:
return metricdata.NewInt64Point(t, a.Value)
default:
panic("unsupported metricdata.Type")
}
}
// SumData is the aggregated data for the Sum aggregation.
// A sum aggregation processes data and sums up the recordings.
//
@ -70,8 +81,8 @@ type SumData struct {
func (a *SumData) isAggregationData() bool { return true }
func (a *SumData) addSample(e *exemplar.Exemplar) {
a.Value += e.Value
func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
a.Value += v
}
func (a *SumData) clone() AggregationData {
@ -86,6 +97,17 @@ func (a *SumData) equal(other AggregationData) bool {
return math.Pow(a.Value-a2.Value, 2) < epsilon
}
func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
switch metricType {
case metricdata.TypeCumulativeInt64:
return metricdata.NewInt64Point(t, int64(a.Value))
case metricdata.TypeCumulativeFloat64:
return metricdata.NewFloat64Point(t, a.Value)
default:
panic("unsupported metricdata.Type")
}
}
// DistributionData is the aggregated data for the
// Distribution aggregation.
//
@ -102,7 +124,7 @@ type DistributionData struct {
CountPerBucket []int64 // number of occurrences per bucket
// ExemplarsPerBucket is slice the same length as CountPerBucket containing
// an exemplar for the associated bucket, or nil.
ExemplarsPerBucket []*exemplar.Exemplar
ExemplarsPerBucket []*metricdata.Exemplar
bounds []float64 // histogram distribution of the values
}
@ -110,7 +132,7 @@ func newDistributionData(bounds []float64) *DistributionData {
bucketCount := len(bounds) + 1
return &DistributionData{
CountPerBucket: make([]int64, bucketCount),
ExemplarsPerBucket: make([]*exemplar.Exemplar, bucketCount),
ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount),
bounds: bounds,
Min: math.MaxFloat64,
Max: math.SmallestNonzeroFloat64,
@ -129,64 +151,62 @@ func (a *DistributionData) variance() float64 {
func (a *DistributionData) isAggregationData() bool { return true }
func (a *DistributionData) addSample(e *exemplar.Exemplar) {
f := e.Value
if f < a.Min {
a.Min = f
// TODO(songy23): support exemplar attachments.
func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
if v < a.Min {
a.Min = v
}
if f > a.Max {
a.Max = f
if v > a.Max {
a.Max = v
}
a.Count++
a.addToBucket(e)
a.addToBucket(v, attachments, t)
if a.Count == 1 {
a.Mean = f
a.Mean = v
return
}
oldMean := a.Mean
a.Mean = a.Mean + (f-a.Mean)/float64(a.Count)
a.SumOfSquaredDev = a.SumOfSquaredDev + (f-oldMean)*(f-a.Mean)
a.Mean = a.Mean + (v-a.Mean)/float64(a.Count)
a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
}
func (a *DistributionData) addToBucket(e *exemplar.Exemplar) {
func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
var count *int64
var ex **exemplar.Exemplar
for i, b := range a.bounds {
if e.Value < b {
var i int
var b float64
for i, b = range a.bounds {
if v < b {
count = &a.CountPerBucket[i]
ex = &a.ExemplarsPerBucket[i]
break
}
}
if count == nil {
count = &a.CountPerBucket[len(a.bounds)]
ex = &a.ExemplarsPerBucket[len(a.bounds)]
if count == nil { // Last bucket.
i = len(a.bounds)
count = &a.CountPerBucket[i]
}
*count++
*ex = maybeRetainExemplar(*ex, e)
if exemplar := getExemplar(v, attachments, t); exemplar != nil {
a.ExemplarsPerBucket[i] = exemplar
}
}
func maybeRetainExemplar(old, cur *exemplar.Exemplar) *exemplar.Exemplar {
if old == nil {
return cur
func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
if len(attachments) == 0 {
return nil
}
// Heuristic to pick the "better" exemplar: first keep the one with a
// sampled trace attachment, if neither have a trace attachment, pick the
// one with more attachments.
_, haveTraceID := cur.Attachments[exemplar.KeyTraceID]
if haveTraceID || len(cur.Attachments) >= len(old.Attachments) {
return cur
return &metricdata.Exemplar{
Value: v,
Timestamp: t,
Attachments: attachments,
}
return old
}
func (a *DistributionData) clone() AggregationData {
c := *a
c.CountPerBucket = append([]int64(nil), a.CountPerBucket...)
c.ExemplarsPerBucket = append([]*exemplar.Exemplar(nil), a.ExemplarsPerBucket...)
c.ExemplarsPerBucket = append([]*metricdata.Exemplar(nil), a.ExemplarsPerBucket...)
return &c
}
@ -209,6 +229,33 @@ func (a *DistributionData) equal(other AggregationData) bool {
return a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon
}
func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
switch metricType {
case metricdata.TypeCumulativeDistribution:
buckets := []metricdata.Bucket{}
for i := 0; i < len(a.CountPerBucket); i++ {
buckets = append(buckets, metricdata.Bucket{
Count: a.CountPerBucket[i],
Exemplar: a.ExemplarsPerBucket[i],
})
}
bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds}
val := &metricdata.Distribution{
Count: a.Count,
Sum: a.Sum(),
SumOfSquaredDeviation: a.SumOfSquaredDev,
BucketOptions: bucketOptions,
Buckets: buckets,
}
return metricdata.NewDistributionPoint(t, val)
default:
// TODO: [rghetia] when we have a use case for TypeGaugeDistribution.
panic("unsupported metricdata.Type")
}
}
// LastValueData returns the last value recorded for LastValue aggregation.
type LastValueData struct {
Value float64
@ -218,8 +265,8 @@ func (l *LastValueData) isAggregationData() bool {
return true
}
func (l *LastValueData) addSample(e *exemplar.Exemplar) {
l.Value = e.Value
func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
l.Value = v
}
func (l *LastValueData) clone() AggregationData {
@ -233,3 +280,14 @@ func (l *LastValueData) equal(other AggregationData) bool {
}
return l.Value == a2.Value
}
func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point {
switch metricType {
case metricdata.TypeGaugeInt64:
return metricdata.NewInt64Point(t, int64(l.Value))
case metricdata.TypeGaugeFloat64:
return metricdata.NewFloat64Point(t, l.Value)
default:
panic("unsupported metricdata.Type")
}
}

View file

@ -17,8 +17,7 @@ package view
import (
"sort"
"go.opencensus.io/exemplar"
"time"
"go.opencensus.io/internal/tagencoding"
"go.opencensus.io/tag"
@ -33,13 +32,13 @@ type collector struct {
a *Aggregation
}
func (c *collector) addSample(s string, e *exemplar.Exemplar) {
func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) {
aggregator, ok := c.signatures[s]
if !ok {
aggregator = c.a.newData()
c.signatures[s] = aggregator
}
aggregator.addSample(e)
aggregator.addSample(v, attachments, t)
}
// collectRows returns a snapshot of the collected Row values.

View file

@ -24,8 +24,7 @@ import (
"sync/atomic"
"time"
"go.opencensus.io/exemplar"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
@ -118,15 +117,17 @@ func dropZeroBounds(bounds ...float64) []float64 {
// viewInternal is the internal representation of a View.
type viewInternal struct {
view *View // view is the canonicalized View definition associated with this view.
subscribed uint32 // 1 if someone is subscribed and data need to be exported, use atomic to access
collector *collector
view *View // view is the canonicalized View definition associated with this view.
subscribed uint32 // 1 if someone is subscribed and data need to be exported, use atomic to access
collector *collector
metricDescriptor *metricdata.Descriptor
}
func newViewInternal(v *View) (*viewInternal, error) {
return &viewInternal{
view: v,
collector: &collector{make(map[string]AggregationData), v.Aggregation},
view: v,
collector: &collector{make(map[string]AggregationData), v.Aggregation},
metricDescriptor: viewToMetricDescriptor(v),
}, nil
}
@ -152,12 +153,12 @@ func (v *viewInternal) collectedRows() []*Row {
return v.collector.collectedRows(v.view.TagKeys)
}
func (v *viewInternal) addSample(m *tag.Map, e *exemplar.Exemplar) {
func (v *viewInternal) addSample(m *tag.Map, val float64, attachments map[string]interface{}, t time.Time) {
if !v.isSubscribed() {
return
}
sig := string(encodeWithKeys(m, v.view.TagKeys))
v.collector.addSample(sig, e)
v.collector.addSample(sig, val, attachments, t)
}
// A Data is a set of rows about usage of the single measure associated

131
vendor/go.opencensus.io/stats/view/view_to_metric.go generated vendored Normal file
View file

@ -0,0 +1,131 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package view
import (
"time"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats"
)
func getUnit(unit string) metricdata.Unit {
switch unit {
case "1":
return metricdata.UnitDimensionless
case "ms":
return metricdata.UnitMilliseconds
case "By":
return metricdata.UnitBytes
}
return metricdata.UnitDimensionless
}
func getType(v *View) metricdata.Type {
m := v.Measure
agg := v.Aggregation
switch agg.Type {
case AggTypeSum:
switch m.(type) {
case *stats.Int64Measure:
return metricdata.TypeCumulativeInt64
case *stats.Float64Measure:
return metricdata.TypeCumulativeFloat64
default:
panic("unexpected measure type")
}
case AggTypeDistribution:
return metricdata.TypeCumulativeDistribution
case AggTypeLastValue:
switch m.(type) {
case *stats.Int64Measure:
return metricdata.TypeGaugeInt64
case *stats.Float64Measure:
return metricdata.TypeGaugeFloat64
default:
panic("unexpected measure type")
}
case AggTypeCount:
switch m.(type) {
case *stats.Int64Measure:
return metricdata.TypeCumulativeInt64
case *stats.Float64Measure:
return metricdata.TypeCumulativeInt64
default:
panic("unexpected measure type")
}
default:
panic("unexpected aggregation type")
}
}
func getLableKeys(v *View) []string {
labelKeys := []string{}
for _, k := range v.TagKeys {
labelKeys = append(labelKeys, k.Name())
}
return labelKeys
}
func viewToMetricDescriptor(v *View) *metricdata.Descriptor {
return &metricdata.Descriptor{
Name: v.Name,
Description: v.Description,
Unit: getUnit(v.Measure.Unit()),
Type: getType(v),
LabelKeys: getLableKeys(v),
}
}
func toLabelValues(row *Row) []metricdata.LabelValue {
labelValues := []metricdata.LabelValue{}
for _, tag := range row.Tags {
labelValues = append(labelValues, metricdata.NewLabelValue(tag.Value))
}
return labelValues
}
func rowToTimeseries(v *viewInternal, row *Row, now time.Time, startTime time.Time) *metricdata.TimeSeries {
return &metricdata.TimeSeries{
Points: []metricdata.Point{row.Data.toPoint(v.metricDescriptor.Type, now)},
LabelValues: toLabelValues(row),
StartTime: startTime,
}
}
func viewToMetric(v *viewInternal, now time.Time, startTime time.Time) *metricdata.Metric {
if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 ||
v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 {
startTime = time.Time{}
}
rows := v.collectedRows()
if len(rows) == 0 {
return nil
}
ts := []*metricdata.TimeSeries{}
for _, row := range rows {
ts = append(ts, rowToTimeseries(v, row, now, startTime))
}
m := &metricdata.Metric{
Descriptor: *v.metricDescriptor,
TimeSeries: ts,
}
return m
}

View file

@ -17,8 +17,11 @@ package view
import (
"fmt"
"sync"
"time"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/stats"
"go.opencensus.io/stats/internal"
"go.opencensus.io/tag"
@ -43,6 +46,7 @@ type worker struct {
timer *time.Ticker
c chan command
quit, done chan bool
mu sync.RWMutex
}
var defaultWorker *worker
@ -102,7 +106,7 @@ func RetrieveData(viewName string) ([]*Row, error) {
return resp.rows, resp.err
}
func record(tags *tag.Map, ms interface{}, attachments map[string]string) {
func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
req := &recordReq{
tm: tags,
ms: ms.([]stats.Measurement),
@ -143,6 +147,9 @@ func newWorker() *worker {
}
func (w *worker) start() {
prodMgr := metricproducer.GlobalManager()
prodMgr.AddProducer(w)
for {
select {
case cmd := <-w.c:
@ -159,6 +166,9 @@ func (w *worker) start() {
}
func (w *worker) stop() {
prodMgr := metricproducer.GlobalManager()
prodMgr.DeleteProducer(w)
w.quit <- true
<-w.done
}
@ -176,6 +186,8 @@ func (w *worker) getMeasureRef(name string) *measureRef {
}
func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
w.mu.Lock()
defer w.mu.Unlock()
vi, err := newViewInternal(v)
if err != nil {
return nil, err
@ -195,6 +207,12 @@ func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
return vi, nil
}
func (w *worker) unregisterView(viewName string) {
w.mu.Lock()
defer w.mu.Unlock()
delete(w.views, viewName)
}
func (w *worker) reportView(v *viewInternal, now time.Time) {
if !v.isSubscribed() {
return
@ -222,3 +240,40 @@ func (w *worker) reportUsage(now time.Time) {
w.reportView(v, now)
}
}
func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
if !v.isSubscribed() {
return nil
}
_, ok := w.startTimes[v]
if !ok {
w.startTimes[v] = now
}
var startTime time.Time
if v.metricDescriptor.Type == metricdata.TypeGaugeInt64 ||
v.metricDescriptor.Type == metricdata.TypeGaugeFloat64 {
startTime = time.Time{}
} else {
startTime = w.startTimes[v]
}
return viewToMetric(v, now, startTime)
}
// Read reads all view data and returns them as metrics.
// It is typically invoked by metric reader to export stats in metric format.
func (w *worker) Read() []*metricdata.Metric {
w.mu.Lock()
defer w.mu.Unlock()
now := time.Now()
metrics := make([]*metricdata.Metric, 0, len(w.views))
for _, v := range w.views {
metric := w.toMetric(v, now)
if metric != nil {
metrics = append(metrics, metric)
}
}
return metrics
}

View file

@ -21,8 +21,6 @@ import (
"strings"
"time"
"go.opencensus.io/exemplar"
"go.opencensus.io/stats"
"go.opencensus.io/stats/internal"
"go.opencensus.io/tag"
@ -105,7 +103,7 @@ func (cmd *unregisterFromViewReq) handleCommand(w *worker) {
// The collected data can be cleared.
vi.clearRows()
}
delete(w.views, name)
w.unregisterView(name)
}
cmd.done <- struct{}{}
}
@ -150,7 +148,7 @@ func (cmd *retrieveDataReq) handleCommand(w *worker) {
type recordReq struct {
tm *tag.Map
ms []stats.Measurement
attachments map[string]string
attachments map[string]interface{}
t time.Time
}
@ -161,12 +159,7 @@ func (cmd *recordReq) handleCommand(w *worker) {
}
ref := w.getMeasureRef(m.Measure().Name())
for v := range ref.views {
e := &exemplar.Exemplar{
Value: m.Value(),
Timestamp: cmd.t,
Attachments: cmd.attachments,
}
v.addSample(cmd.tm, e)
v.addSample(cmd.tm, m.Value(), cmd.attachments, time.Now())
}
}
}