Check for progress before we reach the limit (#17552)

This commit is contained in:
Klaus Post 2023-07-07 00:13:57 -07:00 committed by GitHub
parent 66bea3942a
commit e20aab25ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -629,6 +629,10 @@ const (
// for local and (incoming) remote disk ops respectively.
var diskMaxConcurrent = 512
// diskStartChecking is a threshold above which we will start to check
// the state of disks.
var diskStartChecking = 32
func init() {
s := env.Get("_MINIO_DISK_MAX_CONCURRENT", "512")
diskMaxConcurrent, _ = strconv.Atoi(s)
@ -636,6 +640,10 @@ func init() {
logger.Info("invalid _MINIO_DISK_MAX_CONCURRENT value: %s, defaulting to '512'", s)
diskMaxConcurrent = 512
}
diskStartChecking = 16 + diskMaxConcurrent/8
if diskStartChecking > diskMaxConcurrent {
diskStartChecking = diskMaxConcurrent
}
}
type diskHealthTracker struct {
@ -708,8 +716,8 @@ func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMet
}
// Return early if disk is faulty already.
if atomic.LoadInt32(&p.health.status) == diskHealthFaulty {
return ctx, done, errFaultyDisk
if err := p.checkHealth(ctx); err != nil {
return ctx, done, err
}
// Verify if the disk is not stale
@ -792,7 +800,7 @@ func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
return errFaultyDisk
}
// Check if there are tokens.
if len(p.health.tokens) > 0 {
if diskMaxConcurrent-len(p.health.tokens) < diskStartChecking {
return nil
}