batch metrics and flush periodically
This commit is contained in:
parent
b78bb2524e
commit
2207d7694d
7 changed files with 298 additions and 265 deletions
|
|
@ -20,6 +20,7 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
|
|
@ -206,92 +207,94 @@ func (sc *SocketCollector) handleMessage(msg []byte) {
|
|||
glog.V(5).Infof("msg: %v", string(msg))
|
||||
|
||||
// Unmarshall bytes
|
||||
var stats socketData
|
||||
err := json.Unmarshal(msg, &stats)
|
||||
var statsBatch []socketData
|
||||
err := json.Unmarshal(msg, &statsBatch)
|
||||
if err != nil {
|
||||
glog.Errorf("Unexpected error deserializing JSON paylod: %v", err)
|
||||
glog.Errorf("Unexpected error deserializing JSON paylod: %v. Payload:\n%v", err, string(msg))
|
||||
return
|
||||
}
|
||||
|
||||
requestLabels := prometheus.Labels{
|
||||
"host": stats.Host,
|
||||
"status": stats.Status,
|
||||
"method": stats.Method,
|
||||
"path": stats.Path,
|
||||
//"endpoint": stats.Endpoint,
|
||||
"namespace": stats.Namespace,
|
||||
"ingress": stats.Ingress,
|
||||
"service": stats.Service,
|
||||
}
|
||||
|
||||
collectorLabels := prometheus.Labels{
|
||||
"namespace": stats.Namespace,
|
||||
"ingress": stats.Ingress,
|
||||
"status": stats.Status,
|
||||
}
|
||||
|
||||
latencyLabels := prometheus.Labels{
|
||||
"namespace": stats.Namespace,
|
||||
"ingress": stats.Ingress,
|
||||
"service": stats.Service,
|
||||
}
|
||||
|
||||
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching requests metric: %v", err)
|
||||
} else {
|
||||
requestsMetric.Inc()
|
||||
}
|
||||
|
||||
if stats.Latency != -1 {
|
||||
latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching latency metric: %v", err)
|
||||
} else {
|
||||
latencyMetric.Observe(stats.Latency)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.RequestTime != -1 {
|
||||
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching request duration metric: %v", err)
|
||||
} else {
|
||||
requestTimeMetric.Observe(stats.RequestTime)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.RequestLength != -1 {
|
||||
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching request length metric: %v", err)
|
||||
} else {
|
||||
requestLengthMetric.Observe(stats.RequestLength)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.ResponseTime != -1 {
|
||||
responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching upstream response time metric: %v", err)
|
||||
} else {
|
||||
responseTimeMetric.Observe(stats.ResponseTime)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.ResponseLength != -1 {
|
||||
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching bytes sent metric: %v", err)
|
||||
} else {
|
||||
bytesSentMetric.Observe(stats.ResponseLength)
|
||||
for _, stats := range statsBatch {
|
||||
requestLabels := prometheus.Labels{
|
||||
"host": stats.Host,
|
||||
"status": stats.Status,
|
||||
"method": stats.Method,
|
||||
"path": stats.Path,
|
||||
//"endpoint": stats.Endpoint,
|
||||
"namespace": stats.Namespace,
|
||||
"ingress": stats.Ingress,
|
||||
"service": stats.Service,
|
||||
}
|
||||
|
||||
responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
|
||||
collectorLabels := prometheus.Labels{
|
||||
"namespace": stats.Namespace,
|
||||
"ingress": stats.Ingress,
|
||||
"status": stats.Status,
|
||||
}
|
||||
|
||||
latencyLabels := prometheus.Labels{
|
||||
"namespace": stats.Namespace,
|
||||
"ingress": stats.Ingress,
|
||||
"service": stats.Service,
|
||||
}
|
||||
|
||||
requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching bytes sent metric: %v", err)
|
||||
glog.Errorf("Error fetching requests metric: %v", err)
|
||||
} else {
|
||||
responseSizeMetric.Observe(stats.ResponseLength)
|
||||
requestsMetric.Inc()
|
||||
}
|
||||
|
||||
if stats.Latency != -1 {
|
||||
latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching latency metric: %v", err)
|
||||
} else {
|
||||
latencyMetric.Observe(stats.Latency)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.RequestTime != -1 {
|
||||
requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching request duration metric: %v", err)
|
||||
} else {
|
||||
requestTimeMetric.Observe(stats.RequestTime)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.RequestLength != -1 {
|
||||
requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching request length metric: %v", err)
|
||||
} else {
|
||||
requestLengthMetric.Observe(stats.RequestLength)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.ResponseTime != -1 {
|
||||
responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching upstream response time metric: %v", err)
|
||||
} else {
|
||||
responseTimeMetric.Observe(stats.ResponseTime)
|
||||
}
|
||||
}
|
||||
|
||||
if stats.ResponseLength != -1 {
|
||||
bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching bytes sent metric: %v", err)
|
||||
} else {
|
||||
bytesSentMetric.Observe(stats.ResponseLength)
|
||||
}
|
||||
|
||||
responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
|
||||
if err != nil {
|
||||
glog.Errorf("Error fetching bytes sent metric: %v", err)
|
||||
} else {
|
||||
responseSizeMetric.Observe(stats.ResponseLength)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -408,19 +411,15 @@ func (sc SocketCollector) Collect(ch chan<- prometheus.Metric) {
|
|||
sc.bytesSent.Collect(ch)
|
||||
}
|
||||
|
||||
const packetSize = 1024 * 65
|
||||
|
||||
// handleMessages process the content received in a network connection
|
||||
func handleMessages(conn io.ReadCloser, fn func([]byte)) {
|
||||
defer conn.Close()
|
||||
|
||||
msg := make([]byte, packetSize)
|
||||
s, err := conn.Read(msg[0:])
|
||||
data, err := ioutil.ReadAll(conn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
fn(msg[0:s])
|
||||
fn(data)
|
||||
}
|
||||
|
||||
func deleteConstants(labels prometheus.Labels) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue