Create UDP collector that listens to UDP messages from monitor.lua and exposes them on /metrics endpoint
This commit is contained in:
parent
764bcd5a1b
commit
2cd2da7c3f
13 changed files with 575 additions and 746 deletions
51
internal/ingress/metric/collector/listener.go
Normal file
51
internal/ingress/metric/collector/listener.go
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
const packetSize = 1024 * 65
|
||||
|
||||
// newUDPListener creates a new UDP listener used to process messages
|
||||
// from the NGINX log phase containing information about a request
|
||||
func newUDPListener(port int) (*net.UDPConn, error) {
|
||||
service := fmt.Sprintf("127.0.0.1:%v", port)
|
||||
|
||||
udpAddr, err := net.ResolveUDPAddr("udp4", service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return net.ListenUDP("udp", udpAddr)
|
||||
}
|
||||
|
||||
// handleMessages process packets received in an UDP connection
|
||||
func handleMessages(conn *net.UDPConn, fn func([]byte)) {
|
||||
msg := make([]byte, packetSize)
|
||||
|
||||
for {
|
||||
s, _, err := conn.ReadFrom(msg[0:])
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
fn(msg[0:s])
|
||||
}
|
||||
}
|
||||
70
internal/ingress/metric/collector/listener_test.go
Normal file
70
internal/ingress/metric/collector/listener_test.go
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewUDPLogListener(t *testing.T) {
|
||||
port := freeUDPPort()
|
||||
|
||||
var count uint64
|
||||
|
||||
fn := func(message []byte) {
|
||||
t.Logf("message: %v", string(message))
|
||||
atomic.AddUint64(&count, 1)
|
||||
}
|
||||
|
||||
t.Logf("UDP Port: %v", port)
|
||||
|
||||
l, err := newUDPListener(port)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error creating UDP listener: %v", err)
|
||||
}
|
||||
if l == nil {
|
||||
t.Errorf("expected a listener but none returned")
|
||||
}
|
||||
|
||||
go handleMessages(l, fn)
|
||||
|
||||
conn, _ := net.Dial("udp", fmt.Sprintf(":%v", port))
|
||||
conn.Write([]byte("message"))
|
||||
conn.Close()
|
||||
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
if count != 1 {
|
||||
t.Errorf("expected only one message from the UDP listern but %v returned", count)
|
||||
}
|
||||
}
|
||||
|
||||
func freeUDPPort() int {
|
||||
l, err := net.ListenUDP("udp", &net.UDPAddr{})
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
if err := l.Close(); err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return l.LocalAddr().(*net.UDPAddr).Port
|
||||
}
|
||||
223
internal/ingress/metric/collector/nginx_status_collector.go
Normal file
223
internal/ingress/metric/collector/nginx_status_collector.go
Normal file
|
|
@ -0,0 +1,223 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package collector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
ac = regexp.MustCompile(`Active connections: (\d+)`)
|
||||
sahr = regexp.MustCompile(`(\d+)\s(\d+)\s(\d+)`)
|
||||
reading = regexp.MustCompile(`Reading: (\d+)`)
|
||||
writing = regexp.MustCompile(`Writing: (\d+)`)
|
||||
waiting = regexp.MustCompile(`Waiting: (\d+)`)
|
||||
)
|
||||
|
||||
type (
|
||||
nginxStatusCollector struct {
|
||||
scrapeChan chan scrapeRequest
|
||||
ngxHealthPort int
|
||||
ngxStatusPath string
|
||||
data *nginxStatusData
|
||||
watchNamespace string
|
||||
ingressClass string
|
||||
}
|
||||
|
||||
nginxStatusData struct {
|
||||
connectionsTotal *prometheus.Desc
|
||||
requestsTotal *prometheus.Desc
|
||||
connections *prometheus.Desc
|
||||
}
|
||||
|
||||
basicStatus struct {
|
||||
// Active total number of active connections
|
||||
Active int
|
||||
// Accepted total number of accepted client connections
|
||||
Accepted int
|
||||
// Handled total number of handled connections. Generally, the parameter value is the same as accepts unless some resource limits have been reached (for example, the worker_connections limit).
|
||||
Handled int
|
||||
// Requests total number of client requests.
|
||||
Requests int
|
||||
// Reading current number of connections where nginx is reading the request header.
|
||||
Reading int
|
||||
// Writing current number of connections where nginx is writing the response back to the client.
|
||||
Writing int
|
||||
// Waiting current number of idle client connections waiting for a request.
|
||||
Waiting int
|
||||
}
|
||||
)
|
||||
|
||||
// InitNGINXStatusCollector returns a new prometheus collector the default nginx status module
|
||||
func InitNGINXStatusCollector(watchNamespace, ingressClass string, ngxHealthPort int) error {
|
||||
const ns string = "nginx"
|
||||
const ngxStatusPath = "/nginx_status"
|
||||
p := nginxStatusCollector{
|
||||
scrapeChan: make(chan scrapeRequest),
|
||||
ngxHealthPort: ngxHealthPort,
|
||||
ngxStatusPath: ngxStatusPath,
|
||||
watchNamespace: watchNamespace,
|
||||
ingressClass: ingressClass,
|
||||
}
|
||||
|
||||
p.data = &nginxStatusData{
|
||||
connectionsTotal: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(ns, "", "connections_total"),
|
||||
"total number of connections with state {active, accepted, handled}",
|
||||
[]string{"ingress_class", "namespace", "state"}, nil),
|
||||
|
||||
requestsTotal: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(ns, "", "requests_total"),
|
||||
"total number of client requests",
|
||||
[]string{"ingress_class", "namespace"}, nil),
|
||||
|
||||
connections: prometheus.NewDesc(
|
||||
prometheus.BuildFQName(ns, "", "connections"),
|
||||
"current number of client connections with state {reading, writing, waiting}",
|
||||
[]string{"ingress_class", "namespace", "state"}, nil),
|
||||
}
|
||||
|
||||
err := prometheus.Register(p)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while registering nginx status collector : %v", err)
|
||||
}
|
||||
|
||||
go p.Run()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (p nginxStatusCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- p.data.connectionsTotal
|
||||
ch <- p.data.requestsTotal
|
||||
ch <- p.data.connections
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (p nginxStatusCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
req := scrapeRequest{results: ch, done: make(chan struct{})}
|
||||
p.scrapeChan <- req
|
||||
<-req.done
|
||||
}
|
||||
|
||||
func (p nginxStatusCollector) Run() {
|
||||
for req := range p.scrapeChan {
|
||||
ch := req.results
|
||||
p.scrape(ch)
|
||||
req.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (p nginxStatusCollector) Stop() {
|
||||
close(p.scrapeChan)
|
||||
}
|
||||
|
||||
func httpBody(url string) ([]byte, error) {
|
||||
resp, err := http.DefaultClient.Get(url)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error scraping nginx : %v", err)
|
||||
}
|
||||
|
||||
data, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error scraping nginx (%v)", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
|
||||
return nil, fmt.Errorf("unexpected error scraping nginx (status %v)", resp.StatusCode)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func toInt(data []string, pos int) int {
|
||||
if len(data) == 0 {
|
||||
return 0
|
||||
}
|
||||
if pos > len(data) {
|
||||
return 0
|
||||
}
|
||||
if v, err := strconv.Atoi(data[pos]); err == nil {
|
||||
return v
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func parse(data string) *basicStatus {
|
||||
acr := ac.FindStringSubmatch(data)
|
||||
sahrr := sahr.FindStringSubmatch(data)
|
||||
readingr := reading.FindStringSubmatch(data)
|
||||
writingr := writing.FindStringSubmatch(data)
|
||||
waitingr := waiting.FindStringSubmatch(data)
|
||||
|
||||
return &basicStatus{
|
||||
toInt(acr, 1),
|
||||
toInt(sahrr, 1),
|
||||
toInt(sahrr, 2),
|
||||
toInt(sahrr, 3),
|
||||
toInt(readingr, 1),
|
||||
toInt(writingr, 1),
|
||||
toInt(waitingr, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func getNginxStatus(port int, path string) (*basicStatus, error) {
|
||||
url := fmt.Sprintf("http://0.0.0.0:%v%v", port, path)
|
||||
glog.V(3).Infof("start scraping url: %v", url)
|
||||
|
||||
data, err := httpBody(url)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error scraping nginx status page: %v", err)
|
||||
}
|
||||
|
||||
return parse(string(data)), nil
|
||||
}
|
||||
|
||||
// nginxStatusCollector scrape the nginx status
|
||||
func (p nginxStatusCollector) scrape(ch chan<- prometheus.Metric) {
|
||||
s, err := getNginxStatus(p.ngxHealthPort, p.ngxStatusPath)
|
||||
if err != nil {
|
||||
glog.Warningf("unexpected error obtaining nginx status info: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
|
||||
prometheus.CounterValue, float64(s.Active), p.ingressClass, p.watchNamespace, "active")
|
||||
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
|
||||
prometheus.CounterValue, float64(s.Accepted), p.ingressClass, p.watchNamespace, "accepted")
|
||||
ch <- prometheus.MustNewConstMetric(p.data.connectionsTotal,
|
||||
prometheus.CounterValue, float64(s.Handled), p.ingressClass, p.watchNamespace, "handled")
|
||||
ch <- prometheus.MustNewConstMetric(p.data.requestsTotal,
|
||||
prometheus.CounterValue, float64(s.Requests), p.ingressClass, p.watchNamespace)
|
||||
ch <- prometheus.MustNewConstMetric(p.data.connections,
|
||||
prometheus.GaugeValue, float64(s.Reading), p.ingressClass, p.watchNamespace, "reading")
|
||||
ch <- prometheus.MustNewConstMetric(p.data.connections,
|
||||
prometheus.GaugeValue, float64(s.Writing), p.ingressClass, p.watchNamespace, "writing")
|
||||
ch <- prometheus.MustNewConstMetric(p.data.connections,
|
||||
prometheus.GaugeValue, float64(s.Waiting), p.ingressClass, p.watchNamespace, "waiting")
|
||||
}
|
||||
185
internal/ingress/metric/collector/process_collector.go
Normal file
185
internal/ingress/metric/collector/process_collector.go
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package collector
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
common "github.com/ncabatoff/process-exporter"
|
||||
"github.com/ncabatoff/process-exporter/proc"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type scrapeRequest struct {
|
||||
results chan<- prometheus.Metric
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// Stopable defines a prometheus collector that can be stopped
|
||||
type Stopable interface {
|
||||
prometheus.Collector
|
||||
Stop()
|
||||
}
|
||||
|
||||
// BinaryNameMatcher ...
|
||||
type BinaryNameMatcher struct {
|
||||
Name string
|
||||
Binary string
|
||||
}
|
||||
|
||||
// MatchAndName returns false if the match failed, otherwise
|
||||
// true and the resulting name.
|
||||
func (em BinaryNameMatcher) MatchAndName(nacl common.NameAndCmdline) (bool, string) {
|
||||
if len(nacl.Cmdline) == 0 {
|
||||
return false, ""
|
||||
}
|
||||
cmd := filepath.Base(em.Binary)
|
||||
return em.Name == cmd, ""
|
||||
}
|
||||
|
||||
type namedProcessData struct {
|
||||
numProcs *prometheus.Desc
|
||||
cpuSecs *prometheus.Desc
|
||||
readBytes *prometheus.Desc
|
||||
writeBytes *prometheus.Desc
|
||||
memResidentbytes *prometheus.Desc
|
||||
memVirtualbytes *prometheus.Desc
|
||||
startTime *prometheus.Desc
|
||||
}
|
||||
|
||||
type namedProcess struct {
|
||||
*proc.Grouper
|
||||
|
||||
scrapeChan chan scrapeRequest
|
||||
fs *proc.FS
|
||||
data namedProcessData
|
||||
}
|
||||
|
||||
// newNamedProcess returns a new prometheus collector for the nginx process
|
||||
func newNamedProcess(children bool, mn common.MatchNamer) (prometheus.Collector, error) {
|
||||
fs, err := proc.NewFS("/proc")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := namedProcess{
|
||||
scrapeChan: make(chan scrapeRequest),
|
||||
Grouper: proc.NewGrouper(children, mn),
|
||||
fs: fs,
|
||||
}
|
||||
_, err = p.Update(p.fs.AllProcs())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.data = namedProcessData{
|
||||
numProcs: prometheus.NewDesc(
|
||||
"num_procs",
|
||||
"number of processes",
|
||||
nil, nil),
|
||||
|
||||
cpuSecs: prometheus.NewDesc(
|
||||
"cpu_seconds_total",
|
||||
"Cpu usage in seconds",
|
||||
nil, nil),
|
||||
|
||||
readBytes: prometheus.NewDesc(
|
||||
"read_bytes_total",
|
||||
"number of bytes read",
|
||||
nil, nil),
|
||||
|
||||
writeBytes: prometheus.NewDesc(
|
||||
"write_bytes_total",
|
||||
"number of bytes written",
|
||||
nil, nil),
|
||||
|
||||
memResidentbytes: prometheus.NewDesc(
|
||||
"resident_memory_bytes",
|
||||
"number of bytes of memory in use",
|
||||
nil, nil),
|
||||
|
||||
memVirtualbytes: prometheus.NewDesc(
|
||||
"virtual_memory_bytes",
|
||||
"number of bytes of memory in use",
|
||||
nil, nil),
|
||||
|
||||
startTime: prometheus.NewDesc(
|
||||
"oldest_start_time_seconds",
|
||||
"start time in seconds since 1970/01/01",
|
||||
nil, nil),
|
||||
}
|
||||
|
||||
go p.start()
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (p namedProcess) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- p.data.cpuSecs
|
||||
ch <- p.data.numProcs
|
||||
ch <- p.data.readBytes
|
||||
ch <- p.data.writeBytes
|
||||
ch <- p.data.memResidentbytes
|
||||
ch <- p.data.memVirtualbytes
|
||||
ch <- p.data.startTime
|
||||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (p namedProcess) Collect(ch chan<- prometheus.Metric) {
|
||||
req := scrapeRequest{results: ch, done: make(chan struct{})}
|
||||
p.scrapeChan <- req
|
||||
<-req.done
|
||||
}
|
||||
|
||||
func (p namedProcess) start() {
|
||||
for req := range p.scrapeChan {
|
||||
ch := req.results
|
||||
p.scrape(ch)
|
||||
req.done <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (p namedProcess) Stop() {
|
||||
close(p.scrapeChan)
|
||||
}
|
||||
|
||||
func (p namedProcess) scrape(ch chan<- prometheus.Metric) {
|
||||
_, err := p.Update(p.fs.AllProcs())
|
||||
if err != nil {
|
||||
glog.Warningf("unexpected error obtaining nginx process info: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, gcounts := range p.Groups() {
|
||||
ch <- prometheus.MustNewConstMetric(p.data.numProcs,
|
||||
prometheus.GaugeValue, float64(gcounts.Procs))
|
||||
ch <- prometheus.MustNewConstMetric(p.data.memResidentbytes,
|
||||
prometheus.GaugeValue, float64(gcounts.Memresident))
|
||||
ch <- prometheus.MustNewConstMetric(p.data.memVirtualbytes,
|
||||
prometheus.GaugeValue, float64(gcounts.Memvirtual))
|
||||
ch <- prometheus.MustNewConstMetric(p.data.startTime,
|
||||
prometheus.GaugeValue, float64(gcounts.OldestStartTime.Unix()))
|
||||
ch <- prometheus.MustNewConstMetric(p.data.cpuSecs,
|
||||
prometheus.CounterValue, gcounts.Cpu)
|
||||
ch <- prometheus.MustNewConstMetric(p.data.readBytes,
|
||||
prometheus.CounterValue, float64(gcounts.ReadBytes))
|
||||
ch <- prometheus.MustNewConstMetric(p.data.writeBytes,
|
||||
prometheus.CounterValue, float64(gcounts.WriteBytes))
|
||||
}
|
||||
}
|
||||
277
internal/ingress/metric/collector/udp_collector.go
Normal file
277
internal/ingress/metric/collector/udp_collector.go
Normal file
|
|
@ -0,0 +1,277 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package collector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type udpData struct {
|
||||
Host string `json:"host"` // Label
|
||||
Status string `json:"status"` // Label
|
||||
|
||||
RealIPAddress string `json:"realIpAddr"` // Label
|
||||
RemoteAddress string `json:"remoteAddr"` // Label
|
||||
RemoteUser string `json:"remoteUser"` // Label
|
||||
|
||||
BytesSent float64 `json:"bytesSent"` // Metric
|
||||
|
||||
Protocol string `json:"protocol"` // Label
|
||||
Method string `json:"method"` // Label
|
||||
URI string `json:"uri"` // Label
|
||||
|
||||
RequestLength float64 `json:"requestLength"` // Metric
|
||||
RequestTime float64 `json:"requestTime"` // Metric
|
||||
|
||||
UpstreamName string `json:"upstreamName"` // Label
|
||||
UpstreamIP string `json:"upstreamIP"` // Label
|
||||
UpstreamResponseTime float64 `json:"upstreamResponseTime"` // Metric
|
||||
UpstreamStatus string `json:"upstreamStatus"` // Label
|
||||
|
||||
Namespace string `json:"namespace"` // Label
|
||||
Ingress string `json:"ingress"` // Label
|
||||
Service string `json:"service"` // Label
|
||||
}
|
||||
|
||||
// UDPCollector stores prometheus metrics and ingress meta-data
|
||||
type UDPCollector struct {
|
||||
upstreamResponseTime *prometheus.HistogramVec
|
||||
requestTime *prometheus.HistogramVec
|
||||
requestLength *prometheus.HistogramVec
|
||||
bytesSent *prometheus.HistogramVec
|
||||
collectorSuccess *prometheus.GaugeVec
|
||||
collectorSuccessTime *prometheus.GaugeVec
|
||||
requests *prometheus.CounterVec
|
||||
listener *net.UDPConn
|
||||
ns string
|
||||
ingressClass string
|
||||
port int
|
||||
}
|
||||
|
||||
// InitUDPCollector creates a new UDPCollector instance
|
||||
func InitUDPCollector(ns string, class string, port int) error {
|
||||
sc := UDPCollector{}
|
||||
|
||||
ns = strings.Replace(ns, "-", "_", -1)
|
||||
|
||||
listener, err := newUDPListener(port)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sc.listener = listener
|
||||
sc.ns = ns
|
||||
sc.ingressClass = class
|
||||
sc.port = port
|
||||
|
||||
requestTags := []string{"host", "status", "remote_address", "real_ip_address", "remote_user", "protocol", "method", "uri", "upstream_name", "upstream_ip", "upstream_status", "namespace", "ingress", "service"}
|
||||
collectorTags := []string{"namespace", "ingress_class"}
|
||||
|
||||
sc.upstreamResponseTime = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "upstream_response_time_seconds",
|
||||
Help: "The time spent on receiving the response from the upstream server",
|
||||
Namespace: ns,
|
||||
},
|
||||
requestTags,
|
||||
)
|
||||
|
||||
sc.requestTime = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "request_duration_seconds",
|
||||
Help: "The request processing time in seconds",
|
||||
Namespace: ns,
|
||||
},
|
||||
requestTags,
|
||||
)
|
||||
|
||||
sc.requestLength = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "request_length_bytes",
|
||||
Help: "The request length (including request line, header, and request body)",
|
||||
Namespace: ns,
|
||||
Buckets: prometheus.LinearBuckets(10, 10, 10), // 10 buckets, each 10 bytes wide.
|
||||
},
|
||||
requestTags,
|
||||
)
|
||||
|
||||
sc.requests = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "requests",
|
||||
Help: "The total number of client requests.",
|
||||
Namespace: ns,
|
||||
},
|
||||
collectorTags,
|
||||
)
|
||||
|
||||
sc.bytesSent = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "bytes_sent",
|
||||
Help: "The the number of bytes sent to a client",
|
||||
Namespace: ns,
|
||||
Buckets: prometheus.ExponentialBuckets(10, 10, 7), // 7 buckets, exponential factor of 10.
|
||||
},
|
||||
requestTags,
|
||||
)
|
||||
|
||||
sc.collectorSuccess = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "collector_last_run_successful",
|
||||
Help: "Whether the last collector run was successful (success = 1, failure = 0).",
|
||||
Namespace: ns,
|
||||
},
|
||||
collectorTags,
|
||||
)
|
||||
|
||||
sc.collectorSuccessTime = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "collector_last_run_successful_timestamp_seconds",
|
||||
Help: "Timestamp of the last successful collector run",
|
||||
Namespace: ns,
|
||||
},
|
||||
collectorTags,
|
||||
)
|
||||
|
||||
prometheus.MustRegister(sc.upstreamResponseTime)
|
||||
prometheus.MustRegister(sc.requestTime)
|
||||
prometheus.MustRegister(sc.requestLength)
|
||||
prometheus.MustRegister(sc.requests)
|
||||
prometheus.MustRegister(sc.bytesSent)
|
||||
prometheus.MustRegister(sc.collectorSuccess)
|
||||
prometheus.MustRegister(sc.collectorSuccessTime)
|
||||
|
||||
go sc.Run()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *UDPCollector) handleMessage(msg []byte) {
|
||||
glog.V(5).Infof("msg: %v", string(msg))
|
||||
|
||||
collectorSuccess := true
|
||||
|
||||
// Unmarshall bytes
|
||||
var stats udpData
|
||||
err := json.Unmarshal(msg, &stats)
|
||||
if err != nil {
|
||||
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
|
||||
collectorSuccess = false
|
||||
return
|
||||
}
|
||||
|
||||
// Create Request Labels Map
|
||||
requestLabels := prometheus.Labels{
|
||||
"host": stats.Host,
|
||||
"status": stats.Status,
|
||||
"remote_address": stats.RemoteAddress,
|
||||
"real_ip_address": stats.RealIPAddress,
|
||||
"remote_user": stats.RemoteUser,
|
||||
"protocol": stats.Protocol,
|
||||
"method": stats.Method,
|
||||
"uri": stats.URI,
|
||||
"upstream_name": stats.UpstreamName,
|
||||
"upstream_ip": stats.UpstreamIP,
|
||||
"upstream_status": stats.UpstreamStatus,
|
||||
"namespace": stats.Namespace,
|
||||
"ingress": stats.Ingress,
|
||||
"service": stats.Service,
|
||||
}
|
||||
|
||||
// Create Collector Labels Map
|
||||
collectorLabels := prometheus.Labels{
|
||||
"namespace": sc.ns,
|
||||
"ingress_class": sc.ingressClass,
|
||||
}
|
||||
|
||||
// Emit metrics
|
||||
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching requests metric: %v", err)
|
||||
collectorSuccess = false
|
||||
} else {
|
||||
requestsMetric.Inc()
|
||||
}
|
||||
|
||||
if stats.UpstreamResponseTime != -1 {
|
||||
upstreamResponseTimeMetric, err := sc.upstreamResponseTime.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching upstream response time metric: %v", err)
|
||||
collectorSuccess = false
|
||||
} else {
|
||||
upstreamResponseTimeMetric.Observe(stats.UpstreamResponseTime)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.RequestTime != -1 {
|
||||
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching request duration metric: %v", err)
|
||||
collectorSuccess = false
|
||||
} else {
|
||||
requestTimeMetric.Observe(stats.RequestTime)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.RequestLength != -1 {
|
||||
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching request length metric: %v", err)
|
||||
collectorSuccess = false
|
||||
} else {
|
||||
requestLengthMetric.Observe(stats.RequestLength)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.BytesSent != -1 {
|
||||
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching bytes sent metric: %v", err)
|
||||
collectorSuccess = false
|
||||
} else {
|
||||
bytesSentMetric.Observe(stats.BytesSent)
|
||||
}
|
||||
}
|
||||
|
||||
collectorSuccessMetric, err := sc.collectorSuccess.GetMetricWith(collectorLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching collector success metric: %v", err)
|
||||
} else {
|
||||
if collectorSuccess {
|
||||
collectorSuccessMetric.Set(1)
|
||||
collectorSuccessTimeMetric, err := sc.collectorSuccessTime.GetMetricWith(collectorLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching collector success time metric: %v", err)
|
||||
} else {
|
||||
collectorSuccessTimeMetric.Set(float64(time.Now().Unix()))
|
||||
}
|
||||
} else {
|
||||
collectorSuccessMetric.Set(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run adds a message handler to a UDP listener
|
||||
func (sc *UDPCollector) Run() {
|
||||
handleMessages(sc.listener, sc.handleMessage)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue