Return always a default heal item upon unexpected error (#6556)

Never return an empty result item even upon error,
choose all the default values and based on the errors
make sure to send right result reply.
This commit is contained in:
Harshavardhana
2018-10-02 17:13:51 -07:00
committed by Dee Koder
parent 274b35154c
commit 223967fd32
3 changed files with 73 additions and 26 deletions

View File

@@ -295,7 +295,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
// continue to return filled madmin.HealResultItem struct which includes info
// on what disks the file is available etc.
if reducedErr := reduceReadQuorumErrs(ctx, errs, nil, quorum); reducedErr != nil {
return result, toObjectErr(reducedErr, bucket, object)
return defaultHealResult(storageDisks, errs, bucket, object), toObjectErr(reducedErr, bucket, object)
}
}
@@ -304,10 +304,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// List of disks having all parts as per latest xl.json.
availableDisks, dataErrs, aErr := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object)
if aErr != nil {
return result, toObjectErr(aErr, bucket, object)
}
availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object)
// Initialize heal result object
result = madmin.HealResultItem{
@@ -338,7 +335,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
result.ObjectSize = partsMetadata[i].Stat.Size
result.ParityBlocks = partsMetadata[i].Erasure.ParityBlocks
result.DataBlocks = partsMetadata[i].Erasure.DataBlocks
case errs[i] == errDiskNotFound:
case errs[i] == errDiskNotFound, dataErrs[i] == errDiskNotFound:
driveState = madmin.DriveStateOffline
case errs[i] == errFileNotFound, errs[i] == errVolumeNotFound:
fallthrough
@@ -388,6 +385,10 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
// If less than read quorum number of disks have all the parts
// of the data, we can't reconstruct the erasure-coded data.
if numAvailableDisks < quorum {
// Default to most common configuration for erasure
// blocks upon returning quorum error.
result.ParityBlocks = len(storageDisks) / 2
result.DataBlocks = len(storageDisks) / 2
return result, toObjectErr(errXLReadQuorum, bucket, object)
}
@@ -512,7 +513,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
}
// Generate and write `xl.json` generated from other disks.
outDatedDisks, aErr = writeUniqueXLMetadata(ctx, outDatedDisks, minioMetaTmpBucket, tmpID,
outDatedDisks, aErr := writeUniqueXLMetadata(ctx, outDatedDisks, minioMetaTmpBucket, tmpID,
partsMetadata, diskCount(outDatedDisks))
if aErr != nil {
return result, toObjectErr(aErr, bucket, object)
@@ -602,6 +603,58 @@ func (xl xlObjects) healObjectDir(ctx context.Context, bucket, object string, dr
return hr, nil
}
// Populates default heal result item entries with possible values when we are returning prematurely.
// This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
func defaultHealResult(storageDisks []StorageAPI, errs []error, bucket, object string) madmin.HealResultItem {
// Initialize heal result object
result := madmin.HealResultItem{
Type: madmin.HealItemObject,
Bucket: bucket,
Object: object,
DiskCount: len(storageDisks),
// Initialize object size to -1, so we can detect if we are
// unable to reliably find the object size.
ObjectSize: -1,
}
for index, disk := range storageDisks {
if disk == nil {
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
State: madmin.DriveStateOffline,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
State: madmin.DriveStateOffline,
})
continue
}
drive := disk.String()
driveState := madmin.DriveStateCorrupt
switch errs[index] {
case errFileNotFound, errVolumeNotFound:
driveState = madmin.DriveStateMissing
}
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: driveState,
})
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: drive,
State: driveState,
})
}
// Default to most common configuration for erasure blocks.
result.ParityBlocks = len(storageDisks) / 2
result.DataBlocks = len(storageDisks) / 2
return result
}
// HealObject - heal the given object.
//
// FIXME: If an object object was deleted and one disk was down,
@@ -624,19 +677,21 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
return xl.healObjectDir(healCtx, bucket, object, dryRun)
}
storageDisks := xl.getDisks()
// FIXME: Metadata is read again in the healObject() call below.
// Read metadata files from all the disks
partsMetadata, errs := readAllXLMetadata(healCtx, xl.getDisks(), bucket, object)
partsMetadata, errs := readAllXLMetadata(healCtx, storageDisks, bucket, object)
latestXLMeta, err := getLatestXLMeta(healCtx, partsMetadata, errs)
if err != nil {
return hr, toObjectErr(err, bucket, object)
return defaultHealResult(storageDisks, errs, bucket, object), toObjectErr(err, bucket, object)
}
// Lock the object before healing.
objectLock := xl.nsMutex.NewNSLock(bucket, object)
if lerr := objectLock.GetRLock(globalHealingTimeout); lerr != nil {
return hr, lerr
return defaultHealResult(storageDisks, errs, bucket, object), lerr
}
defer objectLock.RUnlock()