add healing for invalid shards by skipping the blocks (#13978)

Built on top of #13945: now we simply skip the invalid
shards, and the healing is automated.
Author:    Harshavardhana
Date:      2021-12-23 23:01:46 -08:00
Committed: GitHub
parent 9ad6012782
commit 7e3a7d7044
8 changed files with 88 additions and 33 deletions

cmd/erasure-healing-common.go

@@ -225,10 +225,11 @@ func getLatestFileInfo(ctx context.Context, partsMetadata []FileInfo, errs []err
 // - slice of errors about the state of data files on disk - can have
 //   a not-found error or a hash-mismatch error.
 func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetadata []FileInfo,
-	errs []error, latestMeta FileInfo,
-	bucket, object string, scanMode madmin.HealScanMode) ([]StorageAPI, []error) {
+	errs []error, latestMeta FileInfo, bucket, object string,
+	scanMode madmin.HealScanMode) ([]StorageAPI, []error, time.Time) {
 	var diskMTime time.Time
+	var shardFix bool
 	if !latestMeta.DataShardFixed() {
 		diskMTime = pickValidDiskTimeWithQuorum(partsMetadata,
 			latestMeta.Erasure.DataBlocks)
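
For reference, pickValidDiskTimeWithQuorum comes from the base PR #13945 and is
not shown in this diff. A minimal sketch of the idea, assuming FileInfo carries
a per-disk DiskMTime field (names beyond those in the diff are assumptions):

	// Sketch: pick the most frequently occurring disk mod-time across the
	// parts metadata, but only trust it if it reaches data-block quorum.
	func pickValidDiskTimeWithQuorum(metaArr []FileInfo, quorum int) time.Time {
		diskMTimes := map[time.Time]int{}
		for _, meta := range metaArr {
			if meta.IsValid() {
				diskMTimes[meta.DiskMTime]++
			}
		}
		for diskMTime, count := range diskMTimes {
			if count >= quorum {
				return diskMTime
			}
		}
		return timeSentinel // no mod-time reached quorum
	}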
@@ -283,6 +284,8 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
 		if erasureDistributionReliable {
 			if !meta.IsValid() {
+				partsMetadata[i] = FileInfo{}
+				dataErrs[i] = errFileCorrupt
 				continue
 			}
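
Marking the entry with errFileCorrupt here, instead of silently continuing, is
what makes the skip self-healing: a corrupt-marked index fails the
dataErrs[i] == nil check further down, so the disk is excluded from
availableDisks and the caller treats it as outdated. A hedged sketch of that
caller-side selection (outDatedDisks is an assumed name, not from this diff):

	// Sketch (assumption): disks whose shards were marked corrupt become
	// healing targets; their shards are rebuilt from the remaining quorum.
	outDatedDisks := make([]StorageAPI, len(onlineDisks))
	for i, disk := range onlineDisks {
		if dataErrs[i] == errFileCorrupt {
			outDatedDisks[i] = disk
		}
	}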
@@ -305,6 +308,23 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
 			}
 		}
+		if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
+			if !partsMetadata[i].AcceptableDelta(diskMTime, shardDiskTimeDelta) {
+				// Not within the acceptable delta, skip.
+				// If the disk mTime mismatches, it is considered outdated:
+				// https://github.com/minio/minio/pull/13803
+				//
+				// This check is only active if we could find a maximally
+				// occurring disk mtime that is roughly the same across
+				// the quorum, allowing us to skip those shards which we
+				// think are wrong.
+				shardFix = true
+				partsMetadata[i] = FileInfo{}
+				dataErrs[i] = errFileCorrupt
+				continue
+			}
+		}
+
 		// Always check data, if we got it.
 		if (len(meta.Data) > 0 || meta.Size == 0) && len(meta.Parts) > 0 {
 			checksumInfo := meta.Erasure.GetChecksumInfo(meta.Parts[0].Number)
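
AcceptableDelta and shardDiskTimeDelta are likewise introduced outside this
diff. The check is presumably a symmetric comparison of the per-disk mod-time
against the quorum mod-time; a minimal sketch under that assumption:

	// Sketch (assumption): report whether fi's disk mod-time is within
	// delta of the quorum mod-time, in either direction.
	func (fi FileInfo) AcceptableDelta(maxTime time.Time, delta time.Duration) bool {
		diff := maxTime.Sub(fi.DiskMTime)
		if diff < 0 {
			diff = -diff
		}
		return diff < delta
	}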
@@ -338,14 +358,6 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
 			}
 		}
-		if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
-			if !partsMetadata[i].AcceptableDelta(diskMTime, shardDiskTimeDelta) {
-				// not with in acceptable delta, skip.
-				partsMetadata[i] = FileInfo{}
-				continue
-			}
-		}
-
 		if dataErrs[i] == nil {
 			// All parts verified, mark it as all data available.
 			availableDisks[i] = onlineDisk
@@ -355,5 +367,10 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
 		}
 	}
-	return availableDisks, dataErrs
+	if shardFix {
+		// Only when a shard was fixed, return the disk mtime that was used.
+		return availableDisks, dataErrs, diskMTime
+	} // else return timeSentinel for the disk mtime
+	return availableDisks, dataErrs, timeSentinel
 }
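
Returning timeSentinel rather than the zero time keeps the "no fix happened"
case explicit: MinIO uses the Unix epoch in UTC as its unset-time marker, which
is why callers guard on both conditions before acting on the value:

	// timeSentinel (time.Unix(0, 0).UTC()) means no shard was fixed; a zero
	// time would mean the value was never set at all. Act only on real mtimes.
	if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
		// persist the data-shard-fix marker (see the healObject hunk below)
	}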

cmd/erasure-healing-common_test.go

@@ -251,7 +251,8 @@ func TestListOnlineDisks(t *testing.T) {
 			t.Fatalf("Expected modTime to be equal to %v but was found to be %v",
 				test.expectedTime, modTime)
 		}
-		availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, fi, bucket, object, madmin.HealDeepScan)
+		availableDisks, newErrs, _ := disksWithAllParts(ctx, onlineDisks, partsMetadata,
+			test.errs, fi, bucket, object, madmin.HealDeepScan)
 		test.errs = newErrs
 		if test._tamperBackend != noTamper {
@@ -433,7 +434,8 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
 				test.expectedTime, modTime)
 		}
-		availableDisks, newErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata, test.errs, fi, bucket, object, madmin.HealDeepScan)
+		availableDisks, newErrs, _ := disksWithAllParts(ctx, onlineDisks, partsMetadata,
+			test.errs, fi, bucket, object, madmin.HealDeepScan)
 		test.errs = newErrs
 		if test._tamperBackend != noTamper {
@@ -494,7 +496,8 @@ func TestDisksWithAllParts(t *testing.T) {
 	erasureDisks, _ = listOnlineDisks(erasureDisks, partsMetadata, errs)
-	filteredDisks, errs := disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, fi, bucket, object, madmin.HealDeepScan)
+	filteredDisks, errs, _ := disksWithAllParts(ctx, erasureDisks, partsMetadata,
+		errs, fi, bucket, object, madmin.HealDeepScan)
 	if len(filteredDisks) != len(erasureDisks) {
 		t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
@@ -515,7 +518,8 @@ func TestDisksWithAllParts(t *testing.T) {
 	partsMetadata[0].ModTime = partsMetadata[0].ModTime.Add(-1 * time.Hour)
 	errs = make([]error, len(erasureDisks))
-	filteredDisks, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, fi, bucket, object, madmin.HealDeepScan)
+	filteredDisks, _, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata,
+		errs, fi, bucket, object, madmin.HealDeepScan)
 	if len(filteredDisks) != len(erasureDisks) {
 		t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
@@ -535,7 +539,8 @@ func TestDisksWithAllParts(t *testing.T) {
 	partsMetadata[1].DataDir = "foo-random"
 	errs = make([]error, len(erasureDisks))
-	filteredDisks, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, fi, bucket, object, madmin.HealDeepScan)
+	filteredDisks, _, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata,
+		errs, fi, bucket, object, madmin.HealDeepScan)
 	if len(filteredDisks) != len(erasureDisks) {
 		t.Errorf("Unexpected number of disks: %d", len(filteredDisks))
@@ -571,7 +576,8 @@ func TestDisksWithAllParts(t *testing.T) {
 	}
 	errs = make([]error, len(erasureDisks))
-	filteredDisks, errs = disksWithAllParts(ctx, erasureDisks, partsMetadata, errs, fi, bucket, object, madmin.HealDeepScan)
+	filteredDisks, errs, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata,
+		errs, fi, bucket, object, madmin.HealDeepScan)
 	if len(filteredDisks) != len(erasureDisks) {
 		t.Errorf("Unexpected number of disks: %d", len(filteredDisks))

cmd/erasure-healing.go

@@ -331,7 +331,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 	// used here for reconstruction. This is done to ensure that
 	// we do not skip drives that have inconsistent metadata to be
 	// skipped from purging when they are stale.
-	availableDisks, dataErrs := disksWithAllParts(ctx, onlineDisks, partsMetadata,
+	availableDisks, dataErrs, diskMTime := disksWithAllParts(ctx, onlineDisks, partsMetadata,
 		errs, latestMeta, bucket, object, scanMode)

 	// Loop to find number of disks with valid data, per-drive
@@ -581,6 +581,20 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 		}
 	}
+	if !diskMTime.Equal(timeSentinel) && !diskMTime.IsZero() {
+		// Update metadata to indicate this special fix.
+		_, err = er.PutObjectMetadata(ctx, bucket, object, ObjectOptions{
+			NoLock: true,
+			UserDefined: map[string]string{
+				reservedMetadataPrefixLowerDataShardFix: "true",
+				// Another reserved metadata key captures the original
+				// disk mtime for this version of the object, to be used
+				// possibly in the future to heal other versions.
+				ReservedMetadataPrefixLower + "disk-mtime": diskMTime.String(),
+			},
+		})
+	}
+
 	// Set the size of the object in the heal result
 	result.ObjectSize = latestMeta.Size
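
The reservedMetadataPrefixLowerDataShardFix key written above is what
DataShardFixed(), checked at the top of disksWithAllParts, reads back, so each
object version goes through the mtime-quorum skip at most once. A minimal
sketch, assuming the flag is stored in FileInfo.Metadata:

	// Sketch (assumption): once the fix marker is persisted, subsequent
	// heals skip the disk-mtime quorum check for this object version.
	func (fi FileInfo) DataShardFixed() bool {
		return fi.Metadata[reservedMetadataPrefixLowerDataShardFix] == "true"
	}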