diff --git a/cmd/fs-v1-background-append.go b/cmd/fs-v1-background-append.go index c7cbc626d..59de9295e 100644 --- a/cmd/fs-v1-background-append.go +++ b/cmd/fs-v1-background-append.go @@ -56,7 +56,7 @@ type bgAppendPartsInfo struct { } // Called after a part is uploaded so that it can be appended in the background. -func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID string, meta fsMetaV1) { +func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID string, meta fsMetaV1) chan error { b.Lock() info, ok := b.infoMap[uploadID] if !ok { @@ -73,21 +73,19 @@ func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID stri go b.appendParts(disk, bucket, object, uploadID, info) } b.Unlock() + errCh := make(chan error) go func() { - errCh := make(chan error) // send input in a goroutine as send on the inputCh can block if appendParts go-routine // is busy appending a part. select { case <-info.timeoutCh: // This is to handle a rare race condition where we found info in b.infoMap // but soon after that appendParts go-routine timed out. + errCh <- errAppendPartsTimeout case info.inputCh <- bgAppendPartsInput{meta, errCh}: - // Receive the error so that the appendParts go-routine does not block on send. - // But the error received is ignored as fs.PutObjectPart() would have already - // returned success to the client. - <-errCh } }() + return errCh } // Called on complete-multipart-upload. Returns nil if the required parts have been appended. @@ -153,6 +151,8 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID break } if err := appendPart(disk, bucket, object, uploadID, part); err != nil { + disk.DeleteFile(minioMetaTmpBucket, uploadID) + appendMeta.Parts = nil input.errCh <- err break } @@ -175,6 +175,7 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID disk.DeleteFile(minioMetaTmpBucket, uploadID) close(info.timeoutCh) + return } } } @@ -182,7 +183,7 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID // Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location // upon complete-multipart-upload. func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo) error { - partPath := pathJoin(mpartMetaPrefix, bucket, object, uploadID, part.Name) + partPath := pathJoin(bucket, object, uploadID, part.Name) offset := int64(0) totalLeft := part.Size @@ -192,18 +193,15 @@ func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPar if totalLeft < readSizeV1 { curLeft = totalLeft } - var n int64 n, err := disk.ReadFile(minioMetaMultipartBucket, partPath, offset, buf[:curLeft]) if err != nil { // Check for EOF/ErrUnexpectedEOF not needed as it should never happen as we know // the exact size of the file and hence know the size of buf[] // EOF/ErrUnexpectedEOF indicates that the length of file was shorter than part.Size and // hence considered as an error condition. - disk.DeleteFile(minioMetaTmpBucket, uploadID) return err } if err = disk.AppendFile(minioMetaTmpBucket, uploadID, buf[:n]); err != nil { - disk.DeleteFile(minioMetaTmpBucket, uploadID) return err } offset += n diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index d8543b189..359bebbbb 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -401,17 +401,30 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s fsMeta.AddObjectPart(partID, partSuffix, newMD5Hex, size) partPath := path.Join(bucket, object, uploadID, partSuffix) + // Lock the part so that another part upload with same part-number gets blocked + // while the part is getting appended in the background. + partLock := nsMutex.NewNSLock(minioMetaMultipartBucket, partPath) + partLock.Lock() err = fs.storage.RenameFile(minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath) if err != nil { + partLock.Unlock() return "", toObjectErr(traceError(err), minioMetaMultipartBucket, partPath) } uploadIDPath = path.Join(bucket, object, uploadID) if err = writeFSMetadata(fs.storage, minioMetaMultipartBucket, path.Join(uploadIDPath, fsMetaJSONFile), fsMeta); err != nil { + partLock.Unlock() return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } - // Append the part in background. - fs.bgAppend.append(fs.storage, bucket, object, uploadID, fsMeta) + go func() { + // Append the part in background. + errCh := fs.bgAppend.append(fs.storage, bucket, object, uploadID, fsMeta) + // Also receive the error so that the appendParts go-routine does not block on send. + // But the error received is ignored as fs.PutObjectPart() would have already + // returned success to the client. + <-errCh + partLock.Unlock() + }() return newMD5Hex, nil }