MakeBucket: Delete leftover buckets on error (#13368)

In (erasureServerPools).MakeBucketWithLocation deletes the created 
buckets if any set returns an error.

Add `NoRecreate` option, which will not recreate the bucket 
in `DeleteBucket`, if the operation fails.

Additionally use context.Background() for operations we always want to be performed.
This commit is contained in:
Klaus Post 2021-10-06 10:24:40 -07:00 committed by GitHub
parent 60aad1b717
commit 421160631a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 42 additions and 31 deletions

View File

@ -996,7 +996,7 @@ func (a adminAPIHandlers) SpeedtestHandler(w http.ResponseWriter, r *http.Reques
return
}
objectAPI.DeleteBucket(ctx, pathJoin(minioMetaSpeedTestBucket, minioMetaSpeedTestBucketPrefix), true)
objectAPI.DeleteBucket(ctx, pathJoin(minioMetaSpeedTestBucket, minioMetaSpeedTestBucketPrefix), DeleteBucketOptions{Force: true, NoRecreate: true})
w.(http.Flusher).Flush()
}

View File

@ -19,6 +19,7 @@ package cmd
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"encoding/xml"
@ -727,7 +728,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
}
if err = globalDNSConfig.Put(bucket); err != nil {
objectAPI.DeleteBucket(ctx, bucket, false)
objectAPI.DeleteBucket(context.Background(), bucket, DeleteBucketOptions{Force: false, NoRecreate: true})
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
@ -1240,7 +1241,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
deleteBucket := objectAPI.DeleteBucket
// Attempt to delete bucket.
if err := deleteBucket(ctx, bucket, forceDelete); err != nil {
if err := deleteBucket(ctx, bucket, DeleteBucketOptions{Force: forceDelete}); err != nil {
apiErr := toAPIError(ctx, err)
if _, ok := err.(BucketNotEmpty); ok {
if globalBucketVersioningSys.Enabled(bucket) || globalBucketVersioningSys.Suspended(bucket) {

View File

@ -135,7 +135,7 @@ func (er erasureObjects) GetBucketInfo(ctx context.Context, bucket string) (bi B
}
// DeleteBucket - deletes a bucket.
func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
// Collect if all disks report volume not found.
defer NSUpdated(bucket, slashSeparator)
@ -147,7 +147,7 @@ func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceD
index := index
g.Go(func() error {
if storageDisks[index] != nil {
return storageDisks[index].DeleteVol(ctx, bucket, forceDelete)
return storageDisks[index].DeleteVol(ctx, bucket, opts.Force)
}
return errDiskNotFound
}, index)
@ -156,7 +156,7 @@ func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceD
// Wait for all the delete vols to finish.
dErrs := g.Wait()
if forceDelete {
if opts.Force {
for _, err := range dErrs {
if err != nil {
undoDeleteBucket(storageDisks, bucket)
@ -169,7 +169,7 @@ func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceD
writeQuorum := getWriteQuorum(len(storageDisks))
err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum)
if err == errErasureWriteQuorum {
if err == errErasureWriteQuorum && !opts.NoRecreate {
undoDeleteBucket(storageDisks, bucket)
}
if err != nil {

View File

@ -632,6 +632,10 @@ func (z *erasureServerPools) MakeBucketWithLocation(ctx context.Context, bucket
// Return the first encountered error
for _, err := range errs {
if err != nil {
if _, ok := err.(BucketExists); !ok {
// Delete created buckets, ignoring errors.
z.DeleteBucket(context.Background(), bucket, DeleteBucketOptions{Force: true, NoRecreate: true})
}
return err
}
}
@ -643,7 +647,7 @@ func (z *erasureServerPools) MakeBucketWithLocation(ctx context.Context, bucket
meta.ObjectLockConfigXML = enabledBucketObjectLockConfig
}
if err := meta.Save(ctx, z); err != nil {
if err := meta.Save(context.Background(), z); err != nil {
return toObjectErr(err, bucket)
}
@ -1424,14 +1428,14 @@ func (z *erasureServerPools) IsTaggingSupported() bool {
// DeleteBucket - deletes a bucket on all serverPools simultaneously,
// even if one of the serverPools fail to delete buckets, we proceed to
// undo a successful operation.
func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
g := errgroup.WithNErrs(len(z.serverPools))
// Delete buckets in parallel across all serverPools.
for index := range z.serverPools {
index := index
g.Go(func() error {
return z.serverPools[index].DeleteBucket(ctx, bucket, forceDelete)
return z.serverPools[index].DeleteBucket(ctx, bucket, opts)
}, index)
}
@ -1441,15 +1445,15 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, fo
// buckets operation by creating all the buckets again.
for _, err := range errs {
if err != nil {
if !z.SinglePool() {
undoDeleteBucketServerPools(ctx, bucket, z.serverPools, errs)
if !z.SinglePool() && !opts.NoRecreate {
undoDeleteBucketServerPools(context.Background(), bucket, z.serverPools, errs)
}
return err
}
}
// Purge the entire bucket metadata entirely.
z.renameAll(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket))
z.renameAll(context.Background(), minioMetaBucket, pathJoin(bucketMetaPrefix, bucket))
// Success.
return nil

View File

@ -805,14 +805,14 @@ func (s *erasureSets) IsTaggingSupported() bool {
// DeleteBucket - deletes a bucket on all sets simultaneously,
// even if one of the sets fail to delete buckets, we proceed to
// undo a successful operation.
func (s *erasureSets) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
func (s *erasureSets) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
g := errgroup.WithNErrs(len(s.sets))
// Delete buckets in parallel across all sets.
for index := range s.sets {
index := index
g.Go(func() error {
return s.sets[index].DeleteBucket(ctx, bucket, forceDelete)
return s.sets[index].DeleteBucket(ctx, bucket, opts)
}, index)
}
@ -820,7 +820,7 @@ func (s *erasureSets) DeleteBucket(ctx context.Context, bucket string, forceDele
// For any failure, we attempt undo all the delete buckets operation
// by creating buckets again on all sets which were successfully deleted.
for _, err := range errs {
if err != nil {
if err != nil && !opts.NoRecreate {
undoDeleteBucketSets(ctx, bucket, s.sets, errs)
return err
}

View File

@ -588,7 +588,7 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
// DeleteBucket - delete a bucket and all the metadata associated
// with the bucket including pending multipart, object metadata.
func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
defer NSUpdated(bucket, slashSeparator)
atomic.AddInt64(&fs.activeIOCount, 1)
@ -601,7 +601,7 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelet
return toObjectErr(err, bucket)
}
if !forceDelete {
if !opts.Force {
// Attempt to delete regular bucket.
if err = fsRemoveDir(ctx, bucketDir); err != nil {
return toObjectErr(err, bucket)

View File

@ -224,16 +224,16 @@ func TestFSDeleteBucket(t *testing.T) {
}
// Test with an invalid bucket name
if err = fs.DeleteBucket(GlobalContext, "fo", false); !isSameType(err, BucketNotFound{}) {
if err = fs.DeleteBucket(GlobalContext, "fo", DeleteBucketOptions{}); !isSameType(err, BucketNotFound{}) {
t.Fatal("Unexpected error: ", err)
}
// Test with an inexistant bucket
if err = fs.DeleteBucket(GlobalContext, "foobucket", false); !isSameType(err, BucketNotFound{}) {
if err = fs.DeleteBucket(GlobalContext, "foobucket", DeleteBucketOptions{}); !isSameType(err, BucketNotFound{}) {
t.Fatal("Unexpected error: ", err)
}
// Test with a valid case
if err = fs.DeleteBucket(GlobalContext, bucketName, false); err != nil {
if err = fs.DeleteBucket(GlobalContext, bucketName, DeleteBucketOptions{}); err != nil {
t.Fatal("Unexpected error: ", err)
}
@ -241,7 +241,7 @@ func TestFSDeleteBucket(t *testing.T) {
// Delete bucket should get error disk not found.
os.RemoveAll(disk)
if err = fs.DeleteBucket(GlobalContext, bucketName, false); err != nil {
if err = fs.DeleteBucket(GlobalContext, bucketName, DeleteBucketOptions{}); err != nil {
if !isSameType(err, BucketNotFound{}) {
t.Fatal("Unexpected error: ", err)
}

View File

@ -38,7 +38,7 @@ import (
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob"
humanize "github.com/dustin/go-humanize"
"github.com/dustin/go-humanize"
"github.com/minio/cli"
"github.com/minio/madmin-go"
miniogopolicy "github.com/minio/minio-go/v7/pkg/policy"
@ -623,8 +623,8 @@ func (a *azureObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketI
}
// DeleteBucket - delete a container on azure, uses Azure equivalent `ContainerURL.Delete`.
func (a *azureObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
if !forceDelete {
func (a *azureObjects) DeleteBucket(ctx context.Context, bucket string, opts minio.DeleteBucketOptions) error {
if !opts.Force {
// Check if the container is empty before deleting it.
result, err := a.ListObjects(ctx, bucket, "", "", "", 1)
if err != nil {

View File

@ -470,7 +470,7 @@ func (l *gcsGateway) ListBuckets(ctx context.Context) (buckets []minio.BucketInf
}
// DeleteBucket delete a bucket on GCS.
func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string, opts minio.DeleteBucketOptions) error {
itObject := l.client.Bucket(bucket).Objects(ctx, &storage.Query{
Delimiter: minio.SlashSeparator,
Versions: false,

View File

@ -305,11 +305,11 @@ func (n *hdfsObjects) hdfsPathJoin(args ...string) string {
return minio.PathJoin(append([]string{n.subPath, hdfsSeparator}, args...)...)
}
func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, opts minio.DeleteBucketOptions) error {
if !hdfsIsValidBucketName(bucket) {
return minio.BucketNameInvalid{Bucket: bucket}
}
if forceDelete {
if opts.Force {
return hdfsToObjectErr(ctx, n.clnt.RemoveAll(n.hdfsPathJoin(bucket)), bucket)
}
return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(bucket)), bucket)

View File

@ -784,7 +784,7 @@ func (l *s3EncObjects) getStalePartsForBucket(ctx context.Context, bucket string
return
}
func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string, opts minio.DeleteBucketOptions) error {
var prefix, continuationToken, delimiter, startAfter string
expParts := make(map[string]string)

View File

@ -365,7 +365,7 @@ func (l *s3Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error)
}
// DeleteBucket deletes a bucket on S3
func (l *s3Objects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
func (l *s3Objects) DeleteBucket(ctx context.Context, bucket string, opts minio.DeleteBucketOptions) error {
err := l.Client.RemoveBucket(ctx, bucket)
if err != nil {
return minio.ErrorRespToObjectError(err, bucket)

View File

@ -89,6 +89,12 @@ type BucketOptions struct {
VersioningEnabled bool
}
// DeleteBucketOptions provides options for DeleteBucket calls.
type DeleteBucketOptions struct {
Force bool // Force deletion
NoRecreate bool // Do not recreate on delete failures
}
// SetReplicaStatus sets replica status and timestamp for delete operations in ObjectOptions
func (o *ObjectOptions) SetReplicaStatus(st replication.StatusType) {
o.DeleteReplication.ReplicaStatus = st
@ -164,7 +170,7 @@ type ObjectLayer interface {
MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error
GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error)
ListBuckets(ctx context.Context) (buckets []BucketInfo, err error)
DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error
DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error
ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error)