refactor healing to remove certain structs (#13079)

- remove sourceCh usage from healing
  we already have tasks and resp channel

- use read locks to lookup globalHealConfig

- fix healing resolver to pick candidates quickly
  that need healing, without this resolver was
  unexpectedly skipping.
This commit is contained in:
Harshavardhana
2021-08-26 14:06:04 -07:00
committed by GitHub
parent 2f9ab26372
commit c11a2ac396
11 changed files with 144 additions and 202 deletions

View File

@@ -396,9 +396,6 @@ type healSequence struct {
// bucket, and object on which heal seq. was initiated
bucket, object string
// A channel of entities (format, buckets, objects) to heal
sourceCh chan healSource
// A channel of entities with heal result
respCh chan healResult
@@ -648,11 +645,7 @@ func (h *healSequence) healSequenceStart(objAPI ObjectLayer) {
h.currentStatus.StartTime = UTCNow()
h.mutex.Unlock()
if h.sourceCh == nil {
go h.traverseAndHeal(objAPI)
} else {
go h.healFromSourceCh()
}
go h.traverseAndHeal(objAPI)
select {
case err, ok := <-h.traverseAndHealDoneCh:
@@ -696,10 +689,6 @@ func (h *healSequence) logHeal(healType madmin.HealItemType) {
}
func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
globalHealConfigMu.Lock()
opts := globalHealConfig
globalHealConfigMu.Unlock()
// Send heal request
task := healTask{
bucket: source.bucket,
@@ -711,9 +700,7 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
if source.opts != nil {
task.opts = *source.opts
}
if opts.Bitrot {
task.opts.ScanMode = madmin.HealDeepScan
}
task.opts.ScanMode = globalHealConfig.ScanMode()
h.mutex.Lock()
h.scannedItemsMap[healType]++
@@ -773,48 +760,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
}
}
func (h *healSequence) healItemsFromSourceCh() error {
for {
select {
case source, ok := <-h.sourceCh:
if !ok {
return nil
}
var itemType madmin.HealItemType
switch source.bucket {
case nopHeal:
continue
case SlashSeparator:
itemType = madmin.HealItemMetadata
default:
if source.object == "" {
itemType = madmin.HealItemBucket
} else {
itemType = madmin.HealItemObject
}
}
if err := h.queueHealTask(source, itemType); err != nil {
switch err.(type) {
case ObjectExistsAsDirectory:
case ObjectNotFound:
case VersionNotFound:
default:
logger.LogIf(h.ctx, fmt.Errorf("Heal attempt failed for %s: %w",
pathJoin(source.bucket, source.object), err))
}
}
case <-h.ctx.Done():
return nil
}
}
}
func (h *healSequence) healFromSourceCh() {
h.healItemsFromSourceCh()
}
func (h *healSequence) healDiskMeta(objAPI ObjectLayer) error {
// Start healing the config prefix.
return h.healMinioSysMeta(objAPI, minioConfigPrefix)()

View File

@@ -19,10 +19,8 @@ package cmd
import (
"context"
"time"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/logger"
)
// healTask represents what to heal along with options
@@ -52,51 +50,22 @@ type healRoutine struct {
// Add a new task in the tasks queue
func (h *healRoutine) queueHealTask(task healTask) {
select {
case h.tasks <- task:
default:
}
h.tasks <- task
}
func systemIO() int {
// Bucket notification and http trace are not costly, it is okay to ignore them
// while counting the number of concurrent connections
return int(globalHTTPListen.NumSubscribers()) + int(globalTrace.NumSubscribers())
}
func waitForLowHTTPReq() {
globalHealConfigMu.Lock()
maxIO, maxWait := globalHealConfig.IOCount, globalHealConfig.Sleep
globalHealConfigMu.Unlock()
// No need to wait run at full speed.
if maxIO <= 0 {
return
}
// At max 10 attempts to wait with 100 millisecond interval before proceeding
waitTick := 100 * time.Millisecond
// Bucket notification and http trace are not costly, it is okay to ignore them
// while counting the number of concurrent connections
maxIOFn := func() int {
return maxIO + int(globalHTTPListen.NumSubscribers()) + int(globalTrace.NumSubscribers())
}
tmpMaxWait := maxWait
var currentIO func() int
if httpServer := newHTTPServerFn(); httpServer != nil {
// Any requests in progress, delay the heal.
for httpServer.GetRequestCount() >= maxIOFn() {
if tmpMaxWait > 0 {
if tmpMaxWait < waitTick {
time.Sleep(tmpMaxWait)
} else {
time.Sleep(waitTick)
}
tmpMaxWait = tmpMaxWait - waitTick
}
if tmpMaxWait <= 0 {
if intDataUpdateTracker.debug {
logger.Info("waitForLowHTTPReq: waited max %s, resuming", maxWait)
}
break
}
}
currentIO = httpServer.GetRequestCount
}
globalHealConfig.Wait(currentIO, systemIO)
}
// Wait for heal requests and process them
@@ -123,10 +92,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
}
}
select {
case task.responseCh <- healResult{result: res, err: err}:
default:
}
task.responseCh <- healResult{result: res, err: err}
case <-h.doneCh:
return
@@ -138,7 +104,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
func newHealRoutine() *healRoutine {
return &healRoutine{
tasks: make(chan healTask, 50000),
tasks: make(chan healTask),
doneCh: make(chan struct{}),
}

View File

@@ -337,10 +337,10 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq
healDisks := globalBackgroundHealState.getHealLocalDiskEndpoints()
if len(healDisks) > 0 {
// Reformat disks
bgSeq.sourceCh <- healSource{bucket: SlashSeparator}
bgSeq.queueHealTask(healSource{bucket: SlashSeparator}, madmin.HealItemMetadata)
// Ensure that reformatting disks is finished
bgSeq.sourceCh <- healSource{bucket: nopHeal}
bgSeq.queueHealTask(healSource{bucket: nopHeal}, madmin.HealItemMetadata)
logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
len(healDisks)))

View File

@@ -579,9 +579,7 @@ func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config
globalCompressConfig = cmpCfg
globalCompressConfigMu.Unlock()
globalHealConfigMu.Lock()
globalHealConfig = healCfg
globalHealConfigMu.Unlock()
globalHealConfig.Update(healCfg)
// update dynamic scanner values.
scannerCycle.Update(scannerCfg.Cycle)

View File

@@ -57,8 +57,7 @@ const (
)
var (
globalHealConfig heal.Config
globalHealConfigMu sync.Mutex
globalHealConfig heal.Config
dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
// Sleeper values are updated when config is loaded.
@@ -638,6 +637,13 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix)
}
if bucket != resolver.bucket {
// Bucket might be missing as well with abandoned children.
// make sure it is created first otherwise healing won't proceed
// for objects.
_, _ = objAPI.HealBucket(ctx, bucket, madmin.HealOpts{})
}
resolver.bucket = bucket
foundObjs := false
@@ -856,27 +862,21 @@ func (i *scannerItem) transformMetaDir() {
i.objectName = split[len(split)-1]
}
// actionMeta contains information used to apply actions.
type actionMeta struct {
oi ObjectInfo
bitRotScan bool // indicates if bitrot check was requested.
}
var applyActionsLogPrefix = color.Green("applyActions:")
func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) {
func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi ObjectInfo) (size int64) {
if i.debug {
if meta.oi.VersionID != "" {
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), meta.oi.VersionID)
if oi.VersionID != "" {
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), oi.VersionID)
} else {
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
}
}
healOpts := madmin.HealOpts{Remove: healDeleteDangling}
if meta.bitRotScan {
healOpts.ScanMode = madmin.HealDeepScan
healOpts := madmin.HealOpts{
Remove: healDeleteDangling,
ScanMode: globalHealConfig.ScanMode(),
}
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, healOpts)
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), oi.VersionID, healOpts)
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
return 0
}
@@ -887,8 +887,8 @@ func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, meta acti
return res.ObjectSize
}
func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta actionMeta) (applied bool, size int64) {
size, err := meta.oi.GetActualSize()
func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi ObjectInfo) (applied bool, size int64) {
size, err := oi.GetActualSize()
if i.debug {
logger.LogIf(ctx, err)
}
@@ -900,20 +900,20 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac
return false, size
}
versionID := meta.oi.VersionID
versionID := oi.VersionID
action := i.lifeCycle.ComputeAction(
lifecycle.ObjectOpts{
Name: i.objectPath(),
UserTags: meta.oi.UserTags,
ModTime: meta.oi.ModTime,
VersionID: meta.oi.VersionID,
DeleteMarker: meta.oi.DeleteMarker,
IsLatest: meta.oi.IsLatest,
NumVersions: meta.oi.NumVersions,
SuccessorModTime: meta.oi.SuccessorModTime,
RestoreOngoing: meta.oi.RestoreOngoing,
RestoreExpires: meta.oi.RestoreExpires,
TransitionStatus: meta.oi.TransitionedObject.Status,
UserTags: oi.UserTags,
ModTime: oi.ModTime,
VersionID: oi.VersionID,
DeleteMarker: oi.DeleteMarker,
IsLatest: oi.IsLatest,
NumVersions: oi.NumVersions,
SuccessorModTime: oi.SuccessorModTime,
RestoreOngoing: oi.RestoreOngoing,
RestoreExpires: oi.RestoreExpires,
TransitionStatus: oi.TransitionedObject.Status,
RemoteTiersImmediately: globalDebugRemoteTiersImmediately,
})
if i.debug {
@@ -972,8 +972,8 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac
// applyTierObjSweep removes remote object pending deletion and the free-version
// tracking this information.
func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta actionMeta) {
if !meta.oi.TransitionedObject.FreeVersion {
func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
if !oi.TransitionedObject.FreeVersion {
// nothing to be done
return
}
@@ -986,18 +986,18 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta
return err
}
// Remove the remote object
err := deleteObjectFromRemoteTier(ctx, meta.oi.TransitionedObject.Name, meta.oi.TransitionedObject.VersionID, meta.oi.TransitionedObject.Tier)
err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
if ignoreNotFoundErr(err) != nil {
logger.LogIf(ctx, err)
return
}
// Remove this free version
_, err = o.DeleteObject(ctx, meta.oi.Bucket, meta.oi.Name, ObjectOptions{
VersionID: meta.oi.VersionID,
_, err = o.DeleteObject(ctx, oi.Bucket, oi.Name, ObjectOptions{
VersionID: oi.VersionID,
})
if err == nil {
auditLogLifecycle(ctx, meta.oi, ILMFreeVersionDelete)
auditLogLifecycle(ctx, oi, ILMFreeVersionDelete)
}
if ignoreNotFoundErr(err) != nil {
logger.LogIf(ctx, err)
@@ -1009,19 +1009,19 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, meta
// The resulting size on disk will always be returned.
// The metadata will be compared to consensus on the object layer before any changes are applied.
// If no metadata is supplied, -1 is returned if no action is taken.
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta, sizeS *sizeSummary) int64 {
i.applyTierObjSweep(ctx, o, meta)
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) int64 {
i.applyTierObjSweep(ctx, o, oi)
applied, size := i.applyLifecycle(ctx, o, meta)
applied, size := i.applyLifecycle(ctx, o, oi)
// For instance, an applied lifecycle means we remove/transitioned an object
// from the current deployment, which means we don't have to call healing
// routine even if we are asked to do via heal flag.
if !applied {
if i.heal {
size = i.applyHealing(ctx, o, meta)
size = i.applyHealing(ctx, o, oi)
}
// replicate only if lifecycle rules are not applied.
i.healReplication(ctx, o, meta.oi.Clone(), sizeS)
i.healReplication(ctx, o, oi.Clone(), sizeS)
}
return size
}

View File

@@ -386,7 +386,7 @@ func (fs *FSObjects) scanBucket(ctx context.Context, bucket string, cache dataUs
}
oi := fsMeta.ToObjectInfo(bucket, object, fi)
sz := item.applyActions(ctx, fs, actionMeta{oi: oi}, &sizeSummary{})
sz := item.applyActions(ctx, fs, oi, &sizeSummary{})
if sz >= 0 {
return sizeSummary{totalSize: sz, versions: 1}, nil
}

View File

@@ -41,24 +41,14 @@ func newBgHealSequence() *healSequence {
reqInfo := &logger.ReqInfo{API: "BackgroundHeal"}
ctx, cancelCtx := context.WithCancel(logger.SetReqInfo(GlobalContext, reqInfo))
globalHealConfigMu.Lock()
opts := globalHealConfig
globalHealConfigMu.Unlock()
scanMode := madmin.HealNormalScan
if opts.Bitrot {
scanMode = madmin.HealDeepScan
}
hs := madmin.HealOpts{
// Remove objects that do not have read-quorum
Remove: healDeleteDangling,
ScanMode: scanMode,
ScanMode: globalHealConfig.ScanMode(),
}
return &healSequence{
sourceCh: make(chan healSource, 50000),
respCh: make(chan healResult, 50000),
respCh: make(chan healResult),
startTime: UTCNow(),
clientToken: bgHealingUUID,
// run-background heal with reserved bucket
@@ -179,14 +169,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
})
globalHealConfigMu.Lock()
opts := globalHealConfig
globalHealConfigMu.Unlock()
scanMode := madmin.HealNormalScan
if opts.Bitrot {
scanMode = madmin.HealDeepScan
}
scanMode := globalHealConfig.ScanMode()
// Heal all buckets with all objects
for _, bucket := range buckets {
@@ -332,19 +315,15 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
func healObject(bucket, object, versionID string, scan madmin.HealScanMode) {
// Get background heal sequence to send elements to heal
bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if !ok {
return
}
select {
case bgSeq.sourceCh <- healSource{
bucket: bucket,
object: object,
versionID: versionID,
opts: &madmin.HealOpts{
Remove: healDeleteDangling, // if found dangling purge it.
ScanMode: scan,
},
}:
default:
if ok {
bgSeq.queueHealTask(healSource{
bucket: bucket,
object: object,
versionID: versionID,
opts: &madmin.HealOpts{
Remove: healDeleteDangling, // if found dangling purge it.
ScanMode: scan,
},
}, madmin.HealItemObject)
}
}

View File

@@ -163,6 +163,10 @@ func (e *metaCacheEntry) fileInfo(bucket string) (*FileInfo, error) {
}, nil
}
if e.cached == nil {
if len(e.metadata) == 0 {
// only happens if the entry is not found.
return nil, errFileNotFound
}
fi, err := getFileInfo(e.metadata, bucket, e.name, "", false)
if err != nil {
return nil, err
@@ -308,15 +312,21 @@ func (m metaCacheEntries) resolve(r *metadataResolutionParams) (selected *metaCa
sort.Slice(r.candidates, func(i, j int) bool {
return r.candidates[i].n > r.candidates[j].n
})
// Check if we have enough.
if r.candidates[0].n < r.objQuorum {
return nil, false
}
if r.candidates[0].n > r.candidates[1].n {
return r.candidates[0].e, true
ok := r.candidates[0].e != nil && r.candidates[0].e.name != ""
return r.candidates[0].e, ok
}
e := resolveEntries(r.candidates[0].e, r.candidates[1].e, r.bucket)
// Tie between two, resolve using modtime+versions.
return resolveEntries(r.candidates[0].e, r.candidates[1].e, r.bucket), true
ok := e != nil && e.name != ""
return e, ok
}
}

View File

@@ -183,17 +183,8 @@ func (m *mrfState) healRoutine() {
idler := time.NewTimer(mrfInfoResetInterval)
defer idler.Stop()
globalHealConfigMu.Lock()
opts := globalHealConfig
globalHealConfigMu.Unlock()
scanMode := madmin.HealNormalScan
if opts.Bitrot {
scanMode = madmin.HealDeepScan
}
var mrfHealingOpts = madmin.HealOpts{
ScanMode: scanMode,
ScanMode: globalHealConfig.ScanMode(),
Remove: healDeleteDangling,
}

View File

@@ -453,9 +453,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
return cache, errServerNotInitialized
}
globalHealConfigMu.Lock()
healOpts := globalHealConfig
globalHealConfigMu.Unlock()
cache.Info.updates = updates
dataUsageInfo, err := scanDataFolder(ctx, s.diskPath, cache, func(item scannerItem) (sizeSummary, error) {
@@ -488,10 +485,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
sizeS := sizeSummary{}
for _, version := range fivs.Versions {
oi := version.ToObjectInfo(item.bucket, item.objectPath())
sz := item.applyActions(ctx, objAPI, actionMeta{
oi: oi,
bitRotScan: healOpts.Bitrot,
}, &sizeS)
sz := item.applyActions(ctx, objAPI, oi, &sizeS)
if !oi.DeleteMarker && sz == oi.Size {
sizeS.versions++
}