From afa4c7c3efef201d66d2e708fb7392813b5efa62 Mon Sep 17 00:00:00 2001
From: Krishna Srinivas
Date: Mon, 21 Nov 2016 13:12:53 +0530
Subject: [PATCH] fs/multipart: Append multipart parts in a proper Go routine
 in background. (#3282)

---
 cmd/fs-v1-background-append.go | 210 +++++++++++++++++++++++++++++++++
 cmd/fs-v1-multipart.go         | 148 +++--------------------
 cmd/fs-v1.go                   |   6 +
 cmd/posix.go                   |   7 +-
 4 files changed, 240 insertions(+), 131 deletions(-)
 create mode 100644 cmd/fs-v1-background-append.go

diff --git a/cmd/fs-v1-background-append.go b/cmd/fs-v1-background-append.go
new file mode 100644
index 000000000..8b103541c
--- /dev/null
+++ b/cmd/fs-v1-background-append.go
@@ -0,0 +1,210 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+	"errors"
+	"reflect"
+	"sync"
+	"time"
+)
+
+// Error sent by the appendParts go-routine when there are holes in the parts.
+// For example, if the client uploads part-2 before part-1, we cannot append
+// and have to wait till part-1 is uploaded. Hence we return this error.
+// Currently this error is not used by the caller.
+var errPartsMissing = errors.New("required parts missing")
+
+// Error sent when the appendParts go-routine has waited long enough and timed out.
+var errAppendPartsTimeout = errors.New("appendParts goroutine timeout")
+
+// Timeout value for the appendParts go-routine.
+var appendPartsTimeout = 24 * 60 * 60 * time.Second
+
+// Holds a map of uploadID->appendParts go-routine.
+type backgroundAppend struct {
+	infoMap map[string]bgAppendPartsInfo
+	sync.Mutex
+}
+
+// Input to the appendParts go-routine.
+type bgAppendPartsInput struct {
+	meta  fsMetaV1   // list of parts that need to be appended
+	errCh chan error // error sent by the appendParts go-routine
+}
+
+// Identifies an appendParts go-routine.
+type bgAppendPartsInfo struct {
+	inputCh   chan bgAppendPartsInput
+	timeoutCh chan struct{} // closed by the appendParts go-routine when it times out
+	endCh     chan struct{} // closed after complete/abort of the upload to end the appendParts go-routine
+}
+
+// Called after a part is uploaded so that it can be appended in the background.
+func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID string, meta fsMetaV1) {
+	b.Lock()
+	info, ok := b.infoMap[uploadID]
+	if !ok {
+		// Corresponding appendParts go-routine was not found, create a new one. This would happen
+		// when the first part of a multipart upload is uploaded.
+		inputCh := make(chan bgAppendPartsInput)
+		timeoutCh := make(chan struct{})
+		endCh := make(chan struct{})
+
+		info = bgAppendPartsInfo{inputCh, timeoutCh, endCh}
+		b.infoMap[uploadID] = info
+
+		go b.appendParts(disk, bucket, object, uploadID, info)
+	}
+	b.Unlock()
+	go func() {
+		errCh := make(chan error)
+		// Send the input in a goroutine, as a send on inputCh can block if the appendParts go-routine
+		// is busy appending a part.
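+		// Note: `info` holds copies of the channels taken while b.Lock was held above,
+		// so it is safe to use here after the lock has been released.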
+		select {
+		case <-info.timeoutCh:
+			// This is to handle a rare race condition where we found info in b.infoMap
+			// but soon after that the appendParts go-routine timed out.
+		case info.inputCh <- bgAppendPartsInput{meta, errCh}:
+			// Receive the error so that the appendParts go-routine does not block on send.
+			// But the error received is ignored, as fs.PutObjectPart() would have already
+			// returned success to the client.
+			<-errCh
+		}
+	}()
+}
+
+// Called on complete-multipart-upload. Returns nil if the required parts have been appended.
+func (b *backgroundAppend) complete(disk StorageAPI, bucket, object, uploadID string, meta fsMetaV1) error {
+	b.Lock()
+	info, ok := b.infoMap[uploadID]
+	b.Unlock()
+	if !ok {
+		return errPartsMissing
+	}
+	errCh := make(chan error)
+	select {
+	case <-info.timeoutCh:
+		// This is to handle a rare race condition where we found info in b.infoMap
+		// but soon after that the appendParts go-routine timed out.
+		return errAppendPartsTimeout
+	case info.inputCh <- bgAppendPartsInput{meta, errCh}:
+	}
+	err := <-errCh
+
+	b.remove(uploadID)
+
+	return err
+}
+
+// Called after complete-multipart-upload or abort-multipart-upload so that the appendParts go-routine is not left dangling.
+func (b *backgroundAppend) remove(uploadID string) {
+	b.Lock()
+	defer b.Unlock()
+	info, ok := b.infoMap[uploadID]
+	if !ok {
+		return
+	}
+	delete(b.infoMap, uploadID)
+	close(info.endCh)
+}
+
+// This is run as a go-routine that appends the parts in the background.
+func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID string, info bgAppendPartsInfo) {
+	// Holds the list of parts that are already appended to the "append" file.
+	appendMeta := fsMetaV1{}
+	for {
+		select {
+		case input := <-info.inputCh:
+			// We receive on this channel when a new part gets uploaded or when complete-multipart sends
+			// a value on this channel to confirm that all the required parts have been appended.
+			meta := input.meta
+			for {
+				// Append should be done in such a way that if part-3 and part-2 are uploaded before part-1, we
+				// wait till part-1 is uploaded, after which we append part-2 and part-3 as well in this for-loop.
+				part, appendNeeded := partToAppend(meta, appendMeta)
+				if !appendNeeded {
+					if reflect.DeepEqual(meta.Parts, appendMeta.Parts) {
+						// Sending nil is useful so that the complete-multipart-upload knows that
+						// all the required parts have been appended.
+						input.errCh <- nil
+					} else {
+						// Sending an error is useful so that complete-multipart-upload can fall back to
+						// its own append process.
+						input.errCh <- errPartsMissing
+					}
+					break
+				}
+				if err := appendPart(disk, bucket, object, uploadID, part); err != nil {
+					input.errCh <- err
+					break
+				}
+				appendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size)
+			}
+		case <-info.endCh:
+			// Either complete-multipart-upload or abort-multipart-upload closed endCh to end the appendParts go-routine.
+			appendFilePath := getFSAppendDataPath(uploadID)
+			disk.DeleteFile(bucket, appendFilePath)
+			return
+		case <-time.After(appendPartsTimeout):
+			// Timeout the goroutine to garbage collect its resources. This would happen if the client initiates
+			// a multipart upload and does not complete/abort it.
+			b.Lock()
+			delete(b.infoMap, uploadID)
+			b.Unlock()
+			// Delete the temporary append file as well.
+ appendFilePath := getFSAppendDataPath(uploadID) + disk.DeleteFile(bucket, appendFilePath) + + close(info.timeoutCh) + } + } +} + +// Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location +// upon complete-multipart-upload. +func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo) error { + partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, part.Name) + appendFilePath := getFSAppendDataPath(uploadID) + + offset := int64(0) + totalLeft := part.Size + buf := make([]byte, readSizeV1) + for totalLeft > 0 { + curLeft := int64(readSizeV1) + if totalLeft < readSizeV1 { + curLeft = totalLeft + } + var n int64 + n, err := disk.ReadFile(minioMetaBucket, partPath, offset, buf[:curLeft]) + if err != nil { + // Check for EOF/ErrUnexpectedEOF not needed as it should never happen as we know + // the exact size of the file and hence know the size of buf[] + // EOF/ErrUnexpectedEOF indicates that the length of file was shorter than part.Size and + // hence considered as an error condition. + disk.DeleteFile(bucket, appendFilePath) + return err + } + if err = disk.AppendFile(minioMetaBucket, appendFilePath, buf[:n]); err != nil { + disk.DeleteFile(bucket, appendFilePath) + return err + } + offset += n + totalLeft -= n + } + return nil +} diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 7751271e6..176bb867d 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -24,7 +24,6 @@ import ( "hash" "io" "path" - "strconv" "strings" "time" @@ -286,108 +285,9 @@ func partToAppend(fsMeta fsMetaV1, fsAppendMeta fsMetaV1) (part objectPartInfo, return fsMeta.Parts[nextPartIndex], true } -// Returns metadata path for the file holding info about the parts that -// have been appended to the "append-file" -func getFSAppendMetaPath(uploadID string) string { - return uploadID + ".json" -} - // Returns path for the append-file. func getFSAppendDataPath(uploadID string) string { - return uploadID + ".data" -} - -// Append parts to fsAppendDataFile. -func appendParts(disk StorageAPI, bucket, object, uploadID string) { - cleanupAppendPaths := func() { - // In case of any error, cleanup the append data and json files - // from the tmp so that we do not have any inconsistent append - // data/json files. - disk.DeleteFile(minioMetaTmpBucket, getFSAppendDataPath(uploadID)) - disk.DeleteFile(minioMetaTmpBucket, getFSAppendMetaPath(uploadID)) - } - uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) - // fs-append.json path - fsAppendMetaPath := getFSAppendMetaPath(uploadID) - // fs.json path - fsMetaPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, fsMetaJSONFile) - - // Lock the uploadID so that no one modifies fs.json - uploadIDLock := nsMutex.NewNSLock(minioMetaBucket, uploadIDPath) - uploadIDLock.RLock() - fsMeta, err := readFSMetadata(disk, minioMetaBucket, fsMetaPath) - uploadIDLock.RUnlock() - if err != nil { - return - } - - // Lock fs-append.json so that there is no parallel append to the file. 
- appendPathLock := nsMutex.NewNSLock(minioMetaTmpBucket, fsAppendMetaPath) - appendPathLock.Lock() - defer appendPathLock.Unlock() - - fsAppendMeta, err := readFSMetadata(disk, minioMetaTmpBucket, fsAppendMetaPath) - if err != nil { - if errorCause(err) != errFileNotFound { - cleanupAppendPaths() - return - } - fsAppendMeta = fsMeta - fsAppendMeta.Parts = nil - } - - // Check if a part needs to be appended to - part, appendNeeded := partToAppend(fsMeta, fsAppendMeta) - if !appendNeeded { - return - } - // Hold write lock on the part so that there is no parallel upload on the part. - partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, strconv.Itoa(part.Number)) - partPathLock := nsMutex.NewNSLock(minioMetaBucket, partPath) - partPathLock.Lock() - defer partPathLock.Unlock() - - // Proceed to append "part" - fsAppendDataPath := getFSAppendDataPath(uploadID) - // Path to the part that needs to be appended. - partPath = path.Join(mpartMetaPrefix, bucket, object, uploadID, part.Name) - offset := int64(0) - totalLeft := part.Size - buf := make([]byte, readSizeV1) - for totalLeft > 0 { - curLeft := int64(readSizeV1) - if totalLeft < readSizeV1 { - curLeft = totalLeft - } - var n int64 - n, err = disk.ReadFile(minioMetaBucket, partPath, offset, buf[:curLeft]) - if n > 0 { - if err = disk.AppendFile(minioMetaTmpBucket, fsAppendDataPath, buf[:n]); err != nil { - cleanupAppendPaths() - return - } - } - if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { - break - } - cleanupAppendPaths() - return - } - offset += n - totalLeft -= n - } - fsAppendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size) - // Overwrite previous fs-append.json - if err = writeFSMetadata(disk, minioMetaTmpBucket, fsAppendMetaPath, fsAppendMeta); err != nil { - cleanupAppendPaths() - return - } - // If there are more parts that need to be appended to fsAppendDataFile - _, appendNeeded = partToAppend(fsMeta, fsAppendMeta) - if appendNeeded { - go appendParts(disk, bucket, object, uploadID) - } + return path.Join(minioMetaTmpBucket, uploadID) } // PutObjectPart - reads incoming data until EOF for the part file on @@ -514,7 +414,10 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s if err = writeFSMetadata(fs.storage, minioMetaBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, uploadIDPath) } - go appendParts(fs.storage, bucket, object, uploadID) + + // Append the part in background. + fs.bgAppend.append(fs.storage, bucket, object, uploadID, fsMeta) + return newMD5Hex, nil } @@ -650,21 +553,12 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload return "", traceError(InvalidUploadID{UploadID: uploadID}) } - // fs-append.json path - fsAppendMetaPath := getFSAppendMetaPath(uploadID) - // Lock fs-append.json so that no parallel appendParts() is being done. - appendPathLock := nsMutex.NewNSLock(minioMetaTmpBucket, fsAppendMetaPath) - appendPathLock.Lock() - defer appendPathLock.Unlock() - // Calculate s3 compatible md5sum for complete multipart. s3MD5, err := getCompleteMultipartMD5(parts) if err != nil { return "", err } - fsAppendDataPath := getFSAppendDataPath(uploadID) - // Read saved fs metadata for ongoing multipart. 
 	fsMetaPath := pathJoin(uploadIDPath, fsMetaJSONFile)
 	fsMeta, err := readFSMetadata(fs.storage, minioMetaBucket, fsMetaPath)
@@ -672,16 +566,21 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
 		return "", toObjectErr(err, minioMetaBucket, fsMetaPath)
 	}
 
-	fsAppendMeta, err := readFSMetadata(fs.storage, minioMetaTmpBucket, fsAppendMetaPath)
-	if err == nil && isPartsSame(fsAppendMeta.Parts, parts) {
-		if err = fs.storage.RenameFile(minioMetaTmpBucket, fsAppendDataPath, bucket, object); err != nil {
-			return "", toObjectErr(traceError(err), minioMetaTmpBucket, fsAppendDataPath)
+	appendFallback := true // In case the background appendParts go-routine did not append the required parts.
+	if isPartsSame(fsMeta.Parts, parts) {
+		err = fs.bgAppend.complete(fs.storage, bucket, object, uploadID, fsMeta)
+		if err == nil {
+			appendFallback = false
+			fsAppendDataPath := getFSAppendDataPath(uploadID)
+			if err = fs.storage.RenameFile(minioMetaBucket, fsAppendDataPath, bucket, object); err != nil {
+				return "", toObjectErr(traceError(err), minioMetaBucket, fsAppendDataPath)
+			}
 		}
-	} else {
-		// Remove append data temporary file since it is no longer needed at this point
-		fs.storage.DeleteFile(minioMetaTmpBucket, fsAppendDataPath)
+	}
 
-		tempObj := uploadID + "-" + "part.1"
+	if appendFallback {
+		// The background appendParts go-routine could not append all the required parts, hence we do it here.
+		tempObj := path.Join(minioMetaTmpBucket, uploadID+"-"+"part.1")
 
 		// Allocate staging buffer.
 		var buf = make([]byte, readSizeV1)
@@ -757,9 +656,6 @@ func (fs fsObjects) CompleteMultipartUpload(bucket string, object string, upload
 		}
 	}
 
-	// Remove the append-file metadata file in tmp location as we no longer need it.
-	fs.storage.DeleteFile(minioMetaTmpBucket, fsAppendMetaPath)
-
 	// No need to save part info, since we have concatenated all parts.
 	fsMeta.Parts = nil
 
@@ -806,7 +702,7 @@ func (fs fsObjects) abortMultipartUpload(bucket, object, uploadID string) error
 	if err := cleanupUploadedParts(bucket, object, uploadID, fs.storage); err != nil {
 		return err
 	}
-
+	fs.bgAppend.remove(uploadID)
 	// remove entry from uploads.json with quorum
 	if err := fs.updateUploadJSON(bucket, object, uploadIDChange{uploadID: uploadID, isRemove: true}); err != nil {
 		return toObjectErr(err, bucket, object)
@@ -851,12 +747,6 @@ func (fs fsObjects) AbortMultipartUpload(bucket, object, uploadID string) error
 		return traceError(InvalidUploadID{UploadID: uploadID})
 	}
 
-	fsAppendMetaPath := getFSAppendMetaPath(uploadID)
-	// Lock fs-append.json so that no parallel appendParts() is being done.
-	appendPathLock := nsMutex.NewNSLock(minioMetaTmpBucket, fsAppendMetaPath)
-	appendPathLock.Lock()
-	defer appendPathLock.Unlock()
-
 	err := fs.abortMultipartUpload(bucket, object, uploadID)
 	return err
 }
diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go
index bf69e05fd..acd2f86f8 100644
--- a/cmd/fs-v1.go
+++ b/cmd/fs-v1.go
@@ -37,6 +37,9 @@ type fsObjects struct {
 
 	// List pool management.
 	listPool *treeWalkPool
+
+	// To manage the background appendParts go-routines.
+	bgAppend *backgroundAppend
 }
 
 // list of all errors that can be ignored in tree walk operation in FS
@@ -61,6 +64,9 @@ func newFSObjects(storage StorageAPI) (ObjectLayer, error) {
 	fs := fsObjects{
 		storage:  storage,
 		listPool: newTreeWalkPool(globalLookupTimeout),
+		bgAppend: &backgroundAppend{
+			infoMap: make(map[string]bgAppendPartsInfo),
+		},
 	}
 
 	// Return successfully initialized object layer.
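The new cmd/fs-v1-background-append.go above is organized around one owner go-routine per uploadID: work arrives on inputCh, shutdown is signalled by closing endCh, and time.After(appendPartsTimeout) garbage-collects abandoned uploads. Below is a minimal, self-contained sketch of that channel pattern only; the names (job, worker, payload) are illustrative and are not identifiers from this patch.

package main

import (
	"fmt"
	"time"
)

// job mirrors bgAppendPartsInput: a unit of work plus a reply channel.
type job struct {
	payload string
	errCh   chan error
}

// worker mirrors the shape of appendParts: it owns a resource, processes jobs
// one at a time, and exits on shutdown or after sitting idle for too long.
func worker(inputCh <-chan job, endCh <-chan struct{}, idle time.Duration) {
	for {
		select {
		case j := <-inputCh:
			fmt.Println("appending", j.payload)
			j.errCh <- nil // reply so the sender is never left blocked
		case <-endCh:
			return // complete/abort: clean shutdown
		case <-time.After(idle):
			return // abandoned upload: clean up and exit
		}
	}
}

func main() {
	inputCh := make(chan job)
	endCh := make(chan struct{})
	go worker(inputCh, endCh, time.Minute)

	errCh := make(chan error)
	inputCh <- job{payload: "part-1", errCh: errCh}
	fmt.Println("worker replied:", <-errCh)
	close(endCh)
}

In the patch, the reply sent on errCh is what lets complete() distinguish a fully appended upload (nil) from errPartsMissing, while append() simply drains and discards it.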
diff --git a/cmd/posix.go b/cmd/posix.go
index e734f2b5a..b4850111d 100644
--- a/cmd/posix.go
+++ b/cmd/posix.go
@@ -494,8 +494,11 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
 // ReadFile reads exactly len(buf) bytes into buf. It returns the
 // number of bytes copied. The error is EOF only if no bytes were
 // read. On return, n == len(buf) if and only if err == nil. n == 0
-// for io.EOF. Additionally ReadFile also starts reading from an
-// offset.
+// for io.EOF.
+// If an EOF happens after reading some but not all the bytes,
+// ReadFile returns io.ErrUnexpectedEOF.
+// Additionally, ReadFile also starts reading from an offset.
+// ReadFile semantics are the same as io.ReadFull.
 func (s *posix) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) {
 	defer func() {
 		if err == syscall.EIO {
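The reworded posix.ReadFile contract above is defined in terms of io.ReadFull: a read that returns no bytes at end-of-file yields io.EOF, while a short read yields io.ErrUnexpectedEOF. A small, self-contained standard-library example of that error behaviour (plain Go, not MinIO code):

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	// io.ReadFull semantics, which posix.ReadFile now documents and follows:
	//  - n == len(buf) and err == nil when the buffer is filled completely,
	//  - err == io.EOF when nothing could be read at all,
	//  - err == io.ErrUnexpectedEOF when only part of the buffer was filled.
	buf := make([]byte, 8)

	n, err := io.ReadFull(strings.NewReader("abcdefgh"), buf)
	fmt.Println(n, err) // 8 <nil>

	n, err = io.ReadFull(strings.NewReader("abc"), buf)
	fmt.Println(n, err) // 3 unexpected EOF

	n, err = io.ReadFull(strings.NewReader(""), buf)
	fmt.Println(n, err) // 0 EOF
}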