fix: trigger Heal when xl.meta needs healing during PUT (#15661)

This PR is a continuation of the previous change instead
of returning an error, instead trigger a spot heal on the
'xl.meta' and return only after the healing is complete.

This allows for future GETs on the same resource to be
consistent for any version of the object.
This commit is contained in:
Harshavardhana
2022-09-07 07:25:39 -07:00
committed by GitHub
parent 228c6686f8
commit 8e997eba4a
4 changed files with 159 additions and 24 deletions

View File

@@ -42,6 +42,7 @@ import (
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/mimedb"
"github.com/minio/pkg/wildcard"
uatomic "go.uber.org/atomic"
)
@@ -744,24 +745,19 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
// Wait for all renames to finish.
errs := g.Wait()
versions := reduceCommonVersions(diskVersions, writeQuorum)
if versions == 0 {
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
if err == nil {
err = errErasureWriteQuorum
}
return nil, err
}
for index, dversions := range diskVersions {
if versions != dversions {
errs[index] = errFileCorrupt
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
if err == nil {
versions := reduceCommonVersions(diskVersions, writeQuorum)
for index, dversions := range diskVersions {
if versions != dversions {
errs[index] = errFileNeedsHealing
}
}
err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
}
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
// otherwise return failure. Cleanup successful renames.
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
// otherwise return failure.
return evalDisks(disks, errs), err
}
@@ -1178,16 +1174,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
}
// Rename the successfully written temporary object to final location.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum); err != nil {
if errors.Is(err, errFileNotFound) {
return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
}
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
defer NSUpdated(bucket, object)
onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum)
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
// Object info is the same in all disks, so we can pick
@@ -1211,6 +1198,57 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
}
}
healEntry := func(entry metaCacheEntry) error {
if entry.isDir() {
return nil
}
// We might land at .metacache, .trash, .multipart
// no need to heal them skip, only when bucket
// is '.minio.sys'
if bucket == minioMetaBucket {
if wildcard.Match("buckets/*/.metacache/*", entry.name) {
return nil
}
if wildcard.Match("tmp/*", entry.name) {
return nil
}
if wildcard.Match("multipart/*", entry.name) {
return nil
}
if wildcard.Match("tmp-old/*", entry.name) {
return nil
}
}
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
_, err = er.HealObject(ctx, bucket, entry.name, "", madmin.HealOpts{NoLock: true})
return err
}
for _, version := range fivs.Versions {
_, err = er.HealObject(ctx, bucket, version.Name, version.VersionID, madmin.HealOpts{NoLock: true})
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
return err
}
}
return nil
}
if err != nil {
if errors.Is(err, errFileNotFound) {
return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
}
if errors.Is(err, errFileNeedsHealing) && !opts.Speedtest {
listAndHeal(ctx, bucket, object, &er, healEntry)
} else {
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
}
defer NSUpdated(bucket, object)
fi.ReplicationState = opts.PutReplicationState()
online = countOnlineDisks(onlineDisks)