heal: Ignore disks with non quorum modtime and dataDir (#12328)

This commit is contained in:
Anis Elleuch 2021-05-19 20:04:08 +01:00 committed by GitHub
parent ecb5525c91
commit 866593fd94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 2 deletions

View File

@ -198,6 +198,9 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
// a not-found error or a hash-mismatch error.
func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetadata []FileInfo, errs []error, bucket,
object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) {
// List of disks having latest version of the object er.meta (by modtime)
_, modTime, dataDir := listOnlineDisks(onlineDisks, partsMetadata, errs)
availableDisks := make([]StorageAPI, len(onlineDisks))
dataErrs := make([]error, len(onlineDisks))
inconsistent := 0
@ -236,6 +239,13 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
continue
}
meta := partsMetadata[i]
if !meta.ModTime.Equal(modTime) || meta.DataDir != dataDir {
dataErrs[i] = errFileCorrupt
partsMetadata[i] = FileInfo{}
continue
}
if erasureDistributionReliable {
if !meta.IsValid() {
continue

View File

@ -484,13 +484,15 @@ func TestDisksWithAllParts(t *testing.T) {
t.Fatalf("Failed to read xl meta data %v", reducedErr)
}
// Test that all disks are returned without any failures with
// Test 1: Test that all disks are returned without any failures with
// unmodified meta data
partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
if err != nil {
t.Fatalf("Failed to read xl meta data %v", err)
}
erasureDisks, _, _ = listOnlineDisks(erasureDisks, partsMetadata, errs)
filteredDisks, errs := disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, bucket, object, madmin.HealDeepScan)
if len(filteredDisks) != len(erasureDisks) {
@ -507,8 +509,48 @@ func TestDisksWithAllParts(t *testing.T) {
}
}
// Test 2: Not synchronized modtime
partsMetadataBackup := partsMetadata[0]
partsMetadata[0].ModTime = partsMetadata[0].ModTime.Add(-1 * time.Hour)
errs = make([]error, len(erasureDisks))
filteredDisks, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, bucket, object, madmin.HealDeepScan)
if len(filteredDisks) != len(erasureDisks) {
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
}
for diskIndex, disk := range filteredDisks {
if diskIndex == 0 && disk != nil {
t.Errorf("Disk not filtered as expected, disk: %d", diskIndex)
}
if diskIndex != 0 && disk == nil {
t.Errorf("Disk erroneously filtered, diskIndex: %d", diskIndex)
}
}
partsMetadata[0] = partsMetadataBackup // Revert before going to the next test
// Test 3: Not synchronized DataDir
partsMetadataBackup = partsMetadata[1]
partsMetadata[1].DataDir = "foo-random"
errs = make([]error, len(erasureDisks))
filteredDisks, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, bucket, object, madmin.HealDeepScan)
if len(filteredDisks) != len(erasureDisks) {
t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
}
for diskIndex, disk := range filteredDisks {
if diskIndex == 1 && disk != nil {
t.Errorf("Disk not filtered as expected, disk: %d", diskIndex)
}
if diskIndex != 1 && disk == nil {
t.Errorf("Disk erroneously filtered, diskIndex: %d", diskIndex)
}
}
partsMetadata[1] = partsMetadataBackup // Revert before going to the next test
// Test 4: key = disk index, value = part name with hash mismatch
diskFailures := make(map[int]string)
// key = disk index, value = part name with hash mismatch
diskFailures[0] = "part.1"
diskFailures[3] = "part.1"
diskFailures[15] = "part.1"