Make donut perform everything as a single atomic operation; this avoids all the deadlocks and races

This commit is contained in:
Harshavardhana
2015-07-03 17:11:26 -07:00
parent 86bcfed2da
commit 14844f48dd
4 changed files with 61 additions and 160 deletions

View File

@@ -36,28 +36,22 @@ import (
// NewMultipartUpload -
func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, error) {
donut.lock.RLock()
donut.lock.Lock()
defer donut.lock.Unlock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
donut.lock.RUnlock()
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
donut.lock.RUnlock()
return "", iodine.New(ObjectExists{Object: key}, nil)
}
donut.lock.RUnlock()
//donut.lock.Lock()
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
@@ -67,21 +61,18 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er
initiated: time.Now(),
totalParts: 0,
}
//donut.lock.Unlock()
return uploadID, nil
}
// AbortMultipartUpload -
func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error {
donut.lock.RLock()
donut.lock.Lock()
defer donut.lock.Unlock()
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
donut.lock.RUnlock()
return iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
donut.lock.RUnlock()
donut.cleanupMultiparts(bucket, key, uploadID)
donut.cleanupMultipartSession(bucket, key, uploadID)
return nil
@@ -94,18 +85,18 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
// CreateObjectPart -
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// Verify upload id
donut.lock.RLock()
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
donut.lock.RUnlock()
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
donut.lock.RUnlock()
donut.lock.Lock()
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data)
if err != nil {
return "", iodine.New(err, nil)
}
donut.lock.Unlock()
// free
debug.FreeOSMemory()
return etag, nil
@@ -113,27 +104,21 @@ func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, cont
// createObject - PUT object to cache buffer
func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
donut.lock.RUnlock()
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
// get object key
partKey := bucket + "/" + getMultipartKey(key, uploadID, partID)
if _, ok := storedBucket.partMetadata[partKey]; ok == true {
donut.lock.RUnlock()
return storedBucket.partMetadata[partKey].ETag, nil
}
donut.lock.RUnlock()
if contentType == "" {
contentType = "application/octet-stream"
@@ -172,9 +157,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
md5SumBytes := hash.Sum(nil)
totalLength := int64(len(readBytes))
//donut.lock.Lock()
donut.multiPartObjects.Set(partKey, readBytes)
//donut.lock.Unlock()
// setting up for de-allocation
readBytes = nil
@@ -192,20 +175,16 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
Size: totalLength,
}
//donut.lock.Lock()
storedBucket.partMetadata[partKey] = newPart
multiPartSession := storedBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
donut.storedBuckets[bucket] = storedBucket
//donut.lock.Unlock()
return md5Sum, nil
}
// cleanupMultipartSession removes the in-progress multipart session entry
// for key from the bucket's session map.
//
// NOTE(review): internal locking below was commented out because, per this
// commit, the caller is expected to already hold donut.lock (the whole
// multipart operation runs as one critical section) — confirm at call sites.
// NOTE(review): uploadID is accepted but unused here — presumably kept for
// signature symmetry with cleanupMultiparts; verify before removing.
func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
	// donut.lock.Lock()
	// defer donut.lock.Unlock()
	// Deleting from a nil map is a no-op, so a missing bucket entry is safe.
	delete(donut.storedBuckets[bucket].multiPartSession, key)
}
@@ -218,6 +197,7 @@ func (donut API) cleanupMultiparts(bucket, key, uploadID string) {
// CompleteMultipartUpload -
func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) {
donut.lock.Lock()
if !IsValidBucket(bucket) {
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
@@ -225,19 +205,13 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
// Verify upload id
donut.lock.RLock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
donut.lock.RUnlock()
//donut.lock.Lock()
var size int64
var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ {
@@ -264,11 +238,11 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
object = nil
go debug.FreeOSMemory()
}
//donut.lock.Unlock()
md5sumSlice := md5.Sum(fullObject.Bytes())
// this is needed for final verification inside CreateObject, do not convert this to hex
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
donut.lock.Unlock()
objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil)
if err != nil {
// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
@@ -276,8 +250,10 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
return ObjectMetadata{}, iodine.New(err, nil)
}
fullObject.Reset()
donut.lock.Lock()
donut.cleanupMultiparts(bucket, key, uploadID)
donut.cleanupMultipartSession(bucket, key, uploadID)
donut.lock.Unlock()
return objectMetadata, nil
}
@@ -291,8 +267,8 @@ func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// ListMultipartUploads -
func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) {
// TODO handle delimiter
donut.lock.RLock()
defer donut.lock.RUnlock()
donut.lock.Lock()
defer donut.lock.Unlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
@@ -353,8 +329,8 @@ func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumb
// ListObjectParts -
func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) {
// Verify upload id
donut.lock.RLock()
defer donut.lock.RUnlock()
donut.lock.Lock()
defer donut.lock.Unlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}