avoid disk monitoring leaks under various conditions (#18777)

- HealFormat() was leaking healthcheck goroutines for
  disks; we are only interested in enabling the healthcheck
  for the newly formatted disk, not for existing disks.

- When a disk is a root disk, a stray disk-monitor goroutine
  was leaked even though the drive itself was ignored.

- When loading the disks for each erasure set, we were
  leaking goroutines for the prepare-storage.go disks
  that were replaced via the globalLocalDrives slice.

- Avoid disk monitoring consuming the health tokens meant
  for incoming I/O, which would exhaust them prematurely.
  This is ensured by not writing an O_DIRECT-aligned buffer;
  instead we write only ~2 KiB of content as O_DSYNC, which
  is sufficient (see the sketch after this list).
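
A minimal, Unix-only sketch of the idea behind the last bullet, independent of MinIO's internals (the probeDisk helper, the exact 2 KiB size, and the use of golang.org/x/sys/unix are illustrative assumptions, not the actual implementation):

package main

import (
	"log"
	"os"
	"path/filepath"

	"golang.org/x/sys/unix"
)

// probeDisk performs a tiny health-check write. It deliberately avoids the
// O_DIRECT code path (which requires aligned buffers and, in MinIO, consumes
// the health tokens reserved for client I/O) and instead writes ~2 KiB with
// O_DSYNC, which is enough to prove the drive still accepts writes.
func probeDisk(dir string) error {
	buf := make([]byte, 2048) // small, unaligned buffer: never routed through O_DIRECT
	fn := filepath.Join(dir, ".healthcheck.tmp")

	f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY|unix.O_DSYNC, 0o644)
	if err != nil {
		return err
	}
	defer os.Remove(fn) // clean up the probe object
	defer f.Close()

	_, err = f.Write(buf) // durable thanks to O_DSYNC, no alignment required
	return err
}

func main() {
	if err := probeDisk(os.TempDir()); err != nil {
		log.Fatalln("drive probe failed:", err)
	}
}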
Author: Harshavardhana, 2024-01-12 01:48:36 -08:00 (committed by GitHub)
Parent: ac90a873eb
Commit: e5c8794b8b
3 changed files with 49 additions and 23 deletions


@@ -128,9 +128,11 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
 		if errors.Is(err, errUnformattedDisk) {
 			info, derr := disk.DiskInfo(context.TODO(), false)
 			if derr != nil && info.RootDisk {
+				disk.Close()
 				return nil, nil, fmt.Errorf("Drive: %s is a root drive", disk)
 			}
 		}
+		disk.Close()
 		return nil, nil, fmt.Errorf("Drive: %s returned %w", disk, err) // make sure to '%w' to wrap the error
 	}
@@ -413,6 +415,7 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
 				globalLocalDrivesMu.RUnlock()
 				continue
 			}
+			disk.Close()
 			disk = ldisk
 			globalLocalDrivesMu.RUnlock()
 		}
@@ -1053,6 +1056,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
 			// We should not heal on root disk. i.e in a situation where the minio-administrator has unmounted a
 			// defective drive we should not heal a path on the root disk.
 			logger.LogIf(GlobalContext, fmt.Errorf("Drive `%s` is part of root drive, will not be used", storageDisks[i]))
+			storageDisks[i].Close()
 			storageDisks[i] = nil
 		}
 	}
@@ -1062,7 +1066,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
 func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
 	storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, storageOpts{
 		cleanUp:     true,
-		healthCheck: true,
+		healthCheck: false,
 	})
 	defer func(storageDisks []StorageAPI) {
@@ -1132,6 +1136,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 		}
 		if err := saveFormatErasure(storageDisks[index], format, formatOpID); err != nil {
 			logger.LogIf(ctx, fmt.Errorf("Drive %s failed to write updated 'format.json': %v", storageDisks[index], err))
+			storageDisks[index].Close()
 			tmpNewFormats[index] = nil // this disk failed to write new format
 		}
 	}
@@ -1154,17 +1159,27 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
 			}
 			if disk := storageDisks[index]; disk != nil {
+				if disk.IsLocal() {
 					disk.SetDiskLoc(s.poolIndex, m, n)
-				if disk.IsLocal() && driveQuorum {
-					commonWrites, commonDeletes := calcCommonWritesDeletes(currentDisksInfo[m], (s.setDriveCount+1)/2)
 					xldisk, ok := disk.(*xlStorageDiskIDCheck)
 					if ok {
-						xldisk.totalWrites.Add(commonWrites)
-						xldisk.totalDeletes.Add(commonDeletes)
+						if driveQuorum {
+							commonWrites, commonDeletes := calcCommonWritesDeletes(currentDisksInfo[m], (s.setDriveCount+1)/2)
+							xldisk.totalWrites.Store(commonWrites)
+							xldisk.totalDeletes.Store(commonDeletes)
 							xldisk.storage.setWriteAttribute(commonWrites)
 							xldisk.storage.setDeleteAttribute(commonDeletes)
 						}
+						go xldisk.monitorDiskWritable(xldisk.diskCtx)
+					}
+				} else {
+					disk.Close() // Close the remote storage client, re-initialize with healthchecks.
+					disk, err = newStorageRESTClient(disk.Endpoint(), true, globalGrid.Load())
+					if err != nil {
+						continue
+					}
+					disk.SetDiskLoc(s.poolIndex, m, n)
 				}
 				s.erasureDisks[m][n] = disk


@@ -228,8 +228,8 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis
 	}
 	if driveQuorum {
-		xl.totalWrites.Add(xl.storage.getWriteAttribute())
-		xl.totalDeletes.Add(xl.storage.getDeleteAttribute())
+		xl.totalWrites.Store(xl.storage.getWriteAttribute())
+		xl.totalDeletes.Store(xl.storage.getDeleteAttribute())
 	}
 	xl.diskCtx, xl.cancel = context.WithCancel(context.TODO())
@@ -1032,37 +1032,50 @@ func (p *xlStorageDiskIDCheck) checkHealth(ctx context.Context) (err error) {
 	if t > maxTimeSinceLastSuccess {
 		if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
 			logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline, time since last response %v", globalLocalNodeName, p.storage.String(), t.Round(time.Millisecond)))
-			go p.monitorDiskStatus(t)
+			go p.monitorDiskStatus(0, mustGetUUID())
 		}
 		return errFaultyDisk
 	}
 	return nil
 }
 
+// Make sure we do not write O_DIRECT aligned I/O because WrIteAll() ends
+// up using O_DIRECT codepath which internally utilizes p.health.tokens
+// we need to avoid using incoming I/O tokens as part of the healthcheck
+// monitoring I/O.
+var toWrite = []byte{2048: 42}
+
 // monitorDiskStatus should be called once when a drive has been marked offline.
 // Once the disk has been deemed ok, it will return to online status.
-func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration) {
+func (p *xlStorageDiskIDCheck) monitorDiskStatus(spent time.Duration, fn string) {
 	t := time.NewTicker(5 * time.Second)
 	defer t.Stop()
-	fn := mustGetUUID()
 	for range t.C {
+		if contextCanceled(p.diskCtx) {
+			return
+		}
 		if len(p.health.tokens) == 0 {
 			// Queue is still full, no need to check.
 			continue
 		}
-		err := p.storage.WriteAll(context.Background(), minioMetaTmpBucket, fn, []byte{10000: 42})
+		err := p.storage.WriteAll(context.Background(), minioMetaTmpBucket, fn, toWrite)
 		if err != nil {
 			continue
 		}
 		b, err := p.storage.ReadAll(context.Background(), minioMetaTmpBucket, fn)
-		if err != nil || len(b) != 10001 {
+		if err != nil || len(b) != len(toWrite) {
			continue
 		}
 		err = p.storage.Delete(context.Background(), minioMetaTmpBucket, fn, DeleteOptions{
 			Recursive: false,
 			Immediate: false,
 		})
 		if err == nil {
 			t := time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))
 			if spent > 0 {
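
The contextCanceled(p.diskCtx) check above is what keeps a monitor goroutine from outliving its disk. A minimal, self-contained sketch of that pattern (diskMonitor and probe are illustrative names, not MinIO's API):

package main

import (
	"context"
	"log"
	"time"
)

// diskMonitor periodically probes a drive that was taken offline. It is bound
// to the disk's context, so closing or replacing the disk (which cancels the
// context) also terminates the monitor instead of leaking the goroutine.
func diskMonitor(ctx context.Context, probe func() error) {
	t := time.NewTicker(5 * time.Second)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			// Disk was closed or replaced: stop monitoring, do not leak.
			return
		case <-t.C:
			if err := probe(); err != nil {
				log.Println("drive still unhealthy:", err)
				continue
			}
			// Drive responded: mark it online and end this monitor.
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go diskMonitor(ctx, func() error { return nil })

	time.Sleep(time.Second)
	cancel() // roughly what closing the disk does for the real monitors
	time.Sleep(100 * time.Millisecond)
}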
@@ -1108,8 +1121,6 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
 	defer t.Stop()
 	fn := mustGetUUID()
-	// Be just above directio size.
-	toWrite := []byte{xioutil.DirectioAlignSize + 1: 42}
 	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
 	monitor := func() bool {
@@ -1129,7 +1140,7 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
 	goOffline := func(err error, spent time.Duration) {
 		if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
 			logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline: %v", globalLocalNodeName, p.storage.String(), err))
-			go p.monitorDiskStatus(spent)
+			go p.monitorDiskStatus(spent, fn)
 		}
 	}


@@ -715,11 +715,7 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err erro
 	s.diskInfoCache.Once.Do(func() {
 		s.diskInfoCache.TTL = time.Second
 		s.diskInfoCache.Update = func() (interface{}, error) {
-			dcinfo := DiskInfo{
-				RootDisk:  s.rootDisk,
-				MountPath: s.drivePath,
-				Endpoint:  s.endpoint.String(),
-			}
+			dcinfo := DiskInfo{}
 			di, err := getDiskInfo(s.drivePath)
 			if err != nil {
 				return dcinfo, err
@@ -748,6 +744,10 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err erro
 	if v != nil {
 		info = v.(DiskInfo)
 	}
+	info.RootDisk = s.rootDisk
+	info.MountPath = s.drivePath
+	info.Endpoint = s.endpoint.String()
 	info.Scanning = atomic.LoadInt32(&s.scanning) == 1
 	return info, err
 }
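
The DiskInfo change above keeps only the values that actually change inside the TTL-cached updater and stamps the static identity fields (root-disk flag, mount path, endpoint) onto the result afterwards, so they are populated even when the cached update returns an error. A small sketch of that pattern with a hypothetical cache type (cachedDiskInfo is illustrative, not MinIO's cache implementation):

package main

import (
	"fmt"
	"sync"
	"time"
)

type diskInfo struct {
	Total, Free uint64
	RootDisk    bool
	MountPath   string
}

// cachedDiskInfo caches an expensive lookup (e.g. statfs) for a short TTL.
type cachedDiskInfo struct {
	mu      sync.Mutex
	expires time.Time
	last    diskInfo
	update  func() (diskInfo, error)
}

func (c *cachedDiskInfo) get() (diskInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if time.Now().Before(c.expires) {
		return c.last, nil
	}
	v, err := c.update() // may fail on a dead drive
	if err == nil {
		c.last, c.expires = v, time.Now().Add(time.Second)
	}
	return c.last, err
}

func main() {
	cache := &cachedDiskInfo{update: func() (diskInfo, error) {
		return diskInfo{Total: 100 << 30, Free: 40 << 30}, nil // stand-in for statfs
	}}

	info, err := cache.get()
	// Static fields are applied outside the cached updater, mirroring the diff:
	// they remain correct even if err != nil and info is stale or zero-valued.
	info.RootDisk = false
	info.MountPath = "/mnt/drive1"
	fmt.Println(info, err)
}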