Use cache Append() for saving objects in memory; GetObject() caches un-cached entries while reading

Harshavardhana 2015-07-01 21:39:37 -07:00
parent bce93c1b3a
commit ebe61d99d9
2 changed files with 77 additions and 26 deletions
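
The new GetObject read path below wraps the caller's writer in a proxy (newProxyWriter / pw.writtenBytes) so the bytes streamed from disk can be handed to the cache afterwards. The proxy writer itself is not part of this diff; the following is only a minimal sketch of what such a wrapper could look like, using the names referenced in the diff and assuming everything else:

package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// proxyWriter forwards writes to the wrapped writer while keeping a copy of
// everything written, so the caller can re-cache the bytes it just served.
type proxyWriter struct {
	writer       io.Writer
	writtenBytes []byte
}

func newProxyWriter(w io.Writer) *proxyWriter {
	return &proxyWriter{writer: w}
}

func (p *proxyWriter) Write(b []byte) (int, error) {
	n, err := p.writer.Write(b)
	if err != nil {
		return n, err
	}
	// keep a copy of the bytes that actually reached the wrapped writer
	p.writtenBytes = append(p.writtenBytes, b[:n]...)
	return n, nil
}

func main() {
	pw := newProxyWriter(os.Stdout)
	if _, err := io.Copy(pw, strings.NewReader("served from disk\n")); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(pw.writtenBytes), "bytes captured for the cache")
}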


@@ -106,14 +106,16 @@ func NewCache(maxSize uint64, expiration time.Duration, donutName string, nodeDi
 // GetObject - GET object from cache buffer
 func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64, error) {
 	cache.lock.RLock()
-	defer cache.lock.RUnlock()
 	if !IsValidBucket(bucket) {
+		cache.lock.RUnlock()
 		return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
 	}
 	if !IsValidObjectName(object) {
+		cache.lock.RUnlock()
 		return 0, iodine.New(ObjectNameInvalid{Object: object}, nil)
 	}
 	if _, ok := cache.storedBuckets[bucket]; ok == false {
+		cache.lock.RUnlock()
 		return 0, iodine.New(BucketNotFound{Bucket: bucket}, nil)
 	}
 	objectKey := bucket + "/" + object
@@ -122,20 +124,38 @@ func (cache Cache) GetObject(w io.Writer, bucket string, object string) (int64,
 		if cache.donut != nil {
 			reader, size, err := cache.donut.GetObject(bucket, object)
 			if err != nil {
+				cache.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
-			written, err := io.CopyN(w, reader, size)
+			// new proxy writer to capture data read from disk
+			pw := newProxyWriter(w)
+			written, err := io.CopyN(pw, reader, size)
 			if err != nil {
+				cache.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
+			cache.lock.RUnlock()
+			/// cache object read from disk
+			{
+				cache.lock.Lock()
+				ok := cache.objects.Set(objectKey, pw.writtenBytes)
+				cache.lock.Unlock()
+				pw.writtenBytes = nil
+				go debug.FreeOSMemory()
+				if !ok {
+					return 0, iodine.New(InternalError{}, nil)
+				}
+			}
 			return written, nil
 		}
+		cache.lock.RUnlock()
 		return 0, iodine.New(ObjectNotFound{Object: object}, nil)
 	}
-	written, err := io.Copy(w, bytes.NewBuffer(data))
+	written, err := io.CopyN(w, bytes.NewBuffer(data), int64(cache.objects.Len(objectKey)))
 	if err != nil {
 		return 0, iodine.New(err, nil)
 	}
+	cache.lock.RUnlock()
 	return written, nil
 }
@@ -148,14 +168,16 @@ func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, l
 		"length": strconv.FormatInt(length, 10),
 	}
 	cache.lock.RLock()
-	defer cache.lock.RUnlock()
 	if !IsValidBucket(bucket) {
+		cache.lock.RUnlock()
 		return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, errParams)
 	}
 	if !IsValidObjectName(object) {
+		cache.lock.RUnlock()
 		return 0, iodine.New(ObjectNameInvalid{Object: object}, errParams)
 	}
 	if start < 0 {
+		cache.lock.RUnlock()
 		return 0, iodine.New(InvalidRange{
 			Start: start,
 			Length: length,
@@ -167,23 +189,40 @@ func (cache Cache) GetPartialObject(w io.Writer, bucket, object string, start, l
 		if cache.donut != nil {
 			reader, _, err := cache.donut.GetObject(bucket, object)
 			if err != nil {
+				cache.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
 			if _, err := io.CopyN(ioutil.Discard, reader, start); err != nil {
+				cache.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
+			pw := newProxyWriter(w)
 			written, err := io.CopyN(w, reader, length)
 			if err != nil {
+				cache.lock.RUnlock()
 				return 0, iodine.New(err, nil)
 			}
+			cache.lock.RUnlock()
+			{
+				cache.lock.Lock()
+				ok := cache.objects.Set(objectKey, pw.writtenBytes)
+				cache.lock.Unlock()
+				pw.writtenBytes = nil
+				go debug.FreeOSMemory()
+				if !ok {
+					return 0, iodine.New(InternalError{}, nil)
+				}
+			}
 			return written, nil
 		}
+		cache.lock.RUnlock()
 		return 0, iodine.New(ObjectNotFound{Object: object}, nil)
 	}
 	written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
 	if err != nil {
 		return 0, iodine.New(err, nil)
 	}
+	cache.lock.RUnlock()
 	return written, nil
 }
@@ -317,13 +356,24 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
 		expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
 	}
+	if cache.donut != nil {
+		objMetadata, err := cache.donut.PutObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType})
+		if err != nil {
+			return ObjectMetadata{}, iodine.New(err, nil)
+		}
+		cache.lock.Lock()
+		storedBucket.objectMetadata[objectKey] = objMetadata
+		cache.storedBuckets[bucket] = storedBucket
+		cache.lock.Unlock()
+		return objMetadata, nil
+	}
 	// calculate md5
 	hash := md5.New()
-	var readBytes []byte
 	var err error
-	var length int
+	var totalLength int
 	for err == nil {
+		var length int
 		byteBuffer := make([]byte, 1024*1024)
 		length, err = data.Read(byteBuffer)
 		// While hash.Write() wouldn't mind a Nil byteBuffer
@@ -332,24 +382,20 @@ func (cache Cache) createObject(bucket, key, contentType, expectedMD5Sum string,
 			break
 		}
 		hash.Write(byteBuffer[0:length])
-		readBytes = append(readBytes, byteBuffer[0:length]...)
+		cache.lock.Lock()
+		ok := cache.objects.Append(objectKey, byteBuffer[0:length])
+		cache.lock.Unlock()
+		if !ok {
+			return ObjectMetadata{}, iodine.New(InternalError{}, nil)
+		}
+		totalLength += length
+		go debug.FreeOSMemory()
 	}
 	if err != io.EOF {
 		return ObjectMetadata{}, iodine.New(err, nil)
 	}
 	md5SumBytes := hash.Sum(nil)
-	totalLength := len(readBytes)
-	cache.lock.Lock()
-	ok := cache.objects.Set(objectKey, readBytes)
-	// setting up for de-allocation
-	readBytes = nil
-	go debug.FreeOSMemory()
-	cache.lock.Unlock()
-	if !ok {
-		return ObjectMetadata{}, iodine.New(InternalError{}, nil)
-	}
 	md5Sum := hex.EncodeToString(md5SumBytes)
 	// Verify if the written object is equal to what is expected, only if it is requested as such
 	if strings.TrimSpace(expectedMD5Sum) != "" {
@@ -576,14 +622,8 @@ func (cache Cache) expiredObject(a ...interface{}) {
 		cacheStats.Bytes, cacheStats.Items, cacheStats.Expired)
 	key := a[0].(string)
 	// loop through all buckets
-	for bucket, storedBucket := range cache.storedBuckets {
+	for _, storedBucket := range cache.storedBuckets {
 		delete(storedBucket.objectMetadata, key)
-		// remove bucket if no objects found anymore
-		if len(storedBucket.objectMetadata) == 0 {
-			if time.Since(cache.storedBuckets[bucket].bucketMetadata.Created) > cache.expiration {
-				delete(cache.storedBuckets, bucket)
-			}
-		}
 	}
 	debug.FreeOSMemory()
 }
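
The createObject hunks above replace the old read-everything-then-Set flow with a loop that hashes and Append()s one 1 MiB chunk at a time. Below is a standalone sketch of that pattern; the appendChunk callback stands in for cache.objects.Append, and the helper names and error handling are illustrative rather than the repository's:

package main

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
)

// streamInChunks reads data 1 MiB at a time, feeding each chunk to an MD5
// hash and to appendChunk, so the full object never has to sit in a single
// temporary buffer before being cached.
func streamInChunks(data io.Reader, appendChunk func([]byte) bool) (md5Sum string, totalLength int, err error) {
	hash := md5.New()
	for {
		buf := make([]byte, 1024*1024)
		n, rerr := data.Read(buf)
		if n > 0 {
			hash.Write(buf[:n])
			if !appendChunk(buf[:n]) {
				return "", totalLength, errors.New("append rejected chunk")
			}
			totalLength += n
		}
		if rerr == io.EOF {
			break
		}
		if rerr != nil {
			return "", totalLength, rerr
		}
	}
	return hex.EncodeToString(hash.Sum(nil)), totalLength, nil
}

func main() {
	var stored []byte
	sum, n, err := streamInChunks(bytes.NewReader([]byte("hello donut cache")),
		func(chunk []byte) bool { stored = append(stored, chunk...); return true })
	fmt.Println(sum, n, err, len(stored))
}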


@@ -115,6 +115,17 @@ func (r *Cache) Get(key string) ([]byte, bool) {
 	return value, true
 }
 
+// Len returns length of the value of a given key, returns zero if key doesn't exist
+func (r *Cache) Len(key string) int {
+	r.Lock()
+	defer r.Unlock()
+	_, ok := r.items[key]
+	if !ok {
+		return 0
+	}
+	return len(r.items[key])
+}
+
 // Append will append new data to an existing key,
 // if key doesn't exist it behaves like Set()
 func (r *Cache) Append(key string, value []byte) bool {
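
Append and Len are what the object-cache code in the first file relies on: createObject Append()s chunks as they arrive, and GetObject uses Len() to bound its CopyN when serving from memory. A small usage sketch, assuming it lives in the same package as this Cache type; warmKey is a hypothetical helper, not part of the diff:

// warmKey streams pre-split chunks into the cache under key, mirroring how
// createObject feeds 1 MiB pieces to Append, and reports the cached length.
func warmKey(r *Cache, key string, chunks [][]byte) (int, bool) {
	for _, chunk := range chunks {
		// createObject above treats a false return from Append as a hard
		// (InternalError) failure, so we stop on the first rejected chunk.
		if !r.Append(key, chunk) {
			return r.Len(key), false
		}
	}
	return r.Len(key), true
}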