diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 7e800d252..4412772b0 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -286,12 +286,10 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // Initialize heal result object result = madmin.HealResultItem{ - Type: madmin.HealItemObject, - Bucket: bucket, - Object: object, - DiskCount: len(storageDisks), - ParityBlocks: er.defaultParityCount, - DataBlocks: len(storageDisks) - er.defaultParityCount, + Type: madmin.HealItemObject, + Bucket: bucket, + Object: object, + DiskCount: len(storageDisks), } if !opts.NoLock { @@ -306,18 +304,27 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s // Re-read when we have lock... partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true) + if isAllNotFound(errs) { + // Nothing to do, file is already gone. + return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, + errs, bucket, object, versionID), nil + } - if _, err = getLatestFileInfo(ctx, partsMetadata, errs); err != nil { + readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount) + if err != nil { return er.purgeObjectDangling(ctx, bucket, object, versionID, partsMetadata, errs, []error{}, opts) } + result.ParityBlocks = result.DiskCount - readQuorum + result.DataBlocks = readQuorum + // List of disks having latest version of the object er.meta // (by modtime). onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Latest FileInfo for reference. If a valid metadata is not // present, it is as good as object not found. - latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, result.DataBlocks) + latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum) if err != nil { return result, toObjectErr(err, bucket, object, versionID) } @@ -392,13 +399,9 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s } if isAllNotFound(errs) { - err = toObjectErr(errFileNotFound, bucket, object) - if versionID != "" { - err = toObjectErr(errFileVersionNotFound, bucket, object, versionID) - } // File is fully gone, fileInfo is empty. return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints, errs, - bucket, object, versionID), err + bucket, object, versionID), nil } // If less than read quorum number of disks have all the parts @@ -663,7 +666,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin } if dryRun || danglingObject || isAllNotFound(errs) { // Nothing to do, file is already gone. - return hr, toObjectErr(errFileNotFound, bucket, object) + return hr, nil } for i, err := range errs { if err == errVolumeNotFound || err == errFileNotFound { @@ -768,8 +771,15 @@ func statAllDirs(ctx context.Context, storageDisks []StorageAPI, bucket, prefix // A 0 length slice will always return false. func isAllNotFound(errs []error) bool { for _, err := range errs { - if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) || errors.Is(err, errFileVersionNotFound) { - continue + if err != nil { + switch err.Error() { + case errFileNotFound.Error(): + fallthrough + case errVolumeNotFound.Error(): + fallthrough + case errFileVersionNotFound.Error(): + continue + } } return false } diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index 943d823b3..aeff37bcf 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -329,6 +329,131 @@ func TestHealingDanglingObject(t *testing.T) { } } +func TestHealCorrectQuorum(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resetGlobalHealState() + defer resetGlobalHealState() + + nDisks := 32 + fsDirs, err := getRandomDisks(nDisks) + if err != nil { + t.Fatal(err) + } + + defer removeRoots(fsDirs) + + pools := mustGetPoolEndpoints(fsDirs[:16]...) + pools = append(pools, mustGetPoolEndpoints(fsDirs[16:]...)...) + + // Everything is fine, should return nil + objLayer, _, err := initObjectLayer(ctx, pools) + if err != nil { + t.Fatal(err) + } + + bucket := getRandomBucketName() + object := getRandomObjectName() + data := bytes.Repeat([]byte("a"), 5*1024*1024) + var opts ObjectOptions + + err = objLayer.MakeBucketWithLocation(ctx, bucket, BucketOptions{}) + if err != nil { + t.Fatalf("Failed to make a bucket - %v", err) + } + + // Create an object with multiple parts uploaded in decreasing + // part number. + uploadID, err := objLayer.NewMultipartUpload(ctx, bucket, object, opts) + if err != nil { + t.Fatalf("Failed to create a multipart upload - %v", err) + } + + var uploadedParts []CompletePart + for _, partID := range []int{2, 1} { + pInfo, err1 := objLayer.PutObjectPart(ctx, bucket, object, uploadID, partID, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts) + if err1 != nil { + t.Fatalf("Failed to upload a part - %v", err1) + } + uploadedParts = append(uploadedParts, CompletePart{ + PartNumber: pInfo.PartNumber, + ETag: pInfo.ETag, + }) + } + + _, err = objLayer.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{}) + if err != nil { + t.Fatalf("Failed to complete multipart upload - %v", err) + } + + cfgFile := pathJoin(bucketConfigPrefix, bucket, ".test.bin") + if err = saveConfig(ctx, objLayer, cfgFile, data); err != nil { + t.Fatal(err) + } + + hopts := madmin.HealOpts{ + DryRun: false, + Remove: true, + ScanMode: madmin.HealNormalScan, + } + + // Test 1: Remove the object backend files from the first disk. + z := objLayer.(*erasureServerPools) + for _, set := range z.serverPools { + er := set.sets[0] + erasureDisks := er.getDisks() + + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + nfi, err := getLatestFileInfo(ctx, fileInfos, errs) + if errors.Is(err, errFileNotFound) { + continue + } + if err != nil { + t.Fatalf("Failed to getLatestFileInfo - %v", err) + } + + for i := 0; i < nfi.Erasure.ParityBlocks; i++ { + erasureDisks[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false) + } + + // Try healing now, it should heal the content properly. + _, err = objLayer.HealObject(ctx, bucket, object, "", hopts) + if err != nil { + t.Fatal(err) + } + + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + if countErrs(errs, nil) != len(fileInfos) { + t.Fatal("Expected all xl.meta healed, but partial heal detected") + } + + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false) + nfi, err = getLatestFileInfo(ctx, fileInfos, errs) + if errors.Is(err, errFileNotFound) { + continue + } + if err != nil { + t.Fatalf("Failed to getLatestFileInfo - %v", err) + } + + for i := 0; i < nfi.Erasure.ParityBlocks; i++ { + erasureDisks[i].Delete(context.Background(), minioMetaBucket, pathJoin(cfgFile, xlStorageFormatFile), false) + } + + // Try healing now, it should heal the content properly. + _, err = objLayer.HealObject(ctx, minioMetaBucket, cfgFile, "", hopts) + if err != nil { + t.Fatal(err) + } + + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false) + if countErrs(errs, nil) != len(fileInfos) { + t.Fatal("Expected all xl.meta healed, but partial heal detected") + } + } +} + func TestHealObjectCorruptedPools(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()