From c6a120df0ed21efc10dc841e562fa7b6f845e5b0 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 2 Mar 2021 17:28:04 -0800 Subject: [PATCH] fix: Prometheus metrics to re-use storage disks (#11647) also re-use storage disks for all `mc admin server info` calls as well, implement a new LocalStorageInfo() API call at ObjectLayer to lookup local disks storageInfo also fixes bugs where there were double calls to StorageInfo() --- cmd/admin-handlers.go | 34 +++++++++++++-------------- cmd/admin-server-info.go | 40 +++++++++++--------------------- cmd/erasure-common.go | 10 ++++++++ cmd/erasure-server-pool.go | 29 +++++++++++++++++++++++ cmd/erasure-sets.go | 31 +++++++++++++++++++++++++ cmd/erasure.go | 12 ++++++++++ cmd/fs-v1.go | 5 ++++ cmd/gateway-unsupported.go | 7 ++++++ cmd/gateway/hdfs/gateway-hdfs.go | 4 ++++ cmd/metrics-v2.go | 31 +++++++++++++++++-------- cmd/metrics.go | 4 ++++ cmd/notification.go | 6 ++--- cmd/object-api-interface.go | 3 ++- cmd/xl-storage.go | 2 +- pkg/madmin/info-commands.go | 17 ++++++++------ 15 files changed, 169 insertions(+), 66 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 32ebd6732..99381c5b6 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1551,13 +1551,13 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque if globalLDAPConfig.Enabled { ldapConn, err := globalLDAPConfig.Connect() if err != nil { - ldap.Status = "offline" + ldap.Status = string(madmin.ItemOffline) } else if ldapConn == nil { ldap.Status = "Not Configured" } else { // Close ldap connection to avoid leaks. 
ldapConn.Close() - ldap.Status = "online" + ldap.Status = string(madmin.ItemOnline) } } @@ -1573,7 +1573,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque assignPoolNumbers(servers) var backend interface{} - mode := madmin.ObjectLayerInitializing + mode := madmin.ItemInitializing buckets := madmin.Buckets{} objects := madmin.Objects{} @@ -1581,7 +1581,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque objectAPI := newObjectLayerFn() if objectAPI != nil { - mode = madmin.ObjectLayerOnline + mode = madmin.ItemOnline // Load data usage dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI) if err == nil { @@ -1628,7 +1628,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque } infoMsg := madmin.InfoMessage{ - Mode: mode, + Mode: string(mode), Domain: domain, Region: globalServerRegion, SQSARN: globalNotificationSys.GetARNList(false), @@ -1678,9 +1678,9 @@ func fetchLambdaInfo() []map[string][]madmin.TargetIDStatus { active, _ := tgt.IsActive() targetID := tgt.ID() if active { - targetIDStatus[targetID.ID] = madmin.Status{Status: "Online"} + targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOnline)} } else { - targetIDStatus[targetID.ID] = madmin.Status{Status: "Offline"} + targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOffline)} } list := lambdaMap[targetID.Name] list = append(list, targetIDStatus) @@ -1692,9 +1692,9 @@ func fetchLambdaInfo() []map[string][]madmin.TargetIDStatus { active, _ := tgt.IsActive() targetID := tgt.ID() if active { - targetIDStatus[targetID.ID] = madmin.Status{Status: "Online"} + targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOnline)} } else { - targetIDStatus[targetID.ID] = madmin.Status{Status: "Offline"} + targetIDStatus[targetID.ID] = madmin.Status{Status: string(madmin.ItemOffline)} } list := lambdaMap[targetID.Name] list = append(list, targetIDStatus) @@ -1727,9 
+1727,9 @@ func fetchKMSStatus() madmin.KMS { } if err := checkConnection(kmsInfo.Endpoints[0], 15*time.Second); err != nil { - kmsStat.Status = "offline" + kmsStat.Status = string(madmin.ItemOffline) } else { - kmsStat.Status = "online" + kmsStat.Status = string(madmin.ItemOnline) kmsContext := crypto.Context{"MinIO admin API": "ServerInfoHandler"} // Context for a test key operation // 1. Generate a new key using the KMS. @@ -1737,7 +1737,7 @@ func fetchKMSStatus() madmin.KMS { if err != nil { kmsStat.Encrypt = fmt.Sprintf("Encryption failed: %v", err) } else { - kmsStat.Encrypt = "Ok" + kmsStat.Encrypt = "success" } // 2. Verify that we can indeed decrypt the (encrypted) key @@ -1748,7 +1748,7 @@ func fetchKMSStatus() madmin.KMS { case subtle.ConstantTimeCompare(key[:], decryptedKey[:]) != 1: kmsStat.Decrypt = "Decryption failed: decrypted key does not match generated key" default: - kmsStat.Decrypt = "Ok" + kmsStat.Decrypt = "success" } } return kmsStat @@ -1764,11 +1764,11 @@ func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) { err := checkConnection(target.Endpoint(), 15*time.Second) if err == nil { mapLog := make(map[string]madmin.Status) - mapLog[tgt] = madmin.Status{Status: "Online"} + mapLog[tgt] = madmin.Status{Status: string(madmin.ItemOnline)} loggerInfo = append(loggerInfo, mapLog) } else { mapLog := make(map[string]madmin.Status) - mapLog[tgt] = madmin.Status{Status: "offline"} + mapLog[tgt] = madmin.Status{Status: string(madmin.ItemOffline)} loggerInfo = append(loggerInfo, mapLog) } } @@ -1780,11 +1780,11 @@ func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) { err := checkConnection(target.Endpoint(), 15*time.Second) if err == nil { mapAudit := make(map[string]madmin.Status) - mapAudit[tgt] = madmin.Status{Status: "Online"} + mapAudit[tgt] = madmin.Status{Status: string(madmin.ItemOnline)} auditloggerInfo = append(auditloggerInfo, mapAudit) } else { mapAudit := make(map[string]madmin.Status) - mapAudit[tgt] = madmin.Status{Status: 
"Offline"} + mapAudit[tgt] = madmin.Status{Status: string(madmin.ItemOffline)} auditloggerInfo = append(auditloggerInfo, mapAudit) } } diff --git a/cmd/admin-server-info.go b/cmd/admin-server-info.go index 32fab7b2b..27f36885f 100644 --- a/cmd/admin-server-info.go +++ b/cmd/admin-server-info.go @@ -42,16 +42,16 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Req } if endpoint.IsLocal { // Only proceed for local endpoints - network[nodeName] = "online" + network[nodeName] = string(madmin.ItemOnline) localEndpoints = append(localEndpoints, endpoint) continue } _, present := network[nodeName] if !present { if err := isServerResolvable(endpoint, 2*time.Second); err == nil { - network[nodeName] = "online" + network[nodeName] = string(madmin.ItemOnline) } else { - network[nodeName] = "offline" + network[nodeName] = string(madmin.ItemOffline) // log once the error logger.LogOnceIf(context.Background(), err, nodeName) } @@ -59,35 +59,21 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Req } } - localDisks, _ := initStorageDisksWithErrors(localEndpoints) - defer closeStorageDisks(localDisks) - - storageInfo, _ := getStorageInfo(localDisks, localEndpoints.GetAllStrings()) - - return madmin.ServerProperties{ - State: "ok", + props := madmin.ServerProperties{ + State: string(madmin.ItemInitializing), Endpoint: addr, Uptime: UTCNow().Unix() - globalBootTime.Unix(), Version: Version, CommitID: CommitID, Network: network, - Disks: storageInfo.Disks, - } -} - -func getLocalDisks(endpointServerPools EndpointServerPools) []madmin.Disk { - var localEndpoints Endpoints - for _, ep := range endpointServerPools { - for _, endpoint := range ep.Endpoints { - if !endpoint.IsLocal { - continue - } - localEndpoints = append(localEndpoints, endpoint) - } } - localDisks, _ := initStorageDisksWithErrors(localEndpoints) - defer closeStorageDisks(localDisks) - storageInfo, _ := getStorageInfo(localDisks, 
localEndpoints.GetAllStrings()) - return storageInfo.Disks + objLayer := newObjectLayerFn() + if objLayer != nil { + storageInfo, _ := objLayer.LocalStorageInfo(GlobalContext) + props.State = string(madmin.ItemOnline) + props.Disks = storageInfo.Disks + } + + return props } diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 1b3ae6a21..d563868e9 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -23,6 +23,16 @@ import ( "github.com/minio/minio/pkg/sync/errgroup" ) +func (er erasureObjects) getLocalDisks() (localDisks []StorageAPI) { + disks := er.getDisks() + for _, disk := range disks { + if disk != nil && disk.IsLocal() { + localDisks = append(localDisks, disk) + } + } + return localDisks +} + func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) { disks := er.getDisks() // Based on the random shuffling return back randomized disks. diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 8d0dc1f7b..3a52e19d2 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -331,6 +331,35 @@ func (z *erasureServerPools) BackendInfo() (b BackendInfo) { return } +func (z *erasureServerPools) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) { + var storageInfo StorageInfo + + storageInfos := make([]StorageInfo, len(z.serverPools)) + storageInfosErrs := make([][]error, len(z.serverPools)) + g := errgroup.WithNErrs(len(z.serverPools)) + for index := range z.serverPools { + index := index + g.Go(func() error { + storageInfos[index], storageInfosErrs[index] = z.serverPools[index].LocalStorageInfo(ctx) + return nil + }, index) + } + + // Wait for the go routines. + g.Wait() + + storageInfo.Backend = z.BackendInfo() + for _, lstorageInfo := range storageInfos { + storageInfo.Disks = append(storageInfo.Disks, lstorageInfo.Disks...) + } + + var errs []error + for i := range z.serverPools { + errs = append(errs, storageInfosErrs[i]...) 
+ } + return storageInfo, errs +} + func (z *erasureServerPools) StorageInfo(ctx context.Context) (StorageInfo, []error) { var storageInfo StorageInfo diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 71731a00c..ac164db04 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -601,6 +601,37 @@ func (s *erasureSets) StorageInfo(ctx context.Context) (StorageInfo, []error) { return storageInfo, errs } +// LocalStorageInfo - combines the local disks' StorageInfo across all erasure coded object sets. +func (s *erasureSets) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) { + var storageInfo StorageInfo + + storageInfos := make([]StorageInfo, len(s.sets)) + storageInfoErrs := make([][]error, len(s.sets)) + + g := errgroup.WithNErrs(len(s.sets)) + for index := range s.sets { + index := index + g.Go(func() error { + storageInfos[index], storageInfoErrs[index] = s.sets[index].LocalStorageInfo(ctx) + return nil + }, index) + } + + // Wait for the go routines. + g.Wait() + + for _, lstorageInfo := range storageInfos { + storageInfo.Disks = append(storageInfo.Disks, lstorageInfo.Disks...) + } + + var errs []error + for i := range s.sets { + errs = append(errs, storageInfoErrs[i]...) + } + + return storageInfo, errs +} + // Shutdown shutsdown all erasure coded sets in parallel // returns error upon first error. func (s *erasureSets) Shutdown(ctx context.Context) error { diff --git a/cmd/erasure.go b/cmd/erasure.go index 15e591516..b4ca287c6 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -229,6 +229,18 @@ func (er erasureObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) return getStorageInfo(disks, endpoints) } +// LocalStorageInfo - returns underlying local storage statistics. 
+func (er erasureObjects) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) { + disks := er.getLocalDisks() + endpoints := make([]string, len(disks)) + for i, disk := range disks { + if disk != nil { + endpoints[i] = disk.String() + } + } + return getStorageInfo(disks, endpoints) +} + func (er erasureObjects) getOnlineDisksWithHealing() (newDisks []StorageAPI, healing bool) { var wg sync.WaitGroup disks := er.getDisks() diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index d300dfa47..dd6094834 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -206,6 +206,11 @@ func (fs *FSObjects) BackendInfo() BackendInfo { return BackendInfo{Type: BackendFS} } +// LocalStorageInfo - returns underlying storage statistics. +func (fs *FSObjects) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) { + return fs.StorageInfo(ctx) +} + // StorageInfo - returns underlying storage statistics. func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) { atomic.AddInt64(&fs.activeIOCount, 1) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 340bc345f..ee30ad434 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -39,6 +39,13 @@ func (a GatewayUnsupported) BackendInfo() BackendInfo { return BackendInfo{Type: BackendGateway} } +// LocalStorageInfo returns the local disks information, mainly used +// in prometheus - for gateway this is just a no-op +func (a GatewayUnsupported) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) { + logger.CriticalIf(ctx, errors.New("not implemented")) + return StorageInfo{}, nil +} + // NSScanner - scanner is not implemented for gateway func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error { logger.CriticalIf(ctx, errors.New("not implemented")) diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go index 1419c89df..2ed894b24 100644 --- a/cmd/gateway/hdfs/gateway-hdfs.go +++ 
b/cmd/gateway/hdfs/gateway-hdfs.go @@ -232,6 +232,10 @@ func (n *hdfsObjects) Shutdown(ctx context.Context) error { return n.clnt.Close() } +func (n *hdfsObjects) LocalStorageInfo(ctx context.Context) (si minio.StorageInfo, errs []error) { + return n.StorageInfo(ctx) +} + func (n *hdfsObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, errs []error) { fsInfo, err := n.clnt.StatFs() if err != nil { diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index a71e324a6..a3786df60 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -1023,7 +1023,7 @@ func getBucketUsageMetrics() MetricsGroup { initialize: func(ctx context.Context, metrics *MetricsGroup) { objLayer := newObjectLayerFn() // Service not initialized yet - if objLayer == nil || globalIsGateway { + if objLayer == nil { return } @@ -1031,7 +1031,7 @@ func getBucketUsageMetrics() MetricsGroup { return } - dataUsageInfo, err := loadDataUsageFromBackend(GlobalContext, objLayer) + dataUsageInfo, err := loadDataUsageFromBackend(ctx, objLayer) if err != nil { return } @@ -1040,8 +1040,8 @@ func getBucketUsageMetrics() MetricsGroup { if dataUsageInfo.LastUpdate.IsZero() { return } - for bucket, usage := range dataUsageInfo.BucketsUsage { + for bucket, usage := range dataUsageInfo.BucketsUsage { metrics.Metrics = append(metrics.Metrics, Metric{ Description: getBucketUsageTotalBytesMD(), Value: float64(usage.Size), @@ -1092,9 +1092,18 @@ func getLocalStorageMetrics() MetricsGroup { return MetricsGroup{ Metrics: []Metric{}, initialize: func(ctx context.Context, metrics *MetricsGroup) { - disks := getLocalDisks(globalEndpoints) - for _, disk := range disks { + objLayer := newObjectLayerFn() + // Service not initialized yet + if objLayer == nil { + return + } + if globalIsGateway { + return + } + + storageInfo, _ := objLayer.LocalStorageInfo(ctx) + for _, disk := range storageInfo.Disks { metrics.Metrics = append(metrics.Metrics, Metric{ Description: getNodeDiskUsedBytesMD(), Value: 
float64(disk.UsedSpace), @@ -1120,15 +1129,18 @@ func getClusterStorageMetrics() MetricsGroup { return MetricsGroup{ Metrics: []Metric{}, initialize: func(ctx context.Context, metrics *MetricsGroup) { - objLayer := newObjectLayerFn() // Service not initialized yet if objLayer == nil { return } + if globalIsGateway { + return + } + // Fetch disk space info, ignore errors - storageInfo, _ := objLayer.StorageInfo(GlobalContext) + storageInfo, _ := objLayer.StorageInfo(ctx) onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks) totalDisks := offlineDisks.Merge(onlineDisks) @@ -1142,15 +1154,14 @@ func getClusterStorageMetrics() MetricsGroup { Value: float64(GetTotalCapacityFree(storageInfo.Disks)), }) - s, _ := objLayer.StorageInfo(GlobalContext) metrics.Metrics = append(metrics.Metrics, Metric{ Description: getClusterCapacityUsageBytesMD(), - Value: GetTotalUsableCapacity(storageInfo.Disks, s), + Value: GetTotalUsableCapacity(storageInfo.Disks, storageInfo), }) metrics.Metrics = append(metrics.Metrics, Metric{ Description: getClusterCapacityUsageFreeBytesMD(), - Value: GetTotalUsableCapacityFree(storageInfo.Disks, s), + Value: GetTotalUsableCapacityFree(storageInfo.Disks, storageInfo), }) metrics.Metrics = append(metrics.Metrics, Metric{ diff --git a/cmd/metrics.go b/cmd/metrics.go index 7e51b20a4..1184005b6 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -521,6 +521,10 @@ func storageMetricsPrometheus(ch chan<- prometheus.Metric) { return } + if globalIsGateway { + return + } + server := getLocalServerProperty(globalEndpoints, &http.Request{ Host: GetLocalPeer(globalEndpoints), }) diff --git a/cmd/notification.go b/cmd/notification.go index 8ecd4933e..70d084bdf 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1231,9 +1231,9 @@ func (sys *NotificationSys) ServerInfo() []madmin.ServerProperties { info, err := client.ServerInfo() if err != nil { info.Endpoint = client.host.String() - info.State = "offline" + info.State = 
string(madmin.ItemOffline) } else { - info.State = "ok" + info.State = string(madmin.ItemOnline) } reply[idx] = info }(client, i) @@ -1306,7 +1306,7 @@ func GetPeerOnlineCount() (nodesOnline, nodesOffline int) { nodesOffline = 0 servers := globalNotificationSys.ServerInfo() for _, s := range servers { - if s.State == "ok" { + if s.State == string(madmin.ItemOnline) { nodesOnline++ continue } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index b68515168..e9f2d182a 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -90,7 +90,8 @@ type ObjectLayer interface { NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error BackendInfo() BackendInfo - StorageInfo(ctx context.Context) (StorageInfo, []error) // local queries only local disks + StorageInfo(ctx context.Context) (StorageInfo, []error) + LocalStorageInfo(ctx context.Context) (StorageInfo, []error) // Bucket operations. MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index a0f9ce6be..0a3dd6f0d 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -295,7 +295,7 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) { return p, err } // error is unsupported disk, turn-off directIO for reads - logger.Info(fmt.Sprintf("Drive %s does not support O_DIRECT for reads, proceeding to use the drive without O_DIRECT", ep)) + logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive %s does not support O_DIRECT for reads, proceeding to use the drive without O_DIRECT", ep), ep.String()) p.readODirectSupported = false } diff --git a/pkg/madmin/info-commands.go b/pkg/madmin/info-commands.go index 45debe06f..c7c8cf5c6 100644 --- a/pkg/madmin/info-commands.go +++ b/pkg/madmin/info-commands.go @@ -39,14 +39,17 @@ const ( // Add your own backend. 
) -// ObjectLayerState - represents the status of the object layer -type ObjectLayerState string +// ItemState - represents the status of any item in offline,init,online state +type ItemState string const ( - // ObjectLayerInitializing indicates that the object layer is still in initialization phase - ObjectLayerInitializing = ObjectLayerState("initializing") - // ObjectLayerOnline indicates that the object layer is ready - ObjectLayerOnline = ObjectLayerState("online") + + // ItemOffline indicates that the item is offline + ItemOffline = ItemState("offline") + // ItemInitializing indicates that the item is still in initialization phase + ItemInitializing = ItemState("initializing") + // ItemOnline indicates that the item is online + ItemOnline = ItemState("online") ) // StorageInfo - represents total capacity of underlying storage. @@ -171,7 +174,7 @@ func (adm *AdminClient) DataUsageInfo(ctx context.Context) (DataUsageInfo, error // InfoMessage container to hold server admin related information. type InfoMessage struct { - Mode ObjectLayerState `json:"mode,omitempty"` + Mode string `json:"mode,omitempty"` Domain []string `json:"domain,omitempty"` Region string `json:"region,omitempty"` SQSARN []string `json:"sqsARN,omitempty"`