From c342ce1588d66169f4a24c171fa122fb54c7899f Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 3 May 2015 23:13:21 -0700 Subject: [PATCH 1/8] New LRU based on GroupCache which keeps track of memory rather than entries --- pkg/storage/drivers/memory/lru.go | 169 ++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 pkg/storage/drivers/memory/lru.go diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go new file mode 100644 index 000000000..2f1e4f72c --- /dev/null +++ b/pkg/storage/drivers/memory/lru.go @@ -0,0 +1,169 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--- +Modifications from Minio under the following license: + +Minimalist Object Storage, (C) 2015 Minio, Inc. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package memory + +import "container/list" + +// CacheStats are returned by stats accessors on Group. +type CacheStats struct { + Bytes uint64 + Evictions int64 +} + +// Cache is an LRU cache. It is not safe for concurrent access. +type Cache struct { + // MaxSize is the maximum number of cache size for entries + // before an item is evicted. Zero means no limit + MaxSize uint64 + + // OnEvicted optionally specificies a callback function to be + // executed when an entry is purged from the cache. + OnEvicted func(a ...interface{}) + + ll *list.List + totalSize uint64 + totalEvicted int64 + cache map[interface{}]*list.Element + value []byte +} + +// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators +type Key interface{} + +type entry struct { + key Key + value []byte +} + +// NewCache creates a new Cache. +// If maxEntries is zero, the cache has no limit and it's assumed +// that eviction is done by the caller. +func NewCache(maxSize uint64) *Cache { + return &Cache{ + MaxSize: maxSize, + ll: list.New(), + cache: make(map[interface{}]*list.Element), + } +} + +// Stats return cache stats +func (c *Cache) Stats() CacheStats { + return CacheStats{ + Bytes: c.totalSize, + Evictions: c.totalEvicted, + } +} + +func (c *Cache) Write(p []byte) (n int, err error) { + c.totalSize = c.totalSize + uint64(len(p)) + // If MaxSize is zero expecting infinite memory + if c.MaxSize != 0 && c.totalSize > c.MaxSize { + c.totalSize -= uint64(len(p)) + c.RemoveOldest() + } + c.value = append(c.value, p...) + return len(p), nil +} + +// Add adds a value to the cache. 
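A brief illustrative sketch of how this first revision of the cache is meant to be driven (not part of the patch; the reader and key names are placeholders, and the standard io and errors imports are assumed): bytes are streamed in through the cache's io.Writer side first, then committed under a key with Add.

// exampleWriteThenAdd streams an object's bytes into the cache and then
// commits them under key, using only the Write, Add and Get methods above.
// Illustrative sketch, not part of the patch.
func exampleWriteThenAdd(c *Cache, key string, reader io.Reader) ([]byte, error) {
	// Cache implements io.Writer, so the payload can be streamed straight in.
	if _, err := io.Copy(c, reader); err != nil {
		return nil, err
	}
	// Add consumes the bytes buffered by Write and files them under key.
	c.Add(key)
	data, ok := c.Get(key)
	if !ok {
		return nil, errors.New("entry was evicted before it could be read back")
	}
	return data, nil
}
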
+func (c *Cache) Add(key Key) { + if c.cache == nil { + c.cache = make(map[interface{}]*list.Element) + c.ll = list.New() + } + ele := c.ll.PushFront(&entry{key, c.value}) + c.value = nil + c.cache[key] = ele +} + +// Get looks up a key's value from the cache. +func (c *Cache) Get(key Key) (value []byte, ok bool) { + if c.cache == nil { + return + } + if ele, hit := c.cache[key]; hit { + c.ll.MoveToFront(ele) + return ele.Value.(*entry).value, true + } + return +} + +// Remove removes the provided key from the cache. +func (c *Cache) Remove(key Key) { + if c.cache == nil { + return + } + if ele, hit := c.cache[key]; hit { + c.removeElement(ele) + } +} + +// RemoveOldest removes the oldest item from the cache. +func (c *Cache) RemoveOldest() { + if c.cache == nil { + return + } + ele := c.ll.Back() + if ele != nil { + c.removeElement(ele) + } +} + +// GetOldest returns the oldest key +func (c *Cache) GetOldest() (key Key, ok bool) { + if c.cache == nil { + return nil, false + } + ele := c.ll.Back() + if ele != nil { + return ele.Value.(*entry).key, true + } + return nil, false +} + +func (c *Cache) removeElement(e *list.Element) { + c.ll.Remove(e) + kv := e.Value.(*entry) + delete(c.cache, kv.key) + c.totalEvicted++ + if c.OnEvicted != nil { + c.OnEvicted(kv.key) + } +} + +// Len returns the number of items in the cache. +func (c *Cache) Len() int { + if c.cache == nil { + return 0 + } + return c.ll.Len() +} From f7caef2d262e4bb718153e5388dd58fcc342dc8b Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 3 May 2015 23:16:10 -0700 Subject: [PATCH 2/8] Change CreateObject() to take size argument from content-length --- pkg/api/api_object_handlers.go | 9 ++++--- pkg/api/api_test.go | 34 ++++++++++++------------- pkg/storage/donut/donut_bucket.go | 8 +++++- pkg/storage/donut/donut_test.go | 17 ++++++++----- pkg/storage/drivers/api_testsuite.go | 37 +++++++++++++++------------- pkg/storage/drivers/driver.go | 2 +- pkg/storage/drivers/errors.go | 7 +++--- 7 files changed, 65 insertions(+), 49 deletions(-) diff --git a/pkg/api/api_object_handlers.go b/pkg/api/api_object_handlers.go index 790c7de98..cf7bc1db3 100644 --- a/pkg/api/api_object_handlers.go +++ b/pkg/api/api_object_handlers.go @@ -18,6 +18,7 @@ package api import ( "net/http" + "strconv" "github.com/gorilla/mux" "github.com/minio-io/minio/pkg/iodine" @@ -155,10 +156,10 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path) return } - /// if Content-Length missing, incomplete request throw IncompleteBody + /// if Content-Length missing, throw away size := req.Header.Get("Content-Length") if size == "" { - writeErrorResponse(w, req, IncompleteBody, acceptsContentType, req.URL.Path) + writeErrorResponse(w, req, MissingContentLength, acceptsContentType, req.URL.Path) return } /// maximum Upload size for objects in a single operation @@ -171,7 +172,9 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques writeErrorResponse(w, req, EntityTooSmall, acceptsContentType, req.URL.Path) return } - calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, req.Body) + // ignoring error here, TODO find a way to reply back if we can + sizeInt, _ := strconv.ParseInt(size, 10, 64) + calculatedMD5, err := server.driver.CreateObject(bucket, object, "", md5, sizeInt, req.Body) switch err := iodine.ToError(err).(type) { case nil: { diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 
cafc19a4d..c5cb5ef01 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -168,7 +168,7 @@ func (s *MySuite) TestEmptyObject(c *C) { Size: 0, } typedDriver.On("CreateBucket", "bucket", "private").Return(nil).Once() - typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything).Return(metadata.Md5, nil).Once() + typedDriver.On("CreateObject", "bucket", "object", "", "", 0, mock.Anything).Return(metadata.Md5, nil).Once() typedDriver.On("GetBucketMetadata", "bucket").Return(drivers.BucketMetadata{}, nil).Twice() typedDriver.On("GetObjectMetadata", "bucket", "object", "").Return(metadata, nil).Once() typedDriver.On("GetObject", mock.Anything, "bucket", "object").Return(int64(0), nil).Once() @@ -179,7 +179,7 @@ func (s *MySuite) TestEmptyObject(c *C) { buffer := bytes.NewBufferString("") driver.CreateBucket("bucket", "private") - driver.CreateObject("bucket", "object", "", "", buffer) + driver.CreateObject("bucket", "object", "", "", 0, buffer) request, err := http.NewRequest("GET", testServer.URL+"/bucket/object", nil) c.Assert(err, IsNil) @@ -250,7 +250,7 @@ func (s *MySuite) TestObject(c *C) { Size: 11, } typedDriver.On("CreateBucket", "bucket", "private").Return(nil).Once() - typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything).Return(metadata.Md5, nil).Once() + typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything, mock.Anything).Return(metadata.Md5, nil).Once() typedDriver.On("GetBucketMetadata", "bucket").Return(drivers.BucketMetadata{}, nil).Twice() typedDriver.On("GetObjectMetadata", "bucket", "object", "").Return(metadata, nil).Twice() typedDriver.SetGetObjectWriter("bucket", "object", []byte("hello world")) @@ -262,7 +262,7 @@ func (s *MySuite) TestObject(c *C) { buffer := bytes.NewBufferString("hello world") driver.CreateBucket("bucket", "private") - driver.CreateObject("bucket", "object", "", "", buffer) + driver.CreateObject("bucket", "object", "", "", int64(buffer.Len()), buffer) request, err := http.NewRequest("GET", testServer.URL+"/bucket/object", nil) c.Assert(err, IsNil) @@ -325,12 +325,12 @@ func (s *MySuite) TestMultipleObjects(c *C) { typedDriver.On("CreateBucket", "bucket", "private").Return(nil).Once() driver.CreateBucket("bucket", "private") - typedDriver.On("CreateObject", "bucket", "object1", "", "", mock.Anything).Return(metadata1.Md5, nil).Once() - driver.CreateObject("bucket", "object1", "", "", buffer1) - typedDriver.On("CreateObject", "bucket", "object2", "", "", mock.Anything).Return(metadata2.Md5, nil).Once() - driver.CreateObject("bucket", "object2", "", "", buffer2) - typedDriver.On("CreateObject", "bucket", "object3", "", "", mock.Anything).Return(metadata3.Md5, nil).Once() - driver.CreateObject("bucket", "object3", "", "", buffer3) + typedDriver.On("CreateObject", "bucket", "object1", "", "", mock.Anything, mock.Anything).Return(metadata1.Md5, nil).Once() + driver.CreateObject("bucket", "object1", "", "", int64(buffer1.Len()), buffer1) + typedDriver.On("CreateObject", "bucket", "object2", "", "", mock.Anything, mock.Anything).Return(metadata2.Md5, nil).Once() + driver.CreateObject("bucket", "object2", "", "", int64(buffer2.Len()), buffer2) + typedDriver.On("CreateObject", "bucket", "object3", "", "", mock.Anything, mock.Anything).Return(metadata3.Md5, nil).Once() + driver.CreateObject("bucket", "object3", "", "", int64(buffer3.Len()), buffer3) // test non-existant object typedDriver.On("GetBucketMetadata", "bucket").Return(drivers.BucketMetadata{}, nil).Once() @@ -506,8 +506,8 @@ func (s 
*MySuite) TestHeader(c *C) { buffer := bytes.NewBufferString("hello world") typedDriver.On("GetBucketMetadata", "foo").Return(bucketMetadata, nil).Once() - typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything).Return(objectMetadata.Md5, nil).Once() - driver.CreateObject("bucket", "object", "", "", buffer) + typedDriver.On("CreateObject", "bucket", "object", "", "", mock.Anything, mock.Anything).Return(objectMetadata.Md5, nil).Once() + driver.CreateObject("bucket", "object", "", "", int64(buffer.Len()), buffer) typedDriver.On("GetBucketMetadata", "bucket").Return(bucketMetadata, nil).Once() typedDriver.On("GetObjectMetadata", "bucket", "object", "").Return(objectMetadata, nil).Once() @@ -618,7 +618,7 @@ func (s *MySuite) TestPutObject(c *C) { Size: 11, } - typedDriver.On("CreateObject", "bucket", "two", "", "", mock.Anything).Return(twoMetadata.Md5, nil).Once() + typedDriver.On("CreateObject", "bucket", "two", "", "", mock.Anything, mock.Anything).Return(twoMetadata.Md5, nil).Once() request, err = http.NewRequest("PUT", testServer.URL+"/bucket/two", bytes.NewBufferString("hello world")) c.Assert(err, IsNil) setAuthHeader(request) @@ -905,7 +905,7 @@ func (s *MySuite) TestContentTypePersists(c *C) { } typedDriver.On("GetBucketMetadata", "bucket").Return(metadata, nil).Once() - typedDriver.On("CreateObject", "bucket", "one", "", "", mock.Anything).Return(oneMetadata.Md5, nil).Once() + typedDriver.On("CreateObject", "bucket", "one", "", "", mock.Anything, mock.Anything).Return(oneMetadata.Md5, nil).Once() request, err := http.NewRequest("PUT", testServer.URL+"/bucket/one", bytes.NewBufferString("hello world")) delete(request.Header, "Content-Type") c.Assert(err, IsNil) @@ -952,7 +952,7 @@ func (s *MySuite) TestContentTypePersists(c *C) { } typedDriver.On("GetBucketMetadata", "bucket").Return(metadata, nil).Once() - typedDriver.On("CreateObject", "bucket", "two", "", "", mock.Anything).Return(twoMetadata.Md5, nil).Once() + typedDriver.On("CreateObject", "bucket", "two", "", "", mock.Anything, mock.Anything).Return(twoMetadata.Md5, nil).Once() request, err = http.NewRequest("PUT", testServer.URL+"/bucket/two", bytes.NewBufferString("hello world")) delete(request.Header, "Content-Type") request.Header.Add("Content-Type", "application/json") @@ -1010,11 +1010,11 @@ func (s *MySuite) TestPartialContent(c *C) { } typedDriver.On("CreateBucket", "foo", "private").Return(nil).Once() - typedDriver.On("CreateObject", "foo", "bar", "", "", mock.Anything).Return(metadata.Md5, nil).Once() + typedDriver.On("CreateObject", "foo", "bar", "", "", mock.Anything, mock.Anything).Return(metadata.Md5, nil).Once() err := driver.CreateBucket("foo", "private") c.Assert(err, IsNil) - driver.CreateObject("foo", "bar", "", "", bytes.NewBufferString("hello world")) + driver.CreateObject("foo", "bar", "", "", int64(len("hello world")), bytes.NewBufferString("hello world")) // prepare for GET on range request typedDriver.SetGetObjectWriter("foo", "bar", []byte("hello world")) diff --git a/pkg/storage/donut/donut_bucket.go b/pkg/storage/donut/donut_bucket.go index 6732490cf..475ffa13c 100644 --- a/pkg/storage/donut/donut_bucket.go +++ b/pkg/storage/donut/donut_bucket.go @@ -151,11 +151,17 @@ func (b bucket) PutObject(objectName string, objectData io.Reader, expectedMD5Su donutObjectMetadata := make(map[string]string) objectMetadata["version"] = "1.0" donutObjectMetadata["version"] = "1.0" + size := metadata["contentLength"] + sizeInt, err := strconv.ParseInt(size, 10, 64) + if err != nil { + return "", 
iodine.New(err, nil) + } + // if total writers are only '1' do not compute erasure switch len(writers) == 1 { case true: mw := io.MultiWriter(writers[0], summer) - totalLength, err := io.Copy(mw, objectData) + totalLength, err := io.CopyN(mw, objectData, sizeInt) if err != nil { return "", iodine.New(err, nil) } diff --git a/pkg/storage/donut/donut_test.go b/pkg/storage/donut/donut_test.go index 720942fe0..58eaad620 100644 --- a/pkg/storage/donut/donut_test.go +++ b/pkg/storage/donut/donut_test.go @@ -184,6 +184,7 @@ func (s *MySuite) TestNewObjectMetadata(c *C) { hasher.Write([]byte(data)) expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil)) reader := ioutil.NopCloser(bytes.NewReader([]byte(data))) + metadata["contentLength"] = strconv.Itoa(len(data)) err = donut.MakeBucket("foo", "private") c.Assert(err, IsNil) @@ -210,9 +211,6 @@ func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) { _, err = donut.PutObject("foo", "", "", nil, nil) c.Assert(err, Not(IsNil)) - - _, err = donut.PutObject("foo", " ", "", nil, nil) - c.Assert(err, Not(IsNil)) } // test create object @@ -234,6 +232,7 @@ func (s *MySuite) TestNewObjectCanBeWritten(c *C) { hasher.Write([]byte(data)) expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil)) reader := ioutil.NopCloser(bytes.NewReader([]byte(data))) + metadata["contentLength"] = strconv.Itoa(len(data)) calculatedMd5Sum, err := donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata) c.Assert(err, IsNil) @@ -268,11 +267,16 @@ func (s *MySuite) TestMultipleNewObjects(c *C) { c.Assert(donut.MakeBucket("foo", "private"), IsNil) one := ioutil.NopCloser(bytes.NewReader([]byte("one"))) - _, err = donut.PutObject("foo", "obj1", "", one, nil) + metadata := make(map[string]string) + metadata["contentLength"] = strconv.Itoa(len("one")) + + _, err = donut.PutObject("foo", "obj1", "", one, metadata) c.Assert(err, IsNil) two := ioutil.NopCloser(bytes.NewReader([]byte("two"))) - _, err = donut.PutObject("foo", "obj2", "", two, nil) + + metadata["contentLength"] = strconv.Itoa(len("two")) + _, err = donut.PutObject("foo", "obj2", "", two, metadata) c.Assert(err, IsNil) obj1, size, err := donut.GetObject("foo", "obj1") @@ -315,7 +319,8 @@ func (s *MySuite) TestMultipleNewObjects(c *C) { c.Assert(listObjects, DeepEquals, []string{"obj1", "obj2"}) three := ioutil.NopCloser(bytes.NewReader([]byte("three"))) - _, err = donut.PutObject("foo", "obj3", "", three, nil) + metadata["contentLength"] = strconv.Itoa(len("three")) + _, err = donut.PutObject("foo", "obj3", "", three, metadata) c.Assert(err, IsNil) obj3, size, err := donut.GetObject("foo", "obj3") diff --git a/pkg/storage/drivers/api_testsuite.go b/pkg/storage/drivers/api_testsuite.go index 21d76466d..ab4cf742a 100644 --- a/pkg/storage/drivers/api_testsuite.go +++ b/pkg/storage/drivers/api_testsuite.go @@ -73,7 +73,8 @@ func testMultipleObjectCreation(c *check.C, create func() Driver) { key := "obj" + strconv.Itoa(i) objects[key] = []byte(randomString) - calculatedmd5sum, err := drivers.CreateObject("bucket", key, "", expectedmd5Sum, bytes.NewBufferString(randomString)) + calculatedmd5sum, err := drivers.CreateObject("bucket", key, "", expectedmd5Sum, int64(len(randomString)), + bytes.NewBufferString(randomString)) c.Assert(err, check.IsNil) c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex) } @@ -107,7 +108,7 @@ func testPaging(c *check.C, create func() Driver) { // check before paging occurs for i := 0; i < 5; i++ { key := "obj" + strconv.Itoa(i) - drivers.CreateObject("bucket", key, "", "", 
bytes.NewBufferString(key)) + drivers.CreateObject("bucket", key, "", "", int64(len(key)), bytes.NewBufferString(key)) resources.Maxkeys = 5 resources.Prefix = "" objects, resources, err = drivers.ListObjects("bucket", resources) @@ -118,7 +119,7 @@ func testPaging(c *check.C, create func() Driver) { // check after paging occurs pages work for i := 6; i <= 10; i++ { key := "obj" + strconv.Itoa(i) - drivers.CreateObject("bucket", key, "", "", bytes.NewBufferString(key)) + drivers.CreateObject("bucket", key, "", "", int64(len(key)), bytes.NewBufferString(key)) resources.Maxkeys = 5 resources.Prefix = "" objects, resources, err = drivers.ListObjects("bucket", resources) @@ -128,8 +129,8 @@ func testPaging(c *check.C, create func() Driver) { } // check paging with prefix at end returns less objects { - drivers.CreateObject("bucket", "newPrefix", "", "", bytes.NewBufferString("prefix1")) - drivers.CreateObject("bucket", "newPrefix2", "", "", bytes.NewBufferString("prefix2")) + drivers.CreateObject("bucket", "newPrefix", "", "", int64(len("prefix1")), bytes.NewBufferString("prefix1")) + drivers.CreateObject("bucket", "newPrefix2", "", "", int64(len("prefix2")), bytes.NewBufferString("prefix2")) resources.Prefix = "new" resources.Maxkeys = 5 objects, resources, err = drivers.ListObjects("bucket", resources) @@ -150,8 +151,8 @@ func testPaging(c *check.C, create func() Driver) { // check delimited results with delimiter and prefix { - drivers.CreateObject("bucket", "this/is/delimited", "", "", bytes.NewBufferString("prefix1")) - drivers.CreateObject("bucket", "this/is/also/a/delimited/file", "", "", bytes.NewBufferString("prefix2")) + drivers.CreateObject("bucket", "this/is/delimited", "", "", int64(len("prefix1")), bytes.NewBufferString("prefix1")) + drivers.CreateObject("bucket", "this/is/also/a/delimited/file", "", "", int64(len("prefix2")), bytes.NewBufferString("prefix2")) var prefixes []string resources.CommonPrefixes = prefixes // allocate new everytime resources.Delimiter = "/" @@ -210,14 +211,14 @@ func testObjectOverwriteFails(c *check.C, create func() Driver) { hasher1.Write([]byte("one")) md5Sum1 := base64.StdEncoding.EncodeToString(hasher1.Sum(nil)) md5Sum1hex := hex.EncodeToString(hasher1.Sum(nil)) - md5Sum11, err := drivers.CreateObject("bucket", "object", "", md5Sum1, bytes.NewBufferString("one")) + md5Sum11, err := drivers.CreateObject("bucket", "object", "", md5Sum1, int64(len("one")), bytes.NewBufferString("one")) c.Assert(err, check.IsNil) c.Assert(md5Sum1hex, check.Equals, md5Sum11) hasher2 := md5.New() hasher2.Write([]byte("three")) md5Sum2 := base64.StdEncoding.EncodeToString(hasher2.Sum(nil)) - _, err = drivers.CreateObject("bucket", "object", "", md5Sum2, bytes.NewBufferString("three")) + _, err = drivers.CreateObject("bucket", "object", "", md5Sum2, int64(len("three")), bytes.NewBufferString("three")) c.Assert(err, check.Not(check.IsNil)) var bytesBuffer bytes.Buffer @@ -229,7 +230,7 @@ func testObjectOverwriteFails(c *check.C, create func() Driver) { func testNonExistantBucketOperations(c *check.C, create func() Driver) { drivers := create() - _, err := drivers.CreateObject("bucket", "object", "", "", bytes.NewBufferString("one")) + _, err := drivers.CreateObject("bucket", "object", "", "", int64(len("one")), bytes.NewBufferString("one")) c.Assert(err, check.Not(check.IsNil)) } @@ -260,7 +261,8 @@ func testPutObjectInSubdir(c *check.C, create func() Driver) { hasher.Write([]byte("hello world")) md5Sum1 := base64.StdEncoding.EncodeToString(hasher.Sum(nil)) md5Sum1hex := 
hex.EncodeToString(hasher.Sum(nil)) - md5Sum11, err := drivers.CreateObject("bucket", "dir1/dir2/object", "", md5Sum1, bytes.NewBufferString("hello world")) + md5Sum11, err := drivers.CreateObject("bucket", "dir1/dir2/object", "", md5Sum1, int64(len("hello world")), + bytes.NewBufferString("hello world")) c.Assert(err, check.IsNil) c.Assert(md5Sum11, check.Equals, md5Sum1hex) @@ -356,7 +358,8 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() Driver) { err := drivers.CreateBucket("bucket", "") c.Assert(err, check.IsNil) - _, err = drivers.CreateObject("bucket", "dir1/dir2/object", "", "", bytes.NewBufferString("hello world")) + _, err = drivers.CreateObject("bucket", "dir1/dir2/object", "", "", int64(len("hello world")), + bytes.NewBufferString("hello world")) c.Assert(err, check.IsNil) var byteBuffer bytes.Buffer @@ -400,19 +403,19 @@ func testDefaultContentType(c *check.C, create func() Driver) { c.Assert(err, check.IsNil) // test empty - _, err = drivers.CreateObject("bucket", "one", "", "", bytes.NewBufferString("one")) + _, err = drivers.CreateObject("bucket", "one", "", "", int64(len("one")), bytes.NewBufferString("one")) metadata, err := drivers.GetObjectMetadata("bucket", "one", "") c.Assert(err, check.IsNil) c.Assert(metadata.ContentType, check.Equals, "application/octet-stream") // test custom - drivers.CreateObject("bucket", "two", "application/text", "", bytes.NewBufferString("two")) + drivers.CreateObject("bucket", "two", "application/text", "", int64(len("two")), bytes.NewBufferString("two")) metadata, err = drivers.GetObjectMetadata("bucket", "two", "") c.Assert(err, check.IsNil) c.Assert(metadata.ContentType, check.Equals, "application/text") // test trim space - drivers.CreateObject("bucket", "three", "\tapplication/json ", "", bytes.NewBufferString("three")) + drivers.CreateObject("bucket", "three", "\tapplication/json ", "", int64(len("three")), bytes.NewBufferString("three")) metadata, err = drivers.GetObjectMetadata("bucket", "three", "") c.Assert(err, check.IsNil) c.Assert(metadata.ContentType, check.Equals, "application/json") @@ -425,12 +428,12 @@ func testContentMd5Set(c *check.C, create func() Driver) { // test md5 invalid badmd5Sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA" - calculatedmd5sum, err := drivers.CreateObject("bucket", "one", "", badmd5Sum, bytes.NewBufferString("one")) + calculatedmd5sum, err := drivers.CreateObject("bucket", "one", "", badmd5Sum, int64(len("one")), bytes.NewBufferString("one")) c.Assert(err, check.Not(check.IsNil)) c.Assert(calculatedmd5sum, check.Not(check.Equals), badmd5Sum) goodmd5sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA=" - calculatedmd5sum, err = drivers.CreateObject("bucket", "two", "", goodmd5sum, bytes.NewBufferString("one")) + calculatedmd5sum, err = drivers.CreateObject("bucket", "two", "", goodmd5sum, int64(len("one")), bytes.NewBufferString("one")) c.Assert(err, check.IsNil) c.Assert(calculatedmd5sum, check.Equals, goodmd5sum) } diff --git a/pkg/storage/drivers/driver.go b/pkg/storage/drivers/driver.go index 8f50acba7..f4402674f 100644 --- a/pkg/storage/drivers/driver.go +++ b/pkg/storage/drivers/driver.go @@ -37,7 +37,7 @@ type Driver interface { GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) - CreateObject(bucket string, key string, contentType 
string, md5sum string, data io.Reader) (string, error) + CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) } // BucketACL - bucket level access control diff --git a/pkg/storage/drivers/errors.go b/pkg/storage/drivers/errors.go index 4a25ad610..b3dfa9925 100644 --- a/pkg/storage/drivers/errors.go +++ b/pkg/storage/drivers/errors.go @@ -92,9 +92,8 @@ type ObjectExists GenericObjectError // EntityTooLarge - object size exceeds maximum limit type EntityTooLarge struct { GenericObjectError - Size string - TotalSize string - MaxSize string + Size string + MaxSize string } // ObjectNameInvalid - object name provided is invalid @@ -170,7 +169,7 @@ func (e ObjectNameInvalid) Error() string { // Return string an error formatted as the given text func (e EntityTooLarge) Error() string { - return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.TotalSize + return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.MaxSize } // Return string an error formatted as the given text From d0df548eb5d19d9df3404a9c8f91482cd5249b71 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sun, 3 May 2015 23:16:45 -0700 Subject: [PATCH 3/8] Use new LRU inside memory driver --- pkg/storage/drivers/donut/donut.go | 3 +- pkg/storage/drivers/memory/lru.go | 61 ++++++----- pkg/storage/drivers/memory/memory.go | 117 +++++++++------------- pkg/storage/drivers/memory/memory_test.go | 3 +- pkg/storage/drivers/mocks/Driver.go | 4 +- 5 files changed, 89 insertions(+), 99 deletions(-) diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 2f10dae30..8f69d99b6 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -366,7 +366,7 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso } // CreateObject creates a new object -func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, reader io.Reader) (string, error) { +func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) { errParams := map[string]string{ "bucketName": bucketName, "objectName": objectName, @@ -383,6 +383,7 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM } metadata := make(map[string]string) metadata["contentType"] = strings.TrimSpace(contentType) + metadata["contentLength"] = strconv.FormatInt(size, 10) if strings.TrimSpace(expectedMD5Sum) != "" { expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go index 2f1e4f72c..99e277bce 100644 --- a/pkg/storage/drivers/memory/lru.go +++ b/pkg/storage/drivers/memory/lru.go @@ -31,7 +31,15 @@ limitations under the License. package memory -import "container/list" +import ( + "bytes" + "container/list" + "io" + "strconv" + + "github.com/minio-io/minio/pkg/iodine" + "github.com/minio-io/minio/pkg/storage/drivers" +) // CacheStats are returned by stats accessors on Group. type CacheStats struct { @@ -53,7 +61,6 @@ type Cache struct { totalSize uint64 totalEvicted int64 cache map[interface{}]*list.Element - value []byte } // A Key may be any value that is comparable. 
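Returning briefly to the Driver interface change from patch 2: the end-to-end flow it sets up looks roughly like the sketch below. Only the Content-Length parsing and the CreateObject signature come from the patch; the helper name, its plain errors, and the assumed net/http, strconv, errors and drivers imports are illustrative.

// putWithDeclaredSize shows how the size argument CreateObject now takes is
// derived from the request's Content-Length header, mirroring the handler
// change above. Illustrative helper, not taken from the patch.
func putWithDeclaredSize(driver drivers.Driver, bucket, object, md5 string, req *http.Request) (string, error) {
	contentLength := req.Header.Get("Content-Length")
	if contentLength == "" {
		// The real handler replies with MissingContentLength here.
		return "", errors.New("missing Content-Length")
	}
	size, err := strconv.ParseInt(contentLength, 10, 64)
	if err != nil {
		return "", err
	}
	// Drivers now copy exactly size bytes (io.CopyN) instead of reading
	// the request body until EOF.
	return driver.CreateObject(bucket, object, "", md5, size, req.Body)
}
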
See http://golang.org/ref/spec#Comparison_operators @@ -61,7 +68,7 @@ type Key interface{} type entry struct { key Key - value []byte + value *bytes.Buffer } // NewCache creates a new Cache. @@ -83,30 +90,37 @@ func (c *Cache) Stats() CacheStats { } } -func (c *Cache) Write(p []byte) (n int, err error) { - c.totalSize = c.totalSize + uint64(len(p)) - // If MaxSize is zero expecting infinite memory - if c.MaxSize != 0 && c.totalSize > c.MaxSize { - c.totalSize -= uint64(len(p)) - c.RemoveOldest() - } - c.value = append(c.value, p...) - return len(p), nil -} - // Add adds a value to the cache. -func (c *Cache) Add(key Key) { - if c.cache == nil { - c.cache = make(map[interface{}]*list.Element) - c.ll = list.New() - } - ele := c.ll.PushFront(&entry{key, c.value}) - c.value = nil - c.cache[key] = ele +func (c *Cache) Add(key Key, size int64) io.WriteCloser { + r, w := io.Pipe() + go func() { + if uint64(size) > c.MaxSize { + err := iodine.New(drivers.EntityTooLarge{ + Size: strconv.FormatInt(size, 10), + MaxSize: strconv.FormatUint(c.MaxSize, 10), + }, nil) + r.CloseWithError(err) + return + } + // If MaxSize is zero expecting infinite memory + if c.MaxSize != 0 && (c.totalSize+uint64(size)) > c.MaxSize { + c.RemoveOldest() + } + value := new(bytes.Buffer) + n, err := io.CopyN(value, r, size) + if err != nil { + r.CloseWithError(iodine.New(err, nil)) + return + } + ele := c.ll.PushFront(&entry{key, value}) + c.cache[key] = ele + c.totalSize += uint64(n) + }() + return w } // Get looks up a key's value from the cache. -func (c *Cache) Get(key Key) (value []byte, ok bool) { +func (c *Cache) Get(key Key) (value *bytes.Buffer, ok bool) { if c.cache == nil { return } @@ -155,6 +169,7 @@ func (c *Cache) removeElement(e *list.Element) { kv := e.Value.(*entry) delete(c.cache, kv.key) c.totalEvicted++ + c.totalSize -= uint64(kv.value.Len()) if c.OnEvicted != nil { c.OnEvicted(kv.key) } diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 7445319c8..873816b78 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -25,6 +25,7 @@ import ( "errors" "io" "io/ioutil" + "log" "runtime/debug" "sort" "strings" @@ -33,18 +34,14 @@ import ( "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" - "github.com/minio-io/minio/pkg/storage/drivers/memory/lru" - "github.com/minio-io/minio/pkg/utils/log" - "github.com/minio-io/minio/pkg/utils/split" ) // memoryDriver - local variables type memoryDriver struct { storedBuckets map[string]storedBucket lock *sync.RWMutex - objects *lru.Cache + objects *Cache lastAccessedObjects map[string]time.Time - totalSize uint64 maxSize uint64 expiration time.Duration shutdown bool @@ -68,27 +65,18 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro memory = new(memoryDriver) memory.storedBuckets = make(map[string]storedBucket) memory.lastAccessedObjects = make(map[string]time.Time) - memory.objects = lru.New(0) + memory.objects = NewCache(maxSize) + memory.maxSize = maxSize memory.lock = new(sync.RWMutex) memory.expiration = expiration memory.shutdown = false - switch { - case maxSize <= 0: - memory.maxSize = 9223372036854775807 - case maxSize > 0: - memory.maxSize = maxSize - default: - log.Println("Error") - } - memory.objects.OnEvicted = memory.evictObject // set up memory expiration if expiration > 0 { go memory.expireLRUObjects() } - go start(ctrlChannel, errorChannel) return ctrlChannel, errorChannel, memory } @@ -117,11 +105,9 @@ 
func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) objectKey := bucket + "/" + object if _, ok := storedBucket.objectMetadata[objectKey]; ok { if data, ok := memory.objects.Get(objectKey); ok { - dataSlice := data.([]byte) - objectBuffer := bytes.NewBuffer(dataSlice) memory.lock.RUnlock() go memory.updateAccessTime(objectKey) - written, err := io.Copy(w, objectBuffer) + written, err := io.Copy(w, data) return written, iodine.New(err, nil) } } @@ -204,14 +190,12 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { return iodine.New(errors.New("invalid argument"), nil) } -func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) { - humanError, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, data) - debug.FreeOSMemory() - return humanError, err +func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { + return memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data) } // CreateObject - PUT object to memory buffer -func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) { +func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { memory.lock.RLock() if !drivers.IsValidBucket(bucket) { memory.lock.RUnlock() @@ -247,37 +231,30 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - var bytesBuffer bytes.Buffer - - chunks := split.Stream(data, 10*1024*1024) - totalLength := 0 - summer := md5.New() - for chunk := range chunks { - if chunk.Err == nil { - totalLength = totalLength + len(chunk.Data) - summer.Write(chunk.Data) - _, err := io.Copy(&bytesBuffer, bytes.NewBuffer(chunk.Data)) - if err != nil { - err := iodine.New(err, nil) - log.Println(err) - return "", err - } - if uint64(totalLength)+memory.totalSize > memory.maxSize { - memory.objects.RemoveOldest() - } - } + memory.lock.Lock() + md5Writer := md5.New() + lruWriter := memory.objects.Add(objectKey, size) + mw := io.MultiWriter(md5Writer, lruWriter) + totalLength, err := io.CopyN(mw, data, size) + if err != nil { + memory.lock.Unlock() + return "", iodine.New(err, nil) } - md5SumBytes := summer.Sum(nil) + if err := lruWriter.Close(); err != nil { + memory.lock.Unlock() + return "", iodine.New(err, nil) + } + memory.lock.Unlock() + + md5SumBytes := md5Writer.Sum(nil) md5Sum := hex.EncodeToString(md5SumBytes) // Verify if the written object is equal to what is expected, only if it is requested as such if strings.TrimSpace(expectedMD5Sum) != "" { if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { - memory.lock.Lock() - defer memory.lock.Unlock() - memory.objects.RemoveOldest() return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) } } + newObject := drivers.ObjectMetadata{ Bucket: bucket, Key: key, @@ -287,21 +264,21 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su Md5: md5Sum, Size: int64(totalLength), } + memory.lock.Lock() memoryObject := make(map[string]drivers.ObjectMetadata) - if len(memory.storedBuckets[bucket].objectMetadata) == 0 { + switch { + case len(memory.storedBuckets[bucket].objectMetadata) == 0: storedBucket.objectMetadata = memoryObject 
storedBucket.objectMetadata[objectKey] = newObject - } else { + default: storedBucket.objectMetadata[objectKey] = newObject } memory.storedBuckets[bucket] = storedBucket - memory.objects.Add(objectKey, bytesBuffer.Bytes()) - memory.totalSize = memory.totalSize + uint64(newObject.Size) - if memory.totalSize > memory.maxSize { - memory.objects.RemoveOldest() - } memory.lock.Unlock() + + // free + debug.FreeOSMemory() return newObject.Md5, nil } @@ -481,25 +458,21 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) } -func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) { - memory.doEvictObject(key, value) - debug.FreeOSMemory() -} - -func (memory *memoryDriver) doEvictObject(key lru.Key, value interface{}) { - k := key.(string) +func (memory *memoryDriver) evictObject(a ...interface{}) { + cacheStats := memory.objects.Stats() + log.Printf("CurrenSize: %d, CurrentItems: %d, TotalEvictions: %d", + cacheStats.Bytes, memory.objects.Len(), cacheStats.Evictions) + key := a[0].(string) // loop through all buckets for bucket, storedBucket := range memory.storedBuckets { - memory.totalSize = memory.totalSize - uint64(storedBucket.objectMetadata[k].Size) - log.Printf("Evicting: %s of Size: %d", k, storedBucket.objectMetadata[k].Size) - log.Println("TotalSize:", memory.totalSize) - delete(storedBucket.objectMetadata, k) + delete(storedBucket.objectMetadata, key) + delete(memory.lastAccessedObjects, key) // remove bucket if no objects found anymore if len(storedBucket.objectMetadata) == 0 { delete(memory.storedBuckets, bucket) } - delete(memory.lastAccessedObjects, k) } + debug.FreeOSMemory() } func (memory *memoryDriver) expireLRUObjects() { @@ -509,16 +482,18 @@ func (memory *memoryDriver) expireLRUObjects() { } var sleepDuration time.Duration memory.lock.Lock() - if memory.objects.Len() > 0 { - if k, _, ok := memory.objects.GetOldest(); ok { + switch { + case memory.objects.Len() > 0: + if k, ok := memory.objects.GetOldest(); ok { key := k.(string) - if time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration { + switch { + case time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration: memory.objects.RemoveOldest() - } else { + default: sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key]) } } - } else { + default: sleepDuration = memory.expiration } memory.lock.Unlock() diff --git a/pkg/storage/drivers/memory/memory_test.go b/pkg/storage/drivers/memory/memory_test.go index ed53a0577..3ed145fd8 100644 --- a/pkg/storage/drivers/memory/memory_test.go +++ b/pkg/storage/drivers/memory/memory_test.go @@ -21,7 +21,6 @@ import ( . 
"github.com/minio-io/check" "github.com/minio-io/minio/pkg/storage/drivers" - "time" ) func Test(t *testing.T) { TestingT(t) } @@ -32,7 +31,7 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { create := func() drivers.Driver { - _, _, store := Start(1000, 3*time.Hour) + _, _, store := Start(10000000, 0) return store } drivers.APITestSuite(c, create) diff --git a/pkg/storage/drivers/mocks/Driver.go b/pkg/storage/drivers/mocks/Driver.go index 81d5af815..455583215 100644 --- a/pkg/storage/drivers/mocks/Driver.go +++ b/pkg/storage/drivers/mocks/Driver.go @@ -116,8 +116,8 @@ func (m *Driver) ListObjects(bucket string, resources drivers.BucketResourcesMet } // CreateObject is a mock -func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) (string, error) { - ret := m.Called(bucket, key, contentType, md5sum, data) +func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) { + ret := m.Called(bucket, key, contentType, md5sum, size, data) r0 := ret.Get(0).(string) r1 := ret.Error(1) From 670f997b070a11de4bee5a9e92104ca5a3bd83a8 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 4 May 2015 01:17:48 -0700 Subject: [PATCH 4/8] Bring blockingWriter from client and use it here --- pkg/storage/drivers/memory/blockingwriter.go | 65 ++++++++++++++++++++ pkg/storage/drivers/memory/lru.go | 10 ++- pkg/storage/drivers/memory/memory_test.go | 3 +- 3 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 pkg/storage/drivers/memory/blockingwriter.go diff --git a/pkg/storage/drivers/memory/blockingwriter.go b/pkg/storage/drivers/memory/blockingwriter.go new file mode 100644 index 000000000..8c63a2d7c --- /dev/null +++ b/pkg/storage/drivers/memory/blockingwriter.go @@ -0,0 +1,65 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import ( + "io" + "sync" +) + +// BlockingWriteCloser is a WriteCloser that blocks until released +type BlockingWriteCloser struct { + w io.WriteCloser + release *sync.WaitGroup + err error +} + +// Write to the underlying writer +func (b *BlockingWriteCloser) Write(p []byte) (int, error) { + n, err := b.w.Write(p) + if err != nil { + b.err = err + } + return n, b.err +} + +// Close blocks until another goroutine calls Release(error). Returns error code if either +// writer fails or Release is called with an error. +func (b *BlockingWriteCloser) Close() error { + err := b.w.Close() + if err != nil { + b.err = err + } + b.release.Wait() + return b.err +} + +// Release the Close, causing it to unblock. Only call this once. Calling it multiple times results in a panic. +func (b *BlockingWriteCloser) Release(err error) { + b.release.Done() + if err != nil { + b.err = err + } + return +} + +// NewBlockingWriteCloser Creates a new write closer that must be released by the read consumer. 
+func NewBlockingWriteCloser(w io.WriteCloser) *BlockingWriteCloser { + wg := &sync.WaitGroup{} + wg.Add(1) + return &BlockingWriteCloser{w: w, release: wg} +} diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go index 99e277bce..07dc3c426 100644 --- a/pkg/storage/drivers/memory/lru.go +++ b/pkg/storage/drivers/memory/lru.go @@ -93,6 +93,7 @@ func (c *Cache) Stats() CacheStats { // Add adds a value to the cache. func (c *Cache) Add(key Key, size int64) io.WriteCloser { r, w := io.Pipe() + blockingWriter := NewBlockingWriteCloser(w) go func() { if uint64(size) > c.MaxSize { err := iodine.New(drivers.EntityTooLarge{ @@ -100,6 +101,7 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { MaxSize: strconv.FormatUint(c.MaxSize, 10), }, nil) r.CloseWithError(err) + blockingWriter.Release(err) return } // If MaxSize is zero expecting infinite memory @@ -109,14 +111,18 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { value := new(bytes.Buffer) n, err := io.CopyN(value, r, size) if err != nil { - r.CloseWithError(iodine.New(err, nil)) + err := iodine.New(err, nil) + r.CloseWithError(err) + blockingWriter.Release(err) return } ele := c.ll.PushFront(&entry{key, value}) c.cache[key] = ele c.totalSize += uint64(n) + r.Close() + blockingWriter.Release(nil) }() - return w + return blockingWriter } // Get looks up a key's value from the cache. diff --git a/pkg/storage/drivers/memory/memory_test.go b/pkg/storage/drivers/memory/memory_test.go index 3ed145fd8..3a2b9c96d 100644 --- a/pkg/storage/drivers/memory/memory_test.go +++ b/pkg/storage/drivers/memory/memory_test.go @@ -18,6 +18,7 @@ package memory import ( "testing" + "time" . "github.com/minio-io/check" "github.com/minio-io/minio/pkg/storage/drivers" @@ -31,7 +32,7 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { create := func() drivers.Driver { - _, _, store := Start(10000000, 0) + _, _, store := Start(1000000, 3*time.Hour) return store } drivers.APITestSuite(c, create) From c8f31d97a88227d460e5e1f526bf93d22593ede5 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 4 May 2015 02:31:18 -0700 Subject: [PATCH 5/8] Modify LRU further to add object expiration --- pkg/storage/drivers/memory/lru.go | 52 +++++++++++++++++----------- pkg/storage/drivers/memory/memory.go | 47 +++++-------------------- 2 files changed, 41 insertions(+), 58 deletions(-) diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go index 07dc3c426..b69ed5dbd 100644 --- a/pkg/storage/drivers/memory/lru.go +++ b/pkg/storage/drivers/memory/lru.go @@ -36,6 +36,7 @@ import ( "container/list" "io" "strconv" + "time" "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" @@ -53,6 +54,10 @@ type Cache struct { // before an item is evicted. Zero means no limit MaxSize uint64 + // Expiration is the maximum duration of individual objects to exist + // in cache before its evicted. + Expiration time.Duration + // OnEvicted optionally specificies a callback function to be // executed when an entry is purged from the cache. OnEvicted func(a ...interface{}) @@ -63,22 +68,21 @@ type Cache struct { cache map[interface{}]*list.Element } -// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators -type Key interface{} - type entry struct { - key Key + key string + time time.Time value *bytes.Buffer } // NewCache creates a new Cache. // If maxEntries is zero, the cache has no limit and it's assumed // that eviction is done by the caller. 
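Taken together with patch 3's memory driver, the write path now reads roughly as below. This condenses the createObject code shown earlier, with error handling trimmed; the crypto/md5, encoding/hex and io imports are assumed.

// cachePut copies the payload exactly once through a MultiWriter feeding
// both the MD5 hasher and the cache's pipe-backed writer, as the memory
// driver's createObject does. Condensed sketch of code shown above.
func cachePut(c *Cache, objectKey string, data io.Reader, size int64) (string, error) {
	md5Writer := md5.New()
	cacheWriter := c.Add(objectKey, size) // io.WriteCloser returned by the new Add
	if _, err := io.CopyN(io.MultiWriter(md5Writer, cacheWriter), data, size); err != nil {
		return "", err
	}
	// Close blocks until the cache goroutine has buffered (or rejected)
	// the bytes, courtesy of the BlockingWriteCloser wrapper.
	if err := cacheWriter.Close(); err != nil {
		return "", err
	}
	return hex.EncodeToString(md5Writer.Sum(nil)), nil
}
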
-func NewCache(maxSize uint64) *Cache { +func NewCache(maxSize uint64, expiration time.Duration) *Cache { return &Cache{ - MaxSize: maxSize, - ll: list.New(), - cache: make(map[interface{}]*list.Element), + MaxSize: maxSize, + Expiration: expiration, + ll: list.New(), + cache: make(map[interface{}]*list.Element), } } @@ -91,7 +95,7 @@ func (c *Cache) Stats() CacheStats { } // Add adds a value to the cache. -func (c *Cache) Add(key Key, size int64) io.WriteCloser { +func (c *Cache) Add(key string, size int64) io.WriteCloser { r, w := io.Pipe() blockingWriter := NewBlockingWriteCloser(w) go func() { @@ -105,8 +109,10 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { return } // If MaxSize is zero expecting infinite memory - if c.MaxSize != 0 && (c.totalSize+uint64(size)) > c.MaxSize { - c.RemoveOldest() + if c.MaxSize != 0 { + for (c.totalSize + uint64(size)) > c.MaxSize { + c.RemoveOldest() + } } value := new(bytes.Buffer) n, err := io.CopyN(value, r, size) @@ -116,7 +122,7 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { blockingWriter.Release(err) return } - ele := c.ll.PushFront(&entry{key, value}) + ele := c.ll.PushFront(&entry{key, time.Now(), value}) c.cache[key] = ele c.totalSize += uint64(n) r.Close() @@ -126,19 +132,20 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { } // Get looks up a key's value from the cache. -func (c *Cache) Get(key Key) (value *bytes.Buffer, ok bool) { +func (c *Cache) Get(key string) (value []byte, ok bool) { if c.cache == nil { return } if ele, hit := c.cache[key]; hit { c.ll.MoveToFront(ele) - return ele.Value.(*entry).value, true + ele.Value.(*entry).time = time.Now() + return ele.Value.(*entry).value.Bytes(), true } return } // Remove removes the provided key from the cache. -func (c *Cache) Remove(key Key) { +func (c *Cache) Remove(key string) { if c.cache == nil { return } @@ -158,16 +165,21 @@ func (c *Cache) RemoveOldest() { } } -// GetOldest returns the oldest key -func (c *Cache) GetOldest() (key Key, ok bool) { +// ExpireOldestAndWait expire old key which is expired and return wait times if any +func (c *Cache) ExpireOldestAndWait() time.Duration { if c.cache == nil { - return nil, false + return 0 } ele := c.ll.Back() if ele != nil { - return ele.Value.(*entry).key, true + switch { + case time.Now().Sub(ele.Value.(*entry).time) > c.Expiration: + c.removeElement(ele) + default: + return (c.Expiration - time.Now().Sub(ele.Value.(*entry).time)) + } } - return nil, false + return 0 } func (c *Cache) removeElement(e *list.Element) { diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 873816b78..6c6e0b6f6 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -42,9 +42,6 @@ type memoryDriver struct { lock *sync.RWMutex objects *Cache lastAccessedObjects map[string]time.Time - maxSize uint64 - expiration time.Duration - shutdown bool } type storedBucket struct { @@ -65,11 +62,8 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro memory = new(memoryDriver) memory.storedBuckets = make(map[string]storedBucket) memory.lastAccessedObjects = make(map[string]time.Time) - memory.objects = NewCache(maxSize) - memory.maxSize = maxSize + memory.objects = NewCache(maxSize, expiration) memory.lock = new(sync.RWMutex) - memory.expiration = expiration - memory.shutdown = false memory.objects.OnEvicted = memory.evictObject @@ -100,19 +94,15 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object 
string) memory.lock.RUnlock() return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } - storedBucket := memory.storedBuckets[bucket] - // form objectKey objectKey := bucket + "/" + object - if _, ok := storedBucket.objectMetadata[objectKey]; ok { - if data, ok := memory.objects.Get(objectKey); ok { - memory.lock.RUnlock() - go memory.updateAccessTime(objectKey) - written, err := io.Copy(w, data) - return written, iodine.New(err, nil) - } + data, ok := memory.objects.Get(objectKey) + if !ok { + memory.lock.RUnlock() + return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) } memory.lock.RUnlock() - return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) + written, err := io.Copy(w, bytes.NewBuffer(data)) + return written, iodine.New(err, nil) } // GetPartialObject - GET object from memory buffer range @@ -477,34 +467,15 @@ func (memory *memoryDriver) evictObject(a ...interface{}) { func (memory *memoryDriver) expireLRUObjects() { for { - if memory.shutdown { - return - } var sleepDuration time.Duration memory.lock.Lock() switch { case memory.objects.Len() > 0: - if k, ok := memory.objects.GetOldest(); ok { - key := k.(string) - switch { - case time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration: - memory.objects.RemoveOldest() - default: - sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key]) - } - } + sleepDuration = memory.objects.ExpireOldestAndWait() default: - sleepDuration = memory.expiration + sleepDuration = memory.objects.Expiration } memory.lock.Unlock() time.Sleep(sleepDuration) } } - -func (memory *memoryDriver) updateAccessTime(key string) { - memory.lock.Lock() - defer memory.lock.Unlock() - if _, ok := memory.lastAccessedObjects[key]; ok { - memory.lastAccessedObjects[key] = time.Now().UTC() - } -} From d63064b8afb3ca6b842e01d91baa2476feab22c9 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 4 May 2015 15:26:56 -0700 Subject: [PATCH 6/8] Optimize memory usage in GetPartialObject() for memory driver --- pkg/storage/drivers/donut/donut.go | 2 +- pkg/storage/drivers/memory/memory.go | 36 +++++++++++++++++++--------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 8f69d99b6..bb9ed1922 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -248,13 +248,13 @@ func (d donutDriver) GetPartialObject(w io.Writer, bucketName, objectName string }, errParams) } reader, size, err := d.donut.GetObject(bucketName, objectName) - defer reader.Close() if err != nil { return 0, iodine.New(drivers.ObjectNotFound{ Bucket: bucketName, Object: objectName, }, nil) } + defer reader.Close() if start > size || (start+length-1) > size { return 0, iodine.New(drivers.InvalidRange{ Start: start, diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 6c6e0b6f6..7100be074 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -24,10 +24,10 @@ import ( "encoding/hex" "errors" "io" - "io/ioutil" "log" "runtime/debug" "sort" + "strconv" "strings" "sync" "time" @@ -100,29 +100,43 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) memory.lock.RUnlock() return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) } - memory.lock.RUnlock() written, err := io.Copy(w, bytes.NewBuffer(data)) + memory.lock.RUnlock() return written, 
iodine.New(err, nil) } // GetPartialObject - GET object from memory buffer range func (memory *memoryDriver) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) { + errParams := map[string]string{ + "bucket": bucket, + "object": object, + "start": strconv.FormatInt(start, 10), + "length": strconv.FormatInt(length, 10), + } memory.lock.RLock() - defer memory.lock.RUnlock() - var sourceBuffer bytes.Buffer if !drivers.IsValidBucket(bucket) { - return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) + memory.lock.RUnlock() + return 0, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, errParams) } if !drivers.IsValidObjectName(object) { - return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, nil) + memory.lock.RUnlock() + return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, errParams) } - if _, err := memory.GetObject(&sourceBuffer, bucket, object); err != nil { - return 0, iodine.New(err, nil) + if start < 0 { + return 0, iodine.New(drivers.InvalidRange{ + Start: start, + Length: length, + }, errParams) } - if _, err := io.CopyN(ioutil.Discard, &sourceBuffer, start); err != nil { - return 0, iodine.New(err, nil) + objectKey := bucket + "/" + object + data, ok := memory.objects.Get(objectKey) + if !ok { + memory.lock.RUnlock() + return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, errParams) } - return io.CopyN(w, &sourceBuffer, length) + written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length) + memory.lock.RUnlock() + return written, iodine.New(err, nil) } // GetBucketMetadata - From 63edb1e9a02185b8ae6f92d4519f5a8fd95c63a9 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 4 May 2015 15:58:58 -0700 Subject: [PATCH 7/8] Nullify list memory to nil as we remove the element --- pkg/storage/drivers/memory/lru.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go index b69ed5dbd..90318cb76 100644 --- a/pkg/storage/drivers/memory/lru.go +++ b/pkg/storage/drivers/memory/lru.go @@ -72,6 +72,7 @@ type entry struct { key string time time.Time value *bytes.Buffer + size int64 } // NewCache creates a new Cache. 
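On the read side, patch 6 above serves ranged requests straight from the cached bytes instead of re-buffering the whole object; the essence is a slice plus io.CopyN. A simplified sketch follows, with an upper-bound check on start added for safety that the patch itself does not include (bytes, errors and io imports assumed).

// rangeFromCache copies length bytes beginning at start directly out of the
// cached copy, as the rewritten GetPartialObject does.
func rangeFromCache(w io.Writer, data []byte, start, length int64) (int64, error) {
	if start < 0 || start > int64(len(data)) {
		return 0, errors.New("invalid range")
	}
	return io.CopyN(w, bytes.NewBuffer(data[start:]), length)
}
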
@@ -114,17 +115,17 @@ func (c *Cache) Add(key string, size int64) io.WriteCloser { c.RemoveOldest() } } - value := new(bytes.Buffer) - n, err := io.CopyN(value, r, size) + buffer := new(bytes.Buffer) + written, err := io.CopyN(buffer, r, size) if err != nil { err := iodine.New(err, nil) r.CloseWithError(err) blockingWriter.Release(err) return } - ele := c.ll.PushFront(&entry{key, time.Now(), value}) + ele := c.ll.PushFront(&entry{key, time.Now(), buffer, written}) c.cache[key] = ele - c.totalSize += uint64(n) + c.totalSize += uint64(written) r.Close() blockingWriter.Release(nil) }() @@ -187,7 +188,8 @@ func (c *Cache) removeElement(e *list.Element) { kv := e.Value.(*entry) delete(c.cache, kv.key) c.totalEvicted++ - c.totalSize -= uint64(kv.value.Len()) + c.totalSize -= uint64(kv.size) + kv.value.Reset() if c.OnEvicted != nil { c.OnEvicted(kv.key) } From ed1259d6f0ed30847211b940e5e6859e52d6e5f1 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 4 May 2015 19:04:56 -0700 Subject: [PATCH 8/8] Deprecate LRU use just map[string]interface{} and call it intelligent 'cache' --- pkg/storage/drivers/memory/blockingwriter.go | 65 ------ pkg/storage/drivers/memory/lru.go | 204 ------------------ pkg/storage/drivers/memory/lru/lru.go | 165 -------------- pkg/storage/drivers/memory/lru/lru_test.go | 110 ---------- pkg/storage/drivers/memory/memory.go | 87 ++++---- .../drivers/memory/memory_intelligent.go | 151 +++++++++++++ 6 files changed, 194 insertions(+), 588 deletions(-) delete mode 100644 pkg/storage/drivers/memory/blockingwriter.go delete mode 100644 pkg/storage/drivers/memory/lru.go delete mode 100644 pkg/storage/drivers/memory/lru/lru.go delete mode 100644 pkg/storage/drivers/memory/lru/lru_test.go create mode 100644 pkg/storage/drivers/memory/memory_intelligent.go diff --git a/pkg/storage/drivers/memory/blockingwriter.go b/pkg/storage/drivers/memory/blockingwriter.go deleted file mode 100644 index 8c63a2d7c..000000000 --- a/pkg/storage/drivers/memory/blockingwriter.go +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Minimalist Object Storage, (C) 2015 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package memory - -import ( - "io" - "sync" -) - -// BlockingWriteCloser is a WriteCloser that blocks until released -type BlockingWriteCloser struct { - w io.WriteCloser - release *sync.WaitGroup - err error -} - -// Write to the underlying writer -func (b *BlockingWriteCloser) Write(p []byte) (int, error) { - n, err := b.w.Write(p) - if err != nil { - b.err = err - } - return n, b.err -} - -// Close blocks until another goroutine calls Release(error). Returns error code if either -// writer fails or Release is called with an error. -func (b *BlockingWriteCloser) Close() error { - err := b.w.Close() - if err != nil { - b.err = err - } - b.release.Wait() - return b.err -} - -// Release the Close, causing it to unblock. Only call this once. Calling it multiple times results in a panic. 
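Since patch 8 removes the blocking writer along with the LRU, a compact sketch of the handshake it provided: the producer writes, then blocks in Close until the consuming goroutine reports the outcome through Release. The discarding consumer below is a stand-in for the cache goroutine; only the methods shown in this file are used, and the io and io/ioutil imports are assumed.

// produceAndWait writes payload into a pipe wrapped by a BlockingWriteCloser
// and blocks until the consuming goroutine releases it. Illustrative sketch.
func produceAndWait(payload []byte) error {
	r, w := io.Pipe()
	bw := NewBlockingWriteCloser(w)
	go func() {
		// Consumer: drain the pipe, then release the producer with the
		// result of the consumption. Release must be called exactly once.
		_, err := io.Copy(ioutil.Discard, r)
		r.Close()
		bw.Release(err)
	}()
	if _, err := bw.Write(payload); err != nil {
		return err
	}
	// Close closes the pipe's write end, then waits for Release above.
	return bw.Close()
}
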
-func (b *BlockingWriteCloser) Release(err error) { - b.release.Done() - if err != nil { - b.err = err - } - return -} - -// NewBlockingWriteCloser Creates a new write closer that must be released by the read consumer. -func NewBlockingWriteCloser(w io.WriteCloser) *BlockingWriteCloser { - wg := &sync.WaitGroup{} - wg.Add(1) - return &BlockingWriteCloser{w: w, release: wg} -} diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go deleted file mode 100644 index 90318cb76..000000000 --- a/pkg/storage/drivers/memory/lru.go +++ /dev/null @@ -1,204 +0,0 @@ -/* -Copyright 2013 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---- -Modifications from Minio under the following license: - -Minimalist Object Storage, (C) 2015 Minio, Inc. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package memory - -import ( - "bytes" - "container/list" - "io" - "strconv" - "time" - - "github.com/minio-io/minio/pkg/iodine" - "github.com/minio-io/minio/pkg/storage/drivers" -) - -// CacheStats are returned by stats accessors on Group. -type CacheStats struct { - Bytes uint64 - Evictions int64 -} - -// Cache is an LRU cache. It is not safe for concurrent access. -type Cache struct { - // MaxSize is the maximum number of cache size for entries - // before an item is evicted. Zero means no limit - MaxSize uint64 - - // Expiration is the maximum duration of individual objects to exist - // in cache before its evicted. - Expiration time.Duration - - // OnEvicted optionally specificies a callback function to be - // executed when an entry is purged from the cache. - OnEvicted func(a ...interface{}) - - ll *list.List - totalSize uint64 - totalEvicted int64 - cache map[interface{}]*list.Element -} - -type entry struct { - key string - time time.Time - value *bytes.Buffer - size int64 -} - -// NewCache creates a new Cache. -// If maxEntries is zero, the cache has no limit and it's assumed -// that eviction is done by the caller. -func NewCache(maxSize uint64, expiration time.Duration) *Cache { - return &Cache{ - MaxSize: maxSize, - Expiration: expiration, - ll: list.New(), - cache: make(map[interface{}]*list.Element), - } -} - -// Stats return cache stats -func (c *Cache) Stats() CacheStats { - return CacheStats{ - Bytes: c.totalSize, - Evictions: c.totalEvicted, - } -} - -// Add adds a value to the cache. 
-func (c *Cache) Add(key string, size int64) io.WriteCloser { - r, w := io.Pipe() - blockingWriter := NewBlockingWriteCloser(w) - go func() { - if uint64(size) > c.MaxSize { - err := iodine.New(drivers.EntityTooLarge{ - Size: strconv.FormatInt(size, 10), - MaxSize: strconv.FormatUint(c.MaxSize, 10), - }, nil) - r.CloseWithError(err) - blockingWriter.Release(err) - return - } - // If MaxSize is zero expecting infinite memory - if c.MaxSize != 0 { - for (c.totalSize + uint64(size)) > c.MaxSize { - c.RemoveOldest() - } - } - buffer := new(bytes.Buffer) - written, err := io.CopyN(buffer, r, size) - if err != nil { - err := iodine.New(err, nil) - r.CloseWithError(err) - blockingWriter.Release(err) - return - } - ele := c.ll.PushFront(&entry{key, time.Now(), buffer, written}) - c.cache[key] = ele - c.totalSize += uint64(written) - r.Close() - blockingWriter.Release(nil) - }() - return blockingWriter -} - -// Get looks up a key's value from the cache. -func (c *Cache) Get(key string) (value []byte, ok bool) { - if c.cache == nil { - return - } - if ele, hit := c.cache[key]; hit { - c.ll.MoveToFront(ele) - ele.Value.(*entry).time = time.Now() - return ele.Value.(*entry).value.Bytes(), true - } - return -} - -// Remove removes the provided key from the cache. -func (c *Cache) Remove(key string) { - if c.cache == nil { - return - } - if ele, hit := c.cache[key]; hit { - c.removeElement(ele) - } -} - -// RemoveOldest removes the oldest item from the cache. -func (c *Cache) RemoveOldest() { - if c.cache == nil { - return - } - ele := c.ll.Back() - if ele != nil { - c.removeElement(ele) - } -} - -// ExpireOldestAndWait expire old key which is expired and return wait times if any -func (c *Cache) ExpireOldestAndWait() time.Duration { - if c.cache == nil { - return 0 - } - ele := c.ll.Back() - if ele != nil { - switch { - case time.Now().Sub(ele.Value.(*entry).time) > c.Expiration: - c.removeElement(ele) - default: - return (c.Expiration - time.Now().Sub(ele.Value.(*entry).time)) - } - } - return 0 -} - -func (c *Cache) removeElement(e *list.Element) { - c.ll.Remove(e) - kv := e.Value.(*entry) - delete(c.cache, kv.key) - c.totalEvicted++ - c.totalSize -= uint64(kv.size) - kv.value.Reset() - if c.OnEvicted != nil { - c.OnEvicted(kv.key) - } -} - -// Len returns the number of items in the cache. -func (c *Cache) Len() int { - if c.cache == nil { - return 0 - } - return c.ll.Len() -} diff --git a/pkg/storage/drivers/memory/lru/lru.go b/pkg/storage/drivers/memory/lru/lru.go deleted file mode 100644 index 4dabcf72b..000000000 --- a/pkg/storage/drivers/memory/lru/lru.go +++ /dev/null @@ -1,165 +0,0 @@ -/* -Copyright 2013 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---- -Modifications from Minio under the following license: - -Minimalist Object Storage, (C) 2015 Minio, Inc. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package lru implements an LRU cache. -package lru - -import ( - "container/list" - "sync" -) - -// Cache is an LRU cache. It is not safe for concurrent access. -type Cache struct { - // MaxEntries is the maximum number of cache entries before - // an item is evicted. Zero means no limit. - MaxEntries int - - // OnEvicted optionally specificies a callback function to be - // executed when an entry is purged from the cache. - OnEvicted func(key Key, value interface{}) - - ll *list.List - cache map[interface{}]*list.Element - lock *sync.RWMutex -} - -// A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators -type Key interface{} - -type entry struct { - key Key - value interface{} -} - -// New creates a new Cache. -// If maxEntries is zero, the cache has no limit and it's assumed -// that eviction is done by the caller. -func New(maxEntries int) *Cache { - return &Cache{ - MaxEntries: maxEntries, - ll: list.New(), - cache: make(map[interface{}]*list.Element), - lock: &sync.RWMutex{}, - } -} - -// Add adds a value to the cache. -func (c *Cache) Add(key Key, value interface{}) { - c.lock.Lock() - defer c.lock.Unlock() - if c.cache == nil { - c.cache = make(map[interface{}]*list.Element) - c.ll = list.New() - } - if ee, ok := c.cache[key]; ok { - c.ll.MoveToFront(ee) - ee.Value.(*entry).value = value - return - } - ele := c.ll.PushFront(&entry{key, value}) - c.cache[key] = ele - if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries { - c.RemoveOldest() - } -} - -// Get looks up a key's value from the cache. -func (c *Cache) Get(key Key) (value interface{}, ok bool) { - c.lock.Lock() - defer c.lock.Unlock() - if c.cache == nil { - return - } - if ele, hit := c.cache[key]; hit { - c.ll.MoveToFront(ele) - return ele.Value.(*entry).value, true - } - return -} - -// Remove removes the provided key from the cache. -func (c *Cache) Remove(key Key) { - c.lock.Lock() - defer c.lock.Unlock() - if c.cache == nil { - return - } - if ele, hit := c.cache[key]; hit { - c.removeElement(ele) - } -} - -// RemoveOldest removes the oldest item from the cache. -func (c *Cache) RemoveOldest() { - c.lock.Lock() - defer c.lock.Unlock() - if c.cache == nil { - return - } - ele := c.ll.Back() - if ele != nil { - c.removeElement(ele) - } -} - -// GetOldest returns the oldest key, value, ok without modifying the lru -func (c *Cache) GetOldest() (key Key, value interface{}, ok bool) { - c.lock.Lock() - defer c.lock.Unlock() - if c.cache == nil { - return nil, nil, false - } - ele := c.ll.Back() - if ele != nil { - return ele.Value.(*entry).key, ele.Value.(*entry).value, true - } - return nil, nil, false -} - -func (c *Cache) removeElement(e *list.Element) { - c.ll.Remove(e) - kv := e.Value.(*entry) - delete(c.cache, kv.key) - if c.OnEvicted != nil { - c.OnEvicted(kv.key, kv.value) - } -} - -// Len returns the number of items in the cache. 
-func (c *Cache) Len() int { - c.lock.Lock() - defer c.lock.Unlock() - if c.cache == nil { - return 0 - } - return c.ll.Len() -} diff --git a/pkg/storage/drivers/memory/lru/lru_test.go b/pkg/storage/drivers/memory/lru/lru_test.go deleted file mode 100644 index 28f4006fe..000000000 --- a/pkg/storage/drivers/memory/lru/lru_test.go +++ /dev/null @@ -1,110 +0,0 @@ -/* -Copyright 2013 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---- -Modifications from Minio under the following license: - -Minimalist Object Storage, (C) 2015 Minio, Inc. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package lru - -import ( - "testing" -) - -type simpleStruct struct { - int - string -} - -type complexStruct struct { - int - simpleStruct -} - -var getTests = []struct { - name string - keyToAdd interface{} - keyToGet interface{} - expectedOk bool -}{ - {"string_hit", "myKey", "myKey", true}, - {"string_miss", "myKey", "nonsense", false}, - {"simple_struct_hit", simpleStruct{1, "two"}, simpleStruct{1, "two"}, true}, - {"simeple_struct_miss", simpleStruct{1, "two"}, simpleStruct{0, "noway"}, false}, - {"complex_struct_hit", complexStruct{1, simpleStruct{2, "three"}}, - complexStruct{1, simpleStruct{2, "three"}}, true}, -} - -func TestGet(t *testing.T) { - for _, tt := range getTests { - lru := New(0) - lru.Add(tt.keyToAdd, 1234) - val, ok := lru.Get(tt.keyToGet) - if ok != tt.expectedOk { - t.Fatalf("%s: cache hit = %v; want %v", tt.name, ok, !ok) - } else if ok && val != 1234 { - t.Fatalf("%s expected get to return 1234 but got %v", tt.name, val) - } - } -} - -func TestRemove(t *testing.T) { - lru := New(0) - lru.Add("myKey", 1234) - if val, ok := lru.Get("myKey"); !ok { - t.Fatal("TestRemove returned no match") - } else if val != 1234 { - t.Fatalf("TestRemove failed. 
Expected %d, got %v", 1234, val) - } - - lru.Remove("myKey") - if _, ok := lru.Get("myKey"); ok { - t.Fatal("TestRemove returned a removed entry") - } -} - -func TestOldest(t *testing.T) { - lru := New(0) - lru.Add("a", 1) - lru.Add("b", 2) - lru.Add("c", 3) - key, val, ok := lru.GetOldest() - if ok != true { - t.Fatalf("%s expected get to return 1 but got %v", "a", val) - } - if key != "a" && val != 1 { - t.Fatalf("%s expected get to return 1 but got %v", "a", val) - } - lru.RemoveOldest() - key, val, ok = lru.GetOldest() - if ok != true { - t.Fatalf("%s expected get to return 1 but got %v", "a", val) - } - if key != "b" && val != 2 { - t.Fatalf("%s expected get to return 1 but got %v", "a", val) - } -} diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 7100be074..bdf805fb5 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -38,10 +38,9 @@ import ( // memoryDriver - local variables type memoryDriver struct { - storedBuckets map[string]storedBucket - lock *sync.RWMutex - objects *Cache - lastAccessedObjects map[string]time.Time + storedBuckets map[string]storedBucket + lock *sync.RWMutex + objects *Intelligent } type storedBucket struct { @@ -61,16 +60,14 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro var memory *memoryDriver memory = new(memoryDriver) memory.storedBuckets = make(map[string]storedBucket) - memory.lastAccessedObjects = make(map[string]time.Time) - memory.objects = NewCache(maxSize, expiration) + memory.objects = NewIntelligent(maxSize, expiration) memory.lock = new(sync.RWMutex) memory.objects.OnEvicted = memory.evictObject // set up memory expiration - if expiration > 0 { - go memory.expireLRUObjects() - } + memory.objects.ExpireObjects(time.Millisecond * 10) + go start(ctrlChannel, errorChannel) return ctrlChannel, errorChannel, memory } @@ -100,7 +97,7 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) memory.lock.RUnlock() return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, nil) } - written, err := io.Copy(w, bytes.NewBuffer(data)) + written, err := io.Copy(w, bytes.NewBuffer(data.([]byte))) memory.lock.RUnlock() return written, iodine.New(err, nil) } @@ -134,7 +131,7 @@ func (memory *memoryDriver) GetPartialObject(w io.Writer, bucket, object string, memory.lock.RUnlock() return 0, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: object}, errParams) } - written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length) + written, err := io.CopyN(w, bytes.NewBuffer(data.([]byte)[start:]), length) memory.lock.RUnlock() return written, iodine.New(err, nil) } @@ -195,7 +192,34 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { } func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { - return memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data) + humanReadableErr, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data) + // free + debug.FreeOSMemory() + return humanReadableErr, iodine.New(err, nil) +} + +// getMD5AndData - this is written as a wrapper to capture md5sum and data in a more memory efficient way +func getMD5AndData(reader io.Reader) ([]byte, []byte, error) { + hash := md5.New() + var data []byte + + var err error + var length int + for err == nil { + byteBuffer := make([]byte, 1024*1024) + length, err = reader.Read(byteBuffer) + // 
While hash.Write() wouldn't mind a Nil byteBuffer + // It is necessary for us to verify this and break + if length == 0 { + break + } + hash.Write(byteBuffer[0:length]) + data = append(data, byteBuffer[0:length]...) + } + if err != io.EOF { + return nil, nil, err + } + return hash.Sum(nil), data, nil } // CreateObject - PUT object to memory buffer @@ -234,23 +258,17 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - - memory.lock.Lock() - md5Writer := md5.New() - lruWriter := memory.objects.Add(objectKey, size) - mw := io.MultiWriter(md5Writer, lruWriter) - totalLength, err := io.CopyN(mw, data, size) + md5SumBytes, readBytes, err := getMD5AndData(data) if err != nil { - memory.lock.Unlock() - return "", iodine.New(err, nil) - } - if err := lruWriter.Close(); err != nil { - memory.lock.Unlock() return "", iodine.New(err, nil) } + totalLength := len(readBytes) + memory.lock.Lock() + memory.objects.Set(objectKey, readBytes) memory.lock.Unlock() + // de-allocating + readBytes = nil - md5SumBytes := md5Writer.Sum(nil) md5Sum := hex.EncodeToString(md5SumBytes) // Verify if the written object is equal to what is expected, only if it is requested as such if strings.TrimSpace(expectedMD5Sum) != "" { @@ -280,9 +298,6 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su } memory.storedBuckets[bucket] = storedBucket memory.lock.Unlock() - - // free - debug.FreeOSMemory() return newObject.Md5, nil } @@ -465,12 +480,11 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive func (memory *memoryDriver) evictObject(a ...interface{}) { cacheStats := memory.objects.Stats() log.Printf("CurrenSize: %d, CurrentItems: %d, TotalEvictions: %d", - cacheStats.Bytes, memory.objects.Len(), cacheStats.Evictions) + cacheStats.Bytes, cacheStats.Items, cacheStats.Evictions) key := a[0].(string) // loop through all buckets for bucket, storedBucket := range memory.storedBuckets { delete(storedBucket.objectMetadata, key) - delete(memory.lastAccessedObjects, key) // remove bucket if no objects found anymore if len(storedBucket.objectMetadata) == 0 { delete(memory.storedBuckets, bucket) @@ -478,18 +492,3 @@ func (memory *memoryDriver) evictObject(a ...interface{}) { } debug.FreeOSMemory() } - -func (memory *memoryDriver) expireLRUObjects() { - for { - var sleepDuration time.Duration - memory.lock.Lock() - switch { - case memory.objects.Len() > 0: - sleepDuration = memory.objects.ExpireOldestAndWait() - default: - sleepDuration = memory.objects.Expiration - } - memory.lock.Unlock() - time.Sleep(sleepDuration) - } -} diff --git a/pkg/storage/drivers/memory/memory_intelligent.go b/pkg/storage/drivers/memory/memory_intelligent.go new file mode 100644 index 000000000..8d0d4bbbe --- /dev/null +++ b/pkg/storage/drivers/memory/memory_intelligent.go @@ -0,0 +1,151 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package memory + +import ( + "sync" + "time" +) + +var zeroExpiration = time.Duration(0) + +// Intelligent holds the variables composing an in-memory cache, +// along with a key expiration mechanism and a maxSize limit +type Intelligent struct { + // Mutex is used for handling concurrent + // read/write access to the cache + sync.Mutex + + // items holds the cached objects + items map[string]interface{} + + // createdAt holds the creation time of each cached item + createdAt map[string]time.Time + + // expiration is the duration after which a cached key expires + expiration time.Duration + + // gcInterval is the interval at which expired keys are garbage collected + gcInterval time.Duration + + // maxSize is the total size limit for the cache + maxSize uint64 + + // currentSize is the current size held in memory + currentSize uint64 + + // OnEvicted - callback function for eviction + OnEvicted func(a ...interface{}) + + // totalEvicted counter to keep track of total evictions + totalEvicted uint64 +} + +// Stats holds current cache statistics +type Stats struct { + Bytes uint64 + Items uint64 + Evictions uint64 +} + +// NewIntelligent creates an in-memory cache +// +// maxSize is used to evict objects before we run out of memory +// expiration is the lifetime of a key in the cache +func NewIntelligent(maxSize uint64, expiration time.Duration) *Intelligent { + return &Intelligent{ + items: map[string]interface{}{}, + createdAt: map[string]time.Time{}, + expiration: expiration, + maxSize: maxSize, + } +} + +// Stats returns current cache statistics +func (r *Intelligent) Stats() Stats { + return Stats{ + Bytes: r.currentSize, + Items: uint64(len(r.items)), + Evictions: r.totalEvicted, + } +} + +// ExpireObjects expires cached objects periodically in a goroutine +func (r *Intelligent) ExpireObjects(gcInterval time.Duration) { + r.gcInterval = gcInterval + go func() { + for range time.Tick(gcInterval) { + r.Lock() + for key := range r.items { + if !r.isValid(key) { + r.Delete(key) + } + } + r.Unlock() + } + }() +} + +// Get returns the value of a given key if it exists +func (r *Intelligent) Get(key string) (interface{}, bool) { + r.Lock() + defer r.Unlock() + value, ok := r.items[key] + return value, ok +} + +// Set will persist a value to the cache +func (r *Intelligent) Set(key string, value interface{}) { + r.Lock() + // evict random keys while adding this value would exceed the maxSize threshold, + // a maxSize of zero means infinite memory + if r.maxSize > 0 { + for (r.currentSize + uint64(len(value.([]byte)))) > r.maxSize && len(r.items) > 0 { + for key := range r.items { + r.Delete(key) + break + } + } + } + r.items[key] = value + r.currentSize += uint64(len(value.([]byte))) + r.createdAt[key] = time.Now() + r.Unlock() + return +} + +// Delete deletes a given key if it exists +func (r *Intelligent) Delete(key string) { + r.currentSize -= uint64(len(r.items[key].([]byte))) + delete(r.items, key) + delete(r.createdAt, key) + r.totalEvicted++ + if r.OnEvicted != nil { + r.OnEvicted(key) + } +} + +func (r *Intelligent) isValid(key string) bool { + createdAt, ok := r.createdAt[key] + if !ok { + return false + } + if r.expiration == zeroExpiration { + return true + } + return createdAt.Add(r.expiration).After(time.Now()) +}
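
For readers of this series, here is a minimal usage sketch (not part of the patches) of the new Intelligent cache, built only from the API added in memory_intelligent.go above. The 10ms sweep interval and the "bucket/object" key layout mirror what memory.go does in the last patch; the 1GB cap, one-hour expiration, and the sample key and value are hypothetical, chosen purely for illustration, and the import path is taken from the diff headers.

package main

import (
	"fmt"
	"time"

	// import path as it appears in the diff headers above
	"github.com/minio-io/minio/pkg/storage/drivers/memory"
)

func main() {
	// hypothetical limits: 1GB cap, keys expire an hour after Set
	cache := memory.NewIntelligent(1024*1024*1024, time.Hour)
	cache.OnEvicted = func(a ...interface{}) {
		fmt.Println("evicted:", a[0].(string))
	}
	// background sweep every 10ms, mirroring ExpireObjects(time.Millisecond * 10) in memory.go
	cache.ExpireObjects(time.Millisecond * 10)

	// values are raw []byte keyed by "bucket/object", as in the memory driver
	cache.Set("bucket/object", []byte("hello"))
	if value, ok := cache.Get("bucket/object"); ok {
		fmt.Printf("%s\n", value.([]byte))
	}

	stats := cache.Stats()
	fmt.Printf("bytes=%d items=%d evictions=%d\n", stats.Bytes, stats.Items, stats.Evictions)
}

Note the design trade the final commit makes: unlike the deleted lru package there is no recency ordering; Set evicts arbitrary keys once maxSize would be exceeded, and freshness is handled solely by the createdAt timestamps swept by ExpireObjects.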