mirror of
https://github.com/minio/minio.git
synced 2025-11-08 21:24:55 -05:00
fix: allow DeleteObject unversioned objects with insufficient read quorum (#19581)
Since the object is being permanently deleted, the lack of read quorum should not matter as long as sufficient disks are online to complete the deletion with parity requirements. If several pools have the same object with insufficient read quorum, attempt to delete object from all the pools where it exists
This commit is contained in:
@@ -458,7 +458,13 @@ type PoolObjInfo struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bucket, object string, opts ObjectOptions) (PoolObjInfo, error) {
|
||||
type poolErrs struct {
|
||||
Index int
|
||||
Err error
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bucket, object string, opts ObjectOptions) (PoolObjInfo, []poolErrs, error) {
|
||||
var noReadQuorumPools []poolErrs
|
||||
poolObjInfos := make([]PoolObjInfo, len(z.serverPools))
|
||||
poolOpts := make([]ObjectOptions, len(z.serverPools))
|
||||
for i := range z.serverPools {
|
||||
@@ -508,8 +514,9 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
|
||||
}
|
||||
if pinfo.Err == nil {
|
||||
// found a pool
|
||||
return pinfo, nil
|
||||
return pinfo, z.poolsWithObject(poolObjInfos, opts), nil
|
||||
}
|
||||
|
||||
if isErrReadQuorum(pinfo.Err) && !opts.MetadataChg {
|
||||
// read quorum is returned when the object is visibly
|
||||
// present but its unreadable, we simply ask the writes to
|
||||
@@ -518,30 +525,49 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu
|
||||
// with enough disks online but sufficiently inconsistent to
|
||||
// break parity threshold, allow them to be overwritten
|
||||
// or allow new versions to be added.
|
||||
return pinfo, nil
|
||||
|
||||
return pinfo, z.poolsWithObject(poolObjInfos, opts), nil
|
||||
}
|
||||
defPool = pinfo
|
||||
if !isErrObjectNotFound(pinfo.Err) {
|
||||
return pinfo, pinfo.Err
|
||||
return pinfo, noReadQuorumPools, pinfo.Err
|
||||
}
|
||||
|
||||
// No object exists or its a delete marker,
|
||||
// check objInfo to confirm.
|
||||
if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
|
||||
return pinfo, nil
|
||||
return pinfo, noReadQuorumPools, nil
|
||||
}
|
||||
}
|
||||
if opts.ReplicationRequest && opts.DeleteMarker && defPool.Index >= 0 {
|
||||
// If the request is a delete marker replication request, return a default pool
|
||||
// in cases where the object does not exist.
|
||||
// This is to ensure that the delete marker is replicated to the destination.
|
||||
return defPool, nil
|
||||
return defPool, noReadQuorumPools, nil
|
||||
}
|
||||
return PoolObjInfo{}, toObjectErr(errFileNotFound, bucket, object)
|
||||
return PoolObjInfo{}, noReadQuorumPools, toObjectErr(errFileNotFound, bucket, object)
|
||||
}
|
||||
|
||||
// return all pools with read quorum error or no error for an object with given opts.Note that this array is
|
||||
// returned in the order of recency of object ModTime.
|
||||
func (z *erasureServerPools) poolsWithObject(pools []PoolObjInfo, opts ObjectOptions) (errs []poolErrs) {
|
||||
for _, pool := range pools {
|
||||
if opts.SkipDecommissioned && z.IsSuspended(pool.Index) {
|
||||
continue
|
||||
}
|
||||
// Skip object if it's from pools participating in a rebalance operation.
|
||||
if opts.SkipRebalancing && z.IsPoolRebalancing(pool.Index) {
|
||||
continue
|
||||
}
|
||||
if isErrReadQuorum(pool.Err) || pool.Err == nil {
|
||||
errs = append(errs, poolErrs{Err: pool.Err, Index: pool.Index})
|
||||
}
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, bucket, object string, opts ObjectOptions) (idx int, err error) {
|
||||
pinfo, err := z.getPoolInfoExistingWithOpts(ctx, bucket, object, opts)
|
||||
pinfo, _, err := z.getPoolInfoExistingWithOpts(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
@@ -1082,7 +1108,8 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
|
||||
|
||||
gopts := opts
|
||||
gopts.NoLock = true
|
||||
pinfo, err := z.getPoolInfoExistingWithOpts(ctx, bucket, object, gopts)
|
||||
|
||||
pinfo, noReadQuorumPools, err := z.getPoolInfoExistingWithOpts(ctx, bucket, object, gopts)
|
||||
if err != nil {
|
||||
if _, ok := err.(InsufficientReadQuorum); ok {
|
||||
return objInfo, InsufficientWriteQuorum{}
|
||||
@@ -1096,11 +1123,44 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
|
||||
return pinfo.ObjInfo, nil
|
||||
}
|
||||
|
||||
// Delete concurrently in all server pools with read quorum error for unversioned objects.
|
||||
if len(noReadQuorumPools) > 0 && !opts.Versioned && !opts.VersionSuspended {
|
||||
return z.deleteObjectFromAllPools(ctx, bucket, object, opts, noReadQuorumPools)
|
||||
}
|
||||
objInfo, err = z.serverPools[pinfo.Index].DeleteObject(ctx, bucket, object, opts)
|
||||
objInfo.Name = decodeDirObject(object)
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) deleteObjectFromAllPools(ctx context.Context, bucket string, object string, opts ObjectOptions, poolIndices []poolErrs) (objInfo ObjectInfo, err error) {
|
||||
derrs := make([]error, len(poolIndices))
|
||||
dobjects := make([]ObjectInfo, len(poolIndices))
|
||||
|
||||
// Delete concurrently in all server pools that reported no error or read quorum error
|
||||
// where the read quorum issue is from metadata inconsistency.
|
||||
var wg sync.WaitGroup
|
||||
for idx, pe := range poolIndices {
|
||||
if v, ok := pe.Err.(InsufficientReadQuorum); ok && v.Type != RQInconsistentMeta {
|
||||
derrs[idx] = InsufficientWriteQuorum{}
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
pool := z.serverPools[pe.Index]
|
||||
go func(idx int, pool *erasureSets) {
|
||||
defer wg.Done()
|
||||
dobjects[idx], derrs[idx] = pool.DeleteObject(ctx, bucket, object, opts)
|
||||
}(idx, pool)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// the poolIndices array is pre-sorted in order of latest ModTime, we care only about pool with latest object though
|
||||
// the delete call tries to clean up other pools during DeleteObject call.
|
||||
objInfo = dobjects[0]
|
||||
objInfo.Name = decodeDirObject(object)
|
||||
err = derrs[0]
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
|
||||
derrs := make([]error, len(objects))
|
||||
dobjects := make([]DeletedObject, len(objects))
|
||||
@@ -1142,7 +1202,7 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
|
||||
j := j
|
||||
obj := obj
|
||||
eg.Go(func() error {
|
||||
pinfo, err := z.getPoolInfoExistingWithOpts(ctx, bucket, obj.ObjectName, ObjectOptions{
|
||||
pinfo, _, err := z.getPoolInfoExistingWithOpts(ctx, bucket, obj.ObjectName, ObjectOptions{
|
||||
NoLock: true,
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user