diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 84ab60c97..deb9dac52 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -31,6 +31,11 @@ import ( "github.com/minio/sha256-simd" ) +const ( + fsMultipartExpiry = time.Hour * 24 * 14 + fsMultipartCleanupPeriod = time.Hour * 24 +) + // Returns if the prefix is a multipart upload. func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool { uploadsIDPath := pathJoin(fs.fsPath, bucket, prefix, uploadsJSONFile) @@ -302,16 +307,35 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark return result, nil } -// ListMultipartUploads - lists all the pending multipart uploads on a +// ListMultipartUploads - returns empty response always. The onus is +// on applications to remember uploadId of the multipart uploads that +// are in progress. +func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { + if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil { + return lmi, err + } + + if _, err := fs.statBucketDir(bucket); err != nil { + return lmi, toObjectErr(err, bucket) + } + return ListMultipartsInfo{ + Prefix: prefix, + KeyMarker: keyMarker, + UploadIDMarker: uploadIDMarker, + Delimiter: delimiter, + MaxUploads: maxUploads, + IsTruncated: false, + }, nil +} + +// listMultipartUploadsHelper - lists all the pending multipart uploads on a // bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a // delimiter' which allows us to list uploads match a particular // prefix or lexically starting from 'keyMarker' or delimiting the // output to get a directory like listing. // -// Implements S3 compatible ListMultipartUploads API. The resulting -// ListMultipartsInfo structure is unmarshalled directly into XML and -// replied back to the client. -func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { +// This function remains here to aid in listing uploads that require cleanup. +func (fs fsObjects) listMultipartUploadsHelper(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) { if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil { return lmi, err } @@ -596,103 +620,37 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s }, nil } -// listObjectParts - wrapper scanning through -// '.minio.sys/multipart/bucket/object/UPLOADID'. Lists all the parts -// saved inside '.minio.sys/multipart/bucket/object/UPLOADID'. -func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { - result := ListPartsInfo{} - - uploadIDPath := pathJoin(bucket, object, uploadID) - fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile) - metaFile, err := fs.rwPool.Open(fsMetaPath) - if err != nil { - if err == errFileNotFound || err == errFileAccessDenied { - // On windows oddly this is returned. - return lpi, traceError(InvalidUploadID{UploadID: uploadID}) - } - return lpi, toObjectErr(traceError(err), bucket, object) - } - defer fs.rwPool.Close(fsMetaPath) - - fsMeta := fsMetaV1{} - _, err = fsMeta.ReadFrom(metaFile.LockedFile) - if err != nil { - return lpi, toObjectErr(err, minioMetaBucket, fsMetaPath) - } - - // Only parts with higher part numbers will be listed. - partIdx := fsMeta.ObjectPartIndex(partNumberMarker) - parts := fsMeta.Parts - if partIdx != -1 { - parts = fsMeta.Parts[partIdx+1:] - } - - count := maxParts - for _, part := range parts { - var fi os.FileInfo - partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, part.Name) - fi, err = fsStatFile(partNamePath) - if err != nil { - return lpi, toObjectErr(err, minioMetaMultipartBucket, partNamePath) - } - result.Parts = append(result.Parts, PartInfo{ - PartNumber: part.Number, - ETag: part.ETag, - LastModified: fi.ModTime(), - Size: fi.Size(), - }) - count-- - if count == 0 { - break - } - } - - // If listed entries are more than maxParts, we set IsTruncated as true. - if len(parts) > len(result.Parts) { - result.IsTruncated = true - // Make sure to fill next part number marker if IsTruncated is - // true for subsequent listing. - nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber - result.NextPartNumberMarker = nextPartNumberMarker - } - result.Bucket = bucket - result.Object = object - result.UploadID = uploadID - result.MaxParts = maxParts - - // Success. - return result, nil -} - -// ListObjectParts - lists all previously uploaded parts for a given -// object and uploadID. Takes additional input of part-number-marker -// to indicate where the listing should begin from. -// -// Implements S3 compatible ListObjectParts API. The resulting -// ListPartsInfo structure is unmarshalled directly into XML and -// replied back to the client. +// ListObjectParts - returns empty response always. Applications are +// expected to remember the uploaded part numbers and corresponding +// etags. func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) { if err := checkListPartsArgs(bucket, object, fs); err != nil { return lpi, err } + // Check if bucket exists if _, err := fs.statBucketDir(bucket); err != nil { return lpi, toObjectErr(err, bucket) } - // Hold the lock so that two parallel complete-multipart-uploads - // do not leave a stale uploads.json behind. - objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object)) - objectMPartPathLock.RLock() - defer objectMPartPathLock.RUnlock() - - listPartsInfo, err := fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts) + // Check if uploadID exists. N B This check may race with a + // concurrent abortMultipartUpload on the same uploadID. It is + // OK to be eventually consistent w.r.t listing of objects, + // uploads and parts. + uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadID) + _, err := fsStatDir(uploadIDPath) if err != nil { - return lpi, toObjectErr(err, bucket, object) + return lpi, traceError(InvalidUploadID{UploadID: uploadID}) } - // Success. - return listPartsInfo, nil + // Return empty list parts response + return ListPartsInfo{ + Bucket: bucket, + Object: object, + UploadID: uploadID, + PartNumberMarker: 0, + MaxParts: maxParts, + }, nil } // CompleteMultipartUpload - completes an ongoing multipart @@ -978,3 +936,55 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error return nil } + +// Remove multipart uploads left unattended in a given bucket older than `fsMultipartExpiry` +func (fs fsObjects) cleanupStaleMultipartUpload(bucket string) (err error) { + var lmi ListMultipartsInfo + + for { + // List multipart uploads in a bucket 1000 at a time + lmi, err = fs.listMultipartUploadsHelper(bucket, "", "", "", "/", 1000) + if err != nil { + errorIf(err, fmt.Sprintf("Failed to list uploads of %s for cleaning up of multipart uploads older than %d weeks", bucket, fsMultipartExpiry)) + return err + } + + // Remove uploads (and its parts) older than 2 weeks + for _, upload := range lmi.Uploads { + uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, upload.Object, upload.UploadID) + st, err := fsStatDir(uploadIDPath) + if err != nil { + errorIf(err, "Failed to stat uploads directory", uploadIDPath) + return err + } + if time.Since(st.ModTime()) > fsMultipartExpiry { + fs.AbortMultipartUpload(bucket, upload.Object, upload.UploadID) + } + } + + // No more incomplete uploads remain + if !lmi.IsTruncated { + break + } + } + + return nil +} + +// Remove multipart uploads left unattended for more than `fsMultipartExpiry` seconds. +func (fs fsObjects) cleanupStaleMultipartUploads() { + for { + bucketInfos, err := fs.ListBuckets() + if err != nil { + errorIf(err, fmt.Sprintf("Failed to list buckets for cleaning up of multipart uploads older than %d weeks", fsMultipartExpiry)) + return + } + + for _, bucketInfo := range bucketInfos { + fs.cleanupStaleMultipartUpload(bucketInfo.Name) + } + + // Schedule for the next multipart backend cleanup + time.Sleep(fsMultipartCleanupPeriod) + } +} diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 960a905ca..bc5f233ba 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -157,6 +157,9 @@ func newFSObjectLayer(fsPath string) (ObjectLayer, error) { return nil, fmt.Errorf("Unable to initialize event notification. %s", err) } + // Start background process to cleanup old files in minio.sys.tmp + go fs.cleanupStaleMultipartUploads() + // Return successfully initialized object layer. return fs, nil } diff --git a/cmd/object-api-multipart_test.go b/cmd/object-api-multipart_test.go index 044c06428..113ce233c 100644 --- a/cmd/object-api-multipart_test.go +++ b/cmd/object-api-multipart_test.go @@ -1236,26 +1236,33 @@ func testListMultipartUploads(obj ObjectLayer, instanceType string, t TestErrHan if actualResult.Delimiter != expectedResult.Delimiter { t.Errorf("Test %d: %s: Expected Delimiter to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.Delimiter, actualResult.Delimiter) } - // Asserting NextUploadIDMarker. - if actualResult.NextUploadIDMarker != expectedResult.NextUploadIDMarker { - t.Errorf("Test %d: %s: Expected NextUploadIDMarker to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.NextUploadIDMarker, actualResult.NextUploadIDMarker) - } - // Asserting NextKeyMarker. - if actualResult.NextKeyMarker != expectedResult.NextKeyMarker { - t.Errorf("Test %d: %s: Expected NextKeyMarker to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.NextKeyMarker, actualResult.NextKeyMarker) - } // Asserting the keyMarker. if actualResult.KeyMarker != expectedResult.KeyMarker { t.Errorf("Test %d: %s: Expected keyMarker to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.KeyMarker, actualResult.KeyMarker) } - // Asserting IsTruncated. - if actualResult.IsTruncated != testCase.expectedResult.IsTruncated { - t.Errorf("Test %d: %s: Expected Istruncated to be \"%v\", but found it to \"%v\"", i+1, instanceType, expectedResult.IsTruncated, actualResult.IsTruncated) - } - // Asserting the number of upload Metadata info. - if len(expectedResult.Uploads) != len(actualResult.Uploads) { - t.Errorf("Test %d: %s: Expected the result to contain info of %d Multipart Uploads, but found %d instead", i+1, instanceType, len(expectedResult.Uploads), len(actualResult.Uploads)) - } else { + + // ListMultipartUploads returns empty respsonse always in FS mode + if instanceType != FSTestStr { + // Asserting IsTruncated. + if actualResult.IsTruncated != testCase.expectedResult.IsTruncated { + t.Errorf("Test %d: %s: Expected Istruncated to be \"%v\", but found it to \"%v\"", i+1, instanceType, expectedResult.IsTruncated, actualResult.IsTruncated) + continue + } + // Asserting NextUploadIDMarker. + if actualResult.NextUploadIDMarker != expectedResult.NextUploadIDMarker { + t.Errorf("Test %d: %s: Expected NextUploadIDMarker to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.NextUploadIDMarker, actualResult.NextUploadIDMarker) + continue + } + // Asserting NextKeyMarker. + if actualResult.NextKeyMarker != expectedResult.NextKeyMarker { + t.Errorf("Test %d: %s: Expected NextKeyMarker to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.NextKeyMarker, actualResult.NextKeyMarker) + continue + } + // Asserting the number of upload Metadata info. + if len(expectedResult.Uploads) != len(actualResult.Uploads) { + t.Errorf("Test %d: %s: Expected the result to contain info of %d Multipart Uploads, but found %d instead", i+1, instanceType, len(expectedResult.Uploads), len(actualResult.Uploads)) + continue + } // Iterating over the uploads Metadata and asserting the fields. for j, actualMetaData := range actualResult.Uploads { // Asserting the object name in the upload Metadata. @@ -1712,10 +1719,6 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler) if actualResult.UploadID != expectedResult.UploadID { t.Errorf("Test %d: %s: Expected UploadID to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.UploadID, actualResult.UploadID) } - // Asserting NextPartNumberMarker. - if actualResult.NextPartNumberMarker != expectedResult.NextPartNumberMarker { - t.Errorf("Test %d: %s: Expected NextPartNumberMarker to be \"%d\", but instead found it to be \"%d\"", i+1, instanceType, expectedResult.NextPartNumberMarker, actualResult.NextPartNumberMarker) - } // Asserting PartNumberMarker. if actualResult.PartNumberMarker != expectedResult.PartNumberMarker { t.Errorf("Test %d: %s: Expected PartNumberMarker to be \"%d\", but instead found it to be \"%d\"", i+1, instanceType, expectedResult.PartNumberMarker, actualResult.PartNumberMarker) @@ -1724,14 +1727,24 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler) if actualResult.Bucket != expectedResult.Bucket { t.Errorf("Test %d: %s: Expected Bucket to be \"%s\", but instead found it to be \"%s\"", i+1, instanceType, expectedResult.Bucket, actualResult.Bucket) } - // Asserting IsTruncated. - if actualResult.IsTruncated != testCase.expectedResult.IsTruncated { - t.Errorf("Test %d: %s: Expected IsTruncated to be \"%v\", but found it to \"%v\"", i+1, instanceType, expectedResult.IsTruncated, actualResult.IsTruncated) - } - // Asserting the number of Parts. - if len(expectedResult.Parts) != len(actualResult.Parts) { - t.Errorf("Test %d: %s: Expected the result to contain info of %d Parts, but found %d instead", i+1, instanceType, len(expectedResult.Parts), len(actualResult.Parts)) - } else { + + // ListObjectParts returns empty response always in FS mode + if instanceType != FSTestStr { + // Asserting IsTruncated. + if actualResult.IsTruncated != testCase.expectedResult.IsTruncated { + t.Errorf("Test %d: %s: Expected IsTruncated to be \"%v\", but found it to \"%v\"", i+1, instanceType, expectedResult.IsTruncated, actualResult.IsTruncated) + continue + } + // Asserting NextPartNumberMarker. + if actualResult.NextPartNumberMarker != expectedResult.NextPartNumberMarker { + t.Errorf("Test %d: %s: Expected NextPartNumberMarker to be \"%d\", but instead found it to be \"%d\"", i+1, instanceType, expectedResult.NextPartNumberMarker, actualResult.NextPartNumberMarker) + continue + } + // Asserting the number of Parts. + if len(expectedResult.Parts) != len(actualResult.Parts) { + t.Errorf("Test %d: %s: Expected the result to contain info of %d Parts, but found %d instead", i+1, instanceType, len(expectedResult.Parts), len(actualResult.Parts)) + continue + } // Iterating over the partInfos and asserting the fields. for j, actualMetaData := range actualResult.Parts { // Asserting the PartNumber in the PartInfo. @@ -1747,6 +1760,7 @@ func testListObjectParts(obj ObjectLayer, instanceType string, t TestErrHandler) t.Errorf("Test %d: %s: Part %d: Expected Etag to be \"%s\", but instead found \"%s\"", i+1, instanceType, j+1, expectedResult.Parts[j].ETag, actualMetaData.ETag) } } + } } } diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go index bfe9b676a..624329a00 100644 --- a/cmd/object-handlers_test.go +++ b/cmd/object-handlers_test.go @@ -1407,7 +1407,7 @@ func testAPICopyObjectPartHandler(obj ObjectLayer, instanceType, bucketName stri if err != nil { t.Fatalf("Test %d: %s: Failed to look for copied object part: %s", i+1, instanceType, err) } - if len(results.Parts) != 1 { + if instanceType != FSTestStr && len(results.Parts) != 1 { t.Fatalf("Test %d: %s: Expected only one entry returned %d entries", i+1, instanceType, len(results.Parts)) } }