HealObject should succeed when only N/2 disks have data (#3952)

Krishnan Parthasarathi 2017-03-22 22:45:16 +05:30 committed by Harshavardhana
parent fbfb4fc5a0
commit 417ec0df56
4 changed files with 38 additions and 7 deletions

View File

@@ -225,10 +225,10 @@ func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs
         // disk has a valid xl.json but may not have all the
         // parts. This is considered an outdated disk, since
         // it needs healing too.
-        for pIndex, part := range partsMetadata[index].Parts {
+        for _, part := range partsMetadata[index].Parts {
             // compute blake2b sum of part.
             partPath := filepath.Join(object, part.Name)
-            hash := newHash(blake2bAlgo)
+            hash := newHash(partsMetadata[index].Erasure.Algorithm)
             blakeBytes, hErr := hashSum(onlineDisk, bucket, partPath, hash)
             if hErr == errFileNotFound {
                 errs[index] = errFileNotFound
@@ -239,7 +239,7 @@ func disksWithAllParts(onlineDisks []StorageAPI, partsMetadata []xlMetaV1, errs
                 return nil, nil, traceError(hErr)
             }
-            partChecksum := partsMetadata[index].Erasure.Checksum[pIndex].Hash
+            partChecksum := partsMetadata[index].Erasure.GetCheckSumInfo(part.Name).Hash
             blakeSum := hex.EncodeToString(blakeBytes)
             // if blake2b sum doesn't match for a part
             // then this disk is outdated and needs
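The substantive change above is that the per-part checksum is now looked up by part name (Erasure.GetCheckSumInfo(part.Name)) and the hash algorithm is taken from the stored erasure metadata, instead of indexing Erasure.Checksum by loop position, which misattributes checksums when parts are not recorded in upload order. Below is a minimal, hypothetical sketch of such a name-based lookup; the types and method are illustrative stand-ins, not the actual xlMetaV1 definitions.

package main

import "fmt"

// checkSumInfo and erasureInfo are simplified, hypothetical stand-ins
// for the metadata structures referenced in the diff above.
type checkSumInfo struct {
    Name string
    Hash string
}

type erasureInfo struct {
    Algorithm string
    Checksum  []checkSumInfo
}

// getCheckSumInfo looks up the checksum recorded for a part by its
// name, independent of the order in which parts were stored.
func (e erasureInfo) getCheckSumInfo(partName string) checkSumInfo {
    for _, c := range e.Checksum {
        if c.Name == partName {
            return c
        }
    }
    return checkSumInfo{}
}

func main() {
    e := erasureInfo{
        Algorithm: "blake2b",
        Checksum: []checkSumInfo{
            // Checksums may be recorded out of part-number order,
            // e.g. when parts are uploaded in decreasing order.
            {Name: "part.2", Hash: "bbbb"},
            {Name: "part.1", Hash: "aaaa"},
        },
    }
    // A positional lookup (Checksum[0]) would return part.2's hash
    // while iterating over part.1; the name-based lookup does not.
    fmt.Println(e.getCheckSumInfo("part.1").Hash) // prints "aaaa"
}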

View File

@@ -518,15 +518,36 @@ func TestHealObjectXL(t *testing.T) {
     bucket := "bucket"
     object := "object"
-    data := []byte("hello")
+    data := bytes.Repeat([]byte("a"), 5*1024*1024)
     err = obj.MakeBucket(bucket)
     if err != nil {
         t.Fatalf("Failed to make a bucket - %v", err)
     }
-    _, err = obj.PutObject(bucket, object, int64(len(data)), bytes.NewReader(data), nil, "")
+    // Create an object with multiple parts uploaded in decreasing
+    // part number.
+    uploadID, err := obj.NewMultipartUpload(bucket, object, nil)
     if err != nil {
-        t.Fatalf("Failed to put an object - %v", err)
+        t.Fatalf("Failed to create a multipart upload - %v", err)
+    }
+    var uploadedParts []completePart
+    for _, partID := range []int{2, 1} {
+        pInfo, err := obj.PutObjectPart(bucket, object, uploadID, partID,
+            int64(len(data)), bytes.NewReader(data), "", "")
+        if err != nil {
+            t.Fatalf("Failed to upload a part - %v", err)
+        }
+        uploadedParts = append(uploadedParts, completePart{
+            PartNumber: pInfo.PartNumber,
+            ETag:       pInfo.ETag,
+        })
+    }
+    _, err = obj.CompleteMultipartUpload(bucket, object, uploadID, uploadedParts)
+    if err != nil {
+        t.Fatalf("Failed to complete multipart upload - %v", err)
     }
     // Remove the object backend files from the first disk.
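The test now creates the object through a multipart upload whose parts are submitted in decreasing part number (2, then 1), which is exactly the situation that breaks a positional checksum lookup: the order in which parts are uploaded need not match their order in the assembled object. A small illustrative sketch of that distinction follows (not MinIO code; completedPart is a hypothetical stand-in).

package main

import (
    "fmt"
    "sort"
)

// completedPart is a hypothetical stand-in mirroring the minimal
// shape used by the test: a part number and its ETag.
type completedPart struct {
    PartNumber int
    ETag       string
}

func main() {
    // Parts uploaded in decreasing part number, as in the test.
    uploaded := []completedPart{
        {PartNumber: 2, ETag: "etag-2"},
        {PartNumber: 1, ETag: "etag-1"},
    }

    // The completed object is assembled in part-number order, so any
    // per-part bookkeeping indexed by upload order ends up misaligned
    // with the assembled object.
    assembled := append([]completedPart(nil), uploaded...)
    sort.Slice(assembled, func(i, j int) bool {
        return assembled[i].PartNumber < assembled[j].PartNumber
    })

    fmt.Println("upload order:   ", uploaded)
    fmt.Println("assembled order:", assembled)
}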

View File

@@ -41,11 +41,18 @@ func reduceErrs(errs []error, ignoredErrs []error) (maxCount int, maxErr error)
         }
         errorCounts[err]++
     }
     max := 0
     for err, count := range errorCounts {
-        if max < count {
+        switch {
+        case max < count:
             max = count
             maxErr = err
+        // Prefer `nil` over other error values with the same
+        // number of occurrences.
+        case max == count && err == nil:
+            maxErr = err
         }
     }
     return max, maxErr
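The switch introduces a tie-break: when nil and some error are seen the same number of times, nil wins, so an object readable from exactly N/2 disks reduces to success instead of an error and HealObject can proceed. Below is a self-contained, simplified sketch of that counting logic (the real reduceErrs also filters ignored errors, which this sketch omits).

package main

import (
    "errors"
    "fmt"
)

// mostCommonErr returns the most frequent value in errs, preferring
// nil over an error when both occur equally often.
func mostCommonErr(errs []error) (int, error) {
    errorCounts := make(map[error]int)
    for _, err := range errs {
        errorCounts[err]++
    }

    maxCount := 0
    var maxErr error
    for err, count := range errorCounts {
        switch {
        case maxCount < count:
            maxCount = count
            maxErr = err
        // Prefer nil over other error values with the same
        // number of occurrences.
        case maxCount == count && err == nil:
            maxErr = err
        }
    }
    return maxCount, maxErr
}

func main() {
    errFileNotFound := errors.New("file not found")

    // Five disks report the part missing, five report success: the
    // tie-break resolves the 50/50 split to success (nil).
    errs := []error{
        errFileNotFound, errFileNotFound, errFileNotFound,
        errFileNotFound, errFileNotFound, nil, nil, nil, nil, nil,
    }
    count, err := mostCommonErr(errs)
    fmt.Println(count, err) // prints "5 <nil>"
}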

View File

@@ -82,6 +82,9 @@ func TestReduceErrs(t *testing.T) {
             errDiskNotFound,
         }, []error{errDiskNotFound}, errVolumeNotFound},
         {[]error{}, []error{}, errXLReadQuorum},
+        {[]error{errFileNotFound, errFileNotFound, errFileNotFound,
+            errFileNotFound, errFileNotFound, nil, nil, nil, nil, nil},
+            nil, nil},
     }
     // Validates list of all the testcases for returning valid errors.
     for i, testCase := range testCases {