Merge pull request #567 from harshavardhana/pr_out_convert_groupcache_lru_to_be_memory_size_based_rather_than_entry_based_calling_it_lru_memory

Convert the groupcache LRU to be memory-size based rather than entry based, calling it lru-memory
This commit is contained in:
Harshavardhana 2015-05-04 22:04:26 -07:00
commit 3fc9b4554f
14 changed files with 310 additions and 458 deletions
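The core interface change running through these files: drivers.Driver.CreateObject now takes an explicit size int64, which the API handler parses from the Content-Length header instead of streaming until EOF. A minimal caller-side sketch of the new signature (the package name, the Driver stub, and putWithSize are illustrative, not part of this commit):

package example

import (
	"io"
	"strconv"
)

// Driver mirrors only the method changed by this commit.
type Driver interface {
	CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error)
}

// putWithSize parses Content-Length and hands the size to the driver,
// which can then copy a bounded number of bytes and account for memory up front.
func putWithSize(d Driver, bucket, key, md5sum, contentLength string, body io.Reader) (string, error) {
	size, err := strconv.ParseInt(contentLength, 10, 64)
	if err != nil {
		// the real handler replies with MissingContentLength when the header is absent
		return "", err
	}
	return d.CreateObject(bucket, key, "", md5sum, size, body)
}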

View File

@ -18,6 +18,7 @@ package api
import (
"net/http"
"strconv"
"github.com/gorilla/mux"
"github.com/minio-io/minio/pkg/iodine"
@ -155,10 +156,10 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
return
}
/// if Content-Length missing, incomplete request throw IncompleteBody
/// if Content-Length is missing, reply with MissingContentLength
size := req.Header.Get("Content-Length")
if size == "" {
writeErrorResponse(w, req, IncompleteBody, acceptsContentType, req.URL.Path)
writeErrorResponse(w, req, MissingContentLength, acceptsContentType, req.URL.Path)
return
}
/// maximum Upload size for objects in a single operation
@ -171,7 +172,9 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path)
return
}
calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, req.Body)
// ignoring error here, TODO find a way to reply back if we can
sizeInt, _ := strconv.ParseInt(size, 10, 64)
calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt, req.Body)
switch err := iodine.ToError(err).(type) {
case nil:
{

View File

@ -168,7 +168,7 @@ func (s *MySuite) TestEmptyObject(c *C) {
Size: 0,
}
typedDriver.On("CreateBucket", "bucket", "private").Return(nil).Once()
typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything).Return(metadata.Md5, nil).Once()
typedDriver.On("CreateObject", "bucket", "object", "", "", 0, mock.Anything).Return(metadata.Md5, nil).Once()
typedDriver.On("GetBucketMetadata", "bucket").Return(drivers.BucketMetadata{}, nil).Twice()
typedDriver.On("GetObjectMetadata", "bucket", "object", "").Return(metadata, nil).Once()
typedDriver.On("GetObject", mock.Anything, "bucket", "object").Return(int64(0), nil).Once()
@ -179,7 +179,7 @@ func (s *MySuite) TestEmptyObject(c *C) {
buffer := bytes.NewBufferString("")
driver.CreateBucket("bucket", "private")
driver.CreateObject("bucket", "object", "", "", buffer)
driver.CreateObject("bucket", "object", "", "", 0, buffer)
request, err := http.NewRequest("GET", testServer.URL+"/bucket/object", nil)
c.Assert(err, IsNil)
@ -250,7 +250,7 @@ func (s *MySuite) TestObject(c *C) {
Size: 11,
}
typedDriver.On("CreateBucket", "bucket", "private").Return(nil).Once()
typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything).Return(metadata.Md5, nil).Once()
typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything, mock.Anything).Return(metadata.Md5, nil).Once()
typedDriver.On("GetBucketMetadata", "bucket").Return(drivers.BucketMetadata{}, nil).Twice()
typedDriver.On("GetObjectMetadata", "bucket", "object", "").Return(metadata, nil).Twice()
typedDriver.SetGetObjectWriter("bucket", "object", []byte("hello world"))
@ -262,7 +262,7 @@ func (s *MySuite) TestObject(c *C) {
buffer := bytes.NewBufferString("hello world")
driver.CreateBucket("bucket", "private")
driver.CreateObject("bucket", "object", "", "", buffer)
driver.CreateObject("bucket", "object", "", "", int64(buffer.Len()), buffer)
request, err := http.NewRequest("GET", testServer.URL+"/bucket/object", nil)
c.Assert(err, IsNil)
@ -325,12 +325,12 @@ func (s *MySuite) TestMultipleObjects(c *C) {
typedDriver.On("CreateBucket", "bucket", "private").Return(nil).Once()
driver.CreateBucket("bucket", "private")
typedDriver.On("CreateObject", "bucket", "object1", "", "", mock.Anything).Return(metadata1.Md5, nil).Once()
driver.CreateObject("bucket", "object1", "", "", buffer1)
typedDriver.On("CreateObject", "bucket", "object2", "", "", mock.Anything).Return(metadata2.Md5, nil).Once()
driver.CreateObject("bucket", "object2", "", "", buffer2)
typedDriver.On("CreateObject", "bucket", "object3", "", "", mock.Anything).Return(metadata3.Md5, nil).Once()
driver.CreateObject("bucket", "object3", "", "", buffer3)
typedDriver.On("CreateObject", "bucket", "object1", "", "", mock.Anything, mock.Anything).Return(metadata1.Md5, nil).Once()
driver.CreateObject("bucket", "object1", "", "", int64(buffer1.Len()), buffer1)
typedDriver.On("CreateObject", "bucket", "object2", "", "", mock.Anything, mock.Anything).Return(metadata2.Md5, nil).Once()
driver.CreateObject("bucket", "object2", "", "", int64(buffer2.Len()), buffer2)
typedDriver.On("CreateObject", "bucket", "object3", "", "", mock.Anything, mock.Anything).Return(metadata3.Md5, nil).Once()
driver.CreateObject("bucket", "object3", "", "", int64(buffer3.Len()), buffer3)
// test non-existent object
typedDriver.On("GetBucketMetadata", "bucket").Return(drivers.BucketMetadata{}, nil).Once()
@ -506,8 +506,8 @@ func (s *MySuite) TestHeader(c *C) {
buffer := bytes.NewBufferString("hello world")
typedDriver.On("GetBucketMetadata", "foo").Return(bucketMetadata, nil).Once()
typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything).Return(objectMetadata.Md5, nil).Once()
driver.CreateObject("bucket", "object", "", "", buffer)
typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything, mock.Anything).Return(objectMetadata.Md5, nil).Once()
driver.CreateObject("bucket", "object", "", "", int64(buffer.Len()), buffer)
typedDriver.On("GetBucketMetadata", "bucket").Return(bucketMetadata, nil).Once()
typedDriver.On("GetObjectMetadata", "bucket", "object", "").Return(objectMetadata, nil).Once()
@ -618,7 +618,7 @@ func (s *MySuite) TestPutObject(c *C) {
Size: 11,
}
typedDriver.On("CreateObject", "bucket", "two", "", "", mock.Anything).Return(twoMetadata.Md5, nil).Once()
typedDriver.On("CreateObject", "bucket", "two", "", "", mock.Anything, mock.Anything).Return(twoMetadata.Md5, nil).Once()
request, err = http.NewRequest("PUT", testServer.URL+"/bucket/two", bytes.NewBufferString("hello world"))
c.Assert(err, IsNil)
setAuthHeader(request)
@ -905,7 +905,7 @@ func (s *MySuite) TestContentTypePersists(c *C) {
}
typedDriver.On("GetBucketMetadata", "bucket").Return(metadata, nil).Once()
typedDriver.On("CreateObject", "bucket", "one", "", "", mock.Anything).Return(oneMetadata.Md5, nil).Once()
typedDriver.On("CreateObject", "bucket", "one", "", "", mock.Anything, mock.Anything).Return(oneMetadata.Md5, nil).Once()
request, err := http.NewRequest("PUT", testServer.URL+"/bucket/one", bytes.NewBufferString("hello world"))
delete(request.Header, "Content-Type")
c.Assert(err, IsNil)
@ -952,7 +952,7 @@ func (s *MySuite) TestContentTypePersists(c *C) {
}
typedDriver.On("GetBucketMetadata", "bucket").Return(metadata, nil).Once()
typedDriver.On("CreateObject", "bucket", "two", "", "", mock.Anything).Return(twoMetadata.Md5, nil).Once()
typedDriver.On("CreateObject", "bucket", "two", "", "", mock.Anything, mock.Anything).Return(twoMetadata.Md5, nil).Once()
request, err = http.NewRequest("PUT", testServer.URL+"/bucket/two", bytes.NewBufferString("hello world"))
delete(request.Header, "Content-Type")
request.Header.Add("Content-Type", "application/json")
@ -1010,11 +1010,11 @@ func (s *MySuite) TestPartialContent(c *C) {
}
typedDriver.On("CreateBucket", "foo", "private").Return(nil).Once()
typedDriver.On("CreateObject", "foo", "bar", "", "", mock.Anything).Return(metadata.Md5, nil).Once()
typedDriver.On("CreateObject", "foo", "bar", "", "", mock.Anything, mock.Anything).Return(metadata.Md5, nil).Once()
err := driver.CreateBucket("foo", "private")
c.Assert(err, IsNil)
driver.CreateObject("foo", "bar", "", "", bytes.NewBufferString("hello world"))
driver.CreateObject("foo", "bar", "", "", int64(len("hello world")), bytes.NewBufferString("hello world"))
// prepare for GET on range request
typedDriver.SetGetObjectWriter("foo", "bar", []byte("hello world"))

View File

@ -151,11 +151,17 @@ func (b bucket) PutObject(objectName string, objectData io.Reader, expectedMD5Su
donutObjectMetadata := make(map[string]string)
objectMetadata["version"] = "1.0"
donutObjectMetadata["version"] = "1.0"
size := metadata["contentLength"]
sizeInt, err := strconv.ParseInt(size, 10, 64)
if err != nil {
return "", iodine.New(err, nil)
}
// if total writers are only '1' do not compute erasure
switch len(writers) == 1 {
case true:
mw := io.MultiWriter(writers[0], summer)
totalLength, err := io.Copy(mw, objectData)
totalLength, err := io.CopyN(mw, objectData, sizeInt)
if err != nil {
return "", iodine.New(err, nil)
}

View File

@ -184,6 +184,7 @@ func (s *MySuite) TestNewObjectMetadata(c *C) {
hasher.Write([]byte(data))
expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
metadata["contentLength"] = strconv.Itoa(len(data))
err = donut.MakeBucket("foo", "private")
c.Assert(err, IsNil)
@ -210,9 +211,6 @@ func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) {
_, err = donut.PutObject("foo", "", "", nil, nil)
c.Assert(err, Not(IsNil))
_, err = donut.PutObject("foo", " ", "", nil, nil)
c.Assert(err, Not(IsNil))
}
// test create object
@ -234,6 +232,7 @@ func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
hasher.Write([]byte(data))
expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
metadata["contentLength"] = strconv.Itoa(len(data))
calculatedMd5Sum, err := donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
c.Assert(err, IsNil)
@ -268,11 +267,16 @@ func (s *MySuite) TestMultipleNewObjects(c *C) {
c.Assert(donut.MakeBucket("foo", "private"), IsNil)
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
_, err = donut.PutObject("foo", "obj1", "", one, nil)
metadata := make(map[string]string)
metadata["contentLength"] = strconv.Itoa(len("one"))
_, err = donut.PutObject("foo", "obj1", "", one, metadata)
c.Assert(err, IsNil)
two := ioutil.NopCloser(bytes.NewReader([]byte("two")))
_, err = donut.PutObject("foo", "obj2", "", two, nil)
metadata["contentLength"] = strconv.Itoa(len("two"))
_, err = donut.PutObject("foo", "obj2", "", two, metadata)
c.Assert(err, IsNil)
obj1, size, err := donut.GetObject("foo", "obj1")
@ -315,7 +319,8 @@ func (s *MySuite) TestMultipleNewObjects(c *C) {
c.Assert(listObjects, DeepEquals, []string{"obj1", "obj2"})
three := ioutil.NopCloser(bytes.NewReader([]byte("three")))
_, err = donut.PutObject("foo", "obj3", "", three, nil)
metadata["contentLength"] = strconv.Itoa(len("three"))
_, err = donut.PutObject("foo", "obj3", "", three, metadata)
c.Assert(err, IsNil)
obj3, size, err := donut.GetObject("foo", "obj3")

View File

@ -73,7 +73,8 @@ func testMultipleObjectCreation(c *check.C, create func() Driver) {
key := "obj" + strconv.Itoa(i)
objects[key] = []byte(randomString)
calculatedmd5sum, err := drivers.CreateObject("bucket", key, "", expectedmd5Sum, bytes.NewBufferString(randomString))
calculatedmd5sum, err := drivers.CreateObject("bucket", key, "", expectedmd5Sum, int64(len(randomString)),
bytes.NewBufferString(randomString))
c.Assert(err, check.IsNil)
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
}
@ -107,7 +108,7 @@ func testPaging(c *check.C, create func() Driver) {
// check before paging occurs
for i := 0; i < 5; i++ {
key := "obj" + strconv.Itoa(i)
drivers.CreateObject("bucket", key, "", "", bytes.NewBufferString(key))
drivers.CreateObject("bucket", key, "", "", int64(len(key)), bytes.NewBufferString(key))
resources.Maxkeys = 5
resources.Prefix = ""
objects, resources, err = drivers.ListObjects("bucket", resources)
@ -118,7 +119,7 @@ func testPaging(c *check.C, create func() Driver) {
// check after paging occurs pages work
for i := 6; i <= 10; i++ {
key := "obj" + strconv.Itoa(i)
drivers.CreateObject("bucket", key, "", "", bytes.NewBufferString(key))
drivers.CreateObject("bucket", key, "", "", int64(len(key)), bytes.NewBufferString(key))
resources.Maxkeys = 5
resources.Prefix = ""
objects, resources, err = drivers.ListObjects("bucket", resources)
@ -128,8 +129,8 @@ func testPaging(c *check.C, create func() Driver) {
}
// check paging with prefix at end returns less objects
{
drivers.CreateObject("bucket", "newPrefix", "", "", bytes.NewBufferString("prefix1"))
drivers.CreateObject("bucket", "newPrefix2", "", "", bytes.NewBufferString("prefix2"))
drivers.CreateObject("bucket", "newPrefix", "", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"))
drivers.CreateObject("bucket", "newPrefix2", "", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"))
resources.Prefix = "new"
resources.Maxkeys = 5
objects, resources, err = drivers.ListObjects("bucket", resources)
@ -150,8 +151,8 @@ func testPaging(c *check.C, create func() Driver) {
// check delimited results with delimiter and prefix
{
drivers.CreateObject("bucket", "this/is/delimited", "", "", bytes.NewBufferString("prefix1"))
drivers.CreateObject("bucket", "this/is/also/a/delimited/file", "", "", bytes.NewBufferString("prefix2"))
drivers.CreateObject("bucket", "this/is/delimited", "", "", int64(len("prefix1")), bytes.NewBufferString("prefix1"))
drivers.CreateObject("bucket", "this/is/also/a/delimited/file", "", "", int64(len("prefix2")), bytes.NewBufferString("prefix2"))
var prefixes []string
resources.CommonPrefixes = prefixes // allocate new every time
resources.Delimiter = "/"
@ -210,14 +211,14 @@ func testObjectOverwriteFails(c *check.C, create func() Driver) {
hasher1.Write([]byte("one"))
md5Sum1 := base64.StdEncoding.EncodeToString(hasher1.Sum(nil))
md5Sum1hex := hex.EncodeToString(hasher1.Sum(nil))
md5Sum11, err := drivers.CreateObject("bucket", "object", "", md5Sum1, bytes.NewBufferString("one"))
md5Sum11, err := drivers.CreateObject("bucket", "object", "", md5Sum1, int64(len("one")), bytes.NewBufferString("one"))
c.Assert(err, check.IsNil)
c.Assert(md5Sum1hex, check.Equals, md5Sum11)
hasher2 := md5.New()
hasher2.Write([]byte("three"))
md5Sum2 := base64.StdEncoding.EncodeToString(hasher2.Sum(nil))
_, err = drivers.CreateObject("bucket", "object", "", md5Sum2, bytes.NewBufferString("three"))
_, err = drivers.CreateObject("bucket", "object", "", md5Sum2, int64(len("three")), bytes.NewBufferString("three"))
c.Assert(err, check.Not(check.IsNil))
var bytesBuffer bytes.Buffer
@ -229,7 +230,7 @@ func testObjectOverwriteFails(c *check.C, create func() Driver) {
func testNonExistantBucketOperations(c *check.C, create func() Driver) {
drivers := create()
_, err := drivers.CreateObject("bucket", "object", "", "", bytes.NewBufferString("one"))
_, err := drivers.CreateObject("bucket", "object", "", "", int64(len("one")), bytes.NewBufferString("one"))
c.Assert(err, check.Not(check.IsNil))
}
@ -260,7 +261,8 @@ func testPutObjectInSubdir(c *check.C, create func() Driver) {
hasher.Write([]byte("hello world"))
md5Sum1 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
md5Sum1hex := hex.EncodeToString(hasher.Sum(nil))
md5Sum11, err := drivers.CreateObject("bucket", "dir1/dir2/object", "", md5Sum1, bytes.NewBufferString("hello world"))
md5Sum11, err := drivers.CreateObject("bucket", "dir1/dir2/object", "", md5Sum1, int64(len("hello world")),
bytes.NewBufferString("hello world"))
c.Assert(err, check.IsNil)
c.Assert(md5Sum11, check.Equals, md5Sum1hex)
@ -356,7 +358,8 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() Driver) {
err := drivers.CreateBucket("bucket", "")
c.Assert(err, check.IsNil)
_, err = drivers.CreateObject("bucket", "dir1/dir2/object", "", "", bytes.NewBufferString("hello world"))
_, err = drivers.CreateObject("bucket", "dir1/dir2/object", "", "", int64(len("hello world")),
bytes.NewBufferString("hello world"))
c.Assert(err, check.IsNil)
var byteBuffer bytes.Buffer
@ -400,19 +403,19 @@ func testDefaultContentType(c *check.C, create func() Driver) {
c.Assert(err, check.IsNil)
// test empty
_, err = drivers.CreateObject("bucket", "one", "", "", bytes.NewBufferString("one"))
_, err = drivers.CreateObject("bucket", "one", "", "", int64(len("one")), bytes.NewBufferString("one"))
metadata, err := drivers.GetObjectMetadata("bucket", "one", "")
c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/octet-stream")
// test custom
drivers.CreateObject("bucket", "two", "application/text", "", bytes.NewBufferString("two"))
drivers.CreateObject("bucket", "two", "application/text", "", int64(len("two")), bytes.NewBufferString("two"))
metadata, err = drivers.GetObjectMetadata("bucket", "two", "")
c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/text")
// test trim space
drivers.CreateObject("bucket", "three", "\tapplication/json ", "", bytes.NewBufferString("three"))
drivers.CreateObject("bucket", "three", "\tapplication/json ", "", int64(len("three")), bytes.NewBufferString("three"))
metadata, err = drivers.GetObjectMetadata("bucket", "three", "")
c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/json")
@ -425,12 +428,12 @@ func testContentMd5Set(c *check.C, create func() Driver) {
// test md5 invalid
badmd5Sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA"
calculatedmd5sum, err := drivers.CreateObject("bucket", "one", "", badmd5Sum, bytes.NewBufferString("one"))
calculatedmd5sum, err := drivers.CreateObject("bucket", "one", "", badmd5Sum, int64(len("one")), bytes.NewBufferString("one"))
c.Assert(err, check.Not(check.IsNil))
c.Assert(calculatedmd5sum, check.Not(check.Equals), badmd5Sum)
goodmd5sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA="
calculatedmd5sum, err = drivers.CreateObject("bucket", "two", "", goodmd5sum, bytes.NewBufferString("one"))
calculatedmd5sum, err = drivers.CreateObject("bucket", "two", "", goodmd5sum, int64(len("one")), bytes.NewBufferString("one"))
c.Assert(err, check.IsNil)
c.Assert(calculatedmd5sum, check.Equals, goodmd5sum)
}

View File

@ -248,13 +248,13 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string
}, errParams)
}
reader, size, err := d.donut.GetObject(bucketName, objectName)
defer reader.Close()
if err != nil {
return 0, iodine.New(drivers.ObjectNotFound{
Bucket: bucketName,
Object: objectName,
}, nil)
}
defer reader.Close()
if start > size || (start+length-1) > size {
return 0, iodine.New(drivers.InvalidRange{
Start: start,
@ -366,7 +366,7 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
}
// CreateObject creates a new object
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, reader io.Reader) (string, error) {
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) {
errParams := map[string]string{
"bucketName": bucketName,
"objectName": objectName,
@ -383,6 +383,7 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
}
metadata := make(map[string]string)
metadata["contentType"] = strings.TrimSpace(contentType)
metadata["contentLength"] = strconv.FormatInt(size, 10)
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))

View File

@ -37,7 +37,7 @@ type Driver interface {
GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error)
GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error)
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) (string, error)
CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error)
}
// BucketACL - bucket level access control

View File

@ -92,9 +92,8 @@ type ObjectExists GenericObjectError
// EntityTooLarge - object size exceeds maximum limit
type EntityTooLarge struct {
GenericObjectError
Size string
TotalSize string
MaxSize string
Size string
MaxSize string
}
// ObjectNameInvalid - object name provided is invalid
@ -170,7 +169,7 @@ func (e ObjectNameInvalid) Error() string {
// Return string an error formatted as the given text
func (e EntityTooLarge) Error() string {
return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.TotalSize
return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.MaxSize
}
// Return string an error formatted as the given text

View File

@ -1,165 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
Modifications from Minio under the following license:
Minimalist Object Storage, (C) 2015 Minio, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package lru implements an LRU cache.
package lru
import (
"container/list"
"sync"
)
// Cache is an LRU cache. It is not safe for concurrent access.
type Cache struct {
// MaxEntries is the maximum number of cache entries before
// an item is evicted. Zero means no limit.
MaxEntries int
// OnEvicted optionally specifies a callback function to be
// executed when an entry is purged from the cache.
OnEvicted func(key Key, value interface{})
ll *list.List
cache map[interface{}]*list.Element
lock *sync.RWMutex
}
// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators
type Key interface{}
type entry struct {
key Key
value interface{}
}
// New creates a new Cache.
// If maxEntries is zero, the cache has no limit and it's assumed
// that eviction is done by the caller.
func New(maxEntries int) *Cache {
return &Cache{
MaxEntries: maxEntries,
ll: list.New(),
cache: make(map[interface{}]*list.Element),
lock: &sync.RWMutex{},
}
}
// Add adds a value to the cache.
func (c *Cache) Add(key Key, value interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
c.cache = make(map[interface{}]*list.Element)
c.ll = list.New()
}
if ee, ok := c.cache[key]; ok {
c.ll.MoveToFront(ee)
ee.Value.(*entry).value = value
return
}
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
c.RemoveOldest()
}
}
// Get looks up a key's value from the cache.
func (c *Cache) Get(key Key) (value interface{}, ok bool) {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.ll.MoveToFront(ele)
return ele.Value.(*entry).value, true
}
return
}
// Remove removes the provided key from the cache.
func (c *Cache) Remove(key Key) {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return
}
if ele, hit := c.cache[key]; hit {
c.removeElement(ele)
}
}
// RemoveOldest removes the oldest item from the cache.
func (c *Cache) RemoveOldest() {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return
}
ele := c.ll.Back()
if ele != nil {
c.removeElement(ele)
}
}
// GetOldest returns the oldest key, value, ok without modifying the lru
func (c *Cache) GetOldest() (key Key, value interface{}, ok bool) {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return nil, nil, false
}
ele := c.ll.Back()
if ele != nil {
return ele.Value.(*entry).key, ele.Value.(*entry).value, true
}
return nil, nil, false
}
func (c *Cache) removeElement(e *list.Element) {
c.ll.Remove(e)
kv := e.Value.(*entry)
delete(c.cache, kv.key)
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
// Len returns the number of items in the cache.
func (c *Cache) Len() int {
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
return 0
}
return c.ll.Len()
}

View File

@ -1,110 +0,0 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
Modifications from Minio under the following license:
Minimalist Object Storage, (C) 2015 Minio, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lru
import (
"testing"
)
type simpleStruct struct {
int
string
}
type complexStruct struct {
int
simpleStruct
}
var getTests = []struct {
name string
keyToAdd interface{}
keyToGet interface{}
expectedOk bool
}{
{"string_hit", "myKey", "myKey", true},
{"string_miss", "myKey", "nonsense", false},
{"simple_struct_hit", simpleStruct{1, "two"}, simpleStruct{1, "two"}, true},
{"simeple_struct_miss", simpleStruct{1, "two"}, simpleStruct{0, "noway"}, false},
{"complex_struct_hit", complexStruct{1, simpleStruct{2, "three"}},
complexStruct{1, simpleStruct{2, "three"}}, true},
}
func TestGet(t *testing.T) {
for _, tt := range getTests {
lru := New(0)
lru.Add(tt.keyToAdd, 1234)
val, ok := lru.Get(tt.keyToGet)
if ok != tt.expectedOk {
t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok)
} else if ok && val != 1234 {
t.Fatalf("%s expected get to return 1234 but got %v", tt.name, val)
}
}
}
func TestRemove(t *testing.T) {
lru := New(0)
lru.Add("myKey", 1234)
if val, ok := lru.Get("myKey"); !ok {
t.Fatal("TestRemove returned no match")
} else if val != 1234 {
t.Fatalf("TestRemove failed. Expected %d, got %v", 1234, val)
}
lru.Remove("myKey")
if _, ok := lru.Get("myKey"); ok {
t.Fatal("TestRemove returned a removed entry")
}
}
func TestOldest(t *testing.T) {
lru := New(0)
lru.Add("a", 1)
lru.Add("b", 2)
lru.Add("c", 3)
key, val, ok := lru.GetOldest()
if ok != true {
t.Fatalf("%s expected get to return 1 but got %v", "a", val)
}
if key != "a" && val != 1 {
t.Fatalf("%s expected get to return 1 but got %v", "a", val)
}
lru.RemoveOldest()
key, val, ok = lru.GetOldest()
if ok != true {
t.Fatalf("%s expected get to return 1 but got %v", "a", val)
}
if key != "b" && val != 2 {
t.Fatalf("%s expected get to return 1 but got %v", "a", val)
}
}

View File

@ -24,30 +24,23 @@ import (
"encoding/hex"
"errors"
"io"
"io/ioutil"
"log"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/minio-io/minio/pkg/iodine"
"github.com/minio-io/minio/pkg/storage/drivers"
"github.com/minio-io/minio/pkg/storage/drivers/memory/lru"
"github.com/minio-io/minio/pkg/utils/log"
"github.com/minio-io/minio/pkg/utils/split"
)
// memoryDriver - local variables
type memoryDriver struct {
storedBuckets map[string]storedBucket
lock *sync.RWMutex
objects *lru.Cache
lastAccessedObjects map[string]time.Time
totalSize uint64
maxSize uint64
expiration time.Duration
shutdown bool
storedBuckets map[string]storedBucket
lock *sync.RWMutex
objects *Intelligent
}
type storedBucket struct {
@ -67,27 +60,13 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro
var memory *memoryDriver
memory = new(memoryDriver)
memory.storedBuckets = make(map[string]storedBucket)
memory.lastAccessedObjects = make(map[string]time.Time)
memory.objects = lru.New(0)
memory.objects = NewIntelligent(maxSize, expiration)
memory.lock = new(sync.RWMutex)
memory.expiration = expiration
memory.shutdown = false
switch {
case maxSize <= 0:
memory.maxSize = 9223372036854775807
case maxSize > 0:
memory.maxSize = maxSize
default:
log.Println("Error")
}
memory.objects.OnEvicted = memory.evictObject
// set up memory expiration
if expiration > 0 {
go memory.expireLRUObjects()
}
memory.objects.ExpireObjects(time.Millisecond * 10)
go start(ctrlChannel, errorChannel)
return ctrlChannel, errorChannel, memory
@ -112,41 +91,49 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string)
memory.lock.RUnlock()
return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
// form objectKey
objectKey := bucket + "/" + object
if _, ok := storedBucket.objectMetadata[objectKey]; ok {
if data, ok := memory.objects.Get(objectKey); ok {
dataSlice := data.([]byte)
objectBuffer := bytes.NewBuffer(dataSlice)
memory.lock.RUnlock()
go memory.updateAccessTime(objectKey)
written, err := io.Copy(w, objectBuffer)
return written, iodine.New(err, nil)
}
data, ok := memory.objects.Get(objectKey)
if !ok {
memory.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil)
}
written, err := io.Copy(w, bytes.NewBuffer(data.([]byte)))
memory.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil)
return written, iodine.New(err, nil)
}
// GetPartialObject - GET object from memory buffer range
func (memory *memoryDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
errParams := map[string]string{
"bucket": bucket,
"object": object,
"start": strconv.FormatInt(start, 10),
"length": strconv.FormatInt(length, 10),
}
memory.lock.RLock()
defer memory.lock.RUnlock()
var sourceBuffer bytes.Buffer
if !drivers.IsValidBucket(bucket) {
return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
memory.lock.RUnlock()
return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, errParams)
}
if !drivers.IsValidObjectName(object) {
return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, nil)
memory.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, errParams)
}
if _, err := memory.GetObject(&sourceBuffer, bucket, object); err != nil {
return 0, iodine.New(err, nil)
if start < 0 {
return 0, iodine.New(drivers.InvalidRange{
Start: start,
Length: length,
}, errParams)
}
if _, err := io.CopyN(ioutil.Discard, &sourceBuffer, start); err != nil {
return 0, iodine.New(err, nil)
objectKey := bucket + "/" + object
data, ok := memory.objects.Get(objectKey)
if !ok {
memory.lock.RUnlock()
return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, errParams)
}
return io.CopyN(w, &sourceBuffer, length)
written, err := io.CopyN(w, bytes.NewBuffer(data.([]byte)[start:]), length)
memory.lock.RUnlock()
return written, iodine.New(err, nil)
}
// GetBucketMetadata -
@ -204,14 +191,39 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
return iodine.New(errors.New("invalid argument"), nil)
}
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) {
humanError, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, data)
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
humanReadableErr, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
// free
debug.FreeOSMemory()
return humanError, err
return humanReadableErr, iodine.New(err, nil)
}
// getMD5AndData - a wrapper that captures the md5sum and the data in a more memory-efficient way
func getMD5AndData(reader io.Reader) ([]byte, []byte, error) {
hash := md5.New()
var data []byte
var err error
var length int
for err == nil {
byteBuffer := make([]byte, 1024*1024)
length, err = reader.Read(byteBuffer)
// hash.Write() would accept an empty buffer, but we must
// check for a zero-length read ourselves and break out of the loop
if length == 0 {
break
}
hash.Write(byteBuffer[0:length])
data = append(data, byteBuffer[0:length]...)
}
if err != io.EOF {
return nil, nil, err
}
return hash.Sum(nil), data, nil
}
// CreateObject - PUT object to memory buffer
func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) {
func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
memory.lock.RLock()
if !drivers.IsValidBucket(bucket) {
memory.lock.RUnlock()
@ -246,38 +258,25 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
var bytesBuffer bytes.Buffer
chunks := split.Stream(data, 10*1024*1024)
totalLength := 0
summer := md5.New()
for chunk := range chunks {
if chunk.Err == nil {
totalLength = totalLength + len(chunk.Data)
summer.Write(chunk.Data)
_, err := io.Copy(&bytesBuffer, bytes.NewBuffer(chunk.Data))
if err != nil {
err := iodine.New(err, nil)
log.Println(err)
return "", err
}
if uint64(totalLength)+memory.totalSize > memory.maxSize {
memory.objects.RemoveOldest()
}
}
md5SumBytes, readBytes, err := getMD5AndData(data)
if err != nil {
return "", iodine.New(err, nil)
}
md5SumBytes := summer.Sum(nil)
totalLength := len(readBytes)
memory.lock.Lock()
memory.objects.Set(objectKey, readBytes)
memory.lock.Unlock()
// de-allocating
readBytes = nil
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify that the written object matches the expected MD5 sum, but only if one was supplied
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
memory.lock.Lock()
defer memory.lock.Unlock()
memory.objects.RemoveOldest()
return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
}
}
newObject := drivers.ObjectMetadata{
Bucket: bucket,
Key: key,
@ -287,20 +286,17 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
Md5: md5Sum,
Size: int64(totalLength),
}
memory.lock.Lock()
memoryObject := make(map[string]drivers.ObjectMetadata)
if len(memory.storedBuckets[bucket].objectMetadata) == 0 {
switch {
case len(memory.storedBuckets[bucket].objectMetadata) == 0:
storedBucket.objectMetadata = memoryObject
storedBucket.objectMetadata[objectKey] = newObject
} else {
default:
storedBucket.objectMetadata[objectKey] = newObject
}
memory.storedBuckets[bucket] = storedBucket
memory.objects.Add(objectKey, bytesBuffer.Bytes())
memory.totalSize = memory.totalSize + uint64(newObject.Size)
if memory.totalSize > memory.maxSize {
memory.objects.RemoveOldest()
}
memory.lock.Unlock()
return newObject.Md5, nil
}
@ -481,55 +477,18 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive
return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil)
}
func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) {
memory.doEvictObject(key, value)
debug.FreeOSMemory()
}
func (memory *memoryDriver) doEvictObject(key lru.Key, value interface{}) {
k := key.(string)
func (memory *memoryDriver) evictObject(a ...interface{}) {
cacheStats := memory.objects.Stats()
log.Printf("CurrenSize: %d, CurrentItems: %d, TotalEvictions: %d",
cacheStats.Bytes, cacheStats.Items, cacheStats.Evictions)
key := a[0].(string)
// loop through all buckets
for bucket, storedBucket := range memory.storedBuckets {
memory.totalSize = memory.totalSize - uint64(storedBucket.objectMetadata[k].Size)
log.Printf("Evicting: %s of Size: %d", k, storedBucket.objectMetadata[k].Size)
log.Println("TotalSize:", memory.totalSize)
delete(storedBucket.objectMetadata, k)
delete(storedBucket.objectMetadata, key)
// remove bucket if no objects found anymore
if len(storedBucket.objectMetadata) == 0 {
delete(memory.storedBuckets, bucket)
}
delete(memory.lastAccessedObjects, k)
}
}
func (memory *memoryDriver) expireLRUObjects() {
for {
if memory.shutdown {
return
}
var sleepDuration time.Duration
memory.lock.Lock()
if memory.objects.Len() > 0 {
if k, _, ok := memory.objects.GetOldest(); ok {
key := k.(string)
if time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration {
memory.objects.RemoveOldest()
} else {
sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key])
}
}
} else {
sleepDuration = memory.expiration
}
memory.lock.Unlock()
time.Sleep(sleepDuration)
}
}
func (memory *memoryDriver) updateAccessTime(key string) {
memory.lock.Lock()
defer memory.lock.Unlock()
if _, ok := memory.lastAccessedObjects[key]; ok {
memory.lastAccessedObjects[key] = time.Now().UTC()
}
debug.FreeOSMemory()
}

View File

@ -0,0 +1,151 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package memory
import (
"sync"
"time"
)
var zeroExpiration = time.Duration(0)
// Intelligent holds the required variables to compose an in-memory cache system,
// which also provides a key-expiration mechanism and a maxSize limit
type Intelligent struct {
// Mutex is used for handling the concurrent
// read/write requests for cache
sync.Mutex
// items hold the cached objects
items map[string]interface{}
// createdAt holds the time at which each item was created
createdAt map[string]time.Time
// expiration is a duration for a cache key to expire
expiration time.Duration
// gcInterval is a duration for garbage collection
gcInterval time.Duration
// maxSize is a total size for overall cache
maxSize uint64
// currentSize is a current size in memory
currentSize uint64
// OnEvicted - callback function for eviction
OnEvicted func(a ...interface{})
// totalEvicted counter to keep track of total evictions
totalEvicted uint64
}
// Stats current cache statistics
type Stats struct {
Bytes uint64
Items uint64
Evictions uint64
}
// NewIntelligent creates an in-memory cache
//
// maxSize is used to evict objects before we run out of memory
// expiration is the duration after which a key expires from the cache
func NewIntelligent(maxSize uint64, expiration time.Duration) *Intelligent {
return &Intelligent{
items: map[string]interface{}{},
createdAt: map[string]time.Time{},
expiration: expiration,
maxSize: maxSize,
}
}
// Stats get current cache statistics
func (r *Intelligent) Stats() Stats {
return Stats{
Bytes: r.currentSize,
Items: uint64(len(r.items)),
Evictions: r.totalEvicted,
}
}
// ExpireObjects expires stale objects in a background goroutine
func (r *Intelligent) ExpireObjects(gcInterval time.Duration) {
r.gcInterval = gcInterval
go func() {
for range time.Tick(gcInterval) {
for key := range r.items {
r.Lock()
if !r.isValid(key) {
r.Delete(key)
}
r.Unlock()
}
}
}()
}
// Get returns a value of a given key if it exists
func (r *Intelligent) Get(key string) (interface{}, bool) {
r.Lock()
defer r.Unlock()
value, ok := r.items[key]
return value, ok
}
// Set will persist a value to the cache
func (r *Intelligent) Set(key string, value interface{}) {
r.Lock()
// remove a random key only if we reach the maxSize threshold;
// if not, assume infinite memory
if r.maxSize > 0 {
for key := range r.items {
for (r.currentSize + uint64(len(value.([]byte)))) > r.maxSize {
r.Delete(key)
}
break
}
}
r.items[key] = value
r.currentSize += uint64(len(value.([]byte)))
r.createdAt[key] = time.Now()
r.Unlock()
return
}
// Delete deletes a given key if exists
func (r *Intelligent) Delete(key string) {
r.currentSize -= uint64(len(r.items[key].([]byte)))
delete(r.items, key)
delete(r.createdAt, key)
r.totalEvicted++
if r.OnEvicted != nil {
r.OnEvicted(key)
}
}
func (r *Intelligent) isValid(key string) bool {
createdAt, ok := r.createdAt[key]
if !ok {
return false
}
if r.expiration == zeroExpiration {
return true
}
return createdAt.Add(r.expiration).After(time.Now())
}
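A minimal usage sketch of the new size-based cache defined above (hypothetical example, assuming it sits in the same memory package; only calls shown in this file are used):

package memory

import (
	"fmt"
	"time"
)

// exampleIntelligent is a sketch only and is not part of this commit.
func exampleIntelligent() {
	cache := NewIntelligent(1024*1024, 3*time.Hour) // cap at 1 MB, expire keys after 3 hours
	cache.OnEvicted = func(a ...interface{}) {
		fmt.Println("evicted:", a[0].(string)) // Delete passes the evicted key
	}
	cache.ExpireObjects(10 * time.Millisecond) // background expiry check interval
	cache.Set("bucket/object", []byte("hello world"))
	if data, ok := cache.Get("bucket/object"); ok {
		fmt.Println(len(data.([]byte)), "bytes cached")
	}
	s := cache.Stats()
	fmt.Println(s.Bytes, s.Items, s.Evictions)
}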

View File

@ -18,10 +18,10 @@ package memory
import (
"testing"
"time"
. "github.com/minio-io/check"
"github.com/minio-io/minio/pkg/storage/drivers"
"time"
)
func Test(t *testing.T) { TestingT(t) }
@ -32,7 +32,7 @@ var _ = Suite(&MySuite{})
func (s *MySuite) TestAPISuite(c *C) {
create := func() drivers.Driver {
_, _, store := Start(1000, 3*time.Hour)
_, _, store := Start(1000000, 3*time.Hour)
return store
}
drivers.APITestSuite(c, create)

View File

@ -116,8 +116,8 @@ func (m *Driver) ListObjects(bucket string, resources drivers.BucketResourcesMet
}
// CreateObject is a mock
func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) (string, error) {
ret := m.Called(bucket, key, contentType, md5sum, data)
func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) {
ret := m.Called(bucket, key, contentType, md5sum, size, data)
r0 := ret.Get(0).(string)
r1 := ret.Error(1)