heal: Use etag as quorum when none found for modtime (#20500)

This commit is contained in:
Anis Eleuch 2024-10-01 16:19:10 +01:00 committed by GitHub
parent 6186d11761
commit 0abfd1bcb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 13 deletions

View File

@ -289,7 +289,7 @@ func hasPartErr(partErrs []int) bool {
// - slice of errors about the state of data files on disk - can have
// a not-found error or a hash-mismatch error.
func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetadata []FileInfo,
errs []error, latestMeta FileInfo, bucket, object string,
errs []error, latestMeta FileInfo, filterByETag bool, bucket, object string,
scanMode madmin.HealScanMode,
) (availableDisks []StorageAPI, dataErrsByDisk map[int][]int, dataErrsByPart map[int][]int) {
availableDisks = make([]StorageAPI, len(onlineDisks))
@ -345,7 +345,14 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
}
meta := partsMetadata[i]
if !meta.ModTime.Equal(latestMeta.ModTime) || meta.DataDir != latestMeta.DataDir {
corrupted := false
if filterByETag {
corrupted = meta.Metadata["etag"] != latestMeta.Metadata["etag"]
} else {
corrupted = !meta.ModTime.Equal(latestMeta.ModTime) || meta.DataDir != latestMeta.DataDir
}
if corrupted {
metaErrs[i] = errFileCorrupt
partsMetadata[i] = FileInfo{}
continue
@ -408,7 +415,6 @@ func disksWithAllParts(ctx context.Context, onlineDisks []StorageAPI, partsMetad
verifyResp *CheckPartsResp
)
meta.DataDir = latestMeta.DataDir
switch scanMode {
case madmin.HealDeepScan:
// disk has a valid xl.meta but may not have all the

View File

@ -314,7 +314,7 @@ func TestListOnlineDisks(t *testing.T) {
test.expectedTime, modTime)
}
availableDisks, _, _ := disksWithAllParts(ctx, onlineDisks, partsMetadata,
test.errs, fi, bucket, object, madmin.HealDeepScan)
test.errs, fi, false, bucket, object, madmin.HealDeepScan)
if test._tamperBackend != noTamper {
if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil {
@ -496,7 +496,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
}
availableDisks, _, _ := disksWithAllParts(ctx, onlineDisks, partsMetadata,
test.errs, fi, bucket, object, madmin.HealDeepScan)
test.errs, fi, false, bucket, object, madmin.HealDeepScan)
if test._tamperBackend != noTamper {
if tamperedIndex != -1 && availableDisks[tamperedIndex] != nil {
@ -558,7 +558,7 @@ func TestDisksWithAllParts(t *testing.T) {
erasureDisks, _, _ = listOnlineDisks(erasureDisks, partsMetadata, errs, readQuorum)
filteredDisks, _, dataErrsPerDisk := disksWithAllParts(ctx, erasureDisks, partsMetadata,
errs, fi, bucket, object, madmin.HealDeepScan)
errs, fi, false, bucket, object, madmin.HealDeepScan)
if len(filteredDisks) != len(erasureDisks) {
t.Errorf("Unexpected number of drives: %d", len(filteredDisks))
@ -580,7 +580,7 @@ func TestDisksWithAllParts(t *testing.T) {
errs = make([]error, len(erasureDisks))
filteredDisks, _, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata,
errs, fi, bucket, object, madmin.HealDeepScan)
errs, fi, false, bucket, object, madmin.HealDeepScan)
if len(filteredDisks) != len(erasureDisks) {
t.Errorf("Unexpected number of drives: %d", len(filteredDisks))
@ -601,7 +601,7 @@ func TestDisksWithAllParts(t *testing.T) {
errs = make([]error, len(erasureDisks))
filteredDisks, _, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata,
errs, fi, bucket, object, madmin.HealDeepScan)
errs, fi, false, bucket, object, madmin.HealDeepScan)
if len(filteredDisks) != len(erasureDisks) {
t.Errorf("Unexpected number of drives: %d", len(filteredDisks))
@ -638,7 +638,7 @@ func TestDisksWithAllParts(t *testing.T) {
errs = make([]error, len(erasureDisks))
filteredDisks, dataErrsPerDisk, _ = disksWithAllParts(ctx, erasureDisks, partsMetadata,
errs, fi, bucket, object, madmin.HealDeepScan)
errs, fi, false, bucket, object, madmin.HealDeepScan)
if len(filteredDisks) != len(erasureDisks) {
t.Errorf("Unexpected number of drives: %d", len(filteredDisks))

View File

@ -327,15 +327,18 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
// List of disks having latest version of the object xl.meta
// (by modtime).
onlineDisks, modTime, etag := listOnlineDisks(storageDisks, partsMetadata, errs, readQuorum)
onlineDisks, quorumModTime, quorumETag := listOnlineDisks(storageDisks, partsMetadata, errs, readQuorum)
// Latest FileInfo for reference. If a valid metadata is not
// present, it is as good as object not found.
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, modTime, etag, readQuorum)
latestMeta, err := pickValidFileInfo(ctx, partsMetadata, quorumModTime, quorumETag, readQuorum)
if err != nil {
return result, err
}
// No modtime quorum
filterDisksByETag := quorumETag != ""
// List of disks having all parts as per latest metadata.
// NOTE: do not pass in latestDisks to diskWithAllParts since
// the diskWithAllParts needs to reach the drive to ensure
@ -346,7 +349,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
// we do not skip drives that have inconsistent metadata to be
// skipped from purging when they are stale.
availableDisks, dataErrsByDisk, dataErrsByPart := disksWithAllParts(ctx, onlineDisks, partsMetadata,
errs, latestMeta, bucket, object, scanMode)
errs, latestMeta, filterDisksByETag, bucket, object, scanMode)
var erasure Erasure
if !latestMeta.Deleted && !latestMeta.IsRemote() {
@ -421,7 +424,14 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
return result, nil
}
if !latestMeta.XLV1 && !latestMeta.Deleted && disksToHealCount > latestMeta.Erasure.ParityBlocks {
cannotHeal := !latestMeta.XLV1 && !latestMeta.Deleted && disksToHealCount > latestMeta.Erasure.ParityBlocks
if cannotHeal && quorumETag != "" {
// This is an object that is supposed to be removed by the dangling code
// but we noticed that ETag is the same for all objects, let's give it a shot
cannotHeal = false
}
if cannotHeal {
// Allow for dangling deletes, on versions that have DataDir missing etc.
// this would end up restoring the correct readable versions.
m, err := er.deleteIfDangling(ctx, bucket, object, partsMetadata, errs, dataErrsByPart, ObjectOptions{