mirror of https://github.com/minio/minio.git
s3: DeleteBucket to use listing before returning bucket not empty error (#20301)
Use Walk(), which is a recursive listing with versioning, to check if the bucket has some objects before being removed. This is beneficial because the bucket can contain multiple dangling objects in multiple drives. Also, this will prevent a bug where a bucket is deleted in a deployment that has many erasure sets but the bucket contains one or few objects not spread to enough erasure sets.
This commit is contained in:
parent
a8f143298f
commit
73992d2b9f
|
@ -1980,6 +1980,34 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, op
|
|||
defer lk.Unlock(lkctx)
|
||||
}
|
||||
|
||||
if !opts.Force {
|
||||
results := make(chan itemOrErr[ObjectInfo])
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
err := z.Walk(ctx, bucket, "", results, WalkOptions{Limit: 1})
|
||||
if err != nil {
|
||||
s3LogIf(ctx, fmt.Errorf("unable to verify if the bucket %s is empty: %w", bucket, err))
|
||||
return toObjectErr(err, bucket)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case r, found := <-results:
|
||||
if found {
|
||||
if r.Err != nil {
|
||||
s3LogIf(ctx, fmt.Errorf("unable to verify if the bucket %s is empty: %w", bucket, r.Err))
|
||||
return toObjectErr(r.Err, bucket)
|
||||
}
|
||||
return toObjectErr(errVolumeNotEmpty, bucket)
|
||||
}
|
||||
}
|
||||
|
||||
// Always pass force to the lower level
|
||||
opts.Force = true
|
||||
}
|
||||
|
||||
err := z.s3Peer.DeleteBucket(ctx, bucket, opts)
|
||||
if err == nil || isErrBucketNotFound(err) {
|
||||
// If site replication is configured, hold on to deleted bucket state until sites sync
|
||||
|
@ -2198,6 +2226,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
|
|||
filterPrefix: filterPrefix,
|
||||
recursive: true,
|
||||
forwardTo: opts.Marker,
|
||||
perDiskLimit: opts.Limit,
|
||||
minDisks: listingQuorum,
|
||||
reportNotFound: false,
|
||||
agreed: send,
|
||||
|
|
|
@ -145,6 +145,7 @@ type WalkOptions struct {
|
|||
LatestOnly bool // returns only latest versions for all matching objects
|
||||
AskDisks string // dictates how many disks are being listed
|
||||
VersionsSort WalkVersionsSortOrder // sort order for versions of the same object; default: Ascending order in ModTime
|
||||
Limit int // maximum number of items, 0 means no limit
|
||||
}
|
||||
|
||||
// ExpirationOptions represents object options for object expiration at objectLayer.
|
||||
|
|
|
@ -219,9 +219,9 @@ func (z *MakeBucketOptions) Msgsize() (s int) {
|
|||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *WalkOptions) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// map header, size 4
|
||||
// map header, size 5
|
||||
// string "Marker"
|
||||
o = append(o, 0x84, 0xa6, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72)
|
||||
o = append(o, 0x85, 0xa6, 0x4d, 0x61, 0x72, 0x6b, 0x65, 0x72)
|
||||
o = msgp.AppendString(o, z.Marker)
|
||||
// string "LatestOnly"
|
||||
o = append(o, 0xaa, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x6e, 0x6c, 0x79)
|
||||
|
@ -232,6 +232,9 @@ func (z *WalkOptions) MarshalMsg(b []byte) (o []byte, err error) {
|
|||
// string "VersionsSort"
|
||||
o = append(o, 0xac, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x6f, 0x72, 0x74)
|
||||
o = msgp.AppendUint8(o, uint8(z.VersionsSort))
|
||||
// string "Limit"
|
||||
o = append(o, 0xa5, 0x4c, 0x69, 0x6d, 0x69, 0x74)
|
||||
o = msgp.AppendInt(o, z.Limit)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -281,6 +284,12 @@ func (z *WalkOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
|||
}
|
||||
z.VersionsSort = WalkVersionsSortOrder(zb0002)
|
||||
}
|
||||
case "Limit":
|
||||
z.Limit, bts, err = msgp.ReadIntBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "Limit")
|
||||
return
|
||||
}
|
||||
default:
|
||||
bts, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
|
@ -295,7 +304,7 @@ func (z *WalkOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
|||
|
||||
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
|
||||
func (z *WalkOptions) Msgsize() (s int) {
|
||||
s = 1 + 7 + msgp.StringPrefixSize + len(z.Marker) + 11 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.AskDisks) + 13 + msgp.Uint8Size
|
||||
s = 1 + 7 + msgp.StringPrefixSize + len(z.Marker) + 11 + msgp.BoolSize + 9 + msgp.StringPrefixSize + len(z.AskDisks) + 13 + msgp.Uint8Size + 6 + msgp.IntSize
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -279,24 +279,7 @@ func deleteBucketLocal(ctx context.Context, bucket string, opts DeleteBucketOpti
|
|||
}, index)
|
||||
}
|
||||
|
||||
var recreate bool
|
||||
errs := g.Wait()
|
||||
for index, err := range errs {
|
||||
if errors.Is(err, errVolumeNotEmpty) {
|
||||
recreate = true
|
||||
}
|
||||
if err == nil && recreate {
|
||||
// ignore any errors
|
||||
localDrives[index].MakeVol(ctx, bucket)
|
||||
}
|
||||
}
|
||||
|
||||
// Since we recreated buckets and error was `not-empty`, return not-empty.
|
||||
if recreate {
|
||||
return errVolumeNotEmpty
|
||||
} // for all other errors reduce by write quorum.
|
||||
|
||||
return reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, (len(localDrives)/2)+1)
|
||||
return reduceWriteQuorumErrs(ctx, g.Wait(), bucketOpIgnoredErrs, (len(localDrives)/2)+1)
|
||||
}
|
||||
|
||||
func makeBucketLocal(ctx context.Context, bucket string, opts MakeBucketOptions) error {
|
||||
|
|
Loading…
Reference in New Issue