Merge pull request #748 from harshavardhana/pr_out_fix_an_ugly_multipart_bug

Fix an ugly multipart bug
This commit is contained in:
Harshavardhana 2015-07-14 04:40:24 +00:00
commit e4dd49bdd9
2 changed files with 20 additions and 18 deletions

View File

@ -67,7 +67,7 @@ type API struct {
type storedBucket struct {
bucketMetadata BucketMetadata
objectMetadata map[string]ObjectMetadata
partMetadata map[int]PartMetadata
partMetadata map[string]map[int]PartMetadata
multiPartSession map[string]MultiPartSession
}
@ -116,7 +116,7 @@ func New() (Interface, error) {
newBucket.bucketMetadata = v
newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]MultiPartSession)
newBucket.partMetadata = make(map[int]PartMetadata)
newBucket.partMetadata = make(map[string]map[int]PartMetadata)
a.storedBuckets.Set(k, newBucket)
}
}
@ -487,7 +487,7 @@ func (donut API) MakeBucket(bucketName, acl string, signature *Signature) error
var newBucket = storedBucket{}
newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]MultiPartSession)
newBucket.partMetadata = make(map[int]PartMetadata)
newBucket.partMetadata = make(map[string]map[int]PartMetadata)
newBucket.bucketMetadata = BucketMetadata{}
newBucket.bucketMetadata.Name = bucketName
newBucket.bucketMetadata.Created = time.Now().UTC()

View File

@ -23,7 +23,6 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/xml"
"errors"
"io"
"io/ioutil"
"math/rand"
@ -78,6 +77,7 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string, signature *
initiated: time.Now(),
totalParts: 0,
}
storedBucket.partMetadata[key] = make(map[int]PartMetadata)
multiPartCache := data.NewCache(0)
multiPartCache.OnEvicted = donut.evictedPart
donut.multiPartObjects[uploadID] = multiPartCache
@ -117,9 +117,8 @@ func (donut API) AbortMultipartUpload(bucket, key, uploadID string, signature *S
// CreateObjectPart - create a part in a multipart session
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (string, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data, signature)
donut.lock.Unlock()
// possible free
debug.FreeOSMemory()
@ -137,15 +136,16 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
if !donut.storedBuckets.Exists(bucket) {
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
strBucket := donut.storedBuckets.Get(bucket).(storedBucket)
// Verify upload id
if storedBucket.multiPartSession[key].uploadID != uploadID {
if strBucket.multiPartSession[key].uploadID != uploadID {
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
// get object key
if _, ok := storedBucket.partMetadata[partID]; ok {
return storedBucket.partMetadata[partID].ETag, nil
parts := strBucket.partMetadata[key]
if _, ok := parts[partID]; ok {
return parts[partID].ETag, nil
}
if contentType == "" {
@ -214,11 +214,12 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
Size: totalLength,
}
storedBucket.partMetadata[partID] = newPart
multiPartSession := storedBucket.multiPartSession[key]
parts[partID] = newPart
strBucket.partMetadata[key] = parts
multiPartSession := strBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
donut.storedBuckets.Set(bucket, storedBucket)
strBucket.multiPartSession[key] = multiPartSession
donut.storedBuckets.Set(bucket, strBucket)
return md5Sum, nil
}
@ -229,6 +230,7 @@ func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
donut.multiPartObjects[uploadID].Delete(i)
}
delete(storedBucket.multiPartSession, key)
delete(storedBucket.partMetadata, key)
donut.storedBuckets.Set(bucket, storedBucket)
}
@ -279,6 +281,7 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.R
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(InvalidPartOrder{}, nil)
}
var size int64
var fullObject bytes.Buffer
for i := 0; i < len(parts.Part); i++ {
@ -434,6 +437,7 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, nil)
}
storedParts := storedBucket.partMetadata[key]
objectResourcesMetadata := resources
objectResourcesMetadata.Bucket = bucket
objectResourcesMetadata.Key = key
@ -453,9 +457,9 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
objectResourcesMetadata.NextPartNumberMarker = i
return objectResourcesMetadata, nil
}
part, ok := storedBucket.partMetadata[i]
part, ok := storedParts[i]
if !ok {
return ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
return ObjectResourcesMetadata{}, iodine.New(InvalidPart{}, nil)
}
parts = append(parts, &part)
}
@ -466,12 +470,10 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
// evictedPart - call back function called by caching module during individual cache evictions
func (donut API) evictedPart(a ...interface{}) {
key := a[0].(int)
// loop through all buckets
buckets := donut.storedBuckets.GetAll()
for bucketName, bucket := range buckets {
b := bucket.(storedBucket)
delete(b.partMetadata, key)
donut.storedBuckets.Set(bucketName, b)
}
debug.FreeOSMemory()