1
0
mirror of https://github.com/minio/minio.git synced 2025-01-18 10:13:16 -05:00

Memory driver now limits by size

This commit is contained in:
Frederick F. Kautz IV 2015-04-22 17:50:26 -07:00
parent 276c02af62
commit c56f7380d6
2 changed files with 35 additions and 14 deletions
pkg
server
storage/drivers/memory

@ -134,7 +134,7 @@ func getDriverChannels(driverType DriverType) (ctrlChans []chan<- string, status
switch { switch {
case driverType == Memory: case driverType == Memory:
{ {
ctrlChan, statusChan, driver = memory.Start(1000) ctrlChan, statusChan, driver = memory.Start(1024 * 1024 * 1024)
ctrlChans = append(ctrlChans, ctrlChan) ctrlChans = append(ctrlChans, ctrlChan)
statusChans = append(statusChans, statusChan) statusChans = append(statusChans, statusChan)
} }

@ -33,6 +33,7 @@ import (
"io/ioutil" "io/ioutil"
"github.com/golang/groupcache/lru" "github.com/golang/groupcache/lru"
"github.com/minio-io/minio/pkg/utils/log"
) )
// memoryDriver - local variables // memoryDriver - local variables
@ -41,6 +42,8 @@ type memoryDriver struct {
objectMetadata map[string]storedObject objectMetadata map[string]storedObject
objects *lru.Cache objects *lru.Cache
lock *sync.RWMutex lock *sync.RWMutex
totalSize int64
maxSize int64
} }
type storedBucket struct { type storedBucket struct {
@ -54,16 +57,26 @@ type storedObject struct {
} }
// Start memory object server // Start memory object server
func Start(maxObjects int) (chan<- string, <-chan error, drivers.Driver) { func Start(maxSize int64) (chan<- string, <-chan error, drivers.Driver) {
ctrlChannel := make(chan string) ctrlChannel := make(chan string)
errorChannel := make(chan error) errorChannel := make(chan error)
memory := new(memoryDriver) var memory *memoryDriver
memory = new(memoryDriver)
memory.bucketMetadata = make(map[string]storedBucket) memory.bucketMetadata = make(map[string]storedBucket)
memory.objectMetadata = make(map[string]storedObject) memory.objectMetadata = make(map[string]storedObject)
memory.objects = lru.New(maxObjects) memory.objects = lru.New(0)
memory.lock = new(sync.RWMutex) memory.lock = new(sync.RWMutex)
switch {
case maxSize == 0:
memory.maxSize = 9223372036854775807
case maxSize > 0:
memory.maxSize = maxSize
default:
log.Println("Error")
}
memory.objects.OnEvicted = memory.evictObject memory.objects.OnEvicted = memory.evictObject
go start(ctrlChannel, errorChannel) go start(ctrlChannel, errorChannel)
@ -75,7 +88,7 @@ func start(ctrlChannel <-chan string, errorChannel chan<- error) {
} }
// GetObject - GET object from memory buffer // GetObject - GET object from memory buffer
func (memory memoryDriver) GetObject(w io.Writer, bucket string, object string) (int64, error) { func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) (int64, error) {
memory.lock.RLock() memory.lock.RLock()
defer memory.lock.RUnlock() defer memory.lock.RUnlock()
if _, ok := memory.bucketMetadata[bucket]; ok == false { if _, ok := memory.bucketMetadata[bucket]; ok == false {
@ -95,7 +108,7 @@ func (memory memoryDriver) GetObject(w io.Writer, bucket string, object string)
} }
// GetPartialObject - GET object from memory buffer range // GetPartialObject - GET object from memory buffer range
func (memory memoryDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { func (memory *memoryDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
memory.lock.RLock() memory.lock.RLock()
defer memory.lock.RUnlock() defer memory.lock.RUnlock()
var sourceBuffer bytes.Buffer var sourceBuffer bytes.Buffer
@ -109,7 +122,7 @@ func (memory memoryDriver) GetPartialObject(w io.Writer, bucket, object string,
} }
// GetBucketMetadata - // GetBucketMetadata -
func (memory memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMetadata, error) { func (memory *memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMetadata, error) {
memory.lock.RLock() memory.lock.RLock()
defer memory.lock.RUnlock() defer memory.lock.RUnlock()
if _, ok := memory.bucketMetadata[bucket]; ok == false { if _, ok := memory.bucketMetadata[bucket]; ok == false {
@ -119,7 +132,7 @@ func (memory memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMetad
} }
// CreateObject - PUT object to memory buffer // CreateObject - PUT object to memory buffer
func (memory memoryDriver) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error { func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error {
memory.lock.RLock() memory.lock.RLock()
if _, ok := memory.bucketMetadata[bucket]; ok == false { if _, ok := memory.bucketMetadata[bucket]; ok == false {
@ -166,12 +179,16 @@ func (memory memoryDriver) CreateObject(bucket, key, contentType, md5sum string,
} }
memory.objectMetadata[objectKey] = newObject memory.objectMetadata[objectKey] = newObject
memory.objects.Add(objectKey, dataSlice) memory.objects.Add(objectKey, dataSlice)
memory.totalSize = memory.totalSize + newObject.metadata.Size
for memory.totalSize > memory.maxSize {
memory.objects.RemoveOldest()
}
memory.lock.Unlock() memory.lock.Unlock()
return nil return nil
} }
// CreateBucket - create bucket in memory // CreateBucket - create bucket in memory
func (memory memoryDriver) CreateBucket(bucketName string) error { func (memory *memoryDriver) CreateBucket(bucketName string) error {
memory.lock.RLock() memory.lock.RLock()
if !drivers.IsValidBucket(bucketName) { if !drivers.IsValidBucket(bucketName) {
memory.lock.RUnlock() memory.lock.RUnlock()
@ -213,7 +230,7 @@ func appendUniq(slice []string, i string) []string {
return append(slice, i) return append(slice, i)
} }
func (memory memoryDriver) filterDelimiterPrefix(keys []string, key, delimitedName string, resources drivers.BucketResourcesMetadata) (drivers.BucketResourcesMetadata, []string) { func (memory *memoryDriver) filterDelimiterPrefix(keys []string, key, delimitedName string, resources drivers.BucketResourcesMetadata) (drivers.BucketResourcesMetadata, []string) {
switch true { switch true {
case key == resources.Prefix: case key == resources.Prefix:
keys = appendUniq(keys, key) keys = appendUniq(keys, key)
@ -227,7 +244,7 @@ func (memory memoryDriver) filterDelimiterPrefix(keys []string, key, delimitedNa
} }
// ListObjects - list objects from memory // ListObjects - list objects from memory
func (memory memoryDriver) ListObjects(bucket string, resources drivers.BucketResourcesMetadata) ([]drivers.ObjectMetadata, drivers.BucketResourcesMetadata, error) { func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketResourcesMetadata) ([]drivers.ObjectMetadata, drivers.BucketResourcesMetadata, error) {
memory.lock.RLock() memory.lock.RLock()
defer memory.lock.RUnlock() defer memory.lock.RUnlock()
if _, ok := memory.bucketMetadata[bucket]; ok == false { if _, ok := memory.bucketMetadata[bucket]; ok == false {
@ -290,7 +307,7 @@ func (b ByBucketName) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b ByBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name } func (b ByBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name }
// ListBuckets - List buckets from memory // ListBuckets - List buckets from memory
func (memory memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) { func (memory *memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) {
memory.lock.RLock() memory.lock.RLock()
defer memory.lock.RUnlock() defer memory.lock.RUnlock()
var results []drivers.BucketMetadata var results []drivers.BucketMetadata
@ -302,7 +319,7 @@ func (memory memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) {
} }
// GetObjectMetadata - get object metadata from memory // GetObjectMetadata - get object metadata from memory
func (memory memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drivers.ObjectMetadata, error) { func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drivers.ObjectMetadata, error) {
memory.lock.RLock() memory.lock.RLock()
defer memory.lock.RUnlock() defer memory.lock.RUnlock()
// check if bucket exists // check if bucket exists
@ -318,8 +335,12 @@ func (memory memoryDriver) GetObjectMetadata(bucket, key, prefix string) (driver
return drivers.ObjectMetadata{}, drivers.ObjectNotFound{Bucket: bucket, Object: key} return drivers.ObjectMetadata{}, drivers.ObjectNotFound{Bucket: bucket, Object: key}
} }
func (memory memoryDriver) evictObject(key lru.Key, value interface{}) { func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) {
k := key.(string) k := key.(string)
memory.totalSize = memory.totalSize - memory.objectMetadata[k].metadata.Size
log.Println("evicting:", k)
delete(memory.objectMetadata, k) delete(memory.objectMetadata, k)
} }