fix: delete dangling directories properly (#9222)

This commit is contained in:
Harshavardhana 2020-03-30 09:48:24 -07:00 committed by GitHub
parent fdda5f98c6
commit ba52a925f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -483,7 +483,7 @@ func (xl xlObjects) healObject(ctx context.Context, bucket string, object string
// healObjectDir - heals object directory specifically, this special call
// is needed since we do not have a special backend format for directories.
func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) {
func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
storageDisks := xl.getDisks()
// Initialize heal result object
@ -501,11 +501,10 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
hr.After.Drives = make([]madmin.HealDriveInfo, len(storageDisks))
errs := statAllDirs(ctx, storageDisks, bucket, object)
if isObjectDirDangling(errs) {
for i, err := range errs {
if err == nil {
storageDisks[i].DeleteFile(bucket, object)
}
danglingObject := isObjectDirDangling(errs)
if danglingObject {
if !dryRun && remove {
xl.deleteObject(ctx, bucket, object, hr.DataBlocks+1, true)
}
}
@ -517,8 +516,8 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
}
switch err {
case nil:
hr.Before.Drives[i] = madmin.HealDriveInfo{State: madmin.DriveStateOk}
hr.After.Drives[i] = madmin.HealDriveInfo{State: madmin.DriveStateOk}
hr.Before.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk}
hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateOk}
case errDiskNotFound:
hr.Before.Drives[i] = madmin.HealDriveInfo{State: madmin.DriveStateOffline}
hr.After.Drives[i] = madmin.HealDriveInfo{State: madmin.DriveStateOffline}
@ -531,12 +530,11 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
hr.After.Drives[i] = madmin.HealDriveInfo{Endpoint: drive, State: madmin.DriveStateCorrupt}
}
}
if dryRun {
if dryRun || danglingObject {
return hr, nil
}
for i, err := range errs {
switch err {
case errVolumeNotFound, errFileNotFound:
if err == errVolumeNotFound || err == errFileNotFound {
// Bucket or prefix/directory not found
merr := storageDisks[i].MakeVol(pathJoin(bucket, object))
switch merr {
@ -640,13 +638,22 @@ func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix
// if total disks - a combination of corrupted and missing
// files is lesser than N/2+1 number of disks.
func isObjectDirDangling(errs []error) (ok bool) {
var notFoundDir int
var found int
var notFound int
var foundNotEmpty int
var otherFound int
for _, readErr := range errs {
if readErr == errFileNotFound {
notFoundDir++
if readErr == nil {
found++
} else if readErr == errFileNotFound || readErr == errVolumeNotFound {
notFound++
} else if readErr == errVolumeNotEmpty {
foundNotEmpty++
} else {
otherFound++
}
}
return notFoundDir > len(errs)/2
return found+foundNotEmpty+otherFound < notFound
}
// Object is considered dangling/corrupted if any only
@ -709,7 +716,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, opts
// Healing directories handle it separately.
if HasSuffix(object, SlashSeparator) {
return xl.healObjectDir(healCtx, bucket, object, opts.DryRun)
return xl.healObjectDir(healCtx, bucket, object, opts.DryRun, opts.Remove)
}
storageDisks := xl.getDisks()