fix: introduce isLeafDir in healing to fix the crash (#5920)

This PR also supports healing directories.

Fixes #5917
This commit is contained in:
Harshavardhana 2018-05-10 16:53:42 -07:00 committed by Dee Koder
parent 5b74f918d4
commit c872c30ea3
3 changed files with 75 additions and 16 deletions

View File

@ -153,6 +153,12 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
return err
}
}
// When isleaf check is delayed, make sure that it is set correctly here.
if delayIsLeaf && isLeaf == nil {
return errInvalidArgument
}
// For an empty list return right here.
if len(entries) == 0 {
return nil
@ -169,6 +175,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
if len(entries) == 0 {
return nil
}
for i, entry := range entries {
var leaf, leafDir bool
@ -187,7 +194,6 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
}
isDir := !leafDir && !leaf
if i == 0 && markerDir == entry {
if !recursive {
// Skip as the marker would already be listed in the previous listing.

View File

@ -1360,13 +1360,17 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de
return s.getHashedSet(entry).isObject(bucket, entry)
}
isLeafDir := func(bucket, entry string) bool {
return s.getHashedSet(entry).isObjectDir(bucket, entry)
}
var setDisks = make([][]StorageAPI, len(s.sets))
for _, set := range s.sets {
setDisks = append(setDisks, set.getLoadBalancedDisks())
}
listDir := listDirSetsHealFactory(isLeaf, setDisks...)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, nil, endWalkCh)
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, nil, isLeafDir, endWalkCh)
}
var objInfos []ObjectInfo
@ -1403,20 +1407,7 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de
result := ListObjectsInfo{IsTruncated: !eof}
for _, objInfo := range objInfos {
result.NextMarker = objInfo.Name
if objInfo.IsDir {
result.Prefixes = append(result.Prefixes, objInfo.Name)
continue
}
// Add each object seen to the result - objects are
// checked for healing later.
result.Objects = append(result.Objects, ObjectInfo{
Bucket: bucket,
Name: objInfo.Name,
ModTime: objInfo.ModTime,
Size: objInfo.Size,
IsDir: false,
})
result.Objects = append(result.Objects, objInfo)
}
return result, nil
}

View File

@ -539,12 +539,74 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
return result, nil
}
// healObjectDir - heals object directory specifically, this special call
// is needed since we do not have a special backend format for directories.
func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) {
storageDisks := xl.getDisks()
// Initialize heal result object
hr = madmin.HealResultItem{
Type: madmin.HealItemObject,
Bucket: bucket,
Object: object,
DiskCount: len(storageDisks),
ParityBlocks: len(storageDisks) / 2,
DataBlocks: len(storageDisks) / 2,
ObjectSize: 0,
}
// Prepare object creation in all disks
for _, disk := range storageDisks {
if disk == nil {
hr.Before.Drives = append(hr.Before.Drives, madmin.HealDriveInfo{
UUID: "",
State: madmin.DriveStateOffline,
})
hr.After.Drives = append(hr.After.Drives, madmin.HealDriveInfo{
UUID: "",
State: madmin.DriveStateMissing,
})
continue
}
drive := disk.String()
hr.Before.Drives = append(hr.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: madmin.DriveStateMissing,
})
hr.After.Drives = append(hr.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: madmin.DriveStateMissing,
})
if !dryRun {
if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists {
return hr, toObjectErr(err, bucket, object)
}
for i, v := range hr.Before.Drives {
if v.Endpoint == drive {
hr.After.Drives[i].State = madmin.DriveStateOk
}
}
}
}
return hr, nil
}
// HealObject - heal the given object.
//
// FIXME: If an object object was deleted and one disk was down,
// and later the disk comes back up again, heal on the object
// should delete it.
func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) {
// Healing directories handle it separately.
if hasSuffix(object, slashSeparator) {
return xl.healObjectDir(ctx, bucket, object, dryRun)
}
// FIXME: Metadata is read again in the healObject() call below.
// Read metadata files from all the disks
partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)