heal: Re-heal an object when a corruption is found during normal scan (#14482)

When scanning using normal mode, HealObject() can report an 
error saying that it found a corrupted part. This doesn't have 
when HealObject() is called with bitrot scan flag. However, when 
this happens, we can still restart HealObject() with the bitrot scan.

This is also important because this means the scanner and the 
new disks healer will not be able to heal an object that doesn't 
exist in a specific disk and has corruption in another disk.

Also without this PR, mc admin heal command without bitrot will report
an error.
This commit is contained in:
Anis Elleuch 2022-03-05 03:24:34 +01:00 committed by GitHub
parent 66afa16aed
commit 3fca4055d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 196 additions and 54 deletions

View File

@ -931,5 +931,12 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
}
// Heal the object.
return er.healObject(healCtx, bucket, object, versionID, opts)
hr, err = er.healObject(healCtx, bucket, object, versionID, opts)
if errors.Is(err, errFileCorrupt) && opts.ScanMode != madmin.HealDeepScan {
// Instead of returning an error when a bitrot error is detected
// during a normal heal scan, heal again with bitrot flag enabled.
opts.ScanMode = madmin.HealDeepScan
hr, err = er.healObject(healCtx, bucket, object, versionID, opts)
}
return hr, err
}

View File

@ -807,7 +807,140 @@ func TestHealObjectCorruptedPools(t *testing.T) {
}
}
func TestHealObjectCorrupted(t *testing.T) {
func TestHealObjectCorruptedXLMeta(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resetGlobalHealState()
defer resetGlobalHealState()
nDisks := 16
fsDirs, err := getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
defer removeRoots(fsDirs)
// Everything is fine, should return nil
objLayer, _, err := initObjectLayer(ctx, mustGetPoolEndpoints(fsDirs...))
if err != nil {
t.Fatal(err)
}
bucket := getRandomBucketName()
object := getRandomObjectName()
data := bytes.Repeat([]byte("a"), 5*1024*1024)
var opts ObjectOptions
err = objLayer.MakeBucketWithLocation(ctx, bucket, BucketOptions{})
if err != nil {
t.Fatalf("Failed to make a bucket - %v", err)
}
// Create an object with multiple parts uploaded in decreasing
// part number.
uploadID, err := objLayer.NewMultipartUpload(ctx, bucket, object, opts)
if err != nil {
t.Fatalf("Failed to create a multipart upload - %v", err)
}
var uploadedParts []CompletePart
for _, partID := range []int{2, 1} {
pInfo, err1 := objLayer.PutObjectPart(ctx, bucket, object, uploadID, partID, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
if err1 != nil {
t.Fatalf("Failed to upload a part - %v", err1)
}
uploadedParts = append(uploadedParts, CompletePart{
PartNumber: pInfo.PartNumber,
ETag: pInfo.ETag,
})
}
_, err = objLayer.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{})
if err != nil {
t.Fatalf("Failed to complete multipart upload - %v", err)
}
z := objLayer.(*erasureServerPools)
er := z.serverPools[0].sets[0]
erasureDisks := er.getDisks()
firstDisk := erasureDisks[0]
// Test 1: Remove the object backend files from the first disk.
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, fileInfos, errs)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}
err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
if err != nil {
t.Fatalf("Failed to delete a file - %v", err)
}
_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Fatalf("Failed to heal object - %v", err)
}
if _, err = firstDisk.StatInfoFile(context.Background(), bucket, object+"/"+xlStorageFormatFile, false); err != nil {
t.Errorf("Expected xl.meta file to be present but stat failed - %v", err)
}
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi1, err := getLatestFileInfo(ctx, fileInfos, errs)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}
if !reflect.DeepEqual(fi, nfi1) {
t.Fatalf("FileInfo not equal after healing")
}
// Test 2: Test with a corrupted xl.meta
err = firstDisk.WriteAll(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), []byte("abcd"))
if err != nil {
t.Errorf("Failure during creating part.1 - %v", err)
}
_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Errorf("Expected nil but received %v", err)
}
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi2, err := getLatestFileInfo(ctx, fileInfos, errs)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}
if !reflect.DeepEqual(fi, nfi2) {
t.Fatalf("FileInfo not equal after healing")
}
// Test 3: checks if HealObject returns an error when xl.meta is not found
// in more than read quorum number of disks, to create a corrupted situation.
for i := 0; i <= nfi2.Erasure.DataBlocks; i++ {
erasureDisks[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
}
// Try healing now, expect to receive errFileNotFound.
_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
if err != nil {
if _, ok := err.(ObjectNotFound); !ok {
t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err)
}
}
// since majority of xl.meta's are not available, object should be successfully deleted.
_, err = objLayer.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
if _, ok := err.(ObjectNotFound); !ok {
t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err)
}
}
func TestHealObjectCorruptedParts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -867,7 +1000,26 @@ func TestHealObjectCorrupted(t *testing.T) {
er := z.serverPools[0].sets[0]
erasureDisks := er.getDisks()
firstDisk := erasureDisks[0]
err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
secondDisk := erasureDisks[1]
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, fileInfos, errs)
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
}
part1Disk1Origin, err := firstDisk.ReadAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"))
if err != nil {
t.Fatalf("Failed to read a file - %v", err)
}
part1Disk2Origin, err := secondDisk.ReadAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"))
if err != nil {
t.Fatalf("Failed to read a file - %v", err)
}
// Test 1, remove part.1
err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false)
if err != nil {
t.Fatalf("Failed to delete a file - %v", err)
}
@ -877,85 +1029,67 @@ func TestHealObjectCorrupted(t *testing.T) {
t.Fatalf("Failed to heal object - %v", err)
}
fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
fi, err := getLatestFileInfo(ctx, fileInfos, errs)
part1Replaced, err := firstDisk.ReadAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"))
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
t.Fatalf("Failed to read a file - %v", err)
}
if _, err = firstDisk.StatInfoFile(context.Background(), bucket, object+"/"+xlStorageFormatFile, false); err != nil {
t.Errorf("Expected xl.meta file to be present but stat failed - %v", err)
if !reflect.DeepEqual(part1Disk1Origin, part1Replaced) {
t.Fatalf("part.1 not healed correctly")
}
err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false)
// Test 2, Corrupt part.1
err = firstDisk.WriteAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), []byte("foobytes"))
if err != nil {
t.Errorf("Failure during deleting part.1 - %v", err)
t.Fatalf("Failed to write a file - %v", err)
}
err = firstDisk.WriteAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), []byte{})
_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Errorf("Failure during creating part.1 - %v", err)
t.Fatalf("Failed to heal object - %v", err)
}
_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
part1Replaced, err = firstDisk.ReadAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"))
if err != nil {
t.Errorf("Expected nil but received %v", err)
t.Fatalf("Failed to read a file - %v", err)
}
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi, err := getLatestFileInfo(ctx, fileInfos, errs)
if !reflect.DeepEqual(part1Disk1Origin, part1Replaced) {
t.Fatalf("part.1 not healed correctly")
}
// Test 3, Corrupt one part and remove data in another disk
err = firstDisk.WriteAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), []byte("foobytes"))
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
t.Fatalf("Failed to write a file - %v", err)
}
if !reflect.DeepEqual(fi, nfi) {
t.Fatalf("FileInfo not equal after healing")
}
err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), false)
err = secondDisk.Delete(context.Background(), bucket, object, true)
if err != nil {
t.Errorf("Failure during deleting part.1 - %v", err)
t.Fatalf("Failed to delete a file - %v", err)
}
bdata := bytes.Repeat([]byte("b"), int(nfi.Size))
err = firstDisk.WriteAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"), bdata)
_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
if err != nil {
t.Errorf("Failure during creating part.1 - %v", err)
t.Fatalf("Failed to heal object - %v", err)
}
_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
partReconstructed, err := firstDisk.ReadAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"))
if err != nil {
t.Errorf("Expected nil but received %v", err)
t.Fatalf("Failed to read a file - %v", err)
}
fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
nfi, err = getLatestFileInfo(ctx, fileInfos, errs)
if !reflect.DeepEqual(part1Disk1Origin, partReconstructed) {
t.Fatalf("part.1 not healed correctly")
}
partReconstructed, err = secondDisk.ReadAll(context.Background(), bucket, pathJoin(object, fi.DataDir, "part.1"))
if err != nil {
t.Fatalf("Failed to getLatestFileInfo - %v", err)
t.Fatalf("Failed to read a file - %v", err)
}
if !reflect.DeepEqual(fi, nfi) {
t.Fatalf("FileInfo not equal after healing")
}
// Test 4: checks if HealObject returns an error when xl.meta is not found
// in more than read quorum number of disks, to create a corrupted situation.
for i := 0; i <= nfi.Erasure.DataBlocks; i++ {
erasureDisks[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
}
// Try healing now, expect to receive errFileNotFound.
_, err = objLayer.HealObject(ctx, bucket, object, "", madmin.HealOpts{DryRun: false, Remove: true, ScanMode: madmin.HealDeepScan})
if err != nil {
if _, ok := err.(ObjectNotFound); !ok {
t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err)
}
}
// since majority of xl.meta's are not available, object should be successfully deleted.
_, err = objLayer.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
if _, ok := err.(ObjectNotFound); !ok {
t.Errorf("Expect %v but received %v", ObjectNotFound{Bucket: bucket, Object: object}, err)
if !reflect.DeepEqual(part1Disk2Origin, partReconstructed) {
t.Fatalf("part.1 not healed correctly")
}
}
@ -1017,7 +1151,8 @@ func TestHealObjectErasure(t *testing.T) {
t.Fatalf("Failed to complete multipart upload - %v", err)
}
err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
// Delete the whole object folder
err = firstDisk.Delete(context.Background(), bucket, object, true)
if err != nil {
t.Fatalf("Failed to delete a file - %v", err)
}