diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index d9262c0e8..59dd9faf6 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -404,7 +404,7 @@ func healObject(ctx context.Context, storageDisks []StorageAPI, bucket string, o // Latest xlMetaV1 for reference. If a valid metadata is not // present, it is as good as object not found. - latestMeta, pErr := pickValidXLMeta(ctx, partsMetadata, modTime) + latestMeta, pErr := pickValidXLMeta(ctx, partsMetadata, modTime, quorum) if pErr != nil { return result, toObjectErr(pErr, bucket, object) } diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index cd5032adf..33f34767c 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -18,9 +18,9 @@ package cmd import ( "context" + "crypto/sha256" "encoding/hex" "encoding/json" - "fmt" "path" "sort" "sync" @@ -295,18 +295,52 @@ func (m xlMetaV1) ObjectToPartOffset(ctx context.Context, offset int64) (partInd return 0, 0, InvalidRange{} } -// pickValidXLMeta - picks one valid xlMeta content and returns from a -// slice of xlmeta content. -func pickValidXLMeta(ctx context.Context, metaArr []xlMetaV1, modTime time.Time) (xmv xlMetaV1, e error) { - // Pick latest valid metadata. - for _, meta := range metaArr { +func getXLMetaInQuorum(ctx context.Context, metaArr []xlMetaV1, modTime time.Time, quorum int) (xmv xlMetaV1, e error) { + metaHashes := make([]string, len(metaArr)) + for i, meta := range metaArr { if meta.IsValid() && meta.Stat.ModTime.Equal(modTime) { - return meta, nil + h := sha256.New() + for _, p := range meta.Parts { + h.Write([]byte(p.Name)) + } + metaHashes[i] = hex.EncodeToString(h.Sum(nil)) } } - err := fmt.Errorf("No valid xl.json present") - logger.LogIf(ctx, err) - return xmv, err + + metaHashCountMap := make(map[string]int) + for _, hash := range metaHashes { + if hash == "" { + continue + } + metaHashCountMap[hash]++ + } + + maxHash := "" + maxCount := 0 + for hash, count := range metaHashCountMap { + if count > maxCount { + maxCount = count + maxHash = hash + } + } + + if maxCount < quorum { + return xlMetaV1{}, errXLReadQuorum + } + + for i, hash := range metaHashes { + if hash == maxHash { + return metaArr[i], nil + } + } + + return xlMetaV1{}, errXLReadQuorum +} + +// pickValidXLMeta - picks one valid xlMeta content and returns from a +// slice of xlmeta content. +func pickValidXLMeta(ctx context.Context, metaArr []xlMetaV1, modTime time.Time, quorum int) (xmv xlMetaV1, e error) { + return getXLMetaInQuorum(ctx, metaArr, modTime, quorum) } // list of all errors that can be ignored in a metadata operation. diff --git a/cmd/xl-v1-metadata_test.go b/cmd/xl-v1-metadata_test.go index d5371112f..3e59a66e1 100644 --- a/cmd/xl-v1-metadata_test.go +++ b/cmd/xl-v1-metadata_test.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "context" - "errors" "os" "path" "strconv" @@ -349,11 +348,11 @@ func TestPickValidXLMeta(t *testing.T) { metaArr: invalidXS, modTime: now, xlMeta: invalidX1, - expectedErr: errors.New("No valid xl.json present"), + expectedErr: errXLReadQuorum, }, } for i, test := range testCases { - xlMeta, err := pickValidXLMeta(context.Background(), test.metaArr, test.modTime) + xlMeta, err := pickValidXLMeta(context.Background(), test.metaArr, test.modTime, len(test.metaArr)/2) if test.expectedErr != nil { if err.Error() != test.expectedErr.Error() { t.Errorf("Test %d: Expected to fail with %v but received %v", diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 77bb7e63e..ed46d9373 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -346,7 +346,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID onlineDisks, modTime := listOnlineDisks(xl.getDisks(), partsMetadata, errs) // Pick one from the first valid metadata. - xlMeta, err := pickValidXLMeta(ctx, partsMetadata, modTime) + xlMeta, err := pickValidXLMeta(ctx, partsMetadata, modTime, writeQuorum) if err != nil { return pi, err } @@ -445,7 +445,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs) // Pick one from the first valid metadata. - xlMeta, err = pickValidXLMeta(ctx, partsMetadata, modTime) + xlMeta, err = pickValidXLMeta(ctx, partsMetadata, modTime, writeQuorum) if err != nil { return pi, err } @@ -645,7 +645,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, var objectSize int64 // Pick one from the first valid metadata. - xlMeta, err := pickValidXLMeta(ctx, partsMetadata, modTime) + xlMeta, err := pickValidXLMeta(ctx, partsMetadata, modTime, writeQuorum) if err != nil { return oi, err } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 6437752a4..c4a110528 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -102,7 +102,7 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc _, modTime := listOnlineDisks(storageDisks, metaArr, errs) // Pick latest valid metadata. - xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime) + xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum) if err != nil { return oi, toObjectErr(err, srcBucket, srcObject) } @@ -221,7 +221,7 @@ func (xl xlObjects) getObject(ctx context.Context, bucket, object string, startO onlineDisks, modTime := listOnlineDisks(xl.getDisks(), metaArr, errs) // Pick latest valid metadata. - xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime) + xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum) if err != nil { return err } @@ -402,7 +402,7 @@ func (xl xlObjects) getObjectInfo(ctx context.Context, bucket, object string) (o modTime, _ := commonTime(modTimes) // Pick latest valid metadata. - xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime) + xlMeta, err := pickValidXLMeta(ctx, metaArr, modTime, readQuorum) if err != nil { return objInfo, err }