healObjects() should cancel() context before writing to errCh (#13262)

also remove HealObjects() code from dataScanner running another
listing from the data-scanner is super in-efficient and in-fact
this code is redundant since we already attempt to heal all
dangling objects anyways.
This commit is contained in:
Harshavardhana 2021-09-21 14:55:17 -07:00 committed by GitHub
parent 806b10b934
commit 8392765213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 33 deletions

View File

@ -652,7 +652,6 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
resolver.bucket = bucket
foundObjs := false
dangling := false
ctx, cancel := context.WithCancel(ctx)
err := listPathRaw(ctx, listPathRawOptions{
@ -674,13 +673,11 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
console.Debugf(healObjectsPrefix+" got partial, %d agreed, errs: %v\n", nAgreed, errs)
}
// agreed value less than expected quorum
dangling = nAgreed < resolver.objQuorum || nAgreed < resolver.dirQuorum
entry, ok := entries.resolve(&resolver)
if !ok {
// check if we can get one entry atleast
// proceed to heal nonetheless.
// proceed to heal nonetheless, since
// this object might be dangling.
entry, _ = entries.firstFound()
}
@ -736,30 +733,6 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
console.Debugf(healObjectsPrefix+" checking returned value %v (%T)\n", err, err)
}
// If we found one or more disks with this folder, delete it.
if err == nil && dangling {
if f.dataUsageScannerDebug {
console.Debugf(healObjectsPrefix+" deleting dangling directory %s\n", prefix)
}
// wait on timer per object.
wait := scannerSleeper.Timer(ctx)
objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{
Recursive: true,
Remove: healDeleteDangling,
}, func(bucket, object, versionID string) error {
// Wait for each heal as per scanner frequency.
wait()
wait = scannerSleeper.Timer(ctx)
return bgSeq.queueHealTask(healSource{
bucket: bucket,
object: object,
versionID: versionID,
}, madmin.HealItemObject)
})
}
// Add unless healing returned an error.
if foundObjs {
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: 1}

View File

@ -1640,8 +1640,8 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
disks, _ := set.getOnlineDisksWithHealing()
if len(disks) == 0 {
errCh <- errors.New("HealObjects: No non-healing disks found")
cancel()
errCh <- errors.New("HealObjects: No non-healing disks found")
return
}
@ -1669,8 +1669,8 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
if err := healObject(bucket, entry.name, ""); err != nil {
errCh <- err
cancel()
errCh <- err
return
}
return
@ -1678,8 +1678,8 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
for _, version := range fivs.Versions {
if err := healObject(bucket, version.Name, version.VersionID); err != nil {
errCh <- err
cancel()
errCh <- err
return
}
}
@ -1718,9 +1718,10 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
},
finished: nil,
}
if err := listPathRaw(ctx, lopts); err != nil {
errCh <- fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
cancel()
errCh <- fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)
return
}
}()