diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 14c018c0b..33917edd2 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -528,7 +528,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req } if err = globalDNSConfig.Put(bucket); err != nil { - objectAPI.DeleteBucket(ctx, bucket) + objectAPI.DeleteBucket(ctx, bucket, false) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } @@ -877,6 +877,18 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. vars := mux.Vars(r) bucket := vars["bucket"] + forceDelete := false + if vs, found := r.Header[http.CanonicalHeaderKey("x-minio-force-delete")]; found { + switch strings.ToLower(strings.Join(vs, "")) { + case "true": + forceDelete = true + case "false": + default: + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r)) + return + } + } + objectAPI := api.ObjectAPI() if objectAPI == nil { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) @@ -891,7 +903,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. deleteBucket := objectAPI.DeleteBucket // Attempt to delete bucket. - if err := deleteBucket(ctx, bucket); err != nil { + if err := deleteBucket(ctx, bucket, forceDelete); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } diff --git a/cmd/bucket-handlers_test.go b/cmd/bucket-handlers_test.go index cce4ab122..b223010df 100644 --- a/cmd/bucket-handlers_test.go +++ b/cmd/bucket-handlers_test.go @@ -29,6 +29,51 @@ import ( "github.com/minio/minio/pkg/auth" ) +// Wrapper for calling RemoveBucket HTTP handler tests for both XL multiple disks and single node setup. +func TestRemoveBucketHandler(t *testing.T) { + ExecObjectLayerAPITest(t, testRemoveBucketHandler, []string{"RemoveBucket"}) +} + +func testRemoveBucketHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler, + credentials auth.Credentials, t *testing.T) { + _, err := obj.PutObject(context.Background(), bucketName, "test-object", mustGetPutObjReader(t, bytes.NewBuffer([]byte{}), int64(0), "", "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"), ObjectOptions{}) + // if object upload fails stop the test. + if err != nil { + t.Fatalf("Error uploading object: %v", err) + } + + // initialize httptest Recorder, this records any mutations to response writer inside the handler. + rec := httptest.NewRecorder() + // construct HTTP request for DELETE bucket. + req, err := newTestSignedRequestV4("DELETE", getBucketLocationURL("", bucketName), 0, nil, credentials.AccessKey, credentials.SecretKey, nil) + if err != nil { + t.Fatalf("Test %s: Failed to create HTTP request for RemoveBucketHandler: %v", instanceType, err) + } + // Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler. + // Call the ServeHTTP to execute the handler. + apiRouter.ServeHTTP(rec, req) + switch rec.Code { + case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent: + t.Fatalf("Test %v: expected failure, but succeeded with %v", instanceType, rec.Code) + } + + // Verify response of the V2 signed HTTP request. + // initialize HTTP NewRecorder, this records any mutations to response writer inside the handler. + recV2 := httptest.NewRecorder() + // construct HTTP request for DELETE bucket. + reqV2, err := newTestSignedRequestV2("DELETE", getBucketLocationURL("", bucketName), 0, nil, credentials.AccessKey, credentials.SecretKey, nil) + if err != nil { + t.Fatalf("Test %s: Failed to create HTTP request for RemoveBucketHandler: %v", instanceType, err) + } + // Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler. + // Call the ServeHTTP to execute the handler. + apiRouter.ServeHTTP(recV2, reqV2) + switch recV2.Code { + case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent: + t.Fatalf("Test %v: expected failure, but succeeded with %v", instanceType, recV2.Code) + } +} + // Wrapper for calling GetBucketPolicy HTTP handler tests for both XL multiple disks and single node setup. func TestGetBucketLocationHandler(t *testing.T) { ExecObjectLayerAPITest(t, testGetBucketLocationHandler, []string{"GetBucketLocation"}) diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 386c3ca1c..becea0bcc 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -397,13 +397,15 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) { // DeleteBucket - delete a bucket and all the metadata associated // with the bucket including pending multipart, object metadata. -func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error { - bucketLock := fs.NewNSLock(ctx, bucket, "") - if err := bucketLock.GetLock(globalObjectTimeout); err != nil { - logger.LogIf(ctx, err) - return err +func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { + if !forceDelete { + bucketLock := fs.NewNSLock(ctx, bucket, "") + if err := bucketLock.GetLock(globalObjectTimeout); err != nil { + logger.LogIf(ctx, err) + return err + } + defer bucketLock.Unlock() } - defer bucketLock.Unlock() atomic.AddInt64(&fs.activeIOCount, 1) defer func() { @@ -415,9 +417,20 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error { return toObjectErr(err, bucket) } - // Attempt to delete regular bucket. - if err = fsRemoveDir(ctx, bucketDir); err != nil { - return toObjectErr(err, bucket) + if !forceDelete { + // Attempt to delete regular bucket. + if err = fsRemoveDir(ctx, bucketDir); err != nil { + return toObjectErr(err, bucket) + } + } else { + tmpBucketPath := pathJoin(fs.fsPath, minioMetaTmpBucket, bucket+"."+mustGetUUID()) + if err = fsSimpleRenameFile(ctx, bucketDir, tmpBucketPath); err != nil { + return toObjectErr(err, bucket) + } + + go func() { + fsRemoveAll(ctx, tmpBucketPath) // ignore returned error if any. + }() } // Cleanup all the bucket metadata. diff --git a/cmd/fs-v1_test.go b/cmd/fs-v1_test.go index 2b7b0ba42..439605593 100644 --- a/cmd/fs-v1_test.go +++ b/cmd/fs-v1_test.go @@ -318,16 +318,16 @@ func TestFSDeleteBucket(t *testing.T) { } // Test with an invalid bucket name - if err = fs.DeleteBucket(context.Background(), "fo"); !isSameType(err, BucketNotFound{}) { + if err = fs.DeleteBucket(context.Background(), "fo", false); !isSameType(err, BucketNotFound{}) { t.Fatal("Unexpected error: ", err) } // Test with an inexistant bucket - if err = fs.DeleteBucket(context.Background(), "foobucket"); !isSameType(err, BucketNotFound{}) { + if err = fs.DeleteBucket(context.Background(), "foobucket", false); !isSameType(err, BucketNotFound{}) { t.Fatal("Unexpected error: ", err) } // Test with a valid case - if err = fs.DeleteBucket(context.Background(), bucketName); err != nil { + if err = fs.DeleteBucket(context.Background(), bucketName, false); err != nil { t.Fatal("Unexpected error: ", err) } @@ -335,7 +335,7 @@ func TestFSDeleteBucket(t *testing.T) { // Delete bucket should get error disk not found. os.RemoveAll(disk) - if err = fs.DeleteBucket(context.Background(), bucketName); err != nil { + if err = fs.DeleteBucket(context.Background(), bucketName, false); err != nil { if !isSameType(err, BucketNotFound{}) { t.Fatal("Unexpected error: ", err) } diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index f31219f09..5af757c4c 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -559,7 +559,7 @@ func (a *azureObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketI } // DeleteBucket - delete a container on azure, uses Azure equivalent `ContainerURL.Delete`. -func (a *azureObjects) DeleteBucket(ctx context.Context, bucket string) error { +func (a *azureObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { containerURL := a.client.NewContainerURL(bucket) _, err := containerURL.Delete(ctx, azblob.ContainerAccessConditions{}) return azureToObjectError(err, bucket) diff --git a/cmd/gateway/b2/gateway-b2.go b/cmd/gateway/b2/gateway-b2.go index 397c67159..dea36bce3 100644 --- a/cmd/gateway/b2/gateway-b2.go +++ b/cmd/gateway/b2/gateway-b2.go @@ -317,7 +317,7 @@ func (l *b2Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error) } // DeleteBucket deletes a bucket on B2 -func (l *b2Objects) DeleteBucket(ctx context.Context, bucket string) error { +func (l *b2Objects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { bkt, err := l.Bucket(ctx, bucket) if err != nil { return err diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 9c6c12d79..3b840376e 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -475,7 +475,7 @@ func (l *gcsGateway) ListBuckets(ctx context.Context) (buckets []minio.BucketInf } // DeleteBucket delete a bucket on GCS. -func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string) error { +func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { itObject := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ Delimiter: minio.SlashSeparator, Versions: false, diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go index ee1257696..9ed4be080 100644 --- a/cmd/gateway/hdfs/gateway-hdfs.go +++ b/cmd/gateway/hdfs/gateway-hdfs.go @@ -274,7 +274,7 @@ func hdfsIsValidBucketName(bucket string) bool { return s3utils.CheckValidBucketNameStrict(bucket) == nil } -func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string) error { +func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { if !hdfsIsValidBucketName(bucket) { return minio.BucketNameInvalid{Bucket: bucket} } diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index 79b89f384..5e63b2143 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -398,7 +398,7 @@ func (l *ossObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInf } // DeleteBucket deletes a bucket on OSS. -func (l *ossObjects) DeleteBucket(ctx context.Context, bucket string) error { +func (l *ossObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { err := l.Client.DeleteBucket(bucket) if err != nil { logger.LogIf(ctx, err) diff --git a/cmd/gateway/s3/gateway-s3-sse.go b/cmd/gateway/s3/gateway-s3-sse.go index ee170dd3c..dbafd8331 100644 --- a/cmd/gateway/s3/gateway-s3-sse.go +++ b/cmd/gateway/s3/gateway-s3-sse.go @@ -742,7 +742,7 @@ func (l *s3EncObjects) getStalePartsForBucket(ctx context.Context, bucket string return } -func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string) error { +func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { var prefix, continuationToken, delimiter, startAfter string expParts := make(map[string]string) diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 0209c0f7e..0d924147b 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -351,7 +351,7 @@ func (l *s3Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error) } // DeleteBucket deletes a bucket on S3 -func (l *s3Objects) DeleteBucket(ctx context.Context, bucket string) error { +func (l *s3Objects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { err := l.Client.RemoveBucket(bucket) if err != nil { return minio.ErrorRespToObjectError(err, bucket) diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 649a797ac..60c5e90e3 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -124,11 +124,11 @@ func (d *naughtyDisk) StatVol(volume string) (volInfo VolInfo, err error) { } return d.disk.StatVol(volume) } -func (d *naughtyDisk) DeleteVol(volume string) (err error) { +func (d *naughtyDisk) DeleteVol(volume string, forceDelete bool) (err error) { if err := d.calcError(); err != nil { return err } - return d.disk.DeleteVol(volume) + return d.disk.DeleteVol(volume, forceDelete) } func (d *naughtyDisk) WalkSplunk(volume, path, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) { diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index f59fb401d..56ae7c842 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -66,7 +66,7 @@ type ObjectLayer interface { MakeBucketWithLocation(ctx context.Context, bucket string, location string) error GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) - DeleteBucket(ctx context.Context, bucket string) error + DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error diff --git a/cmd/posix-diskid-check.go b/cmd/posix-diskid-check.go index 5ce8dd6a5..253fc9869 100644 --- a/cmd/posix-diskid-check.go +++ b/cmd/posix-diskid-check.go @@ -107,11 +107,11 @@ func (p *posixDiskIDCheck) StatVol(volume string) (vol VolInfo, err error) { return p.storage.StatVol(volume) } -func (p *posixDiskIDCheck) DeleteVol(volume string) (err error) { +func (p *posixDiskIDCheck) DeleteVol(volume string, forceDelete bool) (err error) { if p.isDiskStale() { return errDiskNotFound } - return p.storage.DeleteVol(volume) + return p.storage.DeleteVol(volume, forceDelete) } func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) { diff --git a/cmd/posix.go b/cmd/posix.go index f46e4644d..dbe934965 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -620,7 +620,7 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) { } // DeleteVol - delete a volume. -func (s *posix) DeleteVol(volume string) (err error) { +func (s *posix) DeleteVol(volume string, forceDelete bool) (err error) { atomic.AddInt32(&s.activeIOCount, 1) defer func() { atomic.AddInt32(&s.activeIOCount, -1) @@ -631,7 +631,13 @@ func (s *posix) DeleteVol(volume string) (err error) { if err != nil { return err } - err = os.Remove((volumeDir)) + + if forceDelete { + err = os.RemoveAll(volumeDir) + } else { + err = os.Remove(volumeDir) + } + if err != nil { switch { case os.IsNotExist(err): diff --git a/cmd/posix_test.go b/cmd/posix_test.go index 040e1842d..c49e04cdf 100644 --- a/cmd/posix_test.go +++ b/cmd/posix_test.go @@ -508,7 +508,7 @@ func TestPosixDeleteVol(t *testing.T) { if _, ok := posixStorage.(*posixDiskIDCheck); !ok { t.Errorf("Expected the StorageAPI to be of type *posixDiskIDCheck") } - if err = posixStorage.DeleteVol(testCase.volName); err != testCase.expectedErr { + if err = posixStorage.DeleteVol(testCase.volName, false); err != testCase.expectedErr { t.Fatalf("TestPosix: %d, expected: %s, got: %s", i+1, testCase.expectedErr, err) } } @@ -547,7 +547,7 @@ func TestPosixDeleteVol(t *testing.T) { t.Fatalf("Unable to change permission to temporary directory %v. %v", permDeniedDir, err) } - if err = posixStorage.DeleteVol("mybucket"); err != errDiskAccessDenied { + if err = posixStorage.DeleteVol("mybucket", false); err != errDiskAccessDenied { t.Fatalf("expected: Permission error, got: %s", err) } } @@ -561,7 +561,7 @@ func TestPosixDeleteVol(t *testing.T) { // TestPosix for delete on an removed disk. // should fail with disk not found. - err = posixDeletedStorage.DeleteVol("Del-Vol") + err = posixDeletedStorage.DeleteVol("Del-Vol", false) if err != errDiskNotFound { t.Errorf("Expected: \"Disk not found\", got \"%s\"", err) } diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 84fa2f166..faf440058 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -41,7 +41,7 @@ type StorageAPI interface { MakeVolBulk(volumes ...string) (err error) ListVols() (vols []VolInfo, err error) StatVol(volume string) (vol VolInfo, err error) - DeleteVol(volume string) (err error) + DeleteVol(volume string, forceDelete bool) (err error) // Walk in sorted order directly on disk. Walk(volume, dirPath string, marker string, recursive bool, leafFile string, diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 164d8e5a6..12e9eae68 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -234,9 +234,12 @@ func (client *storageRESTClient) StatVol(volume string) (volInfo VolInfo, err er } // DeleteVol - Deletes a volume over the network. -func (client *storageRESTClient) DeleteVol(volume string) (err error) { +func (client *storageRESTClient) DeleteVol(volume string, forceDelete bool) (err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) + if forceDelete { + values.Set(storageRESTForceDelete, "true") + } respBody, err := client.call(storageRESTMethodDeleteVol, values, nil, -1) defer http.DrainBody(respBody) return err diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 7fd2f7d29..e9d8ed5c7 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v16" // CrawlAndGetDataUsageHandler API change + storageRESTVersion = "v17" // RemoveBucket API change storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -49,22 +49,23 @@ const ( ) const ( - storageRESTVolume = "volume" - storageRESTVolumes = "volumes" - storageRESTDirPath = "dir-path" - storageRESTFilePath = "file-path" - storageRESTSrcVolume = "source-volume" - storageRESTSrcPath = "source-path" - storageRESTDstVolume = "destination-volume" - storageRESTDstPath = "destination-path" - storageRESTOffset = "offset" - storageRESTLength = "length" - storageRESTShardSize = "shard-size" - storageRESTCount = "count" - storageRESTMarkerPath = "marker" - storageRESTLeafFile = "leaf-file" - storageRESTRecursive = "recursive" - storageRESTBitrotAlgo = "bitrot-algo" - storageRESTBitrotHash = "bitrot-hash" - storageRESTDiskID = "disk-id" + storageRESTVolume = "volume" + storageRESTVolumes = "volumes" + storageRESTDirPath = "dir-path" + storageRESTFilePath = "file-path" + storageRESTSrcVolume = "source-volume" + storageRESTSrcPath = "source-path" + storageRESTDstVolume = "destination-volume" + storageRESTDstPath = "destination-path" + storageRESTOffset = "offset" + storageRESTLength = "length" + storageRESTShardSize = "shard-size" + storageRESTCount = "count" + storageRESTMarkerPath = "marker" + storageRESTLeafFile = "leaf-file" + storageRESTRecursive = "recursive" + storageRESTBitrotAlgo = "bitrot-algo" + storageRESTBitrotHash = "bitrot-hash" + storageRESTDiskID = "disk-id" + storageRESTForceDelete = "force-delete" ) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 317f27d85..33ce9c5ad 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -227,7 +227,8 @@ func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Requ } vars := mux.Vars(r) volume := vars[storageRESTVolume] - err := s.storage.DeleteVol(volume) + forceDelete := vars[storageRESTForceDelete] == "true" + err := s.storage.DeleteVol(volume, forceDelete) if err != nil { s.writeErrorResponse(w, err) } diff --git a/cmd/storage-rest_test.go b/cmd/storage-rest_test.go index b84325cf7..525e50a5c 100644 --- a/cmd/storage-rest_test.go +++ b/cmd/storage-rest_test.go @@ -182,7 +182,7 @@ func testStorageAPIDeleteVol(t *testing.T, storage StorageAPI) { } for i, testCase := range testCases { - err := storage.DeleteVol(testCase.volumeName) + err := storage.DeleteVol(testCase.volumeName, false) expectErr := (err != nil) if expectErr != testCase.expectErr { diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 44349b4c6..70a1ec86b 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -176,7 +176,7 @@ func (web *webAPIHandlers) MakeBucket(r *http.Request, args *MakeBucketArgs, rep return toJSONError(ctx, err) } if err = globalDNSConfig.Put(args.BucketName); err != nil { - objectAPI.DeleteBucket(ctx, args.BucketName) + objectAPI.DeleteBucket(ctx, args.BucketName, false) return toJSONError(ctx, err) } @@ -254,7 +254,7 @@ func (web *webAPIHandlers) DeleteBucket(r *http.Request, args *RemoveBucketArgs, deleteBucket := objectAPI.DeleteBucket - if err := deleteBucket(ctx, args.BucketName); err != nil { + if err := deleteBucket(ctx, args.BucketName, false); err != nil { return toJSONError(ctx, err, args.BucketName) } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 41e32cbfc..93ebf90df 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -528,7 +528,7 @@ func undoMakeBucketSets(bucket string, sets []*xlObjects, errs []error) { index := index g.Go(func() error { if errs[index] == nil { - return sets[index].DeleteBucket(context.Background(), bucket) + return sets[index].DeleteBucket(context.Background(), bucket, false) } return nil }, index) @@ -665,14 +665,14 @@ func (s *xlSets) IsCompressionSupported() bool { // DeleteBucket - deletes a bucket on all sets simultaneously, // even if one of the sets fail to delete buckets, we proceed to // undo a successful operation. -func (s *xlSets) DeleteBucket(ctx context.Context, bucket string) error { +func (s *xlSets) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { g := errgroup.WithNErrs(len(s.sets)) // Delete buckets in parallel across all sets. for index := range s.sets { index := index g.Go(func() error { - return s.sets[index].DeleteBucket(ctx, bucket) + return s.sets[index].DeleteBucket(ctx, bucket, forceDelete) }, index) } diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index 0adde25b2..e0e994d68 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -102,7 +102,7 @@ func undoMakeBucket(storageDisks []StorageAPI, bucket string) { } index := index g.Go(func() error { - _ = storageDisks[index].DeleteVol(bucket) + _ = storageDisks[index].DeleteVol(bucket, false) return nil }, index) } @@ -209,10 +209,10 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs for index, err := range dErrs { if err == errVolumeNotEmpty { // Attempt to delete bucket again. - if derr := storageDisks[index].DeleteVol(bucket); derr == errVolumeNotEmpty { + if derr := storageDisks[index].DeleteVol(bucket, false); derr == errVolumeNotEmpty { _ = cleanupDir(ctx, storageDisks[index], bucket, "") - _ = storageDisks[index].DeleteVol(bucket) + _ = storageDisks[index].DeleteVol(bucket, false) // Cleanup all the previously incomplete multiparts. _ = cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket) @@ -222,7 +222,7 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs } // DeleteBucket - deletes a bucket. -func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error { +func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { // Collect if all disks report volume not found. storageDisks := xl.getDisks() @@ -232,7 +232,7 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error { index := index g.Go(func() error { if storageDisks[index] != nil { - if err := storageDisks[index].DeleteVol(bucket); err != nil { + if err := storageDisks[index].DeleteVol(bucket, forceDelete); err != nil { return err } err := cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket) @@ -248,6 +248,17 @@ func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error { // Wait for all the delete vols to finish. dErrs := g.Wait() + if forceDelete { + for _, err := range dErrs { + if err != nil { + undoDeleteBucket(storageDisks, bucket) + return toObjectErr(err, bucket) + } + } + + return nil + } + writeQuorum := len(storageDisks)/2 + 1 err := reduceWriteQuorumErrs(ctx, dErrs, bucketOpIgnoredErrs, writeQuorum) if err == errXLWriteQuorum { diff --git a/cmd/xl-v1-healing-common_test.go b/cmd/xl-v1-healing-common_test.go index b7022e179..5c452401e 100644 --- a/cmd/xl-v1-healing-common_test.go +++ b/cmd/xl-v1-healing-common_test.go @@ -176,7 +176,7 @@ func TestListOnlineDisks(t *testing.T) { // Cleanup from previous test. obj.DeleteObject(context.Background(), bucket, object) - obj.DeleteBucket(context.Background(), bucket) + obj.DeleteBucket(context.Background(), bucket, false) err = obj.MakeBucketWithLocation(context.Background(), "bucket", "") if err != nil { diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index d6ae69a47..4e60c230a 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -326,7 +326,7 @@ func undoMakeBucketZones(bucket string, zones []*xlSets, errs []error) { index := index g.Go(func() error { if errs[index] == nil { - return zones[index].DeleteBucket(context.Background(), bucket) + return zones[index].DeleteBucket(context.Background(), bucket, false) } return nil }, index) @@ -1232,9 +1232,9 @@ func (z *xlZones) IsCompressionSupported() bool { // DeleteBucket - deletes a bucket on all zones simultaneously, // even if one of the zones fail to delete buckets, we proceed to // undo a successful operation. -func (z *xlZones) DeleteBucket(ctx context.Context, bucket string) error { +func (z *xlZones) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { if z.SingleZone() { - return z.zones[0].DeleteBucket(ctx, bucket) + return z.zones[0].DeleteBucket(ctx, bucket, forceDelete) } g := errgroup.WithNErrs(len(z.zones)) @@ -1242,11 +1242,26 @@ func (z *xlZones) DeleteBucket(ctx context.Context, bucket string) error { for index := range z.zones { index := index g.Go(func() error { - return z.zones[index].DeleteBucket(ctx, bucket) + return z.zones[index].DeleteBucket(ctx, bucket, forceDelete) }, index) } errs := g.Wait() + + if forceDelete { + for _, err := range errs { + if err != nil { + if _, ok := err.(InsufficientWriteQuorum); ok { + undoDeleteBucketZones(bucket, z.zones, errs) + } + + return err + } + } + + return nil + } + // For any write quorum failure, we undo all the delete buckets operation // by creating all the buckets again. for _, err := range errs {