Add RPC tcp timeout/errs and AVG duration to prometheus (#15747)

This commit is contained in:
Anis Elleuch 2022-09-26 17:04:26 +01:00 committed by GitHub
parent 1480340830
commit 048a46ec2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 122 additions and 28 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
@ -167,7 +168,8 @@ const (
writeBytes MetricName = "write_bytes"
wcharBytes MetricName = "wchar_bytes"
apiLatencyMicroSec MetricName = "latency_us"
latencyMicroSec MetricName = "latency_us"
latencyNanoSec MetricName = "latency_ns"
usagePercent MetricName = "update_percent"
@ -331,7 +333,7 @@ func getNodeDiskAPILatencyMD() MetricDescription {
return MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: diskSubsystem,
Name: apiLatencyMicroSec,
Name: latencyMicroSec,
Help: "Average last minute latency in µs for drive API storage operations",
Type: gaugeMetric,
}
@ -537,6 +539,26 @@ func getInternodeFailedRequests() MetricDescription {
}
}
func getInternodeTCPDialTimeout() MetricDescription {
return MetricDescription{
Namespace: interNodeMetricNamespace,
Subsystem: trafficSubsystem,
Name: "dial_errors",
Help: "Total number of internode TCP dial timeouts and errors",
Type: counterMetric,
}
}
func getInternodeTCPAvgDuration() MetricDescription {
return MetricDescription{
Namespace: interNodeMetricNamespace,
Subsystem: trafficSubsystem,
Name: "dial_avg_time",
Help: "Average time of internodes TCP dial calls",
Type: gaugeMetric,
}
}
func getInterNodeSentBytesMD() MetricDescription {
return MetricDescription{
Namespace: interNodeMetricNamespace,
@ -1607,10 +1629,19 @@ func getNetworkMetrics() *MetricsGroup {
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
metrics = make([]Metric, 0, 10)
connStats := globalConnStats.toServerConnStats()
rpcStats := rest.GetRPCStats()
if globalIsDistErasure {
metrics = append(metrics, Metric{
Description: getInternodeFailedRequests(),
Value: float64(loadAndResetRPCNetworkErrsCounter()),
Value: float64(rpcStats.Errs),
})
metrics = append(metrics, Metric{
Description: getInternodeTCPDialTimeout(),
Value: float64(rpcStats.DialErrs),
})
metrics = append(metrics, Metric{
Description: getInternodeTCPAvgDuration(),
Value: float64(rpcStats.DialAvgDuration),
})
metrics = append(metrics, Metric{
Description: getInterNodeSentBytesMD(),

View File

@ -56,7 +56,6 @@ import (
ioutilx "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/logger/message/audit"
"github.com/minio/minio/internal/rest"
"github.com/minio/pkg/certs"
"github.com/minio/pkg/env"
"golang.org/x/oauth2"
@ -1016,13 +1015,6 @@ func decodeDirObject(object string) string {
return object
}
// This is used by metrics to show the number of failed RPC calls
// between internodes
func loadAndResetRPCNetworkErrsCounter() uint64 {
defer rest.ResetNetworkErrsCounter()
return rest.GetNetworkErrsCounter()
}
// Helper method to return total number of nodes in cluster
func totalNodeCount() uint64 {
peers, _ := globalEndpoints.peers()

View File

@ -46,19 +46,6 @@ const (
closed
)
// Hold the number of failed RPC calls due to networking errors
var networkErrsCounter uint64
// GetNetworkErrsCounter returns the number of failed RPC requests
func GetNetworkErrsCounter() uint64 {
return atomic.LoadUint64(&networkErrsCounter)
}
// ResetNetworkErrsCounter resets the number of failed RPC requests
func ResetNetworkErrsCounter() {
atomic.StoreUint64(&networkErrsCounter, 0)
}
// NetworkError - error type in case of errors related to http/transport
// for ex. connection refused, connection reset, dns resolution failure etc.
// All errors returned by storage-rest-server (ex errFileNotFound, errDiskNotFound) are not considered to be network errors.
@ -217,7 +204,7 @@ type respBodyMonitor struct {
func (r respBodyMonitor) Read(p []byte) (n int, err error) {
n, err = r.ReadCloser.Read(p)
if err != nil && err != io.EOF {
atomic.AddUint64(&networkErrsCounter, 1)
atomic.AddUint64(&globalStats.errs, 1)
}
return
}
@ -225,7 +212,7 @@ func (r respBodyMonitor) Read(p []byte) (n int, err error) {
func (r respBodyMonitor) Close() (err error) {
err = r.ReadCloser.Close()
if err != nil {
atomic.AddUint64(&networkErrsCounter, 1)
atomic.AddUint64(&globalStats.errs, 1)
}
return
}
@ -252,11 +239,15 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
if length > 0 {
req.ContentLength = length
}
req, update := setupReqStatsUpdate(req)
defer update()
resp, err := c.httpClient.Do(req)
if err != nil {
if xnet.IsNetworkOrHostDown(err, c.ExpectTimeouts) {
if !c.NoMetrics {
atomic.AddUint64(&networkErrsCounter, 1)
atomic.AddUint64(&globalStats.errs, 1)
}
if c.MarkOffline(err) {
logger.LogOnceIf(ctx, fmt.Errorf("Marking %s offline temporarily; caused by %w", c.url.Host, err), c.url.Host)
@ -292,7 +283,7 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
if err != nil {
if xnet.IsNetworkOrHostDown(err, c.ExpectTimeouts) {
if !c.NoMetrics {
atomic.AddUint64(&networkErrsCounter, 1)
atomic.AddUint64(&globalStats.errs, 1)
}
if c.MarkOffline(err) {
logger.LogOnceIf(ctx, fmt.Errorf("Marking %s offline temporarily; caused by %w", c.url.Host, err), c.url.Host)

View File

@ -0,0 +1,80 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package rest
import (
"net/http"
"net/http/httptrace"
"sync/atomic"
"time"
)
var globalStats = struct {
errs uint64
tcpDialErrs uint64
tcpDialCount uint64
tcpDialTotalDur uint64
}{}
// RPCStats holds information about the DHCP/TCP metrics and errors
type RPCStats struct {
Errs uint64
DialAvgDuration uint64
DialErrs uint64
}
// GetRPCStats returns RPC stats, include calls errors and dhcp/tcp metrics
func GetRPCStats() RPCStats {
s := RPCStats{
Errs: atomic.LoadUint64(&globalStats.errs),
DialErrs: atomic.LoadUint64(&globalStats.tcpDialErrs),
}
if v := atomic.LoadUint64(&globalStats.tcpDialCount); v > 0 {
s.DialAvgDuration = atomic.LoadUint64(&globalStats.tcpDialTotalDur) / v
}
return s
}
// Return a function which update the global stats related to tcp connections
func setupReqStatsUpdate(req *http.Request) (*http.Request, func()) {
var dialStart, dialEnd time.Time
trace := &httptrace.ClientTrace{
ConnectStart: func(network, addr string) {
dialStart = time.Now()
},
ConnectDone: func(network, addr string, err error) {
if err == nil {
dialEnd = time.Now()
}
},
}
return req.WithContext(httptrace.WithClientTrace(req.Context(), trace)), func() {
if !dialStart.IsZero() {
if dialEnd.IsZero() {
atomic.AddUint64(&globalStats.tcpDialErrs, 1)
} else {
atomic.AddUint64(&globalStats.tcpDialCount, 1)
atomic.AddUint64(&globalStats.tcpDialTotalDur, uint64(dialEnd.Sub(dialStart)))
}
}
}
}