Use new LRU inside memory driver

This commit is contained in:
Harshavardhana 2015-05-03 23:16:45 -07:00
parent f7caef2d26
commit d0df548eb5
5 changed files with 89 additions and 99 deletions

View File

@ -366,7 +366,7 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
}
// CreateObject creates a new object
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, reader io.Reader) (string, error) {
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) {
errParams := map[string]string{
"bucketName": bucketName,
"objectName": objectName,
@ -383,6 +383,7 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
}
metadata := make(map[string]string)
metadata["contentType"] = strings.TrimSpace(contentType)
metadata["contentLength"] = strconv.FormatInt(size, 10)
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))

View File

@ -31,7 +31,15 @@ limitations under the License.
package memory
import "container/list"
import (
"bytes"
"container/list"
"io"
"strconv"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers"
)
// CacheStats are returned by stats accessors on Group.
type CacheStats struct {
@ -53,7 +61,6 @@ type Cache struct {
totalSize uint64
totalEvicted int64
cache map[interface{}]*list.Element
value []byte
}
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
@ -61,7 +68,7 @@ type Key interface{}
type entry struct {
key Key
value []byte
value *bytes.Buffer
}
// NewCache creates a new Cache.
@ -83,30 +90,37 @@ func (c *Cache) Stats() CacheStats {
}
}
func (c *Cache) Write(p []byte) (n int, err error) {
c.totalSize = c.totalSize + uint64(len(p))
// If MaxSize is zero expecting infinite memory
if c.MaxSize != 0 && c.totalSize > c.MaxSize {
c.totalSize -= uint64(len(p))
c.RemoveOldest()
}
c.value = append(c.value, p...)
return len(p), nil
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key) {
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
ele := c.ll.PushFront(&entry{key, c.value})
c.value = nil
c.cache[key] = ele
func (c *Cache) Add(key Key, size int64) io.WriteCloser {
r, w := io.Pipe()
go func() {
if uint64(size) > c.MaxSize {
err := iodine.New(drivers.EntityTooLarge{
Size: strconv.FormatInt(size, 10),
MaxSize: strconv.FormatUint(c.MaxSize, 10),
}, nil)
r.CloseWithError(err)
return
}
// If MaxSize is zero expecting infinite memory
if c.MaxSize != 0 && (c.totalSize+uint64(size)) > c.MaxSize {
c.RemoveOldest()
}
value := new(bytes.Buffer)
n, err := io.CopyN(value, r, size)
if err != nil {
r.CloseWithError(iodine.New(err, nil))
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
c.totalSize += uint64(n)
}()
return w
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value []byte, ok bool) {
func (c *Cache) Get(key Key) (value *bytes.Buffer, ok bool) {
if c.cache == nil {
return
}
@ -155,6 +169,7 @@ func (c *Cache) removeElement(e *list.Element) {
kv := e.Value.(*entry)
delete(c.cache, kv.key)
c.totalEvicted++
c.totalSize -= uint64(kv.value.Len())
if c.OnEvicted != nil {
c.OnEvicted(kv.key)
}

View File

@ -25,6 +25,7 @@ import (
"errors"
"io"
"io/ioutil"
"log"
"runtime/debug"
"sort"
"strings"
@ -33,18 +34,14 @@ import (
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers"
"github.com/minio-io/minio/pkg/storage/drivers/memory/lru"
"github.com/minio-io/minio/pkg/utils/log"
"github.com/minio-io/minio/pkg/utils/split"
)
// memoryDriver - local variables
type memoryDriver struct {
storedBuckets map[string]storedBucket
lock *sync.RWMutex
objects *lru.Cache
objects *Cache
lastAccessedObjects map[string]time.Time
totalSize uint64
maxSize uint64
expiration time.Duration
shutdown bool
@ -68,27 +65,18 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro
memory = new(memoryDriver)
memory.storedBuckets = make(map[string]storedBucket)
memory.lastAccessedObjects = make(map[string]time.Time)
memory.objects = lru.New(0)
memory.objects = NewCache(maxSize)
memory.maxSize = maxSize
memory.lock = new(sync.RWMutex)
memory.expiration = expiration
memory.shutdown = false
switch {
case maxSize <= 0:
memory.maxSize = 9223372036854775807
case maxSize > 0:
memory.maxSize = maxSize
default:
log.Println("Error")
}
memory.objects.OnEvicted = memory.evictObject
// set up memory expiration
if expiration > 0 {
go memory.expireLRUObjects()
}
go start(ctrlChannel, errorChannel)
return ctrlChannel, errorChannel, memory
}
@ -117,11 +105,9 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string)
objectKey := bucket + "/" + object
if _, ok := storedBucket.objectMetadata[objectKey]; ok {
if data, ok := memory.objects.Get(objectKey); ok {
dataSlice := data.([]byte)
objectBuffer := bytes.NewBuffer(dataSlice)
memory.lock.RUnlock()
go memory.updateAccessTime(objectKey)
written, err := io.Copy(w, objectBuffer)
written, err := io.Copy(w, data)
return written, iodine.New(err, nil)
}
}
@ -204,14 +190,12 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
return iodine.New(errors.New("invalid argument"), nil)
}
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) {
humanError, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, data)
debug.FreeOSMemory()
return humanError, err
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
return memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
}
// CreateObject - PUT object to memory buffer
func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) {
func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
memory.lock.RLock()
if !drivers.IsValidBucket(bucket) {
memory.lock.RUnlock()
@ -247,37 +231,30 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
var bytesBuffer bytes.Buffer
chunks := split.Stream(data, 10*1024*1024)
totalLength := 0
summer := md5.New()
for chunk := range chunks {
if chunk.Err == nil {
totalLength = totalLength + len(chunk.Data)
summer.Write(chunk.Data)
_, err := io.Copy(&bytesBuffer, bytes.NewBuffer(chunk.Data))
if err != nil {
err := iodine.New(err, nil)
log.Println(err)
return "", err
}
if uint64(totalLength)+memory.totalSize > memory.maxSize {
memory.objects.RemoveOldest()
}
}
memory.lock.Lock()
md5Writer := md5.New()
lruWriter := memory.objects.Add(objectKey, size)
mw := io.MultiWriter(md5Writer, lruWriter)
totalLength, err := io.CopyN(mw, data, size)
if err != nil {
memory.lock.Unlock()
return "", iodine.New(err, nil)
}
md5SumBytes := summer.Sum(nil)
if err := lruWriter.Close(); err != nil {
memory.lock.Unlock()
return "", iodine.New(err, nil)
}
memory.lock.Unlock()
md5SumBytes := md5Writer.Sum(nil)
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
memory.lock.Lock()
defer memory.lock.Unlock()
memory.objects.RemoveOldest()
return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
}
}
newObject := drivers.ObjectMetadata{
Bucket: bucket,
Key: key,
@ -287,21 +264,21 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
Md5: md5Sum,
Size: int64(totalLength),
}
memory.lock.Lock()
memoryObject := make(map[string]drivers.ObjectMetadata)
if len(memory.storedBuckets[bucket].objectMetadata) == 0 {
switch {
case len(memory.storedBuckets[bucket].objectMetadata) == 0:
storedBucket.objectMetadata = memoryObject
storedBucket.objectMetadata[objectKey] = newObject
} else {
default:
storedBucket.objectMetadata[objectKey] = newObject
}
memory.storedBuckets[bucket] = storedBucket
memory.objects.Add(objectKey, bytesBuffer.Bytes())
memory.totalSize = memory.totalSize + uint64(newObject.Size)
if memory.totalSize > memory.maxSize {
memory.objects.RemoveOldest()
}
memory.lock.Unlock()
// free
debug.FreeOSMemory()
return newObject.Md5, nil
}
@ -481,25 +458,21 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil)
}
func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) {
memory.doEvictObject(key, value)
debug.FreeOSMemory()
}
func (memory *memoryDriver) doEvictObject(key lru.Key, value interface{}) {
k := key.(string)
func (memory *memoryDriver) evictObject(a ...interface{}) {
cacheStats := memory.objects.Stats()
log.Printf("CurrenSize: %d, CurrentItems: %d, TotalEvictions: %d",
cacheStats.Bytes, memory.objects.Len(), cacheStats.Evictions)
key := a[0].(string)
// loop through all buckets
for bucket, storedBucket := range memory.storedBuckets {
memory.totalSize = memory.totalSize - uint64(storedBucket.objectMetadata[k].Size)
log.Printf("Evicting: %s of Size: %d", k, storedBucket.objectMetadata[k].Size)
log.Println("TotalSize:", memory.totalSize)
delete(storedBucket.objectMetadata, k)
delete(storedBucket.objectMetadata, key)
delete(memory.lastAccessedObjects, key)
// remove bucket if no objects found anymore
if len(storedBucket.objectMetadata) == 0 {
delete(memory.storedBuckets, bucket)
}
delete(memory.lastAccessedObjects, k)
}
debug.FreeOSMemory()
}
func (memory *memoryDriver) expireLRUObjects() {
@ -509,16 +482,18 @@ func (memory *memoryDriver) expireLRUObjects() {
}
var sleepDuration time.Duration
memory.lock.Lock()
if memory.objects.Len() > 0 {
if k, _, ok := memory.objects.GetOldest(); ok {
switch {
case memory.objects.Len() > 0:
if k, ok := memory.objects.GetOldest(); ok {
key := k.(string)
if time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration {
switch {
case time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration:
memory.objects.RemoveOldest()
} else {
default:
sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key])
}
}
} else {
default:
sleepDuration = memory.expiration
}
memory.lock.Unlock()

View File

@ -21,7 +21,6 @@ import (
. "github.com/minio-io/check"
"github.com/minio-io/minio/pkg/storage/drivers"
"time"
)
func Test(t *testing.T) { TestingT(t) }
@ -32,7 +31,7 @@ var _ = Suite(&MySuite{})
func (s *MySuite) TestAPISuite(c *C) {
create := func() drivers.Driver {
_, _, store := Start(1000, 3*time.Hour)
_, _, store := Start(10000000, 0)
return store
}
drivers.APITestSuite(c, create)

View File

@ -116,8 +116,8 @@ func (m *Driver) ListObjects(bucket string, resources drivers.BucketResourcesMet
}
// CreateObject is a mock
func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) (string, error) {
ret := m.Called(bucket, key, contentType, md5sum, data)
func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) {
ret := m.Called(bucket, key, contentType, md5sum, size, data)
r0 := ret.Get(0).(string)
r1 := ret.Error(1)