Introduce disk io stats metrics (#15512)

This commit is contained in:
Anis Elleuch
2022-08-16 15:13:49 +01:00
committed by GitHub
parent 9c025b8cce
commit 5682685c80
20 changed files with 404 additions and 47 deletions

View File

@@ -398,8 +398,21 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request)
} else {
types = madmin.MetricsAll
}
disks := strings.Split(r.Form.Get("disks"), ",")
byDisk := strings.EqualFold(r.Form.Get("by-disk"), "true")
var diskMap map[string]struct{}
if len(disks) > 0 && disks[0] != "" {
diskMap = make(map[string]struct{}, len(disks))
for _, k := range disks {
if k != "" {
diskMap[k] = struct{}{}
}
}
}
hosts := strings.Split(r.Form.Get("hosts"), ",")
byhost := strings.EqualFold(r.Form.Get("by-host"), "true")
byHost := strings.EqualFold(r.Form.Get("by-host"), "true")
var hostMap map[string]struct{}
if len(hosts) > 0 && hosts[0] != "" {
hostMap = make(map[string]struct{}, len(hosts))
@@ -409,6 +422,7 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request)
}
}
}
done := ctx.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
@@ -416,17 +430,20 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request)
for n > 0 {
var m madmin.RealtimeMetrics
mLocal := collectLocalMetrics(types, hostMap)
mLocal := collectLocalMetrics(types, hostMap, diskMap)
m.Merge(&mLocal)
// Allow half the interval for collecting remote...
cctx, cancel := context.WithTimeout(ctx, interval/2)
mRemote := collectRemoteMetrics(cctx, types, hostMap)
mRemote := collectRemoteMetrics(cctx, types, hostMap, diskMap)
cancel()
m.Merge(&mRemote)
if !byhost {
if !byHost {
m.ByHost = nil
}
if !byDisk {
m.ByDisk = nil
}
m.Final = n <= 1
// Marshal API response

View File

@@ -201,8 +201,11 @@ func getDisksInfo(disks []StorageAPI, endpoints []Endpoint) (disksInfo []madmin.
UsedSpace: info.Used,
AvailableSpace: info.Free,
UUID: info.ID,
Major: info.Major,
Minor: info.Minor,
RootDisk: info.RootDisk,
Healing: info.Healing,
Scanning: info.Scanning,
State: diskErrToDriveState(err),
FreeInodes: info.FreeInodes,
}

View File

@@ -22,24 +22,36 @@ import (
"time"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/disk"
)
func collectLocalMetrics(types madmin.MetricType, hosts map[string]struct{}) (m madmin.RealtimeMetrics) {
func collectLocalMetrics(types madmin.MetricType, hosts map[string]struct{}, disks map[string]struct{}) (m madmin.RealtimeMetrics) {
if types == madmin.MetricsNone {
return
}
if len(hosts) > 0 {
if _, ok := hosts[globalMinioAddr]; !ok {
return
}
}
if types.Contains(madmin.MetricsDisk) && !globalIsGateway {
m.ByDisk = make(map[string]madmin.DiskMetric)
aggr := madmin.DiskMetric{
CollectedAt: time.Now(),
}
for name, disk := range collectLocalDisksMetrics(disks) {
m.ByDisk[name] = disk
aggr.Merge(&disk)
}
m.Aggregated.Disk = &aggr
}
if types.Contains(madmin.MetricsScanner) {
metrics := globalScannerMetrics.report()
m.Aggregated.Scanner = &metrics
}
if types.Contains(madmin.MetricsDisk) && !globalIsGateway {
m.Aggregated.Disk = collectDiskMetrics()
}
if types.Contains(madmin.MetricsOS) {
metrics := globalOSMetrics.report()
m.Aggregated.OS = &metrics
@@ -53,55 +65,89 @@ func collectLocalMetrics(types madmin.MetricType, hosts map[string]struct{}) (m
return m
}
func collectDiskMetrics() *madmin.DiskMetric {
func collectLocalDisksMetrics(disks map[string]struct{}) map[string]madmin.DiskMetric {
objLayer := newObjectLayerFn()
disks := madmin.DiskMetric{
CollectedAt: time.Now(),
}
if objLayer == nil {
return nil
}
metrics := make(map[string]madmin.DiskMetric)
procStats, procErr := disk.GetAllDrivesIOStats()
if procErr != nil {
return metrics
}
// only need Disks information in server mode.
storageInfo, errs := objLayer.LocalStorageInfo(GlobalContext)
for _, err := range errs {
if err != nil {
disks.Merge(&madmin.DiskMetric{NDisks: 1, Offline: 1})
for i, d := range storageInfo.Disks {
if len(disks) != 0 {
_, ok := disks[d.Endpoint]
if !ok {
continue
}
}
}
for i, disk := range storageInfo.Disks {
if errs[i] != nil {
metrics[d.Endpoint] = madmin.DiskMetric{NDisks: 1, Offline: 1}
continue
}
var d madmin.DiskMetric
d.NDisks = 1
if disk.Healing {
d.Healing++
var dm madmin.DiskMetric
dm.NDisks = 1
if d.Healing {
dm.Healing++
}
if disk.Metrics != nil {
d.LifeTimeOps = make(map[string]uint64, len(disk.Metrics.APICalls))
for k, v := range disk.Metrics.APICalls {
if d.Metrics != nil {
dm.LifeTimeOps = make(map[string]uint64, len(d.Metrics.APICalls))
for k, v := range d.Metrics.APICalls {
if v != 0 {
d.LifeTimeOps[k] = v
dm.LifeTimeOps[k] = v
}
}
d.LastMinute.Operations = make(map[string]madmin.TimedAction, len(disk.Metrics.APICalls))
for k, v := range disk.Metrics.LastMinute {
dm.LastMinute.Operations = make(map[string]madmin.TimedAction, len(d.Metrics.APICalls))
for k, v := range d.Metrics.LastMinute {
if v.Count != 0 {
d.LastMinute.Operations[k] = v
dm.LastMinute.Operations[k] = v
}
}
}
disks.Merge(&d)
// get disk
if procErr == nil {
st := procStats[disk.DevID{Major: d.Major, Minor: d.Minor}]
dm.IOStats = madmin.DiskIOStats{
ReadIOs: st.ReadIOs,
ReadMerges: st.ReadMerges,
ReadSectors: st.ReadSectors,
ReadTicks: st.ReadTicks,
WriteIOs: st.WriteIOs,
WriteMerges: st.WriteMerges,
WriteSectors: st.WriteSectors,
WriteTicks: st.WriteTicks,
CurrentIOs: st.CurrentIOs,
TotalTicks: st.TotalTicks,
ReqTicks: st.ReqTicks,
DiscardIOs: st.DiscardIOs,
DiscardMerges: st.DiscardMerges,
DiscardSectors: st.DiscardSectors,
DiscardTicks: st.DiscardTicks,
FlushIOs: st.FlushIOs,
FlushTicks: st.FlushTicks,
}
}
metrics[d.Endpoint] = dm
}
return &disks
return metrics
}
func collectRemoteMetrics(ctx context.Context, types madmin.MetricType, hosts map[string]struct{}) (m madmin.RealtimeMetrics) {
func collectRemoteMetrics(ctx context.Context, types madmin.MetricType, hosts map[string]struct{}, disks map[string]struct{}) (m madmin.RealtimeMetrics) {
if !globalIsDistErasure {
return
}
all := globalNotificationSys.GetMetrics(ctx, types, hosts)
all := globalNotificationSys.GetMetrics(ctx, types, hosts, disks)
for _, remote := range all {
m.Merge(&remote)
}

View File

@@ -904,7 +904,7 @@ func (sys *NotificationSys) GetOSInfo(ctx context.Context) []madmin.OSInfo {
}
// GetMetrics - Get metrics from all peers.
func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType, hosts map[string]struct{}) []madmin.RealtimeMetrics {
func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType, hosts map[string]struct{}, disks map[string]struct{}) []madmin.RealtimeMetrics {
reply := make([]madmin.RealtimeMetrics, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
@@ -922,7 +922,7 @@ func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType,
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].GetMetrics(ctx, t)
reply[index], err = sys.peerClients[index].GetMetrics(ctx, t, disks)
return err
}, index)
}

View File

@@ -195,9 +195,12 @@ func (client *peerRESTClient) GetMemInfo(ctx context.Context) (info madmin.MemIn
}
// GetMetrics - fetch metrics from a remote node.
func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricType) (info madmin.RealtimeMetrics, err error) {
func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricType, diskMap map[string]struct{}) (info madmin.RealtimeMetrics, err error) {
values := make(url.Values)
values.Set(peerRESTTypes, strconv.FormatUint(uint64(t), 10))
for disk := range diskMap {
values.Set(peerRESTDisk, disk)
}
respBody, err := client.callWithContext(ctx, peerRESTMethodMetrics, values, nil, -1)
if err != nil {
return

View File

@@ -18,7 +18,7 @@
package cmd
const (
peerRESTVersion = "v24" // Change ServerUpdate to DownloadBinary and CommitBinary
peerRESTVersion = "v25" // Update /metrics
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@@ -91,6 +91,7 @@ const (
peerRESTDuration = "duration"
peerRESTStorageClass = "storage-class"
peerRESTTypes = "types"
peerRESTDisk = "disk"
peerRESTListenBucket = "bucket"
peerRESTListenPrefix = "prefix"

View File

@@ -426,10 +426,17 @@ func (s *peerRESTServer) GetMetricsHandler(w http.ResponseWriter, r *http.Reques
types = madmin.MetricsAll
}
diskMap := make(map[string]struct{})
if r.Form != nil {
for _, disk := range r.Form[peerRESTDisk] {
diskMap[disk] = struct{}{}
}
}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
info := collectLocalMetrics(types, nil)
info := collectLocalMetrics(types, nil, diskMap)
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}

View File

@@ -40,9 +40,12 @@ type DiskInfo struct {
Used uint64
UsedInodes uint64
FreeInodes uint64
Major uint32
Minor uint32
FSType string
RootDisk bool
Healing bool
Scanning bool
Endpoint string
MountPath string
ID string

View File

@@ -14,8 +14,8 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err)
return
}
if zb0001 != 13 {
err = msgp.ArrayError{Wanted: 13, Got: zb0001}
if zb0001 != 16 {
err = msgp.ArrayError{Wanted: 16, Got: zb0001}
return
}
z.Total, err = dc.ReadUint64()
@@ -43,6 +43,16 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "FreeInodes")
return
}
z.Major, err = dc.ReadUint32()
if err != nil {
err = msgp.WrapError(err, "Major")
return
}
z.Minor, err = dc.ReadUint32()
if err != nil {
err = msgp.WrapError(err, "Minor")
return
}
z.FSType, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "FSType")
@@ -58,6 +68,11 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Healing")
return
}
z.Scanning, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "Scanning")
return
}
z.Endpoint, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Endpoint")
@@ -88,8 +103,8 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 13
err = en.Append(0x9d)
// array header, size 16
err = en.Append(0xdc, 0x0, 0x10)
if err != nil {
return
}
@@ -118,6 +133,16 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "FreeInodes")
return
}
err = en.WriteUint32(z.Major)
if err != nil {
err = msgp.WrapError(err, "Major")
return
}
err = en.WriteUint32(z.Minor)
if err != nil {
err = msgp.WrapError(err, "Minor")
return
}
err = en.WriteString(z.FSType)
if err != nil {
err = msgp.WrapError(err, "FSType")
@@ -133,6 +158,11 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Healing")
return
}
err = en.WriteBool(z.Scanning)
if err != nil {
err = msgp.WrapError(err, "Scanning")
return
}
err = en.WriteString(z.Endpoint)
if err != nil {
err = msgp.WrapError(err, "Endpoint")
@@ -164,16 +194,19 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// array header, size 13
o = append(o, 0x9d)
// array header, size 16
o = append(o, 0xdc, 0x0, 0x10)
o = msgp.AppendUint64(o, z.Total)
o = msgp.AppendUint64(o, z.Free)
o = msgp.AppendUint64(o, z.Used)
o = msgp.AppendUint64(o, z.UsedInodes)
o = msgp.AppendUint64(o, z.FreeInodes)
o = msgp.AppendUint32(o, z.Major)
o = msgp.AppendUint32(o, z.Minor)
o = msgp.AppendString(o, z.FSType)
o = msgp.AppendBool(o, z.RootDisk)
o = msgp.AppendBool(o, z.Healing)
o = msgp.AppendBool(o, z.Scanning)
o = msgp.AppendString(o, z.Endpoint)
o = msgp.AppendString(o, z.MountPath)
o = msgp.AppendString(o, z.ID)
@@ -194,8 +227,8 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err)
return
}
if zb0001 != 13 {
err = msgp.ArrayError{Wanted: 13, Got: zb0001}
if zb0001 != 16 {
err = msgp.ArrayError{Wanted: 16, Got: zb0001}
return
}
z.Total, bts, err = msgp.ReadUint64Bytes(bts)
@@ -223,6 +256,16 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "FreeInodes")
return
}
z.Major, bts, err = msgp.ReadUint32Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Major")
return
}
z.Minor, bts, err = msgp.ReadUint32Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Minor")
return
}
z.FSType, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "FSType")
@@ -238,6 +281,11 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Healing")
return
}
z.Scanning, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Scanning")
return
}
z.Endpoint, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Endpoint")
@@ -269,7 +317,7 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *DiskInfo) Msgsize() (s int) {
s = 1 + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.StringPrefixSize + len(z.FSType) + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.Endpoint) + msgp.StringPrefixSize + len(z.MountPath) + msgp.StringPrefixSize + len(z.ID) + z.Metrics.Msgsize() + msgp.StringPrefixSize + len(z.Error)
s = 3 + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint32Size + msgp.Uint32Size + msgp.StringPrefixSize + len(z.FSType) + msgp.BoolSize + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.Endpoint) + msgp.StringPrefixSize + len(z.MountPath) + msgp.StringPrefixSize + len(z.ID) + z.Metrics.Msgsize() + msgp.StringPrefixSize + len(z.Error)
return
}

View File

@@ -32,6 +32,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
@@ -101,6 +102,9 @@ type xlStorage struct {
// Indexes, will be -1 until assigned a set.
poolIndex, setIndex, diskIndex int
// Indicate of NSScanner is in progress in this disk
scanning int32
formatFileInfo os.FileInfo
formatLegacy bool
formatLastCheck time.Time
@@ -423,6 +427,9 @@ func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte,
}
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
atomic.AddInt32(&s.scanning, 1)
defer atomic.AddInt32(&s.scanning, -1)
// Updates must be closed before we return.
defer close(updates)
var lc *lifecycle.Lifecycle
@@ -576,6 +583,8 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
if err != nil {
return dcinfo, err
}
dcinfo.Major = di.Major
dcinfo.Minor = di.Minor
dcinfo.Total = di.Total
dcinfo.Free = di.Free
dcinfo.Used = di.Used
@@ -587,6 +596,7 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
// - if we found an unformatted disk (no 'format.json')
// - if we found healing tracker 'healing.bin'
dcinfo.Healing = errors.Is(err, errUnformattedDisk) || (s.Healing() != nil)
dcinfo.Scanning = atomic.LoadInt32(&s.scanning) == 1
dcinfo.ID = diskID
return dcinfo, err
}