XL/Multipart: rename the parts instead of concatenating. (#1416)

Krishna Srinivas 2016-04-30 00:47:48 +05:30 committed by Harshavardhana
parent 39df425b2a
commit 7066ce5160
6 changed files with 139 additions and 27 deletions
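
The gist of the change: parts uploaded through the multipart API stay staged under minioMetaVolume, and CompleteMultipartUpload now renames each staged part into the destination bucket instead of reading every part and concatenating the bytes into one file; a multipart.json marker is written next to the renamed parts so GetObject knows to stream them in order. A minimal sketch of the path mapping, based on the naming visible in the diffs below (the partPaths helper is illustrative, not part of the commit):

package main

import (
	"fmt"
	"path"
)

// partPaths maps a staged part to its source path inside minioMetaVolume and
// its final path inside the destination bucket, mirroring the names used by
// CompleteMultipartUpload in this commit.
func partPaths(bucket, object, uploadID string, partNumber int, etag string) (src, dst string) {
	// Staged while the upload is in progress: <bucket>/<object>/<uploadID>.<partNumber>.<etag>
	src = path.Join(bucket, object, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, etag))
	// Final location after completion: <object>/<partNumber> inside <bucket>
	dst = path.Join(object, fmt.Sprint(partNumber))
	return src, dst
}

func main() {
	src, dst := partPaths("mybucket", "archive.zip", "uploadid-1234", 2, "a54357aff0632cce46d942af68356b38")
	fmt.Println(src) // mybucket/archive.zip/uploadid-1234.2.a54357aff0632cce46d942af68356b38
	fmt.Println(dst) // archive.zip/2
}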

fs.go

@@ -25,6 +25,8 @@ import (
	"sync"
	"syscall"
	"path"
	"github.com/Sirupsen/logrus"
	"github.com/minio/minio/pkg/disk"
	"github.com/minio/minio/pkg/safe"
@@ -731,3 +733,28 @@ func (s fsStorage) DeleteFile(volume, path string) error {
	// Delete file and delete parent directory as well if its empty.
	return deleteFile(volumeDir, filePath)
}

// RenameFile - rename file.
func (s fsStorage) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
	srcVolumeDir, err := s.getVolumeDir(srcVolume)
	if err != nil {
		log.WithFields(logrus.Fields{
			"diskPath": s.diskPath,
			"volume":   srcVolume,
		}).Debugf("getVolumeDir failed with %s", err)
		return err
	}
	dstVolumeDir, err := s.getVolumeDir(dstVolume)
	if err != nil {
		log.WithFields(logrus.Fields{
			"diskPath": s.diskPath,
			"volume":   dstVolume,
		}).Debugf("getVolumeDir failed with %s", err)
		return err
	}
	if err = os.MkdirAll(path.Join(dstVolumeDir, path.Dir(dstPath)), 0755); err != nil {
		log.Debugf("os.MkdirAll failed with %s", err)
		return err
	}
	return os.Rename(path.Join(srcVolumeDir, srcPath), path.Join(dstVolumeDir, dstPath))
}


@@ -285,3 +285,8 @@ func (n networkFS) DeleteFile(volume, path string) (err error) {
	}
	return nil
}

// RenameFile - Rename file.
func (n networkFS) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
	return errUnexpected
}


@@ -496,39 +496,21 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI
		return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
	}
	fileWriter, e := o.storage.CreateFile(bucket, object)
	if e != nil {
		return "", probe.NewError(toObjectErr(e, bucket, object))
	}
	var md5Sums []string
	for _, part := range parts {
		// Construct part suffix.
		partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag)
		var fileReader io.ReadCloser
		fileReader, e = o.storage.ReadFile(minioMetaVolume, path.Join(bucket, object, partSuffix), 0)
		if e != nil {
			if e == errFileNotFound {
				return "", probe.NewError(InvalidPart{})
			}
			return "", probe.NewError(e)
		}
		_, e = io.Copy(fileWriter, fileReader)
		if e != nil {
			return "", probe.NewError(e)
		}
		e = fileReader.Close()
		e := o.storage.RenameFile(minioMetaVolume, path.Join(bucket, object, partSuffix), bucket, path.Join(object, fmt.Sprint(part.PartNumber)))
		if e != nil {
			return "", probe.NewError(e)
		}
		md5Sums = append(md5Sums, part.ETag)
	}
	e = fileWriter.Close()
	fileWriter, e := o.storage.CreateFile(bucket, path.Join(object, "multipart.json"))
	if e != nil {
		return "", probe.NewError(e)
	}
	fileWriter.Close()
	// Save the s3 md5.
	s3MD5, err := makeS3MD5(md5Sums...)
	if err != nil {
@@ -536,7 +518,7 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI
	}
	// Cleanup all the parts.
	o.removeMultipartUpload(bucket, object, uploadID)
	// o.removeMultipartUpload(bucket, object, uploadID)
	// Return md5sum.
	return s3MD5, nil
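
Since this hunk interleaves the removed concatenation code with the new rename-based code, the net effect is: rename each staged part to <object>/<partNumber> in the bucket, then create the multipart.json marker; no part data is copied. Below is a self-contained toy sketch of that idea on a local filesystem, analogous to what fsStorage.RenameFile does for the object layer (the completePart helper and the paths are made up for illustration):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// completePart moves a staged part file into its final place instead of
// copying its contents, which is the core of this commit. fsStorage.RenameFile
// does the equivalent MkdirAll + Rename through the StorageAPI.
func completePart(metaDir, bucketDir, stagedName, object string, partNumber int) error {
	src := filepath.Join(metaDir, stagedName)
	dst := filepath.Join(bucketDir, object, fmt.Sprint(partNumber))
	// Make sure <bucket>/<object>/ exists before renaming into it.
	if err := os.MkdirAll(filepath.Dir(dst), 0755); err != nil {
		return err
	}
	// Rename is a metadata operation, so completion no longer rewrites part data.
	return os.Rename(src, dst)
}

func main() {
	metaDir, _ := os.MkdirTemp("", "meta")
	bucketDir, _ := os.MkdirTemp("", "bucket")
	staged := filepath.Join(metaDir, "uploadid-1234.1.deadbeef")
	os.WriteFile(staged, []byte("part data"), 0644)
	if err := completePart(metaDir, bucketDir, "uploadid-1234.1.deadbeef", "archive.zip", 1); err != nil {
		fmt.Println("rename failed:", err)
		return
	}
	data, _ := os.ReadFile(filepath.Join(bucketDir, "archive.zip", "1"))
	fmt.Println(string(data)) // part data
}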


@@ -20,6 +20,7 @@ import (
	"crypto/md5"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"path/filepath"
	"sort"
@@ -30,6 +31,10 @@ import (
	"github.com/minio/minio/pkg/safe"
)

const (
	multipartMetaFile = "multipart.json"
)

type objectAPI struct {
	storage StorageAPI
}
@@ -138,6 +143,28 @@ func (o objectAPI) DeleteBucket(bucket string) *probe.Error {
// GetObject - get an object.
func (o objectAPI) GetObject(bucket, object string, startOffset int64) (io.ReadCloser, *probe.Error) {
	findPathOffset := func() (i int, partOffset int64, err error) {
		partOffset = startOffset
		for i = 1; i < 10000; i++ {
			var fileInfo FileInfo
			fileInfo, err = o.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(i)))
			if err != nil {
				if err == errFileNotFound {
					continue
				}
				return
			}
			if partOffset < fileInfo.Size {
				return
			}
			partOffset -= fileInfo.Size
		}
		err = errors.New("offset too high")
		return
	}
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return nil, probe.NewError(BucketNameInvalid{Bucket: bucket})
@@ -146,15 +173,59 @@ func (o objectAPI) GetObject(bucket, object string, startOffset int64) (io.ReadC
	if !IsValidObjectName(object) {
		return nil, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
	}
	r, e := o.storage.ReadFile(bucket, object, startOffset)
	if e != nil {
		return nil, probe.NewError(toObjectErr(e, bucket, object))
	_, err := o.storage.StatFile(bucket, object)
	if err == nil {
		fmt.Println("1", err)
		r, e := o.storage.ReadFile(bucket, object, startOffset)
		if e != nil {
			fmt.Println("1.5", err)
			return nil, probe.NewError(toObjectErr(e, bucket, object))
		}
		return r, nil
	}
	return r, nil
	_, err = o.storage.StatFile(bucket, pathJoin(object, multipartMetaFile))
	if err != nil {
		fmt.Println("2", err)
		return nil, probe.NewError(toObjectErr(err, bucket, object))
	}
	fileReader, fileWriter := io.Pipe()
	partNum, offset, err := findPathOffset()
	if err != nil {
		fmt.Println("3", err)
		return nil, probe.NewError(toObjectErr(err, bucket, object))
	}
	go func() {
		for ; partNum < 10000; partNum++ {
			r, err := o.storage.ReadFile(bucket, pathJoin(object, fmt.Sprint(partNum)), offset)
			if err != nil {
				if err == errFileNotFound {
					continue
				}
				fileWriter.CloseWithError(err)
				return
			}
			if _, err := io.Copy(fileWriter, r); err != nil {
				fileWriter.CloseWithError(err)
				return
			}
		}
		fileWriter.Close()
	}()
	return fileReader, nil
}

// GetObjectInfo - get object info.
func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Error) {
	getMultpartFileSize := func() (size int64) {
		for i := 0; i < 10000; i++ {
			fi, err := o.storage.StatFile(bucket, pathJoin(object, fmt.Sprint(i)))
			if err != nil {
				continue
			}
			size += fi.Size
		}
		return size
	}
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
@@ -165,7 +236,11 @@ func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Erro
	}
	fi, e := o.storage.StatFile(bucket, object)
	if e != nil {
		return ObjectInfo{}, probe.NewError(toObjectErr(e, bucket, object))
		fi, e = o.storage.StatFile(bucket, pathJoin(object, multipartMetaFile))
		if e != nil {
			return ObjectInfo{}, probe.NewError(toObjectErr(e, bucket, object))
		}
		fi.Size = getMultpartFileSize()
	}
	contentType := "application/octet-stream"
	if objectExt := filepath.Ext(object); objectExt != "" {
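
To make the ranged read above concrete: findPathOffset walks parts 1, 2, 3, …, subtracting each part's size from the requested offset until the offset lands inside a part, and GetObject then starts streaming from that part at the leftover offset (the real helper also skips missing part numbers via errFileNotFound, which the sketch omits). A self-contained sketch of that walk over an in-memory list of part sizes (the locate helper and the sizes are illustrative, not from the commit):

package main

import (
	"errors"
	"fmt"
)

// locate returns the 1-based part number containing startOffset and the
// offset within that part, mirroring the arithmetic of findPathOffset.
func locate(partSizes []int64, startOffset int64) (partNum int, partOffset int64, err error) {
	partOffset = startOffset
	for i, size := range partSizes {
		if partOffset < size {
			return i + 1, partOffset, nil // parts are numbered from 1
		}
		partOffset -= size
	}
	return 0, 0, errors.New("offset too high")
}

func main() {
	// Two 5 MiB parts followed by a 1 MiB part.
	sizes := []int64{5 << 20, 5 << 20, 1 << 20}
	partNum, off, _ := locate(sizes, 7<<20) // read starting 7 MiB into the object
	fmt.Println(partNum, off)               // 2 2097152: start at part 2, 2 MiB in
}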


@@ -32,4 +32,5 @@ type StorageAPI interface {
	CreateFile(volume string, path string) (writeCloser io.WriteCloser, err error)
	StatFile(volume string, path string) (file FileInfo, err error)
	DeleteFile(volume string, path string) (err error)
	RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error
}


@@ -570,3 +570,25 @@ func (xl XL) DeleteFile(volume, path string) error {
	}
	return nil
}

// RenameFile - rename file.
func (xl XL) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
	if !isValidVolname(srcVolume) {
		return errInvalidArgument
	}
	if !isValidPath(srcPath) {
		return errInvalidArgument
	}
	if !isValidVolname(dstVolume) {
		return errInvalidArgument
	}
	if !isValidPath(dstPath) {
		return errInvalidArgument
	}
	for _, disk := range xl.storageDisks {
		if err := disk.RenameFile(srcVolume, srcPath, dstVolume, dstPath); err != nil {
			return err
		}
	}
	return nil
}