fix: heal replaced drives properly (#10069)

healing was not working properly when drives were
replaced, due to the error check in root disk
calculation this PR fixes this behavior

This PR also adds additional fix for missing
metadata entries from .minio.sys as part of
disk healing as well.

Added code to ignore and print more context
sensitive errors for better debugging.

This PR is continuation of fix in 7b14e9b660
This commit is contained in:
Harshavardhana 2020-07-17 10:08:04 -07:00 committed by GitHub
parent 4a447a439a
commit 187c3f62df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 46 additions and 21 deletions

View File

@ -704,7 +704,12 @@ func (h *healSequence) healItemsFromSourceCh() error {
}
if err := h.queueHealTask(source, itemType); err != nil {
logger.LogIf(h.ctx, err)
switch err.(type) {
case ObjectExistsAsDirectory:
default:
logger.LogIf(h.ctx, fmt.Errorf("Heal attempt failed for %s: %w",
pathJoin(source.bucket, source.object), err))
}
}
h.scannedItemsMap[itemType]++

View File

@ -24,7 +24,7 @@ import (
"github.com/minio/minio/cmd/logger"
)
const defaultMonitorNewDiskInterval = time.Minute * 5
const defaultMonitorNewDiskInterval = time.Minute * 3
func initLocalDisksAutoHeal(ctx context.Context, objAPI ObjectLayer) {
go monitorLocalDisksAndHeal(ctx, objAPI)
@ -105,13 +105,13 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
// Load the new format of this passed endpoint
_, format, err := connectEndpoint(endpoint)
if err != nil {
logger.LogIf(ctx, err)
printEndpointError(endpoint, err, true)
continue
}
// Calculate the set index where the current endpoint belongs
setIndex, _, err := findDiskIndex(z.zones[i].format, format)
if err != nil {
logger.LogIf(ctx, err)
printEndpointError(endpoint, err, false)
continue
}

View File

@ -459,7 +459,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
// Attempt a rename now from healed data to final location.
if err = disk.RenameData(minioMetaTmpBucket, tmpID, latestMeta.DataDir, bucket, object); err != nil {
logger.LogIf(ctx, err)
if err != errIsNotRegular && err != errFileNotFound {
logger.LogIf(ctx, err)
}
return result, toObjectErr(err, bucket, object)
}

View File

@ -204,14 +204,14 @@ func (s *erasureSets) connectDisks() {
defer wg.Done()
disk, format, err := connectEndpoint(endpoint)
if err != nil {
printEndpointError(endpoint, err)
printEndpointError(endpoint, err, true)
return
}
setIndex, diskIndex, err := findDiskIndex(s.format, format)
if err != nil {
// Close the internal connection to avoid connection leaks.
disk.Close()
printEndpointError(endpoint, err)
printEndpointError(endpoint, err, false)
return
}
disk.SetDiskID(format.Erasure.This)
@ -1296,10 +1296,6 @@ func markRootDisksAsDown(storageDisks []StorageAPI) {
return
}
for i := range storageDisks {
if errs[i] != nil {
storageDisks[i] = nil
continue
}
if infos[i].RootDisk {
// We should not heal on root disk. i.e in a situation where the minio-administrator has unmounted a
// defective drive we should not heal a path on the root disk.

View File

@ -709,6 +709,9 @@ func saveFormatErasureAll(ctx context.Context, storageDisks []StorageAPI, format
for index := range storageDisks {
index := index
g.Go(func() error {
if formats[index] == nil {
return errDiskNotFound
}
return saveFormatErasure(storageDisks[index], formats[index], formats[index].Erasure.This)
}, index)
}

View File

@ -103,6 +103,12 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *erasureObjects, dr
}
}
buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
}, BucketInfo{
Name: pathJoin(minioMetaBucket, bucketConfigPrefix),
}) // add metadata .minio.sys/ bucket prefixes to heal
// Heal all buckets with all objects
for _, bucket := range buckets {
// Heal current bucket

View File

@ -33,28 +33,41 @@ import (
"github.com/minio/minio/pkg/sync/errgroup"
)
var printEndpointError = func() func(Endpoint, error) {
var printEndpointError = func() func(Endpoint, error, bool) {
var mutex sync.Mutex
printOnce := make(map[Endpoint]map[string]bool)
printOnce := make(map[Endpoint]map[string]int)
return func(endpoint Endpoint, err error) {
return func(endpoint Endpoint, err error, once bool) {
reqInfo := (&logger.ReqInfo{}).AppendTags("endpoint", endpoint.String())
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
mutex.Lock()
defer mutex.Unlock()
m, ok := printOnce[endpoint]
if !ok {
m = make(map[string]bool)
m[err.Error()] = true
m = make(map[string]int)
m[err.Error()]++
printOnce[endpoint] = m
logger.LogAlwaysIf(ctx, err)
if once {
logger.LogAlwaysIf(ctx, err)
return
}
}
// Once is set and we are here means error was already
// printed once.
if once {
return
}
if m[err.Error()] {
return
// once not set, check if same error occurred 3 times in
// a row, then make sure we print it to call attention.
if m[err.Error()] > 2 {
logger.LogAlwaysIf(ctx, fmt.Errorf("Following error has been printed %d times.. %w", m[err.Error()], err))
// Reduce the count to introduce further delay in printing
// but let it again print after the 2th attempt
m[err.Error()]--
m[err.Error()]--
}
m[err.Error()] = true
logger.LogAlwaysIf(ctx, err)
m[err.Error()]++
}
}()