multipart: Multipart resume simplify further.

This commit is contained in:
Harshavardhana 2016-02-05 16:47:31 -08:00
parent 7f7697ca38
commit 69bd001c8b
3 changed files with 34 additions and 55 deletions

View File

@ -91,7 +91,7 @@ type ObjectResourcesMetadata struct {
MaxParts int
IsTruncated bool
Part []*PartMetadata
Part []PartMetadata
EncodingType string
}

View File

@ -20,7 +20,6 @@ import (
"crypto/md5"
"encoding/base64"
"encoding/hex"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
@ -120,7 +119,7 @@ func (fs Filesystem) ListMultipartUploads(bucket string, resources BucketMultipa
// verify if parts sent over the network do really match with what we
// have for the session.
func doPartsMatch(parts []CompletePart, savedParts []*PartMetadata) bool {
func doPartsMatch(parts []CompletePart, savedParts []PartMetadata) bool {
if parts == nil || savedParts == nil {
return false
}
@ -155,14 +154,11 @@ func MultiCloser(closers ...io.Closer) io.Closer {
}
// removeParts - remove all parts.
func removeParts(partPathPrefix string, parts []*PartMetadata) *probe.Error {
func removeParts(partPathPrefix string, parts []PartMetadata) *probe.Error {
for _, part := range parts {
if e := os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber)); e != nil {
return probe.NewError(e)
}
}
if e := os.Remove(partPathPrefix + "$multiparts"); e != nil {
return probe.NewError(e)
// We are on purpose ignoring the return values here, since
// another thread would have purged these entries.
os.Remove(partPathPrefix + fmt.Sprintf("$%d-$multiparts", part.PartNumber))
}
return nil
}
@ -245,31 +241,21 @@ func (fs Filesystem) NewMultipartUpload(bucket, object string) (string, *probe.E
mpartSession.TotalParts = 0
mpartSession.UploadID = uploadID
mpartSession.Initiated = time.Now().UTC()
var parts []*PartMetadata
var parts []PartMetadata
mpartSession.Parts = parts
fs.rwLock.Lock()
fs.multiparts.ActiveSession[object] = mpartSession
fs.rwLock.Unlock()
mpartSessionBytes, e := json.Marshal(mpartSession)
if e != nil {
return "", probe.NewError(e)
}
partPathPrefix := objectPath + uploadID
if e = ioutil.WriteFile(partPathPrefix+"$multiparts", mpartSessionBytes, 0600); e != nil {
return "", probe.NewError(e)
}
if err := saveMultipartsSession(*fs.multiparts); err != nil {
return "", err.Trace(partPathPrefix)
return "", err.Trace(objectPath)
}
return uploadID, nil
}
// partNumber is a sortable interface for Part slice.
type partNumber []*PartMetadata
type partNumber []PartMetadata
func (a partNumber) Len() int { return len(a) }
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@ -377,32 +363,32 @@ func (fs Filesystem) CreateObjectPart(bucket, object, uploadID, expectedMD5Sum s
partMetadata.Size = fi.Size()
partMetadata.LastModified = fi.ModTime()
multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts")
if e != nil {
return "", probe.NewError(e)
fs.rwLock.RLock()
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object]
fs.rwLock.RUnlock()
if !ok {
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
}
var deserializedMultipartSession MultipartSession
if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil {
return "", probe.NewError(e)
// Append any pre-existing partNumber with new metadata, otherwise
// append to the list.
if len(deserializedMultipartSession.Parts) < partID {
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, partMetadata)
} else {
deserializedMultipartSession.Parts[partID-1] = partMetadata
}
deserializedMultipartSession.Parts = append(deserializedMultipartSession.Parts, &partMetadata)
deserializedMultipartSession.TotalParts++
deserializedMultipartSession.TotalParts = len(deserializedMultipartSession.Parts)
fs.rwLock.Lock()
fs.multiparts.ActiveSession[object] = &deserializedMultipartSession
fs.multiparts.ActiveSession[object] = deserializedMultipartSession
fs.rwLock.Unlock()
// Sort by part number before saving.
sort.Sort(partNumber(deserializedMultipartSession.Parts))
if err := saveMultipartsSession(*fs.multiparts); err != nil {
return "", err.Trace(partPathPrefix)
}
multipartSessionBytes, e = json.Marshal(deserializedMultipartSession)
if e != nil {
return "", probe.NewError(e)
}
if e = ioutil.WriteFile(partPathPrefix+"$multiparts", multipartSessionBytes, 0600); e != nil {
return "", probe.NewError(e)
}
return partMetadata.ETag, nil
}
@ -486,10 +472,8 @@ func (fs Filesystem) CompleteMultipartUpload(bucket, object, uploadID string, da
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
if err := removeParts(partPathPrefix, savedParts); err != nil {
file.CloseAndPurge()
return ObjectMetadata{}, err.Trace(partPathPrefix)
}
// Successfully saved, remove all parts.
removeParts(partPathPrefix, savedParts)
fs.rwLock.Lock()
delete(fs.multiparts.ActiveSession, object)
@ -561,18 +545,13 @@ func (fs Filesystem) ListObjectParts(bucket, object string, resources ObjectReso
return ObjectResourcesMetadata{}, probe.NewError(e)
}
objectPath := filepath.Join(bucketPath, object)
partPathPrefix := objectPath + resources.UploadID
multipartSessionBytes, e := ioutil.ReadFile(partPathPrefix + "$multiparts")
if e != nil {
return ObjectResourcesMetadata{}, probe.NewError(e)
fs.rwLock.RLock()
deserializedMultipartSession, ok := fs.multiparts.ActiveSession[object]
fs.rwLock.RUnlock()
if !ok {
return ObjectResourcesMetadata{}, probe.NewError(InvalidUploadID{UploadID: resources.UploadID})
}
var deserializedMultipartSession MultipartSession
if e = json.Unmarshal(multipartSessionBytes, &deserializedMultipartSession); e != nil {
return ObjectResourcesMetadata{}, probe.NewError(e)
}
var parts []*PartMetadata
var parts []PartMetadata
for i := startPartNumber; i <= deserializedMultipartSession.TotalParts; i++ {
if len(parts) > objectResourcesMetadata.MaxParts {
sort.Sort(partNumber(parts))

View File

@ -48,7 +48,7 @@ type MultipartSession struct {
TotalParts int
UploadID string
Initiated time.Time
Parts []*PartMetadata
Parts []PartMetadata
}
// Multiparts collection of many parts