mirror of
https://github.com/minio/minio.git
synced 2025-04-06 21:00:40 -04:00
fix: sleeper behavior in data scanner (#12164)
do not apply healReplication() for ILM expired, transitioned objects
This commit is contained in:
parent
edda244066
commit
c8050bc079
@ -45,8 +45,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
dataScannerSleepPerFolder = time.Millisecond // Time to wait between folders.
|
dataScannerSleepPerFolder = 20 * time.Millisecond // Time to wait between folders.
|
||||||
dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles.
|
dataScannerStartDelay = 1 * time.Minute // Time to wait on startup and between cycles.
|
||||||
|
dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles.
|
||||||
|
|
||||||
healDeleteDangling = true
|
healDeleteDangling = true
|
||||||
healFolderIncludeProb = 32 // Include a clean folder one in n cycles.
|
healFolderIncludeProb = 32 // Include a clean folder one in n cycles.
|
||||||
@ -60,7 +61,9 @@ var (
|
|||||||
dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
|
dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
|
||||||
// Sleeper values are updated when config is loaded.
|
// Sleeper values are updated when config is loaded.
|
||||||
scannerSleeper = newDynamicSleeper(10, 10*time.Second)
|
scannerSleeper = newDynamicSleeper(10, 10*time.Second)
|
||||||
scannerCycle = &safeDuration{}
|
scannerCycle = &safeDuration{
|
||||||
|
t: dataScannerStartDelay,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// initDataScanner will start the scanner in the background.
|
// initDataScanner will start the scanner in the background.
|
||||||
@ -450,6 +453,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
}
|
}
|
||||||
|
|
||||||
if typ&os.ModeDir != 0 {
|
if typ&os.ModeDir != 0 {
|
||||||
|
scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
|
||||||
|
|
||||||
h := hashPath(entName)
|
h := hashPath(entName)
|
||||||
_, exists := f.oldCache.Cache[h.Key()]
|
_, exists := f.oldCache.Cache[h.Key()]
|
||||||
cache.addChildString(entName)
|
cache.addChildString(entName)
|
||||||
@ -562,8 +567,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix)
|
console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dynamic time delay.
|
|
||||||
wait := scannerSleeper.Timer(ctx)
|
|
||||||
resolver.bucket = bucket
|
resolver.bucket = bucket
|
||||||
|
|
||||||
foundObjs := false
|
foundObjs := false
|
||||||
@ -592,9 +595,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
// agreed value less than expected quorum
|
// agreed value less than expected quorum
|
||||||
dangling = nAgreed < resolver.objQuorum || nAgreed < resolver.dirQuorum
|
dangling = nAgreed < resolver.objQuorum || nAgreed < resolver.dirQuorum
|
||||||
|
|
||||||
// Sleep and reset.
|
|
||||||
wait()
|
|
||||||
wait = scannerSleeper.Timer(ctx)
|
|
||||||
entry, ok := entries.resolve(&resolver)
|
entry, ok := entries.resolve(&resolver)
|
||||||
if !ok {
|
if !ok {
|
||||||
for _, err := range errs {
|
for _, err := range errs {
|
||||||
@ -614,9 +614,14 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
if entry.isDir() {
|
if entry.isDir() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait on timer per object.
|
||||||
|
wait := scannerSleeper.Timer(ctx)
|
||||||
|
|
||||||
// We got an entry which we should be able to heal.
|
// We got an entry which we should be able to heal.
|
||||||
fiv, err := entry.fileInfoVersions(bucket)
|
fiv, err := entry.fileInfoVersions(bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
wait()
|
||||||
err := bgSeq.queueHealTask(healSource{
|
err := bgSeq.queueHealTask(healSource{
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
object: entry.name,
|
object: entry.name,
|
||||||
@ -628,10 +633,12 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
foundObjs = foundObjs || err == nil
|
foundObjs = foundObjs || err == nil
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ver := range fiv.Versions {
|
for _, ver := range fiv.Versions {
|
||||||
// Sleep and reset.
|
// Sleep and reset.
|
||||||
wait()
|
wait()
|
||||||
wait = scannerSleeper.Timer(ctx)
|
wait = scannerSleeper.Timer(ctx)
|
||||||
|
|
||||||
err := bgSeq.queueHealTask(healSource{
|
err := bgSeq.queueHealTask(healSource{
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
object: fiv.Name,
|
object: fiv.Name,
|
||||||
@ -662,6 +669,9 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
console.Debugf(healObjectsPrefix+" deleting dangling directory %s\n", prefix)
|
console.Debugf(healObjectsPrefix+" deleting dangling directory %s\n", prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait on timer per object.
|
||||||
|
wait := scannerSleeper.Timer(ctx)
|
||||||
|
|
||||||
objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{
|
objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{
|
||||||
Recursive: true,
|
Recursive: true,
|
||||||
Remove: healDeleteDangling,
|
Remove: healDeleteDangling,
|
||||||
@ -669,7 +679,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
func(bucket, object, versionID string) error {
|
func(bucket, object, versionID string) error {
|
||||||
// Wait for each heal as per scanner frequency.
|
// Wait for each heal as per scanner frequency.
|
||||||
wait()
|
wait()
|
||||||
wait = scannerSleeper.Timer(ctx)
|
|
||||||
return bgSeq.queueHealTask(healSource{
|
return bgSeq.queueHealTask(healSource{
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
object: object,
|
object: object,
|
||||||
@ -678,8 +687,6 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
wait()
|
|
||||||
|
|
||||||
// Add unless healing returned an error.
|
// Add unless healing returned an error.
|
||||||
if foundObjs {
|
if foundObjs {
|
||||||
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
|
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
|
||||||
@ -936,13 +943,17 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, meta ac
|
|||||||
// The resulting size on disk will always be returned.
|
// The resulting size on disk will always be returned.
|
||||||
// The metadata will be compared to consensus on the object layer before any changes are applied.
|
// The metadata will be compared to consensus on the object layer before any changes are applied.
|
||||||
// If no metadata is supplied, -1 is returned if no action is taken.
|
// If no metadata is supplied, -1 is returned if no action is taken.
|
||||||
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) int64 {
|
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta, sizeS *sizeSummary) int64 {
|
||||||
applied, size := i.applyLifecycle(ctx, o, meta)
|
applied, size := i.applyLifecycle(ctx, o, meta)
|
||||||
// For instance, an applied lifecycle means we remove/transitioned an object
|
// For instance, an applied lifecycle means we remove/transitioned an object
|
||||||
// from the current deployment, which means we don't have to call healing
|
// from the current deployment, which means we don't have to call healing
|
||||||
// routine even if we are asked to do via heal flag.
|
// routine even if we are asked to do via heal flag.
|
||||||
if !applied && i.heal {
|
if !applied {
|
||||||
size = i.applyHealing(ctx, o, meta)
|
if i.heal {
|
||||||
|
size = i.applyHealing(ctx, o, meta)
|
||||||
|
}
|
||||||
|
// replicate only if lifecycle rules are not applied.
|
||||||
|
i.healReplication(ctx, o, meta.oi.Clone(), sizeS)
|
||||||
}
|
}
|
||||||
return size
|
return size
|
||||||
}
|
}
|
||||||
|
@ -360,7 +360,7 @@ func (fs *FSObjects) scanBucket(ctx context.Context, bucket string, cache dataUs
|
|||||||
}
|
}
|
||||||
|
|
||||||
oi := fsMeta.ToObjectInfo(bucket, object, fi)
|
oi := fsMeta.ToObjectInfo(bucket, object, fi)
|
||||||
sz := item.applyActions(ctx, fs, actionMeta{oi: oi})
|
sz := item.applyActions(ctx, fs, actionMeta{oi: oi}, &sizeSummary{})
|
||||||
if sz >= 0 {
|
if sz >= 0 {
|
||||||
return sizeSummary{totalSize: sz}, nil
|
return sizeSummary{totalSize: sz}, nil
|
||||||
}
|
}
|
||||||
|
@ -394,6 +394,10 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUs
|
|||||||
|
|
||||||
// return initialized object layer
|
// return initialized object layer
|
||||||
objAPI := newObjectLayerFn()
|
objAPI := newObjectLayerFn()
|
||||||
|
// object layer not initialized, return.
|
||||||
|
if objAPI == nil {
|
||||||
|
return cache, errServerNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
globalHealConfigMu.Lock()
|
globalHealConfigMu.Lock()
|
||||||
healOpts := globalHealConfig
|
healOpts := globalHealConfig
|
||||||
@ -431,13 +435,10 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUs
|
|||||||
sizeS := sizeSummary{}
|
sizeS := sizeSummary{}
|
||||||
for _, version := range fivs.Versions {
|
for _, version := range fivs.Versions {
|
||||||
oi := version.ToObjectInfo(item.bucket, item.objectPath())
|
oi := version.ToObjectInfo(item.bucket, item.objectPath())
|
||||||
if objAPI != nil {
|
totalSize += item.applyActions(ctx, objAPI, actionMeta{
|
||||||
totalSize += item.applyActions(ctx, objAPI, actionMeta{
|
oi: oi,
|
||||||
oi: oi,
|
bitRotScan: healOpts.Bitrot,
|
||||||
bitRotScan: healOpts.Bitrot,
|
}, &sizeS)
|
||||||
})
|
|
||||||
item.healReplication(ctx, objAPI, oi.Clone(), &sizeS)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
sizeS.totalSize = totalSize
|
sizeS.totalSize = totalSize
|
||||||
return sizeS, nil
|
return sizeS, nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user