From d0e854afb7823a25b83899004bbb57792b04cf1d Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 3 May 2016 16:10:24 -0700 Subject: [PATCH] xl/fs: Bring in ".minio/tmp" directory support. (#1464) All transactions happen through this directory inside ".minio/temp". Adding this allows us to remove any temporary files which were not committed before. Fixes #1462 Fixes #1444 --- fs-objects-multipart.go | 328 ++----------------------------- object-common-multipart.go | 385 +++++++++++++++++++++++++++++++++---- object-common.go | 28 +-- object-utils.go | 4 + posix.go | 16 +- xl-erasure-v1.go | 71 ++++++- xl-objects-multipart.go | 362 ++++------------------------------ 7 files changed, 497 insertions(+), 697 deletions(-) diff --git a/fs-objects-multipart.go b/fs-objects-multipart.go index 9458928cb..06d5d11ca 100644 --- a/fs-objects-multipart.go +++ b/fs-objects-multipart.go @@ -20,231 +20,11 @@ import ( "fmt" "io" "path" - "strconv" - "strings" - - "github.com/Sirupsen/logrus" - "github.com/skyrings/skyring-common/tools/uuid" ) -// listLeafEntries - lists all entries if a given prefixPath is a leaf -// directory, returns error if any - returns empty list if prefixPath -// is not a leaf directory. -func (fs fsObjects) listLeafEntries(prefixPath string) (entries []FileInfo, e error) { - var markerPath string - for { - fileInfos, eof, err := fs.storage.ListFiles(minioMetaBucket, prefixPath, markerPath, false, 1000) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": prefixPath, - "markerPath": markerPath, - }).Errorf("%s", err) - return nil, err - } - for _, fileInfo := range fileInfos { - // Set marker for next batch of ListFiles. - markerPath = fileInfo.Name - if fileInfo.Mode.IsDir() { - // If a directory is found, doesn't return anything. - return nil, nil - } - fileName := path.Base(fileInfo.Name) - if !strings.Contains(fileName, ".") { - // Skip the entry if it is of the pattern bucket/object/uploadID.partNum.md5sum - // and retain entries of the pattern bucket/object/uploadID - entries = append(entries, fileInfo) - } - } - if eof { - break - } - } - return entries, nil -} - -// listMetaBucketFiles - list all files at a given prefix inside minioMetaBucket. -func (fs fsObjects) listMetaBucketFiles(prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, err error) { - // newMaxKeys tracks the size of entries which are going to be - // returned back. - var newMaxKeys int - - // Following loop gathers and filters out special files inside - // minio meta volume. - for { - var fileInfos []FileInfo - // List files up to maxKeys-newMaxKeys, since we are skipping entries for special files. - fileInfos, eof, err = fs.storage.ListFiles(minioMetaBucket, prefixPath, markerPath, recursive, maxKeys-newMaxKeys) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": prefixPath, - "markerPath": markerPath, - "recursive": recursive, - "maxKeys": maxKeys, - }).Errorf("%s", err) - return nil, true, err - } - // Loop through and validate individual file. - for _, fi := range fileInfos { - var entries []FileInfo - if fi.Mode.IsDir() { - // List all the entries if fi.Name is a leaf directory, if - // fi.Name is not a leaf directory then the resulting - // entries are empty. - entries, err = fs.listLeafEntries(fi.Name) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": fi.Name, - }).Errorf("%s", err) - return nil, false, err - } - } - // Set markerPath for next batch of listing. - markerPath = fi.Name - if len(entries) > 0 { - // We reach here for non-recursive case and a leaf entry. - for _, entry := range entries { - allFileInfos = append(allFileInfos, entry) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. Return right here. - if newMaxKeys == maxKeys { - // Return values: - // allFileInfos : "maxKeys" number of entries. - // eof : eof returned by fs.storage.ListFiles() - // error : nil - return - } - } - } else { - // We reach here for a non-recursive case non-leaf entry - // OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum] - if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries. - // Skip files matching pattern bucket/object/uploadID.partNum.md5sum - // and retain files matching pattern bucket/object/uploadID - specialFile := path.Base(fi.Name) - if strings.Contains(specialFile, ".") { - // Contains partnumber and md5sum info, skip this. - continue - } - } - } - allFileInfos = append(allFileInfos, fi) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. Return right here. - if newMaxKeys == maxKeys { - // Return values: - // allFileInfos : "maxKeys" number of entries. - // eof : eof returned by fs.storage.ListFiles() - // error : nil - return - } - } - // If we have reached eof then we break out. - if eof { - break - } - } - - // Return entries here. - return allFileInfos, eof, nil -} - // ListMultipartUploads - list multipart uploads. func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { - result := ListMultipartsInfo{} - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ListMultipartsInfo{}, (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectPrefix(prefix) { - return ListMultipartsInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: prefix}) - } - // Verify if delimiter is anything other than '/', which we do not support. - if delimiter != "" && delimiter != slashSeparator { - return ListMultipartsInfo{}, (UnsupportedDelimiter{ - Delimiter: delimiter, - }) - } - // Verify if marker has prefix. - if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { - return ListMultipartsInfo{}, (InvalidMarkerPrefixCombination{ - Marker: keyMarker, - Prefix: prefix, - }) - } - if uploadIDMarker != "" { - if strings.HasSuffix(keyMarker, slashSeparator) { - return result, (InvalidUploadIDKeyCombination{ - UploadIDMarker: uploadIDMarker, - KeyMarker: keyMarker, - }) - } - id, err := uuid.Parse(uploadIDMarker) - if err != nil { - return result, err - } - if id.IsZero() { - return result, (MalformedUploadID{ - UploadID: uploadIDMarker, - }) - } - } - - recursive := true - if delimiter == slashSeparator { - recursive = false - } - - result.IsTruncated = true - result.MaxUploads = maxUploads - - // Not using path.Join() as it strips off the trailing '/'. - // Also bucket should always be followed by '/' even if prefix is empty. - prefixPath := pathJoin(bucket, prefix) - keyMarkerPath := "" - if keyMarker != "" { - keyMarkerPath = pathJoin(pathJoin(bucket, keyMarker), uploadIDMarker) - } - // List all the multipart files at prefixPath, starting with marker keyMarkerPath. - fileInfos, eof, err := fs.listMetaBucketFiles(prefixPath, keyMarkerPath, recursive, maxUploads) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": prefixPath, - "markerPath": keyMarkerPath, - "recursive": recursive, - "maxUploads": maxUploads, - }).Errorf("listMetaBucketFiles failed with %s", err) - return ListMultipartsInfo{}, err - } - - // Loop through all the received files fill in the multiparts result. - for _, fi := range fileInfos { - var objectName string - var uploadID string - if fi.Mode.IsDir() { - // All directory entries are common prefixes. - uploadID = "" // Upload ids are empty for CommonPrefixes. - objectName = strings.TrimPrefix(fi.Name, retainSlash(bucket)) - result.CommonPrefixes = append(result.CommonPrefixes, objectName) - } else { - uploadID = path.Base(fi.Name) - objectName = strings.TrimPrefix(path.Dir(fi.Name), retainSlash(bucket)) - result.Uploads = append(result.Uploads, uploadMetadata{ - Object: objectName, - UploadID: uploadID, - Initiated: fi.ModTime, - }) - } - result.NextKeyMarker = objectName - result.NextUploadIDMarker = uploadID - } - result.IsTruncated = !eof - if !result.IsTruncated { - result.NextKeyMarker = "" - result.NextUploadIDMarker = "" - } - return result, nil + return listMultipartUploadsCommon(fs.storage, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) } // NewMultipartUpload - initialize a new multipart upload, returns a unique id. @@ -254,100 +34,32 @@ func (fs fsObjects) NewMultipartUpload(bucket, object string) (string, error) { // PutObjectPart - writes the multipart upload chunks. func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { - newMD5Hex, err := putObjectPartCommon(fs.storage, bucket, object, uploadID, partID, size, data, md5Hex) - if err != nil { - return "", err - } - partSuffix := fmt.Sprintf("%s.%d", uploadID, partID) - partSuffixPath := path.Join(bucket, object, partSuffix) - partSuffixMD5 := fmt.Sprintf("%s.%d.%s", uploadID, partID, newMD5Hex) - partSuffixMD5Path := path.Join(bucket, object, partSuffixMD5) - err = fs.storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path) - if err != nil { - return "", err - } - return newMD5Hex, nil + return putObjectPartCommon(fs.storage, bucket, object, uploadID, partID, size, data, md5Hex) } func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ListPartsInfo{}, (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectName(object) { - return ListPartsInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: object}) - } - if status, err := isUploadIDExists(fs.storage, bucket, object, uploadID); err != nil { - return ListPartsInfo{}, err - } else if !status { - return ListPartsInfo{}, (InvalidUploadID{UploadID: uploadID}) - } - result := ListPartsInfo{} - var markerPath string - nextPartNumberMarker := 0 - uploadIDPath := path.Join(bucket, object, uploadID) - // Figure out the marker for the next subsequent calls, if the - // partNumberMarker is already set. - if partNumberMarker > 0 { - partNumberMarkerPath := uploadIDPath + "." + strconv.Itoa(partNumberMarker) + "." - fileInfos, _, err := fs.storage.ListFiles(minioMetaBucket, partNumberMarkerPath, "", false, 1) - if err != nil { - return result, toObjectErr(err, minioMetaBucket, partNumberMarkerPath) - } - if len(fileInfos) == 0 { - return result, (InvalidPart{}) - } - markerPath = fileInfos[0].Name - } - uploadIDPrefix := uploadIDPath + "." - fileInfos, eof, err := fs.storage.ListFiles(minioMetaBucket, uploadIDPrefix, markerPath, false, maxParts) - if err != nil { - return result, InvalidPart{} - } - for _, fileInfo := range fileInfos { - fileName := path.Base(fileInfo.Name) - splitResult := strings.Split(fileName, ".") - partNum, err := strconv.Atoi(splitResult[1]) - if err != nil { - return result, err - } - md5sum := splitResult[2] - result.Parts = append(result.Parts, partInfo{ - PartNumber: partNum, - LastModified: fileInfo.ModTime, - ETag: md5sum, - Size: fileInfo.Size, - }) - nextPartNumberMarker = partNum - } - result.Bucket = bucket - result.Object = object - result.UploadID = uploadID - result.PartNumberMarker = partNumberMarker - result.NextPartNumberMarker = nextPartNumberMarker - result.MaxParts = maxParts - result.IsTruncated = !eof - return result, nil + return listObjectPartsCommon(fs.storage, bucket, object, uploadID, partNumberMarker, maxParts) } func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", (BucketNameInvalid{Bucket: bucket}) + return "", BucketNameInvalid{Bucket: bucket} } if !IsValidObjectName(object) { - return "", (ObjectNameInvalid{ + return "", ObjectNameInvalid{ Bucket: bucket, Object: object, - }) + } } if status, err := isUploadIDExists(fs.storage, bucket, object, uploadID); err != nil { return "", err } else if !status { - return "", (InvalidUploadID{UploadID: uploadID}) + return "", InvalidUploadID{UploadID: uploadID} } - fileWriter, err := fs.storage.CreateFile(bucket, object) + tempObj := path.Join(tmpMetaPrefix, bucket, object, uploadID) + fileWriter, err := fs.storage.CreateFile(minioMetaBucket, tempObj) if err != nil { return "", toObjectErr(err, bucket, object) } @@ -355,12 +67,13 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload var md5Sums []string for _, part := range parts { // Construct part suffix. - partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag) + partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag) + multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, partSuffix) var fileReader io.ReadCloser - fileReader, err = fs.storage.ReadFile(minioMetaBucket, path.Join(bucket, object, partSuffix), 0) + fileReader, err = fs.storage.ReadFile(minioMetaBucket, multipartPartFile, 0) if err != nil { if err == errFileNotFound { - return "", (InvalidPart{}) + return "", InvalidPart{} } return "", err } @@ -387,18 +100,19 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } // Cleanup all the parts. - fs.cleanupUploadedParts(bucket, object, uploadID) + if err = cleanupUploadedParts(fs.storage, mpartMetaPrefix, bucket, object, uploadID); err != nil { + return "", err + } + + err = fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object) + if err != nil { + return "", toObjectErr(err, bucket, object) + } // Return md5sum. return s3MD5, nil } -// Wrapper to which removes all the uploaded parts after a successful -// complete multipart upload. -func (fs fsObjects) cleanupUploadedParts(bucket, object, uploadID string) error { - return abortMultipartUploadCommon(fs.storage, bucket, object, uploadID) -} - // AbortMultipartUpload - aborts a multipart upload. func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error { return abortMultipartUploadCommon(fs.storage, bucket, object, uploadID) diff --git a/object-common-multipart.go b/object-common-multipart.go index 48a5f7cf4..1879da910 100644 --- a/object-common-multipart.go +++ b/object-common-multipart.go @@ -23,7 +23,10 @@ import ( "io" "io/ioutil" "path" + "strconv" + "strings" + "github.com/Sirupsen/logrus" "github.com/skyrings/skyring-common/tools/uuid" ) @@ -34,7 +37,7 @@ import ( func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) (uploadID string, err error) { // Verify if bucket name is valid. if !IsValidBucketName(bucket) { - return "", (BucketNameInvalid{Bucket: bucket}) + return "", BucketNameInvalid{Bucket: bucket} } // Verify if object name is valid. if !IsValidObjectName(object) { @@ -48,15 +51,6 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) return "", BucketNotFound{Bucket: bucket} } - if _, err := storage.StatVol(minioMetaBucket); err != nil { - if err == errVolumeNotFound { - err = storage.MakeVol(minioMetaBucket) - if err != nil { - return "", toObjectErr(err) - } - } - } - // Loops through until successfully generates a new unique upload id. for { uuid, err := uuid.New() @@ -64,19 +58,23 @@ func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) return "", err } uploadID := uuid.String() - uploadIDPath := path.Join(bucket, object, uploadID) + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + tempUploadIDPath := path.Join(tmpMetaPrefix, bucket, object, uploadID) if _, err = storage.StatFile(minioMetaBucket, uploadIDPath); err != nil { if err != errFileNotFound { return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } // uploadIDPath doesn't exist, so create empty file to reserve the name var w io.WriteCloser - if w, err = storage.CreateFile(minioMetaBucket, uploadIDPath); err == nil { - // Close the writer. - if err = w.Close(); err != nil { - return "", err - } - } else { + if w, err = storage.CreateFile(minioMetaBucket, tempUploadIDPath); err != nil { + return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) + } + // Close the writer. + if err = w.Close(); err != nil { + return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) + } + err = storage.RenameFile(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) + if err != nil { return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } return uploadID, nil @@ -110,7 +108,7 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa } partSuffix := fmt.Sprintf("%s.%d", uploadID, partID) - partSuffixPath := path.Join(bucket, object, partSuffix) + partSuffixPath := path.Join(tmpMetaPrefix, bucket, object, partSuffix) fileWriter, err := storage.CreateFile(minioMetaBucket, partSuffixPath) if err != nil { return "", toObjectErr(err, bucket, object) @@ -153,28 +151,27 @@ func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploa if err != nil { return "", err } + + partSuffixMD5 := fmt.Sprintf("%s.%.5d.%s", uploadID, partID, newMD5Hex) + partSuffixMD5Path := path.Join(mpartMetaPrefix, bucket, object, partSuffixMD5) + err = storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path) + if err != nil { + return "", err + } return newMD5Hex, nil } -// abortMultipartUploadCommon - aborts a multipart upload, common -// function used by both object layers. -func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID string) error { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return BucketNameInvalid{Bucket: bucket} - } - if !IsValidObjectName(object) { - return ObjectNameInvalid{Bucket: bucket, Object: object} - } - if status, err := isUploadIDExists(storage, bucket, object, uploadID); err != nil { - return err - } else if !status { - return InvalidUploadID{UploadID: uploadID} - } +// Cleanup all temp entries inside tmpMetaPrefix directory, upon server initialization. +func cleanupAllTmpEntries(storage StorageAPI) error { + return cleanupUploadedParts(storage, tmpMetaPrefix, "", "", "") +} +// Wrapper to which removes all the uploaded parts after a successful +// complete multipart upload. +func cleanupUploadedParts(storage StorageAPI, prefix, bucket, object, uploadID string) error { markerPath := "" for { - uploadIDPath := path.Join(bucket, object, uploadID) + uploadIDPath := path.Join(prefix, bucket, object, uploadID) fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPath, markerPath, false, 1000) if err != nil { if err == errFileNotFound { @@ -192,3 +189,323 @@ func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID str } return nil } + +// abortMultipartUploadCommon - aborts a multipart upload, common +// function used by both object layers. +func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID string) error { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return BucketNameInvalid{Bucket: bucket} + } + if !IsValidObjectName(object) { + return ObjectNameInvalid{Bucket: bucket, Object: object} + } + if status, err := isUploadIDExists(storage, bucket, object, uploadID); err != nil { + return err + } else if !status { + return InvalidUploadID{UploadID: uploadID} + } + return cleanupUploadedParts(storage, mpartMetaPrefix, bucket, object, uploadID) +} + +// listLeafEntries - lists all entries if a given prefixPath is a leaf +// directory, returns error if any - returns empty list if prefixPath +// is not a leaf directory. +func listLeafEntries(storage StorageAPI, prefixPath string) (entries []FileInfo, e error) { + var markerPath string + for { + fileInfos, eof, err := storage.ListFiles(minioMetaBucket, prefixPath, markerPath, false, 1000) + if err != nil { + log.WithFields(logrus.Fields{ + "prefixPath": prefixPath, + "markerPath": markerPath, + }).Errorf("%s", err) + return nil, err + } + for _, fileInfo := range fileInfos { + // Set marker for next batch of ListFiles. + markerPath = fileInfo.Name + if fileInfo.Mode.IsDir() { + // If a directory is found, doesn't return anything. + return nil, nil + } + fileName := path.Base(fileInfo.Name) + if !strings.Contains(fileName, ".") { + // Skip the entry if it is of the pattern bucket/object/uploadID.partNum.md5sum + // and retain entries of the pattern bucket/object/uploadID + entries = append(entries, fileInfo) + } + } + if eof { + break + } + } + return entries, nil +} + +// listMetaBucketMultipartFiles - list all files at a given prefix inside minioMetaBucket. +func listMetaBucketMultipartFiles(storage StorageAPI, prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, err error) { + // newMaxKeys tracks the size of entries which are going to be + // returned back. + var newMaxKeys int + + // Following loop gathers and filters out special files inside + // minio meta volume. + for { + var fileInfos []FileInfo + // List files up to maxKeys-newMaxKeys, since we are skipping + // entries for special files. + fileInfos, eof, err = storage.ListFiles(minioMetaBucket, prefixPath, markerPath, recursive, maxKeys-newMaxKeys) + if err != nil { + log.WithFields(logrus.Fields{ + "prefixPath": prefixPath, + "markerPath": markerPath, + "recursive": recursive, + "maxKeys": maxKeys, + }).Errorf("%s", err) + return nil, true, err + } + // Loop through and validate individual file. + for _, fi := range fileInfos { + var entries []FileInfo + if fi.Mode.IsDir() { + // List all the entries if fi.Name is a leaf directory, if + // fi.Name is not a leaf directory then the resulting + // entries are empty. + entries, err = listLeafEntries(storage, fi.Name) + if err != nil { + log.WithFields(logrus.Fields{ + "prefixPath": fi.Name, + }).Errorf("%s", err) + return nil, false, err + } + } + // Set markerPath for next batch of listing. + markerPath = fi.Name + if len(entries) > 0 { + // We reach here for non-recursive case and a leaf entry. + for _, entry := range entries { + allFileInfos = append(allFileInfos, entry) + newMaxKeys++ + // If we have reached the maxKeys, it means we have listed + // everything that was requested. Return right here. + if newMaxKeys == maxKeys { + // Return values: + // allFileInfos : "maxKeys" number of entries. + // eof : eof returned by fs.storage.ListFiles() + // error : nil + return + } + } + } else { + // We reach here for a non-recursive case non-leaf entry + // OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum] + if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries. + // Skip files matching pattern bucket/object/uploadID.partNum.md5sum + // and retain files matching pattern bucket/object/uploadID + specialFile := path.Base(fi.Name) + if strings.Contains(specialFile, ".") { + // Contains partnumber and md5sum info, skip this. + continue + } + } + allFileInfos = append(allFileInfos, fi) + newMaxKeys++ + // If we have reached the maxKeys, it means we have listed + // everything that was requested. Return right here. + if newMaxKeys == maxKeys { + // Return values: + // allFileInfos : "maxKeys" number of entries. + // eof : eof returned by fs.storage.ListFiles() + // error : nil + return + } + } + } + // If we have reached eof then we break out. + if eof { + break + } + } + + // Return entries here. + return allFileInfos, eof, nil +} + +// listMultipartUploadsCommon - lists all multipart uploads, common +// function for both object layers. +func listMultipartUploadsCommon(storage StorageAPI, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { + result := ListMultipartsInfo{} + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return ListMultipartsInfo{}, BucketNameInvalid{Bucket: bucket} + } + if !IsValidObjectPrefix(prefix) { + return ListMultipartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix} + } + // Verify if delimiter is anything other than '/', which we do not support. + if delimiter != "" && delimiter != slashSeparator { + return ListMultipartsInfo{}, UnsupportedDelimiter{ + Delimiter: delimiter, + } + } + // Verify if marker has prefix. + if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { + return ListMultipartsInfo{}, InvalidMarkerPrefixCombination{ + Marker: keyMarker, + Prefix: prefix, + } + } + if uploadIDMarker != "" { + if strings.HasSuffix(keyMarker, slashSeparator) { + return result, InvalidUploadIDKeyCombination{ + UploadIDMarker: uploadIDMarker, + KeyMarker: keyMarker, + } + } + id, err := uuid.Parse(uploadIDMarker) + if err != nil { + return result, err + } + if id.IsZero() { + return result, MalformedUploadID{ + UploadID: uploadIDMarker, + } + } + } + + recursive := true + if delimiter == slashSeparator { + recursive = false + } + + result.IsTruncated = true + result.MaxUploads = maxUploads + + // Not using path.Join() as it strips off the trailing '/'. + // Also bucket should always be followed by '/' even if prefix is empty. + multipartPrefixPath := pathJoin(mpartMetaPrefix, pathJoin(bucket, prefix)) + multipartMarkerPath := "" + if keyMarker != "" { + keyMarkerPath := pathJoin(pathJoin(bucket, keyMarker), uploadIDMarker) + multipartMarkerPath = pathJoin(mpartMetaPrefix, keyMarkerPath) + } + + // List all the multipart files at prefixPath, starting with marker keyMarkerPath. + fileInfos, eof, err := listMetaBucketMultipartFiles(storage, multipartPrefixPath, multipartMarkerPath, recursive, maxUploads) + if err != nil { + log.WithFields(logrus.Fields{ + "prefixPath": multipartPrefixPath, + "markerPath": multipartMarkerPath, + "recursive": recursive, + "maxUploads": maxUploads, + }).Errorf("listMetaBucketMultipartFiles failed with %s", err) + return ListMultipartsInfo{}, err + } + + // Loop through all the received files fill in the multiparts result. + for _, fi := range fileInfos { + var objectName string + var uploadID string + if fi.Mode.IsDir() { + // All directory entries are common prefixes. + uploadID = "" // Upload ids are empty for CommonPrefixes. + objectName = strings.TrimPrefix(fi.Name, retainSlash(path.Join(mpartMetaPrefix, bucket))) + result.CommonPrefixes = append(result.CommonPrefixes, objectName) + } else { + uploadID = path.Base(fi.Name) + objectName = strings.TrimPrefix(path.Dir(fi.Name), retainSlash(path.Join(mpartMetaPrefix, bucket))) + result.Uploads = append(result.Uploads, uploadMetadata{ + Object: objectName, + UploadID: uploadID, + Initiated: fi.ModTime, + }) + } + result.NextKeyMarker = objectName + result.NextUploadIDMarker = uploadID + } + result.IsTruncated = !eof + if !result.IsTruncated { + result.NextKeyMarker = "" + result.NextUploadIDMarker = "" + } + return result, nil +} + +// ListObjectParts - list object parts, common function across both +// object layers. +func listObjectPartsCommon(storage StorageAPI, bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return ListPartsInfo{}, BucketNameInvalid{Bucket: bucket} + } + if !IsValidObjectName(object) { + return ListPartsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: object} + } + if status, err := isUploadIDExists(storage, bucket, object, uploadID); err != nil { + return ListPartsInfo{}, err + } else if !status { + return ListPartsInfo{}, InvalidUploadID{UploadID: uploadID} + } + result := ListPartsInfo{} + var markerPath string + nextPartNumberMarker := 0 + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + // Figure out the marker for the next subsequent calls, if the + // partNumberMarker is already set. + if partNumberMarker > 0 { + partNumberMarkerPath := uploadIDPath + "." + fmt.Sprintf("%.5d", partNumberMarker) + "." + fileInfos, _, err := storage.ListFiles(minioMetaBucket, partNumberMarkerPath, "", false, 1) + if err != nil { + return result, toObjectErr(err, minioMetaBucket, partNumberMarkerPath) + } + if len(fileInfos) == 0 { + return result, InvalidPart{} + } + markerPath = fileInfos[0].Name + } + uploadIDPrefix := uploadIDPath + "." + fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPrefix, markerPath, false, maxParts) + if err != nil { + return result, InvalidPart{} + } + for _, fileInfo := range fileInfos { + fileName := path.Base(fileInfo.Name) + splitResult := strings.Split(fileName, ".") + partNum, err := strconv.Atoi(splitResult[1]) + if err != nil { + return result, err + } + md5sum := splitResult[2] + result.Parts = append(result.Parts, partInfo{ + PartNumber: partNum, + LastModified: fileInfo.ModTime, + ETag: md5sum, + Size: fileInfo.Size, + }) + nextPartNumberMarker = partNum + } + result.Bucket = bucket + result.Object = object + result.UploadID = uploadID + result.PartNumberMarker = partNumberMarker + result.NextPartNumberMarker = nextPartNumberMarker + result.MaxParts = maxParts + result.IsTruncated = !eof + return result, nil +} + +// isUploadIDExists - verify if a given uploadID exists and is valid. +func isUploadIDExists(storage StorageAPI, bucket, object, uploadID string) (bool, error) { + uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) + st, err := storage.StatFile(minioMetaBucket, uploadIDPath) + if err != nil { + // Upload id does not exist. + if err == errFileNotFound { + return false, nil + } + return false, err + } + // Upload id exists and is a regular file. + return st.Mode.IsRegular(), nil +} diff --git a/object-common.go b/object-common.go index 35d19eac9..e5bd39096 100644 --- a/object-common.go +++ b/object-common.go @@ -105,13 +105,13 @@ func deleteBucket(storage StorageAPI, bucket string) error { func putObjectCommon(storage StorageAPI, bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", (BucketNameInvalid{Bucket: bucket}) + return "", BucketNameInvalid{Bucket: bucket} } if !IsValidObjectName(object) { - return "", (ObjectNameInvalid{ + return "", ObjectNameInvalid{ Bucket: bucket, Object: object, - }) + } } // Check whether the bucket exists. if isExist, err := isBucketExist(storage, bucket); err != nil { @@ -120,7 +120,8 @@ func putObjectCommon(storage StorageAPI, bucket string, object string, size int6 return "", BucketNotFound{Bucket: bucket} } - fileWriter, err := storage.CreateFile(bucket, object) + tempObj := path.Join(tmpMetaPrefix, bucket, object) + fileWriter, err := storage.CreateFile(minioMetaBucket, tempObj) if err != nil { return "", toObjectErr(err, bucket, object) } @@ -166,26 +167,15 @@ func putObjectCommon(storage StorageAPI, bucket string, object string, size int6 if err != nil { return "", err } + err = storage.RenameFile(minioMetaBucket, tempObj, bucket, object) + if err != nil { + return "", err + } // Return md5sum, successfully wrote object. return newMD5Hex, nil } -// isUploadIDExists - verify if a given uploadID exists and is valid. -func isUploadIDExists(storage StorageAPI, bucket, object, uploadID string) (bool, error) { - uploadIDPath := path.Join(bucket, object, uploadID) - st, err := storage.StatFile(minioMetaBucket, uploadIDPath) - if err != nil { - // Upload id does not exist. - if err == errFileNotFound { - return false, nil - } - return false, err - } - // Upload id exists and is a regular file. - return st.Mode.IsRegular(), nil -} - // checks whether bucket exists. func isBucketExist(storage StorageAPI, bucketName string) (bool, error) { // Check whether bucket exists. diff --git a/object-utils.go b/object-utils.go index 1391a02bf..3d8548fad 100644 --- a/object-utils.go +++ b/object-utils.go @@ -32,6 +32,10 @@ import ( const ( // Minio meta bucket. minioMetaBucket = ".minio" + // Multipart meta prefix. + mpartMetaPrefix = "multipart" + // Tmp meta prefix. + tmpMetaPrefix = "tmp" ) // validBucket regexp. diff --git a/posix.go b/posix.go index e1628d53f..a0fee714b 100644 --- a/posix.go +++ b/posix.go @@ -741,7 +741,7 @@ func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) err log.WithFields(logrus.Fields{ "diskPath": s.diskPath, "volume": srcVolume, - }).Debugf("getVolumeDir failed with %s", err) + }).Errorf("getVolumeDir failed with %s", err) return err } dstVolumeDir, err := s.getVolumeDir(dstVolume) @@ -749,12 +749,20 @@ func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) err log.WithFields(logrus.Fields{ "diskPath": s.diskPath, "volume": dstVolume, - }).Debugf("getVolumeDir failed with %s", err) + }).Errorf("getVolumeDir failed with %s", err) return err } if err = os.MkdirAll(path.Join(dstVolumeDir, path.Dir(dstPath)), 0755); err != nil { - log.Debug("os.MkdirAll failed with %s", err) + log.Errorf("os.MkdirAll failed with %s", err) return err } - return os.Rename(path.Join(srcVolumeDir, srcPath), path.Join(dstVolumeDir, dstPath)) + err = os.Rename(path.Join(srcVolumeDir, srcPath), path.Join(dstVolumeDir, dstPath)) + if err != nil { + if os.IsNotExist(err) { + return errFileNotFound + } + log.Errorf("os.Rename failed with %s", err) + return err + } + return nil } diff --git a/xl-erasure-v1.go b/xl-erasure-v1.go index 03ac3ea85..f52383992 100644 --- a/xl-erasure-v1.go +++ b/xl-erasure-v1.go @@ -697,17 +697,22 @@ func (xl XL) DeleteFile(volume, path string) error { // find online disks and meta data of higherVersion var mdata *xlMetaV1 onlineDiskCount := 0 + errFileNotFoundErr := 0 for index, metadata := range partsMetadata { if errs[index] == nil { onlineDiskCount++ if metadata.Stat.Version == higherVersion && mdata == nil { mdata = &metadata } + } else if errs[index] == errFileNotFound { + errFileNotFoundErr++ } } - // return error if mdata is empty or onlineDiskCount doesn't meet write quorum - if mdata == nil || onlineDiskCount < xl.writeQuorum { + if errFileNotFoundErr == len(xl.storageDisks) { + return errFileNotFound + } else if mdata == nil || onlineDiskCount < xl.writeQuorum { + // return error if mdata is empty or onlineDiskCount doesn't meet write quorum return errWriteQuorum } @@ -768,7 +773,22 @@ func (xl XL) DeleteFile(volume, path string) error { if errCount <= len(xl.storageDisks)-xl.writeQuorum { continue } - + // Safely close and remove. + if err = safeCloseAndRemove(metadataWriter); err != nil { + return err + } + return err + } + // Safely wrote, now rename to its actual location. + if err = metadataWriter.Close(); err != nil { + log.WithFields(logrus.Fields{ + "volume": volume, + "path": path, + "diskIndex": index, + }).Errorf("Metadata commit failed with %s", err) + if err = safeCloseAndRemove(metadataWriter); err != nil { + return err + } return err } @@ -810,7 +830,7 @@ func (xl XL) DeleteFile(volume, path string) error { } } } - + // Return success. return nil } @@ -829,14 +849,47 @@ func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { if !isValidPath(dstPath) { return errInvalidArgument } - for _, disk := range xl.storageDisks { - if err := disk.RenameFile(srcVolume, srcPath, dstVolume, dstPath); err != nil { + + // Hold read lock at source before rename. + nsMutex.RLock(srcVolume, srcPath) + defer nsMutex.RUnlock(srcVolume, srcPath) + + // Hold write lock at destination before rename. + nsMutex.Lock(dstVolume, dstPath) + defer nsMutex.Unlock(dstVolume, dstPath) + + for index, disk := range xl.storageDisks { + // Make sure to rename all the files only, not directories. + srcErasurePartPath := slashpath.Join(srcPath, fmt.Sprintf("file.%d", index)) + dstErasurePartPath := slashpath.Join(dstPath, fmt.Sprintf("file.%d", index)) + err := disk.RenameFile(srcVolume, srcErasurePartPath, dstVolume, dstErasurePartPath) + if err != nil { + log.WithFields(logrus.Fields{ + "srcVolume": srcVolume, + "srcPath": srcErasurePartPath, + "dstVolume": dstVolume, + "dstPath": dstErasurePartPath, + }).Errorf("RenameFile failed with %s", err) + return err + } + srcXLMetaPath := slashpath.Join(srcPath, xlMetaV1File) + dstXLMetaPath := slashpath.Join(dstPath, xlMetaV1File) + err = disk.RenameFile(srcVolume, srcXLMetaPath, dstVolume, dstXLMetaPath) + if err != nil { + log.WithFields(logrus.Fields{ + "srcVolume": srcVolume, + "srcPath": srcXLMetaPath, + "dstVolume": dstVolume, + "dstPath": dstXLMetaPath, + }).Errorf("RenameFile failed with %s", err) + return err + } + err = disk.DeleteFile(srcVolume, srcPath) + if err != nil { log.WithFields(logrus.Fields{ "srcVolume": srcVolume, "srcPath": srcPath, - "dstVolume": dstVolume, - "dstPath": dstPath, - }).Errorf("RenameFile failed with %s", err) + }).Errorf("DeleteFile failed with %s", err) return err } } diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index 94a90b32c..e35fb9675 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -21,12 +21,6 @@ import ( "fmt" "io" "path" - "sort" - "strconv" - "strings" - - "github.com/Sirupsen/logrus" - "github.com/skyrings/skyring-common/tools/uuid" ) // MultipartPartInfo Info of each part kept in the multipart metadata file after @@ -68,224 +62,9 @@ func partNumToPartFileName(partNum int) string { return fmt.Sprintf("%.5d%s", partNum, multipartSuffix) } -// listLeafEntries - lists all entries if a given prefixPath is a leaf -// directory, returns error if any - returns empty list if prefixPath -// is not a leaf directory. -func (xl xlObjects) listLeafEntries(prefixPath string) (entries []FileInfo, e error) { - var markerPath string - for { - fileInfos, eof, err := xl.storage.ListFiles(minioMetaBucket, prefixPath, markerPath, false, 1000) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": prefixPath, - "markerPath": markerPath, - }).Errorf("%s", err) - return nil, err - } - for _, fileInfo := range fileInfos { - // Set marker for next batch of ListFiles. - markerPath = fileInfo.Name - if fileInfo.Mode.IsDir() { - // If a directory is found, doesn't return anything. - return nil, nil - } - fileName := path.Base(fileInfo.Name) - if !strings.Contains(fileName, ".") { - // Skip the entry if it is of the pattern bucket/object/uploadID.partNum.md5sum - // and retain entries of the pattern bucket/object/uploadID - entries = append(entries, fileInfo) - } - } - if eof { - break - } - } - return entries, nil -} - -// listMetaBucketFiles - list all files at a given prefix inside minioMetaBucket. -func (xl xlObjects) listMetaBucketFiles(prefixPath string, markerPath string, recursive bool, maxKeys int) (allFileInfos []FileInfo, eof bool, err error) { - // newMaxKeys tracks the size of entries which are going to be - // returned back. - var newMaxKeys int - - // Following loop gathers and filters out special files inside - // minio meta volume. - for { - var fileInfos []FileInfo - // List files up to maxKeys-newMaxKeys, since we are skipping entries for special files. - fileInfos, eof, err = xl.storage.ListFiles(minioMetaBucket, prefixPath, markerPath, recursive, maxKeys-newMaxKeys) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": prefixPath, - "markerPath": markerPath, - "recursive": recursive, - "maxKeys": maxKeys, - }).Errorf("%s", err) - return nil, true, err - } - // Loop through and validate individual file. - for _, fi := range fileInfos { - var entries []FileInfo - if fi.Mode.IsDir() { - // List all the entries if fi.Name is a leaf directory, if - // fi.Name is not a leaf directory then the resulting - // entries are empty. - entries, err = xl.listLeafEntries(fi.Name) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": fi.Name, - }).Errorf("%s", err) - return nil, false, err - } - } - // Set markerPath for next batch of listing. - markerPath = fi.Name - if len(entries) > 0 { - // We reach here for non-recursive case and a leaf entry. - for _, entry := range entries { - allFileInfos = append(allFileInfos, entry) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. Return right here. - if newMaxKeys == maxKeys { - // Return values: - // allFileInfos : "maxKeys" number of entries. - // eof : eof returned by xl.storage.ListFiles() - // error : nil - return - } - } - } else { - // We reach here for a non-recursive case non-leaf entry - // OR recursive case with fi.Name matching pattern bucket/object/uploadID[.partNum.md5sum] - if !fi.Mode.IsDir() { // Do not skip non-recursive case directory entries. - // Skip files matching pattern bucket/object/uploadID.partNum.md5sum - // and retain files matching pattern bucket/object/uploadID - specialFile := path.Base(fi.Name) - if strings.Contains(specialFile, ".") { - // Contains partnumber and md5sum info, skip this. - continue - } - } - } - allFileInfos = append(allFileInfos, fi) - newMaxKeys++ - // If we have reached the maxKeys, it means we have listed - // everything that was requested. Return right here. - if newMaxKeys == maxKeys { - // Return values: - // allFileInfos : "maxKeys" number of entries. - // eof : eof returned by xl.storage.ListFiles() - // error : nil - return - } - } - // If we have reached eof then we break out. - if eof { - break - } - } - - // Return entries here. - return allFileInfos, eof, nil -} - // ListMultipartUploads - list multipart uploads. func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) { - result := ListMultipartsInfo{} - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ListMultipartsInfo{}, (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectPrefix(prefix) { - return ListMultipartsInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: prefix}) - } - // Verify if delimiter is anything other than '/', which we do not support. - if delimiter != "" && delimiter != slashSeparator { - return ListMultipartsInfo{}, (UnsupportedDelimiter{ - Delimiter: delimiter, - }) - } - // Verify if marker has prefix. - if keyMarker != "" && !strings.HasPrefix(keyMarker, prefix) { - return ListMultipartsInfo{}, (InvalidMarkerPrefixCombination{ - Marker: keyMarker, - Prefix: prefix, - }) - } - if uploadIDMarker != "" { - if strings.HasSuffix(keyMarker, slashSeparator) { - return result, (InvalidUploadIDKeyCombination{ - UploadIDMarker: uploadIDMarker, - KeyMarker: keyMarker, - }) - } - id, err := uuid.Parse(uploadIDMarker) - if err != nil { - return result, err - } - if id.IsZero() { - return result, (MalformedUploadID{ - UploadID: uploadIDMarker, - }) - } - } - - recursive := true - if delimiter == slashSeparator { - recursive = false - } - - result.IsTruncated = true - result.MaxUploads = maxUploads - - // Not using path.Join() as it strips off the trailing '/'. - // Also bucket should always be followed by '/' even if prefix is empty. - prefixPath := pathJoin(bucket, prefix) - keyMarkerPath := "" - if keyMarker != "" { - keyMarkerPath = pathJoin(pathJoin(bucket, keyMarker), uploadIDMarker) - } - // List all the multipart files at prefixPath, starting with marker keyMarkerPath. - fileInfos, eof, err := xl.listMetaBucketFiles(prefixPath, keyMarkerPath, recursive, maxUploads) - if err != nil { - log.WithFields(logrus.Fields{ - "prefixPath": prefixPath, - "markerPath": keyMarkerPath, - "recursive": recursive, - "maxUploads": maxUploads, - }).Errorf("listMetaBucketFiles failed with %s", err) - return ListMultipartsInfo{}, err - } - - // Loop through all the received files fill in the multiparts result. - for _, fi := range fileInfos { - var objectName string - var uploadID string - if fi.Mode.IsDir() { - // All directory entries are common prefixes. - uploadID = "" // Upload ids are empty for CommonPrefixes. - objectName = strings.TrimPrefix(fi.Name, retainSlash(bucket)) - result.CommonPrefixes = append(result.CommonPrefixes, objectName) - } else { - uploadID = path.Base(fi.Name) - objectName = strings.TrimPrefix(path.Dir(fi.Name), retainSlash(bucket)) - result.Uploads = append(result.Uploads, uploadMetadata{ - Object: objectName, - UploadID: uploadID, - Initiated: fi.ModTime, - }) - } - result.NextKeyMarker = objectName - result.NextUploadIDMarker = uploadID - } - result.IsTruncated = !eof - if !result.IsTruncated { - result.NextKeyMarker = "" - result.NextUploadIDMarker = "" - } - return result, nil + return listMultipartUploadsCommon(xl.storage, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) } // NewMultipartUpload - initialize a new multipart upload, returns a unique id. @@ -295,113 +74,46 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string) (string, error) { // PutObjectPart - writes the multipart upload chunks. func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { - newMD5Hex, err := putObjectPartCommon(xl.storage, bucket, object, uploadID, partID, size, data, md5Hex) - if err != nil { - return "", err - } - partSuffix := fmt.Sprintf("%s.%d", uploadID, partID) - partSuffixPath := path.Join(bucket, object, partSuffix) - partSuffixMD5 := fmt.Sprintf("%s.%.5d.%s", uploadID, partID, newMD5Hex) - partSuffixMD5Path := path.Join(bucket, object, partSuffixMD5) - err = xl.storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path) - if err != nil { - return "", err - } - return newMD5Hex, nil + return putObjectPartCommon(xl.storage, bucket, object, uploadID, partID, size, data, md5Hex) } +// ListObjectParts - list object parts. func (xl xlObjects) ListObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return ListPartsInfo{}, (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectName(object) { - return ListPartsInfo{}, (ObjectNameInvalid{Bucket: bucket, Object: object}) - } - if status, err := isUploadIDExists(xl.storage, bucket, object, uploadID); err != nil { - return ListPartsInfo{}, err - } else if !status { - return ListPartsInfo{}, (InvalidUploadID{UploadID: uploadID}) - } - result := ListPartsInfo{} - var markerPath string - nextPartNumberMarker := 0 - uploadIDPath := path.Join(bucket, object, uploadID) - // Figure out the marker for the next subsequent calls, if the - // partNumberMarker is already set. - if partNumberMarker > 0 { - partNumberMarkerPath := uploadIDPath + "." + fmt.Sprintf("%.5d", partNumberMarker) + "." - fileInfos, _, err := xl.storage.ListFiles(minioMetaBucket, partNumberMarkerPath, "", false, 1) - if err != nil { - return result, toObjectErr(err, minioMetaBucket, partNumberMarkerPath) - } - if len(fileInfos) == 0 { - return result, (InvalidPart{}) - } - markerPath = fileInfos[0].Name - } - uploadIDPrefix := uploadIDPath + "." - fileInfos, eof, err := xl.storage.ListFiles(minioMetaBucket, uploadIDPrefix, markerPath, false, maxParts) - if err != nil { - return result, InvalidPart{} - } - for _, fileInfo := range fileInfos { - fileName := path.Base(fileInfo.Name) - splitResult := strings.Split(fileName, ".") - partNum, err := strconv.Atoi(splitResult[1]) - if err != nil { - return result, err - } - md5sum := splitResult[2] - result.Parts = append(result.Parts, partInfo{ - PartNumber: partNum, - LastModified: fileInfo.ModTime, - ETag: md5sum, - Size: fileInfo.Size, - }) - nextPartNumberMarker = partNum - } - result.Bucket = bucket - result.Object = object - result.UploadID = uploadID - result.PartNumberMarker = partNumberMarker - result.NextPartNumberMarker = nextPartNumberMarker - result.MaxParts = maxParts - result.IsTruncated = !eof - return result, nil + return listObjectPartsCommon(xl.storage, bucket, object, uploadID, partNumberMarker, maxParts) } func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return "", (BucketNameInvalid{Bucket: bucket}) + return "", BucketNameInvalid{Bucket: bucket} } if !IsValidObjectName(object) { - return "", (ObjectNameInvalid{ + return "", ObjectNameInvalid{ Bucket: bucket, Object: object, - }) + } } if status, err := isUploadIDExists(xl.storage, bucket, object, uploadID); err != nil { return "", err } else if !status { - return "", (InvalidUploadID{UploadID: uploadID}) + return "", InvalidUploadID{UploadID: uploadID} } - sort.Sort(completedParts(parts)) var metadata MultipartObjectInfo - for _, part := range parts { - partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag) - fi, err := xl.storage.StatFile(minioMetaBucket, path.Join(bucket, object, partSuffix)) - if err != nil { - return "", err - } - metadata = append(metadata, MultipartPartInfo{part.PartNumber, part.ETag, fi.Size}) - } var md5Sums []string for _, part := range parts { // Construct part suffix. partSuffix := fmt.Sprintf("%s.%.5d.%s", uploadID, part.PartNumber, part.ETag) - err := xl.storage.RenameFile(minioMetaBucket, path.Join(bucket, object, partSuffix), bucket, path.Join(object, partNumToPartFileName(part.PartNumber))) + multipartPartFile := path.Join(mpartMetaPrefix, bucket, object, partSuffix) + fi, err := xl.storage.StatFile(minioMetaBucket, multipartPartFile) + if err != nil { + if err == errFileNotFound { + return "", InvalidPart{} + } + return "", err + } + metadata = append(metadata, MultipartPartInfo{part.PartNumber, part.ETag, fi.Size}) + multipartObjSuffix := path.Join(object, partNumToPartFileName(part.PartNumber)) + err = xl.storage.RenameFile(minioMetaBucket, multipartPartFile, bucket, multipartObjSuffix) // We need a way to roll back if of the renames failed. if err != nil { return "", err @@ -409,31 +121,33 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload md5Sums = append(md5Sums, part.ETag) } - if w, err := xl.storage.CreateFile(bucket, pathJoin(object, multipartMetaFile)); err == nil { - var b []byte - b, err = json.Marshal(metadata) - if err != nil { - return "", err - } - _, err = w.Write(b) - if err != nil { - return "", err - } - // Close the writer. - if err = w.Close(); err != nil { - return "", err - } - } else { + tempMultipartMetaFile := path.Join(tmpMetaPrefix, bucket, object, multipartMetaFile) + w, err := xl.storage.CreateFile(minioMetaBucket, tempMultipartMetaFile) + if err != nil { return "", toObjectErr(err, bucket, object) } - if err := xl.storage.DeleteFile(minioMetaBucket, path.Join(bucket, object, uploadID)); err != nil { - return "", toObjectErr(err, bucket, object) + encoder := json.NewEncoder(w) + err = encoder.Encode(&metadata) + if err != nil { + if err = safeCloseAndRemove(w); err != nil { + return "", err + } + return "", err + } + // Close the writer. + if err = w.Close(); err != nil { + return "", err } // Save the s3 md5. s3MD5, err := makeS3MD5(md5Sums...) if err != nil { return "", err } + multipartObjFile := path.Join(object, multipartMetaFile) + err = xl.storage.RenameFile(minioMetaBucket, tempMultipartMetaFile, bucket, multipartObjFile) + if err != nil { + return "", toObjectErr(err, bucket, multipartObjFile) + } // Return md5sum. return s3MD5, nil