init: Cleanup .minio/tmp directories recursively. Also takes care of cleaning up of parts directory during abortMultipartUpload. (#1532)

This commit is contained in:
Krishna Srinivas 2016-05-08 22:45:34 +05:30 committed by Harshavardhana
parent 3f51dd4fd4
commit a205aca6d2
4 changed files with 41 additions and 22 deletions

View File

@ -123,7 +123,7 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
// Cleanup all the parts if everything else has been safely committed. // Cleanup all the parts if everything else has been safely committed.
if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil { if err = cleanupUploadedParts(fs.storage, bucket, object, uploadID); err != nil {
return "", err return "", err
} }

View File

@ -215,25 +215,10 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
return newMD5Hex, nil return newMD5Hex, nil
} }
// Cleanup all temp entries inside tmpMetaPrefix directory, upon server initialization.
func cleanupAllTmpEntries(storage StorageAPI) error {
return cleanupUploadedParts(storage, tmpMetaPrefix, "", "", "")
}
// Wrapper to which removes all the uploaded parts after a successful // Wrapper to which removes all the uploaded parts after a successful
// complete multipart upload. // complete multipart upload.
func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string) error { func cleanupUploadedParts(storage StorageAPI, bucket, object, uploadID string) error {
multipartDir := path.Join(prefix, bucket, object, uploadID) return cleanupDir(storage, minioMetaBucket, path.Join(mpartMetaPrefix, bucket, object, uploadID))
entries, err := storage.ListDir(minioMetaBucket, multipartDir)
if err != nil {
return err
}
for _, entry := range entries {
if err = storage.DeleteFile(minioMetaBucket, path.Join(multipartDir, entry)); err != nil {
return err
}
}
return nil
} }
// abortMultipartUploadCommon - aborts a multipart upload, common // abortMultipartUploadCommon - aborts a multipart upload, common
@ -253,7 +238,7 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str
if !isUploadIDExists(storage, bucket, object, uploadID) { if !isUploadIDExists(storage, bucket, object, uploadID) {
return InvalidUploadID{UploadID: uploadID} return InvalidUploadID{UploadID: uploadID}
} }
return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID) return cleanupUploadedParts(storage, bucket, object, uploadID)
} }
// isIncompleteMultipart - is object incomplete multipart. // isIncompleteMultipart - is object incomplete multipart.

View File

@ -38,13 +38,42 @@ func initObjectLayer(storage StorageAPI) error {
} }
} }
// Cleanup all temp entries upon start. // Cleanup all temp entries upon start.
err := cleanupAllTmpEntries(storage) err := cleanupDir(storage, minioMetaBucket, tmpMetaPrefix)
if err != nil { if err != nil {
return toObjectErr(err, minioMetaBucket, tmpMetaPrefix) return toObjectErr(err, minioMetaBucket, tmpMetaPrefix)
} }
return nil return nil
} }
// Cleanup a directory recursively.
func cleanupDir(storage StorageAPI, volume, dirPath string) error {
var delFunc func(string) error
// Function to delete entries recursively.
delFunc = func(entryPath string) error {
if !strings.HasSuffix(entryPath, slashSeparator) {
// No trailing "/" means that this is a file which can be deleted.
return storage.DeleteFile(volume, entryPath)
}
// If it's a directory, list and call delFunc() for each entry.
entries, err := storage.ListDir(volume, entryPath)
if err != nil {
if err == errFileNotFound {
// if dirPath prefix never existed.
return nil
}
return err
}
for _, entry := range entries {
err = delFunc(pathJoin(entryPath, entry))
if err != nil {
return err
}
}
return nil
}
return delFunc(retainSlash(dirPath))
}
/// Common object layer functions. /// Common object layer functions.
// makeBucket - create a bucket, is a common function for both object layers. // makeBucket - create a bucket, is a common function for both object layers.

View File

@ -22,6 +22,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"path"
"regexp" "regexp"
"strings" "strings"
"unicode/utf8" "unicode/utf8"
@ -103,9 +104,13 @@ func retainSlash(s string) string {
return strings.TrimSuffix(s, slashSeparator) + slashSeparator return strings.TrimSuffix(s, slashSeparator) + slashSeparator
} }
// pathJoin - path join. // pathJoin - like path.Join() but retains trailing "/" of the last element
func pathJoin(s1 string, s2 string) string { func pathJoin(s1 string, s2 string) string {
return retainSlash(s1) + s2 trailingSlash := ""
if strings.HasSuffix(s2, slashSeparator) {
trailingSlash = "/"
}
return path.Join(s1, s2) + trailingSlash
} }
// Create an s3 compatible MD5sum for complete multipart transaction. // Create an s3 compatible MD5sum for complete multipart transaction.