Implement metadata cache, metadata cache is used by top level donut right now. Rename trove as data cache

We should use it internally everywhere.
This commit is contained in:
Harshavardhana
2015-07-03 21:02:31 -07:00
parent 7d2609856e
commit 0a827305ad
8 changed files with 342 additions and 199 deletions

View File

@@ -44,10 +44,10 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er
if !IsValidObjectName(key) {
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
if !donut.storedBuckets.Exists(bucket) {
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
return "", iodine.New(ObjectExists{Object: key}, nil)
@@ -56,12 +56,12 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
donut.storedBuckets[bucket].multiPartSession[key] = multiPartSession{
storedBucket.multiPartSession[key] = MultiPartSession{
uploadID: uploadID,
initiated: time.Now(),
totalParts: 0,
}
donut.storedBuckets.Set(bucket, storedBucket)
return uploadID, nil
}
@@ -69,11 +69,16 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, er
func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error {
donut.lock.Lock()
defer donut.lock.Unlock()
storedBucket := donut.storedBuckets[bucket]
if !IsValidBucket(bucket) {
return iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !donut.storedBuckets.Exists(bucket) {
return iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
if storedBucket.multiPartSession[key].uploadID != uploadID {
return iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
donut.cleanupMultiparts(bucket, key, uploadID)
donut.cleanupMultipartSession(bucket, key, uploadID)
return nil
}
@@ -84,36 +89,38 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
// CreateObjectPart -
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// Verify upload id
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
donut.lock.Lock()
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data)
if err != nil {
return "", iodine.New(err, nil)
}
donut.lock.Unlock()
// free
debug.FreeOSMemory()
return etag, nil
}
// createObject - PUT object to cache buffer
func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
if !IsValidBucket(bucket) {
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
if !donut.storedBuckets.Exists(bucket) {
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
var etag string
var err error
{
donut.lock.Lock()
etag, err = donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data)
if err != nil {
return "", iodine.New(err, nil)
}
donut.lock.Unlock()
}
// possible free
debug.FreeOSMemory()
return etag, nil
}
// createObject - PUT object to cache buffer
func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
// Verify upload id
if storedBucket.multiPartSession[key].uploadID != uploadID {
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
// get object key
partKey := bucket + "/" + getMultipartKey(key, uploadID, partID)
if _, ok := storedBucket.partMetadata[partKey]; ok == true {
@@ -179,20 +186,18 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
multiPartSession := storedBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
donut.storedBuckets[bucket] = storedBucket
donut.storedBuckets.Set(bucket, storedBucket)
return md5Sum, nil
}
func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
delete(donut.storedBuckets[bucket].multiPartSession, key)
}
func (donut API) cleanupMultiparts(bucket, key, uploadID string) {
for i := 1; i <= donut.storedBuckets[bucket].multiPartSession[key].totalParts; i++ {
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
for i := 1; i <= storedBucket.multiPartSession[key].totalParts; i++ {
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
donut.multiPartObjects.Delete(objectKey)
}
delete(storedBucket.multiPartSession, key)
donut.storedBuckets.Set(bucket, storedBucket)
}
// CompleteMultipartUpload -
@@ -205,10 +210,10 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
// Verify upload id
if _, ok := donut.storedBuckets[bucket]; ok == false {
if !donut.storedBuckets.Exists(bucket) {
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
if storedBucket.multiPartSession[key].uploadID != uploadID {
return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
@@ -250,8 +255,8 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map
return ObjectMetadata{}, iodine.New(err, nil)
}
fullObject.Reset()
donut.lock.Lock()
donut.cleanupMultiparts(bucket, key, uploadID)
donut.cleanupMultipartSession(bucket, key, uploadID)
donut.lock.Unlock()
return objectMetadata, nil
@@ -269,10 +274,10 @@ func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartRe
// TODO handle delimiter
donut.lock.Lock()
defer donut.lock.Unlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
if !donut.storedBuckets.Exists(bucket) {
return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
var uploads []*UploadMetadata
for key, session := range storedBucket.multiPartSession {
@@ -331,10 +336,10 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
// Verify upload id
donut.lock.Lock()
defer donut.lock.Unlock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
if !donut.storedBuckets.Exists(bucket) {
return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
if _, ok := storedBucket.multiPartSession[key]; ok == false {
return ObjectResourcesMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
}
@@ -374,8 +379,11 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
func (donut API) expiredPart(a ...interface{}) {
key := a[0].(string)
// loop through all buckets
for _, storedBucket := range donut.storedBuckets {
delete(storedBucket.partMetadata, key)
buckets := donut.storedBuckets.GetAll()
for bucketName, bucket := range buckets {
b := bucket.(storedBucket)
delete(b.partMetadata, key)
donut.storedBuckets.Set(bucketName, b)
}
debug.FreeOSMemory()
}