Update dependencies

This commit is contained in:
Manuel de Brito Fontes 2017-10-06 17:33:32 -03:00
parent bf5616c65b
commit d6d374b28d
13962 changed files with 48226 additions and 3618880 deletions

View file

@ -1,345 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package prometheus provides bindings to the Prometheus HTTP API:
// http://prometheus.io/docs/querying/api/
package prometheus
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
)
const (
statusAPIError = 422
apiPrefix = "/api/v1"
epQuery = "/query"
epQueryRange = "/query_range"
epLabelValues = "/label/:name/values"
epSeries = "/series"
)
type ErrorType string
const (
// The different API error types.
ErrBadData ErrorType = "bad_data"
ErrTimeout = "timeout"
ErrCanceled = "canceled"
ErrExec = "execution"
ErrBadResponse = "bad_response"
)
// Error is an error returned by the API.
type Error struct {
Type ErrorType
Msg string
}
func (e *Error) Error() string {
return fmt.Sprintf("%s: %s", e.Type, e.Msg)
}
// CancelableTransport is like net.Transport but provides
// per-request cancelation functionality.
type CancelableTransport interface {
http.RoundTripper
CancelRequest(req *http.Request)
}
var DefaultTransport CancelableTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
}
// Config defines configuration parameters for a new client.
type Config struct {
// The address of the Prometheus to connect to.
Address string
// Transport is used by the Client to drive HTTP requests. If not
// provided, DefaultTransport will be used.
Transport CancelableTransport
}
func (cfg *Config) transport() CancelableTransport {
if cfg.Transport == nil {
return DefaultTransport
}
return cfg.Transport
}
type Client interface {
url(ep string, args map[string]string) *url.URL
do(context.Context, *http.Request) (*http.Response, []byte, error)
}
// New returns a new Client.
//
// It is safe to use the returned Client from multiple goroutines.
func New(cfg Config) (Client, error) {
u, err := url.Parse(cfg.Address)
if err != nil {
return nil, err
}
u.Path = strings.TrimRight(u.Path, "/") + apiPrefix
return &httpClient{
endpoint: u,
transport: cfg.transport(),
}, nil
}
type httpClient struct {
endpoint *url.URL
transport CancelableTransport
}
func (c *httpClient) url(ep string, args map[string]string) *url.URL {
p := path.Join(c.endpoint.Path, ep)
for arg, val := range args {
arg = ":" + arg
p = strings.Replace(p, arg, val, -1)
}
u := *c.endpoint
u.Path = p
return &u
}
func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
resp, err := ctxhttp.Do(ctx, &http.Client{Transport: c.transport}, req)
defer func() {
if resp != nil {
resp.Body.Close()
}
}()
if err != nil {
return nil, nil, err
}
var body []byte
done := make(chan struct{})
go func() {
body, err = ioutil.ReadAll(resp.Body)
close(done)
}()
select {
case <-ctx.Done():
err = resp.Body.Close()
<-done
if err == nil {
err = ctx.Err()
}
case <-done:
}
return resp, body, err
}
// apiClient wraps a regular client and processes successful API responses.
// Successful also includes responses that errored at the API level.
type apiClient struct {
Client
}
type apiResponse struct {
Status string `json:"status"`
Data json.RawMessage `json:"data"`
ErrorType ErrorType `json:"errorType"`
Error string `json:"error"`
}
func (c apiClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
resp, body, err := c.Client.do(ctx, req)
if err != nil {
return resp, body, err
}
code := resp.StatusCode
if code/100 != 2 && code != statusAPIError {
return resp, body, &Error{
Type: ErrBadResponse,
Msg: fmt.Sprintf("bad response code %d", resp.StatusCode),
}
}
var result apiResponse
if err = json.Unmarshal(body, &result); err != nil {
return resp, body, &Error{
Type: ErrBadResponse,
Msg: err.Error(),
}
}
if (code == statusAPIError) != (result.Status == "error") {
err = &Error{
Type: ErrBadResponse,
Msg: "inconsistent body for response code",
}
}
if code == statusAPIError && result.Status == "error" {
err = &Error{
Type: result.ErrorType,
Msg: result.Error,
}
}
return resp, []byte(result.Data), err
}
// Range represents a sliced time range.
type Range struct {
// The boundaries of the time range.
Start, End time.Time
// The maximum time between two slices within the boundaries.
Step time.Duration
}
// queryResult contains result data for a query.
type queryResult struct {
Type model.ValueType `json:"resultType"`
Result interface{} `json:"result"`
// The decoded value.
v model.Value
}
func (qr *queryResult) UnmarshalJSON(b []byte) error {
v := struct {
Type model.ValueType `json:"resultType"`
Result json.RawMessage `json:"result"`
}{}
err := json.Unmarshal(b, &v)
if err != nil {
return err
}
switch v.Type {
case model.ValScalar:
var sv model.Scalar
err = json.Unmarshal(v.Result, &sv)
qr.v = &sv
case model.ValVector:
var vv model.Vector
err = json.Unmarshal(v.Result, &vv)
qr.v = vv
case model.ValMatrix:
var mv model.Matrix
err = json.Unmarshal(v.Result, &mv)
qr.v = mv
default:
err = fmt.Errorf("unexpected value type %q", v.Type)
}
return err
}
// QueryAPI provides bindings the Prometheus's query API.
type QueryAPI interface {
// Query performs a query for the given time.
Query(ctx context.Context, query string, ts time.Time) (model.Value, error)
// Query performs a query for the given range.
QueryRange(ctx context.Context, query string, r Range) (model.Value, error)
}
// NewQueryAPI returns a new QueryAPI for the client.
//
// It is safe to use the returned QueryAPI from multiple goroutines.
func NewQueryAPI(c Client) QueryAPI {
return &httpQueryAPI{client: apiClient{c}}
}
type httpQueryAPI struct {
client Client
}
func (h *httpQueryAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, error) {
u := h.client.url(epQuery, nil)
q := u.Query()
q.Set("query", query)
q.Set("time", ts.Format(time.RFC3339Nano))
u.RawQuery = q.Encode()
req, _ := http.NewRequest("GET", u.String(), nil)
_, body, err := h.client.do(ctx, req)
if err != nil {
return nil, err
}
var qres queryResult
err = json.Unmarshal(body, &qres)
return model.Value(qres.v), err
}
func (h *httpQueryAPI) QueryRange(ctx context.Context, query string, r Range) (model.Value, error) {
u := h.client.url(epQueryRange, nil)
q := u.Query()
var (
start = r.Start.Format(time.RFC3339Nano)
end = r.End.Format(time.RFC3339Nano)
step = strconv.FormatFloat(r.Step.Seconds(), 'f', 3, 64)
)
q.Set("query", query)
q.Set("start", start)
q.Set("end", end)
q.Set("step", step)
u.RawQuery = q.Encode()
req, _ := http.NewRequest("GET", u.String(), nil)
_, body, err := h.client.do(ctx, req)
if err != nil {
return nil, err
}
var qres queryResult
err = json.Unmarshal(body, &qres)
return model.Value(qres.v), err
}

View file

@ -1,453 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"reflect"
"testing"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
)
func TestConfig(t *testing.T) {
c := Config{}
if c.transport() != DefaultTransport {
t.Fatalf("expected default transport for nil Transport field")
}
}
func TestClientURL(t *testing.T) {
tests := []struct {
address string
endpoint string
args map[string]string
expected string
}{
{
address: "http://localhost:9090",
endpoint: "/test",
expected: "http://localhost:9090/test",
},
{
address: "http://localhost",
endpoint: "/test",
expected: "http://localhost/test",
},
{
address: "http://localhost:9090",
endpoint: "test",
expected: "http://localhost:9090/test",
},
{
address: "http://localhost:9090/prefix",
endpoint: "/test",
expected: "http://localhost:9090/prefix/test",
},
{
address: "https://localhost:9090/",
endpoint: "/test/",
expected: "https://localhost:9090/test",
},
{
address: "http://localhost:9090",
endpoint: "/test/:param",
args: map[string]string{
"param": "content",
},
expected: "http://localhost:9090/test/content",
},
{
address: "http://localhost:9090",
endpoint: "/test/:param/more/:param",
args: map[string]string{
"param": "content",
},
expected: "http://localhost:9090/test/content/more/content",
},
{
address: "http://localhost:9090",
endpoint: "/test/:param/more/:foo",
args: map[string]string{
"param": "content",
"foo": "bar",
},
expected: "http://localhost:9090/test/content/more/bar",
},
{
address: "http://localhost:9090",
endpoint: "/test/:param",
args: map[string]string{
"nonexistant": "content",
},
expected: "http://localhost:9090/test/:param",
},
}
for _, test := range tests {
ep, err := url.Parse(test.address)
if err != nil {
t.Fatal(err)
}
hclient := &httpClient{
endpoint: ep,
transport: DefaultTransport,
}
u := hclient.url(test.endpoint, test.args)
if u.String() != test.expected {
t.Errorf("unexpected result: got %s, want %s", u, test.expected)
continue
}
// The apiClient must return exactly the same result as the httpClient.
aclient := &apiClient{hclient}
u = aclient.url(test.endpoint, test.args)
if u.String() != test.expected {
t.Errorf("unexpected result: got %s, want %s", u, test.expected)
}
}
}
type testClient struct {
*testing.T
ch chan apiClientTest
req *http.Request
}
type apiClientTest struct {
code int
response interface{}
expected string
err *Error
}
func (c *testClient) url(ep string, args map[string]string) *url.URL {
return nil
}
func (c *testClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
if ctx == nil {
c.Fatalf("context was not passed down")
}
if req != c.req {
c.Fatalf("request was not passed down")
}
test := <-c.ch
var b []byte
var err error
switch v := test.response.(type) {
case string:
b = []byte(v)
default:
b, err = json.Marshal(v)
if err != nil {
c.Fatal(err)
}
}
resp := &http.Response{
StatusCode: test.code,
}
return resp, b, nil
}
func TestAPIClientDo(t *testing.T) {
tests := []apiClientTest{
{
response: &apiResponse{
Status: "error",
Data: json.RawMessage(`null`),
ErrorType: ErrBadData,
Error: "failed",
},
err: &Error{
Type: ErrBadData,
Msg: "failed",
},
code: statusAPIError,
expected: `null`,
},
{
response: &apiResponse{
Status: "error",
Data: json.RawMessage(`"test"`),
ErrorType: ErrTimeout,
Error: "timed out",
},
err: &Error{
Type: ErrTimeout,
Msg: "timed out",
},
code: statusAPIError,
expected: `test`,
},
{
response: "bad json",
err: &Error{
Type: ErrBadResponse,
Msg: "bad response code 400",
},
code: http.StatusBadRequest,
},
{
response: "bad json",
err: &Error{
Type: ErrBadResponse,
Msg: "invalid character 'b' looking for beginning of value",
},
code: statusAPIError,
},
{
response: &apiResponse{
Status: "success",
Data: json.RawMessage(`"test"`),
},
err: &Error{
Type: ErrBadResponse,
Msg: "inconsistent body for response code",
},
code: statusAPIError,
},
{
response: &apiResponse{
Status: "success",
Data: json.RawMessage(`"test"`),
ErrorType: ErrTimeout,
Error: "timed out",
},
err: &Error{
Type: ErrBadResponse,
Msg: "inconsistent body for response code",
},
code: statusAPIError,
},
{
response: &apiResponse{
Status: "error",
Data: json.RawMessage(`"test"`),
ErrorType: ErrTimeout,
Error: "timed out",
},
err: &Error{
Type: ErrBadResponse,
Msg: "inconsistent body for response code",
},
code: http.StatusOK,
},
}
tc := &testClient{
T: t,
ch: make(chan apiClientTest, 1),
req: &http.Request{},
}
client := &apiClient{tc}
for _, test := range tests {
tc.ch <- test
_, body, err := client.do(context.Background(), tc.req)
if test.err != nil {
if err == nil {
t.Errorf("expected error %q but got none", test.err)
continue
}
if test.err.Error() != err.Error() {
t.Errorf("unexpected error: want %q, got %q", test.err, err)
}
continue
}
if err != nil {
t.Errorf("unexpeceted error %s", err)
continue
}
want, got := test.expected, string(body)
if want != got {
t.Errorf("unexpected body: want %q, got %q", want, got)
}
}
}
type apiTestClient struct {
*testing.T
curTest apiTest
}
type apiTest struct {
do func() (interface{}, error)
inErr error
inRes interface{}
reqPath string
reqParam url.Values
reqMethod string
res interface{}
err error
}
func (c *apiTestClient) url(ep string, args map[string]string) *url.URL {
u := &url.URL{
Host: "test:9090",
Path: apiPrefix + ep,
}
return u
}
func (c *apiTestClient) do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
test := c.curTest
if req.URL.Path != test.reqPath {
c.Errorf("unexpected request path: want %s, got %s", test.reqPath, req.URL.Path)
}
if req.Method != test.reqMethod {
c.Errorf("unexpected request method: want %s, got %s", test.reqMethod, req.Method)
}
b, err := json.Marshal(test.inRes)
if err != nil {
c.Fatal(err)
}
resp := &http.Response{}
if test.inErr != nil {
resp.StatusCode = statusAPIError
} else {
resp.StatusCode = http.StatusOK
}
return resp, b, test.inErr
}
func TestAPIs(t *testing.T) {
testTime := time.Now()
client := &apiTestClient{T: t}
queryApi := &httpQueryAPI{
client: client,
}
doQuery := func(q string, ts time.Time) func() (interface{}, error) {
return func() (interface{}, error) {
return queryApi.Query(context.Background(), q, ts)
}
}
doQueryRange := func(q string, rng Range) func() (interface{}, error) {
return func() (interface{}, error) {
return queryApi.QueryRange(context.Background(), q, rng)
}
}
queryTests := []apiTest{
{
do: doQuery("2", testTime),
inRes: &queryResult{
Type: model.ValScalar,
Result: &model.Scalar{
Value: 2,
Timestamp: model.TimeFromUnix(testTime.Unix()),
},
},
reqMethod: "GET",
reqPath: "/api/v1/query",
reqParam: url.Values{
"query": []string{"2"},
"time": []string{testTime.Format(time.RFC3339Nano)},
},
res: &model.Scalar{
Value: 2,
Timestamp: model.TimeFromUnix(testTime.Unix()),
},
},
{
do: doQuery("2", testTime),
inErr: fmt.Errorf("some error"),
reqMethod: "GET",
reqPath: "/api/v1/query",
reqParam: url.Values{
"query": []string{"2"},
"time": []string{testTime.Format(time.RFC3339Nano)},
},
err: fmt.Errorf("some error"),
},
{
do: doQueryRange("2", Range{
Start: testTime.Add(-time.Minute),
End: testTime,
Step: time.Minute,
}),
inErr: fmt.Errorf("some error"),
reqMethod: "GET",
reqPath: "/api/v1/query_range",
reqParam: url.Values{
"query": []string{"2"},
"start": []string{testTime.Add(-time.Minute).Format(time.RFC3339Nano)},
"end": []string{testTime.Format(time.RFC3339Nano)},
"step": []string{time.Minute.String()},
},
err: fmt.Errorf("some error"),
},
}
var tests []apiTest
tests = append(tests, queryTests...)
for _, test := range tests {
client.curTest = test
res, err := test.do()
if test.err != nil {
if err == nil {
t.Errorf("expected error %q but got none", test.err)
continue
}
if err.Error() != test.err.Error() {
t.Errorf("unexpected error: want %s, got %s", test.err, err)
}
continue
}
if err != nil {
t.Errorf("unexpected error: %s", err)
continue
}
if !reflect.DeepEqual(res, test.res) {
t.Errorf("unexpected result: want %v, got %v", test.res, res)
}
}
}

View file

@ -1,103 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A simple example exposing fictional RPC latencies with different types of
// random distributions (uniform, normal, and exponential) as Prometheus
// metrics.
package main
import (
"flag"
"math"
"math/rand"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
addr = flag.String("listen-address", ":8080", "The address to listen on for HTTP requests.")
uniformDomain = flag.Float64("uniform.domain", 200, "The domain for the uniform distribution.")
normDomain = flag.Float64("normal.domain", 200, "The domain for the normal distribution.")
normMean = flag.Float64("normal.mean", 10, "The mean for the normal distribution.")
oscillationPeriod = flag.Duration("oscillation-period", 10*time.Minute, "The duration of the rate oscillation period.")
)
var (
// Create a summary to track fictional interservice RPC latencies for three
// distinct services with different latency distributions. These services are
// differentiated via a "service" label.
rpcDurations = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "rpc_durations_microseconds",
Help: "RPC latency distributions.",
},
[]string{"service"},
)
// The same as above, but now as a histogram, and only for the normal
// distribution. The buckets are targeted to the parameters of the
// normal distribution, with 20 buckets centered on the mean, each
// half-sigma wide.
rpcDurationsHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "rpc_durations_histogram_microseconds",
Help: "RPC latency distributions.",
Buckets: prometheus.LinearBuckets(*normMean-5**normDomain, .5**normDomain, 20),
})
)
func init() {
// Register the summary and the histogram with Prometheus's default registry.
prometheus.MustRegister(rpcDurations)
prometheus.MustRegister(rpcDurationsHistogram)
}
func main() {
flag.Parse()
start := time.Now()
oscillationFactor := func() float64 {
return 2 + math.Sin(math.Sin(2*math.Pi*float64(time.Since(start))/float64(*oscillationPeriod)))
}
// Periodically record some sample latencies for the three services.
go func() {
for {
v := rand.Float64() * *uniformDomain
rpcDurations.WithLabelValues("uniform").Observe(v)
time.Sleep(time.Duration(100*oscillationFactor()) * time.Millisecond)
}
}()
go func() {
for {
v := (rand.NormFloat64() * *normDomain) + *normMean
rpcDurations.WithLabelValues("normal").Observe(v)
rpcDurationsHistogram.Observe(v)
time.Sleep(time.Duration(75*oscillationFactor()) * time.Millisecond)
}
}()
go func() {
for {
v := rand.ExpFloat64()
rpcDurations.WithLabelValues("exponential").Observe(v)
time.Sleep(time.Duration(50*oscillationFactor()) * time.Millisecond)
}
}()
// Expose the registered metrics via HTTP.
http.Handle("/metrics", prometheus.Handler())
http.ListenAndServe(*addr, nil)
}

View file

@ -1,30 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A minimal example of how to include Prometheus instrumentation.
package main
import (
"flag"
"net/http"
"github.com/prometheus/client_golang/prometheus"
)
var addr = flag.String("listen-address", ":8080", "The address to listen on for HTTP requests.")
func main() {
flag.Parse()
http.Handle("/metrics", prometheus.Handler())
http.ListenAndServe(*addr, nil)
}

View file

@ -1,56 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package push_test
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
)
func ExampleCollectors() {
completionTime := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "db_backup_last_completion_timestamp_seconds",
Help: "The timestamp of the last succesful completion of a DB backup.",
})
completionTime.Set(float64(time.Now().Unix()))
if err := push.Collectors(
"db_backup", push.HostnameGroupingKey(),
"http://pushgateway:9091",
completionTime,
); err != nil {
fmt.Println("Could not push completion time to Pushgateway:", err)
}
}
func ExampleRegistry() {
registry := prometheus.NewRegistry()
completionTime := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "db_backup_last_completion_timestamp_seconds",
Help: "The timestamp of the last succesful completion of a DB backup.",
})
registry.MustRegister(completionTime)
completionTime.Set(float64(time.Now().Unix()))
if err := push.FromGatherer(
"db_backup", push.HostnameGroupingKey(),
"http://pushgateway:9091",
registry,
); err != nil {
fmt.Println("Could not push completion time to Pushgateway:", err)
}
}

View file

@ -1,172 +0,0 @@
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright (c) 2013, The Prometheus Authors
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be found
// in the LICENSE file.
// Package push provides functions to push metrics to a Pushgateway. The metrics
// to push are either collected from a provided registry, or from explicitly
// listed collectors.
//
// See the documentation of the Pushgateway to understand the meaning of the
// grouping parameters and the differences between push.Registry and
// push.Collectors on the one hand and push.AddRegistry and push.AddCollectors
// on the other hand: https://github.com/prometheus/pushgateway
package push
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/client_golang/prometheus"
)
const contentTypeHeader = "Content-Type"
// FromGatherer triggers a metric collection by the provided Gatherer (which is
// usually implemented by a prometheus.Registry) and pushes all gathered metrics
// to the Pushgateway specified by url, using the provided job name and the
// (optional) further grouping labels (the grouping map may be nil). See the
// Pushgateway documentation for detailed implications of the job and other
// grouping labels. Neither the job name nor any grouping label value may
// contain a "/". The metrics pushed must not contain a job label of their own
// nor any of the grouping labels.
//
// You can use just host:port or ip:port as url, in which case 'http://' is
// added automatically. You can also include the schema in the URL. However, do
// not include the '/metrics/jobs/...' part.
//
// Note that all previously pushed metrics with the same job and other grouping
// labels will be replaced with the metrics pushed by this call. (It uses HTTP
// method 'PUT' to push to the Pushgateway.)
func FromGatherer(job string, grouping map[string]string, url string, g prometheus.Gatherer) error {
return push(job, grouping, url, g, "PUT")
}
// AddFromGatherer works like FromGatherer, but only previously pushed metrics
// with the same name (and the same job and other grouping labels) will be
// replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
func AddFromGatherer(job string, grouping map[string]string, url string, g prometheus.Gatherer) error {
return push(job, grouping, url, g, "POST")
}
func push(job string, grouping map[string]string, pushURL string, g prometheus.Gatherer, method string) error {
if !strings.Contains(pushURL, "://") {
pushURL = "http://" + pushURL
}
if strings.HasSuffix(pushURL, "/") {
pushURL = pushURL[:len(pushURL)-1]
}
if strings.Contains(job, "/") {
return fmt.Errorf("job contains '/': %s", job)
}
urlComponents := []string{url.QueryEscape(job)}
for ln, lv := range grouping {
if !model.LabelNameRE.MatchString(ln) {
return fmt.Errorf("grouping label has invalid name: %s", ln)
}
if strings.Contains(lv, "/") {
return fmt.Errorf("value of grouping label %s contains '/': %s", ln, lv)
}
urlComponents = append(urlComponents, ln, lv)
}
pushURL = fmt.Sprintf("%s/metrics/job/%s", pushURL, strings.Join(urlComponents, "/"))
mfs, err := g.Gather()
if err != nil {
return err
}
buf := &bytes.Buffer{}
enc := expfmt.NewEncoder(buf, expfmt.FmtProtoDelim)
// Check for pre-existing grouping labels:
for _, mf := range mfs {
for _, m := range mf.GetMetric() {
for _, l := range m.GetLabel() {
if l.GetName() == "job" {
return fmt.Errorf("pushed metric %s (%s) already contains a job label", mf.GetName(), m)
}
if _, ok := grouping[l.GetName()]; ok {
return fmt.Errorf(
"pushed metric %s (%s) already contains grouping label %s",
mf.GetName(), m, l.GetName(),
)
}
}
}
enc.Encode(mf)
}
req, err := http.NewRequest(method, pushURL, buf)
if err != nil {
return err
}
req.Header.Set(contentTypeHeader, string(expfmt.FmtProtoDelim))
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 202 {
body, _ := ioutil.ReadAll(resp.Body) // Ignore any further error as this is for an error message only.
return fmt.Errorf("unexpected status code %d while pushing to %s: %s", resp.StatusCode, pushURL, body)
}
return nil
}
// Collectors works like FromGatherer, but it does not use a Gatherer. Instead,
// it collects from the provided collectors directly. It is a convenient way to
// push only a few metrics.
func Collectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error {
return pushCollectors(job, grouping, url, "PUT", collectors...)
}
// AddCollectors works like AddFromGatherer, but it does not use a Gatherer.
// Instead, it collects from the provided collectors directly. It is a
// convenient way to push only a few metrics.
func AddCollectors(job string, grouping map[string]string, url string, collectors ...prometheus.Collector) error {
return pushCollectors(job, grouping, url, "POST", collectors...)
}
func pushCollectors(job string, grouping map[string]string, url, method string, collectors ...prometheus.Collector) error {
r := prometheus.NewRegistry()
for _, collector := range collectors {
if err := r.Register(collector); err != nil {
return err
}
}
return push(job, grouping, url, r, method)
}
// HostnameGroupingKey returns a label map with the only entry
// {instance="<hostname>"}. This can be conveniently used as the grouping
// parameter if metrics should be pushed with the hostname as label. The
// returned map is created upon each call so that the caller is free to add more
// labels to the map.
func HostnameGroupingKey() map[string]string {
hostname, err := os.Hostname()
if err != nil {
return map[string]string{"instance": "unknown"}
}
return map[string]string{"instance": hostname}
}

View file

@ -1,176 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright (c) 2013, The Prometheus Authors
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be found
// in the LICENSE file.
package push
import (
"bytes"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/client_golang/prometheus"
)
func TestPush(t *testing.T) {
var (
lastMethod string
lastBody []byte
lastPath string
)
host, err := os.Hostname()
if err != nil {
t.Error(err)
}
// Fake a Pushgateway that always responds with 202.
pgwOK := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
lastMethod = r.Method
var err error
lastBody, err = ioutil.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
lastPath = r.URL.EscapedPath()
w.Header().Set("Content-Type", `text/plain; charset=utf-8`)
w.WriteHeader(http.StatusAccepted)
}),
)
defer pgwOK.Close()
// Fake a Pushgateway that always responds with 500.
pgwErr := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "fake error", http.StatusInternalServerError)
}),
)
defer pgwErr.Close()
metric1 := prometheus.NewCounter(prometheus.CounterOpts{
Name: "testname1",
Help: "testhelp1",
})
metric2 := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "testname2",
Help: "testhelp2",
ConstLabels: prometheus.Labels{"foo": "bar", "dings": "bums"},
})
reg := prometheus.NewRegistry()
reg.MustRegister(metric1)
reg.MustRegister(metric2)
mfs, err := reg.Gather()
if err != nil {
t.Fatal(err)
}
buf := &bytes.Buffer{}
enc := expfmt.NewEncoder(buf, expfmt.FmtProtoDelim)
for _, mf := range mfs {
if err := enc.Encode(mf); err != nil {
t.Fatal(err)
}
}
wantBody := buf.Bytes()
// PushCollectors, all good.
if err := Collectors("testjob", HostnameGroupingKey(), pgwOK.URL, metric1, metric2); err != nil {
t.Fatal(err)
}
if lastMethod != "PUT" {
t.Error("want method PUT for PushCollectors, got", lastMethod)
}
if bytes.Compare(lastBody, wantBody) != 0 {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job/testjob/instance/"+host {
t.Error("unexpected path:", lastPath)
}
// PushAddCollectors, with nil grouping, all good.
if err := AddCollectors("testjob", nil, pgwOK.URL, metric1, metric2); err != nil {
t.Fatal(err)
}
if lastMethod != "POST" {
t.Error("want method POST for PushAddCollectors, got", lastMethod)
}
if bytes.Compare(lastBody, wantBody) != 0 {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job/testjob" {
t.Error("unexpected path:", lastPath)
}
// PushCollectors with a broken PGW.
if err := Collectors("testjob", nil, pgwErr.URL, metric1, metric2); err == nil {
t.Error("push to broken Pushgateway succeeded")
} else {
if got, want := err.Error(), "unexpected status code 500 while pushing to "+pgwErr.URL+"/metrics/job/testjob: fake error\n"; got != want {
t.Errorf("got error %q, want %q", got, want)
}
}
// PushCollectors with invalid grouping or job.
if err := Collectors("testjob", map[string]string{"foo": "bums"}, pgwErr.URL, metric1, metric2); err == nil {
t.Error("push with grouping contained in metrics succeeded")
}
if err := Collectors("test/job", nil, pgwErr.URL, metric1, metric2); err == nil {
t.Error("push with invalid job value succeeded")
}
if err := Collectors("testjob", map[string]string{"foo/bar": "bums"}, pgwErr.URL, metric1, metric2); err == nil {
t.Error("push with invalid grouping succeeded")
}
if err := Collectors("testjob", map[string]string{"foo-bar": "bums"}, pgwErr.URL, metric1, metric2); err == nil {
t.Error("push with invalid grouping succeeded")
}
// Push registry, all good.
if err := FromGatherer("testjob", HostnameGroupingKey(), pgwOK.URL, reg); err != nil {
t.Fatal(err)
}
if lastMethod != "PUT" {
t.Error("want method PUT for Push, got", lastMethod)
}
if bytes.Compare(lastBody, wantBody) != 0 {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
// PushAdd registry, all good.
if err := AddFromGatherer("testjob", map[string]string{"a": "x", "b": "y"}, pgwOK.URL, reg); err != nil {
t.Fatal(err)
}
if lastMethod != "POST" {
t.Error("want method POSTT for PushAdd, got", lastMethod)
}
if bytes.Compare(lastBody, wantBody) != 0 {
t.Errorf("got body %v, want %v", lastBody, wantBody)
}
if lastPath != "/metrics/job/testjob/a/x/b/y" && lastPath != "/metrics/job/testjob/b/y/a/x" {
t.Error("unexpected path:", lastPath)
}
}