From 14844f48dde5c679a0346cafd72dfccf697fd1a4 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 3 Jul 2015 17:11:26 -0700
Subject: [PATCH] Make donut do everything as an atomic operation, this
 avoids all the deadlocks and races

---
 pkg/donut/bucket.go    |  16 ++---
 pkg/donut/donut-v1.go  |  16 -----
 pkg/donut/donut-v2.go  | 135 ++++++++++++-----------------------------
 pkg/donut/multipart.go |  54 +++++------------
 4 files changed, 61 insertions(+), 160 deletions(-)

diff --git a/pkg/donut/bucket.go b/pkg/donut/bucket.go
index fca6a5adb..cd8e2c7c3 100644
--- a/pkg/donut/bucket.go
+++ b/pkg/donut/bucket.go
@@ -47,7 +47,7 @@ type bucket struct {
 	time      time.Time
 	donutName string
 	nodes     map[string]node
-	lock      *sync.RWMutex
+	lock      *sync.Mutex
 }
 
 // newBucket - instantiate a new bucket
@@ -69,7 +69,7 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu
 	b.time = t
 	b.donutName = donutName
 	b.nodes = nodes
-	b.lock = new(sync.RWMutex)
+	b.lock = new(sync.Mutex)
 
 	metadata := BucketMetadata{}
 	metadata.Version = bucketMetadataVersion
@@ -125,15 +125,15 @@ func (b bucket) getBucketMetadata() (*AllBuckets, error) {
 
 // GetObjectMetadata - get metadata for an object
 func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) {
-	b.lock.RLock()
-	defer b.lock.RUnlock()
+	b.lock.Lock()
+	defer b.lock.Unlock()
 	return b.readObjectMetadata(objectName)
 }
 
 // ListObjects - list all objects
 func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) {
-	b.lock.RLock()
-	defer b.lock.RUnlock()
+	b.lock.Lock()
+	defer b.lock.Unlock()
 	if maxkeys <= 0 {
 		maxkeys = 1000
 	}
@@ -199,8 +199,8 @@ func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (List
 
 // ReadObject - open an object to read
 func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, err error) {
-	b.lock.RLock()
-	defer b.lock.RUnlock()
+	b.lock.Lock()
+	defer b.lock.Unlock()
 	reader, writer := io.Pipe()
 	// get list of objects
 	bucketMetadata, err := b.getBucketMetadata()
diff --git a/pkg/donut/donut-v1.go b/pkg/donut/donut-v1.go
index 54067c33a..ada4300f6 100644
--- a/pkg/donut/donut-v1.go
+++ b/pkg/donut/donut-v1.go
@@ -44,8 +44,6 @@ const (
 
 // makeBucket - make a new bucket
 func (donut API) makeBucket(bucket string, acl BucketACL) error {
-	donut.lock.Lock()
-	defer donut.lock.Unlock()
 	if bucket == "" || strings.TrimSpace(bucket) == "" {
 		return iodine.New(InvalidArgument{}, nil)
 	}
@@ -54,8 +52,6 @@ func (donut API) makeBucket(bucket string, acl BucketACL) error {
 
 // getBucketMetadata - get bucket metadata
 func (donut API) getBucketMetadata(bucketName string) (BucketMetadata, error) {
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
 	if err := donut.listDonutBuckets(); err != nil {
 		return BucketMetadata{}, iodine.New(err, nil)
 	}
@@ -71,8 +67,6 @@ func (donut API) getBucketMetadata(bucketName string) (BucketMetadata, error) {
 
 // setBucketMetadata - set bucket metadata
 func (donut API) setBucketMetadata(bucketName string, bucketMetadata map[string]string) error {
-	donut.lock.Lock()
-	defer donut.lock.Unlock()
 	if err := donut.listDonutBuckets(); err != nil {
 		return iodine.New(err, nil)
 	}
@@ -92,8 +86,6 @@ func (donut API) setBucketMetadata(bucketName string, bucketMetadata map[string]
 
 // listBuckets - return list of buckets
 func (donut API) listBuckets() (map[string]BucketMetadata, error) {
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
 	if err := donut.listDonutBuckets(); err != nil {
 		return nil, iodine.New(err, nil)
 	}
@@ -109,8 +101,6 @@ func (donut API) listBuckets() (map[string]BucketMetadata, error) {
 
 // listObjects - return list of objects
 func (donut API) listObjects(bucket, prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) {
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
 	errParams := map[string]string{
 		"bucket": bucket,
 		"prefix": prefix,
@@ -133,8 +123,6 @@ func (donut API) listObjects(bucket, prefix, marker, delimiter string, maxkeys i
 
 // putObject - put object
 func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (ObjectMetadata, error) {
-	donut.lock.Lock()
-	defer donut.lock.Unlock()
 	errParams := map[string]string{
 		"bucket": bucket,
 		"object": object,
@@ -171,8 +159,6 @@ func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Read
 
 // getObject - get object
 func (donut API) getObject(bucket, object string) (reader io.ReadCloser, size int64, err error) {
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
 	errParams := map[string]string{
 		"bucket": bucket,
 		"object": object,
@@ -194,8 +180,6 @@ func (donut API) getObject(bucket, object string) (reader io.ReadCloser, size in
 
 // getObjectMetadata - get object metadata
 func (donut API) getObjectMetadata(bucket, object string) (ObjectMetadata, error) {
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
 	errParams := map[string]string{
 		"bucket": bucket,
 		"object": object,
diff --git a/pkg/donut/donut-v2.go b/pkg/donut/donut-v2.go
index 446596c89..e3d10a2de 100644
--- a/pkg/donut/donut-v2.go
+++ b/pkg/donut/donut-v2.go
@@ -54,7 +54,7 @@ type Config struct {
 // API - local variables
 type API struct {
 	config           *Config
-	lock             *sync.RWMutex
+	lock             *sync.Mutex
 	objects          *trove.Cache
 	multiPartObjects *trove.Cache
 	storedBuckets    map[string]storedBucket
@@ -90,7 +90,7 @@ func New(c *Config) (Interface, error) {
 	a.multiPartObjects = trove.NewCache(0, time.Duration(0))
 	a.objects.OnExpired = a.expiredObject
 	a.multiPartObjects.OnExpired = a.expiredPart
-	a.lock = new(sync.RWMutex)
+	a.lock = new(sync.Mutex)
 
 	// set up cache expiration
 	a.objects.ExpireObjects(time.Second * 5)
@@ -121,17 +121,15 @@ func New(c *Config) (Interface, error) {
 
 // GetObject - GET object from cache buffer
 func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) {
-	donut.lock.RLock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	if !IsValidBucket(bucket) {
-		donut.lock.RUnlock()
 		return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if !IsValidObjectName(object) {
-		donut.lock.RUnlock()
 		return 0, iodine.New(ObjectNameInvalid{Object: object}, nil)
 	}
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
-		donut.lock.RUnlock()
 		return 0, iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
 	objectKey := bucket + "/" + object
@@ -140,60 +138,49 @@ func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, er
 		if len(donut.config.NodeDiskMap) > 0 {
 			reader, size, err := donut.getObject(bucket, object)
 			if err != nil {
-				donut.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
 			// new proxy writer to capture data read from disk
 			pw := NewProxyWriter(w)
 			written, err := io.CopyN(pw, reader, size)
 			if err != nil {
-				donut.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
-			donut.lock.RUnlock()
 			/// cache object read from disk
-			{
-				donut.lock.Lock()
-				ok := donut.objects.Set(objectKey, pw.writtenBytes)
-				donut.lock.Unlock()
-				pw.writtenBytes = nil
-				go debug.FreeOSMemory()
-				if !ok {
-					return 0, iodine.New(InternalError{}, nil)
-				}
+			ok := donut.objects.Set(objectKey, pw.writtenBytes)
+			pw.writtenBytes = nil
+			go debug.FreeOSMemory()
+			if !ok {
+				return 0, iodine.New(InternalError{}, nil)
 			}
 			return written, nil
 		}
-		donut.lock.RUnlock()
 		return 0, iodine.New(ObjectNotFound{Object: object}, nil)
 	}
 	written, err := io.CopyN(w, bytes.NewBuffer(data), int64(donut.objects.Len(objectKey)))
 	if err != nil {
 		return 0, iodine.New(err, nil)
 	}
-	donut.lock.RUnlock()
 	return written, nil
 }
 
 // GetPartialObject - GET object from cache buffer range
 func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	errParams := map[string]string{
 		"bucket": bucket,
 		"object": object,
 		"start":  strconv.FormatInt(start, 10),
 		"length": strconv.FormatInt(length, 10),
 	}
-	donut.lock.RLock()
 	if !IsValidBucket(bucket) {
-		donut.lock.RUnlock()
 		return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, errParams)
 	}
 	if !IsValidObjectName(object) {
-		donut.lock.RUnlock()
 		return 0, iodine.New(ObjectNameInvalid{Object: object}, errParams)
 	}
 	if start < 0 {
-		donut.lock.RUnlock()
 		return 0, iodine.New(InvalidRange{
 			Start:  start,
 			Length: length,
@@ -205,96 +192,73 @@ func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, len
 		if len(donut.config.NodeDiskMap) > 0 {
 			reader, _, err := donut.getObject(bucket, object)
 			if err != nil {
-				donut.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
 			if _, err := io.CopyN(ioutil.Discard, reader, start); err != nil {
-				donut.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
 			pw := NewProxyWriter(w)
 			written, err := io.CopyN(w, reader, length)
 			if err != nil {
-				donut.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
-			donut.lock.RUnlock()
-			{
-				donut.lock.Lock()
-				ok := donut.objects.Set(objectKey, pw.writtenBytes)
-				donut.lock.Unlock()
-				pw.writtenBytes = nil
-				go debug.FreeOSMemory()
-				if !ok {
-					return 0, iodine.New(InternalError{}, nil)
-				}
+			ok := donut.objects.Set(objectKey, pw.writtenBytes)
+			pw.writtenBytes = nil
+			go debug.FreeOSMemory()
+			if !ok {
+				return 0, iodine.New(InternalError{}, nil)
 			}
 			return written, nil
 		}
-		donut.lock.RUnlock()
 		return 0, iodine.New(ObjectNotFound{Object: object}, nil)
 	}
 	written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
 	if err != nil {
 		return 0, iodine.New(err, nil)
 	}
-	donut.lock.RUnlock()
 	return written, nil
 }
 
 // GetBucketMetadata -
 func (donut API) GetBucketMetadata(bucket string) (BucketMetadata, error) {
-	donut.lock.RLock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	if !IsValidBucket(bucket) {
-		donut.lock.RUnlock()
 		return BucketMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
 		if len(donut.config.NodeDiskMap) > 0 {
 			bucketMetadata, err := donut.getBucketMetadata(bucket)
 			if err != nil {
-				donut.lock.RUnlock()
 				return BucketMetadata{}, iodine.New(err, nil)
 			}
 			storedBucket := donut.storedBuckets[bucket]
-			donut.lock.RUnlock()
-			{
-				donut.lock.Lock()
-				storedBucket.bucketMetadata = bucketMetadata
-				donut.storedBuckets[bucket] = storedBucket
-				donut.lock.Unlock()
-			}
+			storedBucket.bucketMetadata = bucketMetadata
+			donut.storedBuckets[bucket] = storedBucket
 		}
 		return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
-	donut.lock.RUnlock()
 	return donut.storedBuckets[bucket].bucketMetadata, nil
 }
 
 // SetBucketMetadata -
 func (donut API) SetBucketMetadata(bucket string, metadata map[string]string) error {
-	donut.lock.RLock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	if !IsValidBucket(bucket) {
-		donut.lock.RUnlock()
 		return iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
-		donut.lock.RUnlock()
 		return iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
-	donut.lock.RUnlock()
-	donut.lock.Lock()
-	{
-		if len(donut.config.NodeDiskMap) > 0 {
-			if err := donut.setBucketMetadata(bucket, metadata); err != nil {
-				return iodine.New(err, nil)
-			}
+	if len(donut.config.NodeDiskMap) > 0 {
+		if err := donut.setBucketMetadata(bucket, metadata); err != nil {
+			return iodine.New(err, nil)
 		}
-		storedBucket := donut.storedBuckets[bucket]
-		storedBucket.bucketMetadata.ACL = BucketACL(metadata["acl"])
-		donut.storedBuckets[bucket] = storedBucket
 	}
-	donut.lock.Unlock()
+	storedBucket := donut.storedBuckets[bucket]
+	storedBucket.bucketMetadata.ACL = BucketACL(metadata["acl"])
+	donut.storedBuckets[bucket] = storedBucket
 	return nil
 }
@@ -328,7 +292,9 @@ func (donut API) CreateObject(bucket, key, expectedMD5Sum string, size int64, da
 		}, nil)
 	}
 	contentType := metadata["contentType"]
+	donut.lock.Lock()
 	objectMetadata, err := donut.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
+	donut.lock.Unlock()
 	// free
 	debug.FreeOSMemory()
 	return objectMetadata, iodine.New(err, nil)
@@ -336,27 +302,21 @@ func (donut API) CreateObject(bucket, key, expectedMD5Sum string, size int64, da
 
 // createObject - PUT object to cache buffer
 func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) {
-	donut.lock.RLock()
 	if !IsValidBucket(bucket) {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if !IsValidObjectName(key) {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
 	}
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
 	storedBucket := donut.storedBuckets[bucket]
 	// get object key
 	objectKey := bucket + "/" + key
 	if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(ObjectExists{Object: key}, nil)
 	}
-	donut.lock.RUnlock()
 
 	if contentType == "" {
 		contentType = "application/octet-stream"
@@ -376,10 +336,8 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
 		if err != nil {
 			return ObjectMetadata{}, iodine.New(err, nil)
 		}
-		donut.lock.Lock()
 		storedBucket.objectMetadata[objectKey] = objMetadata
 		donut.storedBuckets[bucket] = storedBucket
-		donut.lock.Unlock()
 		return objMetadata, nil
 	}
 	// calculate md5
@@ -397,9 +355,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
 			break
 		}
 		hash.Write(byteBuffer[0:length])
-		//donut.lock.Lock()
 		ok := donut.objects.Append(objectKey, byteBuffer[0:length])
-		//donut.lock.Unlock()
 		if !ok {
 			return ObjectMetadata{}, iodine.New(InternalError{}, nil)
 		}
@@ -431,33 +387,27 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
 		Size: int64(totalLength),
 	}
 
-	//donut.lock.Lock()
 	storedBucket.objectMetadata[objectKey] = newObject
 	donut.storedBuckets[bucket] = storedBucket
-	//donut.lock.Unlock()
 	return newObject, nil
 }
 
 // MakeBucket - create bucket in cache
 func (donut API) MakeBucket(bucketName, acl string) error {
-	donut.lock.RLock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	if len(donut.storedBuckets) == totalBuckets {
-		donut.lock.RUnlock()
 		return iodine.New(TooManyBuckets{Bucket: bucketName}, nil)
 	}
 	if !IsValidBucket(bucketName) {
-		donut.lock.RUnlock()
 		return iodine.New(BucketNameInvalid{Bucket: bucketName}, nil)
 	}
 	if !IsValidBucketACL(acl) {
-		donut.lock.RUnlock()
 		return iodine.New(InvalidACL{ACL: acl}, nil)
 	}
 	if _, ok := donut.storedBuckets[bucketName]; ok == true {
-		donut.lock.RUnlock()
 		return iodine.New(BucketExists{Bucket: bucketName}, nil)
 	}
-	donut.lock.RUnlock()
 
 	if strings.TrimSpace(acl) == "" {
 		// default is private
@@ -476,16 +426,14 @@ func (donut API) MakeBucket(bucketName, acl string) error {
 	newBucket.bucketMetadata.Name = bucketName
 	newBucket.bucketMetadata.Created = time.Now().UTC()
 	newBucket.bucketMetadata.ACL = BucketACL(acl)
-	//donut.lock.Lock()
 	donut.storedBuckets[bucketName] = newBucket
-	//donut.lock.Unlock()
 	return nil
 }
 
 // ListObjects - list objects from cache
 func (donut API) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) {
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	if !IsValidBucket(bucket) {
 		return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
@@ -577,8 +525,8 @@ func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name }
 
 // ListBuckets - List buckets from cache
 func (donut API) ListBuckets() ([]BucketMetadata, error) {
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	var results []BucketMetadata
 	for _, bucket := range donut.storedBuckets {
 		results = append(results, bucket.bucketMetadata)
@@ -589,39 +537,32 @@ func (donut API) ListBuckets() ([]BucketMetadata, error) {
 
 // GetObjectMetadata - get object metadata from cache
 func (donut API) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) {
-	donut.lock.RLock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	// check if bucket exists
 	if !IsValidBucket(bucket) {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if !IsValidObjectName(key) {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
 	}
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
 	storedBucket := donut.storedBuckets[bucket]
 	objectKey := bucket + "/" + key
 	if objMetadata, ok := storedBucket.objectMetadata[objectKey]; ok == true {
-		donut.lock.RUnlock()
 		return objMetadata, nil
 	}
 	if len(donut.config.NodeDiskMap) > 0 {
 		objMetadata, err := donut.getObjectMetadata(bucket, key)
-		donut.lock.RUnlock()
 		if err != nil {
 			return ObjectMetadata{}, iodine.New(err, nil)
 		}
 		// update
-		donut.lock.Lock()
 		storedBucket.objectMetadata[objectKey] = objMetadata
-		donut.lock.Unlock()
 		return objMetadata, nil
 	}
-	donut.lock.RUnlock()
 	return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
 }
 
diff --git a/pkg/donut/multipart.go b/pkg/donut/multipart.go
index 9946bcfce..ae2808177 100644
--- a/pkg/donut/multipart.go
+++ b/pkg/donut/multipart.go
@@ -36,28 +36,22 @@ import (
 
 // NewMultipartUpload -
 func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, error) {
-	donut.lock.RLock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	if !IsValidBucket(bucket) {
-		donut.lock.RUnlock()
 		return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if !IsValidObjectName(key) {
-		donut.lock.RUnlock()
 		return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
 	}
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
-		donut.lock.RUnlock()
 		return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
 	storedBucket := donut.storedBuckets[bucket]
 	objectKey := bucket + "/" + key
 	if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
-		donut.lock.RUnlock()
 		return "", iodine.New(ObjectExists{Object: key}, nil)
 	}
-	donut.lock.RUnlock()
-
-	//donut.lock.Lock()
 	id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
 	uploadIDSum := sha512.Sum512(id)
 	uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
@@ -67,21 +61,18 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er
 		initiated:  time.Now(),
 		totalParts: 0,
 	}
-	//donut.lock.Unlock()
 
 	return uploadID, nil
 }
 
 // AbortMultipartUpload -
 func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error {
-	donut.lock.RLock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	storedBucket := donut.storedBuckets[bucket]
 	if storedBucket.multiPartSession[key].uploadID != uploadID {
-		donut.lock.RUnlock()
 		return iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
 	}
-	donut.lock.RUnlock()
-
 	donut.cleanupMultiparts(bucket, key, uploadID)
 	donut.cleanupMultipartSession(bucket, key, uploadID)
 	return nil
@@ -94,18 +85,18 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
 
 // CreateObjectPart -
 func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
 	// Verify upload id
-	donut.lock.RLock()
 	storedBucket := donut.storedBuckets[bucket]
 	if storedBucket.multiPartSession[key].uploadID != uploadID {
-		donut.lock.RUnlock()
 		return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
 	}
-	donut.lock.RUnlock()
+	donut.lock.Lock()
 	etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data)
 	if err != nil {
 		return "", iodine.New(err, nil)
 	}
+	donut.lock.Unlock()
+
 	// free
 	debug.FreeOSMemory()
 	return etag, nil
@@ -113,27 +104,21 @@ func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, cont
 
 // createObject - PUT object to cache buffer
 func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
-	donut.lock.RLock()
 	if !IsValidBucket(bucket) {
-		donut.lock.RUnlock()
 		return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if !IsValidObjectName(key) {
-		donut.lock.RUnlock()
 		return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
 	}
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
-		donut.lock.RUnlock()
 		return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
 	storedBucket := donut.storedBuckets[bucket]
 	// get object key
 	partKey := bucket + "/" + getMultipartKey(key, uploadID, partID)
 	if _, ok := storedBucket.partMetadata[partKey]; ok == true {
-		donut.lock.RUnlock()
 		return storedBucket.partMetadata[partKey].ETag, nil
 	}
-	donut.lock.RUnlock()
 
 	if contentType == "" {
 		contentType = "application/octet-stream"
@@ -172,9 +157,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
 	md5SumBytes := hash.Sum(nil)
 	totalLength := int64(len(readBytes))
 
-	//donut.lock.Lock()
 	donut.multiPartObjects.Set(partKey, readBytes)
-	//donut.lock.Unlock()
 	// setting up for de-allocation
 	readBytes = nil
 
@@ -192,20 +175,16 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
 		Size: totalLength,
 	}
 
-	//donut.lock.Lock()
 	storedBucket.partMetadata[partKey] = newPart
 	multiPartSession := storedBucket.multiPartSession[key]
 	multiPartSession.totalParts++
 	storedBucket.multiPartSession[key] = multiPartSession
 	donut.storedBuckets[bucket] = storedBucket
-	//donut.lock.Unlock()
 
 	return md5Sum, nil
 }
 
 func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
-	// donut.lock.Lock()
-	// defer donut.lock.Unlock()
 	delete(donut.storedBuckets[bucket].multiPartSession, key)
 }
 
@@ -218,6 +197,7 @@ func (donut API) cleanupMultiparts(bucket, key, uploadID string) {
 
 // CompleteMultipartUpload -
 func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) {
+	donut.lock.Lock()
 	if !IsValidBucket(bucket) {
 		return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
@@ -225,19 +205,13 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
 		return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
 	}
 	// Verify upload id
-	donut.lock.RLock()
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
 	storedBucket := donut.storedBuckets[bucket]
 	if storedBucket.multiPartSession[key].uploadID != uploadID {
-		donut.lock.RUnlock()
 		return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
 	}
-	donut.lock.RUnlock()
-
-	//donut.lock.Lock()
 	var size int64
 	var fullObject bytes.Buffer
 	for i := 1; i <= len(parts); i++ {
@@ -264,11 +238,11 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
 		object = nil
 		go debug.FreeOSMemory()
 	}
-	//donut.lock.Unlock()
 
 	md5sumSlice := md5.Sum(fullObject.Bytes())
 	// this is needed for final verification inside CreateObject, do not convert this to hex
 	md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
+	donut.lock.Unlock()
 	objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil)
 	if err != nil {
 		// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
@@ -276,8 +250,10 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
 		return ObjectMetadata{}, iodine.New(err, nil)
 	}
 	fullObject.Reset()
+	donut.lock.Lock()
 	donut.cleanupMultiparts(bucket, key, uploadID)
 	donut.cleanupMultipartSession(bucket, key, uploadID)
+	donut.lock.Unlock()
 	return objectMetadata, nil
 }
 
@@ -291,8 +267,8 @@ func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
 // ListMultipartUploads -
 func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) {
 	// TODO handle delimiter
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
 		return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
@@ -353,8 +329,8 @@ func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumb
 // ListObjectParts -
 func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) {
 	// Verify upload id
-	donut.lock.RLock()
-	defer donut.lock.RUnlock()
+	donut.lock.Lock()
+	defer donut.lock.Unlock()
 	if _, ok := donut.storedBuckets[bucket]; ok == false {
 		return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
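
Note (not part of the patch): a minimal, illustrative Go sketch of the locking change the subject line describes. The store type and Put method below are hypothetical stand-ins, not donut APIs. The pattern being removed validated under an RLock, released it, then re-acquired a write lock, which leaves a window between the check and the update and makes unmatched unlocks (and the resulting deadlocks) easy to introduce; the pattern being adopted holds one Mutex for the entire operation, so the check and the update are atomic and every return path is covered by a single defer.

// locking_sketch.go - illustrative only; assumed names, not the donut package.
package main

import (
	"fmt"
	"sync"
)

type store struct {
	lock  sync.Mutex // was sync.RWMutex in the upgrade-style pattern
	items map[string]string
}

// Put checks for an existing key and writes the value under one Mutex,
// so the existence check and the update happen as a single atomic step.
func (s *store) Put(key, value string) error {
	s.lock.Lock()
	defer s.lock.Unlock() // every return path releases the lock
	if _, ok := s.items[key]; ok {
		return fmt.Errorf("%q already exists", key)
	}
	s.items[key] = value
	return nil
}

func main() {
	s := &store{items: make(map[string]string)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// concurrent writers race on the same key; exactly one succeeds
			_ = s.Put("bucket/object", fmt.Sprintf("writer-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println(s.items["bucket/object"])
}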