multipart: Increase locked critical for CompleteMultipart.

This commit is contained in:
Harshavardhana
2016-02-06 01:36:43 -08:00
parent 643ef30533
commit 4e6e78598f
3 changed files with 73 additions and 34 deletions

View File

@@ -236,6 +236,7 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
// Critical region requiring write lock.
fs.rwLock.Lock()
// Initialize multipart session.
mpartSession := &MultipartSession{}
@@ -364,6 +365,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
partMetadata.Size = fi.Size()
partMetadata.LastModified = fi.ModTime()
// Critical region requiring read lock.
fs.rwLock.RLock()
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
fs.rwLock.RUnlock()
@@ -382,6 +384,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
// Sort by part number before saving.
sort.Sort(partNumber(deserializedMultipartSession.Parts))
// Critical region requiring write lock.
fs.rwLock.Lock()
fs.multiparts.ActiveSession[uploadID] = deserializedMultipartSession
if err := saveMultipartsSession(*fs.multiparts); err != nil {
@@ -390,6 +393,7 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
}
fs.rwLock.Unlock()
// Return etag.
return partMetadata.ETag, nil
}
@@ -459,6 +463,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
// Save parts for verification.
parts := completeMultipartUpload.Part
// Critical region requiring read lock.
fs.rwLock.RLock()
savedParts := fs.multiparts.ActiveSession[uploadID].Parts
fs.rwLock.RUnlock()
@@ -468,6 +473,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
return ObjectMetadata{}, probe.NewError(InvalidPart{})
}
// Parts successfully validated, save all the parts.
partPathPrefix := objectPath + uploadID
if err := saveParts(partPathPrefix, objectWriter, parts); err != nil {
file.CloseAndPurge()
@@ -476,6 +482,7 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
// Successfully saved, remove all parts.
removeParts(partPathPrefix, savedParts)
// Critical region requiring write lock.
fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, uploadID)
if err := saveMultipartsSession(*fs.multiparts); err != nil {
@@ -483,10 +490,10 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
file.Close()
fs.rwLock.Unlock()
file.Close()
// Send stat again to get object metadata.
st, e := os.Stat(objectPath)
if e != nil {
return ObjectMetadata{}, probe.NewError(e)
@@ -550,6 +557,7 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
return ObjectResourcesMetadata{}, probe.NewError(e)
}
// Critical region requiring read lock.
fs.rwLock.RLock()
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[uploadID]
fs.rwLock.RUnlock()
@@ -600,14 +608,18 @@ func (fs Filesystem) AbortMultipartUpload(bucket, object, uploadID string) *prob
objectPath := filepath.Join(bucketPath, object)
partPathPrefix := objectPath + uploadID
// Critical region requiring read lock.
fs.rwLock.RLock()
savedParts := fs.multiparts.ActiveSession[uploadID].Parts
fs.rwLock.RUnlock()
// Remove all parts.
if err := removeParts(partPathPrefix, savedParts); err != nil {
return err.Trace(partPathPrefix)
}
// Critical region requiring write lock.
fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, uploadID)
if err := saveMultipartsSession(*fs.multiparts); err != nil {