Fix an ugly multipart bug

This commit is contained in:
Harshavardhana 2015-07-13 21:34:22 -07:00
parent 00acc47158
commit 45ddec925c
2 changed files with 20 additions and 18 deletions

View File

@ -67,7 +67,7 @@ type API struct {
type storedBucket struct { type storedBucket struct {
bucketMetadata BucketMetadata bucketMetadata BucketMetadata
objectMetadata map[string]ObjectMetadata objectMetadata map[string]ObjectMetadata
partMetadata map[int]PartMetadata partMetadata map[string]map[int]PartMetadata
multiPartSession map[string]MultiPartSession multiPartSession map[string]MultiPartSession
} }
@ -116,7 +116,7 @@ func New() (Interface, error) {
newBucket.bucketMetadata = v newBucket.bucketMetadata = v
newBucket.objectMetadata = make(map[string]ObjectMetadata) newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]MultiPartSession) newBucket.multiPartSession = make(map[string]MultiPartSession)
newBucket.partMetadata = make(map[int]PartMetadata) newBucket.partMetadata = make(map[string]map[int]PartMetadata)
a.storedBuckets.Set(k, newBucket) a.storedBuckets.Set(k, newBucket)
} }
} }
@ -487,7 +487,7 @@ func (donut API) MakeBucket(bucketName, acl string, signature *Signature) error
var newBucket = storedBucket{} var newBucket = storedBucket{}
newBucket.objectMetadata = make(map[string]ObjectMetadata) newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]MultiPartSession) newBucket.multiPartSession = make(map[string]MultiPartSession)
newBucket.partMetadata = make(map[int]PartMetadata) newBucket.partMetadata = make(map[string]map[int]PartMetadata)
newBucket.bucketMetadata = BucketMetadata{} newBucket.bucketMetadata = BucketMetadata{}
newBucket.bucketMetadata.Name = bucketName newBucket.bucketMetadata.Name = bucketName
newBucket.bucketMetadata.Created = time.Now().UTC() newBucket.bucketMetadata.Created = time.Now().UTC()

View File

@ -23,7 +23,6 @@ import (
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/xml" "encoding/xml"
"errors"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
@ -78,6 +77,7 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string, signature *
initiated: time.Now(), initiated: time.Now(),
totalParts: 0, totalParts: 0,
} }
storedBucket.partMetadata[key] = make(map[int]PartMetadata)
multiPartCache := data.NewCache(0) multiPartCache := data.NewCache(0)
multiPartCache.OnEvicted = donut.evictedPart multiPartCache.OnEvicted = donut.evictedPart
donut.multiPartObjects[uploadID] = multiPartCache donut.multiPartObjects[uploadID] = multiPartCache
@ -117,9 +117,8 @@ func (donut API) AbortMultipartUpload(bucket, key, uploadID string, signature *S
// CreateObjectPart - create a part in a multipart session // CreateObjectPart - create a part in a multipart session
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (string, error) { func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (string, error) {
donut.lock.Lock() donut.lock.Lock()
defer donut.lock.Unlock()
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data, signature) etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data, signature)
donut.lock.Unlock()
// possible free // possible free
debug.FreeOSMemory() debug.FreeOSMemory()
@ -137,15 +136,16 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
if !donut.storedBuckets.Exists(bucket) { if !donut.storedBuckets.Exists(bucket) {
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil) return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
} }
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket) strBucket := donut.storedBuckets.Get(bucket).(storedBucket)
// Verify upload id // Verify upload id
if storedBucket.multiPartSession[key].uploadID != uploadID { if strBucket.multiPartSession[key].uploadID != uploadID {
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil) return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
} }
// get object key // get object key
if _, ok := storedBucket.partMetadata[partID]; ok { parts := strBucket.partMetadata[key]
return storedBucket.partMetadata[partID].ETag, nil if _, ok := parts[partID]; ok {
return parts[partID].ETag, nil
} }
if contentType == "" { if contentType == "" {
@ -214,11 +214,12 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
Size: totalLength, Size: totalLength,
} }
storedBucket.partMetadata[partID] = newPart parts[partID] = newPart
multiPartSession := storedBucket.multiPartSession[key] strBucket.partMetadata[key] = parts
multiPartSession := strBucket.multiPartSession[key]
multiPartSession.totalParts++ multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession strBucket.multiPartSession[key] = multiPartSession
donut.storedBuckets.Set(bucket, storedBucket) donut.storedBuckets.Set(bucket, strBucket)
return md5Sum, nil return md5Sum, nil
} }
@ -229,6 +230,7 @@ func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
donut.multiPartObjects[uploadID].Delete(i) donut.multiPartObjects[uploadID].Delete(i)
} }
delete(storedBucket.multiPartSession, key) delete(storedBucket.multiPartSession, key)
delete(storedBucket.partMetadata, key)
donut.storedBuckets.Set(bucket, storedBucket) donut.storedBuckets.Set(bucket, storedBucket)
} }
@ -279,6 +281,7 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.R
donut.lock.Unlock() donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(InvalidPartOrder{}, nil) return ObjectMetadata{}, iodine.New(InvalidPartOrder{}, nil)
} }
var size int64 var size int64
var fullObject bytes.Buffer var fullObject bytes.Buffer
for i := 0; i < len(parts.Part); i++ { for i := 0; i < len(parts.Part); i++ {
@ -434,6 +437,7 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
if storedBucket.multiPartSession[key].uploadID != resources.UploadID { if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, nil) return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, nil)
} }
storedParts := storedBucket.partMetadata[key]
objectResourcesMetadata := resources objectResourcesMetadata := resources
objectResourcesMetadata.Bucket = bucket objectResourcesMetadata.Bucket = bucket
objectResourcesMetadata.Key = key objectResourcesMetadata.Key = key
@ -453,9 +457,9 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
objectResourcesMetadata.NextPartNumberMarker = i objectResourcesMetadata.NextPartNumberMarker = i
return objectResourcesMetadata, nil return objectResourcesMetadata, nil
} }
part, ok := storedBucket.partMetadata[i] part, ok := storedParts[i]
if !ok { if !ok {
return ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) return ObjectResourcesMetadata{}, iodine.New(InvalidPart{}, nil)
} }
parts = append(parts, &part) parts = append(parts, &part)
} }
@ -466,12 +470,10 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
// evictedPart - call back function called by caching module during individual cache evictions // evictedPart - call back function called by caching module during individual cache evictions
func (donut API) evictedPart(a ...interface{}) { func (donut API) evictedPart(a ...interface{}) {
key := a[0].(int)
// loop through all buckets // loop through all buckets
buckets := donut.storedBuckets.GetAll() buckets := donut.storedBuckets.GetAll()
for bucketName, bucket := range buckets { for bucketName, bucket := range buckets {
b := bucket.(storedBucket) b := bucket.(storedBucket)
delete(b.partMetadata, key)
donut.storedBuckets.Set(bucketName, b) donut.storedBuckets.Set(bucketName, b)
} }
debug.FreeOSMemory() debug.FreeOSMemory()