diff --git a/cmd/erasure-bucket.go b/cmd/erasure-bucket.go index b198d5dcb..8be9d34e2 100644 --- a/cmd/erasure-bucket.go +++ b/cmd/erasure-bucket.go @@ -134,30 +134,6 @@ func (er erasureObjects) GetBucketInfo(ctx context.Context, bucket string) (bi B return bucketInfo, nil } -// Dangling buckets should be handled appropriately, in this following situation -// we actually have quorum error to be `nil` but we have some disks where -// the bucket delete returned `errVolumeNotEmpty` but this is not correct -// can only happen if there are dangling objects in a bucket. Under such -// a situation we simply attempt a full delete of the bucket including -// the dangling objects. All of this happens under a lock and there -// is no way a user can create buckets and sneak in objects into namespace, -// so it is safer to do. -func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs []error, bucket string) { - for index, err := range dErrs { - if err == errVolumeNotEmpty { - // Attempt to delete bucket again. - if derr := storageDisks[index].DeleteVol(ctx, bucket, false); derr == errVolumeNotEmpty { - _ = storageDisks[index].Delete(ctx, bucket, "", true) - - _ = storageDisks[index].DeleteVol(ctx, bucket, false) - - // Cleanup all the previously incomplete multiparts. - _ = storageDisks[index].Delete(ctx, minioMetaMultipartBucket, bucket, true) - } - } - } -} - // DeleteBucket - deletes a bucket. func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { // Collect if all disks report volume not found. @@ -171,13 +147,7 @@ func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceD index := index g.Go(func() error { if storageDisks[index] != nil { - if err := storageDisks[index].DeleteVol(ctx, bucket, forceDelete); err != nil { - return err - } - if err := storageDisks[index].Delete(ctx, minioMetaMultipartBucket, bucket, true); err != errFileNotFound { - return err - } - return nil + return storageDisks[index].DeleteVol(ctx, bucket, forceDelete) } return errDiskNotFound }, index) @@ -206,13 +176,13 @@ func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceD return toObjectErr(err, bucket) } - // If we reduce quorum to nil, means we have deleted buckets properly - // on some servers in quorum, we should look for volumeNotEmpty errors - // and delete those buckets as well. - // - // let this call succeed, even if client cancels the context - // this is to ensure that we don't leave any stale content - deleteDanglingBucket(context.Background(), storageDisks, dErrs, bucket) + // At this point we have `err == nil` but some errors might be `errVolumeNotEmpty` + // we should proceed to attempt a force delete of such buckets. + for index, err := range dErrs { + if err == errVolumeNotEmpty && storageDisks[index] != nil { + storageDisks[index].RenameFile(ctx, bucket, "", minioMetaTmpDeletedBucket, mustGetUUID()) + } + } return nil } diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index fcb8a2cea..0603d5cb5 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1448,7 +1448,8 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, fo } } - deleteBucketMetadata(ctx, z, bucket) + // Purge the entire bucket metadata entirely. + z.renameAll(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket)) // Success. return nil diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 9fdceb287..a153b25ac 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -364,7 +364,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt retries = 1 } - const retryDelay = 500 * time.Millisecond + const retryDelay = 250 * time.Millisecond // All operations are performed without locks, so we must be careful and allow for failures. // Read metadata associated with the object from a disk. if retries > 0 {