move bucket centric metrics to /minio/v2/metrics/bucket handlers (#17663)

users/customers no longer have a small, bounded number of buckets,
so we must avoid overpopulating the cluster endpoint; instead,
move bucket monitoring to a separate endpoint.

this is a breaking change for a couple of metrics, but
it is imperative that we do it to improve the responsiveness of
our Prometheus cluster endpoint.

Bonus: Added new cluster metrics for usage, objects and histograms
This commit is contained in:
Harshavardhana
2023-07-18 22:25:12 -07:00
committed by GitHub
parent 4f257bf1e6
commit 6426b74770
12 changed files with 773 additions and 246 deletions

View File

@@ -229,7 +229,8 @@ func guessIsMetricsReq(req *http.Request) bool {
return (aType == authTypeAnonymous || aType == authTypeJWT) &&
req.URL.Path == minioReservedBucketPath+prometheusMetricsPathLegacy ||
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2ClusterPath ||
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2NodePath
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2NodePath ||
req.URL.Path == minioReservedBucketPath+prometheusMetricsV2BucketPath
}
// guessIsRPCReq - returns true if the request is for an RPC endpoint.

View File

@@ -127,6 +127,14 @@ func (bh *bucketHTTPStats) updateHTTPStats(bucket, api string, w *xhttp.Response
return
}
if w != nil {
// Increment the prometheus http request response histogram with API, Bucket
bucketHTTPRequestsDuration.With(prometheus.Labels{
"api": api,
"bucket": bucket,
}).Observe(w.TimeToFirstByte.Seconds())
}
bh.Lock()
defer bh.Unlock()

View File

@@ -27,6 +27,7 @@ import (
const (
prometheusMetricsPathLegacy = "/prometheus/metrics"
prometheusMetricsV2ClusterPath = "/v2/metrics/cluster"
prometheusMetricsV2BucketPath = "/v2/metrics/bucket"
prometheusMetricsV2NodePath = "/v2/metrics/node"
)
@@ -47,14 +48,13 @@ func registerMetricsRouter(router *mux.Router) {
// metrics router
metricsRouter := router.NewRoute().PathPrefix(minioReservedBucketPath).Subrouter()
authType := strings.ToLower(env.Get(EnvPrometheusAuthType, string(prometheusJWT)))
switch prometheusAuthType(authType) {
case prometheusPublic:
metricsRouter.Handle(prometheusMetricsPathLegacy, metricsHandler())
metricsRouter.Handle(prometheusMetricsV2ClusterPath, metricsServerHandler())
metricsRouter.Handle(prometheusMetricsV2NodePath, metricsNodeHandler())
case prometheusJWT:
metricsRouter.Handle(prometheusMetricsPathLegacy, AuthMiddleware(metricsHandler()))
metricsRouter.Handle(prometheusMetricsV2ClusterPath, AuthMiddleware(metricsServerHandler()))
metricsRouter.Handle(prometheusMetricsV2NodePath, AuthMiddleware(metricsNodeHandler()))
auth := AuthMiddleware
if prometheusAuthType(authType) == prometheusPublic {
auth = NoAuthMiddleware
}
metricsRouter.Handle(prometheusMetricsPathLegacy, auth(metricsHandler()))
metricsRouter.Handle(prometheusMetricsV2ClusterPath, auth(metricsServerHandler()))
metricsRouter.Handle(prometheusMetricsV2BucketPath, auth(metricsBucketHandler()))
metricsRouter.Handle(prometheusMetricsV2NodePath, auth(metricsNodeHandler()))
}

View File

@@ -40,24 +40,26 @@ import (
)
var (
nodeCollector *minioNodeCollector
clusterCollector *minioClusterCollector
peerMetricsGroups []*MetricsGroup
nodeCollector *minioNodeCollector
clusterCollector *minioClusterCollector
bucketCollector *minioBucketCollector
peerMetricsGroups []*MetricsGroup
bucketPeerMetricsGroups []*MetricsGroup
)
func init() {
clusterMetricsGroups := []*MetricsGroup{
getBucketUsageMetrics(),
getNodeHealthMetrics(),
getClusterStorageMetrics(),
getClusterTierMetrics(),
getClusterUsageMetrics(),
getKMSMetrics(),
}
peerMetricsGroups = []*MetricsGroup{
getCacheMetrics(),
getGoMetrics(),
getHTTPMetrics(),
getHTTPMetrics(false),
getNotificationMetrics(),
getLocalStorageMetrics(),
getMinioProcMetrics(),
@@ -82,7 +84,7 @@ func init() {
getNodeHealthMetrics(),
getLocalDriveStorageMetrics(),
getCacheMetrics(),
getHTTPMetrics(),
getHTTPMetrics(false),
getNetworkMetrics(),
getMinioVersionMetrics(),
getS3TTFBMetric(),
@@ -90,8 +92,20 @@ func init() {
getDistLockMetrics(),
}
bucketMetricsGroups := []*MetricsGroup{
getBucketUsageMetrics(),
getHTTPMetrics(true),
getBucketTTFBMetric(),
}
bucketPeerMetricsGroups = []*MetricsGroup{
getHTTPMetrics(true),
getBucketTTFBMetric(),
}
nodeCollector = newMinioCollectorNode(nodeGroups)
clusterCollector = newMinioClusterCollector(allMetricsGroups)
bucketCollector = newMinioBucketCollector(bucketMetricsGroups)
}
// MetricNamespace is top level grouping of metrics to create the metric name.
@@ -121,11 +135,13 @@ const (
ioSubsystem MetricSubsystem = "io"
nodesSubsystem MetricSubsystem = "nodes"
objectsSubsystem MetricSubsystem = "objects"
bucketsSubsystem MetricSubsystem = "bucket"
processSubsystem MetricSubsystem = "process"
replicationSubsystem MetricSubsystem = "replication"
requestsSubsystem MetricSubsystem = "requests"
requestsRejectedSubsystem MetricSubsystem = "requests_rejected"
timeSubsystem MetricSubsystem = "time"
ttfbSubsystem MetricSubsystem = "requests_ttfb"
trafficSubsystem MetricSubsystem = "traffic"
softwareSubsystem MetricSubsystem = "software"
sysCallSubsystem MetricSubsystem = "syscall"
@@ -192,7 +208,7 @@ const (
sizeDistribution = "size_distribution"
versionDistribution = "version_distribution"
ttfbDistribution = "ttfb_seconds_distribution"
ttfbDistribution = "seconds_distribution"
lastActivityTime = "last_activity_nano_seconds"
startTime = "starttime_seconds"
@@ -308,6 +324,16 @@ func (g *MetricsGroup) Get() (metrics []Metric) {
return metrics
}
// getClusterBucketsTotalMD returns the description of the cluster-level
// gauge that reports the total number of buckets.
func getClusterBucketsTotalMD() MetricDescription {
	var md MetricDescription
	md.Namespace = clusterMetricNamespace
	md.Subsystem = bucketsSubsystem
	md.Name = total
	md.Help = "Total number of buckets in the cluster"
	md.Type = gaugeMetric
	return md
}
func getClusterCapacityTotalBytesMD() MetricDescription {
return MetricDescription{
Namespace: clusterMetricNamespace,
@@ -528,6 +554,36 @@ func getBucketUsageTotalBytesMD() MetricDescription {
}
}
// getClusterUsageTotalBytesMD returns the description of the cluster-level
// gauge that reports total usage in bytes.
func getClusterUsageTotalBytesMD() MetricDescription {
	var md MetricDescription
	md.Namespace = clusterMetricNamespace
	md.Subsystem = usageSubsystem
	md.Name = totalBytes
	md.Help = "Total cluster usage in bytes"
	md.Type = gaugeMetric
	return md
}
// getClusterUsageObjectsTotalMD returns the description of the cluster-level
// gauge that reports the total number of objects.
func getClusterUsageObjectsTotalMD() MetricDescription {
	var md MetricDescription
	md.Namespace = clusterMetricNamespace
	md.Subsystem = usageSubsystem
	md.Name = objectTotal
	md.Help = "Total number of objects in a cluster"
	md.Type = gaugeMetric
	return md
}
// getClusterUsageVersionsTotalMD returns the description of the cluster-level
// gauge that reports the total number of versions, delete markers included.
func getClusterUsageVersionsTotalMD() MetricDescription {
	var md MetricDescription
	md.Namespace = clusterMetricNamespace
	md.Subsystem = usageSubsystem
	md.Name = versionTotal
	md.Help = "Total number of versions (includes delete marker) in a cluster"
	md.Type = gaugeMetric
	return md
}
func getBucketUsageObjectsTotalMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
@@ -543,7 +599,7 @@ func getBucketUsageVersionsTotalMD() MetricDescription {
Namespace: bucketMetricNamespace,
Subsystem: usageSubsystem,
Name: versionTotal,
Help: "Total number of versions",
Help: "Total number of versions (includes delete marker)",
Type: gaugeMetric,
}
}
@@ -598,6 +654,26 @@ func getBucketRepFailedOperationsMD() MetricDescription {
}
}
// getClusterObjectDistributionMD returns the description of the cluster-level
// histogram metric for the object size distribution.
func getClusterObjectDistributionMD() MetricDescription {
	var md MetricDescription
	md.Namespace = clusterMetricNamespace
	md.Subsystem = objectsSubsystem
	md.Name = sizeDistribution
	md.Help = "Distribution of object sizes across a cluster"
	md.Type = histogramMetric
	return md
}
// getClusterObjectVersionsMD returns the description of the cluster-level
// histogram metric for the object version distribution.
func getClusterObjectVersionsMD() MetricDescription {
	return MetricDescription{
		Namespace: clusterMetricNamespace,
		Subsystem: objectsSubsystem,
		Name:      versionDistribution,
		// The help text previously duplicated the size-distribution wording;
		// this metric carries the version distribution.
		Help: "Distribution of object versions across a cluster",
		Type: histogramMetric,
	}
}
func getBucketObjectDistributionMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
@@ -961,9 +1037,19 @@ func getMinIOCommitMD() MetricDescription {
func getS3TTFBDistributionMD() MetricDescription {
return MetricDescription{
Namespace: s3MetricNamespace,
Subsystem: timeSubsystem,
Subsystem: ttfbSubsystem,
Name: ttfbDistribution,
Help: "Distribution of the time to first byte across API calls",
Help: "Distribution of time to first byte across API calls",
Type: gaugeMetric,
}
}
// getBucketTTFBDistributionMD returns the description of the per-bucket
// time-to-first-byte distribution metric.
func getBucketTTFBDistributionMD() MetricDescription {
	var md MetricDescription
	md.Namespace = bucketMetricNamespace
	md.Subsystem = ttfbSubsystem
	md.Name = ttfbDistribution
	md.Help = "Distribution of time to first byte across API calls per bucket"
	md.Type = gaugeMetric
	return md
}
@@ -1234,6 +1320,51 @@ func getGoMetrics() *MetricsGroup {
return mg
}
// getBucketTTFBMetric returns a metrics group (cached for 10 seconds) that
// converts the bucketHTTPRequestsDuration histogram vector into internal
// Metric values: one Metric per histogram bucket, labelled with the
// histogram's own labels plus an "le" label holding the bucket upper bound.
func getBucketTTFBMetric() *MetricsGroup {
	mg := &MetricsGroup{
		cacheInterval: 10 * time.Second,
	}
	mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
		// Read prometheus metric on this channel
		ch := make(chan prometheus.Metric)
		var wg sync.WaitGroup
		wg.Add(1)

		// Read prometheus histogram data and convert it to internal metric data
		go func() {
			defer wg.Done()
			for promMetric := range ch {
				dtoMetric := &dto.Metric{}
				if err := promMetric.Write(dtoMetric); err != nil {
					logger.LogIf(GlobalContext, err)
					// Keep draining: ch is unbuffered, so abandoning the
					// loop here would leave Collect below blocked forever
					// on its next send.
					continue
				}
				labelPairs := dtoMetric.GetLabel()
				// Getters are nil-safe, so a metric without histogram data
				// simply contributes no buckets.
				for _, b := range dtoMetric.GetHistogram().GetBucket() {
					labels := make(map[string]string, len(labelPairs)+1)
					for _, lp := range labelPairs {
						labels[lp.GetName()] = lp.GetValue()
					}
					labels["le"] = fmt.Sprintf("%.3f", b.GetUpperBound())
					metrics = append(metrics, Metric{
						Description:    getBucketTTFBDistributionMD(),
						VariableLabels: labels,
						Value:          float64(b.GetCumulativeCount()),
					})
				}
			}
		}()

		bucketHTTPRequestsDuration.Collect(ch)
		close(ch)
		// metrics is only written by the goroutine above; wait for it to
		// finish before returning the slice.
		wg.Wait()
		return metrics
	})
	return mg
}
func getS3TTFBMetric() *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
@@ -1912,84 +2043,87 @@ func getNotificationMetrics() *MetricsGroup {
return mg
}
func getHTTPMetrics() *MetricsGroup {
func getHTTPMetrics(bucketOnly bool) *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 10 * time.Second,
}
mg.RegisterRead(func(ctx context.Context) (metrics []Metric) {
httpStats := globalHTTPStats.toServerHTTPStats()
metrics = make([]Metric, 0, 3+
len(httpStats.CurrentS3Requests.APIStats)+
len(httpStats.TotalS3Requests.APIStats)+
len(httpStats.TotalS3Errors.APIStats)+
len(httpStats.TotalS35xxErrors.APIStats)+
len(httpStats.TotalS34xxErrors.APIStats))
metrics = append(metrics, Metric{
Description: getS3RejectedAuthRequestsTotalMD(),
Value: float64(httpStats.TotalS3RejectedAuth),
})
metrics = append(metrics, Metric{
Description: getS3RejectedTimestampRequestsTotalMD(),
Value: float64(httpStats.TotalS3RejectedTime),
})
metrics = append(metrics, Metric{
Description: getS3RejectedHeaderRequestsTotalMD(),
Value: float64(httpStats.TotalS3RejectedHeader),
})
metrics = append(metrics, Metric{
Description: getS3RejectedInvalidRequestsTotalMD(),
Value: float64(httpStats.TotalS3RejectedInvalid),
})
metrics = append(metrics, Metric{
Description: getS3RequestsInQueueMD(),
Value: float64(httpStats.S3RequestsInQueue),
})
metrics = append(metrics, Metric{
Description: getIncomingS3RequestsMD(),
Value: float64(httpStats.S3RequestsIncoming),
})
if !bucketOnly {
httpStats := globalHTTPStats.toServerHTTPStats()
metrics = make([]Metric, 0, 3+
len(httpStats.CurrentS3Requests.APIStats)+
len(httpStats.TotalS3Requests.APIStats)+
len(httpStats.TotalS3Errors.APIStats)+
len(httpStats.TotalS35xxErrors.APIStats)+
len(httpStats.TotalS34xxErrors.APIStats))
metrics = append(metrics, Metric{
Description: getS3RejectedAuthRequestsTotalMD(),
Value: float64(httpStats.TotalS3RejectedAuth),
})
metrics = append(metrics, Metric{
Description: getS3RejectedTimestampRequestsTotalMD(),
Value: float64(httpStats.TotalS3RejectedTime),
})
metrics = append(metrics, Metric{
Description: getS3RejectedHeaderRequestsTotalMD(),
Value: float64(httpStats.TotalS3RejectedHeader),
})
metrics = append(metrics, Metric{
Description: getS3RejectedInvalidRequestsTotalMD(),
Value: float64(httpStats.TotalS3RejectedInvalid),
})
metrics = append(metrics, Metric{
Description: getS3RequestsInQueueMD(),
Value: float64(httpStats.S3RequestsInQueue),
})
metrics = append(metrics, Metric{
Description: getIncomingS3RequestsMD(),
Value: float64(httpStats.S3RequestsIncoming),
})
for api, value := range httpStats.CurrentS3Requests.APIStats {
metrics = append(metrics, Metric{
Description: getS3RequestsInFlightMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS3Requests.APIStats {
metrics = append(metrics, Metric{
Description: getS3RequestsTotalMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS3Errors.APIStats {
metrics = append(metrics, Metric{
Description: getS3RequestsErrorsMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS35xxErrors.APIStats {
metrics = append(metrics, Metric{
Description: getS3Requests5xxErrorsMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS34xxErrors.APIStats {
metrics = append(metrics, Metric{
Description: getS3Requests4xxErrorsMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS3Canceled.APIStats {
metrics = append(metrics, Metric{
Description: getS3RequestsCanceledMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
for api, value := range httpStats.CurrentS3Requests.APIStats {
metrics = append(metrics, Metric{
Description: getS3RequestsInFlightMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS3Requests.APIStats {
metrics = append(metrics, Metric{
Description: getS3RequestsTotalMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS3Errors.APIStats {
metrics = append(metrics, Metric{
Description: getS3RequestsErrorsMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS35xxErrors.APIStats {
metrics = append(metrics, Metric{
Description: getS3Requests5xxErrorsMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS34xxErrors.APIStats {
metrics = append(metrics, Metric{
Description: getS3Requests4xxErrorsMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
for api, value := range httpStats.TotalS3Canceled.APIStats {
metrics = append(metrics, Metric{
Description: getS3RequestsCanceledMD(),
Value: float64(value),
VariableLabels: map[string]string{"api": api},
})
}
return
}
for bucket, inOut := range globalBucketConnStats.getS3InOutBytes() {
@@ -2100,6 +2234,105 @@ func getNetworkMetrics() *MetricsGroup {
return mg
}
// getClusterUsageMetrics returns a metrics group (cached for one minute) that
// aggregates the per-bucket data usage loaded from the backend into
// cluster-wide totals: size, object count, version count, bucket count and the
// merged object-size and version histograms.
func getClusterUsageMetrics() *MetricsGroup {
	group := &MetricsGroup{cacheInterval: 1 * time.Minute}

	group.RegisterRead(func(ctx context.Context) (metrics []Metric) {
		layer := newObjectLayerFn()
		if layer == nil {
			// Service not initialized yet
			return
		}

		metrics = make([]Metric, 0, 50)
		usageInfo, err := loadDataUsageFromBackend(ctx, layer)
		if err != nil {
			logger.LogIf(ctx, err)
			return
		}
		if usageInfo.LastUpdate.IsZero() {
			// data usage has not captured any data yet.
			return
		}

		metrics = append(metrics, Metric{
			Description: getUsageLastScanActivityMD(),
			Value:       float64(time.Since(usageInfo.LastUpdate)),
		})

		var totalSize, totalBuckets, totalObjects, totalVersions uint64
		sizeHistogram := map[string]uint64{}
		versionHistogram := map[string]uint64{}

		for _, bucketUsage := range usageInfo.BucketsUsage {
			totalBuckets++
			totalSize += bucketUsage.Size
			totalObjects += bucketUsage.ObjectsCount
			totalVersions += bucketUsage.VersionsCount

			// A missing key reads as zero, so += covers both the first
			// occurrence of a range and every later one.
			for rangeName, count := range bucketUsage.ObjectSizesHistogram {
				sizeHistogram[rangeName] += count
			}
			for rangeName, count := range bucketUsage.ObjectVersionsHistogram {
				versionHistogram[rangeName] += count
			}
		}

		metrics = append(metrics,
			Metric{
				Description: getClusterUsageTotalBytesMD(),
				Value:       float64(totalSize),
			},
			Metric{
				Description: getClusterUsageObjectsTotalMD(),
				Value:       float64(totalObjects),
			},
			Metric{
				Description: getClusterUsageVersionsTotalMD(),
				Value:       float64(totalVersions),
			},
			Metric{
				Description:          getClusterObjectDistributionMD(),
				Histogram:            sizeHistogram,
				HistogramBucketLabel: "range",
			},
			Metric{
				Description:          getClusterObjectVersionsMD(),
				Histogram:            versionHistogram,
				HistogramBucketLabel: "range",
			},
			Metric{
				Description: getClusterBucketsTotalMD(),
				Value:       float64(totalBuckets),
			},
		)
		return
	})
	return group
}
func getBucketUsageMetrics() *MetricsGroup {
mg := &MetricsGroup{
cacheInterval: 1 * time.Minute,
@@ -2199,6 +2432,7 @@ func getBucketUsageMetrics() *MetricsGroup {
HistogramBucketLabel: "range",
VariableLabels: map[string]string{"bucket": bucket},
})
metrics = append(metrics, Metric{
Description: getBucketObjectVersionsMD(),
Histogram: usage.ObjectVersionsHistogram,
@@ -2598,6 +2832,77 @@ func getKMSMetrics() *MetricsGroup {
return mg
}
// minioBucketCollector is the collector registered with the Prometheus
// registry that backs the bucket metrics handler.
type minioBucketCollector struct {
// metricsGroups are the metric groups Collect reads through ReportMetrics.
metricsGroups []*MetricsGroup
// desc is the single descriptor sent by Describe.
desc *prometheus.Desc
}
// newMinioBucketCollector creates a bucket collector over the given metric
// groups, with the fixed "minio_bucket_stats" descriptor.
func newMinioBucketCollector(metricsGroups []*MetricsGroup) *minioBucketCollector {
	collector := new(minioBucketCollector)
	collector.metricsGroups = metricsGroups
	collector.desc = prometheus.NewDesc(
		"minio_bucket_stats",
		"Statistics exposed by MinIO server cluster wide per bucket",
		nil,
		nil,
	)
	return collector
}
// Describe sends the collector's single static descriptor on ch. The
// descriptors of the individual metrics are built on the fly in Collect.
func (c *minioBucketCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.desc
}
// Collect is called by the Prometheus registry when collecting metrics.
//
// It publishes, concurrently, the metrics of this collector's own groups
// (via ReportMetrics) and the bucket metrics fetched from peers (via
// globalNotificationSys.GetBucketMetrics), converting each internal Metric
// into a Prometheus const metric sent on out. Histogram metrics are expanded
// into one gauge per histogram entry, with the entry key attached under the
// metric's HistogramBucketLabel. Collect returns once both sources are
// drained.
func (c *minioBucketCollector) Collect(out chan<- prometheus.Metric) {
	var wg sync.WaitGroup
	publish := func(in <-chan Metric) {
		defer wg.Done()
		// Receiving from a nil channel blocks forever, which would stall
		// wg.Wait() below. GetBucketMetrics returns a nil channel when the
		// notification system is nil, so treat nil as "nothing to publish".
		if in == nil {
			return
		}
		for metric := range in {
			labels, values := getOrderedLabelValueArrays(metric.VariableLabels)
			fqName := prometheus.BuildFQName(string(metric.Description.Namespace),
				string(metric.Description.Subsystem),
				string(metric.Description.Name))

			if metric.Description.Type == histogramMetric {
				if metric.Histogram == nil {
					continue
				}
				// The descriptor is identical for every entry of the
				// histogram, so build it once per metric.
				histDesc := prometheus.NewDesc(
					fqName,
					metric.Description.Help,
					append(labels, metric.HistogramBucketLabel),
					metric.StaticLabels,
				)
				for k, v := range metric.Histogram {
					out <- prometheus.MustNewConstMetric(
						histDesc,
						prometheus.GaugeValue,
						float64(v),
						append(values, k)...)
				}
				continue
			}

			// Everything that is not a counter is exported as a gauge.
			metricType := prometheus.GaugeValue
			if metric.Description.Type == counterMetric {
				metricType = prometheus.CounterValue
			}

			out <- prometheus.MustNewConstMetric(
				prometheus.NewDesc(
					fqName,
					metric.Description.Help,
					labels,
					metric.StaticLabels,
				),
				metricType,
				metric.Value,
				values...)
		}
	}

	// Call peer api to fetch metrics
	wg.Add(2)
	go publish(ReportMetrics(GlobalContext, c.metricsGroups))
	go publish(globalNotificationSys.GetBucketMetrics(GlobalContext))
	wg.Wait()
}
type minioClusterCollector struct {
metricsGroups []*MetricsGroup
desc *prometheus.Desc
@@ -2791,6 +3096,49 @@ func newMinioCollectorNode(metricsGroups []*MetricsGroup) *minioNodeCollector {
}
}
// metricsBucketHandler returns the HTTP handler that serves the bucket
// metrics. The metrics are gathered from a dedicated registry holding only
// bucketCollector and written in the exposition format negotiated from the
// request headers.
func metricsBucketHandler() http.Handler {
	// Dedicated registry: only the bucket collector is registered on it.
	bucketRegistry := prometheus.NewRegistry()
	logger.CriticalIf(GlobalContext, bucketRegistry.Register(bucketCollector))

	sources := prometheus.Gatherers{bucketRegistry}

	serve := func(w http.ResponseWriter, r *http.Request) {
		if tc, ok := r.Context().Value(mcontext.ContextTraceKey).(*mcontext.TraceCtxt); ok {
			tc.FuncName = "handler.MetricsBucket"
			tc.ResponseRecorder.LogErrBody = true
		}

		families, err := sources.Gather()
		// A gather error is reported only when nothing was gathered at all;
		// otherwise whatever was gathered is still served.
		if err != nil && len(families) == 0 {
			writeErrorResponseJSON(r.Context(), w, toAdminAPIErr(r.Context(), err), r.URL)
			return
		}

		format := expfmt.Negotiate(r.Header)
		w.Header().Set("Content-Type", string(format))

		encoder := expfmt.NewEncoder(w, format)
		for _, family := range families {
			if encodeErr := encoder.Encode(family); encodeErr != nil {
				// client may disconnect for any reasons
				// we do not have to log this.
				return
			}
		}
		if c, ok := encoder.(expfmt.Closer); ok {
			c.Close()
		}
	}
	return http.HandlerFunc(serve)
}
func metricsServerHandler() http.Handler {
registry := prometheus.NewRegistry()

View File

@@ -39,6 +39,14 @@ var (
},
[]string{"api"},
)
bucketHTTPRequestsDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "s3_ttfb_seconds",
Help: "Time taken by requests served by current MinIO server instance per bucket",
Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10},
},
[]string{"api", "bucket"},
)
minioVersionInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "minio",
@@ -614,6 +622,11 @@ func metricsHandler() http.Handler {
})
}
// NoAuthMiddleware is a pass-through middleware: it returns h unchanged and
// performs no authentication.
func NoAuthMiddleware(h http.Handler) http.Handler {
return h
}
// AuthMiddleware checks if the bearer token is valid and authorized.
func AuthMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View File

@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@@ -121,11 +121,11 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a
func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
return client.DeletePolicy(policyName)
}, idx, *client.host)
}
@@ -136,11 +136,11 @@ func (sys *NotificationSys) DeletePolicy(policyName string) []NotificationPeerEr
func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
return client.LoadPolicy(policyName)
}, idx, *client.host)
}
@@ -151,11 +151,11 @@ func (sys *NotificationSys) LoadPolicy(policyName string) []NotificationPeerErr
func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUserType, isGroup bool) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
return client.LoadPolicyMapping(userOrGroup, userType, isGroup)
}, idx, *client.host)
}
@@ -166,11 +166,11 @@ func (sys *NotificationSys) LoadPolicyMapping(userOrGroup string, userType IAMUs
func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
return client.DeleteUser(accessKey)
}, idx, *client.host)
}
@@ -181,11 +181,11 @@ func (sys *NotificationSys) DeleteUser(accessKey string) []NotificationPeerErr {
func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
return client.LoadUser(accessKey, temp)
}, idx, *client.host)
}
@@ -196,11 +196,13 @@ func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []Notification
func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error { return client.LoadGroup(group) }, idx, *client.host)
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
return client.LoadGroup(group)
}, idx, *client.host)
}
return ng.Wait()
}
@@ -209,11 +211,11 @@ func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr {
func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
return client.DeleteServiceAccount(accessKey)
}, idx, *client.host)
}
@@ -224,11 +226,11 @@ func (sys *NotificationSys) DeleteServiceAccount(accessKey string) []Notificatio
func (sys *NotificationSys) LoadServiceAccount(accessKey string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients)).WithRetries(1)
for idx, client := range sys.peerClients {
if client == nil {
continue
}
client := client
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
return client.LoadServiceAccount(accessKey)
}, idx, *client.host)
}
@@ -240,12 +242,12 @@ func (sys *NotificationSys) BackgroundHealStatus() ([]madmin.BgHealState, []Noti
ng := WithNPeers(len(sys.peerClients))
states := make([]madmin.BgHealState, len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
idx := idx
client := client
ng.Go(GlobalContext, func() error {
if client == nil {
return errPeerNotReachable
}
st, err := client.BackgroundHealStatus()
if err != nil {
return err
@@ -1101,6 +1103,65 @@ func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...
return consolidatedReport
}
// GetBucketMetrics - fetches the bucket metrics of every peer client and
// merges them into a single channel. The returned channel is closed once all
// peer streams are drained or ctx is done. Peers that are nil or fail the
// call are logged and skipped. A nil receiver yields a nil channel.
func (sys *NotificationSys) GetBucketMetrics(ctx context.Context) <-chan Metric {
	if sys == nil {
		return nil
	}

	peerCount := len(sys.peerClients)
	g := errgroup.WithNErrs(peerCount)
	peerChannels := make([]<-chan Metric, peerCount)
	for index := range sys.peerClients {
		index := index
		g.Go(func() error {
			peer := sys.peerClients[index]
			if peer == nil {
				return errPeerNotReachable
			}
			var err error
			peerChannels[index], err = peer.GetPeerBucketMetrics(ctx)
			return err
		}, index)
	}

	merged := make(chan Metric)
	var wg sync.WaitGroup
	for index, err := range g.Wait() {
		if err != nil {
			peer := sys.peerClients[index]
			if peer == nil {
				logger.LogOnceIf(ctx, err, "peer-offline")
				continue
			}
			reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress",
				peer.host.String())
			logger.LogOnceIf(logger.SetReqInfo(ctx, reqInfo), err, peer.host.String())
			continue
		}

		// Per-iteration copy so the goroutine below does not observe a later
		// iteration's channel.
		source := peerChannels[index]
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case m, ok := <-source:
					if !ok {
						return
					}
					// Forward, but give up if the consumer went away.
					select {
					case merged <- m:
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}

	// Close the merged channel once every forwarder has finished.
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}
// GetClusterMetrics - gets the cluster metrics from all nodes excluding self.
func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) <-chan Metric {
if sys == nil {
@@ -1109,11 +1170,11 @@ func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) <-chan Metric
g := errgroup.WithNErrs(len(sys.peerClients))
peerChannels := make([]<-chan Metric, len(sys.peerClients))
for index := range sys.peerClients {
if sys.peerClients[index] == nil {
continue
}
index := index
g.Go(func() error {
if sys.peerClients[index] == nil {
return errPeerNotReachable
}
var err error
peerChannels[index], err = sys.peerClients[index].GetPeerMetrics(ctx)
return err
@@ -1142,7 +1203,11 @@ func (sys *NotificationSys) GetClusterMetrics(ctx context.Context) <-chan Metric
if !ok {
return
}
ch <- m
select {
case ch <- m:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}

View File

@@ -839,6 +839,33 @@ func (client *peerRESTClient) GetPeerMetrics(ctx context.Context) (<-chan Metric
return ch, nil
}
// GetPeerBucketMetrics calls the peer's bucket metrics method and returns a
// channel streaming the gob-decoded metrics from the response body. The
// channel is closed, and the body drained, when decoding fails (including at
// end of stream) or ctx is done.
func (client *peerRESTClient) GetPeerBucketMetrics(ctx context.Context) (<-chan Metric, error) {
	body, err := client.callWithContext(ctx, peerRESTMethodGetPeerBucketMetrics, nil, nil, -1)
	if err != nil {
		return nil, err
	}

	out := make(chan Metric)
	go func() {
		// Deferred calls run last-in-first-out: drain the body, then close.
		defer close(out)
		defer xhttp.DrainBody(body)

		decoder := gob.NewDecoder(body)
		for {
			var m Metric
			if decodeErr := decoder.Decode(&m); decodeErr != nil {
				return
			}
			select {
			case out <- m:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, nil
}
func (client *peerRESTClient) SpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, error) {
values := make(url.Values)
values.Set(peerRESTSize, strconv.Itoa(opts.objectSize))

View File

@@ -18,7 +18,7 @@
package cmd
const (
peerRESTVersion = "v31" // Add replication MRF
peerRESTVersion = "v32" // Add bucket peer metrics
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
@@ -65,6 +65,7 @@ const (
peerRESTMethodGetMetacacheListing = "/getmetacache"
peerRESTMethodUpdateMetacacheListing = "/updatemetacache"
peerRESTMethodGetPeerMetrics = "/peermetrics"
peerRESTMethodGetPeerBucketMetrics = "/peerbucketmetrics"
peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig"
peerRESTMethodSpeedTest = "/speedtest"
peerRESTMethodDriveSpeedTest = "/drivespeedtest"

View File

@@ -1207,6 +1207,23 @@ func (s *peerRESTServer) GetPeerMetrics(w http.ResponseWriter, r *http.Request)
}
}
// GetPeerBucketMetrics streams this node's bucket peer metrics
// (bucketPeerMetricsGroups) to the caller, gob-encoding one Metric at a time
// into the response.
func (s *peerRESTServer) GetPeerBucketMetrics(w http.ResponseWriter, r *http.Request) {
	if valid := s.IsValid(w, r); !valid {
		s.writeErrorResponse(w, errors.New("invalid request"))
		return
	}

	encoder := gob.NewEncoder(w)
	metricsCh := ReportMetrics(r.Context(), bucketPeerMetricsGroups)
	for metric := range metricsCh {
		encodeErr := encoder.Encode(metric)
		if encodeErr == nil {
			continue
		}
		s.writeErrorResponse(w, errors.New("Encoding metric failed: "+encodeErr.Error()))
		return
	}
}
func (s *peerRESTServer) SpeedTestHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
@@ -1431,6 +1448,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetMetacacheListing).HandlerFunc(h(server.GetMetacacheListingHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodUpdateMetacacheListing).HandlerFunc(h(server.UpdateMetacacheListingHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetPeerMetrics).HandlerFunc(h(server.GetPeerMetrics))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetPeerBucketMetrics).HandlerFunc(h(server.GetPeerBucketMetrics))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(h(server.LoadTransitionTierConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedTest).HandlerFunc(h(server.SpeedTestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDriveSpeedTest).HandlerFunc(h(server.DriveSpeedTestHandler))