XL/bucket: Remove bucket should cleanup incomplete uploads as well. (#2173)

This behavior is in accordance with S3.

Fixes #2170
This commit is contained in:
Harshavardhana 2016-07-12 01:01:47 -07:00 committed by GitHub
parent 1c82b81408
commit 126865e8df
3 changed files with 24 additions and 12 deletions

View File

@ -199,9 +199,14 @@ func (fs fsObjects) DeleteBucket(bucket string) error {
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return BucketNameInvalid{Bucket: bucket} return BucketNameInvalid{Bucket: bucket}
} }
// Attempt to delete regular bucket.
if err := fs.storage.DeleteVol(bucket); err != nil { if err := fs.storage.DeleteVol(bucket); err != nil {
return toObjectErr(err, bucket) return toObjectErr(err, bucket)
} }
// Cleanup all the previously incomplete multiparts.
if err := cleanupDir(fs.storage, path.Join(minioMetaBucket, mpartMetaPrefix), bucket); err != nil {
return toObjectErr(err, bucket)
}
return nil return nil
} }

View File

@ -171,22 +171,22 @@ func cleanupDir(storage StorageAPI, volume, dirPath string) error {
// Function to delete entries recursively. // Function to delete entries recursively.
delFunc = func(entryPath string) error { delFunc = func(entryPath string) error {
if !strings.HasSuffix(entryPath, slashSeparator) { if !strings.HasSuffix(entryPath, slashSeparator) {
// No trailing "/" means that this is a file which can be deleted. // Delete the file entry.
return storage.DeleteFile(volume, entryPath) return storage.DeleteFile(volume, entryPath)
} }
// If it's a directory, list and call delFunc() for each entry. // If it's a directory, list and call delFunc() for each entry.
entries, err := storage.ListDir(volume, entryPath) entries, err := storage.ListDir(volume, entryPath)
if err != nil { // If entryPath prefix never existed, safe to ignore.
if err == errFileNotFound { if err == errFileNotFound {
// if dirPath prefix never existed. return nil
return nil } else if err != nil { // For any other errors fail.
}
return err return err
} } // else on success..
for _, entry := range entries {
err = delFunc(pathJoin(entryPath, entry))
if err != nil { // Recurse and delete all other entries.
for _, entry := range entries {
if err = delFunc(pathJoin(entryPath, entry)); err != nil {
return err return err
} }
} }

View File

@ -17,6 +17,7 @@
package main package main
import ( import (
"path"
"sort" "sort"
"sync" "sync"
) )
@ -238,7 +239,14 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// Delete volume inside a go-routine. // Delete volume inside a go-routine.
go func(index int, disk StorageAPI) { go func(index int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
// Attempt to delete bucket.
err := disk.DeleteVol(bucket) err := disk.DeleteVol(bucket)
if err != nil {
dErrs[index] = err
return
}
// Cleanup all the previously incomplete multiparts.
err = cleanupDir(disk, path.Join(minioMetaBucket, mpartMetaPrefix), bucket)
if err != nil { if err != nil {
dErrs[index] = err dErrs[index] = err
} }
@ -248,8 +256,7 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// Wait for all the delete vols to finish. // Wait for all the delete vols to finish.
wg.Wait() wg.Wait()
// Count the errors for known errors, return quickly if we found // Count the errors for known errors, return quickly if we found an unknown error.
// an unknown error.
for _, err := range dErrs { for _, err := range dErrs {
if err != nil { if err != nil {
if isErrIgnored(err, objMetadataOpIgnoredErrs) { if isErrIgnored(err, objMetadataOpIgnoredErrs) {