diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index bb1ee7e91..fb27dce84 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -33,7 +33,6 @@ import (
 	"github.com/minio/madmin-go/v3"
 	xioutil "github.com/minio/minio/internal/ioutil"
 	"github.com/minio/minio/internal/logger"
-	"github.com/minio/pkg/v2/env"
 )
 
 //go:generate stringer -type=storageMetric -trimprefix=storageMetric $GOFILE
@@ -90,14 +89,6 @@ type xlStorageDiskIDCheck struct {
 	health      *diskHealthTracker
 	healthCheck bool
 
-	// driveStartChecking is a threshold above which we will start to check
-	// the state of disks, generally this value is less than driveMaxConcurrent
-	driveStartChecking int
-
-	// driveMaxConcurrent represents maximum number of running concurrent
-	// operations for local and (incoming) remote disk operations.
-	driveMaxConcurrent int
-
 	metricsCache timedValue
 	diskCtx      context.Context
 	diskCancel   context.CancelFunc
@@ -127,8 +118,6 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
 	}
 
 	// Do not need this value to be cached.
-	diskMetric.TotalTokens = uint32(p.driveMaxConcurrent)
-	diskMetric.TotalWaiting = uint32(p.health.waiting.Load())
 	diskMetric.TotalErrorsTimeout = p.totalErrsTimeout.Load()
 	diskMetric.TotalErrorsAvailability = p.totalErrsAvailability.Load()
 
@@ -189,61 +178,11 @@ func (e *lockedLastMinuteLatency) total() AccElem {
 	return e.lastMinuteLatency.getTotal()
 }
 
-var maxConcurrentOnce sync.Once
-
 func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDiskIDCheck {
-	// driveMaxConcurrent represents maximum number of running concurrent
-	// operations for local and (incoming) remote disk operations.
-	//
-	// this value is a placeholder it is overridden via ENV for custom settings
-	// or this default value is used to pick the correct value HDDs v/s NVMe's
-	driveMaxConcurrent := -1
-	maxConcurrentOnce.Do(func() {
-		s := env.Get("_MINIO_DRIVE_MAX_CONCURRENT", "")
-		if s == "" {
-			s = env.Get("_MINIO_DISK_MAX_CONCURRENT", "")
-		}
-		if s != "" {
-			driveMaxConcurrent, _ = strconv.Atoi(s)
-		}
-	})
-
-	if driveMaxConcurrent <= 0 {
-		// nr_requests is for both READ and WRITE separately
-		// so we this 2x tokens on our end.
-		//
-		// https://www.kernel.org/doc/Documentation/block/queue-sysfs.txt
-		//
-		// nr_requests (RW)
-		// ----------------
-		// This controls how many requests may be allocated in the block layer for
-		// read or write requests. Note that the total allocated number may be twice
-		// this amount, since it applies only to reads or writes (not the accumulated
-		// sum).
-		driveMaxConcurrent = int(storage.nrRequests) * 2
-		if driveMaxConcurrent <= 0 {
-			driveMaxConcurrent = 1023 * 2 // Default value on Linux for most NVMe
-		}
-		if storage.rotational {
-			// use 80% of the available nr_requests on HDDs
-			driveMaxConcurrent = int(float64(storage.nrRequests)*0.8) * 2
-			if driveMaxConcurrent < 32 {
-				driveMaxConcurrent = 32
-			}
-		}
-	}
-
-	driveStartChecking := 16 + driveMaxConcurrent/8
-	if driveStartChecking > driveMaxConcurrent {
-		driveStartChecking = driveMaxConcurrent
-	}
-
 	xl := xlStorageDiskIDCheck{
-		storage:            storage,
-		health:             newDiskHealthTracker(driveMaxConcurrent),
-		healthCheck:        healthCheck && globalDriveMonitoring,
-		driveMaxConcurrent: driveMaxConcurrent,
-		driveStartChecking: driveStartChecking,
+		storage:     storage,
+		health:      newDiskHealthTracker(),
+		healthCheck: healthCheck && globalDriveMonitoring,
 	}
 
 	xl.totalWrites.Store(xl.storage.getWriteAttribute())
@@ -362,7 +301,6 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, opts DiskInfoOption
 	if opts.NoOp {
 		info.Metrics.TotalWrites = p.totalWrites.Load()
 		info.Metrics.TotalDeletes = p.totalDeletes.Load()
-		info.Metrics.TotalTokens = uint32(p.driveMaxConcurrent)
 		info.Metrics.TotalWaiting = uint32(p.health.waiting.Load())
 		info.Metrics.TotalErrorsTimeout = p.totalErrsTimeout.Load()
 		info.Metrics.TotalErrorsAvailability = p.totalErrsAvailability.Load()
@@ -375,7 +313,6 @@ func (p *xlStorageDiskIDCheck) DiskInfo(ctx context.Context, opts DiskInfoOption
 	}
 	info.Metrics.TotalWrites = p.totalWrites.Load()
 	info.Metrics.TotalDeletes = p.totalDeletes.Load()
-	info.Metrics.TotalTokens = uint32(p.driveMaxConcurrent)
 	info.Metrics.TotalWaiting = uint32(p.health.waiting.Load())
 	info.Metrics.TotalErrorsTimeout = p.totalErrsTimeout.Load()
 	info.Metrics.TotalErrorsAvailability = p.totalErrsAvailability.Load()
@@ -847,24 +784,17 @@ type diskHealthTracker struct {
 	// Atomic status of disk.
 	status atomic.Int32
 
-	// Atomic number of requests waiting for a token.
+	// Atomic number indicates if a disk is hung
 	waiting atomic.Int32
-
-	// Concurrency tokens.
-	tokens chan struct{}
 }
 
 // newDiskHealthTracker creates a new disk health tracker.
-func newDiskHealthTracker(driveMaxConcurrent int) *diskHealthTracker {
+func newDiskHealthTracker() *diskHealthTracker {
 	d := diskHealthTracker{
 		lastSuccess: time.Now().UnixNano(),
 		lastStarted: time.Now().UnixNano(),
-		tokens:      make(chan struct{}, driveMaxConcurrent),
 	}
 	d.status.Store(diskHealthOK)
-	for i := 0; i < driveMaxConcurrent; i++ {
-		d.tokens <- struct{}{}
-	}
 	return &d
 }
 
@@ -906,9 +836,8 @@ func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMet
 		return ctx, done, ctx.Err()
 	}
 
-	// Return early if disk is faulty already.
-	if err := p.checkHealth(ctx); err != nil {
-		return ctx, done, err
+	if p.health.status.Load() != diskHealthOK {
+		return ctx, done, errFaultyDisk
 	}
 
 	// Verify if the disk is not stale
@@ -924,27 +853,19 @@ func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMet
 		return ctx, done, nil
 	}
 
-	select {
-	case <-ctx.Done():
+	if contextCanceled(ctx) {
 		return ctx, done, ctx.Err()
-	case <-p.health.tokens:
-		// Fast path, got token.
-	default:
-		// We ran out of tokens, check health before blocking.
-		err = p.waitForToken(ctx)
-		if err != nil {
-			return ctx, done, err
-		}
 	}
 
-	// We only progress here if we got a token.
 	atomic.StoreInt64(&p.health.lastStarted, time.Now().UnixNano())
+	p.health.waiting.Add(1)
+
 	ctx = context.WithValue(ctx, healthDiskCtxKey{}, &healthDiskCtxValue{lastSuccess: &p.health.lastSuccess})
 	si := p.updateStorageMetrics(s, paths...)
 	var once sync.Once
 	return ctx, func(errp *error) {
+		p.health.waiting.Add(-1)
 		once.Do(func() {
-			p.health.tokens <- struct{}{}
 			if errp != nil {
 				err := *errp
 				if err == nil || errors.Is(err, io.EOF) {
@@ -956,71 +877,6 @@ func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMet
 	}, nil
 }
 
-// waitForToken will wait for a token, while periodically
-// checking the disk status.
-// If nil is returned a token was picked up.
-func (p *xlStorageDiskIDCheck) waitForToken(ctx context.Context) (err error) {
-	p.health.waiting.Add(1)
-	defer p.health.waiting.Add(-1)
-
-	// Avoid stampeding herd...
-	ticker := time.NewTicker(5*time.Second + time.Duration(rand.Int63n(int64(5*time.Second))))
-	defer ticker.Stop()
-	for {
-		err = p.checkHealth(ctx)
-		if err != nil {
-			return err
-		}
-		select {
-		case <-ticker.C:
-			// Ticker expired, check health again.
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-p.health.tokens:
-			return nil
-		}
-	}
-}
-
-// checkHealth should only be called when tokens have run out.
-// This will check if disk should be taken offline.
-func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
-	if p.health.status.Load() == diskHealthFaulty {
-		return errFaultyDisk
-	}
-	// Check if there are tokens.
-	if p.driveMaxConcurrent-len(p.health.tokens) < p.driveStartChecking {
-		return nil
-	}
-
-	const maxTimeSinceLastSuccess = 30 * time.Second
-	const minTimeSinceLastOpStarted = 15 * time.Second
-
-	// To avoid stampeding herd (100s of simultaneous starting requests)
-	// there must be a delay between the last started request and now
-	// for the last lastSuccess to be useful.
-	t := time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastStarted)))
-	if t < minTimeSinceLastOpStarted {
-		return nil
-	}
-
-	// If also more than 15 seconds since last success, take disk offline.
-	t = time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess)))
-	if t > maxTimeSinceLastSuccess {
-		if p.health.status.CompareAndSwap(diskHealthOK, diskHealthFaulty) {
-			logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline, time since last response %v", globalLocalNodeName, p.storage.String(), t.Round(time.Millisecond)))
-			p.health.waiting.Add(1)
-			go p.monitorDiskStatus(0, mustGetUUID())
-		}
-		return errFaultyDisk
-	}
-	return nil
-}
-
-// Make sure we do not write O_DIRECT aligned I/O because WrIteAll() ends
-// up using O_DIRECT codepath which internally utilizes p.health.tokens
-// we need to avoid using incoming I/O tokens as part of the healthcheck
-// monitoring I/O.
 var toWrite = []byte{2048: 42}
 
 // monitorDiskStatus should be called once when a drive has been marked offline.
@@ -1034,11 +890,6 @@ func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration, fn string)
 			return
 		}
 
-		if len(p.health.tokens) == 0 {
-			// Queue is still full, no need to check.
-			continue
-		}
-
 		err := p.storage.WriteAll(context.Background(), minioMetaTmpBucket, fn, toWrite)
 		if err != nil {
 			continue
diff --git a/internal/config/drive/drive.go b/internal/config/drive/drive.go
index 1fefe3997..431086a95 100644
--- a/internal/config/drive/drive.go
+++ b/internal/config/drive/drive.go
@@ -73,7 +73,7 @@ func LookupConfig(kvs config.KVS) (cfg *Config, err error) {
 	}
 	dur, _ := time.ParseDuration(d)
 	if dur < time.Second {
-		cfg.MaxTimeout = time.Minute * 2
+		cfg.MaxTimeout = 30 * time.Second
 	} else {
 		cfg.MaxTimeout = getMaxTimeout(dur)
 	}
@@ -89,7 +89,7 @@ func getMaxTimeout(t time.Duration) time.Duration {
 	}
 	dur, _ := time.ParseDuration(d)
 	if dur < time.Second {
-		return time.Minute * 2
+		return 30 * time.Second
 	}
 	return dur
 }
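
Reviewer note (not part of the patch): the hunks above drop the per-drive token pool entirely; TrackDiskHealth now only fails fast on a faulty status or canceled context, stamps lastStarted, and bumps an atomic waiting counter that the returned done callback decrements. Below is a minimal, self-contained Go sketch of that accounting pattern. The names healthTracker and track are illustrative only, not the actual MinIO identifiers, and success recording is simplified compared to the real healthDiskCtxValue path.

// Reviewer sketch only: simplified shape of the health accounting after the
// token pool removal. Not the real MinIO code.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var errFaultyDisk = errors.New("drive is faulty")

type healthTracker struct {
	lastSuccess atomic.Int64 // UnixNano of the last successful operation
	lastStarted atomic.Int64 // UnixNano of the last started operation
	waiting     atomic.Int32 // operations currently in flight
	status      atomic.Int32 // 0 = OK, non-zero = faulty
}

// track loosely mirrors the new TrackDiskHealth: fail fast if the drive is
// already faulty or the context is canceled, record the start time, bump the
// waiting gauge, and hand back a done callback that undoes the accounting
// and records success.
func (h *healthTracker) track(ctx context.Context) (func(err error), error) {
	if h.status.Load() != 0 {
		return nil, errFaultyDisk
	}
	if ctx.Err() != nil {
		return nil, ctx.Err()
	}
	h.lastStarted.Store(time.Now().UnixNano())
	h.waiting.Add(1)

	var once sync.Once
	return func(err error) {
		h.waiting.Add(-1)
		once.Do(func() {
			if err == nil {
				h.lastSuccess.Store(time.Now().UnixNano())
			}
		})
	}, nil
}

func main() {
	var h healthTracker
	done, err := h.track(context.Background())
	if err != nil {
		fmt.Println("not tracked:", err)
		return
	}
	// ... the actual disk operation would run here ...
	done(nil)
	fmt.Println("in flight:", h.waiting.Load()) // prints: in flight: 0
}

In the patch itself this same gauge is what DiskInfo reports as TotalWaiting, while the removed TotalTokens metric disappears along with the pool.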