mirror of
https://github.com/minio/minio.git
synced 2025-01-23 20:53:18 -05:00
Fix list objects test and remove all the old unnecessary files.
- Fix tests for new changes. - Change Golang err as 'e' for the time being, before we bring in probe removal change. - Remove old structs and temporary files.
This commit is contained in:
parent
083e4e9479
commit
9632c94e7a
@ -1,56 +0,0 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2015, 2016 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/minio/minio/pkg/probe"
|
||||
"github.com/minio/minio/pkg/quick"
|
||||
)
|
||||
|
||||
var multipartsMetadataPath string
|
||||
|
||||
// SetFSMultipartsMetadataPath - set custom multiparts session metadata path.
|
||||
func setFSMultipartsMetadataPath(metadataPath string) {
|
||||
multipartsMetadataPath = metadataPath
|
||||
}
|
||||
|
||||
// saveMultipartsSession - save multiparts.
|
||||
func saveMultipartsSession(mparts multiparts) *probe.Error {
|
||||
qc, err := quick.New(mparts)
|
||||
if err != nil {
|
||||
return err.Trace()
|
||||
}
|
||||
if err := qc.Save(multipartsMetadataPath); err != nil {
|
||||
return err.Trace()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadMultipartsSession load multipart session file.
|
||||
func loadMultipartsSession() (*multiparts, *probe.Error) {
|
||||
mparts := &multiparts{}
|
||||
mparts.Version = "1"
|
||||
mparts.ActiveSession = make(map[string]*multipartSession)
|
||||
qc, err := quick.New(mparts)
|
||||
if err != nil {
|
||||
return nil, err.Trace()
|
||||
}
|
||||
if err := qc.Load(multipartsMetadataPath); err != nil {
|
||||
return nil, err.Trace()
|
||||
}
|
||||
return qc.Data().(*multiparts), nil
|
||||
}
|
@ -32,15 +32,13 @@ const (
|
||||
|
||||
// isDirExist - returns whether given directory is exist or not.
|
||||
func isDirExist(dirname string) (bool, error) {
|
||||
fi, err := os.Lstat(dirname)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
fi, e := os.Lstat(dirname)
|
||||
if e != nil {
|
||||
if os.IsNotExist(e) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return false, err
|
||||
return false, e
|
||||
}
|
||||
|
||||
return fi.IsDir(), nil
|
||||
}
|
||||
|
||||
@ -129,9 +127,8 @@ func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKe
|
||||
// Verify if prefix exists.
|
||||
prefixDir := filepath.Dir(filepath.FromSlash(prefix))
|
||||
rootDir := filepath.Join(bucketDir, prefixDir)
|
||||
_, e := isDirExist(rootDir)
|
||||
if e != nil {
|
||||
if os.IsNotExist(e) {
|
||||
if status, e := isDirExist(rootDir); !status {
|
||||
if e == nil {
|
||||
// Prefix does not exist, not an error just respond empty
|
||||
// list response.
|
||||
return result, nil
|
||||
|
190
fs-multipart.go
190
fs-multipart.go
@ -39,19 +39,19 @@ const configDir = ".minio"
|
||||
const uploadIDSuffix = ".uploadid"
|
||||
|
||||
func removeFileTree(fileName string, level string) error {
|
||||
if err := os.Remove(fileName); err != nil {
|
||||
return err
|
||||
if e := os.Remove(fileName); e != nil {
|
||||
return e
|
||||
}
|
||||
|
||||
for fileDir := filepath.Dir(fileName); fileDir > level; fileDir = filepath.Dir(fileDir) {
|
||||
if status, err := isDirEmpty(fileDir); err != nil {
|
||||
return err
|
||||
if status, e := isDirEmpty(fileDir); e != nil {
|
||||
return e
|
||||
} else if !status {
|
||||
break
|
||||
}
|
||||
|
||||
if err := os.Remove(fileDir); err != nil {
|
||||
return err
|
||||
if e := os.Remove(fileDir); e != nil {
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,17 +59,17 @@ func removeFileTree(fileName string, level string) error {
|
||||
}
|
||||
|
||||
func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error {
|
||||
tempFile, err := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-")
|
||||
if err != nil {
|
||||
return err
|
||||
tempFile, e := ioutil.TempFile(filepath.Dir(fileName), filepath.Base(fileName)+"-")
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
|
||||
md5Hasher := md5.New()
|
||||
multiWriter := io.MultiWriter(md5Hasher, tempFile)
|
||||
if _, err := io.CopyN(multiWriter, data, size); err != nil {
|
||||
if _, e := io.CopyN(multiWriter, data, size); e != nil {
|
||||
tempFile.Close()
|
||||
os.Remove(tempFile.Name())
|
||||
return err
|
||||
return e
|
||||
}
|
||||
tempFile.Close()
|
||||
|
||||
@ -79,22 +79,22 @@ func safeWrite(fileName string, data io.Reader, size int64, md5sum string) error
|
||||
return BadDigest{ExpectedMD5: md5sum, CalculatedMD5: dataMd5sum}
|
||||
}
|
||||
|
||||
if err := os.Rename(tempFile.Name(), fileName); err != nil {
|
||||
if e := os.Rename(tempFile.Name(), fileName); e != nil {
|
||||
os.Remove(tempFile.Name())
|
||||
return err
|
||||
return e
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func isFileExist(filename string) (bool, error) {
|
||||
fi, err := os.Lstat(filename)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
fi, e := os.Lstat(filename)
|
||||
if e != nil {
|
||||
if os.IsNotExist(e) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return false, err
|
||||
return false, e
|
||||
}
|
||||
|
||||
return fi.Mode().IsRegular(), nil
|
||||
@ -120,30 +120,30 @@ func (fs Filesystem) newUploadID(bucket, object string) (string, error) {
|
||||
metaObjectDir := filepath.Join(fs.path, configDir, bucket, object)
|
||||
|
||||
// create metaObjectDir if not exist
|
||||
if status, err := isDirExist(metaObjectDir); err != nil {
|
||||
return "", err
|
||||
if status, e := isDirExist(metaObjectDir); e != nil {
|
||||
return "", e
|
||||
} else if !status {
|
||||
if err := os.MkdirAll(metaObjectDir, 0755); err != nil {
|
||||
return "", err
|
||||
if e := os.MkdirAll(metaObjectDir, 0755); e != nil {
|
||||
return "", e
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
uuid, err := uuid.New()
|
||||
if err != nil {
|
||||
return "", err
|
||||
uuid, e := uuid.New()
|
||||
if e != nil {
|
||||
return "", e
|
||||
}
|
||||
|
||||
uploadID := uuid.String()
|
||||
uploadIDFile := filepath.Join(metaObjectDir, uploadID+uploadIDSuffix)
|
||||
if _, err := os.Lstat(uploadIDFile); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return "", err
|
||||
if _, e := os.Lstat(uploadIDFile); e != nil {
|
||||
if !os.IsNotExist(e) {
|
||||
return "", e
|
||||
}
|
||||
|
||||
// uploadIDFile doesn't exist, so create empty file to reserve the name
|
||||
if err := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); err != nil {
|
||||
return "", err
|
||||
if e := ioutil.WriteFile(uploadIDFile, []byte{}, 0644); e != nil {
|
||||
return "", e
|
||||
}
|
||||
|
||||
return uploadID, nil
|
||||
@ -161,32 +161,32 @@ func (fs Filesystem) cleanupUploadID(bucket, object, uploadID string) error {
|
||||
metaObjectDir := filepath.Join(fs.path, configDir, bucket, object)
|
||||
uploadIDPrefix := uploadID + "."
|
||||
|
||||
names, err := filteredReaddirnames(metaObjectDir,
|
||||
names, e := filteredReaddirnames(metaObjectDir,
|
||||
func(name string) bool {
|
||||
return strings.HasPrefix(name, uploadIDPrefix)
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
|
||||
for _, name := range names {
|
||||
if err := os.Remove(filepath.Join(metaObjectDir, name)); err != nil {
|
||||
if e := os.Remove(filepath.Join(metaObjectDir, name)); e != nil {
|
||||
//return InternalError{Err: err}
|
||||
return err
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
if status, err := isDirEmpty(metaObjectDir); err != nil {
|
||||
if status, e := isDirEmpty(metaObjectDir); e != nil {
|
||||
// TODO: add log than returning error
|
||||
//return InternalError{Err: err}
|
||||
return err
|
||||
return e
|
||||
} else if status {
|
||||
if err := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); err != nil {
|
||||
if e := removeFileTree(metaObjectDir, filepath.Join(fs.path, configDir, bucket)); e != nil {
|
||||
// TODO: add log than returning error
|
||||
//return InternalError{Err: err}
|
||||
return err
|
||||
return e
|
||||
}
|
||||
}
|
||||
|
||||
@ -199,9 +199,9 @@ func (fs Filesystem) checkBucketArg(bucket string) (string, error) {
|
||||
}
|
||||
|
||||
bucket = getActualBucketname(fs.path, bucket)
|
||||
if status, err := isDirExist(filepath.Join(fs.path, bucket)); err != nil {
|
||||
if status, e := isDirExist(filepath.Join(fs.path, bucket)); e != nil {
|
||||
//return "", InternalError{Err: err}
|
||||
return "", err
|
||||
return "", e
|
||||
} else if !status {
|
||||
return "", BucketNotFound{Bucket: bucket}
|
||||
}
|
||||
@ -210,13 +210,12 @@ func (fs Filesystem) checkBucketArg(bucket string) (string, error) {
|
||||
}
|
||||
|
||||
func (fs Filesystem) checkDiskFree() error {
|
||||
di, err := disk.GetInfo(fs.path)
|
||||
if err != nil {
|
||||
return err
|
||||
di, e := disk.GetInfo(fs.path)
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
|
||||
// Remove 5% from total space for cumulative disk space used for
|
||||
// journalling, inodes etc.
|
||||
// Remove 5% from total space for cumulative disk space used for journalling, inodes etc.
|
||||
availableDiskSpace := (float64(di.Free) / (float64(di.Total) - (0.05 * float64(di.Total)))) * 100
|
||||
if int64(availableDiskSpace) <= fs.minFreeDisk {
|
||||
return RootPathFull{Path: fs.path}
|
||||
@ -226,9 +225,9 @@ func (fs Filesystem) checkDiskFree() error {
|
||||
}
|
||||
|
||||
func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) {
|
||||
bucket, err := fs.checkBucketArg(bucket)
|
||||
if err != nil {
|
||||
return "", err
|
||||
bucket, e := fs.checkBucketArg(bucket)
|
||||
if e != nil {
|
||||
return "", e
|
||||
}
|
||||
|
||||
if !IsValidObjectName(object) {
|
||||
@ -240,19 +239,19 @@ func (fs Filesystem) checkMultipartArgs(bucket, object string) (string, error) {
|
||||
|
||||
// NewMultipartUpload - initiate a new multipart session
|
||||
func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.Error) {
|
||||
if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
|
||||
if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
|
||||
bucket = bucketDirName
|
||||
} else {
|
||||
return "", probe.NewError(err)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
if err := fs.checkDiskFree(); err != nil {
|
||||
return "", probe.NewError(err)
|
||||
if e := fs.checkDiskFree(); e != nil {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
uploadID, err := fs.newUploadID(bucket, object)
|
||||
if err != nil {
|
||||
return "", probe.NewError(err)
|
||||
uploadID, e := fs.newUploadID(bucket, object)
|
||||
if e != nil {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
return uploadID, nil
|
||||
@ -260,15 +259,15 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
|
||||
|
||||
// PutObjectPart - create a part in a multipart session
|
||||
func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber int, size int64, data io.Reader, md5Hex string) (string, *probe.Error) {
|
||||
if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
|
||||
if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
|
||||
bucket = bucketDirName
|
||||
} else {
|
||||
return "", probe.NewError(err)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil {
|
||||
if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
|
||||
//return "", probe.NewError(InternalError{Err: err})
|
||||
return "", probe.NewError(err)
|
||||
return "", probe.NewError(e)
|
||||
} else if !status {
|
||||
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
@ -282,13 +281,13 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i
|
||||
return "", probe.NewError(errors.New("invalid part id, should be not more than 10000"))
|
||||
}
|
||||
|
||||
if err := fs.checkDiskFree(); err != nil {
|
||||
return "", probe.NewError(err)
|
||||
if e := fs.checkDiskFree(); e != nil {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
partFile := filepath.Join(fs.path, configDir, bucket, object, uploadID+"."+strconv.Itoa(partNumber)+"."+md5Hex)
|
||||
if err := safeWrite(partFile, data, size, md5Hex); err != nil {
|
||||
return "", probe.NewError(err)
|
||||
if e := safeWrite(partFile, data, size, md5Hex); e != nil {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
return md5Hex, nil
|
||||
@ -296,21 +295,21 @@ func (fs Filesystem) PutObjectPart(bucket, object, uploadID string, partNumber i
|
||||
|
||||
// AbortMultipartUpload - abort an incomplete multipart session
|
||||
func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *probe.Error {
|
||||
if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
|
||||
if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
|
||||
bucket = bucketDirName
|
||||
} else {
|
||||
return probe.NewError(err)
|
||||
return probe.NewError(e)
|
||||
}
|
||||
|
||||
if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil {
|
||||
if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
|
||||
//return probe.NewError(InternalError{Err: err})
|
||||
return probe.NewError(err)
|
||||
return probe.NewError(e)
|
||||
} else if !status {
|
||||
return probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
|
||||
if err := fs.cleanupUploadID(bucket, object, uploadID); err != nil {
|
||||
return probe.NewError(err)
|
||||
if e := fs.cleanupUploadID(bucket, object, uploadID); e != nil {
|
||||
return probe.NewError(e)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -318,49 +317,48 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
|
||||
|
||||
// CompleteMultipartUpload - complete a multipart upload and persist the data
|
||||
func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) {
|
||||
if bucketDirName, err := fs.checkMultipartArgs(bucket, object); err == nil {
|
||||
if bucketDirName, e := fs.checkMultipartArgs(bucket, object); e == nil {
|
||||
bucket = bucketDirName
|
||||
} else {
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
if status, err := fs.isUploadIDExist(bucket, object, uploadID); err != nil {
|
||||
if status, e := fs.isUploadIDExist(bucket, object, uploadID); e != nil {
|
||||
//return probe.NewError(InternalError{Err: err})
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
} else if !status {
|
||||
return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
|
||||
if err := fs.checkDiskFree(); err != nil {
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
if e := fs.checkDiskFree(); e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
metaObjectDir := filepath.Join(fs.path, configDir, bucket, object)
|
||||
|
||||
var md5sums []string
|
||||
var md5Sums []string
|
||||
for _, part := range parts {
|
||||
partNumber := part.PartNumber
|
||||
md5sum := strings.Trim(part.ETag, "\"")
|
||||
partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum)
|
||||
if status, err := isFileExist(partFile); err != nil {
|
||||
//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
} else if !status {
|
||||
return ObjectInfo{}, probe.NewError(InvalidPart{})
|
||||
}
|
||||
md5sums = append(md5sums, md5sum)
|
||||
md5Sums = append(md5Sums, md5sum)
|
||||
}
|
||||
|
||||
// Save the s3 md5.
|
||||
s3MD5, perr := makeS3MD5(md5sums...)
|
||||
if perr != nil {
|
||||
return ObjectInfo{}, perr
|
||||
s3MD5, err := makeS3MD5(md5Sums...)
|
||||
if err != nil {
|
||||
return ObjectInfo{}, err.Trace(md5Sums...)
|
||||
}
|
||||
|
||||
tempFile, err := ioutil.TempFile(metaObjectDir, uploadID+".complete.")
|
||||
if err != nil {
|
||||
tempFile, e := ioutil.TempFile(metaObjectDir, uploadID+".complete.")
|
||||
if e != nil {
|
||||
//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
for _, part := range parts {
|
||||
@ -368,38 +366,38 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, pa
|
||||
md5sum := strings.Trim(part.ETag, "\"")
|
||||
partFile := filepath.Join(metaObjectDir, uploadID+"."+strconv.Itoa(partNumber)+"."+md5sum)
|
||||
var f *os.File
|
||||
f, err = os.Open(partFile)
|
||||
if err != nil {
|
||||
f, e = os.Open(partFile)
|
||||
if e != nil {
|
||||
tempFile.Close()
|
||||
os.Remove(tempFile.Name())
|
||||
//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
} else if _, err = io.Copy(tempFile, f); err != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
} else if _, e = io.Copy(tempFile, f); e != nil {
|
||||
tempFile.Close()
|
||||
os.Remove(tempFile.Name())
|
||||
//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
tempFile.Close()
|
||||
// fi is used later
|
||||
fi, err := os.Stat(tempFile.Name())
|
||||
if err != nil {
|
||||
fi, e := os.Stat(tempFile.Name())
|
||||
if e != nil {
|
||||
os.Remove(tempFile.Name())
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
bucketPath := filepath.Join(fs.path, bucket)
|
||||
objectPath := filepath.Join(bucketPath, object)
|
||||
if err = os.MkdirAll(filepath.Dir(objectPath), 0755); err != nil {
|
||||
if e = os.MkdirAll(filepath.Dir(objectPath), 0755); e != nil {
|
||||
os.Remove(tempFile.Name())
|
||||
//return ObjectInfo{}, probe.NewError(InternalError{Err: err})
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
if err = os.Rename(tempFile.Name(), objectPath); err != nil {
|
||||
if e = os.Rename(tempFile.Name(), objectPath); e != nil {
|
||||
os.Remove(tempFile.Name())
|
||||
return ObjectInfo{}, probe.NewError(err)
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
|
||||
fs.cleanupUploadID(bucket, object, uploadID) // TODO: handle and log the error
|
||||
|
40
fs.go
40
fs.go
@ -17,10 +17,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/pkg/probe"
|
||||
)
|
||||
@ -47,28 +44,12 @@ type Filesystem struct {
|
||||
path string
|
||||
minFreeDisk int64
|
||||
rwLock *sync.RWMutex
|
||||
multiparts *multiparts
|
||||
listObjectMap map[listObjectParams][]*treeWalker
|
||||
listObjectMapMutex *sync.Mutex
|
||||
listMultipartObjectMap map[listMultipartObjectParams][]multipartObjectInfoChannel
|
||||
listMultipartObjectMapMutex *sync.Mutex
|
||||
}
|
||||
|
||||
// MultipartSession holds active session information
|
||||
type multipartSession struct {
|
||||
TotalParts int
|
||||
ObjectName string
|
||||
UploadID string
|
||||
Initiated time.Time
|
||||
Parts []partInfo
|
||||
}
|
||||
|
||||
// multiparts collection of many parts
|
||||
type multiparts struct {
|
||||
Version string `json:"version"`
|
||||
ActiveSession map[string]*multipartSession `json:"activeSessions"`
|
||||
}
|
||||
|
||||
func (fs *Filesystem) pushListMultipartObjectCh(params listMultipartObjectParams, ch multipartObjectInfoChannel) {
|
||||
fs.listMultipartObjectMapMutex.Lock()
|
||||
defer fs.listMultipartObjectMapMutex.Unlock()
|
||||
@ -108,31 +89,10 @@ func (fs *Filesystem) popListMultipartObjectCh(params listMultipartObjectParams)
|
||||
|
||||
// newFS instantiate a new filesystem.
|
||||
func newFS(rootPath string) (ObjectAPI, *probe.Error) {
|
||||
setFSMultipartsMetadataPath(filepath.Join(rootPath, "$multiparts-session.json"))
|
||||
|
||||
var err *probe.Error
|
||||
// load multiparts session from disk
|
||||
var mparts *multiparts
|
||||
mparts, err = loadMultipartsSession()
|
||||
if err != nil {
|
||||
if os.IsNotExist(err.ToGoError()) {
|
||||
mparts = &multiparts{
|
||||
Version: "1",
|
||||
ActiveSession: make(map[string]*multipartSession),
|
||||
}
|
||||
if err = saveMultipartsSession(*mparts); err != nil {
|
||||
return nil, err.Trace()
|
||||
}
|
||||
} else {
|
||||
return nil, err.Trace()
|
||||
}
|
||||
}
|
||||
|
||||
fs := &Filesystem{
|
||||
rwLock: &sync.RWMutex{},
|
||||
}
|
||||
fs.path = rootPath
|
||||
fs.multiparts = mparts
|
||||
|
||||
/// Defaults
|
||||
|
||||
|
@ -79,7 +79,7 @@ func testMultipartObjectCreation(c *check.C, create func() ObjectAPI) {
|
||||
}
|
||||
objInfo, err := fs.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
|
||||
c.Assert(err, check.IsNil)
|
||||
c.Assert(objInfo.MD5Sum, check.Equals, "9b7d6f13ba00e24d0b02de92e814891b-10")
|
||||
c.Assert(objInfo.MD5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10")
|
||||
}
|
||||
|
||||
func testMultipartObjectAbort(c *check.C, create func() ObjectAPI) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user