From 7066ce51606580050d80d99a6399cd6365852181 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Sat, 30 Apr 2016 00:47:48 +0530 Subject: [PATCH] XL/Multipart: rename the parts instead of concatenating. (#1416) --- fs.go | 27 +++++++++++++ network-fs.go | 5 +++ object-api-multipart.go | 26 ++---------- object-api.go | 85 +++++++++++++++++++++++++++++++++++++--- storage-api-interface.go | 1 + xl-v1.go | 22 +++++++++++ 6 files changed, 139 insertions(+), 27 deletions(-) diff --git a/fs.go b/fs.go index c1be45299..1a9cadb96 100644 --- a/fs.go +++ b/fs.go @@ -25,6 +25,8 @@ import ( "sync" "syscall" + "path" + "github.com/Sirupsen/logrus" "github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/safe" @@ -731,3 +733,28 @@ func (s fsStorage) DeleteFile(volume, path string) error { // Delete file and delete parent directory as well if its empty. return deleteFile(volumeDir, filePath) } + +// RenameFile - rename file. +func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { + srcVolumeDir, err := s.getVolumeDir(srcVolume) + if err != nil { + log.WithFields(logrus.Fields{ + "diskPath": s.diskPath, + "volume": srcVolume, + }).Debugf("getVolumeDir failed with %s", err) + return err + } + dstVolumeDir, err := s.getVolumeDir(dstVolume) + if err != nil { + log.WithFields(logrus.Fields{ + "diskPath": s.diskPath, + "volume": dstVolume, + }).Debugf("getVolumeDir failed with %s", err) + return err + } + if err = os.MkdirAll(path.Join(dstVolumeDir, path.Dir(dstPath)), 0755); err != nil { + log.Debug("os.MkdirAll failed with %s", err) + return err + } + return os.Rename(path.Join(srcVolumeDir, srcPath), path.Join(dstVolumeDir, dstPath)) +} diff --git a/network-fs.go b/network-fs.go index 4ac3a1f2f..7c5dca2b5 100644 --- a/network-fs.go +++ b/network-fs.go @@ -285,3 +285,8 @@ func (n networkFS) DeleteFile(volume, path string) (err error) { } return nil } + +// RenameFile - Rename file. +func (n networkFS) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { + return errUnexpected +} diff --git a/object-api-multipart.go b/object-api-multipart.go index 4734c6d9d..860c2fa74 100644 --- a/object-api-multipart.go +++ b/object-api-multipart.go @@ -496,39 +496,21 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) } - fileWriter, e := o.storage.CreateFile(bucket, object) - if e != nil { - return "", probe.NewError(toObjectErr(e, bucket, object)) - } - var md5Sums []string for _, part := range parts { // Construct part suffix. partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag) - var fileReader io.ReadCloser - fileReader, e = o.storage.ReadFile(minioMetaVolume, path.Join(bucket, object, partSuffix), 0) - if e != nil { - if e == errFileNotFound { - return "", probe.NewError(InvalidPart{}) - } - return "", probe.NewError(e) - } - _, e = io.Copy(fileWriter, fileReader) - if e != nil { - return "", probe.NewError(e) - } - e = fileReader.Close() + e := o.storage.RenameFile(minioMetaVolume, path.Join(bucket, object, partSuffix), bucket, path.Join(object, fmt.Sprint(part.PartNumber))) if e != nil { return "", probe.NewError(e) } md5Sums = append(md5Sums, part.ETag) } - - e = fileWriter.Close() + fileWriter, e := o.storage.CreateFile(bucket, path.Join(object, "multipart.json")) if e != nil { return "", probe.NewError(e) } - + fileWriter.Close() // Save the s3 md5. s3MD5, err := makeS3MD5(md5Sums...) if err != nil { @@ -536,7 +518,7 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI } // Cleanup all the parts. - o.removeMultipartUpload(bucket, object, uploadID) + // o.removeMultipartUpload(bucket, object, uploadID) // Return md5sum. return s3MD5, nil diff --git a/object-api.go b/object-api.go index 802bbc679..2027207d8 100644 --- a/object-api.go +++ b/object-api.go @@ -20,6 +20,7 @@ import ( "crypto/md5" "encoding/hex" "errors" + "fmt" "io" "path/filepath" "sort" @@ -30,6 +31,10 @@ import ( "github.com/minio/minio/pkg/safe" ) +const ( + multipartMetaFile = "multipart.json" +) + type objectAPI struct { storage StorageAPI } @@ -138,6 +143,28 @@ func (o objectAPI) DeleteBucket(bucket string) *probe.Error { // GetObject - get an object. func (o objectAPI) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error) { + findPathOffset := func() (i int, partOffset int64, err error) { + partOffset = startOffset + for i = 1; i < 10000; i++ { + var fileInfo FileInfo + fileInfo, err = o.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(i))) + if err != nil { + if err == errFileNotFound { + continue + } + return + } + if partOffset < fileInfo.Size { + return + + } + partOffset -= fileInfo.Size + + } + err = errors.New("offset too high") + return + } + // Verify if bucket is valid. if !IsValidBucketName(bucket) { return nil, probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -146,15 +173,59 @@ func (o objectAPI) GetObject(bucket, object string, startOffset int64) (io.ReadC if !IsValidObjectName(object) { return nil, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - r, e := o.storage.ReadFile(bucket, object, startOffset) - if e != nil { - return nil, probe.NewError(toObjectErr(e, bucket, object)) + _, err := o.storage.StatFile(bucket, object) + if err == nil { + fmt.Println("1", err) + r, e := o.storage.ReadFile(bucket, object, startOffset) + if e != nil { + fmt.Println("1.5", err) + return nil, probe.NewError(toObjectErr(e, bucket, object)) + } + return r, nil } - return r, nil + _, err = o.storage.StatFile(bucket, pathJoin(object, multipartMetaFile)) + if err != nil { + fmt.Println("2", err) + return nil, probe.NewError(toObjectErr(err, bucket, object)) + } + fileReader, fileWriter := io.Pipe() + partNum, offset, err := findPathOffset() + if err != nil { + fmt.Println("3", err) + return nil, probe.NewError(toObjectErr(err, bucket, object)) + } + go func() { + for ; partNum < 10000; partNum++ { + r, err := o.storage.ReadFile(bucket, pathJoin(object, fmt.Sprint(partNum)), offset) + if err != nil { + if err == errFileNotFound { + continue + } + fileWriter.CloseWithError(err) + return + } + if _, err := io.Copy(fileWriter, r); err != nil { + fileWriter.CloseWithError(err) + return + } + } + fileWriter.Close() + }() + return fileReader, nil } // GetObjectInfo - get object info. func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Error) { + getMultpartFileSize := func() (size int64) { + for i := 0; i < 10000; i++ { + fi, err := o.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(i))) + if err != nil { + continue + } + size += fi.Size + } + return size + } // Verify if bucket is valid. if !IsValidBucketName(bucket) { return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) @@ -165,7 +236,11 @@ func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Erro } fi, e := o.storage.StatFile(bucket, object) if e != nil { - return ObjectInfo{}, probe.NewError(toObjectErr(e, bucket, object)) + fi, e = o.storage.StatFile(bucket, pathJoin(object, multipartMetaFile)) + if e != nil { + return ObjectInfo{}, probe.NewError(toObjectErr(e, bucket, object)) + } + fi.Size = getMultpartFileSize() } contentType := "application/octet-stream" if objectExt := filepath.Ext(object); objectExt != "" { diff --git a/storage-api-interface.go b/storage-api-interface.go index eeb0d4070..dff3865e7 100644 --- a/storage-api-interface.go +++ b/storage-api-interface.go @@ -32,4 +32,5 @@ type StorageAPI interface { CreateFile(volume string, path string) (writeCloser io.WriteCloser, err error) StatFile(volume string, path string) (file FileInfo, err error) DeleteFile(volume string, path string) (err error) + RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error } diff --git a/xl-v1.go b/xl-v1.go index d724c046c..c70ae3caf 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -570,3 +570,25 @@ func (xl XL) DeleteFile(volume, path string) error { } return nil } + +// RenameFile - rename file. +func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error { + if !isValidVolname(srcVolume) { + return errInvalidArgument + } + if !isValidPath(srcPath) { + return errInvalidArgument + } + if !isValidVolname(dstVolume) { + return errInvalidArgument + } + if !isValidPath(dstPath) { + return errInvalidArgument + } + for _, disk := range xl.storageDisks { + if err := disk.RenameFile(srcVolume, srcPath, dstVolume, dstPath); err != nil { + return err + } + } + return nil +}