fix: faster healing when disk is replaced. (#11520)

This commit is contained in:
Klaus Post 2021-02-18 11:06:54 -08:00 committed by GitHub
parent 0f5ca83418
commit c5b2a8441b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 81 additions and 53 deletions

View File

@ -662,6 +662,13 @@ func (h *healSequence) healSequenceStart(objAPI ObjectLayer) {
} }
} }
func (h *healSequence) logHeal(healType madmin.HealItemType) {
h.mutex.Lock()
h.scannedItemsMap[healType]++
h.lastHealActivity = UTCNow()
h.mutex.Unlock()
}
func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error { func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
globalHealConfigMu.Lock() globalHealConfigMu.Lock()
opts := globalHealConfig opts := globalHealConfig

View File

@ -204,8 +204,8 @@ wait:
} }
} }
lbDisks := z.serverPools[i].sets[setIndex].getOnlineDisks() err := z.serverPools[i].sets[setIndex].healErasureSet(ctx, buckets)
if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
} }

View File

@ -18,7 +18,7 @@ package cmd
import ( import (
"context" "context"
"sync" "errors"
"time" "time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
@ -120,18 +120,14 @@ func mustGetHealSequence(ctx context.Context) *healSequence {
} }
// healErasureSet lists and heals all objects in a specific erasure set // healErasureSet lists and heals all objects in a specific erasure set
func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error { func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketInfo) error {
bgSeq := mustGetHealSequence(ctx) bgSeq := mustGetHealSequence(ctx)
buckets = append(buckets, BucketInfo{ buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix), Name: pathJoin(minioMetaBucket, minioConfigPrefix),
}) })
// Try to pro-actively heal backend-encrypted file. // Try to pro-actively heal backend-encrypted file.
if err := bgSeq.queueHealTask(healSource{ if _, err := er.HealObject(ctx, minioMetaBucket, backendEncryptedFile, "", madmin.HealOpts{}); err != nil {
bucket: minioMetaBucket,
object: backendEncryptedFile,
}, madmin.HealItemMetadata); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
@ -140,61 +136,60 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
// Heal all buckets with all objects // Heal all buckets with all objects
for _, bucket := range buckets { for _, bucket := range buckets {
// Heal current bucket // Heal current bucket
if err := bgSeq.queueHealTask(healSource{ if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil {
bucket: bucket.Name,
}, madmin.HealItemBucket); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
} }
if serverDebugLog { if serverDebugLog {
console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, setIndex+1) console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, er.setNumber+1)
} }
var entryChs []FileInfoVersionsCh disks, _ := er.getOnlineDisksWithHealing()
var mu sync.Mutex if len(disks) == 0 {
var wg sync.WaitGroup return errors.New("healErasureSet: No non-healing disks found")
for _, disk := range disks {
disk := disk
wg.Add(1)
go func() {
defer wg.Done()
entryCh, err := disk.WalkVersions(ctx, bucket.Name, "", "", true, ctx.Done())
if err != nil {
// Disk walk returned error, ignore it.
return
}
mu.Lock()
entryChs = append(entryChs, FileInfoVersionsCh{
Ch: entryCh,
})
mu.Unlock()
}()
} }
wg.Wait() // Limit listing to 3 drives.
if len(disks) > 3 {
entriesValid := make([]bool, len(entryChs)) disks = disks[:3]
entries := make([]FileInfoVersions, len(entryChs)) }
healEntry := func(entry metaCacheEntry) {
for { if entry.isDir() {
entry, _, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) return
if !ok {
break
} }
fivs, err := entry.fileInfoVersions(bucket.Name)
for _, version := range entry.Versions { if err != nil {
if err := bgSeq.queueHealTask(healSource{ logger.LogIf(ctx, err)
bucket: bucket.Name, return
object: version.Name, }
versionID: version.VersionID, waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep)
}, madmin.HealItemObject); err != nil { for _, version := range fivs.Versions {
if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true}); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
} }
bgSeq.logHeal(madmin.HealItemObject)
} }
} }
err := listPathRaw(ctx, listPathRawOptions{
disks: disks,
bucket: bucket.Name,
recursive: true,
forwardTo: "", //TODO(klauspost): Set this to last known offset when resuming.
minDisks: 1,
reportNotFound: false,
agreed: healEntry,
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
entry, _ := entries.firstFound()
if entry != nil && !entry.isDir() {
healEntry(*entry)
}
},
finished: nil,
})
logger.LogIf(ctx, err)
} }
return nil return nil

View File

@ -782,7 +782,13 @@ type listPathRawOptions struct {
disks []StorageAPI disks []StorageAPI
bucket, path string bucket, path string
recursive bool recursive bool
// Only return results with this prefix.
filterPrefix string filterPrefix string
// Forward to this prefix before returning results.
forwardTo string
// Minimum number of good disks to continue. // Minimum number of good disks to continue.
// An error will be returned if this many disks returned an error. // An error will be returned if this many disks returned an error.
minDisks int minDisks int
@ -836,7 +842,9 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
BaseDir: opts.path, BaseDir: opts.path,
Recursive: opts.recursive, Recursive: opts.recursive,
ReportNotFound: opts.reportNotFound, ReportNotFound: opts.reportNotFound,
FilterPrefix: opts.filterPrefix}, w) FilterPrefix: opts.filterPrefix,
ForwardTo: opts.forwardTo,
}, w)
w.CloseWithError(werr) w.CloseWithError(werr)
if werr != io.EOF && werr != nil && werr.Error() != errFileNotFound.Error() && werr.Error() != errVolumeNotFound.Error() { if werr != io.EOF && werr != nil && werr.Error() != errFileNotFound.Error() && werr.Error() != errVolumeNotFound.Error() {
logger.LogIf(ctx, werr) logger.LogIf(ctx, werr)

View File

@ -49,6 +49,9 @@ type WalkDirOptions struct {
// FilterPrefix will only return results with given prefix within folder. // FilterPrefix will only return results with given prefix within folder.
// Should never contain a slash. // Should never contain a slash.
FilterPrefix string FilterPrefix string
// ForwardTo will forward to the given object path.
ForwardTo string
} }
// WalkDir will traverse a directory and return all entries found. // WalkDir will traverse a directory and return all entries found.
@ -107,6 +110,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
prefix := opts.FilterPrefix prefix := opts.FilterPrefix
forward := opts.ForwardTo
var scanDir func(path string) error var scanDir func(path string) error
scanDir = func(current string) error { scanDir = func(current string) error {
entries, err := s.ListDir(ctx, opts.Bucket, current, -1) entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
@ -126,6 +130,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) { if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
continue continue
} }
if len(forward) > 0 && entry < forward {
continue
}
if strings.HasSuffix(entry, slashSeparator) { if strings.HasSuffix(entry, slashSeparator) {
if strings.HasSuffix(entry, globalDirSuffixWithSlash) { if strings.HasSuffix(entry, globalDirSuffixWithSlash) {
// Add without extension so it is sorted correctly. // Add without extension so it is sorted correctly.
@ -177,6 +184,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
sort.Strings(entries) sort.Strings(entries)
dirStack := make([]string, 0, 5) dirStack := make([]string, 0, 5)
prefix = "" // Remove prefix after first level. prefix = "" // Remove prefix after first level.
for _, entry := range entries { for _, entry := range entries {
if entry == "" { if entry == "" {
continue continue
@ -189,8 +197,11 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
out <- metaCacheEntry{name: pop} out <- metaCacheEntry{name: pop}
if opts.Recursive { if opts.Recursive {
// Scan folder we found. Should be in correct sort order where we are. // Scan folder we found. Should be in correct sort order where we are.
err := scanDir(pop) forward = ""
logger.LogIf(ctx, err) if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, pop) {
forward = strings.TrimPrefix(opts.ForwardTo, pop)
}
logger.LogIf(ctx, scanDir(pop))
} }
dirStack = dirStack[:len(dirStack)-1] dirStack = dirStack[:len(dirStack)-1]
} }
@ -239,8 +250,11 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
out <- metaCacheEntry{name: pop} out <- metaCacheEntry{name: pop}
if opts.Recursive { if opts.Recursive {
// Scan folder we found. Should be in correct sort order where we are. // Scan folder we found. Should be in correct sort order where we are.
err := scanDir(pop) forward = ""
logger.LogIf(ctx, err) if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, pop) {
forward = strings.TrimPrefix(opts.ForwardTo, pop)
}
logger.LogIf(ctx, scanDir(pop))
} }
dirStack = dirStack[:len(dirStack)-1] dirStack = dirStack[:len(dirStack)-1]
} }
@ -267,6 +281,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
values.Set(storageRESTRecursive, strconv.FormatBool(opts.Recursive)) values.Set(storageRESTRecursive, strconv.FormatBool(opts.Recursive))
values.Set(storageRESTReportNotFound, strconv.FormatBool(opts.ReportNotFound)) values.Set(storageRESTReportNotFound, strconv.FormatBool(opts.ReportNotFound))
values.Set(storageRESTPrefixFilter, opts.FilterPrefix) values.Set(storageRESTPrefixFilter, opts.FilterPrefix)
values.Set(storageRESTForwardFilter, opts.ForwardTo)
respBody, err := client.call(ctx, storageRESTMethodWalkDir, values, nil, -1) respBody, err := client.call(ctx, storageRESTMethodWalkDir, values, nil, -1)
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
@ -299,6 +314,7 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques
} }
prefix := r.URL.Query().Get(storageRESTPrefixFilter) prefix := r.URL.Query().Get(storageRESTPrefixFilter)
forward := r.URL.Query().Get(storageRESTForwardFilter)
writer := streamHTTPResponse(w) writer := streamHTTPResponse(w)
writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{ writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{
Bucket: volume, Bucket: volume,
@ -306,5 +322,6 @@ func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Reques
Recursive: recursive, Recursive: recursive,
ReportNotFound: reportNotFound, ReportNotFound: reportNotFound,
FilterPrefix: prefix, FilterPrefix: prefix,
ForwardTo: forward,
}, writer)) }, writer))
} }

View File

@ -72,6 +72,7 @@ const (
storageRESTCount = "count" storageRESTCount = "count"
storageRESTMarkerPath = "marker" storageRESTMarkerPath = "marker"
storageRESTPrefixFilter = "prefix" storageRESTPrefixFilter = "prefix"
storageRESTForwardFilter = "forward"
storageRESTRecursive = "recursive" storageRESTRecursive = "recursive"
storageRESTReportNotFound = "report-notfound" storageRESTReportNotFound = "report-notfound"
storageRESTBitrotAlgo = "bitrot-algo" storageRESTBitrotAlgo = "bitrot-algo"