Add disksUnavailable healStatus const (#3990)

`disksUnavailable` healStatus constant indicates that a given object
needs healing but one or more of disks requiring heal are offline. This
can be used by admin heal API consumers to distinguish between a
successful heal and a no-op since the outdated disks were offline.
This commit is contained in:
Krishnan Parthasarathi
2017-04-01 06:25:15 +05:30
committed by Harshavardhana
parent a2a8d54bb6
commit 2bd694dbc8
16 changed files with 168 additions and 69 deletions

View File

@@ -125,7 +125,7 @@ func healBucketMetadata(storageDisks []StorageAPI, bucket string, readQuorum int
metaLock.RLock()
defer metaLock.RUnlock()
// Heals the given file at metaPath.
if err := healObject(storageDisks, minioMetaBucket, metaPath, readQuorum); err != nil && !isErrObjectNotFound(err) {
if _, _, err := healObject(storageDisks, minioMetaBucket, metaPath, readQuorum); err != nil && !isErrObjectNotFound(err) {
return err
} // Success.
return nil
@@ -313,18 +313,18 @@ func quickHeal(storageDisks []StorageAPI, writeQuorum int, readQuorum int) error
}
// Heals an object only the corrupted/missing erasure blocks.
func healObject(storageDisks []StorageAPI, bucket string, object string, quorum int) error {
func healObject(storageDisks []StorageAPI, bucket string, object string, quorum int) (int, int, error) {
partsMetadata, errs := readAllXLMetadata(storageDisks, bucket, object)
// readQuorum suffices for xl.json since we use monotonic
// system time to break the tie when a split-brain situation
// arises.
if reducedErr := reduceReadQuorumErrs(errs, nil, quorum); reducedErr != nil {
return toObjectErr(reducedErr, bucket, object)
return 0, 0, toObjectErr(reducedErr, bucket, object)
}
if !xlShouldHeal(storageDisks, partsMetadata, errs, bucket, object) {
// There is nothing to heal.
return nil
return 0, 0, nil
}
// List of disks having latest version of the object.
@@ -333,12 +333,16 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
// List of disks having all parts as per latest xl.json.
availableDisks, errs, aErr := disksWithAllParts(latestDisks, partsMetadata, errs, bucket, object)
if aErr != nil {
return toObjectErr(aErr, bucket, object)
return 0, 0, toObjectErr(aErr, bucket, object)
}
numAvailableDisks := 0
for _, disk := range availableDisks {
if disk != nil {
numOfflineDisks := 0
for index, disk := range availableDisks {
switch {
case disk == nil, errs[index] == errDiskNotFound:
numOfflineDisks++
case disk != nil:
numAvailableDisks++
}
}
@@ -346,18 +350,25 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
// If less than read quorum number of disks have all the parts
// of the data, we can't reconstruct the erasure-coded data.
if numAvailableDisks < quorum {
return toObjectErr(errXLReadQuorum, bucket, object)
return 0, 0, toObjectErr(errXLReadQuorum, bucket, object)
}
// List of disks having outdated version of the object or missing object.
outDatedDisks := outDatedDisks(storageDisks, availableDisks, errs, partsMetadata,
bucket, object)
numHealedDisks := 0
for _, disk := range outDatedDisks {
if disk != nil {
numHealedDisks++
}
}
// Latest xlMetaV1 for reference. If a valid metadata is not
// present, it is as good as object not found.
latestMeta, pErr := pickValidXLMeta(partsMetadata, modTime)
if pErr != nil {
return toObjectErr(pErr, bucket, object)
return 0, 0, toObjectErr(pErr, bucket, object)
}
for index, disk := range outDatedDisks {
@@ -389,14 +400,14 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
for _, part := range outDatedMeta.Parts {
dErr := disk.DeleteFile(bucket, pathJoin(object, part.Name))
if dErr != nil && !isErr(dErr, errFileNotFound) {
return toObjectErr(traceError(dErr), bucket, object)
return 0, 0, toObjectErr(traceError(dErr), bucket, object)
}
}
// Delete xl.json file. Ignore if xl.json not found.
dErr := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
if dErr != nil && !isErr(dErr, errFileNotFound) {
return toObjectErr(traceError(dErr), bucket, object)
return 0, 0, toObjectErr(traceError(dErr), bucket, object)
}
}
@@ -425,7 +436,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
minioMetaTmpBucket, pathJoin(tmpID, partName),
partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
if hErr != nil {
return toObjectErr(hErr, bucket, object)
return 0, 0, toObjectErr(hErr, bucket, object)
}
for index, sum := range checkSums {
if outDatedDisks[index] != nil {
@@ -450,7 +461,7 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
// Generate and write `xl.json` generated from other disks.
aErr = writeUniqueXLMetadata(outDatedDisks, minioMetaTmpBucket, tmpID, partsMetadata, diskCount(outDatedDisks))
if aErr != nil {
return toObjectErr(aErr, bucket, object)
return 0, 0, toObjectErr(aErr, bucket, object)
}
// Rename from tmp location to the actual location.
@@ -461,22 +472,22 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
// Remove any lingering partial data from current namespace.
aErr = disk.DeleteFile(bucket, retainSlash(object))
if aErr != nil && aErr != errFileNotFound {
return toObjectErr(traceError(aErr), bucket, object)
return 0, 0, toObjectErr(traceError(aErr), bucket, object)
}
// Attempt a rename now from healed data to final location.
aErr = disk.RenameFile(minioMetaTmpBucket, retainSlash(tmpID), bucket, retainSlash(object))
if aErr != nil {
return toObjectErr(traceError(aErr), bucket, object)
return 0, 0, toObjectErr(traceError(aErr), bucket, object)
}
}
return nil
return numOfflineDisks, numHealedDisks, nil
}
// HealObject heals a given object for all its missing entries.
// FIXME: If an object object was deleted and one disk was down,
// and later the disk comes back up again, heal on the object
// should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
func (xl xlObjects) HealObject(bucket, object string) (int, int, error) {
// Lock the object before healing.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.RLock()