FS/Multipart: Fix race between PutObjectPart and Complete/Abort multi… (#3419)

FS/Multipart: Fix race between PutObjectPart and Complete/Abort multipart. close(timeoutCh) on complete/abort so that a racing PutObjectPart does not leave a dangling go-routine.

Fixes #3351
This commit is contained in:
Krishna Srinivas 2016-12-10 05:40:18 +05:30 committed by Harshavardhana
parent b363709c11
commit ac554bf663
2 changed files with 6 additions and 5 deletions

View File

@ -91,9 +91,9 @@ func (b *backgroundAppend) append(disk StorageAPI, bucket, object, uploadID stri
// Called on complete-multipart-upload. Returns nil if the required parts have been appended. // Called on complete-multipart-upload. Returns nil if the required parts have been appended.
func (b *backgroundAppend) complete(disk StorageAPI, bucket, object, uploadID string, meta fsMetaV1) error { func (b *backgroundAppend) complete(disk StorageAPI, bucket, object, uploadID string, meta fsMetaV1) error {
b.Lock() b.Lock()
defer b.Unlock()
info, ok := b.infoMap[uploadID] info, ok := b.infoMap[uploadID]
delete(b.infoMap, uploadID) delete(b.infoMap, uploadID)
b.Unlock()
if !ok { if !ok {
return errPartsMissing return errPartsMissing
} }
@ -115,13 +115,12 @@ func (b *backgroundAppend) complete(disk StorageAPI, bucket, object, uploadID st
// Called after complete-multipart-upload or abort-multipart-upload so that the appendParts go-routine is not left dangling. // Called after complete-multipart-upload or abort-multipart-upload so that the appendParts go-routine is not left dangling.
func (b *backgroundAppend) abort(uploadID string) { func (b *backgroundAppend) abort(uploadID string) {
b.Lock() b.Lock()
defer b.Unlock()
info, ok := b.infoMap[uploadID] info, ok := b.infoMap[uploadID]
if !ok { if !ok {
b.Unlock()
return return
} }
delete(b.infoMap, uploadID) delete(b.infoMap, uploadID)
b.Unlock()
info.abortCh <- struct{}{} info.abortCh <- struct{}{}
} }
@ -164,9 +163,11 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID
case <-info.abortCh: case <-info.abortCh:
// abort-multipart-upload closed abortCh to end the appendParts go-routine. // abort-multipart-upload closed abortCh to end the appendParts go-routine.
disk.DeleteFile(minioMetaTmpBucket, uploadID) disk.DeleteFile(minioMetaTmpBucket, uploadID)
close(info.timeoutCh) // So that any racing PutObjectPart does not leave a dangling go-routine.
return return
case <-info.completeCh: case <-info.completeCh:
// complete-multipart-upload closed completeCh to end the appendParts go-routine. // complete-multipart-upload closed completeCh to end the appendParts go-routine.
close(info.timeoutCh) // So that any racing PutObjectPart does not leave a dangling go-routine.
return return
case <-time.After(appendPartsTimeout): case <-time.After(appendPartsTimeout):
// Timeout the goroutine to garbage collect its resources. This would happen if the client initiates // Timeout the goroutine to garbage collect its resources. This would happen if the client initiates

View File

@ -361,9 +361,9 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
} }
// Append the part in background.
errCh := fs.bgAppend.append(fs.storage, bucket, object, uploadID, fsMeta)
go func() { go func() {
// Append the part in background.
errCh := fs.bgAppend.append(fs.storage, bucket, object, uploadID, fsMeta)
// Also receive the error so that the appendParts go-routine does not block on send. // Also receive the error so that the appendParts go-routine does not block on send.
// But the error received is ignored as fs.PutObjectPart() would have already // But the error received is ignored as fs.PutObjectPart() would have already
// returned success to the client. // returned success to the client.