mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Do not return an error when version disparity is detected (#16269)
This commit is contained in:
parent
80fc3a8a52
commit
89db3fdb5d
@ -1211,13 +1211,17 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Rename the multipart object to final location.
|
// Rename the multipart object to final location.
|
||||||
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
|
onlineDisks, versionsDisparity, err := renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
|
||||||
partsMetadata, bucket, object, writeQuorum); err != nil {
|
partsMetadata, bucket, object, writeQuorum)
|
||||||
|
if err != nil {
|
||||||
return oi, toObjectErr(err, bucket, object)
|
return oi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer NSUpdated(bucket, object)
|
defer NSUpdated(bucket, object)
|
||||||
|
|
||||||
|
if !opts.Speedtest && versionsDisparity {
|
||||||
|
listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity)
|
||||||
|
}
|
||||||
|
|
||||||
// Check if there is any offline disk and add it to the MRF list
|
// Check if there is any offline disk and add it to the MRF list
|
||||||
for _, disk := range onlineDisks {
|
for _, disk := range onlineDisks {
|
||||||
if disk != nil && disk.IsOnline() {
|
if disk != nil && disk.IsOnline() {
|
||||||
|
@ -729,7 +729,7 @@ func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, obj
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Similar to rename but renames data from srcEntry to dstEntry at dataDir
|
// Similar to rename but renames data from srcEntry to dstEntry at dataDir
|
||||||
func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, error) {
|
func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, bool, error) {
|
||||||
g := errgroup.WithNErrs(len(disks))
|
g := errgroup.WithNErrs(len(disks))
|
||||||
|
|
||||||
fvID := mustGetUUID()
|
fvID := mustGetUUID()
|
||||||
@ -768,20 +768,22 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
|
|||||||
// Wait for all renames to finish.
|
// Wait for all renames to finish.
|
||||||
errs := g.Wait()
|
errs := g.Wait()
|
||||||
|
|
||||||
|
var versionsDisparity bool
|
||||||
|
|
||||||
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
versions := reduceCommonVersions(diskVersions, writeQuorum)
|
versions := reduceCommonVersions(diskVersions, writeQuorum)
|
||||||
for index, dversions := range diskVersions {
|
for _, dversions := range diskVersions {
|
||||||
if versions != dversions {
|
if versions != dversions {
|
||||||
errs[index] = errFileNeedsHealing
|
versionsDisparity = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
|
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
|
||||||
// otherwise return failure.
|
// otherwise return failure.
|
||||||
return evalDisks(disks, errs), err
|
return evalDisks(disks, errs), versionsDisparity, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
@ -934,6 +936,47 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st
|
|||||||
return er.putObject(ctx, bucket, object, data, opts)
|
return er.putObject(ctx, bucket, object, data, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Heal up to two versions of one object when there is disparity between disks
|
||||||
|
func healObjectVersionsDisparity(bucket string, entry metaCacheEntry) error {
|
||||||
|
if entry.isDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// We might land at .metacache, .trash, .multipart
|
||||||
|
// no need to heal them skip, only when bucket
|
||||||
|
// is '.minio.sys'
|
||||||
|
if bucket == minioMetaBucket {
|
||||||
|
if wildcard.Match("buckets/*/.metacache/*", entry.name) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if wildcard.Match("tmp/*", entry.name) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if wildcard.Match("multipart/*", entry.name) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if wildcard.Match("tmp-old/*", entry.name) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fivs, err := entry.fileInfoVersions(bucket)
|
||||||
|
if err != nil {
|
||||||
|
healObject(bucket, entry.name, "", madmin.HealNormalScan)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(fivs.Versions) > 2 {
|
||||||
|
return fmt.Errorf("bucket(%s)/object(%s) object with %d versions needs healing, allowing lazy healing via scanner instead here for versions greater than 2",
|
||||||
|
bucket, entry.name, len(fivs.Versions))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, version := range fivs.Versions {
|
||||||
|
healObject(bucket, entry.name, version.VersionID, madmin.HealNormalScan)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// putObject wrapper for erasureObjects PutObject
|
// putObject wrapper for erasureObjects PutObject
|
||||||
func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
auditObjectErasureSet(ctx, object, &er)
|
auditObjectErasureSet(ctx, object, &er)
|
||||||
@ -1205,7 +1248,17 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Rename the successfully written temporary object to final location.
|
// Rename the successfully written temporary object to final location.
|
||||||
onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum)
|
onlineDisks, versionsDisparity, err := renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, errFileNotFound) {
|
||||||
|
return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
|
||||||
|
}
|
||||||
|
logger.LogIf(ctx, err)
|
||||||
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer NSUpdated(bucket, object)
|
||||||
|
|
||||||
for i := 0; i < len(onlineDisks); i++ {
|
for i := 0; i < len(onlineDisks); i++ {
|
||||||
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
|
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
|
||||||
// Object info is the same in all disks, so we can pick
|
// Object info is the same in all disks, so we can pick
|
||||||
@ -1227,63 +1280,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||||||
er.addPartial(bucket, object, fi.VersionID, fi.Size)
|
er.addPartial(bucket, object, fi.VersionID, fi.Size)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
healEntry := func(entry metaCacheEntry) error {
|
if versionsDisparity {
|
||||||
if entry.isDir() {
|
listAndHeal(ctx, bucket, object, &er, healObjectVersionsDisparity)
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// We might land at .metacache, .trash, .multipart
|
|
||||||
// no need to heal them skip, only when bucket
|
|
||||||
// is '.minio.sys'
|
|
||||||
if bucket == minioMetaBucket {
|
|
||||||
if wildcard.Match("buckets/*/.metacache/*", entry.name) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if wildcard.Match("tmp/*", entry.name) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if wildcard.Match("multipart/*", entry.name) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if wildcard.Match("tmp-old/*", entry.name) {
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fivs, err := entry.fileInfoVersions(bucket)
|
|
||||||
if err != nil {
|
|
||||||
healObject(bucket, entry.name, "", madmin.HealNormalScan)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(fivs.Versions) > 2 {
|
|
||||||
return fmt.Errorf("bucket(%s)/object(%s) object with %d versions needs healing, allowing lazy healing via scanner instead here for versions greater than 2",
|
|
||||||
bucket, entry.name,
|
|
||||||
len(fivs.Versions))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, version := range fivs.Versions {
|
|
||||||
healObject(bucket, entry.name, version.VersionID, madmin.HealNormalScan)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, errFileNotFound) {
|
|
||||||
return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
|
|
||||||
}
|
|
||||||
if errors.Is(err, errFileNeedsHealing) && !opts.Speedtest {
|
|
||||||
listAndHeal(ctx, bucket, object, &er, healEntry)
|
|
||||||
} else {
|
|
||||||
logger.LogIf(ctx, err)
|
|
||||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
defer NSUpdated(bucket, object)
|
|
||||||
|
|
||||||
fi.ReplicationState = opts.PutReplicationState()
|
fi.ReplicationState = opts.PutReplicationState()
|
||||||
online = countOnlineDisks(onlineDisks)
|
online = countOnlineDisks(onlineDisks)
|
||||||
|
|
||||||
|
@ -1954,7 +1954,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
|
|||||||
// HealObjectFn closure function heals the object.
|
// HealObjectFn closure function heals the object.
|
||||||
type HealObjectFn func(bucket, object, versionID string) error
|
type HealObjectFn func(bucket, object, versionID string) error
|
||||||
|
|
||||||
func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(metaCacheEntry) error) error {
|
func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects, healEntry func(string, metaCacheEntry) error) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -1987,7 +1987,7 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects
|
|||||||
minDisks: 1,
|
minDisks: 1,
|
||||||
reportNotFound: false,
|
reportNotFound: false,
|
||||||
agreed: func(entry metaCacheEntry) {
|
agreed: func(entry metaCacheEntry) {
|
||||||
if err := healEntry(entry); err != nil {
|
if err := healEntry(bucket, entry); err != nil {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
@ -2000,7 +2000,7 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects
|
|||||||
entry, _ = entries.firstFound()
|
entry, _ = entries.firstFound()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := healEntry(*entry); err != nil {
|
if err := healEntry(bucket, *entry); err != nil {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
cancel()
|
cancel()
|
||||||
return
|
return
|
||||||
@ -2017,7 +2017,7 @@ func listAndHeal(ctx context.Context, bucket, prefix string, set *erasureObjects
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error {
|
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObjectFn HealObjectFn) error {
|
||||||
healEntry := func(entry metaCacheEntry) error {
|
healEntry := func(bucket string, entry metaCacheEntry) error {
|
||||||
if entry.isDir() {
|
if entry.isDir() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -69,9 +69,6 @@ var errTooManyOpenFiles = StorageErr("too many open files, please increase 'ulim
|
|||||||
// errFileNameTooLong - given file name is too long than supported length.
|
// errFileNameTooLong - given file name is too long than supported length.
|
||||||
var errFileNameTooLong = StorageErr("file name too long")
|
var errFileNameTooLong = StorageErr("file name too long")
|
||||||
|
|
||||||
// errFileNeedsHealing - given file name needs to heal.
|
|
||||||
var errFileNeedsHealing = StorageErr("file name needs healing")
|
|
||||||
|
|
||||||
// errVolumeExists - cannot create same volume again.
|
// errVolumeExists - cannot create same volume again.
|
||||||
var errVolumeExists = StorageErr("volume already exists")
|
var errVolumeExists = StorageErr("volume already exists")
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user