Continue healing other objects even if objects without quorum exist (#5851)

fixes #5815
This commit is contained in:
Krishna Srinivas 2018-04-25 11:56:39 -07:00 committed by Dee Koder
parent 6831177394
commit 9aace6d36d
4 changed files with 21 additions and 30 deletions

View File

@ -1379,19 +1379,8 @@ func (s *xlSets) listObjectsHeal(ctx context.Context, bucket, prefix, marker, de
return loi, toObjectErr(walkResult.err, bucket, prefix)
}
var objInfo ObjectInfo
var err error
if hasSuffix(walkResult.entry, slashSeparator) {
objInfo, err = s.getHashedSet(walkResult.entry).getObjectInfoDir(ctx, bucket, walkResult.entry)
} else {
objInfo, err = s.getHashedSet(walkResult.entry).getObjectInfo(ctx, bucket, walkResult.entry)
}
if err != nil {
// Ignore errFileNotFound
if err == errFileNotFound {
continue
}
return loi, toObjectErr(err, bucket, prefix)
}
objInfo.Bucket = bucket
objInfo.Name = walkResult.entry
nextMarker = objInfo.Name
objInfos = append(objInfos, objInfo)
i++

View File

@ -284,11 +284,20 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
partsMetadata, errs := readAllXLMetadata(ctx, storageDisks, bucket, object)
// readQuorum suffices for xl.json since we use monotonic
// system time to break the tie when a split-brain situation
// arises.
if reducedErr := reduceReadQuorumErrs(ctx, errs, nil, quorum); reducedErr != nil {
return result, toObjectErr(reducedErr, bucket, object)
errCount := 0
for _, err := range errs {
if err != nil {
errCount++
}
}
if errCount == len(errs) {
// Only if we get errors from all the disks we return error. Else we need to
// continue to return filled madmin.HealResultItem struct which includes info
// on what disks the file is available etc.
if reducedErr := reduceReadQuorumErrs(ctx, errs, nil, quorum); reducedErr != nil {
return result, toObjectErr(reducedErr, bucket, object)
}
}
// List of disks having latest version of the object xl.json
@ -536,17 +545,11 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o
// and later the disk comes back up again, heal on the object
// should delete it.
func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRun bool) (hr madmin.HealResultItem, err error) {
// FIXME: Metadata is read again in the healObject() call below.
// Read metadata files from all the disks
partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
// get read quorum for this object
var readQuorum int
readQuorum, _, err = objectQuorumFromMeta(xl, partsMetadata, errs)
if err != nil {
return hr, err
}
latestXLMeta, _ := getLatestXLMeta(partsMetadata, errs)
// Lock the object before healing.
objectLock := xl.nsMutex.NewNSLock(bucket, object)
@ -556,5 +559,5 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
defer objectLock.RUnlock()
// Heal the object.
return healObject(ctx, xl.getDisks(), bucket, object, readQuorum, dryRun)
return healObject(ctx, xl.getDisks(), bucket, object, latestXLMeta.Erasure.DataBlocks, dryRun)
}

View File

@ -144,7 +144,7 @@ func TestHealObjectXL(t *testing.T) {
// Try healing now, expect to receive errDiskNotFound.
_, err = obj.HealObject(context.Background(), bucket, object, false)
// since majority of xl.jsons are not available, object quorum can't be read properly and error will be errXLReadQuorum
if err != errXLReadQuorum {
t.Errorf("Expected %v but received %v", errDiskNotFound, err)
if _, ok := err.(InsufficientReadQuorum); !ok {
t.Errorf("Expected %v but received %v", InsufficientWriteQuorum{}, err)
}
}

View File

@ -387,8 +387,7 @@ func (m xlMetaV1) ObjectToPartOffset(ctx context.Context, offset int64) (partInd
}
// pickValidXLMeta - picks one valid xlMeta content and returns from a
// slice of xlmeta content. If no value is found this function panics
// and dies.
// slice of xlmeta content.
func pickValidXLMeta(ctx context.Context, metaArr []xlMetaV1, modTime time.Time) (xmv xlMetaV1, e error) {
// Pick latest valid metadata.
for _, meta := range metaArr {