ListMultipartUploads, ListObjectParts return empty response (#4694)

Also periodically removes incomplete multipart uploads older than 2 weeks.
Krishnan Parthasarathi
2017-08-04 10:45:57 -07:00
committed by Dee Koder
parent 0aca2ab970
commit 75c43bfb6c
4 changed files with 146 additions and 119 deletions


@@ -31,6 +31,11 @@ import (
"github.com/minio/sha256-simd"
)
const (
fsMultipartExpiry = time.Hour * 24 * 14
fsMultipartCleanupPeriod = time.Hour * 24
)
// Returns whether the given prefix is a multipart upload.
func (fs fsObjects) isMultipartUpload(bucket, prefix string) bool {
uploadsIDPath := pathJoin(fs.fsPath, bucket, prefix, uploadsJSONFile)
@@ -302,16 +307,35 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
return result, nil
}
// ListMultipartUploads - lists all the pending multipart uploads on a
// ListMultipartUploads - returns empty response always. The onus is
// on applications to remember uploadId of the multipart uploads that
// are in progress.
func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
return lmi, err
}
if _, err := fs.statBucketDir(bucket); err != nil {
return lmi, toObjectErr(err, bucket)
}
return ListMultipartsInfo{
Prefix: prefix,
KeyMarker: keyMarker,
UploadIDMarker: uploadIDMarker,
Delimiter: delimiter,
MaxUploads: maxUploads,
IsTruncated: false,
}, nil
}
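
Since the server-side listing is gone, applications have to carry their own record of in-progress uploads. A minimal client-side sketch of that bookkeeping (uploadTracker is hypothetical, not part of this change, and assumes the standard sync package is imported):

// uploadTracker is a hypothetical client-side record of in-progress
// multipart uploads, needed now that ListMultipartUploads returns an
// empty response.
type uploadTracker struct {
	mu      sync.Mutex
	uploads map[string]string // object name -> in-progress uploadID
}

// remember records the uploadID handed back by NewMultipartUpload.
func (t *uploadTracker) remember(object, uploadID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.uploads == nil {
		t.uploads = make(map[string]string)
	}
	t.uploads[object] = uploadID
}

// forget drops the record once the upload is completed or aborted.
func (t *uploadTracker) forget(object string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.uploads, object)
}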
// listMultipartUploadsHelper - lists all the pending multipart uploads on a
// bucket. Additionally takes 'prefix, keyMarker, uploadIDmarker and a
// delimiter' which allows us to list uploads matching a particular
// prefix or lexically starting from 'keyMarker' or delimiting the
// output to get a directory like listing.
//
// Implements S3 compatible ListMultipartUploads API. The resulting
// ListMultipartsInfo structure is marshalled directly into XML and
// replied back to the client.
func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
// This function remains here to aid in listing uploads that require cleanup.
func (fs fsObjects) listMultipartUploadsHelper(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
return lmi, err
}
@@ -596,103 +620,37 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}, nil
}
// listObjectParts - wrapper scanning through
// '.minio.sys/multipart/bucket/object/UPLOADID'. Lists all the parts
// saved inside '.minio.sys/multipart/bucket/object/UPLOADID'.
func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) {
result := ListPartsInfo{}
uploadIDPath := pathJoin(bucket, object, uploadID)
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
metaFile, err := fs.rwPool.Open(fsMetaPath)
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
// On Windows, errFileAccessDenied is oddly returned instead of errFileNotFound.
return lpi, traceError(InvalidUploadID{UploadID: uploadID})
}
return lpi, toObjectErr(traceError(err), bucket, object)
}
defer fs.rwPool.Close(fsMetaPath)
fsMeta := fsMetaV1{}
_, err = fsMeta.ReadFrom(metaFile.LockedFile)
if err != nil {
return lpi, toObjectErr(err, minioMetaBucket, fsMetaPath)
}
// Only parts with higher part numbers will be listed.
partIdx := fsMeta.ObjectPartIndex(partNumberMarker)
parts := fsMeta.Parts
if partIdx != -1 {
parts = fsMeta.Parts[partIdx+1:]
}
count := maxParts
for _, part := range parts {
var fi os.FileInfo
partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, part.Name)
fi, err = fsStatFile(partNamePath)
if err != nil {
return lpi, toObjectErr(err, minioMetaMultipartBucket, partNamePath)
}
result.Parts = append(result.Parts, PartInfo{
PartNumber: part.Number,
ETag: part.ETag,
LastModified: fi.ModTime(),
Size: fi.Size(),
})
count--
if count == 0 {
break
}
}
// If more parts remain beyond those listed, we set IsTruncated as true.
if len(parts) > len(result.Parts) {
result.IsTruncated = true
// Make sure to fill next part number marker if IsTruncated is
// true for subsequent listing.
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
result.NextPartNumberMarker = nextPartNumberMarker
}
result.Bucket = bucket
result.Object = object
result.UploadID = uploadID
result.MaxParts = maxParts
// Success.
return result, nil
}
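
For contrast, this is the marker-based contract the listing above implements: a caller pages through parts by feeding NextPartNumberMarker back in until IsTruncated is false. A sketch of such a caller against the pre-change behavior (listAllParts is illustrative only):

// listAllParts pages through every part of an upload using the
// marker contract of listObjectParts.
func listAllParts(fs fsObjects, bucket, object, uploadID string) ([]PartInfo, error) {
	var all []PartInfo
	marker := 0
	for {
		lpi, err := fs.listObjectParts(bucket, object, uploadID, marker, 1000)
		if err != nil {
			return nil, err
		}
		all = append(all, lpi.Parts...)
		if !lpi.IsTruncated {
			return all, nil
		}
		marker = lpi.NextPartNumberMarker
	}
}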
// ListObjectParts - lists all previously uploaded parts for a given
// object and uploadID. Takes additional input of part-number-marker
// to indicate where the listing should begin from.
//
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is marshalled directly into XML and
// replied back to the client.
// ListObjectParts - returns empty response always. Applications are
// expected to remember the uploaded part numbers and corresponding
// etags.
func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (lpi ListPartsInfo, e error) {
if err := checkListPartsArgs(bucket, object, fs); err != nil {
return lpi, err
}
// Check if bucket exists
if _, err := fs.statBucketDir(bucket); err != nil {
return lpi, toObjectErr(err, bucket)
}
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
objectMPartPathLock.RLock()
defer objectMPartPathLock.RUnlock()
listPartsInfo, err := fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
// Check if uploadID exists. N.B. this check may race with a
// concurrent abortMultipartUpload on the same uploadID. It is
// OK to be eventually consistent w.r.t listing of objects,
// uploads and parts.
uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadID)
_, err := fsStatDir(uploadIDPath)
if err != nil {
return lpi, toObjectErr(err, bucket, object)
return lpi, traceError(InvalidUploadID{UploadID: uploadID})
}
// Success.
return listPartsInfo, nil
// Return empty list parts response
return ListPartsInfo{
Bucket: bucket,
Object: object,
UploadID: uploadID,
PartNumberMarker: 0,
MaxParts: maxParts,
}, nil
}
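
ListObjectParts thus degenerates into an existence check: a live uploadID yields an empty listing while a stale or aborted one yields InvalidUploadID. A hypothetical helper built on that behavior (checkUploadIDExists is not in the codebase):

// checkUploadIDExists reports whether an uploadID is still live; only
// the error carries information, since the parts list is always empty.
func checkUploadIDExists(fs fsObjects, bucket, object, uploadID string) error {
	_, err := fs.ListObjectParts(bucket, object, uploadID, 0, 1)
	return err
}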
// CompleteMultipartUpload - completes an ongoing multipart
@@ -978,3 +936,55 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
return nil
}
// Remove multipart uploads left unattended in a given bucket older than `fsMultipartExpiry`
func (fs fsObjects) cleanupStaleMultipartUpload(bucket string) (err error) {
var lmi ListMultipartsInfo
for {
// List multipart uploads in a bucket 1000 at a time
lmi, err = fs.listMultipartUploadsHelper(bucket, "", "", "", "/", 1000)
if err != nil {
errorIf(err, fmt.Sprintf("Failed to list uploads of %s for cleaning up of multipart uploads older than %d weeks", bucket, fsMultipartExpiry))
return err
}
// Remove uploads (and their parts) older than fsMultipartExpiry (2 weeks)
for _, upload := range lmi.Uploads {
uploadIDPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, upload.Object, upload.UploadID)
st, err := fsStatDir(uploadIDPath)
if err != nil {
errorIf(err, "Failed to stat uploads directory", uploadIDPath)
return err
}
if time.Since(st.ModTime()) > fsMultipartExpiry {
// Best-effort abort; a failure here will be retried on the next cleanup cycle.
fs.AbortMultipartUpload(bucket, upload.Object, upload.UploadID)
}
}
// No more incomplete uploads remain
if !lmi.IsTruncated {
break
}
}
return nil
}
// Remove multipart uploads left unattended for longer than `fsMultipartExpiry`, checking every `fsMultipartCleanupPeriod`.
func (fs fsObjects) cleanupStaleMultipartUploads() {
for {
bucketInfos, err := fs.ListBuckets()
if err != nil {
errorIf(err, fmt.Sprintf("Failed to list buckets for cleaning up of multipart uploads older than %d weeks", fsMultipartExpiry))
return
}
for _, bucketInfo := range bucketInfos {
fs.cleanupStaleMultipartUpload(bucketInfo.Name)
}
// Wait for the next multipart backend cleanup cycle
time.Sleep(fsMultipartCleanupPeriod)
}
}
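
cleanupStaleMultipartUploads never returns, so it presumably runs on its own goroutine, started when the FS object layer is constructed; the constructor lives in one of the other changed files and is not shown in this excerpt. An assumed, simplified shape of that call site (newFSObjectsSketch is hypothetical):

// newFSObjectsSketch shows where the background cleanup would be
// started; the real constructor performs far more initialization.
func newFSObjectsSketch(fsPath string) fsObjects {
	fs := fsObjects{fsPath: fsPath}
	// Run stale-upload cleanup for the lifetime of the server, waking
	// every fsMultipartCleanupPeriod.
	go fs.cleanupStaleMultipartUploads()
	return fs
}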