fs: Re-implement object layer to remember the fd (#3509)

This patch re-writes FS backend to support shared backend sharing locks for safe concurrent access across multiple servers.
This commit is contained in:
Harshavardhana
2017-01-16 17:05:00 -08:00
committed by GitHub
parent a054c73e22
commit 1c699d8d3f
68 changed files with 3860 additions and 1580 deletions

View File

@@ -22,13 +22,68 @@ import (
"fmt"
"hash"
"io"
"path"
"os"
pathutil "path"
"strings"
"time"
"github.com/minio/sha256-simd"
)
// listMultipartUploadIDs - list all the upload ids from a marker up to 'count'.
func (fs fsObjects) listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count int) ([]uploadMetadata, bool, error) {
var uploads []uploadMetadata
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucketName, objectName))
objectMPartPathLock.RLock()
defer objectMPartPathLock.RUnlock()
uploadsPath := pathJoin(bucketName, objectName, uploadsJSONFile)
rlk, err := fs.rwPool.Open(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return nil, true, nil
}
return nil, false, traceError(err)
}
defer fs.rwPool.Close(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
// Read `uploads.json`.
uploadIDs := uploadsV1{}
if _, err = uploadIDs.ReadFrom(io.NewSectionReader(rlk, 0, rlk.Size())); err != nil {
return nil, false, err
}
index := 0
if uploadIDMarker != "" {
for ; index < len(uploadIDs.Uploads); index++ {
if uploadIDs.Uploads[index].UploadID == uploadIDMarker {
// Skip the uploadID as it would already be listed in previous listing.
index++
break
}
}
}
for index < len(uploadIDs.Uploads) {
uploads = append(uploads, uploadMetadata{
Object: objectName,
UploadID: uploadIDs.Uploads[index].UploadID,
Initiated: uploadIDs.Uploads[index].Initiated,
})
count--
index++
if count == 0 {
break
}
}
end := (index == len(uploadIDs.Uploads))
return uploads, end, nil
}
// listMultipartUploads - lists all multipart uploads.
func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result := ListMultipartsInfo{}
@@ -54,31 +109,40 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
if keyMarker != "" {
multipartMarkerPath = pathJoin(bucket, keyMarker)
}
var uploads []uploadMetadata
var err error
var eof bool
if uploadIDMarker != "" {
keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, keyMarker))
keyMarkerLock.RLock()
uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, fs.storage)
keyMarkerLock.RUnlock()
uploads, _, err = fs.listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads)
if err != nil {
return ListMultipartsInfo{}, err
}
maxUploads = maxUploads - len(uploads)
}
var walkResultCh chan treeWalkResult
var endWalkCh chan struct{}
heal := false // true only for xl.ListObjectsHeal()
// true only for xl.ListObjectsHeal(), set to false.
heal := false
// Proceed to list only if we have more uploads to be listed.
if maxUploads > 0 {
walkResultCh, endWalkCh = fs.listPool.Release(listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal})
listPrms := listParams{minioMetaMultipartBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal}
// Pop out any previously waiting marker.
walkResultCh, endWalkCh = fs.listPool.Release(listPrms)
if walkResultCh == nil {
endWalkCh = make(chan struct{})
isLeaf := fs.isMultipartUpload
listDir := listDirFactory(isLeaf, fsTreeWalkIgnoredErrs, fs.storage)
walkResultCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath, multipartMarkerPath, recursive, listDir, isLeaf, endWalkCh)
listDir := fs.listDirFactory(isLeaf)
walkResultCh = startTreeWalk(minioMetaMultipartBucket, multipartPrefixPath,
multipartMarkerPath, recursive, listDir, isLeaf, endWalkCh)
}
// List until maxUploads requested.
for maxUploads > 0 {
walkResult, ok := <-walkResultCh
if !ok {
@@ -86,6 +150,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
eof = true
break
}
// For any walk error return right away.
if walkResult.err != nil {
// File not found or Disk not found is a valid case.
@@ -95,6 +160,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
}
return ListMultipartsInfo{}, walkResult.err
}
entry := strings.TrimPrefix(walkResult.entry, retainSlash(bucket))
if strings.HasSuffix(walkResult.entry, slashSeparator) {
uploads = append(uploads, uploadMetadata{
@@ -109,18 +175,16 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
}
continue
}
var tmpUploads []uploadMetadata
var end bool
uploadIDMarker = ""
entryLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, entry))
entryLock.RLock()
tmpUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, fs.storage)
entryLock.RUnlock()
tmpUploads, end, err = fs.listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads)
if err != nil {
return ListMultipartsInfo{}, err
}
uploads = append(uploads, tmpUploads...)
maxUploads -= len(tmpUploads)
if walkResult.end && end {
@@ -129,6 +193,7 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
}
}
}
// Loop through all the received uploads fill in the multiparts result.
for _, upload := range uploads {
var objectName string
@@ -158,6 +223,8 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
// Success.
return result, nil
}
@@ -174,6 +241,11 @@ func (fs fsObjects) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, fs); err != nil {
return ListMultipartsInfo{}, err
}
if _, err := fs.statBucketDir(bucket); err != nil {
return ListMultipartsInfo{}, toObjectErr(err, bucket)
}
return fs.listMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
}
@@ -191,23 +263,35 @@ func (fs fsObjects) newMultipartUpload(bucket string, object string, meta map[st
// Save additional metadata.
fsMeta.Meta = meta
// This lock needs to be held for any changes to the directory
// contents of ".minio.sys/multipart/object/"
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
uploadID = mustGetUUID()
initiated := time.Now().UTC()
// Add upload ID to uploads.json
if err = fs.addUploadID(bucket, object, uploadID, initiated); err != nil {
return "", err
uploadsPath := pathJoin(bucket, object, uploadsJSONFile)
rwlk, err := fs.rwPool.Create(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
if err != nil {
return "", toObjectErr(traceError(err), bucket, object)
}
uploadIDPath := path.Join(bucket, object, uploadID)
if err = writeFSMetadata(fs.storage, minioMetaMultipartBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil {
return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
defer rwlk.Close()
uploadIDPath := pathJoin(bucket, object, uploadID)
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
metaFile, err := fs.rwPool.Create(fsMetaPath)
if err != nil {
return "", toObjectErr(traceError(err), bucket, object)
}
defer metaFile.Close()
// Add a new upload id.
if err = fs.addUploadID(bucket, object, uploadID, initiated, rwlk); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Write all the set metadata.
if _, err = fsMeta.WriteTo(metaFile); err != nil {
return "", toObjectErr(err, bucket, object)
}
// Return success.
return uploadID, nil
}
@@ -221,6 +305,17 @@ func (fs fsObjects) NewMultipartUpload(bucket, object string, meta map[string]st
if err := checkNewMultipartArgs(bucket, object, fs); err != nil {
return "", err
}
if _, err := fs.statBucketDir(bucket); err != nil {
return "", toObjectErr(err, bucket)
}
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
return fs.newMultipartUpload(bucket, object, meta)
}
@@ -229,12 +324,14 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo,
if len(fsMeta.Parts) == 0 {
return
}
// As fsAppendMeta.Parts will be sorted len(fsAppendMeta.Parts) will naturally be the next part number
nextPartNum := len(fsAppendMeta.Parts) + 1
nextPartIndex := fsMeta.ObjectPartIndex(nextPartNum)
if nextPartIndex == -1 {
return
}
return fsMeta.Parts[nextPartIndex], true
}
@@ -247,15 +344,43 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
return "", err
}
uploadIDPath := path.Join(bucket, object, uploadID)
if _, err := fs.statBucketDir(bucket); err != nil {
return "", toObjectErr(err, bucket)
}
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
// Disallow any parallel abort or complete multipart operations.
uploadsPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadsJSONFile)
if _, err := fs.rwPool.Open(uploadsPath); err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
return "", toObjectErr(traceError(err), bucket, object)
}
defer fs.rwPool.Close(uploadsPath)
uploadIDPath := pathJoin(bucket, object, uploadID)
preUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
preUploadIDLock.RLock()
// Just check if the uploadID exists to avoid copy if it doesn't.
uploadIDExists := fs.isUploadIDExists(bucket, object, uploadID)
preUploadIDLock.RUnlock()
if !uploadIDExists {
return "", traceError(InvalidUploadID{UploadID: uploadID})
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
rwlk, err := fs.rwPool.Write(fsMetaPath)
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
return "", toObjectErr(traceError(err), bucket, object)
}
defer rwlk.Close()
fsMeta := fsMetaV1{}
_, err = fsMeta.ReadFrom(io.NewSectionReader(rwlk, 0, rwlk.Size()))
if err != nil {
return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPath)
}
partSuffix := fmt.Sprintf("object%d", partID)
@@ -263,7 +388,6 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
// Initialize md5 writer.
md5Writer := md5.New()
hashWriters := []io.Writer{md5Writer}
var sha256Writer hash.Hash
@@ -272,6 +396,7 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
hashWriters = append(hashWriters, sha256Writer)
}
multiWriter := io.MultiWriter(hashWriters...)
// Limit the reader to its provided size if specified.
var limitDataReader io.Reader
if size > 0 {
@@ -289,31 +414,24 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
buf := make([]byte, int(bufSize))
if size > 0 {
// Prepare file to avoid disk fragmentation
err := fs.storage.PrepareFile(minioMetaTmpBucket, tmpPartPath, size)
if err != nil {
return "", toObjectErr(err, minioMetaTmpBucket, tmpPartPath)
}
}
bytesWritten, cErr := fsCreateFile(fs.storage, teeReader, buf, minioMetaTmpBucket, tmpPartPath)
fsPartPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tmpPartPath)
bytesWritten, cErr := fsCreateFile(fsPartPath, teeReader, buf, size)
if cErr != nil {
fs.storage.DeleteFile(minioMetaTmpBucket, tmpPartPath)
fsRemoveFile(fsPartPath)
return "", toObjectErr(cErr, minioMetaTmpBucket, tmpPartPath)
}
// Should return IncompleteBody{} error when reader has fewer
// bytes than specified in request header.
if bytesWritten < size {
fs.storage.DeleteFile(minioMetaTmpBucket, tmpPartPath)
fsRemoveFile(fsPartPath)
return "", traceError(IncompleteBody{})
}
// Delete temporary part in case of failure. If
// PutObjectPart succeeds then there would be nothing to
// delete.
defer fs.storage.DeleteFile(minioMetaTmpBucket, tmpPartPath)
defer fsRemoveFile(fsPartPath)
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
if md5Hex != "" {
@@ -329,45 +447,32 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
}
}
// Hold write lock as we are updating fs.json
postUploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
postUploadIDLock.Lock()
defer postUploadIDLock.Unlock()
// Just check if the uploadID exists to avoid copy if it doesn't.
if !fs.isUploadIDExists(bucket, object, uploadID) {
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile)
fsMeta, err := readFSMetadata(fs.storage, minioMetaMultipartBucket, fsMetaPath)
if err != nil {
return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPath)
}
fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
partPath := path.Join(bucket, object, uploadID, partSuffix)
partPath := pathJoin(bucket, object, uploadID, partSuffix)
// Lock the part so that another part upload with same part-number gets blocked
// while the part is getting appended in the background.
partLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, partPath)
partLock.Lock()
err = fs.storage.RenameFile(minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath)
if err != nil {
fsNSPartPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, partPath)
if err = fsRenameFile(fsPartPath, fsNSPartPath); err != nil {
partLock.Unlock()
return "", toObjectErr(traceError(err), minioMetaMultipartBucket, partPath)
return "", toObjectErr(err, minioMetaMultipartBucket, partPath)
}
uploadIDPath = path.Join(bucket, object, uploadID)
if err = writeFSMetadata(fs.storage, minioMetaMultipartBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil {
// Save the object part info in `fs.json`.
fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size)
if _, err = fsMeta.WriteTo(rwlk); err != nil {
partLock.Unlock()
return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
}
// Append the part in background.
errCh := fs.bgAppend.append(fs.storage, bucket, object, uploadID, fsMeta)
errCh := fs.append(bucket, object, uploadID, fsMeta)
go func() {
// Also receive the error so that the appendParts go-routine does not block on send.
// But the error received is ignored as fs.PutObjectPart() would have already
// returned success to the client.
// Also receive the error so that the appendParts go-routine
// does not block on send. But the error received is ignored
// as fs.PutObjectPart() would have already returned success
// to the client.
<-errCh
partLock.Unlock()
}()
@@ -381,36 +486,51 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberMarker, maxParts int) (ListPartsInfo, error) {
result := ListPartsInfo{}
fsMetaPath := path.Join(bucket, object, uploadID, fsMetaJSONFile)
fsMeta, err := readFSMetadata(fs.storage, minioMetaMultipartBucket, fsMetaPath)
uploadIDPath := pathJoin(bucket, object, uploadID)
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
metaFile, err := fs.rwPool.Open(fsMetaPath)
if err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
// On windows oddly this is returned.
return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
}
return ListPartsInfo{}, toObjectErr(traceError(err), bucket, object)
}
defer fs.rwPool.Close(fsMetaPath)
fsMeta := fsMetaV1{}
_, err = fsMeta.ReadFrom((io.NewSectionReader(metaFile, 0, metaFile.Size())))
if err != nil {
return ListPartsInfo{}, toObjectErr(err, minioMetaBucket, fsMetaPath)
}
// Only parts with higher part numbers will be listed.
partIdx := fsMeta.ObjectPartIndex(partNumberMarker)
parts := fsMeta.Parts
if partIdx != -1 {
parts = fsMeta.Parts[partIdx+1:]
}
count := maxParts
for _, part := range parts {
var fi FileInfo
partNamePath := path.Join(bucket, object, uploadID, part.Name)
fi, err = fs.storage.StatFile(minioMetaMultipartBucket, partNamePath)
var fi os.FileInfo
partNamePath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, part.Name)
fi, err = fsStatFile(partNamePath)
if err != nil {
return ListPartsInfo{}, toObjectErr(traceError(err), minioMetaMultipartBucket, partNamePath)
}
result.Parts = append(result.Parts, partInfo{
PartNumber: part.Number,
ETag: part.ETag,
LastModified: fi.ModTime,
Size: fi.Size,
LastModified: fi.ModTime(),
Size: fi.Size(),
})
count--
if count == 0 {
break
}
}
// If listed entries are more than maxParts, we set IsTruncated as true.
if len(parts) > len(result.Parts) {
result.IsTruncated = true
@@ -423,6 +543,8 @@ func (fs fsObjects) listObjectParts(bucket, object, uploadID string, partNumberM
result.Object = object
result.UploadID = uploadID
result.MaxParts = maxParts
// Success.
return result, nil
}
@@ -438,29 +560,23 @@ func (fs fsObjects) ListObjectParts(bucket, object, uploadID string, partNumberM
return ListPartsInfo{}, err
}
// Hold lock so that there is no competing
// abort-multipart-upload or complete-multipart-upload.
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object, uploadID))
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !fs.isUploadIDExists(bucket, object, uploadID) {
return ListPartsInfo{}, traceError(InvalidUploadID{UploadID: uploadID})
if _, err := fs.statBucketDir(bucket); err != nil {
return ListPartsInfo{}, toObjectErr(err, bucket)
}
return fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
}
func (fs fsObjects) totalObjectSize(fsMeta fsMetaV1, parts []completePart) (int64, error) {
objSize := int64(0)
for _, part := range parts {
partIdx := fsMeta.ObjectPartIndex(part.PartNumber)
if partIdx == -1 {
return 0, InvalidPart{}
}
objSize += fsMeta.Parts[partIdx].Size
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
objectMPartPathLock.RLock()
defer objectMPartPathLock.RUnlock()
listPartsInfo, err := fs.listObjectParts(bucket, object, uploadID, partNumberMarker, maxParts)
if err != nil {
return ListPartsInfo{}, toObjectErr(err, bucket, object)
}
return objSize, nil
// Success.
return listPartsInfo, nil
}
// CompleteMultipartUpload - completes an ongoing multipart
@@ -474,18 +590,8 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", err
}
uploadIDPath := path.Join(bucket, object, uploadID)
// Hold lock so that
// 1) no one aborts this multipart upload
// 2) no one does a parallel complete-multipart-upload on this
// multipart upload
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, uploadIDPath)
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !fs.isUploadIDExists(bucket, object, uploadID) {
return "", traceError(InvalidUploadID{UploadID: uploadID})
if _, err := fs.statBucketDir(bucket); err != nil {
return "", toObjectErr(err, bucket)
}
// Calculate s3 compatible md5sum for complete multipart.
@@ -494,22 +600,65 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
return "", err
}
// Read saved fs metadata for ongoing multipart.
fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile)
fsMeta, err := readFSMetadata(fs.storage, minioMetaMultipartBucket, fsMetaPath)
uploadIDPath := pathJoin(bucket, object, uploadID)
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
fsMetaPathMultipart := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
rlk, err := fs.rwPool.Open(fsMetaPathMultipart)
if err != nil {
return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPath)
if err == errFileNotFound || err == errFileAccessDenied {
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
return "", toObjectErr(traceError(err), bucket, object)
}
// Disallow any parallel abort or complete multipart operations.
rwlk, err := fs.rwPool.Write(pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadsJSONFile))
if err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
if err == errFileNotFound || err == errFileAccessDenied {
return "", traceError(InvalidUploadID{UploadID: uploadID})
}
return "", toObjectErr(traceError(err), bucket, object)
}
defer rwlk.Close()
fsMeta := fsMetaV1{}
// Read saved fs metadata for ongoing multipart.
_, err = fsMeta.ReadFrom(io.NewSectionReader(rlk, 0, rlk.Size()))
if err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return "", toObjectErr(err, minioMetaMultipartBucket, fsMetaPathMultipart)
}
// Wait for any competing PutObject() operation on bucket/object, since same namespace
// would be acquired for `fs.json`.
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fsMetaJSONFile)
metaFile, err := fs.rwPool.Create(fsMetaPath)
if err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return "", toObjectErr(traceError(err), bucket, object)
}
defer metaFile.Close()
// This lock is held during rename of the appended tmp file to the actual
// location so that any competing GetObject/PutObject/DeleteObject do not race.
appendFallback := true // In case background-append did not append the required parts.
if isPartsSame(fsMeta.Parts, parts) {
err = fs.bgAppend.complete(fs.storage, bucket, object, uploadID, fsMeta)
err = fs.complete(bucket, object, uploadID, fsMeta)
if err == nil {
appendFallback = false
if err = fs.storage.RenameFile(minioMetaTmpBucket, uploadID, bucket, object); err != nil {
return "", toObjectErr(traceError(err), minioMetaTmpBucket, uploadID)
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID)
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return "", toObjectErr(err, minioMetaTmpBucket, uploadID)
}
}
}
@@ -518,77 +667,79 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
// background append could not do append all the required parts, hence we do it here.
tempObj := uploadID + "-" + "part.1"
fsTmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, tempObj)
// Delete the temporary object in the case of a
// failure. If PutObject succeeds, then there would be
// nothing to delete.
defer fsRemoveFile(fsTmpObjPath)
// Allocate staging buffer.
var buf = make([]byte, readSizeV1)
var objSize int64
objSize, err = fs.totalObjectSize(fsMeta, parts)
if err != nil {
return "", traceError(err)
}
if objSize > 0 {
// Prepare file to avoid disk fragmentation
err = fs.storage.PrepareFile(minioMetaTmpBucket, tempObj, objSize)
if err != nil {
return "", traceError(err)
}
}
// Loop through all parts, validate them and then commit to disk.
// Validate all parts and then commit to disk.
for i, part := range parts {
partIdx := fsMeta.ObjectPartIndex(part.PartNumber)
if partIdx == -1 {
fs.rwPool.Close(fsMetaPathMultipart)
return "", traceError(InvalidPart{})
}
if fsMeta.Parts[partIdx].ETag != part.ETag {
fs.rwPool.Close(fsMetaPathMultipart)
return "", traceError(BadDigest{})
}
// All parts except the last part has to be atleast 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(fsMeta.Parts[partIdx].Size) {
fs.rwPool.Close(fsMetaPathMultipart)
return "", traceError(PartTooSmall{
PartNumber: part.PartNumber,
PartSize: fsMeta.Parts[partIdx].Size,
PartETag: part.ETag,
})
}
// Construct part suffix.
partSuffix := fmt.Sprintf("object%d", part.PartNumber)
multipartPartFile := path.Join(bucket, object, uploadID, partSuffix)
multipartPartFile := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, partSuffix)
var reader io.ReadCloser
offset := int64(0)
totalLeft := fsMeta.Parts[partIdx].Size
for totalLeft > 0 {
curLeft := int64(readSizeV1)
if totalLeft < readSizeV1 {
curLeft = totalLeft
reader, _, err = fsOpenFile(multipartPartFile, offset)
if err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
if err == errFileNotFound {
return "", traceError(InvalidPart{})
}
var n int64
n, err = fs.storage.ReadFile(minioMetaMultipartBucket, multipartPartFile, offset, buf[:curLeft])
if n > 0 {
if err = fs.storage.AppendFile(minioMetaTmpBucket, tempObj, buf[:n]); err != nil {
return "", toObjectErr(traceError(err), minioMetaTmpBucket, tempObj)
}
}
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
}
if err == errFileNotFound {
return "", traceError(InvalidPart{})
}
return "", toObjectErr(traceError(err), minioMetaMultipartBucket, multipartPartFile)
}
offset += n
totalLeft -= n
return "", toObjectErr(traceError(err), minioMetaMultipartBucket, partSuffix)
}
// No need to hold a lock, this is a unique file and will be only written
// to one one process per uploadID per minio process.
var wfile *os.File
wfile, err = os.OpenFile(preparePath(fsTmpObjPath), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
reader.Close()
fs.rwPool.Close(fsMetaPathMultipart)
return "", toObjectErr(traceError(err), bucket, object)
}
_, err = io.CopyBuffer(wfile, reader, buf)
if err != nil {
wfile.Close()
reader.Close()
fs.rwPool.Close(fsMetaPathMultipart)
return "", toObjectErr(traceError(err), bucket, object)
}
wfile.Close()
reader.Close()
}
// Rename the file back to original location, if not delete the temporary object.
err = fs.storage.RenameFile(minioMetaTmpBucket, tempObj, bucket, object)
if err != nil {
if dErr := fs.storage.DeleteFile(minioMetaTmpBucket, tempObj); dErr != nil {
return "", toObjectErr(traceError(dErr), minioMetaTmpBucket, tempObj)
}
return "", toObjectErr(traceError(err), bucket, object)
fsNSObjPath := pathJoin(fs.fsPath, bucket, object)
if err = fsRenameFile(fsTmpObjPath, fsNSObjPath); err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return "", toObjectErr(err, minioMetaTmpBucket, uploadID)
}
}
@@ -601,54 +752,33 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
}
fsMeta.Meta["md5Sum"] = s3MD5
fsMetaPath = path.Join(bucketMetaPrefix, bucket, object, fsMetaJSONFile)
// Write the metadata to a temp file and rename it to the actual location.
if err = writeFSMetadata(fs.storage, minioMetaBucket, fsMetaPath, fsMeta); err != nil {
// Write all the set metadata.
if _, err = fsMeta.WriteTo(metaFile); err != nil {
fs.rwPool.Close(fsMetaPathMultipart)
return "", toObjectErr(err, bucket, object)
}
// Close lock held on bucket/object/uploadid/fs.json,
// this needs to be done for windows so that we can happily
// delete the bucket/object/uploadid
fs.rwPool.Close(fsMetaPathMultipart)
// Cleanup all the parts if everything else has been safely committed.
if err = cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil {
return "", toObjectErr(err, bucket, object)
multipartObjectDir := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object)
multipartUploadIDDir := pathJoin(multipartObjectDir, uploadID)
if err = fsRemoveUploadIDPath(multipartObjectDir, multipartUploadIDDir); err != nil {
return "", toObjectErr(traceError(err), bucket, object)
}
// Hold the lock so that two parallel
// complete-multipart-uploads do not leave a stale
// uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
// remove entry from uploads.json
if err := fs.removeUploadID(bucket, object, uploadID); err != nil {
return "", toObjectErr(err, minioMetaMultipartBucket, path.Join(bucket, object))
// Remove entry from `uploads.json`.
if err = fs.removeUploadID(bucket, object, uploadID, rwlk); err != nil {
return "", toObjectErr(err, minioMetaMultipartBucket, pathutil.Join(bucket, object))
}
// Return md5sum.
return s3MD5, nil
}
// abortMultipartUpload - wrapper for purging an ongoing multipart
// transaction, deletes uploadID entry from `uploads.json` and purges
// the directory at '.minio.sys/multipart/bucket/object/uploadID' holding
// all the upload parts.
func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error {
// Signal appendParts routine to stop waiting for new parts to arrive.
fs.bgAppend.abort(uploadID)
// Cleanup all uploaded parts.
if err := cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil {
return err
}
// remove entry from uploads.json with quorum
if err := fs.removeUploadID(bucket, object, uploadID); err != nil {
return toObjectErr(err, bucket, object)
}
// success
return nil
}
// AbortMultipartUpload - aborts an ongoing multipart operation
// signified by the input uploadID. This is an atomic operation
// doesn't require clients to initiate multiple such requests.
@@ -666,17 +796,57 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
return err
}
// Hold lock so that there is no competing
// complete-multipart-upload or put-object-part.
uploadIDLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object, uploadID))
uploadIDLock.Lock()
defer uploadIDLock.Unlock()
if !fs.isUploadIDExists(bucket, object, uploadID) {
return traceError(InvalidUploadID{UploadID: uploadID})
if _, err := fs.statBucketDir(bucket); err != nil {
return toObjectErr(err, bucket)
}
err := fs.abortMultipartUpload(bucket, object, uploadID)
return err
uploadIDPath := pathJoin(bucket, object, uploadID)
// Hold the lock so that two parallel complete-multipart-uploads
// do not leave a stale uploads.json behind.
objectMPartPathLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, object))
objectMPartPathLock.Lock()
defer objectMPartPathLock.Unlock()
fsMetaPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadIDPath, fsMetaJSONFile)
if _, err := fs.rwPool.Open(fsMetaPath); err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return traceError(InvalidUploadID{UploadID: uploadID})
}
return toObjectErr(traceError(err), bucket, object)
}
uploadsPath := pathJoin(bucket, object, uploadsJSONFile)
rwlk, err := fs.rwPool.Write(pathJoin(fs.fsPath, minioMetaMultipartBucket, uploadsPath))
if err != nil {
fs.rwPool.Close(fsMetaPath)
if err == errFileNotFound || err == errFileAccessDenied {
return traceError(InvalidUploadID{UploadID: uploadID})
}
return toObjectErr(traceError(err), bucket, object)
}
defer rwlk.Close()
// Signal appendParts routine to stop waiting for new parts to arrive.
fs.abort(uploadID)
// Close lock held on bucket/object/uploadid/fs.json,
// this needs to be done for windows so that we can happily
// delete the bucket/object/uploadid
fs.rwPool.Close(fsMetaPath)
// Cleanup all uploaded parts and abort the upload.
multipartObjectDir := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object)
multipartUploadIDDir := pathJoin(multipartObjectDir, uploadID)
if err = fsRemoveUploadIDPath(multipartObjectDir, multipartUploadIDDir); err != nil {
return toObjectErr(traceError(err), bucket, object)
}
// Remove entry from `uploads.json`.
if err = fs.removeUploadID(bucket, object, uploadID, rwlk); err != nil {
return toObjectErr(err, bucket, object)
}
return nil
}