xl/fs: cleanup '/.minio/tmp' directory on each initialization. (#1490)

This commit is contained in:
Harshavardhana 2016-05-05 01:54:43 -07:00 committed by Anand Babu (AB) Periasamy
parent ad40036cba
commit 46680788f9
4 changed files with 31 additions and 11 deletions

View File

@ -100,7 +100,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
// Cleanup all the parts.
if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil {
recursive := false
if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID, recursive); err != nil {
return "", err
}

View File

@ -46,6 +46,11 @@ func newFSObjects(exportPath string) (ObjectLayer, error) {
return nil, err
}
}
// Cleanup all temp entries upon start.
cleanupAllTmpEntries(storage)
// Return successfully initialized object layer.
return fsObjects{storage}, nil
}

View File

@ -25,6 +25,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"github.com/Sirupsen/logrus"
"github.com/skyrings/skyring-common/tools/uuid"
@ -163,30 +164,37 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa
// Cleanup all temp entries inside tmpMetaPrefix directory, upon server initialization.
func cleanupAllTmpEntries(storage StorageAPI) error {
return cleanupUploadedParts(storage, tmpMetaPrefix, "", "", "")
recursive := true // Recursively delete all files inside 'tmp' directory.
return cleanupUploadedParts(storage, tmpMetaPrefix, "", "", "", recursive)
}
// Wrapper to which removes all the uploaded parts after a successful
// complete multipart upload.
func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string) error {
func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string, recursive bool) error {
markerPath := ""
var wg = &sync.WaitGroup{}
for {
uploadIDPath := path.Join(prefix, bucket, object, uploadID)
fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPath, markerPath, false, 1000)
fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPath, markerPath, recursive, 1000)
if err != nil {
if err == errFileNotFound {
return InvalidUploadID{UploadID: uploadID}
}
return toObjectErr(err)
return toObjectErr(err, bucket, object)
}
// Loop through all files and delete each in go-routine, while
// adding each operation to a wait group.
for _, fileInfo := range fileInfos {
storage.DeleteFile(minioMetaBucket, fileInfo.Name)
markerPath = fileInfo.Name
wg.Add(1)
go func(fi FileInfo) {
defer wg.Done()
storage.DeleteFile(minioMetaBucket, fi.Name)
}(fileInfo)
}
if eof {
break
}
markerPath = fileInfos[len(fileInfos)-1].Name
}
// Wait for all the routines.
wg.Wait()
return nil
}
@ -205,7 +213,8 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str
} else if !status {
return InvalidUploadID{UploadID: uploadID}
}
return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID)
recursive := false // Cleanup all the top level files and folders matching uploadID.
return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID, recursive)
}
// listLeafEntries - lists all entries if a given prefixPath is a leaf

View File

@ -42,6 +42,11 @@ func newXLObjects(exportPaths ...string) (ObjectLayer, error) {
if err != nil {
return nil, err
}
// Cleanup all temporary entries.
cleanupAllTmpEntries(storage)
// Return successfully initialized object layer.
return xlObjects{storage}, nil
}