From c11a2ac396ccce120a4ef446fd8863e4a1ae2991 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 26 Aug 2021 14:06:04 -0700 Subject: [PATCH] refactor healing to remove certain structs (#13079) - remove sourceCh usage from healing we already have tasks and resp channel - use read locks to lookup globalHealConfig - fix healing resolver to pick candidates quickly that need healing, without this resolver was unexpectedly skipping. --- cmd/admin-heal-ops.go | 59 +--------------------- cmd/background-heal-ops.go | 60 +++++----------------- cmd/background-newdisks-heal-ops.go | 4 +- cmd/config-current.go | 4 +- cmd/data-scanner.go | 78 ++++++++++++++--------------- cmd/fs-v1.go | 2 +- cmd/global-heal.go | 47 +++++------------ cmd/metacache-entries.go | 14 +++++- cmd/mrf.go | 11 +--- cmd/xl-storage.go | 8 +-- internal/config/heal/heal.go | 59 ++++++++++++++++++++++ 11 files changed, 144 insertions(+), 202 deletions(-) diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index 5273af4e7..2fc939034 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -396,9 +396,6 @@ type healSequence struct { // bucket, and object on which heal seq. was initiated bucket, object string - // A channel of entities (format, buckets, objects) to heal - sourceCh chan healSource - // A channel of entities with heal result respCh chan healResult @@ -648,11 +645,7 @@ func (h *healSequence) healSequenceStart(objAPI ObjectLayer) { h.currentStatus.StartTime = UTCNow() h.mutex.Unlock() - if h.sourceCh == nil { - go h.traverseAndHeal(objAPI) - } else { - go h.healFromSourceCh() - } + go h.traverseAndHeal(objAPI) select { case err, ok := <-h.traverseAndHealDoneCh: @@ -696,10 +689,6 @@ func (h *healSequence) logHeal(healType madmin.HealItemType) { } func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error { - globalHealConfigMu.Lock() - opts := globalHealConfig - globalHealConfigMu.Unlock() - // Send heal request task := healTask{ bucket: source.bucket, @@ -711,9 +700,7 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem if source.opts != nil { task.opts = *source.opts } - if opts.Bitrot { - task.opts.ScanMode = madmin.HealDeepScan - } + task.opts.ScanMode = globalHealConfig.ScanMode() h.mutex.Lock() h.scannedItemsMap[healType]++ @@ -773,48 +760,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem } } -func (h *healSequence) healItemsFromSourceCh() error { - for { - select { - case source, ok := <-h.sourceCh: - if !ok { - return nil - } - - var itemType madmin.HealItemType - switch source.bucket { - case nopHeal: - continue - case SlashSeparator: - itemType = madmin.HealItemMetadata - default: - if source.object == "" { - itemType = madmin.HealItemBucket - } else { - itemType = madmin.HealItemObject - } - } - - if err := h.queueHealTask(source, itemType); err != nil { - switch err.(type) { - case ObjectExistsAsDirectory: - case ObjectNotFound: - case VersionNotFound: - default: - logger.LogIf(h.ctx, fmt.Errorf("Heal attempt failed for %s: %w", - pathJoin(source.bucket, source.object), err)) - } - } - case <-h.ctx.Done(): - return nil - } - } -} - -func (h *healSequence) healFromSourceCh() { - h.healItemsFromSourceCh() -} - func (h *healSequence) healDiskMeta(objAPI ObjectLayer) error { // Start healing the config prefix. return h.healMinioSysMeta(objAPI, minioConfigPrefix)() diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 07d7d69c3..559c55c81 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -19,10 +19,8 @@ package cmd import ( "context" - "time" "github.com/minio/madmin-go" - "github.com/minio/minio/internal/logger" ) // healTask represents what to heal along with options @@ -52,51 +50,22 @@ type healRoutine struct { // Add a new task in the tasks queue func (h *healRoutine) queueHealTask(task healTask) { - select { - case h.tasks <- task: - default: - } + h.tasks <- task +} + +func systemIO() int { + // Bucket notification and http trace are not costly, it is okay to ignore them + // while counting the number of concurrent connections + return int(globalHTTPListen.NumSubscribers()) + int(globalTrace.NumSubscribers()) } func waitForLowHTTPReq() { - globalHealConfigMu.Lock() - maxIO, maxWait := globalHealConfig.IOCount, globalHealConfig.Sleep - globalHealConfigMu.Unlock() - - // No need to wait run at full speed. - if maxIO <= 0 { - return - } - - // At max 10 attempts to wait with 100 millisecond interval before proceeding - waitTick := 100 * time.Millisecond - - // Bucket notification and http trace are not costly, it is okay to ignore them - // while counting the number of concurrent connections - maxIOFn := func() int { - return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalTrace.NumSubscribers()) - } - - tmpMaxWait := maxWait + var currentIO func() int if httpServer := newHTTPServerFn(); httpServer != nil { - // Any requests in progress, delay the heal. - for httpServer.GetRequestCount() >= maxIOFn() { - if tmpMaxWait > 0 { - if tmpMaxWait < waitTick { - time.Sleep(tmpMaxWait) - } else { - time.Sleep(waitTick) - } - tmpMaxWait = tmpMaxWait - waitTick - } - if tmpMaxWait <= 0 { - if intDataUpdateTracker.debug { - logger.Info("waitForLowHTTPReq: waited max %s, resuming", maxWait) - } - break - } - } + currentIO = httpServer.GetRequestCount } + + globalHealConfig.Wait(currentIO, systemIO) } // Wait for heal requests and process them @@ -123,10 +92,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { } } - select { - case task.responseCh <- healResult{result: res, err: err}: - default: - } + task.responseCh <- healResult{result: res, err: err} case <-h.doneCh: return @@ -138,7 +104,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { func newHealRoutine() *healRoutine { return &healRoutine{ - tasks: make(chan healTask, 50000), + tasks: make(chan healTask), doneCh: make(chan struct{}), } diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 946310afc..2044df5df 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -337,10 +337,10 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq healDisks := globalBackgroundHealState.getHealLocalDiskEndpoints() if len(healDisks) > 0 { // Reformat disks - bgSeq.sourceCh <- healSource{bucket: SlashSeparator} + bgSeq.queueHealTask(healSource{bucket: SlashSeparator}, madmin.HealItemMetadata) // Ensure that reformatting disks is finished - bgSeq.sourceCh <- healSource{bucket: nopHeal} + bgSeq.queueHealTask(healSource{bucket: nopHeal}, madmin.HealItemMetadata) logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...", len(healDisks))) diff --git a/cmd/config-current.go b/cmd/config-current.go index 397f0cb46..6ae3895e3 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -579,9 +579,7 @@ func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config globalCompressConfig = cmpCfg globalCompressConfigMu.Unlock() - globalHealConfigMu.Lock() - globalHealConfig = healCfg - globalHealConfigMu.Unlock() + globalHealConfig.Update(healCfg) // update dynamic scanner values. scannerCycle.Update(scannerCfg.Cycle) diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index d851a7d94..406ff709e 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -57,8 +57,7 @@ const ( ) var ( - globalHealConfig heal.Config - globalHealConfigMu sync.Mutex + globalHealConfig heal.Config dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second) // Sleeper values are updated when config is loaded. @@ -638,6 +637,13 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix) } + if bucket != resolver.bucket { + // Bucket might be missing as well with abandoned children. + // make sure it is created first otherwise healing won't proceed + // for objects. + _, _ = objAPI.HealBucket(ctx, bucket, madmin.HealOpts{}) + } + resolver.bucket = bucket foundObjs := false @@ -856,27 +862,21 @@ func (i *scannerItem) transformMetaDir() { i.objectName = split[len(split)-1] } -// actionMeta contains information used to apply actions. -type actionMeta struct { - oi ObjectInfo - bitRotScan bool // indicates if bitrot check was requested. -} - var applyActionsLogPrefix = color.Green("applyActions:") -func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) { +func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi ObjectInfo) (size int64) { if i.debug { - if meta.oi.VersionID != "" { - console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), meta.oi.VersionID) + if oi.VersionID != "" { + console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), oi.VersionID) } else { console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath()) } } - healOpts := madmin.HealOpts{Remove: healDeleteDangling} - if meta.bitRotScan { - healOpts.ScanMode = madmin.HealDeepScan + healOpts := madmin.HealOpts{ + Remove: healDeleteDangling, + ScanMode: globalHealConfig.ScanMode(), } - res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, healOpts) + res, err := o.HealObject(ctx, i.bucket, i.objectPath(), oi.VersionID, healOpts) if isErrObjectNotFound(err) || isErrVersionNotFound(err) { return 0 } @@ -887,8 +887,8 @@ func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, meta acti return res.ObjectSize } -func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta actionMeta) (applied bool, size int64) { - size, err := meta.oi.GetActualSize() +func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi ObjectInfo) (applied bool, size int64) { + size, err := oi.GetActualSize() if i.debug { logger.LogIf(ctx, err) } @@ -900,20 +900,20 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac return false, size } - versionID := meta.oi.VersionID + versionID := oi.VersionID action := i.lifeCycle.ComputeAction( lifecycle.ObjectOpts{ Name: i.objectPath(), - UserTags: meta.oi.UserTags, - ModTime: meta.oi.ModTime, - VersionID: meta.oi.VersionID, - DeleteMarker: meta.oi.DeleteMarker, - IsLatest: meta.oi.IsLatest, - NumVersions: meta.oi.NumVersions, - SuccessorModTime: meta.oi.SuccessorModTime, - RestoreOngoing: meta.oi.RestoreOngoing, - RestoreExpires: meta.oi.RestoreExpires, - TransitionStatus: meta.oi.TransitionedObject.Status, + UserTags: oi.UserTags, + ModTime: oi.ModTime, + VersionID: oi.VersionID, + DeleteMarker: oi.DeleteMarker, + IsLatest: oi.IsLatest, + NumVersions: oi.NumVersions, + SuccessorModTime: oi.SuccessorModTime, + RestoreOngoing: oi.RestoreOngoing, + RestoreExpires: oi.RestoreExpires, + TransitionStatus: oi.TransitionedObject.Status, RemoteTiersImmediately: globalDebugRemoteTiersImmediately, }) if i.debug { @@ -972,8 +972,8 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac // applyTierObjSweep removes remote object pending deletion and the free-version // tracking this information. -func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta actionMeta) { - if !meta.oi.TransitionedObject.FreeVersion { +func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi ObjectInfo) { + if !oi.TransitionedObject.FreeVersion { // nothing to be done return } @@ -986,18 +986,18 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta return err } // Remove the remote object - err := deleteObjectFromRemoteTier(ctx, meta.oi.TransitionedObject.Name, meta.oi.TransitionedObject.VersionID, meta.oi.TransitionedObject.Tier) + err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier) if ignoreNotFoundErr(err) != nil { logger.LogIf(ctx, err) return } // Remove this free version - _, err = o.DeleteObject(ctx, meta.oi.Bucket, meta.oi.Name, ObjectOptions{ - VersionID: meta.oi.VersionID, + _, err = o.DeleteObject(ctx, oi.Bucket, oi.Name, ObjectOptions{ + VersionID: oi.VersionID, }) if err == nil { - auditLogLifecycle(ctx, meta.oi, ILMFreeVersionDelete) + auditLogLifecycle(ctx, oi, ILMFreeVersionDelete) } if ignoreNotFoundErr(err) != nil { logger.LogIf(ctx, err) @@ -1009,19 +1009,19 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta // The resulting size on disk will always be returned. // The metadata will be compared to consensus on the object layer before any changes are applied. // If no metadata is supplied, -1 is returned if no action is taken. -func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta, sizeS *sizeSummary) int64 { - i.applyTierObjSweep(ctx, o, meta) +func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) int64 { + i.applyTierObjSweep(ctx, o, oi) - applied, size := i.applyLifecycle(ctx, o, meta) + applied, size := i.applyLifecycle(ctx, o, oi) // For instance, an applied lifecycle means we remove/transitioned an object // from the current deployment, which means we don't have to call healing // routine even if we are asked to do via heal flag. if !applied { if i.heal { - size = i.applyHealing(ctx, o, meta) + size = i.applyHealing(ctx, o, oi) } // replicate only if lifecycle rules are not applied. - i.healReplication(ctx, o, meta.oi.Clone(), sizeS) + i.healReplication(ctx, o, oi.Clone(), sizeS) } return size } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 19fcede3d..7a0c6d2df 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -386,7 +386,7 @@ func (fs *FSObjects) scanBucket(ctx context.Context, bucket string, cache dataUs } oi := fsMeta.ToObjectInfo(bucket, object, fi) - sz := item.applyActions(ctx, fs, actionMeta{oi: oi}, &sizeSummary{}) + sz := item.applyActions(ctx, fs, oi, &sizeSummary{}) if sz >= 0 { return sizeSummary{totalSize: sz, versions: 1}, nil } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 87435ae1e..5fda5dfd6 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -41,24 +41,14 @@ func newBgHealSequence() *healSequence { reqInfo := &logger.ReqInfo{API: "BackgroundHeal"} ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo)) - globalHealConfigMu.Lock() - opts := globalHealConfig - globalHealConfigMu.Unlock() - - scanMode := madmin.HealNormalScan - if opts.Bitrot { - scanMode = madmin.HealDeepScan - } - hs := madmin.HealOpts{ // Remove objects that do not have read-quorum Remove: healDeleteDangling, - ScanMode: scanMode, + ScanMode: globalHealConfig.ScanMode(), } return &healSequence{ - sourceCh: make(chan healSource, 50000), - respCh: make(chan healResult, 50000), + respCh: make(chan healResult), startTime: UTCNow(), clientToken: bgHealingUUID, // run-background heal with reserved bucket @@ -179,14 +169,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn Name: pathJoin(minioMetaBucket, minioConfigPrefix), }) - globalHealConfigMu.Lock() - opts := globalHealConfig - globalHealConfigMu.Unlock() - - scanMode := madmin.HealNormalScan - if opts.Bitrot { - scanMode = madmin.HealDeepScan - } + scanMode := globalHealConfig.ScanMode() // Heal all buckets with all objects for _, bucket := range buckets { @@ -332,19 +315,15 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn func healObject(bucket, object, versionID string, scan madmin.HealScanMode) { // Get background heal sequence to send elements to heal bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) - if !ok { - return - } - select { - case bgSeq.sourceCh <- healSource{ - bucket: bucket, - object: object, - versionID: versionID, - opts: &madmin.HealOpts{ - Remove: healDeleteDangling, // if found dangling purge it. - ScanMode: scan, - }, - }: - default: + if ok { + bgSeq.queueHealTask(healSource{ + bucket: bucket, + object: object, + versionID: versionID, + opts: &madmin.HealOpts{ + Remove: healDeleteDangling, // if found dangling purge it. + ScanMode: scan, + }, + }, madmin.HealItemObject) } } diff --git a/cmd/metacache-entries.go b/cmd/metacache-entries.go index e468a752f..ae1db6eb1 100644 --- a/cmd/metacache-entries.go +++ b/cmd/metacache-entries.go @@ -163,6 +163,10 @@ func (e *metaCacheEntry) fileInfo(bucket string) (*FileInfo, error) { }, nil } if e.cached == nil { + if len(e.metadata) == 0 { + // only happens if the entry is not found. + return nil, errFileNotFound + } fi, err := getFileInfo(e.metadata, bucket, e.name, "", false) if err != nil { return nil, err @@ -308,15 +312,21 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa sort.Slice(r.candidates, func(i, j int) bool { return r.candidates[i].n > r.candidates[j].n }) + // Check if we have enough. if r.candidates[0].n < r.objQuorum { return nil, false } + if r.candidates[0].n > r.candidates[1].n { - return r.candidates[0].e, true + ok := r.candidates[0].e != nil && r.candidates[0].e.name != "" + return r.candidates[0].e, ok } + + e := resolveEntries(r.candidates[0].e, r.candidates[1].e, r.bucket) // Tie between two, resolve using modtime+versions. - return resolveEntries(r.candidates[0].e, r.candidates[1].e, r.bucket), true + ok := e != nil && e.name != "" + return e, ok } } diff --git a/cmd/mrf.go b/cmd/mrf.go index decd2eca8..4ad03e9bd 100644 --- a/cmd/mrf.go +++ b/cmd/mrf.go @@ -183,17 +183,8 @@ func (m *mrfState) healRoutine() { idler := time.NewTimer(mrfInfoResetInterval) defer idler.Stop() - globalHealConfigMu.Lock() - opts := globalHealConfig - globalHealConfigMu.Unlock() - - scanMode := madmin.HealNormalScan - if opts.Bitrot { - scanMode = madmin.HealDeepScan - } - var mrfHealingOpts = madmin.HealOpts{ - ScanMode: scanMode, + ScanMode: globalHealConfig.ScanMode(), Remove: healDeleteDangling, } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 6e9fe2492..0b72f2763 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -453,9 +453,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates return cache, errServerNotInitialized } - globalHealConfigMu.Lock() - healOpts := globalHealConfig - globalHealConfigMu.Unlock() cache.Info.updates = updates dataUsageInfo, err := scanDataFolder(ctx, s.diskPath, cache, func(item scannerItem) (sizeSummary, error) { @@ -488,10 +485,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates sizeS := sizeSummary{} for _, version := range fivs.Versions { oi := version.ToObjectInfo(item.bucket, item.objectPath()) - sz := item.applyActions(ctx, objAPI, actionMeta{ - oi: oi, - bitRotScan: healOpts.Bitrot, - }, &sizeS) + sz := item.applyActions(ctx, objAPI, oi, &sizeS) if !oi.DeleteMarker && sz == oi.Size { sizeS.versions++ } diff --git a/internal/config/heal/heal.go b/internal/config/heal/heal.go index 31151b1bf..89c31dad1 100644 --- a/internal/config/heal/heal.go +++ b/internal/config/heal/heal.go @@ -20,8 +20,10 @@ package heal import ( "fmt" "strconv" + "sync" "time" + "github.com/minio/madmin-go" "github.com/minio/minio/internal/config" "github.com/minio/pkg/env" ) @@ -37,6 +39,8 @@ const ( EnvIOCount = "MINIO_HEAL_MAX_IO" ) +var configMutex sync.RWMutex + // Config represents the heal settings. type Config struct { // Bitrot will perform bitrot scan on local disk when checking objects. @@ -46,6 +50,61 @@ type Config struct { IOCount int `json:"iocount"` } +// ScanMode returns configured scan mode +func (opts Config) ScanMode() madmin.HealScanMode { + configMutex.RLock() + defer configMutex.RUnlock() + if opts.Bitrot { + return madmin.HealDeepScan + } + return madmin.HealNormalScan +} + +// Wait waits for IOCount to go down or max sleep to elapse before returning. +// usually used in healing paths to wait for specified amount of time to +// throttle healing. +func (opts Config) Wait(currentIO func() int, systemIO func() int) { + configMutex.RLock() + maxIO, maxWait := opts.IOCount, opts.Sleep + configMutex.RUnlock() + + // No need to wait run at full speed. + if maxIO <= 0 { + return + } + + // At max 10 attempts to wait with 100 millisecond interval before proceeding + waitTick := 100 * time.Millisecond + + tmpMaxWait := maxWait + + if currentIO != nil { + for currentIO() >= maxIO+systemIO() { + if tmpMaxWait > 0 { + if tmpMaxWait < waitTick { + time.Sleep(tmpMaxWait) + } else { + time.Sleep(waitTick) + } + tmpMaxWait = tmpMaxWait - waitTick + } + if tmpMaxWait <= 0 { + return + } + } + } +} + +// Update updates opts with nopts +func (opts *Config) Update(nopts Config) { + configMutex.Lock() + defer configMutex.Unlock() + + opts.Bitrot = nopts.Bitrot + opts.IOCount = nopts.IOCount + opts.Sleep = nopts.Sleep +} + var ( // DefaultKVS - default KV config for heal settings DefaultKVS = config.KVS{