fix: disk healing should honor the right pool/set index (#16712)

Harshavardhana 2023-02-27 04:55:32 -08:00 committed by GitHub
parent 7777d3b43a
commit bfedea9bad
2 changed files with 42 additions and 21 deletions


@@ -299,7 +299,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	defer disk.Close()
 	poolIdx := globalEndpoints.GetLocalPoolIdx(disk.Endpoint())
 	if poolIdx < 0 {
-		return fmt.Errorf("unexpected pool index (%d) found in %s", poolIdx, disk.Endpoint())
+		return fmt.Errorf("unexpected pool index (%d) found for %s", poolIdx, disk.Endpoint())
 	}
 
 	// Calculate the set index where the current endpoint belongs
@@ -310,14 +310,15 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 		return err
 	}
 	if setIdx < 0 {
-		return fmt.Errorf("unexpected set index (%d) found in %s", setIdx, disk.Endpoint())
+		return fmt.Errorf("unexpected set index (%d) found for %s", setIdx, disk.Endpoint())
 	}
 
 	// Prevent parallel erasure set healing
 	locker := z.NewNSLock(minioMetaBucket, fmt.Sprintf("new-drive-healing/%d/%d", poolIdx, setIdx))
 	lkctx, err := locker.GetLock(ctx, newDiskHealingTimeout)
 	if err != nil {
-		return err
+		return fmt.Errorf("Healing of drive '%v' on %s pool, belonging to %s erasure set already in progress: %w",
+			disk, humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1), err)
 	}
 	ctx = lkctx.Context()
 	defer locker.Unlock(lkctx)
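
The hunk above serializes healing per erasure set by locking on a key derived from the pool and set index, and now reports which pool/set is already being healed instead of returning a bare error. A minimal, self-contained sketch of the same idea, using a plain map of mutexes rather than MinIO's NSLock API (only the "new-drive-healing/<pool>/<set>" key format is taken from the diff; everything else is illustrative):

package main

import (
	"fmt"
	"sync"
)

// setLocks keeps one mutex per "pool/set" key, so two fresh drives that
// belong to the same erasure set cannot be healed concurrently, while
// drives in different sets proceed in parallel.
var setLocks sync.Map

func lockForSet(poolIdx, setIdx int) *sync.Mutex {
	key := fmt.Sprintf("new-drive-healing/%d/%d", poolIdx, setIdx)
	mu, _ := setLocks.LoadOrStore(key, &sync.Mutex{})
	return mu.(*sync.Mutex)
}

func main() {
	mu := lockForSet(0, 3)
	mu.Lock()
	defer mu.Unlock()
	fmt.Println("healing drive in pool 1, erasure set 4")
}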
@@ -325,19 +326,20 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	// Load healing tracker in this disk
 	tracker, err := loadHealingTracker(ctx, disk)
 	if err != nil {
-		// A healing track can be not found when another disk in the same
-		// erasure set and same healing-id successfully finished healing.
-		if err == errFileNotFound {
+		// A healing tracker may be deleted if another disk in the
+		// same erasure set with same healing-id successfully finished
+		// healing.
+		if errors.Is(err, errFileNotFound) {
 			return nil
 		}
-		logger.LogIf(ctx, fmt.Errorf("Unable to load a healing tracker on '%s': %w", disk, err))
+		logger.LogIf(ctx, fmt.Errorf("Unable to load healing tracker on '%s': %w, re-initializing..", disk, err))
 		tracker = newHealingTracker(disk, mustGetUUID())
 	}
 
-	logger.Info(fmt.Sprintf("Proceeding to heal '%s' - 'mc admin heal alias/ --verbose' to check the status.", endpoint))
+	logger.Info(fmt.Sprintf("Healing drive '%s' - 'mc admin heal alias/ --verbose' to check the current status.", endpoint))
 
 	buckets, _ := z.ListBuckets(ctx, BucketOptions{})
-	// Buckets data are dispersed in multiple zones/sets, make
+	// Buckets data are dispersed in multiple pools/sets, make
 	// sure to heal all bucket metadata configuration.
 	buckets = append(buckets, BucketInfo{
 		Name: pathJoin(minioMetaBucket, minioConfigPrefix),
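
This hunk (and a later one in the same file) replaces direct comparison against errFileNotFound with errors.Is, which still matches the sentinel after it has been wrapped with fmt.Errorf's %w verb. A small standalone illustration of the difference (errNotFound is a stand-in for MinIO's errFileNotFound):

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("file not found")

func load() error {
	// A lower layer wraps the sentinel with extra context.
	return fmt.Errorf("loading healing tracker: %w", errNotFound)
}

func main() {
	err := load()
	fmt.Println(err == errNotFound)          // false: the wrapper is a different value
	fmt.Println(errors.Is(err, errNotFound)) // true: errors.Is walks the wrap chain
}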
@@ -355,7 +357,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	})
 
 	if serverDebugLog {
-		logger.Info("Healing drive '%v' on %s pool", disk, humanize.Ordinal(poolIdx+1))
+		logger.Info("Healing drive '%v' on %s pool, belonging to %s erasure set", disk, humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
 	}
 
 	// Load bucket totals
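
The debug log above renders the zero-based pool and set indexes as human-friendly ordinals. Assuming the humanize identifier refers to the usual github.com/dustin/go-humanize package (not confirmed by the diff itself), the conversion looks like this:

package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	poolIdx, setIdx := 0, 3 // zero-based indexes as used internally
	// Ordinal turns 1 into "1st", 4 into "4th", and so on.
	fmt.Printf("Healing drive on %s pool, belonging to %s erasure set\n",
		humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1))
}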
@@ -378,9 +380,9 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	}
 
 	if tracker.ItemsFailed > 0 {
-		logger.Info("Healing drive '%s' failed (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
+		logger.Info("Healing of drive '%s' failed (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
 	} else {
-		logger.Info("Healing drive '%s' complete (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
+		logger.Info("Healing of drive '%s' complete (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
 	}
 
 	if len(tracker.QueuedBuckets) > 0 {
@@ -392,7 +394,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 		logger.Info("\n")
 	}
 
-	if tracker.HealID == "" { // HealID is empty only before Feb 2023
+	if tracker.HealID == "" { // HealID was empty only before Feb 2023
 		logger.LogIf(ctx, tracker.delete(ctx))
 		return nil
 	}
@@ -401,7 +403,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
 	for _, disk := range z.serverPools[poolIdx].sets[setIdx].getDisks() {
 		t, err := loadHealingTracker(ctx, disk)
 		if err != nil {
-			if err != errFileNotFound {
+			if !errors.Is(err, errFileNotFound) {
 				logger.LogIf(ctx, err)
 			}
 			continue
@@ -446,8 +448,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools) {
 		for _, disk := range healDisks {
 			go func(disk Endpoint) {
 				globalBackgroundHealState.setDiskHealingStatus(disk, true)
-				err := healFreshDisk(ctx, z, disk)
-				if err != nil {
+				if err := healFreshDisk(ctx, z, disk); err != nil {
 					globalBackgroundHealState.setDiskHealingStatus(disk, false)
					printEndpointError(disk, err, false)
					return


@@ -269,7 +269,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 	}()
 
 	// Note: updates from healEntry to tracker must be sent on results channel.
-	healEntry := func(entry metaCacheEntry) {
+	healEntry := func(bucket string, entry metaCacheEntry) {
 		if entry.name == "" && len(entry.metadata) == 0 {
 			// ignore entries that don't have metadata.
 			return
@@ -278,6 +278,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 			// ignore healing entry.name's with `/` suffix.
 			return
 		}
+
 		// We might land at .metacache, .trash, .multipart
 		// no need to heal them skip, only when bucket
 		// is '.minio.sys'
@@ -302,6 +303,11 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 				versionID: "",
 			}, madmin.HealItemObject)
 			if err != nil {
+				if isErrObjectNotFound(err) {
+					// queueing happens across namespace, ignore
+					// objects that are not found.
+					return
+				}
 				result = healEntryFailure(0)
 				logger.LogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, entry.name, err))
 			} else {
@@ -317,12 +323,19 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 			return
 		}
 
+		var versionNotFound int
 		for _, version := range fivs.Versions {
 			if err := bgSeq.queueHealTask(healSource{
 				bucket:    bucket,
 				object:    version.Name,
 				versionID: version.VersionID,
 			}, madmin.HealItemObject); err != nil {
+				if isErrObjectNotFound(err) {
+					// queueing happens across namespace, ignore
+					// objects that are not found.
+					versionNotFound++
+					continue
+				}
 				// If not deleted, assume they failed.
 				result = healEntryFailure(uint64(version.Size))
 				if version.VersionID != "" {
@@ -341,6 +354,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 			case results <- result:
 			}
 		}
+		// All versions resulted in 'ObjectNotFound'
+		if versionNotFound == len(fivs.Versions) {
+			return
+		}
 		select {
 		case <-ctx.Done():
 			return
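
The two hunks above make queueing tolerant of objects that disappear between listing and healing: not-found versions are counted rather than treated as failures, and the entry is abandoned only when every version came back not-found. A generic sketch of that pattern, under the assumption that queueHeal and errObjectNotFound here are placeholders rather than MinIO APIs:

package main

import (
	"errors"
	"fmt"
)

var errObjectNotFound = errors.New("object not found")

// healVersions queues every version and reports whether the whole entry
// vanished (all versions not found), so the caller can skip it entirely.
func healVersions(versions []string, queueHeal func(string) error) (skipped bool) {
	var notFound int
	for _, v := range versions {
		if err := queueHeal(v); err != nil {
			if errors.Is(err, errObjectNotFound) {
				notFound++ // deleted in the meantime; not a failure
				continue
			}
			fmt.Println("heal failed for version", v, ":", err)
		}
	}
	return notFound == len(versions)
}

func main() {
	gone := func(string) error { return errObjectNotFound }
	fmt.Println(healVersions([]string{"v1", "v2"}, gone)) // true: nothing left to heal
}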
@@ -351,22 +368,25 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 			waitForLowHTTPReq()
 		}
 
+		actualBucket, prefix := path2BucketObject(bucket)
+
 		// How to resolve partial results.
 		resolver := metadataResolutionParams{
 			dirQuorum: 1,
 			objQuorum: 1,
-			bucket:    bucket,
+			bucket:    actualBucket,
 		}
 
 		err := listPathRaw(ctx, listPathRawOptions{
 			disks:          disks,
-			bucket:         bucket,
+			bucket:         actualBucket,
+			path:           prefix,
 			recursive:      true,
 			forwardTo:      forwardTo,
 			minDisks:       1,
 			reportNotFound: false,
 			agreed: func(entry metaCacheEntry) {
-				healEntry(entry)
+				healEntry(actualBucket, entry)
 			},
 			partial: func(entries metaCacheEntries, _ []error) {
 				entry, ok := entries.resolve(&resolver)
@@ -375,7 +395,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 					// proceed to heal nonetheless.
 					entry, _ = entries.firstFound()
 				}
-				healEntry(*entry)
+				healEntry(actualBucket, *entry)
 			},
 			finished: nil,
 		})
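
The final hunks carry the listing side of the fix: a pseudo "bucket" such as .minio.sys/config is split into the real bucket and a prefix, the prefix is passed as the listing path, and healEntry receives the actual bucket so heal requests land in the right place. A rough sketch of that split, where splitBucketPrefix is an illustrative helper standing in for path2BucketObject:

package main

import (
	"fmt"
	"strings"
)

// splitBucketPrefix separates "bucket/optional/prefix" into its bucket and
// prefix parts, mirroring what the diff does before calling listPathRaw.
func splitBucketPrefix(s string) (bucket, prefix string) {
	s = strings.Trim(s, "/")
	if i := strings.Index(s, "/"); i >= 0 {
		return s[:i], s[i+1:]
	}
	return s, ""
}

func main() {
	for _, b := range []string{"mybucket", ".minio.sys/config"} {
		bucket, prefix := splitBucketPrefix(b)
		// The listing is rooted at the real bucket, with the remainder as path,
		// so healing of bucket metadata configuration works as intended.
		fmt.Printf("list bucket=%q path=%q\n", bucket, prefix)
	}
}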