fix: allow danging objects to be purged properly deleteMultipleObjects() (#14273)

Deleting bulk objects had an issue since the relevant versionID
is not passed through the layers to ensure that the dangling
object purge actually works cleanly.

This is a continuation of quorum related error returned by
multi-object delete API from #14248

This PR ensures that we pass down correct information as
well as extend the scope of dangling object detection.
This commit is contained in:
Harshavardhana 2022-02-08 20:08:23 -08:00 committed by GitHub
parent 0ee2933234
commit f19a414e09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 70 deletions

View File

@ -808,52 +808,20 @@ func (er erasureObjects) purgeObjectDangling(ctx context.Context, bucket, object
) {
storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints()
// Check if the object is dangling, if yes and user requested
// remove we simply delete it from namespace.
m, ok := isObjectDangling(metaArr, errs, dataErrs)
if ok {
parityBlocks := er.defaultParityCount
dataBlocks := len(storageDisks) - parityBlocks
if m.IsValid() {
parityBlocks = m.Erasure.ParityBlocks
dataBlocks = m.Erasure.DataBlocks
}
writeQuorum := dataBlocks
if dataBlocks == parityBlocks {
writeQuorum++
}
var err error
if opts.Remove {
err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, FileInfo{
VersionID: versionID,
}, false)
// If Delete was successful, make sure to return the appropriate error
// and heal result appropriate with delete's error messages
errs = make([]error, len(errs))
for i := range errs {
errs[i] = err
}
if err == nil {
// Dangling object successfully purged, size is '0'
m.Size = 0
}
return er.defaultHealResult(FileInfo{}, storageDisks, storageEndpoints,
errs, bucket, object, versionID), nil
}
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), toObjectErr(err, bucket, object, versionID)
m, err := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, dataErrs, ObjectOptions{
VersionID: versionID,
})
errs = make([]error, len(errs))
for i := range errs {
errs[i] = err
}
readQuorum := len(storageDisks) - er.defaultParityCount
err := toObjectErr(reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum),
bucket, object, versionID)
return er.defaultHealResult(m, storageDisks, storageEndpoints, errs, bucket, object, versionID), err
if err == nil {
// Dangling object successfully purged, size is '0'
m.Size = 0
}
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), nil
}
// Object is considered dangling/corrupted if any only

View File

@ -1043,10 +1043,14 @@ func TestHealObjectErasure(t *testing.T) {
z.serverPools[0].erasureDisksMu.Unlock()
// Try healing now, expect to receive errDiskNotFound.
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealDeepScan})
// since majority of xl.meta's are not available, object quorum can't be read properly and error will be errErasureReadQuorum
if _, ok := err.(InsufficientReadQuorum); !ok {
t.Errorf("Expected %v but received %v", InsufficientReadQuorum{}, err)
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{
ScanMode: madmin.HealDeepScan,
})
// since majority of xl.meta's are not available, object quorum
// can't be read properly will be deleted automatically and
// err is nil
if err != nil {
t.Fatal(err)
}
}

View File

@ -397,6 +397,29 @@ func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object strin
return er.getObjectInfo(ctx, bucket, object, opts)
}
func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object string, metaArr []FileInfo, errs []error, dataErrs []error, opts ObjectOptions) (FileInfo, error) {
var err error
m, ok := isObjectDangling(metaArr, errs, dataErrs)
if ok {
err = errFileNotFound
if opts.VersionID != "" {
err = errFileVersionNotFound
}
defer NSUpdated(bucket, object)
if opts.VersionID != "" {
er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
VersionID: opts.VersionID,
}, false)
} else {
er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
VersionID: m.VersionID,
}, false)
}
}
return m, err
}
func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
disks := er.getDisks()
@ -405,27 +428,20 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
if err != nil {
return fi, nil, nil, err
if errors.Is(err, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
if derr != nil {
err = derr
}
}
return fi, nil, nil, toObjectErr(err, bucket, object)
}
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
if errors.Is(reducedErr, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
// Skip buckets that live at `.minio.sys` bucket.
if _, ok := isObjectDangling(metaArr, errs, nil); ok {
reducedErr = errFileNotFound
if opts.VersionID != "" {
reducedErr = errFileVersionNotFound
}
// Remove the dangling object only when:
// - This is a non versioned bucket
// - This is a versioned bucket and the version ID is passed, the reason
// is that we cannot fetch the ID of the latest version when we don't trust xl.meta
if !opts.Versioned || opts.VersionID != "" {
er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
Name: object,
VersionID: opts.VersionID,
}, false)
}
_, derr := er.deleteIfDangling(ctx, bucket, object, metaArr, errs, nil, opts)
if derr != nil {
err = derr
}
}
return fi, nil, nil, toObjectErr(reducedErr, bucket, object)

View File

@ -335,7 +335,9 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
pinfo := poolObjInfo{
PoolIndex: i,
}
opts.VersionID = "" // no need to check for specific versionId
// do not remove this check as it can lead to inconsistencies
// for all callers of bucket replication.
opts.VersionID = ""
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts)
poolObjInfos[i] = pinfo
}(i, pool, poolOpts[i])
@ -353,15 +355,15 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
})
for _, pinfo := range poolObjInfos {
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
return -1, pinfo.Err
}
// skip all objects from suspended pools for mutating calls.
if z.IsSuspended(pinfo.PoolIndex) && opts.Mutate {
continue
}
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
return -1, pinfo.Err
}
if isErrObjectNotFound(pinfo.Err) {
// No object exists or its a delete marker,
// check objInfo to confirm.