From afd59c45dcab6d2e78706e3fccde7897887eb5c7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 2 May 2016 16:57:31 -0700 Subject: [PATCH] xl/fs: Move few functions into common code. (#1453) - PutObject() - PutObjectPart() - NewMultipartUpload() - AbortMultipartUpload() Implementations across both FS and XL object layer share common implementation. --- fs-objects-multipart.go | 165 +++---------------------------- fs-objects.go | 70 +------------ object-common-multipart.go | 194 +++++++++++++++++++++++++++++++++++++ object-common.go | 73 ++++++++++++++ xl-objects-multipart.go | 152 ++--------------------------- xl-objects.go | 70 +------------ 6 files changed, 288 insertions(+), 436 deletions(-) create mode 100644 object-common-multipart.go diff --git a/fs-objects-multipart.go b/fs-objects-multipart.go index ec5c87782..9458928cb 100644 --- a/fs-objects-multipart.go +++ b/fs-objects-multipart.go @@ -17,11 +17,8 @@ package main import ( - "crypto/md5" - "encoding/hex" "fmt" "io" - "io/ioutil" "path" "strconv" "strings" @@ -250,124 +247,19 @@ func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMark return result, nil } +// NewMultipartUpload - initialize a new multipart upload, returns a unique id. func (fs fsObjects) NewMultipartUpload(bucket, object string) (string, error) { - // Verify if bucket name is valid. - if !IsValidBucketName(bucket) { - return "", (BucketNameInvalid{Bucket: bucket}) - } - // Verify if object name is valid. - if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} - } - // Verify whether the bucket exists. - if isExist, err := isBucketExist(fs.storage, bucket); err != nil { - return "", err - } else if !isExist { - return "", BucketNotFound{Bucket: bucket} - } - - if _, err := fs.storage.StatVol(minioMetaBucket); err != nil { - if err == errVolumeNotFound { - err = fs.storage.MakeVol(minioMetaBucket) - if err != nil { - return "", toObjectErr(err) - } - } - } - for { - uuid, err := uuid.New() - if err != nil { - return "", err - } - uploadID := uuid.String() - uploadIDPath := path.Join(bucket, object, uploadID) - if _, err = fs.storage.StatFile(minioMetaBucket, uploadIDPath); err != nil { - if err != errFileNotFound { - return "", (toObjectErr(err, minioMetaBucket, uploadIDPath)) - } - // uploadIDPath doesn't exist, so create empty file to reserve the name - var w io.WriteCloser - if w, err = fs.storage.CreateFile(minioMetaBucket, uploadIDPath); err == nil { - // Close the writer. - if err = w.Close(); err != nil { - return "", err - } - } else { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) - } - return uploadID, nil - } - // uploadIDPath already exists. - // loop again to try with different uuid generated. - } + return newMultipartUploadCommon(fs.storage, bucket, object) } // PutObjectPart - writes the multipart upload chunks. func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} - } - if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} - } - // Verify whether the bucket exists. - if isExist, err := isBucketExist(fs.storage, bucket); err != nil { + newMD5Hex, err := putObjectPartCommon(fs.storage, bucket, object, uploadID, partID, size, data, md5Hex) + if err != nil { return "", err - } else if !isExist { - return "", BucketNotFound{Bucket: bucket} } - - if status, err := isUploadIDExists(fs.storage, bucket, object, uploadID); err != nil { - return "", err - } else if !status { - return "", InvalidUploadID{UploadID: uploadID} - } - partSuffix := fmt.Sprintf("%s.%d", uploadID, partID) partSuffixPath := path.Join(bucket, object, partSuffix) - fileWriter, err := fs.storage.CreateFile(minioMetaBucket, partSuffixPath) - if err != nil { - return "", toObjectErr(err, bucket, object) - } - - // Initialize md5 writer. - md5Writer := md5.New() - - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - safeCloseAndRemove(fileWriter) - return "", (toObjectErr(err)) - } - // Reader shouldn't have more data what mentioned in size argument. - // reading one more byte from the reader to validate it. - // expected to fail, success validates existence of more data in the reader. - if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil { - safeCloseAndRemove(fileWriter) - return "", (UnExpectedDataSize{Size: int(size)}) - } - } else { - if _, err = io.Copy(multiWriter, data); err != nil { - safeCloseAndRemove(fileWriter) - return "", (toObjectErr(err)) - } - } - - newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) - if md5Hex != "" { - if newMD5Hex != md5Hex { - safeCloseAndRemove(fileWriter) - return "", (BadDigest{md5Hex, newMD5Hex}) - } - } - err = fileWriter.Close() - if err != nil { - return "", err - } partSuffixMD5 := fmt.Sprintf("%s.%d.%s", uploadID, partID, newMD5Hex) partSuffixMD5Path := path.Join(bucket, object, partSuffixMD5) err = fs.storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path) @@ -495,54 +387,19 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload } // Cleanup all the parts. - fs.removeMultipartUpload(bucket, object, uploadID) + fs.cleanupUploadedParts(bucket, object, uploadID) // Return md5sum. return s3MD5, nil } -func (fs fsObjects) removeMultipartUpload(bucket, object, uploadID string) error { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectName(object) { - return (ObjectNameInvalid{Bucket: bucket, Object: object}) - } - - marker := "" - for { - uploadIDPath := path.Join(bucket, object, uploadID) - fileInfos, eof, err := fs.storage.ListFiles(minioMetaBucket, uploadIDPath, marker, false, 1000) - if err != nil { - if err == errFileNotFound { - return (InvalidUploadID{UploadID: uploadID}) - } - return toObjectErr(err) - } - for _, fileInfo := range fileInfos { - fs.storage.DeleteFile(minioMetaBucket, fileInfo.Name) - marker = fileInfo.Name - } - if eof { - break - } - } - return nil +// Wrapper to which removes all the uploaded parts after a successful +// complete multipart upload. +func (fs fsObjects) cleanupUploadedParts(bucket, object, uploadID string) error { + return abortMultipartUploadCommon(fs.storage, bucket, object, uploadID) } +// AbortMultipartUpload - aborts a multipart upload. func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectName(object) { - return (ObjectNameInvalid{Bucket: bucket, Object: object}) - } - if status, err := isUploadIDExists(fs.storage, bucket, object, uploadID); err != nil { - return err - } else if !status { - return (InvalidUploadID{UploadID: uploadID}) - } - return fs.removeMultipartUpload(bucket, object, uploadID) + return abortMultipartUploadCommon(fs.storage, bucket, object, uploadID) } diff --git a/fs-objects.go b/fs-objects.go index 99dd463ed..5a7d99b87 100644 --- a/fs-objects.go +++ b/fs-objects.go @@ -17,8 +17,6 @@ package main import ( - "crypto/md5" - "encoding/hex" "io" "path/filepath" "strings" @@ -124,73 +122,9 @@ func (fs fsObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { }, nil } +// PutObject - create an object. func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return "", (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectName(object) { - return "", (ObjectNameInvalid{ - Bucket: bucket, - Object: object, - }) - } - // Check whether the bucket exists. - if isExist, err := isBucketExist(fs.storage, bucket); err != nil { - return "", err - } else if !isExist { - return "", BucketNotFound{Bucket: bucket} - } - - fileWriter, err := fs.storage.CreateFile(bucket, object) - if err != nil { - return "", toObjectErr(err, bucket, object) - } - - // Initialize md5 writer. - md5Writer := md5.New() - - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", toObjectErr(err) - } - } else { - if _, err = io.Copy(multiWriter, data); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", err - } - } - - newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) - // md5Hex representation. - var md5Hex string - if len(metadata) != 0 { - md5Hex = metadata["md5Sum"] - } - if md5Hex != "" { - if newMD5Hex != md5Hex { - if err = safeCloseAndRemove(fileWriter); err != nil { - return "", err - } - return "", BadDigest{md5Hex, newMD5Hex} - } - } - err = fileWriter.Close() - if err != nil { - return "", err - } - - // Return md5sum, successfully wrote object. - return newMD5Hex, nil + return putObjectCommon(fs.storage, bucket, object, size, data, metadata) } func (fs fsObjects) DeleteObject(bucket, object string) error { diff --git a/object-common-multipart.go b/object-common-multipart.go new file mode 100644 index 000000000..48a5f7cf4 --- /dev/null +++ b/object-common-multipart.go @@ -0,0 +1,194 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "path" + + "github.com/skyrings/skyring-common/tools/uuid" +) + +/// Common multipart object layer functions. + +// newMultipartUploadCommon - initialize a new multipart, is a common +// function for both object layers. +func newMultipartUploadCommon(storage StorageAPI, bucket string, object string) (uploadID string, err error) { + // Verify if bucket name is valid. + if !IsValidBucketName(bucket) { + return "", (BucketNameInvalid{Bucket: bucket}) + } + // Verify if object name is valid. + if !IsValidObjectName(object) { + return "", ObjectNameInvalid{Bucket: bucket, Object: object} + } + + // Verify whether the bucket exists. + if isExist, err := isBucketExist(storage, bucket); err != nil { + return "", err + } else if !isExist { + return "", BucketNotFound{Bucket: bucket} + } + + if _, err := storage.StatVol(minioMetaBucket); err != nil { + if err == errVolumeNotFound { + err = storage.MakeVol(minioMetaBucket) + if err != nil { + return "", toObjectErr(err) + } + } + } + + // Loops through until successfully generates a new unique upload id. + for { + uuid, err := uuid.New() + if err != nil { + return "", err + } + uploadID := uuid.String() + uploadIDPath := path.Join(bucket, object, uploadID) + if _, err = storage.StatFile(minioMetaBucket, uploadIDPath); err != nil { + if err != errFileNotFound { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + } + // uploadIDPath doesn't exist, so create empty file to reserve the name + var w io.WriteCloser + if w, err = storage.CreateFile(minioMetaBucket, uploadIDPath); err == nil { + // Close the writer. + if err = w.Close(); err != nil { + return "", err + } + } else { + return "", toObjectErr(err, minioMetaBucket, uploadIDPath) + } + return uploadID, nil + } + // uploadIDPath already exists. + // loop again to try with different uuid generated. + } +} + +// putObjectPartCommon - put object part. +func putObjectPartCommon(storage StorageAPI, bucket string, object string, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return "", BucketNameInvalid{Bucket: bucket} + } + if !IsValidObjectName(object) { + return "", ObjectNameInvalid{Bucket: bucket, Object: object} + } + + // Verify whether the bucket exists. + if isExist, err := isBucketExist(storage, bucket); err != nil { + return "", err + } else if !isExist { + return "", BucketNotFound{Bucket: bucket} + } + + if status, err := isUploadIDExists(storage, bucket, object, uploadID); err != nil { + return "", err + } else if !status { + return "", InvalidUploadID{UploadID: uploadID} + } + + partSuffix := fmt.Sprintf("%s.%d", uploadID, partID) + partSuffixPath := path.Join(bucket, object, partSuffix) + fileWriter, err := storage.CreateFile(minioMetaBucket, partSuffixPath) + if err != nil { + return "", toObjectErr(err, bucket, object) + } + + // Initialize md5 writer. + md5Writer := md5.New() + + // Instantiate a new multi writer. + multiWriter := io.MultiWriter(md5Writer, fileWriter) + + // Instantiate checksum hashers and create a multiwriter. + if size > 0 { + if _, err = io.CopyN(multiWriter, data, size); err != nil { + safeCloseAndRemove(fileWriter) + return "", toObjectErr(err, bucket, object) + } + // Reader shouldn't have more data what mentioned in size argument. + // reading one more byte from the reader to validate it. + // expected to fail, success validates existence of more data in the reader. + if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil { + safeCloseAndRemove(fileWriter) + return "", UnExpectedDataSize{Size: int(size)} + } + } else { + if _, err = io.Copy(multiWriter, data); err != nil { + safeCloseAndRemove(fileWriter) + return "", toObjectErr(err, bucket, object) + } + } + + newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) + if md5Hex != "" { + if newMD5Hex != md5Hex { + safeCloseAndRemove(fileWriter) + return "", BadDigest{md5Hex, newMD5Hex} + } + } + err = fileWriter.Close() + if err != nil { + return "", err + } + return newMD5Hex, nil +} + +// abortMultipartUploadCommon - aborts a multipart upload, common +// function used by both object layers. +func abortMultipartUploadCommon(storage StorageAPI, bucket, object, uploadID string) error { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return BucketNameInvalid{Bucket: bucket} + } + if !IsValidObjectName(object) { + return ObjectNameInvalid{Bucket: bucket, Object: object} + } + if status, err := isUploadIDExists(storage, bucket, object, uploadID); err != nil { + return err + } else if !status { + return InvalidUploadID{UploadID: uploadID} + } + + markerPath := "" + for { + uploadIDPath := path.Join(bucket, object, uploadID) + fileInfos, eof, err := storage.ListFiles(minioMetaBucket, uploadIDPath, markerPath, false, 1000) + if err != nil { + if err == errFileNotFound { + return InvalidUploadID{UploadID: uploadID} + } + return toObjectErr(err) + } + for _, fileInfo := range fileInfos { + storage.DeleteFile(minioMetaBucket, fileInfo.Name) + markerPath = fileInfo.Name + } + if eof { + break + } + } + return nil +} diff --git a/object-common.go b/object-common.go index 357ab5ac4..35d19eac9 100644 --- a/object-common.go +++ b/object-common.go @@ -17,6 +17,9 @@ package main import ( + "crypto/md5" + "encoding/hex" + "io" "path" "sort" ) @@ -98,6 +101,76 @@ func deleteBucket(storage StorageAPI, bucket string) error { return nil } +// putObjectCommon - create an object, is a common function for both object layers. +func putObjectCommon(storage StorageAPI, bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { + // Verify if bucket is valid. + if !IsValidBucketName(bucket) { + return "", (BucketNameInvalid{Bucket: bucket}) + } + if !IsValidObjectName(object) { + return "", (ObjectNameInvalid{ + Bucket: bucket, + Object: object, + }) + } + // Check whether the bucket exists. + if isExist, err := isBucketExist(storage, bucket); err != nil { + return "", err + } else if !isExist { + return "", BucketNotFound{Bucket: bucket} + } + + fileWriter, err := storage.CreateFile(bucket, object) + if err != nil { + return "", toObjectErr(err, bucket, object) + } + + // Initialize md5 writer. + md5Writer := md5.New() + + // Instantiate a new multi writer. + multiWriter := io.MultiWriter(md5Writer, fileWriter) + + // Instantiate checksum hashers and create a multiwriter. + if size > 0 { + if _, err = io.CopyN(multiWriter, data, size); err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } + return "", toObjectErr(err) + } + } else { + if _, err = io.Copy(multiWriter, data); err != nil { + if clErr := safeCloseAndRemove(fileWriter); clErr != nil { + return "", clErr + } + return "", err + } + } + + newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) + // md5Hex representation. + var md5Hex string + if len(metadata) != 0 { + md5Hex = metadata["md5Sum"] + } + if md5Hex != "" { + if newMD5Hex != md5Hex { + if err = safeCloseAndRemove(fileWriter); err != nil { + return "", err + } + return "", BadDigest{md5Hex, newMD5Hex} + } + } + err = fileWriter.Close() + if err != nil { + return "", err + } + + // Return md5sum, successfully wrote object. + return newMD5Hex, nil +} + // isUploadIDExists - verify if a given uploadID exists and is valid. func isUploadIDExists(storage StorageAPI, bucket, object, uploadID string) (bool, error) { uploadIDPath := path.Join(bucket, object, uploadID) diff --git a/xl-objects-multipart.go b/xl-objects-multipart.go index ca707f588..94a90b32c 100644 --- a/xl-objects-multipart.go +++ b/xl-objects-multipart.go @@ -17,12 +17,9 @@ package main import ( - "crypto/md5" - "encoding/hex" "encoding/json" "fmt" "io" - "io/ioutil" "path" "sort" "strconv" @@ -291,125 +288,19 @@ func (xl xlObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMark return result, nil } +// NewMultipartUpload - initialize a new multipart upload, returns a unique id. func (xl xlObjects) NewMultipartUpload(bucket, object string) (string, error) { - // Verify if bucket name is valid. - if !IsValidBucketName(bucket) { - return "", (BucketNameInvalid{Bucket: bucket}) - } - // Verify if object name is valid. - if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} - } - // Verify whether the bucket exists. - if isExist, err := isBucketExist(xl.storage, bucket); err != nil { - return "", err - } else if !isExist { - return "", BucketNotFound{Bucket: bucket} - } - - if _, err := xl.storage.StatVol(minioMetaBucket); err != nil { - if err == errVolumeNotFound { - err = xl.storage.MakeVol(minioMetaBucket) - if err != nil { - return "", toObjectErr(err) - } - } - } - - for { - uuid, err := uuid.New() - if err != nil { - return "", err - } - uploadID := uuid.String() - uploadIDPath := path.Join(bucket, object, uploadID) - if _, err = xl.storage.StatFile(minioMetaBucket, uploadIDPath); err != nil { - if err != errFileNotFound { - return "", (toObjectErr(err, minioMetaBucket, uploadIDPath)) - } - // uploadIDPath doesn't exist, so create empty file to reserve the name - var w io.WriteCloser - if w, err = xl.storage.CreateFile(minioMetaBucket, uploadIDPath); err == nil { - // Close the writer. - if err = w.Close(); err != nil { - return "", err - } - } else { - return "", toObjectErr(err, minioMetaBucket, uploadIDPath) - } - return uploadID, nil - } - // uploadIDPath already exists. - // loop again to try with different uuid generated. - } + return newMultipartUploadCommon(xl.storage, bucket, object) } // PutObjectPart - writes the multipart upload chunks. func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return "", BucketNameInvalid{Bucket: bucket} - } - if !IsValidObjectName(object) { - return "", ObjectNameInvalid{Bucket: bucket, Object: object} - } - // Verify whether the bucket exists. - if isExist, err := isBucketExist(xl.storage, bucket); err != nil { + newMD5Hex, err := putObjectPartCommon(xl.storage, bucket, object, uploadID, partID, size, data, md5Hex) + if err != nil { return "", err - } else if !isExist { - return "", BucketNotFound{Bucket: bucket} } - - if status, err := isUploadIDExists(xl.storage, bucket, object, uploadID); err != nil { - return "", err - } else if !status { - return "", InvalidUploadID{UploadID: uploadID} - } - partSuffix := fmt.Sprintf("%s.%d", uploadID, partID) partSuffixPath := path.Join(bucket, object, partSuffix) - fileWriter, err := xl.storage.CreateFile(minioMetaBucket, partSuffixPath) - if err != nil { - return "", toObjectErr(err, bucket, object) - } - - // Initialize md5 writer. - md5Writer := md5.New() - - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - safeCloseAndRemove(fileWriter) - return "", (toObjectErr(err)) - } - // Reader shouldn't have more data what mentioned in size argument. - // reading one more byte from the reader to validate it. - // expected to fail, success validates existence of more data in the reader. - if _, err = io.CopyN(ioutil.Discard, data, 1); err == nil { - safeCloseAndRemove(fileWriter) - return "", (UnExpectedDataSize{Size: int(size)}) - } - } else { - if _, err = io.Copy(multiWriter, data); err != nil { - safeCloseAndRemove(fileWriter) - return "", (toObjectErr(err)) - } - } - - newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) - if md5Hex != "" { - if newMD5Hex != md5Hex { - safeCloseAndRemove(fileWriter) - return "", (BadDigest{md5Hex, newMD5Hex}) - } - } - err = fileWriter.Close() - if err != nil { - return "", err - } partSuffixMD5 := fmt.Sprintf("%s.%.5d.%s", uploadID, partID, newMD5Hex) partSuffixMD5Path := path.Join(bucket, object, partSuffixMD5) err = xl.storage.RenameFile(minioMetaBucket, partSuffixPath, minioMetaBucket, partSuffixMD5Path) @@ -548,38 +439,7 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload return s3MD5, nil } -// AbortMultipartUpload - abort multipart upload. +// AbortMultipartUpload - aborts a multipart upload. func (xl xlObjects) AbortMultipartUpload(bucket, object, uploadID string) error { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectName(object) { - return (ObjectNameInvalid{Bucket: bucket, Object: object}) - } - if status, err := isUploadIDExists(xl.storage, bucket, object, uploadID); err != nil { - return err - } else if !status { - return (InvalidUploadID{UploadID: uploadID}) - } - - markerPath := "" - for { - uploadIDPath := path.Join(bucket, object, uploadID) - fileInfos, eof, err := xl.storage.ListFiles(minioMetaBucket, uploadIDPath, markerPath, false, 1000) - if err != nil { - if err == errFileNotFound { - return (InvalidUploadID{UploadID: uploadID}) - } - return toObjectErr(err) - } - for _, fileInfo := range fileInfos { - xl.storage.DeleteFile(minioMetaBucket, fileInfo.Name) - markerPath = fileInfo.Name - } - if eof { - break - } - } - return nil + return abortMultipartUploadCommon(xl.storage, bucket, object, uploadID) } diff --git a/xl-objects.go b/xl-objects.go index b830bda15..3ba3e4737 100644 --- a/xl-objects.go +++ b/xl-objects.go @@ -17,8 +17,6 @@ package main import ( - "crypto/md5" - "encoding/hex" "encoding/json" "io" "path" @@ -167,73 +165,9 @@ func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) { }, nil } +// PutObject - create an object. func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return "", (BucketNameInvalid{Bucket: bucket}) - } - if !IsValidObjectName(object) { - return "", (ObjectNameInvalid{ - Bucket: bucket, - Object: object, - }) - } - // Check whether the bucket exists. - if isExist, err := isBucketExist(xl.storage, bucket); err != nil { - return "", err - } else if !isExist { - return "", BucketNotFound{Bucket: bucket} - } - - fileWriter, err := xl.storage.CreateFile(bucket, object) - if err != nil { - return "", toObjectErr(err, bucket, object) - } - - // Initialize md5 writer. - md5Writer := md5.New() - - // Instantiate a new multi writer. - multiWriter := io.MultiWriter(md5Writer, fileWriter) - - // Instantiate checksum hashers and create a multiwriter. - if size > 0 { - if _, err = io.CopyN(multiWriter, data, size); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", toObjectErr(err) - } - } else { - if _, err = io.Copy(multiWriter, data); err != nil { - if clErr := safeCloseAndRemove(fileWriter); clErr != nil { - return "", clErr - } - return "", err - } - } - - newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) - // md5Hex representation. - var md5Hex string - if len(metadata) != 0 { - md5Hex = metadata["md5Sum"] - } - if md5Hex != "" { - if newMD5Hex != md5Hex { - if err = safeCloseAndRemove(fileWriter); err != nil { - return "", err - } - return "", BadDigest{md5Hex, newMD5Hex} - } - } - err = fileWriter.Close() - if err != nil { - return "", err - } - - // Return md5sum, successfully wrote object. - return newMD5Hex, nil + return putObjectCommon(xl.storage, bucket, object, size, data, metadata) } func (xl xlObjects) DeleteObject(bucket, object string) error {