From 7c0673279bbe3a21d173caf68f892d2ddbd83c42 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Thu, 18 Jan 2024 11:17:43 -0800
Subject: [PATCH] capture I/O in waiting and total tokens in diskMetrics
 (#18819)

This is needed for the subsequent changes in ServerUpdate(),
ServerRestart() etc.
---
 cmd/storage-datatypes.go        |  2 +
 cmd/storage-datatypes_gen.go    | 60 +++++++++++++++++++--
 cmd/storage-rest-common.go      |  2 +-
 cmd/xl-storage-disk-id-check.go | 93 +++++++++++++++++----------------
 4 files changed, 106 insertions(+), 51 deletions(-)

diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go
index 0b8f1b4a8..a4a960f77 100644
--- a/cmd/storage-datatypes.go
+++ b/cmd/storage-datatypes.go
@@ -73,6 +73,8 @@ type DiskInfo struct {
 type DiskMetrics struct {
 	LastMinute              map[string]AccElem `json:"apiLatencies,omitempty"`
 	APICalls                map[string]uint64  `json:"apiCalls,omitempty"`
+	TotalTokens             uint32             `json:"totalTokens,omitempty"`
+	TotalWaiting            uint32             `json:"totalWaiting,omitempty"`
 	TotalErrorsAvailability uint64             `json:"totalErrsAvailability"`
 	TotalErrorsTimeout      uint64             `json:"totalErrsTimeout"`
 	TotalWrites             uint64             `json:"totalWrites"`
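The two new fields are plain gauges: TotalTokens reports the concurrency budget configured for the drive, and TotalWaiting reports how many callers are currently blocked waiting for one of those tokens. As a minimal sketch of how a monitoring client might consume them once they show up in DiskInfo responses (diskMetricsView, the sample payload, and the pressure ratio are illustrative assumptions, not MinIO code):

package main

import (
	"encoding/json"
	"fmt"
)

// diskMetricsView is a trimmed-down stand-in for the DiskMetrics fields
// added above; only the two new gauges are mirrored here.
type diskMetricsView struct {
	TotalTokens  uint32 `json:"totalTokens,omitempty"`
	TotalWaiting uint32 `json:"totalWaiting,omitempty"`
}

func main() {
	// Hypothetical payload: 512 concurrency tokens configured,
	// 23 callers currently blocked waiting for one.
	payload := []byte(`{"totalTokens":512,"totalWaiting":23}`)

	var m diskMetricsView
	if err := json.Unmarshal(payload, &m); err != nil {
		panic(err)
	}

	// A waiting/tokens ratio near or above 1.0 suggests the drive is
	// oversubscribed relative to its configured concurrency.
	fmt.Printf("tokens=%d waiting=%d pressure=%.2f\n",
		m.TotalTokens, m.TotalWaiting,
		float64(m.TotalWaiting)/float64(m.TotalTokens))
}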
diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go
index b36786b98..726facb2c 100644
--- a/cmd/storage-datatypes_gen.go
+++ b/cmd/storage-datatypes_gen.go
@@ -1327,6 +1327,18 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
 				}
 				z.APICalls[za0003] = za0004
 			}
+		case "TotalTokens":
+			z.TotalTokens, err = dc.ReadUint32()
+			if err != nil {
+				err = msgp.WrapError(err, "TotalTokens")
+				return
+			}
+		case "TotalWaiting":
+			z.TotalWaiting, err = dc.ReadUint32()
+			if err != nil {
+				err = msgp.WrapError(err, "TotalWaiting")
+				return
+			}
 		case "TotalErrorsAvailability":
 			z.TotalErrorsAvailability, err = dc.ReadUint64()
 			if err != nil {
@@ -1364,9 +1376,9 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) {

 // EncodeMsg implements msgp.Encodable
 func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 6
+	// map header, size 8
 	// write "LastMinute"
-	err = en.Append(0x86, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65)
+	err = en.Append(0x88, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65)
 	if err != nil {
 		return
 	}
@@ -1409,6 +1421,26 @@ func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) {
 			return
 		}
 	}
+	// write "TotalTokens"
+	err = en.Append(0xab, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x73)
+	if err != nil {
+		return
+	}
+	err = en.WriteUint32(z.TotalTokens)
+	if err != nil {
+		err = msgp.WrapError(err, "TotalTokens")
+		return
+	}
+	// write "TotalWaiting"
+	err = en.Append(0xac, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x57, 0x61, 0x69, 0x74, 0x69, 0x6e, 0x67)
+	if err != nil {
+		return
+	}
+	err = en.WriteUint32(z.TotalWaiting)
+	if err != nil {
+		err = msgp.WrapError(err, "TotalWaiting")
+		return
+	}
 	// write "TotalErrorsAvailability"
 	err = en.Append(0xb7, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79)
 	if err != nil {
@@ -1455,9 +1487,9 @@ func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) {
 // MarshalMsg implements msgp.Marshaler
 func (z *DiskMetrics) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// map header, size 6
+	// map header, size 8
 	// string "LastMinute"
-	o = append(o, 0x86, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65)
+	o = append(o, 0x88, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65)
 	o = msgp.AppendMapHeader(o, uint32(len(z.LastMinute)))
 	for za0001, za0002 := range z.LastMinute {
 		o = msgp.AppendString(o, za0001)
@@ -1474,6 +1506,12 @@ func (z *DiskMetrics) MarshalMsg(b []byte) (o []byte, err error) {
 		o = msgp.AppendString(o, za0003)
 		o = msgp.AppendUint64(o, za0004)
 	}
+	// string "TotalTokens"
+	o = append(o, 0xab, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x73)
+	o = msgp.AppendUint32(o, z.TotalTokens)
+	// string "TotalWaiting"
+	o = append(o, 0xac, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x57, 0x61, 0x69, 0x74, 0x69, 0x6e, 0x67)
+	o = msgp.AppendUint32(o, z.TotalWaiting)
 	// string "TotalErrorsAvailability"
 	o = append(o, 0xb7, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x41, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79)
 	o = msgp.AppendUint64(o, z.TotalErrorsAvailability)
@@ -1567,6 +1605,18 @@ func (z *DiskMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) {
 				}
 				z.APICalls[za0003] = za0004
 			}
+		case "TotalTokens":
+			z.TotalTokens, bts, err = msgp.ReadUint32Bytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "TotalTokens")
+				return
+			}
+		case "TotalWaiting":
+			z.TotalWaiting, bts, err = msgp.ReadUint32Bytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "TotalWaiting")
+				return
+			}
 		case "TotalErrorsAvailability":
 			z.TotalErrorsAvailability, bts, err = msgp.ReadUint64Bytes(bts)
 			if err != nil {
@@ -1619,7 +1669,7 @@ func (z *DiskMetrics) Msgsize() (s int) {
 			s += msgp.StringPrefixSize + len(za0003) + msgp.Uint64Size
 		}
 	}
-	s += 24 + msgp.Uint64Size + 19 + msgp.Uint64Size + 12 + msgp.Uint64Size + 13 + msgp.Uint64Size
+	s += 12 + msgp.Uint32Size + 13 + msgp.Uint32Size + 24 + msgp.Uint64Size + 19 + msgp.Uint64Size + 12 + msgp.Uint64Size + 13 + msgp.Uint64Size
 	return
 }
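The regenerated serializers only change constants. In MessagePack, a map with up to 15 entries uses a fixmap header byte of 0x80|N, so the header moves from 0x86 (six fields) to 0x88 (eight), and a short field name is a fixstr header byte of 0xa0|len followed by the name's ASCII bytes, which is where 0xab "TotalTokens" and 0xac "TotalWaiting" come from. A stdlib-only sketch that re-derives the bytes the generator hard-codes above:

package main

import "fmt"

func main() {
	// msgpack fixmap header: 0x80 | N for maps with up to 15 entries.
	fmt.Printf("fixmap(8)  = 0x%x\n", 0x80|8) // 0x88, the new map header

	// msgpack fixstr header: 0xa0 | len for strings up to 31 bytes.
	name := "TotalTokens"
	fmt.Printf("fixstr(%d) = 0x%x\n", len(name), 0xa0|len(name)) // 0xab

	// The remaining bytes are just the field name in ASCII.
	fmt.Printf("bytes      = % x\n", []byte(name)) // 54 6f 74 61 6c 54 6f 6b 65 6e 73
}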
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go
index f6e773387..e47917a8e 100644
--- a/cmd/storage-rest-common.go
+++ b/cmd/storage-rest-common.go
@@ -20,7 +20,7 @@ package cmd
 //go:generate msgp -file $GOFILE -unexported

 const (
-	storageRESTVersion       = "v53" // Remove deprecated APIs
+	storageRESTVersion       = "v54" // Add more metrics per drive
 	storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
 	storageRESTPrefix        = minioReservedBucketPath + "/storage"
 )
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 40dc8defa..d26af3483 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
+// Copyright (c) 2015-2024 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
@@ -90,13 +90,13 @@ type xlStorageDiskIDCheck struct {
 	storage *xlStorage
 	health  *diskHealthTracker

-	// diskStartChecking is a threshold above which we will start to check
-	// the state of disks, generally this value is less than diskMaxConcurrent
-	diskStartChecking int
+	// driveStartChecking is a threshold above which we will start to check
+	// the state of disks, generally this value is less than driveMaxConcurrent
+	driveStartChecking int

-	// diskMaxConcurrent represents maximum number of running concurrent
+	// driveMaxConcurrent represents maximum number of running concurrent
 	// operations for local and (incoming) remote disk operations.
-	diskMaxConcurrent int
+	driveMaxConcurrent int

 	metricsCache timedValue
 	diskCtx      context.Context
@@ -127,8 +127,11 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
 	}

 	// Do not need this value to be cached.
+	diskMetric.TotalTokens = uint32(p.driveMaxConcurrent)
+	diskMetric.TotalWaiting = uint32(p.health.waiting.Load())
 	diskMetric.TotalErrorsTimeout = p.totalErrsTimeout.Load()
 	diskMetric.TotalErrorsAvailability = p.totalErrsAvailability.Load()
+
 	return diskMetric
 }

@@ -189,42 +192,42 @@ func (e *lockedLastMinuteLatency) total() AccElem {

 var maxConcurrentOnce sync.Once

 func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDiskIDCheck {
-	// diskMaxConcurrent represents maximum number of running concurrent
+	// driveMaxConcurrent represents maximum number of running concurrent
 	// operations for local and (incoming) remote disk operations.
 	//
 	// this value is a placeholder it is overridden via ENV for custom settings
 	// or this default value is used to pick the correct value HDDs v/s NVMe's
-	diskMaxConcurrent := -1
+	driveMaxConcurrent := -1
 	maxConcurrentOnce.Do(func() {
 		s := env.Get("_MINIO_DRIVE_MAX_CONCURRENT", "")
 		if s == "" {
 			s = env.Get("_MINIO_DISK_MAX_CONCURRENT", "")
 		}
 		if s != "" {
-			diskMaxConcurrent, _ = strconv.Atoi(s)
+			driveMaxConcurrent, _ = strconv.Atoi(s)
 		}
 	})

-	if diskMaxConcurrent <= 0 {
-		diskMaxConcurrent = 512
+	if driveMaxConcurrent <= 0 {
+		driveMaxConcurrent = 512
 		if storage.rotational {
-			diskMaxConcurrent = int(storage.nrRequests) / 2
-			if diskMaxConcurrent < 32 {
-				diskMaxConcurrent = 32
+			driveMaxConcurrent = int(storage.nrRequests) / 2
+			if driveMaxConcurrent < 32 {
+				driveMaxConcurrent = 32
 			}
 		}
 	}

-	diskStartChecking := 16 + diskMaxConcurrent/8
-	if diskStartChecking > diskMaxConcurrent {
-		diskStartChecking = diskMaxConcurrent
+	driveStartChecking := 16 + driveMaxConcurrent/8
+	if driveStartChecking > driveMaxConcurrent {
+		driveStartChecking = driveMaxConcurrent
 	}

 	xl := xlStorageDiskIDCheck{
-		storage:           storage,
-		health:            newDiskHealthTracker(diskMaxConcurrent),
-		diskMaxConcurrent: diskMaxConcurrent,
-		diskStartChecking: diskStartChecking,
+		storage:            storage,
+		health:             newDiskHealthTracker(driveMaxConcurrent),
+		driveMaxConcurrent: driveMaxConcurrent,
+		driveStartChecking: driveStartChecking,
 	}

 	if driveQuorum {
@@ -347,6 +350,8 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, metrics bool) (info
 			info.Metrics.TotalWrites = p.totalWrites.Load()
 			info.Metrics.TotalDeletes = p.totalDeletes.Load()
 		}
+		info.Metrics.TotalTokens = uint32(p.driveMaxConcurrent)
+		info.Metrics.TotalWaiting = uint32(p.health.waiting.Load())
 		info.Metrics.TotalErrorsTimeout = p.totalErrsTimeout.Load()
 		info.Metrics.TotalErrorsAvailability = p.totalErrsAvailability.Load()
 	}()
@@ -842,7 +847,7 @@ func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...st
 }

 const (
-	diskHealthOK = iota
+	diskHealthOK int32 = iota
 	diskHealthFaulty
 )
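Typing diskHealthOK as an explicit int32 lets it be stored into the atomic.Int32 status field introduced below. For reference, the token-budget selection in the newXLStorageDiskIDCheck hunk above can be restated as a pure function. This sketch drops the sync.Once guard and the legacy _MINIO_DISK_MAX_CONCURRENT fallback, takes rotational and nrRequests as parameters instead of xlStorage fields, and driveLimits is our name for it, not MinIO's:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// driveLimits restates the tuning logic from newXLStorageDiskIDCheck: an
// environment override wins; otherwise non-rotational drives get 512
// tokens and HDDs get half their queue depth (nr_requests), floored at 32.
func driveLimits(rotational bool, nrRequests int) (maxConcurrent, startChecking int) {
	maxConcurrent = -1
	if s := os.Getenv("_MINIO_DRIVE_MAX_CONCURRENT"); s != "" {
		maxConcurrent, _ = strconv.Atoi(s)
	}
	if maxConcurrent <= 0 {
		maxConcurrent = 512
		if rotational {
			maxConcurrent = nrRequests / 2
			if maxConcurrent < 32 {
				maxConcurrent = 32
			}
		}
	}
	// Health checking only kicks in once at least startChecking tokens
	// are in use (see checkHealth below).
	startChecking = 16 + maxConcurrent/8
	if startChecking > maxConcurrent {
		startChecking = maxConcurrent
	}
	return maxConcurrent, startChecking
}

func main() {
	fmt.Println(driveLimits(true, 128)) // HDD, nr_requests=128: 64 tokens, check at 24
	fmt.Println(driveLimits(false, 0))  // NVMe/SSD default: 512 tokens, check at 80
}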
@@ -867,24 +872,24 @@ type diskHealthTracker struct {
 	lastStarted int64

 	// Atomic status of disk.
-	status int32
+	status atomic.Int32

-	// Atomic number of requests blocking for a token.
-	blocked int32
+	// Atomic number of requests waiting for a token.
+	waiting atomic.Int32

 	// Concurrency tokens.
 	tokens chan struct{}
 }

 // newDiskHealthTracker creates a new disk health tracker.
-func newDiskHealthTracker(diskMaxConcurrent int) *diskHealthTracker {
+func newDiskHealthTracker(driveMaxConcurrent int) *diskHealthTracker {
 	d := diskHealthTracker{
 		lastSuccess: time.Now().UnixNano(),
 		lastStarted: time.Now().UnixNano(),
-		status:      diskHealthOK,
-		tokens:      make(chan struct{}, diskMaxConcurrent),
+		tokens:      make(chan struct{}, driveMaxConcurrent),
 	}
-	for i := 0; i < diskMaxConcurrent; i++ {
+	d.status.Store(diskHealthOK)
+	for i := 0; i < driveMaxConcurrent; i++ {
 		d.tokens <- struct{}{}
 	}
 	return &d
@@ -896,7 +901,7 @@ func (d *diskHealthTracker) logSuccess() {
 }

 func (d *diskHealthTracker) isFaulty() bool {
-	return atomic.LoadInt32(&d.status) == diskHealthFaulty
+	return d.status.Load() == diskHealthFaulty
 }

 type (
@@ -982,10 +987,9 @@ func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMet
 // checking the disk status.
 // If nil is returned a token was picked up.
 func (p *xlStorageDiskIDCheck) waitForToken(ctx context.Context) (err error) {
-	atomic.AddInt32(&p.health.blocked, 1)
-	defer func() {
-		atomic.AddInt32(&p.health.blocked, -1)
-	}()
+	p.health.waiting.Add(1)
+	defer p.health.waiting.Add(-1)
+
 	// Avoid stampeding herd...
 	ticker := time.NewTicker(5*time.Second + time.Duration(rand.Int63n(int64(5*time.Second))))
 	defer ticker.Stop()
@@ -1008,11 +1012,11 @@ func (p *xlStorageDiskIDCheck) waitForToken(ctx context.Context) (err error) {
 // checkHealth should only be called when tokens have run out.
 // This will check if disk should be taken offline.
 func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
-	if atomic.LoadInt32(&p.health.status) == diskHealthFaulty {
+	if p.health.status.Load() == diskHealthFaulty {
 		return errFaultyDisk
 	}
 	// Check if there are tokens.
-	if p.diskMaxConcurrent-len(p.health.tokens) < p.diskStartChecking {
+	if p.driveMaxConcurrent-len(p.health.tokens) < p.driveStartChecking {
 		return nil
 	}

@@ -1030,8 +1034,9 @@ func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
 	// If also more than 15 seconds since last success, take disk offline.
 	t = time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess)))
 	if t > maxTimeSinceLastSuccess {
-		if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
+		if p.health.status.CompareAndSwap(diskHealthOK, diskHealthFaulty) {
 			logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline, time since last response %v", globalLocalNodeName, p.storage.String(), t.Round(time.Millisecond)))
+			p.health.waiting.Add(1)
 			go p.monitorDiskStatus(0, mustGetUUID())
 		}
 		return errFaultyDisk
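The CompareAndSwap guard above makes the OK-to-faulty transition one-shot: of all the callers that observe a stalled drive, exactly one flips the status and spawns the recovery monitor, and that winner also bumps the waiting gauge so the drive keeps reporting pressure while it is offline (the monitor releases it on recovery, as the next hunk shows). A standalone sketch of just the CAS guard, with our own names rather than MinIO's:

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	healthOK int32 = iota
	healthFaulty
)

func main() {
	var status atomic.Int32
	status.Store(healthOK)

	// Only the caller that wins the CAS performs the offline transition,
	// mirroring status.CompareAndSwap(diskHealthOK, diskHealthFaulty)
	// in checkHealth and goOffline above.
	for i := 0; i < 3; i++ {
		if status.CompareAndSwap(healthOK, healthFaulty) {
			fmt.Println("caller", i, "took the drive offline")
		} else {
			fmt.Println("caller", i, "lost the race; already offline")
		}
	}
}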
@@ -1077,12 +1082,9 @@ func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration, fn string)
 		})

 		if err == nil {
-			t := time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))
-			if spent > 0 {
-				t = t.Add(spent)
-			}
-			logger.Info("node(%s): Read/Write/Delete successful, bringing drive %s online. Drive was offline for %s.", globalLocalNodeName, p.storage.String(), time.Since(t))
-			atomic.StoreInt32(&p.health.status, diskHealthOK)
+			logger.Info("node(%s): Read/Write/Delete successful, bringing drive %s online", globalLocalNodeName, p.storage.String())
+			p.health.status.Store(diskHealthOK)
+			p.health.waiting.Add(-1)
 			return
 		}
 	}
@@ -1128,7 +1130,7 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
 			return false
 		}

-		if atomic.LoadInt32(&p.health.status) != diskHealthOK {
+		if p.health.status.Load() != diskHealthOK {
 			return true
 		}

@@ -1138,8 +1140,9 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {

 	goOffline := func(err error, spent time.Duration) {
-		if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
+		if p.health.status.CompareAndSwap(diskHealthOK, diskHealthFaulty) {
 			logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline: %v", globalLocalNodeName, p.storage.String(), err))
+			p.health.waiting.Add(1)
 			go p.monitorDiskStatus(spent, fn)
 		}
 	}
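Taken together, the tracker is a classic token-pool semaphore: a buffered channel pre-filled with driveMaxConcurrent tokens bounds concurrency, tokens in use are recoverable as capacity minus channel length (which is what checkHealth compares against driveStartChecking), and an atomic gauge counts blocked callers, now surfaced as TotalWaiting. A self-contained sketch of the pattern, minus the jittered ticker, context cancellation, and health checks that the real waitForToken layers on top (tokenPool is our name, not MinIO's):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type tokenPool struct {
	waiting atomic.Int32  // surfaced in DiskMetrics as TotalWaiting
	tokens  chan struct{} // capacity is TotalTokens
}

func newTokenPool(n int) *tokenPool {
	p := &tokenPool{tokens: make(chan struct{}, n)}
	// Pre-fill the pool; each in-flight operation holds one token.
	for i := 0; i < n; i++ {
		p.tokens <- struct{}{}
	}
	return p
}

func (p *tokenPool) acquire() {
	// Count ourselves as waiting only while blocked on the channel.
	p.waiting.Add(1)
	defer p.waiting.Add(-1)
	<-p.tokens
}

func (p *tokenPool) release() { p.tokens <- struct{}{} }

func main() {
	pool := newTokenPool(2) // only two operations may run at once
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			pool.acquire()
			defer pool.release()
			fmt.Printf("op %d running, waiting=%d\n", id, pool.waiting.Load())
		}(i)
	}
	wg.Wait()
}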