diff --git a/api-headers.go b/api-headers.go index ba9ecaecd..8099d32de 100644 --- a/api-headers.go +++ b/api-headers.go @@ -79,8 +79,8 @@ func setObjectHeaders(w http.ResponseWriter, metadata fs.ObjectMetadata, content lastModified := metadata.Created.Format(http.TimeFormat) // object related headers w.Header().Set("Content-Type", metadata.ContentType) - if metadata.Md5 != "" { - w.Header().Set("ETag", "\""+metadata.Md5+"\"") + if metadata.MD5 != "" { + w.Header().Set("ETag", "\""+metadata.MD5+"\"") } w.Header().Set("Last-Modified", lastModified) diff --git a/api-response.go b/api-response.go index e294e7ffe..e033668da 100644 --- a/api-response.go +++ b/api-response.go @@ -108,8 +108,8 @@ func generateListObjectsResponse(bucket, prefix, marker, delimiter string, maxKe } content.Key = object.Object content.LastModified = object.Created.Format(rfcFormat) - if object.Md5 != "" { - content.ETag = "\"" + object.Md5 + "\"" + if object.MD5 != "" { + content.ETag = "\"" + object.MD5 + "\"" } content.Size = object.Size content.StorageClass = "STANDARD" diff --git a/bucket-handlers.go b/bucket-handlers.go index c9de6505b..546cfefc6 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -353,8 +353,8 @@ func (api CloudStorageAPI) PostPolicyBucketHandler(w http.ResponseWriter, req *h } return } - if metadata.Md5 != "" { - w.Header().Set("ETag", "\""+metadata.Md5+"\"") + if metadata.MD5 != "" { + w.Header().Set("ETag", "\""+metadata.MD5+"\"") } writeSuccessResponse(w, nil) } diff --git a/object-handlers.go b/object-handlers.go index 68217810f..2195ca76d 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -198,8 +198,8 @@ func (api CloudStorageAPI) PutObjectHandler(w http.ResponseWriter, req *http.Req } return } - if metadata.Md5 != "" { - w.Header().Set("ETag", "\""+metadata.Md5+"\"") + if metadata.MD5 != "" { + w.Header().Set("ETag", "\""+metadata.MD5+"\"") } writeSuccessResponse(w, nil) } @@ -501,7 +501,7 @@ func (api CloudStorageAPI) CompleteMultipartUploadHandler(w http.ResponseWriter, } return } - response := generateCompleteMultpartUploadResponse(bucket, object, req.URL.String(), metadata.Md5) + response := generateCompleteMultpartUploadResponse(bucket, object, req.URL.String(), metadata.MD5) encodedSuccessResponse := encodeSuccessResponse(response) // write headers setCommonHeaders(w) diff --git a/pkg/fs/api_suite_nix_test.go b/pkg/fs/api_suite_nix_test.go index 61e1ef83d..40f60f7d5 100644 --- a/pkg/fs/api_suite_nix_test.go +++ b/pkg/fs/api_suite_nix_test.go @@ -92,7 +92,7 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) { c.Assert(e, check.IsNil) objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil) c.Assert(err, check.IsNil) - c.Assert(objectMetadata.Md5, check.Equals, finalExpectedmd5SumHex) + c.Assert(objectMetadata.MD5, check.Equals, finalExpectedmd5SumHex) } func testMultipartObjectAbort(c *check.C, create func() Filesystem) { @@ -147,7 +147,7 @@ func testMultipleObjectCreation(c *check.C, create func() Filesystem) { objects[key] = []byte(randomString) objectMetadata, err := fs.CreateObject("bucket", key, expectedmd5Sum, int64(len(randomString)), bytes.NewBufferString(randomString), nil) c.Assert(err, check.IsNil) - c.Assert(objectMetadata.Md5, check.Equals, expectedmd5Sumhex) + c.Assert(objectMetadata.MD5, check.Equals, expectedmd5Sumhex) } for key, value := range objects { @@ -276,7 +276,7 @@ func testObjectOverwriteWorks(c *check.C, create func() Filesystem) { md5Sum1hex := hex.EncodeToString(hasher1.Sum(nil)) objectMetadata, err := fs.CreateObject("bucket", "object", md5Sum1, int64(len("one")), bytes.NewBufferString("one"), nil) c.Assert(err, check.IsNil) - c.Assert(md5Sum1hex, check.Equals, objectMetadata.Md5) + c.Assert(md5Sum1hex, check.Equals, objectMetadata.MD5) hasher2 := md5.New() hasher2.Write([]byte("three")) @@ -326,7 +326,7 @@ func testPutObjectInSubdir(c *check.C, create func() Filesystem) { md5Sum1hex := hex.EncodeToString(hasher.Sum(nil)) objectMetadata, err := fs.CreateObject("bucket", "dir1/dir2/object", md5Sum1, int64(len("hello world")), bytes.NewBufferString("hello world"), nil) c.Assert(err, check.IsNil) - c.Assert(objectMetadata.Md5, check.Equals, md5Sum1hex) + c.Assert(objectMetadata.MD5, check.Equals, md5Sum1hex) var bytesBuffer bytes.Buffer length, err := fs.GetObject(&bytesBuffer, "bucket", "dir1/dir2/object", 0, 0) @@ -458,7 +458,7 @@ func testDefaultContentType(c *check.C, create func() Filesystem) { c.Assert(metadata.ContentType, check.Equals, "application/octet-stream") } -func testContentMd5Set(c *check.C, create func() Filesystem) { +func testContentMD5Set(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) diff --git a/pkg/fs/api_suite_windows_test.go b/pkg/fs/api_suite_windows_test.go index 45ececdef..574ce69dc 100644 --- a/pkg/fs/api_suite_windows_test.go +++ b/pkg/fs/api_suite_windows_test.go @@ -91,7 +91,7 @@ func testMultipartObjectCreation(c *check.C, create func() Filesystem) { c.Assert(e, check.IsNil) objectMetadata, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, bytes.NewReader(completedPartsBytes), nil) c.Assert(err, check.IsNil) - c.Assert(objectMetadata.Md5, check.Equals, finalExpectedmd5SumHex) + c.Assert(objectMetadata.MD5, check.Equals, finalExpectedmd5SumHex) } func testMultipartObjectAbort(c *check.C, create func() Filesystem) { @@ -146,7 +146,7 @@ func testMultipleObjectCreation(c *check.C, create func() Filesystem) { objects[key] = []byte(randomString) objectMetadata, err := fs.CreateObject("bucket", key, expectedmd5Sum, int64(len(randomString)), bytes.NewBufferString(randomString), nil) c.Assert(err, check.IsNil) - c.Assert(objectMetadata.Md5, check.Equals, expectedmd5Sumhex) + c.Assert(objectMetadata.MD5, check.Equals, expectedmd5Sumhex) } for key, value := range objects { @@ -273,7 +273,7 @@ func testObjectOverwriteWorks(c *check.C, create func() Filesystem) { md5Sum1hex := hex.EncodeToString(hasher1.Sum(nil)) objectMetadata, err := fs.CreateObject("bucket", "object", md5Sum1, int64(len("one")), bytes.NewBufferString("one"), nil) c.Assert(err, check.IsNil) - c.Assert(md5Sum1hex, check.Equals, objectMetadata.Md5) + c.Assert(md5Sum1hex, check.Equals, objectMetadata.MD5) hasher2 := md5.New() hasher2.Write([]byte("three")) @@ -323,7 +323,7 @@ func testPutObjectInSubdir(c *check.C, create func() Filesystem) { md5Sum1hex := hex.EncodeToString(hasher.Sum(nil)) objectMetadata, err := fs.CreateObject("bucket", "dir1/dir2/object", md5Sum1, int64(len("hello world")), bytes.NewBufferString("hello world"), nil) c.Assert(err, check.IsNil) - c.Assert(objectMetadata.Md5, check.Equals, md5Sum1hex) + c.Assert(objectMetadata.MD5, check.Equals, md5Sum1hex) var bytesBuffer bytes.Buffer length, err := fs.GetObject(&bytesBuffer, "bucket", "dir1/dir2/object", 0, 0) @@ -459,7 +459,7 @@ func testDefaultContentType(c *check.C, create func() Filesystem) { c.Assert(metadata.ContentType, check.Equals, "application/octet-stream") } -func testContentMd5Set(c *check.C, create func() Filesystem) { +func testContentMD5Set(c *check.C, create func() Filesystem) { fs := create() err := fs.MakeBucket("bucket", "") c.Assert(err, check.IsNil) diff --git a/pkg/fs/config.go b/pkg/fs/config.go index 339cfe156..ea8aa3baf 100644 --- a/pkg/fs/config.go +++ b/pkg/fs/config.go @@ -17,77 +17,42 @@ package fs import ( - "path/filepath" - "github.com/minio/minio-xl/pkg/probe" "github.com/minio/minio-xl/pkg/quick" - "github.com/minio/minio/pkg/user" ) -func getFSBucketsConfigPath() (string, *probe.Error) { - if customBucketsConfigPath != "" { - return customBucketsConfigPath, nil - } - homeDir, e := user.HomeDir() - if e != nil { - return "", probe.NewError(e) - } - fsBucketsConfigPath := filepath.Join(homeDir, ".minio", "$buckets.json") - return fsBucketsConfigPath, nil +var multipartsMetadataPath, bucketsMetadataPath string + +// setFSBucketsMetadataPath - set fs buckets metadata path. +func setFSBucketsMetadataPath(metadataPath string) { + bucketsMetadataPath = metadataPath } -func getFSMultipartsSessionConfigPath() (string, *probe.Error) { - if customMultipartsConfigPath != "" { - return customMultipartsConfigPath, nil - } - homeDir, e := user.HomeDir() - if e != nil { - return "", probe.NewError(e) - } - fsMultipartsConfigPath := filepath.Join(homeDir, ".minio", "$multiparts-session.json") - return fsMultipartsConfigPath, nil -} - -// internal variable only accessed via get/set methods -var customMultipartsConfigPath, customBucketsConfigPath string - -// setFSBucketsConfigPath - set custom fs buckets config path -func setFSBucketsConfigPath(configPath string) { - customBucketsConfigPath = configPath -} - -// SetFSMultipartsConfigPath - set custom multiparts session config path -func setFSMultipartsConfigPath(configPath string) { - customMultipartsConfigPath = configPath +// SetFSMultipartsMetadataPath - set custom multiparts session +// metadata path. +func setFSMultipartsMetadataPath(metadataPath string) { + multipartsMetadataPath = metadataPath } // saveMultipartsSession - save multiparts -func saveMultipartsSession(multiparts *Multiparts) *probe.Error { - fsMultipartsConfigPath, err := getFSMultipartsSessionConfigPath() - if err != nil { - return err.Trace() - } +func saveMultipartsSession(multiparts Multiparts) *probe.Error { qc, err := quick.New(multiparts) if err != nil { return err.Trace() } - if err := qc.Save(fsMultipartsConfigPath); err != nil { + if err := qc.Save(multipartsMetadataPath); err != nil { return err.Trace() } return nil } // saveBucketsMetadata - save metadata of all buckets -func saveBucketsMetadata(buckets *Buckets) *probe.Error { - fsBucketsConfigPath, err := getFSBucketsConfigPath() - if err != nil { - return err.Trace() - } +func saveBucketsMetadata(buckets Buckets) *probe.Error { qc, err := quick.New(buckets) if err != nil { return err.Trace() } - if err := qc.Save(fsBucketsConfigPath); err != nil { + if err := qc.Save(bucketsMetadataPath); err != nil { return err.Trace() } return nil @@ -95,10 +60,6 @@ func saveBucketsMetadata(buckets *Buckets) *probe.Error { // loadMultipartsSession load multipart session file func loadMultipartsSession() (*Multiparts, *probe.Error) { - fsMultipartsConfigPath, err := getFSMultipartsSessionConfigPath() - if err != nil { - return nil, err.Trace() - } multiparts := &Multiparts{} multiparts.Version = "1" multiparts.ActiveSession = make(map[string]*MultipartSession) @@ -106,7 +67,7 @@ func loadMultipartsSession() (*Multiparts, *probe.Error) { if err != nil { return nil, err.Trace() } - if err := qc.Load(fsMultipartsConfigPath); err != nil { + if err := qc.Load(multipartsMetadataPath); err != nil { return nil, err.Trace() } return qc.Data().(*Multiparts), nil @@ -114,10 +75,6 @@ func loadMultipartsSession() (*Multiparts, *probe.Error) { // loadBucketsMetadata load buckets metadata file func loadBucketsMetadata() (*Buckets, *probe.Error) { - fsBucketsConfigPath, err := getFSBucketsConfigPath() - if err != nil { - return nil, err.Trace() - } buckets := &Buckets{} buckets.Version = "1" buckets.Metadata = make(map[string]*BucketMetadata) @@ -125,7 +82,7 @@ func loadBucketsMetadata() (*Buckets, *probe.Error) { if err != nil { return nil, err.Trace() } - if err := qc.Load(fsBucketsConfigPath); err != nil { + if err := qc.Load(bucketsMetadataPath); err != nil { return nil, err.Trace() } return qc.Data().(*Buckets), nil diff --git a/pkg/fs/definitions.go b/pkg/fs/definitions.go index 96c31a4ef..a6d8d036b 100644 --- a/pkg/fs/definitions.go +++ b/pkg/fs/definitions.go @@ -68,7 +68,7 @@ type ObjectMetadata struct { ContentType string Created time.Time Mode os.FileMode - Md5 string + MD5 string Size int64 } diff --git a/pkg/fs/errors.go b/pkg/fs/errors.go index e6f27ad88..df81424d1 100644 --- a/pkg/fs/errors.go +++ b/pkg/fs/errors.go @@ -157,7 +157,7 @@ func (e InvalidDisksArgument) Error() string { // BadDigest bad md5sum type BadDigest struct { - Md5 string + MD5 string Bucket string Object string } @@ -222,11 +222,11 @@ type ImplementationError struct { Err error } -// DigestError - Generic Md5 error +// DigestError - Generic MD5 error type DigestError struct { Bucket string Key string - Md5 string + MD5 string } /// ACL related errors @@ -322,7 +322,7 @@ func (e BackendCorrupted) Error() string { // Return string an error formatted as the given text func (e InvalidDigest) Error() string { - return "Md5 provided " + e.Md5 + " is invalid" + return "MD5 provided " + e.MD5 + " is invalid" } // OperationNotPermitted - operation not permitted diff --git a/pkg/fs/fs-bucket-listobjects.go b/pkg/fs/fs-bucket-listobjects.go index 8269d0fb8..cc9121f57 100644 --- a/pkg/fs/fs-bucket-listobjects.go +++ b/pkg/fs/fs-bucket-listobjects.go @@ -78,7 +78,13 @@ func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKe walkPath = prefixPath } } - ioutils.FTW(walkPath, func(path string, info os.FileInfo, err error) error { + ioutils.FTW(walkPath, func(path string, info os.FileInfo, e error) error { + if e != nil { + return e + } + if strings.HasSuffix(path, "$multiparts") { + return nil + } // We don't need to list the walk path. if path == walkPath { return nil @@ -271,9 +277,6 @@ func (fs *Filesystem) listObjectsService() *probe.Error { // ListObjects - lists all objects for a given prefix, returns upto // maxKeys number of objects per call. func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { - fs.rwLock.RLock() - defer fs.rwLock.RUnlock() - // Input validation. if !IsValidBucketName(bucket) { return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) diff --git a/pkg/fs/fs-bucket.go b/pkg/fs/fs-bucket.go index 1352026d3..a39aac11a 100644 --- a/pkg/fs/fs-bucket.go +++ b/pkg/fs/fs-bucket.go @@ -32,8 +32,6 @@ import ( // DeleteBucket - delete bucket func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() // verify bucket path legal if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -59,8 +57,10 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { } return probe.NewError(e) } + fs.rwLock.Lock() delete(fs.buckets.Metadata, bucket) - if err := saveBucketsMetadata(fs.buckets); err != nil { + fs.rwLock.Unlock() + if err := saveBucketsMetadata(*fs.buckets); err != nil { return err.Trace(bucket) } return nil @@ -68,9 +68,6 @@ func (fs Filesystem) DeleteBucket(bucket string) *probe.Error { // ListBuckets - Get service. func (fs Filesystem) ListBuckets() ([]BucketMetadata, *probe.Error) { - fs.rwLock.RLock() - defer fs.rwLock.RUnlock() - files, err := ioutils.ReadDirN(fs.path, fs.maxBuckets) if err != nil && err != io.EOF { return []BucketMetadata{}, probe.NewError(err) @@ -118,9 +115,6 @@ func removeDuplicateBuckets(elements []BucketMetadata) (result []BucketMetadata) // MakeBucket - PUT Bucket. func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - di, err := disk.GetInfo(fs.path) if err != nil { return probe.NewError(err) @@ -171,8 +165,10 @@ func (fs Filesystem) MakeBucket(bucket, acl string) *probe.Error { bucketMetadata.Name = fi.Name() bucketMetadata.Created = fi.ModTime() bucketMetadata.ACL = BucketACL(acl) + fs.rwLock.Lock() fs.buckets.Metadata[bucket] = bucketMetadata - if err := saveBucketsMetadata(fs.buckets); err != nil { + fs.rwLock.Unlock() + if err := saveBucketsMetadata(*fs.buckets); err != nil { return err.Trace(bucket) } return nil @@ -198,8 +194,6 @@ func (fs Filesystem) denormalizeBucket(bucket string) string { // GetBucketMetadata - get bucket metadata. func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Error) { - fs.rwLock.RLock() - defer fs.rwLock.RUnlock() if !IsValidBucketName(bucket) { return BucketMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) } @@ -215,7 +209,9 @@ func (fs Filesystem) GetBucketMetadata(bucket string) (BucketMetadata, *probe.Er } return BucketMetadata{}, probe.NewError(e) } + fs.rwLock.RLock() bucketMetadata, ok := fs.buckets.Metadata[bucket] + fs.rwLock.RUnlock() if !ok { bucketMetadata = &BucketMetadata{} bucketMetadata.Name = fi.Name() @@ -258,8 +254,10 @@ func (fs Filesystem) SetBucketMetadata(bucket string, metadata map[string]string bucketMetadata.Created = fi.ModTime() } bucketMetadata.ACL = BucketACL(acl) + fs.rwLock.Lock() fs.buckets.Metadata[bucket] = bucketMetadata - if err := saveBucketsMetadata(fs.buckets); err != nil { + fs.rwLock.Unlock() + if err := saveBucketsMetadata(*fs.buckets); err != nil { return err.Trace(bucket) } return nil diff --git a/pkg/fs/fs-common.go b/pkg/fs/fs-common.go index 172cf23f0..71d2d53a1 100644 --- a/pkg/fs/fs-common.go +++ b/pkg/fs/fs-common.go @@ -27,7 +27,7 @@ import ( // Metadata - carries metadata about object type Metadata struct { - Md5sum []byte + MD5sum []byte ContentType string } diff --git a/pkg/fs/fs-multipart.go b/pkg/fs/fs-multipart.go index 997c7d01f..12446dfc2 100644 --- a/pkg/fs/fs-multipart.go +++ b/pkg/fs/fs-multipart.go @@ -44,6 +44,8 @@ import ( // isValidUploadID - is upload id. func (fs Filesystem) isValidUploadID(object, uploadID string) bool { + fs.rwLock.RLock() + defer fs.rwLock.RUnlock() s, ok := fs.multiparts.ActiveSession[object] if !ok { return false @@ -56,9 +58,6 @@ func (fs Filesystem) isValidUploadID(object, uploadID string) bool { // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, *probe.Error) { - fs.rwLock.RLock() - defer fs.rwLock.RUnlock() - // Input validation. if !IsValidBucketName(bucket) { return BucketMultipartResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -73,6 +72,8 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa return BucketMultipartResourcesMetadata{}, probe.NewError(e) } var uploads []*UploadMetadata + fs.rwLock.RLock() + defer fs.rwLock.RUnlock() for object, session := range fs.multiparts.ActiveSession { if strings.HasPrefix(object, resources.Prefix) { if len(uploads) > resources.MaxUploads { @@ -117,34 +118,81 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa return resources, nil } -// concatenate parts. -func (fs Filesystem) concatParts(parts *CompleteMultipartUpload, objectPath string, mw io.Writer) *probe.Error { - for _, part := range parts.Part { - partFile, e := os.OpenFile(objectPath+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) - defer partFile.Close() +// verify if parts sent over the network do really match with what we +// have for the session. +func doPartsMatch(parts []CompletePart, savedParts []*PartMetadata) bool { + if parts == nil || savedParts == nil { + return false + } + // Range of incoming parts and compare them with saved parts. + for i, part := range parts { + if strings.Trim(part.ETag, "\"") != savedParts[i].ETag { + return false + } + } + return true +} + +type multiCloser struct { + Closers []io.Closer +} + +func (m multiCloser) Close() error { + for _, c := range m.Closers { + if e := c.Close(); e != nil { + return e + } + } + return nil +} + +// MultiCloser - returns a Closer that's the logical +// concatenation of the provided input closers. They're closed +// sequentially. If any of the closers return a non-nil error, Close +// will return that error. +func MultiCloser(closers ...io.Closer) io.Closer { + return multiCloser{closers} +} + +// removeParts - remove all parts. +func removeParts(partPathPrefix string, parts []*PartMetadata) *probe.Error { + for _, part := range parts { + if e := os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber)); e != nil { + return probe.NewError(e) + } + } + if e := os.Remove(partPathPrefix + "$multiparts"); e != nil { + return probe.NewError(e) + } + return nil +} + +// saveParts - concantenate and save all parts. +func saveParts(partPathPrefix string, mw io.Writer, parts []CompletePart) *probe.Error { + var partReaders []io.Reader + var partClosers []io.Closer + for _, part := range parts { + partFile, e := os.OpenFile(partPathPrefix+fmt.Sprintf("$%d-$multiparts", part.PartNumber), os.O_RDONLY, 0600) if e != nil { return probe.NewError(e) } + partReaders = append(partReaders, partFile) + partClosers = append(partClosers, partFile) + } + // Concatenate a list of closers and close upon return. + closer := MultiCloser(partClosers...) + defer closer.Close() - recvMD5 := part.ETag - // Complete multipart request header md5sum per part is hex - // encoded trim it and decode if possible. - if _, e = hex.DecodeString(strings.Trim(recvMD5, "\"")); e != nil { - return probe.NewError(InvalidDigest{Md5: recvMD5}) - } - - if _, e = io.Copy(mw, partFile); e != nil { - return probe.NewError(e) - } + reader := io.MultiReader(partReaders...) + readBuffer := make([]byte, 4*1024*1024) + if _, e := io.CopyBuffer(mw, reader, readBuffer); e != nil { + return probe.NewError(e) } return nil } // NewMultipartUpload - initiate a new multipart session func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - di, e := disk.GetInfo(fs.path) if e != nil { return "", probe.NewError(e) @@ -192,12 +240,6 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E uploadIDSum := sha512.Sum512(id) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47] - multiPartfile, e := os.OpenFile(objectPath+"$multiparts", os.O_WRONLY|os.O_CREATE, 0600) - if e != nil { - return "", probe.NewError(e) - } - defer multiPartfile.Close() - // Initialize multipart session. mpartSession := &MultipartSession{} mpartSession.TotalParts = 0 @@ -205,14 +247,23 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E mpartSession.Initiated = time.Now().UTC() var parts []*PartMetadata mpartSession.Parts = parts - fs.multiparts.ActiveSession[object] = mpartSession - encoder := json.NewEncoder(multiPartfile) - if e = encoder.Encode(mpartSession); e != nil { + fs.rwLock.Lock() + fs.multiparts.ActiveSession[object] = mpartSession + fs.rwLock.Unlock() + + mpartSessionBytes, e := json.Marshal(mpartSession) + if e != nil { return "", probe.NewError(e) } - if err := saveMultipartsSession(fs.multiparts); err != nil { - return "", err.Trace() + + partPathPrefix := objectPath + uploadID + if e = ioutil.WriteFile(partPathPrefix+"$multiparts", mpartSessionBytes, 0600); e != nil { + return "", probe.NewError(e) + } + + if err := saveMultipartsSession(*fs.multiparts); err != nil { + return "", err.Trace(partPathPrefix) } return uploadID, nil } @@ -226,9 +277,6 @@ func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumb // CreateObjectPart - create a part in a multipart session func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum string, partID int, size int64, data io.Reader, signature *Signature) (string, *probe.Error) { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - di, err := disk.GetInfo(fs.path) if err != nil { return "", probe.NewError(err) @@ -266,7 +314,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s expectedMD5SumBytes, err = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) if err != nil { // Pro-actively close the connection - return "", probe.NewError(InvalidDigest{Md5: expectedMD5Sum}) + return "", probe.NewError(InvalidDigest{MD5: expectedMD5Sum}) } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } @@ -282,29 +330,32 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s } objectPath := filepath.Join(bucketPath, object) - partPath := objectPath + fmt.Sprintf("$%d-$multiparts", partID) + partPathPrefix := objectPath + uploadID + partPath := partPathPrefix + fmt.Sprintf("$%d-$multiparts", partID) partFile, e := atomic.FileCreateWithPrefix(partPath, "$multiparts") if e != nil { return "", probe.NewError(e) } - h := md5.New() - sh := sha256.New() - mw := io.MultiWriter(partFile, h, sh) - if _, e = io.CopyN(mw, data, size); e != nil { + + md5Hasher := md5.New() + sha256Hasher := sha256.New() + partWriter := io.MultiWriter(partFile, md5Hasher, sha256Hasher) + if _, e = io.CopyN(partWriter, data, size); e != nil { partFile.CloseAndPurge() return "", probe.NewError(e) } - md5sum := hex.EncodeToString(h.Sum(nil)) + + md5sum := hex.EncodeToString(md5Hasher.Sum(nil)) // Verify if the written object is equal to what is expected, only // if it is requested as such. if strings.TrimSpace(expectedMD5Sum) != "" { if !isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5sum) { partFile.CloseAndPurge() - return "", probe.NewError(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Object: object}) + return "", probe.NewError(BadDigest{MD5: expectedMD5Sum, Bucket: bucket, Object: object}) } } if signature != nil { - ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sh.Sum(nil))) + ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256Hasher.Sum(nil))) if err != nil { partFile.CloseAndPurge() return "", err.Trace() @@ -326,24 +377,30 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s partMetadata.Size = fi.Size() partMetadata.LastModified = fi.ModTime() - multiPartfile, e := os.OpenFile(objectPath+"$multiparts", os.O_RDWR|os.O_APPEND, 0600) + multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts") if e != nil { return "", probe.NewError(e) } - defer multiPartfile.Close() var deserializedMultipartSession MultipartSession - decoder := json.NewDecoder(multiPartfile) - if e = decoder.Decode(&deserializedMultipartSession); e != nil { + if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil { return "", probe.NewError(e) } deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata) deserializedMultipartSession.TotalParts++ - fs.multiparts.ActiveSession[object] = &deserializedMultipartSession + fs.rwLock.Lock() + fs.multiparts.ActiveSession[object] = &deserializedMultipartSession + fs.rwLock.Unlock() + + // Sort by part number before saving. sort.Sort(partNumber(deserializedMultipartSession.Parts)) - encoder := json.NewEncoder(multiPartfile) - if e = encoder.Encode(&deserializedMultipartSession); e != nil { + + multipartSessionBytes, e = json.Marshal(deserializedMultipartSession) + if e != nil { + return "", probe.NewError(e) + } + if e = ioutil.WriteFile(partPathPrefix+"$multiparts", multipartSessionBytes, 0600); e != nil { return "", probe.NewError(e) } return partMetadata.ETag, nil @@ -351,9 +408,6 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s // CompleteMultipartUpload - complete a multipart upload and persist the data func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - // Check bucket name is valid. if !IsValidBucketName(bucket) { return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -384,8 +438,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da if e != nil { return ObjectMetadata{}, probe.NewError(e) } - h := md5.New() - mw := io.MultiWriter(file, h) + md5Hasher := md5.New() + objectWriter := io.MultiWriter(file, md5Hasher) partBytes, e := ioutil.ReadAll(data) if e != nil { @@ -405,35 +459,45 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da return ObjectMetadata{}, probe.NewError(SignatureDoesNotMatch{}) } } - parts := &CompleteMultipartUpload{} - if e := xml.Unmarshal(partBytes, parts); e != nil { + completeMultipartUpload := &CompleteMultipartUpload{} + if e := xml.Unmarshal(partBytes, completeMultipartUpload); e != nil { file.CloseAndPurge() return ObjectMetadata{}, probe.NewError(MalformedXML{}) } - if !sort.IsSorted(completedParts(parts.Part)) { + if !sort.IsSorted(completedParts(completeMultipartUpload.Part)) { file.CloseAndPurge() return ObjectMetadata{}, probe.NewError(InvalidPartOrder{}) } - if err := fs.concatParts(parts, objectPath, mw); err != nil { + // Save parts for verification. + parts := completeMultipartUpload.Part + + fs.rwLock.RLock() + savedParts := fs.multiparts.ActiveSession[object].Parts + fs.rwLock.RUnlock() + + if !doPartsMatch(parts, savedParts) { file.CloseAndPurge() - return ObjectMetadata{}, err.Trace() + return ObjectMetadata{}, probe.NewError(InvalidPart{}) } + partPathPrefix := objectPath + uploadID + if err := saveParts(partPathPrefix, objectWriter, parts); err != nil { + file.CloseAndPurge() + return ObjectMetadata{}, err.Trace(partPathPrefix) + } + if err := removeParts(partPathPrefix, savedParts); err != nil { + file.CloseAndPurge() + return ObjectMetadata{}, err.Trace(partPathPrefix) + } + + fs.rwLock.Lock() delete(fs.multiparts.ActiveSession, object) - for _, part := range parts.Part { - if e = os.Remove(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber)); e != nil { - file.CloseAndPurge() - return ObjectMetadata{}, probe.NewError(e) - } - } - if e := os.Remove(objectPath + "$multiparts"); e != nil { + fs.rwLock.Unlock() + + if err := saveMultipartsSession(*fs.multiparts); err != nil { file.CloseAndPurge() - return ObjectMetadata{}, probe.NewError(e) - } - if e := saveMultipartsSession(fs.multiparts); e != nil { - file.CloseAndPurge() - return ObjectMetadata{}, e.Trace() + return ObjectMetadata{}, err.Trace(partPathPrefix) } file.Close() @@ -451,16 +515,13 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da Created: st.ModTime(), Size: st.Size(), ContentType: contentType, - Md5: hex.EncodeToString(h.Sum(nil)), + MD5: hex.EncodeToString(md5Hasher.Sum(nil)), } return newObject, nil } // ListObjectParts - list parts from incomplete multipart session for a given ObjectResourcesMetadata func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, *probe.Error) { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - // Check bucket name is valid. if !IsValidBucketName(bucket) { return ObjectResourcesMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -498,17 +559,16 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso } objectPath := filepath.Join(bucketPath, object) - multiPartfile, e := os.OpenFile(objectPath+"$multiparts", os.O_RDONLY, 0600) + partPathPrefix := objectPath + resources.UploadID + multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts") if e != nil { return ObjectResourcesMetadata{}, probe.NewError(e) } - defer multiPartfile.Close() - var deserializedMultipartSession MultipartSession - decoder := json.NewDecoder(multiPartfile) - if e = decoder.Decode(&deserializedMultipartSession); e != nil { + if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil { return ObjectResourcesMetadata{}, probe.NewError(e) } + var parts []*PartMetadata for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ { if len(parts) > objectResourcesMetadata.MaxParts { @@ -527,9 +587,6 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso // AbortMultipartUpload - abort an incomplete multipart session func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - // Check bucket name valid. if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -555,15 +612,20 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob } objectPath := filepath.Join(bucketPath, object) - for _, part := range fs.multiparts.ActiveSession[object].Parts { - e := os.RemoveAll(objectPath + fmt.Sprintf("$%d-$multiparts", part.PartNumber)) - if e != nil { - return probe.NewError(e) - } + partPathPrefix := objectPath + uploadID + fs.rwLock.RLock() + savedParts := fs.multiparts.ActiveSession[object].Parts + fs.rwLock.RUnlock() + + if err := removeParts(partPathPrefix, savedParts); err != nil { + return err.Trace(partPathPrefix) } + + fs.rwLock.Lock() delete(fs.multiparts.ActiveSession, object) - if e := os.RemoveAll(objectPath + "$multiparts"); e != nil { - return probe.NewError(e) + fs.rwLock.Unlock() + if err := saveMultipartsSession(*fs.multiparts); err != nil { + return err.Trace(partPathPrefix) } return nil } diff --git a/pkg/fs/fs-object.go b/pkg/fs/fs-object.go index c0082826c..87f5b3528 100644 --- a/pkg/fs/fs-object.go +++ b/pkg/fs/fs-object.go @@ -102,9 +102,6 @@ func (fs Filesystem) GetObject(w io.Writer, bucket, object string, start, length // GetObjectMetadata - get object metadata. func (fs Filesystem) GetObjectMetadata(bucket, object string) (ObjectMetadata, *probe.Error) { - fs.rwLock.RLock() - defer fs.rwLock.RUnlock() - // Input validation. if !IsValidBucketName(bucket) { return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -193,9 +190,6 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) bool { // CreateObject - create an object. func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - di, e := disk.GetInfo(fs.path) if e != nil { return ObjectMetadata{}, probe.NewError(e) @@ -233,7 +227,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in expectedMD5SumBytes, e = base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) if e != nil { // Pro-actively close the connection. - return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: expectedMD5Sum}) + return ObjectMetadata{}, probe.NewError(InvalidDigest{MD5: expectedMD5Sum}) } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } @@ -276,7 +270,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in if strings.TrimSpace(expectedMD5Sum) != "" { if !isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum) { file.CloseAndPurge() - return ObjectMetadata{}, probe.NewError(BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Object: object}) + return ObjectMetadata{}, probe.NewError(BadDigest{MD5: expectedMD5Sum, Bucket: bucket, Object: object}) } } sha256Sum := hex.EncodeToString(sh.Sum(nil)) @@ -307,7 +301,7 @@ func (fs Filesystem) CreateObject(bucket, object, expectedMD5Sum string, size in Created: st.ModTime(), Size: st.Size(), ContentType: contentType, - Md5: md5Sum, + MD5: md5Sum, } return newObject, nil } @@ -344,9 +338,6 @@ func deleteObjectPath(basePath, deletePath, bucket, object string) *probe.Error // DeleteObject - delete and object func (fs Filesystem) DeleteObject(bucket, object string) *probe.Error { - fs.rwLock.Lock() - defer fs.rwLock.Unlock() - // check bucket name valid if !IsValidBucketName(bucket) { return probe.NewError(BucketNameInvalid{Bucket: bucket}) diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 1b685fa8c..ba4528a63 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -60,8 +60,8 @@ type Multiparts struct { // New instantiate a new donut func New(rootPath string, minFreeDisk int64, maxBuckets int) (Filesystem, *probe.Error) { - setFSBucketsConfigPath(filepath.Join(rootPath, "$buckets.json")) - setFSMultipartsConfigPath(filepath.Join(rootPath, "$multiparts-session.json")) + setFSBucketsMetadataPath(filepath.Join(rootPath, "$buckets.json")) + setFSMultipartsMetadataPath(filepath.Join(rootPath, "$multiparts-session.json")) var err *probe.Error // load multiparts session from disk @@ -73,7 +73,7 @@ func New(rootPath string, minFreeDisk int64, maxBuckets int) (Filesystem, *probe Version: "1", ActiveSession: make(map[string]*MultipartSession), } - if err := saveMultipartsSession(multiparts); err != nil { + if err := saveMultipartsSession(*multiparts); err != nil { return Filesystem{}, err.Trace() } } else { @@ -94,7 +94,7 @@ func New(rootPath string, minFreeDisk int64, maxBuckets int) (Filesystem, *probe Version: "1", Metadata: make(map[string]*BucketMetadata), } - if err := saveBucketsMetadata(buckets); err != nil { + if err := saveBucketsMetadata(*buckets); err != nil { return Filesystem{}, err.Trace() } } else {