minio/minio (mirror of https://github.com/minio/minio.git)

Merge pull request #1 from harshavardhana/devel

Fix list objects test and remove all the old unnecessary files.

Commit 2b3a118636
@@ -359,7 +359,12 @@ func generateListMultipartUploadsResponse(bucket string, multipartsInfo ListMult
 	listMultipartUploadsResponse.MaxUploads = multipartsInfo.MaxUploads
 	listMultipartUploadsResponse.NextUploadIDMarker = multipartsInfo.NextUploadIDMarker
 	listMultipartUploadsResponse.UploadIDMarker = multipartsInfo.UploadIDMarker
+	listMultipartUploadsResponse.CommonPrefixes = make([]CommonPrefix, len(multipartsInfo.CommonPrefixes))
+	for index, commonPrefix := range multipartsInfo.CommonPrefixes {
+		listMultipartUploadsResponse.CommonPrefixes[index] = CommonPrefix{
+			Prefix: commonPrefix,
+		}
+	}
 	listMultipartUploadsResponse.Uploads = make([]Upload, len(multipartsInfo.Uploads))
 	for index, upload := range multipartsInfo.Uploads {
 		newUpload := Upload{}
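For reference, the CommonPrefix element this new loop populates is the single-field wrapper S3 uses for <CommonPrefixes> entries; its assumed shape, inferred only from the usage above, is:

	// CommonPrefix - a single <CommonPrefixes> entry in an S3 list response.
	type CommonPrefix struct {
		Prefix string
	}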
@@ -179,8 +179,22 @@ func (api objectStorageAPI) ListMultipartUploadsHandler(w http.ResponseWriter, r
 		writeErrorResponse(w, r, ErrInvalidMaxUploads, r.URL.Path)
 		return
 	}
-	if maxUploads == 0 {
-		maxUploads = maxObjectList
+	if keyMarker != "" {
+		// Unescape keyMarker string
+		keyMarkerUnescaped, e := url.QueryUnescape(keyMarker)
+		if e != nil {
+			// Return 'NoSuchKey' to indicate invalid marker key.
+			writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path)
+			return
+		}
+		keyMarker = keyMarkerUnescaped
+		// Marker not common with prefix is not implemented.
+		if !strings.HasPrefix(keyMarker, prefix) {
+			writeErrorResponse(w, r, ErrNotImplemented, r.URL.Path)
+			return
+		}
 	}
 
 	listMultipartsInfo, err := api.ObjectAPI.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
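A note on the new unescape step: url.QueryUnescape decodes %xx sequences and fails on malformed input, which is exactly the case the handler maps to ErrNoSuchKey. A minimal standalone illustration (hypothetical marker values, standard library only):

	package main

	import (
		"fmt"
		"net/url"
	)

	func main() {
		// A key marker as it arrives on the query string.
		unescaped, err := url.QueryUnescape("photos%2F2016%2Fjanuary%2Fobject.png")
		fmt.Println(unescaped, err) // photos/2016/january/object.png <nil>

		// Malformed percent-encoding fails, which takes the ErrNoSuchKey path above.
		_, err = url.QueryUnescape("bad%zzmarker")
		fmt.Println(err) // invalid URL escape "%zz"
	}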
@@ -1,56 +0,0 @@
-/*
- * Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
-	"github.com/minio/minio/pkg/probe"
-	"github.com/minio/minio/pkg/quick"
-)
-
-var multipartsMetadataPath string
-
-// SetFSMultipartsMetadataPath - set custom multiparts session metadata path.
-func setFSMultipartsMetadataPath(metadataPath string) {
-	multipartsMetadataPath = metadataPath
-}
-
-// saveMultipartsSession - save multiparts.
-func saveMultipartsSession(mparts multiparts) *probe.Error {
-	qc, err := quick.New(mparts)
-	if err != nil {
-		return err.Trace()
-	}
-	if err := qc.Save(multipartsMetadataPath); err != nil {
-		return err.Trace()
-	}
-	return nil
-}
-
-// loadMultipartsSession load multipart session file.
-func loadMultipartsSession() (*multiparts, *probe.Error) {
-	mparts := &multiparts{}
-	mparts.Version = "1"
-	mparts.ActiveSession = make(map[string]*multipartSession)
-	qc, err := quick.New(mparts)
-	if err != nil {
-		return nil, err.Trace()
-	}
-	if err := qc.Load(multipartsMetadataPath); err != nil {
-		return nil, err.Trace()
-	}
-	return qc.Data().(*multiparts), nil
-}
@@ -32,15 +32,13 @@ const (
 
 // isDirExist - returns whether given directory is exist or not.
 func isDirExist(dirname string) (bool, error) {
-	fi, err := os.Lstat(dirname)
-	if err != nil {
-		if os.IsNotExist(err) {
+	fi, e := os.Lstat(dirname)
+	if e != nil {
+		if os.IsNotExist(e) {
 			return false, nil
 		}
-
-		return false, err
+		return false, e
 	}
 
 	return fi.IsDir(), nil
 }
 
@@ -129,9 +127,8 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
 	// Verify if prefix exists.
 	prefixDir := filepath.Dir(filepath.FromSlash(prefix))
 	rootDir := filepath.Join(bucketDir, prefixDir)
-	_, e := isDirExist(rootDir)
-	if e != nil {
-		if os.IsNotExist(e) {
+	if status, e := isDirExist(rootDir); !status {
+		if e == nil {
 			// Prefix does not exist, not an error just respond empty
 			// list response.
 			return result, nil
@@ -135,7 +135,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
 	timeoutCh := make(chan struct{}, 1)
 
 	// TODO: check if bucketDir is absolute path
-
 	scanDir := bucketDir
 	dirDepth := bucketDir
 
@@ -145,7 +144,6 @@ func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string,
 		if strings.HasSuffix(prefixPath, string(os.PathSeparator)) {
 			tmpPrefixPath += string(os.PathSeparator)
 		}
-
 		prefixPath = tmpPrefixPath
 	}
 
@@ -375,7 +373,6 @@ func (oic multipartObjectInfoChannel) IsClosed() bool {
 	if oic.objInfo != nil {
 		return false
 	}
-
 	return oic.closed
 }
 
fs-multipart.go (336 lines changed)
@@ -23,7 +23,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"net/url"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -39,37 +38,43 @@ const configDir = ".minio"
 const uploadIDSuffix = ".uploadid"
 
 func removeFileTree(fileName string, level string) error {
-	if err := os.Remove(fileName); err != nil {
-		return err
+	if e := os.Remove(fileName); e != nil {
+		return e
 	}
 
 	for fileDir := filepath.Dir(fileName); fileDir > level; fileDir = filepath.Dir(fileDir) {
-		if status, err := isDirEmpty(fileDir); err != nil {
-			return err
+		if status, e := isDirEmpty(fileDir); e != nil {
+			return e
 		} else if !status {
 			break
 		}
 
-		if err := os.Remove(fileDir); err != nil {
-			return err
+		if e := os.Remove(fileDir); e != nil {
+			return e
 		}
 	}
 
 	return nil
 }
 
-func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error {
-	tempFile, err := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-")
-	if err != nil {
-		return err
+func safeRemoveFile(file *os.File) error {
+	if e := file.Close(); e != nil {
+		return e
+	}
+
+	return os.Remove(file.Name())
+}
+
+func safeWriteFile(fileName string, data io.Reader, size int64, md5sum string) error {
+	tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-")
+	if e != nil {
+		return e
 	}
 
 	md5Hasher := md5.New()
 	multiWriter := io.MultiWriter(md5Hasher, tempFile)
-	if _, err := io.CopyN(multiWriter, data, size); err != nil {
-		tempFile.Close()
-		os.Remove(tempFile.Name())
-		return err
+	if _, e := io.CopyN(multiWriter, data, size); e != nil {
+		safeRemoveFile(tempFile)
+		return e
 	}
 	tempFile.Close()
 
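The renamed safeWriteFile keeps the write-to-temp-then-rename discipline: data streams through an MD5 hasher into a temp file beside the destination, and the new safeRemoveFile centralizes the close-then-delete cleanup on failure. A hypothetical call site inside this package (illustrative data and path, error handling trimmed):

	package main

	import (
		"bytes"
		"crypto/md5"
		"encoding/hex"
		"log"
	)

	func writePartExample() {
		data := []byte("example part payload")
		sum := md5.Sum(data)

		// safeWriteFile renames the temp file into place only when the
		// streamed MD5 matches md5sum; otherwise the caller gets BadDigest
		// and the destination is left untouched.
		e := safeWriteFile("/tmp/bucket/object.1.part", bytes.NewReader(data),
			int64(len(data)), hex.EncodeToString(sum[:]))
		if e != nil {
			log.Println("part write failed:", e)
		}
	}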
@@ -79,22 +84,22 @@ func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error
 		return BadDigest{ExpectedMD5: md5sum, CalculatedMD5: dataMd5sum}
 	}
 
-	if err := os.Rename(tempFile.Name(), fileName); err != nil {
+	if e := os.Rename(tempFile.Name(), fileName); e != nil {
 		os.Remove(tempFile.Name())
-		return err
+		return e
 	}
 
 	return nil
 }
 
 func isFileExist(filename string) (bool, error) {
-	fi, err := os.Lstat(filename)
-	if err != nil {
-		if os.IsNotExist(err) {
+	fi, e := os.Lstat(filename)
+	if e != nil {
+		if os.IsNotExist(e) {
 			return false, nil
 		}
 
-		return false, err
+		return false, e
 	}
 
 	return fi.Mode().IsRegular(), nil
@@ -120,30 +125,30 @@ func (fs Filesystem) newUploadID(bucket, object string) (string, error) {
 	metaObjectDir := filepath.Join(fs.path, configDir, bucket, object)
 
 	// create metaObjectDir if not exist
-	if status, err := isDirExist(metaObjectDir); err != nil {
-		return "", err
+	if status, e := isDirExist(metaObjectDir); e != nil {
+		return "", e
 	} else if !status {
-		if err := os.MkdirAll(metaObjectDir, 0755); err != nil {
-			return "", err
+		if e := os.MkdirAll(metaObjectDir, 0755); e != nil {
+			return "", e
 		}
 	}
 
 	for {
-		uuid, err := uuid.New()
-		if err != nil {
-			return "", err
+		uuid, e := uuid.New()
+		if e != nil {
+			return "", e
 		}
 
 		uploadID := uuid.String()
 		uploadIDFile := filepath.Join(metaObjectDir, uploadID+uploadIDSuffix)
-		if _, err := os.Lstat(uploadIDFile); err != nil {
-			if !os.IsNotExist(err) {
-				return "", err
+		if _, e := os.Lstat(uploadIDFile); e != nil {
+			if !os.IsNotExist(e) {
+				return "", e
 			}
 
 			// uploadIDFile doesn't exist, so create empty file to reserve the name
-			if err := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); err != nil {
-				return "", err
+			if e := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); e != nil {
+				return "", e
 			}
 
 			return uploadID, nil
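Design note on the loop above: an upload ID is claimed by dropping an empty <uploadID>.uploadid marker file as soon as os.Lstat reports the name free, and the surrounding for loop simply draws a fresh UUID and retries in the unlikely event of a collision. That marker file is presumably also what isUploadIDExist checks later.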
@@ -161,32 +166,32 @@ func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error {
 	metaObjectDir := filepath.Join(fs.path, configDir, bucket, object)
 	uploadIDPrefix := uploadID + "."
 
-	names, err := filteredReaddirnames(metaObjectDir,
+	names, e := filteredReaddirnames(metaObjectDir,
 		func(name string) bool {
 			return strings.HasPrefix(name, uploadIDPrefix)
 		},
 	)
 
-	if err != nil {
-		return err
+	if e != nil {
+		return e
 	}
 
 	for _, name := range names {
-		if err := os.Remove(filepath.Join(metaObjectDir, name)); err != nil {
+		if e := os.Remove(filepath.Join(metaObjectDir, name)); e != nil {
 			//return InternalError{Err: err}
-			return err
+			return e
 		}
 	}
 
-	if status, err := isDirEmpty(metaObjectDir); err != nil {
+	if status, e := isDirEmpty(metaObjectDir); e != nil {
 		// TODO: add log than returning error
 		//return InternalError{Err: err}
-		return err
+		return e
 	} else if status {
-		if err := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); err != nil {
+		if e := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); e != nil {
 			// TODO: add log than returning error
 			//return InternalError{Err: err}
-			return err
+			return e
 		}
 	}
 
@@ -199,9 +204,9 @@ func (fs Filesystem) checkBucketArg(bucket string) (string, error) {
 	}
 
 	bucket = getActualBucketname(fs.path, bucket)
-	if status, err := isDirExist(filepath.Join(fs.path, bucket)); err != nil {
+	if status, e := isDirExist(filepath.Join(fs.path, bucket)); e != nil {
 		//return "", InternalError{Err: err}
-		return "", err
+		return "", e
 	} else if !status {
 		return "", BucketNotFound{Bucket: bucket}
 	}
@@ -210,13 +215,12 @@ func (fs Filesystem) checkBucketArg(bucket string) (string, error) {
 }
 
 func (fs Filesystem) checkDiskFree() error {
-	di, err := disk.GetInfo(fs.path)
-	if err != nil {
-		return err
+	di, e := disk.GetInfo(fs.path)
+	if e != nil {
+		return e
 	}
 
-	// Remove 5% from total space for cumulative disk space used for
-	// journalling, inodes etc.
+	// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
 	availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
 	if int64(availableDiskSpace) <= fs.minFreeDisk {
 		return RootPathFull{Path: fs.path}
@@ -226,9 +230,9 @@ func (fs Filesystem) checkDiskFree() error {
 }
 
 func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) {
-	bucket, err := fs.checkBucketArg(bucket)
-	if err != nil {
-		return "", err
+	bucket, e := fs.checkBucketArg(bucket)
+	if e != nil {
+		return "", e
 	}
 
 	if !IsValidObjectName(object) {
@@ -240,19 +244,19 @@ func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) {
 
 // NewMultipartUpload - initiate a new multipart session
 func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
-	if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
+	if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
 		bucket = bucketDirName
 	} else {
-		return "", probe.NewError(err)
+		return "", probe.NewError(e)
 	}
 
-	if err := fs.checkDiskFree(); err != nil {
-		return "", probe.NewError(err)
+	if e := fs.checkDiskFree(); e != nil {
+		return "", probe.NewError(e)
 	}
 
-	uploadID, err := fs.newUploadID(bucket, object)
-	if err != nil {
-		return "", probe.NewError(err)
+	uploadID, e := fs.newUploadID(bucket, object)
+	if e != nil {
+		return "", probe.NewError(e)
 	}
 
 	return uploadID, nil
@@ -260,15 +264,15 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
 
 // PutObjectPart - create a part in a multipart session
 func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) {
-	if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
+	if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
 		bucket = bucketDirName
 	} else {
-		return "", probe.NewError(err)
+		return "", probe.NewError(e)
 	}
 
-	if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil {
+	if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
 		//return "", probe.NewError(InternalError{Err: err})
-		return "", probe.NewError(err)
+		return "", probe.NewError(e)
 	} else if !status {
 		return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
 	}
@@ -282,13 +286,13 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i
 		return "", probe.NewError(errors.New("invalid part id, should be not more than 10000"))
 	}
 
-	if err := fs.checkDiskFree(); err != nil {
-		return "", probe.NewError(err)
+	if e := fs.checkDiskFree(); e != nil {
+		return "", probe.NewError(e)
 	}
 
-	partFile := filepath.Join(fs.path, configDir, bucket, object, uploadID+"."+strconv.Itoa(partNumber)+"."+md5Hex)
-	if err := safeWrite(partFile, data, size, md5Hex); err != nil {
-		return "", probe.NewError(err)
+	partFile := filepath.Join(fs.path, configDir, bucket, object, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5Hex))
+	if e := safeWriteFile(partFile, data, size, md5Hex); e != nil {
+		return "", probe.NewError(e)
 	}
 
 	return md5Hex, nil
@@ -296,21 +300,21 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i
 
 // AbortMultipartUpload - abort an incomplete multipart session
 func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
-	if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
+	if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
 		bucket = bucketDirName
 	} else {
-		return probe.NewError(err)
+		return probe.NewError(e)
 	}
 
-	if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil {
+	if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
 		//return probe.NewError(InternalError{Err: err})
-		return probe.NewError(err)
+		return probe.NewError(e)
 	} else if !status {
 		return probe.NewError(InvalidUploadID{UploadID: uploadID})
 	}
 
-	if err := fs.cleanupUploadID(bucket, object, uploadID); err != nil {
-		return probe.NewError(err)
+	if e := fs.cleanupUploadID(bucket, object, uploadID); e != nil {
+		return probe.NewError(e)
 	}
 
 	return nil
@@ -318,88 +322,82 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
 
 // CompleteMultipartUpload - complete a multipart upload and persist the data
 func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) {
-	if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
+	if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
 		bucket = bucketDirName
 	} else {
-		return ObjectInfo{}, probe.NewError(err)
+		return ObjectInfo{}, probe.NewError(e)
 	}
 
-	if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil {
+	if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
 		//return probe.NewError(InternalError{Err: err})
-		return ObjectInfo{}, probe.NewError(err)
+		return ObjectInfo{}, probe.NewError(e)
 	} else if !status {
 		return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
 	}
 
-	if err := fs.checkDiskFree(); err != nil {
-		return ObjectInfo{}, probe.NewError(err)
+	if e := fs.checkDiskFree(); e != nil {
+		return ObjectInfo{}, probe.NewError(e)
 	}
 
 	metaObjectDir := filepath.Join(fs.path, configDir, bucket, object)
 
-	var md5sums []string
+	var md5Sums []string
 	for _, part := range parts {
 		partNumber := part.PartNumber
 		md5sum := strings.Trim(part.ETag, "\"")
 		partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum)
 		if status, err := isFileExist(partFile); err != nil {
-			//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
 			return ObjectInfo{}, probe.NewError(err)
 		} else if !status {
 			return ObjectInfo{}, probe.NewError(InvalidPart{})
 		}
-		md5sums = append(md5sums, md5sum)
+		md5Sums = append(md5Sums, md5sum)
 	}
 
 	// Save the s3 md5.
-	s3MD5, perr := makeS3MD5(md5sums...)
-	if perr != nil {
-		return ObjectInfo{}, perr
-	}
-
-	tempFile, err := ioutil.TempFile(metaObjectDir, uploadID+".complete.")
+	s3MD5, err := makeS3MD5(md5Sums...)
 	if err != nil {
-		//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
-		return ObjectInfo{}, probe.NewError(err)
+		return ObjectInfo{}, err.Trace(md5Sums...)
 	}
 
+	tempFile, e := ioutil.TempFile(metaObjectDir, uploadID+".complete.")
+	if e != nil {
+		return ObjectInfo{}, probe.NewError(e)
+	}
 	for _, part := range parts {
 		partNumber := part.PartNumber
-		md5sum := strings.Trim(part.ETag, "\"")
-		partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum)
-		var f *os.File
-		f, err = os.Open(partFile)
-		if err != nil {
-			tempFile.Close()
-			os.Remove(tempFile.Name())
-			//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
-			return ObjectInfo{}, probe.NewError(err)
-		} else if _, err = io.Copy(tempFile, f); err != nil {
-			tempFile.Close()
-			os.Remove(tempFile.Name())
-			//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
-			return ObjectInfo{}, probe.NewError(err)
+		// Trim off the odd double quotes from ETag in the beginning and end.
+		md5sum := strings.TrimPrefix(part.ETag, "\"")
+		md5sum = strings.TrimSuffix(md5sum, "\"")
+		partFileStr := filepath.Join(metaObjectDir, fmt.Sprintf("%s.%d.%s", uploadID, partNumber, md5sum))
+		var partFile *os.File
+		partFile, e = os.Open(partFileStr)
+		if e != nil {
+			safeRemoveFile(tempFile)
+			return ObjectInfo{}, probe.NewError(e)
+		} else if _, e = io.Copy(tempFile, partFile); e != nil {
+			safeRemoveFile(tempFile)
+			return ObjectInfo{}, probe.NewError(e)
 		}
-		f.Close()
+		partFile.Close() // Close part file after successful copy.
 	}
 	tempFile.Close()
-	// fi is used later
-	fi, err := os.Stat(tempFile.Name())
-	if err != nil {
-		os.Remove(tempFile.Name())
-		return ObjectInfo{}, probe.NewError(err)
+	// Stat to gather fresh stat info.
+	objSt, e := os.Stat(tempFile.Name())
+	if e != nil {
+		return ObjectInfo{}, probe.NewError(e)
 	}
 
 	bucketPath := filepath.Join(fs.path, bucket)
 	objectPath := filepath.Join(bucketPath, object)
-	if err = os.MkdirAll(filepath.Dir(objectPath), 0755); err != nil {
+	if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil {
 		os.Remove(tempFile.Name())
-		//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
-		return ObjectInfo{}, probe.NewError(err)
+		return ObjectInfo{}, probe.NewError(e)
 	}
-	if err = os.Rename(tempFile.Name(), objectPath); err != nil {
+	if e = os.Rename(tempFile.Name(), objectPath); e != nil {
 		os.Remove(tempFile.Name())
-		return ObjectInfo{}, probe.NewError(err)
+		return ObjectInfo{}, probe.NewError(e)
 	}
 
 	fs.cleanupUploadID(bucket, object, uploadID) // TODO: handle and log the error
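One subtlety in the ETag handling above: strings.Trim strips any run of leading and trailing quote characters, while the TrimPrefix/TrimSuffix pair removes at most one from each end, so interior quotes adjacent to the boundary survive. A quick illustration (hypothetical input):

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		etag := "\"\"abc\"\""
		// strings.Trim strips the whole run of quotes from both ends.
		fmt.Println(strings.Trim(etag, "\"")) // abc
		// TrimPrefix/TrimSuffix peel off exactly one quote per end.
		fmt.Println(strings.TrimSuffix(strings.TrimPrefix(etag, "\""), "\"")) // "abc"
	}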
@@ -414,8 +412,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
 	newObject := ObjectInfo{
 		Bucket:       bucket,
 		Name:         object,
-		ModifiedTime: fi.ModTime(),
-		Size:         fi.Size(),
+		ModifiedTime: objSt.ModTime(),
+		Size:         objSt.Size(),
 		ContentType:  contentType,
 		MD5Sum:       s3MD5,
 	}
@@ -423,6 +421,42 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
 	return newObject, nil
 }
 
+func (fs *Filesystem) saveListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) {
+	fs.listMultipartObjectMapMutex.Lock()
+	defer fs.listMultipartObjectMapMutex.Unlock()
+
+	channels := []multipartObjectInfoChannel{ch}
+	if _, ok := fs.listMultipartObjectMap[params]; ok {
+		channels = append(fs.listMultipartObjectMap[params], ch)
+	}
+
+	fs.listMultipartObjectMap[params] = channels
+}
+
+func (fs *Filesystem) lookupListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel {
+	fs.listMultipartObjectMapMutex.Lock()
+	defer fs.listMultipartObjectMapMutex.Unlock()
+
+	if channels, ok := fs.listMultipartObjectMap[params]; ok {
+		for i, channel := range channels {
+			if !channel.IsTimedOut() {
+				chs := channels[i+1:]
+				if len(chs) > 0 {
+					fs.listMultipartObjectMap[params] = chs
+				} else {
+					delete(fs.listMultipartObjectMap, params)
+				}
+
+				return &channel
+			}
+		}
+
+		// As all channels are timed out, delete the map entry
+		delete(fs.listMultipartObjectMap, params)
+	}
+	return nil
+}
+
 // ListMultipartUploads - list incomplete multipart sessions for a given BucketMultipartResourcesMetadata
 func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, *probe.Error) {
 	result := ListMultipartsInfo{}
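Design note: a truncated listing parks its still-open scanner channel in listMultipartObjectMap, keyed by the exact continuation parameters, so the request for the next page can resume the same directory walk instead of rescanning from scratch. lookupListMultipartObjectCh also discards channels whose timeout has fired, which keeps dead scanners from piling up in the map.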
@@ -444,30 +478,19 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
 		return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter))
 	}
 
-	// Unescape keyMarker string
-	if tmpKeyMarker, err := url.QueryUnescape(keyMarker); err == nil {
-		keyMarker = tmpKeyMarker
-	} else {
-		return result, probe.NewError(err)
-	}
-
 	if keyMarker != "" && !strings.HasPrefix(keyMarker, objectPrefix) {
 		return result, probe.NewError(fmt.Errorf("Invalid combination of marker '%s' and prefix '%s'", keyMarker, objectPrefix))
 	}
 
 	markerPath := filepath.FromSlash(keyMarker)
 
 	if uploadIDMarker != "" {
 		if strings.HasSuffix(markerPath, string(os.PathSeparator)) {
 			return result, probe.NewError(fmt.Errorf("Invalid combination of uploadID marker '%s' and marker '%s'", uploadIDMarker, keyMarker))
 		}
-		id, err := uuid.Parse(uploadIDMarker)
-		if err != nil {
-			return result, probe.NewError(err)
+		id, e := uuid.Parse(uploadIDMarker)
+		if e != nil {
+			return result, probe.NewError(e)
 		}
 		if id.IsZero() {
 			return result, probe.NewError(fmt.Errorf("Invalid upload ID marker %s", uploadIDMarker))
 		}
@@ -490,50 +513,65 @@ func (fs Filesystem) ListMultipartUploads(bucket, objectPrefix, keyMarker, uploa
 		recursive = false
 	}
 
-	bucketDir := filepath.Join(fs.path, bucket)
-	// If listMultipartObjectChannel is available for given parameters, then use it, else create new one
-	objectInfoCh := fs.popListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, markerPath, prefixPath, uploadIDMarker})
-	if objectInfoCh == nil {
+	bucketDir := filepath.Join(fs.path, configDir, bucket)
+	// Lookup of if listMultipartObjectChannel is available for given
+	// parameters, else create a new one.
+	multipartObjectInfoCh := fs.lookupListMultipartObjectCh(listMultipartObjectParams{
+		bucket:         bucket,
+		delimiter:      delimiter,
+		keyMarker:      markerPath,
+		prefix:         prefixPath,
+		uploadIDMarker: uploadIDMarker,
+	})
+	if multipartObjectInfoCh == nil {
 		ch := scanMultipartDir(bucketDir, objectPrefix, keyMarker, uploadIDMarker, recursive)
-		objectInfoCh = &ch
+		multipartObjectInfoCh = &ch
 	}
 
 	nextKeyMarker := ""
 	nextUploadIDMarker := ""
 	for i := 0; i < maxUploads; {
-		objInfo, ok := objectInfoCh.Read()
+		multipartObjInfo, ok := multipartObjectInfoCh.Read()
 		if !ok {
 			// Closed channel.
 			return result, nil
 		}
 
-		if objInfo.Err != nil {
-			return ListMultipartsInfo{}, probe.NewError(objInfo.Err)
+		if multipartObjInfo.Err != nil {
+			if os.IsNotExist(multipartObjInfo.Err) {
+				return ListMultipartsInfo{}, nil
+			}
+			return ListMultipartsInfo{}, probe.NewError(multipartObjInfo.Err)
 		}
 
-		if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") {
+		if strings.Contains(multipartObjInfo.Name, "$multiparts") ||
+			strings.Contains(multipartObjInfo.Name, "$tmpobject") {
 			continue
 		}
 
-		if objInfo.IsDir && skipDir {
+		if multipartObjInfo.IsDir && skipDir {
 			continue
 		}
 
-		if objInfo.IsDir {
-			result.CommonPrefixes = append(result.CommonPrefixes, objInfo.Name)
+		if multipartObjInfo.IsDir {
+			result.CommonPrefixes = append(result.CommonPrefixes, multipartObjInfo.Name)
 		} else {
-			result.Uploads = append(result.Uploads, uploadMetadata{Object: objInfo.Name, UploadID: objInfo.UploadID, Initiated: objInfo.ModifiedTime})
+			result.Uploads = append(result.Uploads, uploadMetadata{
+				Object:    multipartObjInfo.Name,
+				UploadID:  multipartObjInfo.UploadID,
+				Initiated: multipartObjInfo.ModifiedTime,
+			})
 		}
-		nextKeyMarker = objInfo.Name
-		nextUploadIDMarker = objInfo.UploadID
+		nextKeyMarker = multipartObjInfo.Name
+		nextUploadIDMarker = multipartObjInfo.UploadID
 		i++
 	}
 
-	if !objectInfoCh.IsClosed() {
+	if !multipartObjectInfoCh.IsClosed() {
 		result.IsTruncated = true
 		result.NextKeyMarker = nextKeyMarker
 		result.NextUploadIDMarker = nextUploadIDMarker
-		fs.pushListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *objectInfoCh)
+		fs.saveListMultipartObjectCh(listMultipartObjectParams{bucket, delimiter, nextKeyMarker, objectPrefix, nextUploadIDMarker}, *multipartObjectInfoCh)
 	}
 
 	return result, nil
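Taken together, the read loop and the saved channel implement marker-based pagination. A hypothetical driver that walks every page (bucket name and page size are illustrative; fs is a Filesystem from this package, and the fields used below all appear in the diff above):

	keyMarker, uploadIDMarker := "", ""
	for {
		res, err := fs.ListMultipartUploads("bucket", "", keyMarker, uploadIDMarker, "", 1000)
		if err != nil {
			break // a real caller would surface the *probe.Error
		}
		for _, upload := range res.Uploads {
			fmt.Println(upload.Object, upload.UploadID, upload.Initiated)
		}
		if !res.IsTruncated {
			break
		}
		// Feed the continuation markers back in for the next page.
		keyMarker, uploadIDMarker = res.NextKeyMarker, res.NextUploadIDMarker
	}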
|
77
fs.go
77
fs.go
@@ -17,10 +17,7 @@
 package main
 
 import (
-	"os"
-	"path/filepath"
 	"sync"
-	"time"
 
 	"github.com/minio/minio/pkg/probe"
 )
@@ -47,92 +44,18 @@ type Filesystem struct {
 	path                        string
 	minFreeDisk                 int64
 	rwLock                      *sync.RWMutex
-	multiparts                  *multiparts
 	listObjectMap               map[listObjectParams][]*treeWalker
 	listObjectMapMutex          *sync.Mutex
 	listMultipartObjectMap      map[listMultipartObjectParams][]multipartObjectInfoChannel
 	listMultipartObjectMapMutex *sync.Mutex
 }
 
-// MultipartSession holds active session information
-type multipartSession struct {
-	TotalParts int
-	ObjectName string
-	UploadID   string
-	Initiated  time.Time
-	Parts      []partInfo
-}
-
-// multiparts collection of many parts
-type multiparts struct {
-	Version       string                       `json:"version"`
-	ActiveSession map[string]*multipartSession `json:"activeSessions"`
-}
-
-func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) {
-	fs.listMultipartObjectMapMutex.Lock()
-	defer fs.listMultipartObjectMapMutex.Unlock()
-
-	channels := []multipartObjectInfoChannel{ch}
-	if _, ok := fs.listMultipartObjectMap[params]; ok {
-		channels = append(fs.listMultipartObjectMap[params], ch)
-	}
-
-	fs.listMultipartObjectMap[params] = channels
-}
-
-func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams) *multipartObjectInfoChannel {
-	fs.listMultipartObjectMapMutex.Lock()
-	defer fs.listMultipartObjectMapMutex.Unlock()
-
-	if channels, ok := fs.listMultipartObjectMap[params]; ok {
-		for i, channel := range channels {
-			if !channel.IsTimedOut() {
-				chs := channels[i+1:]
-				if len(chs) > 0 {
-					fs.listMultipartObjectMap[params] = chs
-				} else {
-					delete(fs.listMultipartObjectMap, params)
-				}
-
-				return &channel
-			}
-		}
-
-		// As all channels are timed out, delete the map entry
-		delete(fs.listMultipartObjectMap, params)
-	}
-
-	return nil
-}
-
 // newFS instantiate a new filesystem.
 func newFS(rootPath string) (ObjectAPI, *probe.Error) {
-	setFSMultipartsMetadataPath(filepath.Join(rootPath, "$multiparts-session.json"))
-
-	var err *probe.Error
-	// load multiparts session from disk
-	var mparts *multiparts
-	mparts, err = loadMultipartsSession()
-	if err != nil {
-		if os.IsNotExist(err.ToGoError()) {
-			mparts = &multiparts{
-				Version:       "1",
-				ActiveSession: make(map[string]*multipartSession),
-			}
-			if err = saveMultipartsSession(*mparts); err != nil {
-				return nil, err.Trace()
-			}
-		} else {
-			return nil, err.Trace()
-		}
-	}
-
 	fs := &Filesystem{
 		rwLock: &sync.RWMutex{},
 	}
 	fs.path = rootPath
-	fs.multiparts = mparts
 
 	/// Defaults
 
@@ -79,7 +79,7 @@ func testMultipartObjectCreation(c *check.C, create func() ObjectAPI) {
 	}
 	objInfo, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
 	c.Assert(err, check.IsNil)
-	c.Assert(objInfo.MD5Sum, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
+	c.Assert(objInfo.MD5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10")
 }
 
 func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) {
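The expected checksum changes because an S3-style multipart ETag is derived from the parts themselves: the binary MD5s of all parts are concatenated, hashed again, and suffixed with the part count. A sketch of that aggregation (assumed to mirror what makeS3MD5 computes; the helper name is illustrative):

	package main

	import (
		"crypto/md5"
		"encoding/hex"
		"fmt"
	)

	// multipartETag computes an S3-style ETag from hex-encoded part MD5s.
	func multipartETag(partMD5s ...string) (string, error) {
		var concat []byte
		for _, hexSum := range partMD5s {
			b, err := hex.DecodeString(hexSum)
			if err != nil {
				return "", err
			}
			concat = append(concat, b...)
		}
		finalSum := md5.Sum(concat)
		return fmt.Sprintf("%s-%d", hex.EncodeToString(finalSum[:]), len(partMD5s)), nil
	}

	func main() {
		// Ten parts yield the "-10" suffix seen in the test assertion.
		parts := make([]string, 10)
		for i := range parts {
			parts[i] = "5d41402abc4b2a76b9719d911017c592" // md5("hello")
		}
		etag, _ := multipartETag(parts...)
		fmt.Println(etag)
	}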
|
Loading…
x
Reference in New Issue
Block a user