mirror of
https://github.com/minio/minio.git
synced 2025-01-26 06:03:17 -05:00
Remove stale data in HEAD/GET object (#18460)
Currently if the object does not exist in quorum disks of an erasure set, the dangling code is never called because the returned error will be errFileNotFound or errFileVersionNotFound; With this commit, when errFileNotFound or errFileVersionNotFound is returning when trying to calculate the quorum of a given object, the code checks if a disk returned nil, which means a stale object exists in that disk, that will trigger deleteIfDangling() function
This commit is contained in:
parent
0daa2dbf59
commit
22d59e757d
@ -666,6 +666,32 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
|
|||||||
return metaFileInfos, errs
|
return metaFileInfos, errs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Checking if an object is dangling costs some IOPS; hence implementing this function
|
||||||
|
// which decides which condition it is useful to check if an object is dangling
|
||||||
|
//
|
||||||
|
// errs: errors from reading xl.meta in all disks
|
||||||
|
// err: reduced errs
|
||||||
|
// bucket: the object name in question
|
||||||
|
func shouldCheckForDangling(err error, errs []error, bucket string) bool {
|
||||||
|
// Avoid data in .minio.sys for now
|
||||||
|
if bucket == minioMetaBucket {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
// Check if we have a read quorum issue
|
||||||
|
case errors.Is(err, errErasureReadQuorum):
|
||||||
|
return true
|
||||||
|
// Check if the object is inexistent in most disks but not all of them
|
||||||
|
case errors.Is(err, errFileNotFound) || errors.Is(err, errFileVersionNotFound):
|
||||||
|
for i := range errs {
|
||||||
|
if errs[i] == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
|
func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
|
||||||
disks := er.getDisks()
|
disks := er.getDisks()
|
||||||
|
|
||||||
@ -680,7 +706,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
|||||||
|
|
||||||
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
|
if shouldCheckForDangling(err, errs, bucket) {
|
||||||
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
|
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
|
||||||
if derr != nil {
|
if derr != nil {
|
||||||
err = derr
|
err = derr
|
||||||
@ -690,7 +716,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
|||||||
}
|
}
|
||||||
|
|
||||||
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
||||||
if errors.Is(reducedErr, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
|
if shouldCheckForDangling(err, errs, bucket) {
|
||||||
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
|
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
|
||||||
if derr != nil {
|
if derr != nil {
|
||||||
reducedErr = derr
|
reducedErr = derr
|
||||||
|
Loading…
x
Reference in New Issue
Block a user