diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 2f10dae30..8f69d99b6 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -366,7 +366,7 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso } // CreateObject creates a new object -func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, reader io.Reader) (string, error) { +func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, size int64, reader io.Reader) (string, error) { errParams := map[string]string{ "bucketName": bucketName, "objectName": objectName, @@ -383,6 +383,7 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM } metadata := make(map[string]string) metadata["contentType"] = strings.TrimSpace(contentType) + metadata["contentLength"] = strconv.FormatInt(size, 10) if strings.TrimSpace(expectedMD5Sum) != "" { expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go index 2f1e4f72c..99e277bce 100644 --- a/pkg/storage/drivers/memory/lru.go +++ b/pkg/storage/drivers/memory/lru.go @@ -31,7 +31,15 @@ limitations under the License. package memory -import "container/list" +import ( + "bytes" + "container/list" + "io" + "strconv" + + "github.com/minio-io/minio/pkg/iodine" + "github.com/minio-io/minio/pkg/storage/drivers" +) // CacheStats are returned by stats accessors on Group. type CacheStats struct { @@ -53,7 +61,6 @@ type Cache struct { totalSize uint64 totalEvicted int64 cache map[interface{}]*list.Element - value []byte } // A Key may be any value that is comparable. See http://golang.org/ref/spec#Comparison_operators @@ -61,7 +68,7 @@ type Key interface{} type entry struct { key Key - value []byte + value *bytes.Buffer } // NewCache creates a new Cache. @@ -83,30 +90,37 @@ func (c *Cache) Stats() CacheStats { } } -func (c *Cache) Write(p []byte) (n int, err error) { - c.totalSize = c.totalSize + uint64(len(p)) - // If MaxSize is zero expecting infinite memory - if c.MaxSize != 0 && c.totalSize > c.MaxSize { - c.totalSize -= uint64(len(p)) - c.RemoveOldest() - } - c.value = append(c.value, p...) - return len(p), nil -} - // Add adds a value to the cache. -func (c *Cache) Add(key Key) { - if c.cache == nil { - c.cache = make(map[interface{}]*list.Element) - c.ll = list.New() - } - ele := c.ll.PushFront(&entry{key, c.value}) - c.value = nil - c.cache[key] = ele +func (c *Cache) Add(key Key, size int64) io.WriteCloser { + r, w := io.Pipe() + go func() { + if uint64(size) > c.MaxSize { + err := iodine.New(drivers.EntityTooLarge{ + Size: strconv.FormatInt(size, 10), + MaxSize: strconv.FormatUint(c.MaxSize, 10), + }, nil) + r.CloseWithError(err) + return + } + // If MaxSize is zero expecting infinite memory + if c.MaxSize != 0 && (c.totalSize+uint64(size)) > c.MaxSize { + c.RemoveOldest() + } + value := new(bytes.Buffer) + n, err := io.CopyN(value, r, size) + if err != nil { + r.CloseWithError(iodine.New(err, nil)) + return + } + ele := c.ll.PushFront(&entry{key, value}) + c.cache[key] = ele + c.totalSize += uint64(n) + }() + return w } // Get looks up a key's value from the cache. -func (c *Cache) Get(key Key) (value []byte, ok bool) { +func (c *Cache) Get(key Key) (value *bytes.Buffer, ok bool) { if c.cache == nil { return } @@ -155,6 +169,7 @@ func (c *Cache) removeElement(e *list.Element) { kv := e.Value.(*entry) delete(c.cache, kv.key) c.totalEvicted++ + c.totalSize -= uint64(kv.value.Len()) if c.OnEvicted != nil { c.OnEvicted(kv.key) } diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 7445319c8..873816b78 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -25,6 +25,7 @@ import ( "errors" "io" "io/ioutil" + "log" "runtime/debug" "sort" "strings" @@ -33,18 +34,14 @@ import ( "github.com/minio-io/minio/pkg/iodine" "github.com/minio-io/minio/pkg/storage/drivers" - "github.com/minio-io/minio/pkg/storage/drivers/memory/lru" - "github.com/minio-io/minio/pkg/utils/log" - "github.com/minio-io/minio/pkg/utils/split" ) // memoryDriver - local variables type memoryDriver struct { storedBuckets map[string]storedBucket lock *sync.RWMutex - objects *lru.Cache + objects *Cache lastAccessedObjects map[string]time.Time - totalSize uint64 maxSize uint64 expiration time.Duration shutdown bool @@ -68,27 +65,18 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro memory = new(memoryDriver) memory.storedBuckets = make(map[string]storedBucket) memory.lastAccessedObjects = make(map[string]time.Time) - memory.objects = lru.New(0) + memory.objects = NewCache(maxSize) + memory.maxSize = maxSize memory.lock = new(sync.RWMutex) memory.expiration = expiration memory.shutdown = false - switch { - case maxSize <= 0: - memory.maxSize = 9223372036854775807 - case maxSize > 0: - memory.maxSize = maxSize - default: - log.Println("Error") - } - memory.objects.OnEvicted = memory.evictObject // set up memory expiration if expiration > 0 { go memory.expireLRUObjects() } - go start(ctrlChannel, errorChannel) return ctrlChannel, errorChannel, memory } @@ -117,11 +105,9 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) objectKey := bucket + "/" + object if _, ok := storedBucket.objectMetadata[objectKey]; ok { if data, ok := memory.objects.Get(objectKey); ok { - dataSlice := data.([]byte) - objectBuffer := bytes.NewBuffer(dataSlice) memory.lock.RUnlock() go memory.updateAccessTime(objectKey) - written, err := io.Copy(w, objectBuffer) + written, err := io.Copy(w, data) return written, iodine.New(err, nil) } } @@ -204,14 +190,12 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error { return iodine.New(errors.New("invalid argument"), nil) } -func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) { - humanError, err := memory.createObject(bucket, key, contentType, expectedMD5Sum, data) - debug.FreeOSMemory() - return humanError, err +func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { + return memory.createObject(bucket, key, contentType, expectedMD5Sum, size, data) } // CreateObject - PUT object to memory buffer -func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) { +func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { memory.lock.RLock() if !drivers.IsValidBucket(bucket) { memory.lock.RUnlock() @@ -247,37 +231,30 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - var bytesBuffer bytes.Buffer - - chunks := split.Stream(data, 10*1024*1024) - totalLength := 0 - summer := md5.New() - for chunk := range chunks { - if chunk.Err == nil { - totalLength = totalLength + len(chunk.Data) - summer.Write(chunk.Data) - _, err := io.Copy(&bytesBuffer, bytes.NewBuffer(chunk.Data)) - if err != nil { - err := iodine.New(err, nil) - log.Println(err) - return "", err - } - if uint64(totalLength)+memory.totalSize > memory.maxSize { - memory.objects.RemoveOldest() - } - } + memory.lock.Lock() + md5Writer := md5.New() + lruWriter := memory.objects.Add(objectKey, size) + mw := io.MultiWriter(md5Writer, lruWriter) + totalLength, err := io.CopyN(mw, data, size) + if err != nil { + memory.lock.Unlock() + return "", iodine.New(err, nil) } - md5SumBytes := summer.Sum(nil) + if err := lruWriter.Close(); err != nil { + memory.lock.Unlock() + return "", iodine.New(err, nil) + } + memory.lock.Unlock() + + md5SumBytes := md5Writer.Sum(nil) md5Sum := hex.EncodeToString(md5SumBytes) // Verify if the written object is equal to what is expected, only if it is requested as such if strings.TrimSpace(expectedMD5Sum) != "" { if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { - memory.lock.Lock() - defer memory.lock.Unlock() - memory.objects.RemoveOldest() return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) } } + newObject := drivers.ObjectMetadata{ Bucket: bucket, Key: key, @@ -287,21 +264,21 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su Md5: md5Sum, Size: int64(totalLength), } + memory.lock.Lock() memoryObject := make(map[string]drivers.ObjectMetadata) - if len(memory.storedBuckets[bucket].objectMetadata) == 0 { + switch { + case len(memory.storedBuckets[bucket].objectMetadata) == 0: storedBucket.objectMetadata = memoryObject storedBucket.objectMetadata[objectKey] = newObject - } else { + default: storedBucket.objectMetadata[objectKey] = newObject } memory.storedBuckets[bucket] = storedBucket - memory.objects.Add(objectKey, bytesBuffer.Bytes()) - memory.totalSize = memory.totalSize + uint64(newObject.Size) - if memory.totalSize > memory.maxSize { - memory.objects.RemoveOldest() - } memory.lock.Unlock() + + // free + debug.FreeOSMemory() return newObject.Md5, nil } @@ -481,25 +458,21 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) } -func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) { - memory.doEvictObject(key, value) - debug.FreeOSMemory() -} - -func (memory *memoryDriver) doEvictObject(key lru.Key, value interface{}) { - k := key.(string) +func (memory *memoryDriver) evictObject(a ...interface{}) { + cacheStats := memory.objects.Stats() + log.Printf("CurrenSize: %d, CurrentItems: %d, TotalEvictions: %d", + cacheStats.Bytes, memory.objects.Len(), cacheStats.Evictions) + key := a[0].(string) // loop through all buckets for bucket, storedBucket := range memory.storedBuckets { - memory.totalSize = memory.totalSize - uint64(storedBucket.objectMetadata[k].Size) - log.Printf("Evicting: %s of Size: %d", k, storedBucket.objectMetadata[k].Size) - log.Println("TotalSize:", memory.totalSize) - delete(storedBucket.objectMetadata, k) + delete(storedBucket.objectMetadata, key) + delete(memory.lastAccessedObjects, key) // remove bucket if no objects found anymore if len(storedBucket.objectMetadata) == 0 { delete(memory.storedBuckets, bucket) } - delete(memory.lastAccessedObjects, k) } + debug.FreeOSMemory() } func (memory *memoryDriver) expireLRUObjects() { @@ -509,16 +482,18 @@ func (memory *memoryDriver) expireLRUObjects() { } var sleepDuration time.Duration memory.lock.Lock() - if memory.objects.Len() > 0 { - if k, _, ok := memory.objects.GetOldest(); ok { + switch { + case memory.objects.Len() > 0: + if k, ok := memory.objects.GetOldest(); ok { key := k.(string) - if time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration { + switch { + case time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration: memory.objects.RemoveOldest() - } else { + default: sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key]) } } - } else { + default: sleepDuration = memory.expiration } memory.lock.Unlock() diff --git a/pkg/storage/drivers/memory/memory_test.go b/pkg/storage/drivers/memory/memory_test.go index ed53a0577..3ed145fd8 100644 --- a/pkg/storage/drivers/memory/memory_test.go +++ b/pkg/storage/drivers/memory/memory_test.go @@ -21,7 +21,6 @@ import ( . "github.com/minio-io/check" "github.com/minio-io/minio/pkg/storage/drivers" - "time" ) func Test(t *testing.T) { TestingT(t) } @@ -32,7 +31,7 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { create := func() drivers.Driver { - _, _, store := Start(1000, 3*time.Hour) + _, _, store := Start(10000000, 0) return store } drivers.APITestSuite(c, create) diff --git a/pkg/storage/drivers/mocks/Driver.go b/pkg/storage/drivers/mocks/Driver.go index 81d5af815..455583215 100644 --- a/pkg/storage/drivers/mocks/Driver.go +++ b/pkg/storage/drivers/mocks/Driver.go @@ -116,8 +116,8 @@ func (m *Driver) ListObjects(bucket string, resources drivers.BucketResourcesMet } // CreateObject is a mock -func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) (string, error) { - ret := m.Called(bucket, key, contentType, md5sum, data) +func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, size int64, data io.Reader) (string, error) { + ret := m.Called(bucket, key, contentType, md5sum, size, data) r0 := ret.Get(0).(string) r1 := ret.Error(1)