From c0c8a8430e4ae703e377a90acef3d716aa8847e3 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 8 Jul 2016 20:34:27 -0700 Subject: [PATCH] XL/PutObject: Add single putObject and multipart caching. (#2115) - Additionally adds test cases as well for object cache. - Adds auto-expiry with expiration and cleanup time interval. Fixes #2080 Fixes #2091 --- docs/caching.md | 39 ++++++++ globals.go | 7 +- namespace-lock.go | 8 +- pkg/objcache/README.md | 62 ++++++++++++ pkg/objcache/objcache.go | 182 +++++++++++++++++++++++----------- pkg/objcache/objcache_test.go | 172 ++++++++++++++++++++++++++++++++ server-main.go | 8 ++ xl-v1-multipart.go | 15 ++- xl-v1-object.go | 50 ++++++++-- xl-v1.go | 20 ++-- 10 files changed, 482 insertions(+), 81 deletions(-) create mode 100644 docs/caching.md create mode 100644 pkg/objcache/README.md create mode 100644 pkg/objcache/objcache_test.go diff --git a/docs/caching.md b/docs/caching.md new file mode 100644 index 000000000..fddd3fbee --- /dev/null +++ b/docs/caching.md @@ -0,0 +1,39 @@ +## Object caching + +Object caching by turned on by default with following settings + + - Default cache size 8GB, can be changed from environment variable + ``MINIO_CACHE_SIZE`` supports both SI and ISO IEC standard forms + for input size parameters. + + - Default expiration of entries is 72 hours, can be changed from + environment variable ``MINIO_CACHE_EXPIRY`` supportings Go + ``time.Duration`` with valid units "ns", "us" (or "µs"), + "ms", "s", "m", "h". + + - Default expiry interval is 1/4th of the expiration hours, so + expiration sweep happens across the cache every 1/4th the time + duration of the set entry expiration duration. + +### Tricks + +Setting MINIO_CACHE_SIZE=0 will turn off caching entirely. +Setting MINIO_CACHE_EXPIRY=0s will turn off cache garbage collections, +all cached objects will never expire. + +### Behavior + +Caching happens for both GET and PUT. + +- GET caches new objects for entries not found in cache, +otherwise serves from the cache. + +- PUT/POST caches all successfully uploaded objects. + +NOTE: Cache is not populated if there are any errors + while reading from the disk. + +Expiration happens automatically based on the configured +interval as explained above, frequently accessed objects +stay alive for significantly longer time due to the fact +that expiration time is reset for every cache hit. diff --git a/globals.go b/globals.go index 21182abf0..21fa85b47 100644 --- a/globals.go +++ b/globals.go @@ -16,7 +16,10 @@ package main -import "github.com/fatih/color" +import ( + "github.com/fatih/color" + "github.com/minio/minio/pkg/objcache" +) // Global constants for Minio. const ( @@ -45,6 +48,8 @@ var ( globalMaxConn = 0 // Maximum cache size. globalMaxCacheSize = uint64(maxCacheSize) + // Cache expiry. + globalCacheExpiry = objcache.DefaultExpiry // Add new variable global values here. ) diff --git a/namespace-lock.go b/namespace-lock.go index 07ca23691..2822adaff 100644 --- a/namespace-lock.go +++ b/namespace-lock.go @@ -29,7 +29,7 @@ type nsParam struct { // nsLock - provides primitives for locking critical namespace regions. type nsLock struct { - *sync.RWMutex + sync.RWMutex ref uint } @@ -37,7 +37,7 @@ type nsLock struct { // Unlock, RLock and RUnlock. type nsLockMap struct { lockMap map[nsParam]*nsLock - mutex *sync.Mutex + mutex sync.Mutex } // Global name space lock. @@ -47,7 +47,6 @@ var nsMutex *nsLockMap func initNSLock() { nsMutex = &nsLockMap{ lockMap: make(map[nsParam]*nsLock), - mutex: &sync.Mutex{}, } } @@ -59,8 +58,7 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) { nsLk, found := n.lockMap[param] if !found { nsLk = &nsLock{ - RWMutex: &sync.RWMutex{}, - ref: 0, + ref: 0, } n.lockMap[param] = nsLk } diff --git a/pkg/objcache/README.md b/pkg/objcache/README.md new file mode 100644 index 000000000..283ceff6f --- /dev/null +++ b/pkg/objcache/README.md @@ -0,0 +1,62 @@ +``` +PACKAGE DOCUMENTATION + +package objcache + import "github.com/minio/minio/pkg/objcache" + + Package objcache implements in memory caching methods. + +VARIABLES + +var DefaultExpiry = time.Duration(72 * time.Hour) // 72hrs. + + DefaultExpiry represents default time duration value when individual + entries will be expired. + +var ErrCacheFull = errors.New("Not enough space in cache") + ErrCacheFull - cache is full. + +var ErrKeyNotFoundInCache = errors.New("Key not found in cache") + ErrKeyNotFoundInCache - key not found in cache. + +var NoExpiry = time.Duration(0) + NoExpiry represents caches to be permanent and can only be deleted. + +TYPES + +type Cache struct { + + // OnEviction - callback function for eviction + OnEviction func(key string) + // contains filtered or unexported fields +} + Cache holds the required variables to compose an in memory cache system + which also provides expiring key mechanism and also maxSize. + +func New(maxSize uint64, expiry time.Duration) *Cache + New - Return a new cache with a given default expiry duration. If the + expiry duration is less than one (or NoExpiry), the items in the cache + never expire (by default), and must be deleted manually. + +func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) + Create - validates if object size fits with in cache size limit and + returns a io.WriteCloser to which object contents can be written and + finally Close()'d. During Close() we checks if the amount of data + written is equal to the size of the object, in which case it saves the + contents to object cache. + +func (c *Cache) Delete(key string) + Delete - delete deletes an entry from the cache. + +func (c *Cache) DeleteExpired() + DeleteExpired - deletes all the expired entries from the cache. + +func (c *Cache) Open(key string) (io.ReadSeeker, error) + Open - open the in-memory file, returns an in memory read seeker. + returns an error ErrNotFoundInCache, if the key does not exist. + +func (c *Cache) StopExpiry() + StopExpiry sends a message to the expiry routine to stop expiring cached + entries. NOTE: once this is called, cached entries will not be expired + if the consume has called this. +``` diff --git a/pkg/objcache/objcache.go b/pkg/objcache/objcache.go index 013fdebde..7a3473338 100644 --- a/pkg/objcache/objcache.go +++ b/pkg/objcache/objcache.go @@ -26,15 +26,25 @@ import ( "time" ) -// NoExpiration represents caches to be permanent and can only be deleted. -var NoExpiration = time.Duration(0) +// NoExpiry represents caches to be permanent and can only be deleted. +var NoExpiry = time.Duration(0) + +// DefaultExpiry represents default time duration value when individual entries will be expired. +var DefaultExpiry = time.Duration(72 * time.Hour) // 72hrs. + +// buffer represents the in memory cache of a single entry. +// buffer carries value of the data and last accessed time. +type buffer struct { + value []byte // Value of the entry. + lastAccessed time.Time // Represents time when value was last accessed. +} // Cache holds the required variables to compose an in memory cache system // which also provides expiring key mechanism and also maxSize. type Cache struct { // Mutex is used for handling the concurrent // read/write requests for cache - mutex *sync.RWMutex + mutex sync.Mutex // maxSize is a total size for overall cache maxSize uint64 @@ -43,29 +53,39 @@ type Cache struct { currentSize uint64 // OnEviction - callback function for eviction - OnEviction func(a ...interface{}) + OnEviction func(key string) - // totalEvicted counter to keep track of total expirations + // totalEvicted counter to keep track of total expirys totalEvicted int // map of objectName and its contents - entries map[string][]byte + entries map[string]*buffer - // Expiration in time duration. + // Expiry in time duration. expiry time.Duration + + // Stop garbage collection routine, stops any running GC routine. + stopGC chan struct{} } -// New creates an inmemory cache -// -// maxSize is used for expiring objects before we run out of memory -// expiration is used for expiration of a key from cache +// New - Return a new cache with a given default expiry duration. +// If the expiry duration is less than one (or NoExpiry), +// the items in the cache never expire (by default), and must be deleted +// manually. func New(maxSize uint64, expiry time.Duration) *Cache { - return &Cache{ - mutex: &sync.RWMutex{}, + C := &Cache{ maxSize: maxSize, - entries: make(map[string][]byte), + entries: make(map[string]*buffer), expiry: expiry, } + // We have expiry start the janitor routine. + if expiry > 0 { + C.stopGC = make(chan struct{}) + + // Start garbage collection routine to expire objects. + C.startGC() + } + return C } // ErrKeyNotFoundInCache - key not found in cache. @@ -74,17 +94,19 @@ var ErrKeyNotFoundInCache = errors.New("Key not found in cache") // ErrCacheFull - cache is full. var ErrCacheFull = errors.New("Not enough space in cache") +// ErrExcessData - excess data was attempted to be written on cache. +var ErrExcessData = errors.New("Attempted excess write on cache") + // Used for adding entry to the object cache. Implements io.WriteCloser type cacheBuffer struct { *bytes.Buffer // Implements io.Writer - onClose func() + onClose func() error } // On close, onClose() is called which checks if all object contents // have been written so that it can save the buffer to the cache. -func (c cacheBuffer) Close() error { - c.onClose() - return nil +func (c cacheBuffer) Close() (err error) { + return c.onClose() } // Create - validates if object size fits with in cache size limit and returns a io.WriteCloser @@ -92,39 +114,43 @@ func (c cacheBuffer) Close() error { // checks if the amount of data written is equal to the size of the object, in which // case it saves the contents to object cache. func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) { - c.mutex.Lock() - defer c.mutex.Unlock() + // Recovers any panic generated and return errors appropriately. + defer func() { + if r := recover(); r != nil { + // Recover any panic and return ErrCacheFull. + err = ErrCacheFull + } + }() // Do not crash the server. valueLen := uint64(size) - if c.maxSize > 0 { - // Check if the size of the object is not bigger than the capacity of the cache. - if valueLen > c.maxSize { - return nil, ErrCacheFull - } - // TODO - auto expire random key. - if c.currentSize+valueLen > c.maxSize { - return nil, ErrCacheFull - } + // Check if the size of the object is not bigger than the capacity of the cache. + if c.maxSize > 0 && valueLen > c.maxSize { + return nil, ErrCacheFull } // Will hold the object contents. buf := bytes.NewBuffer(make([]byte, 0, size)) - // Account for the memory allocated above. - c.currentSize += uint64(size) // Function called on close which saves the object contents // to the object cache. - onClose := func() { + onClose := func() error { c.mutex.Lock() defer c.mutex.Unlock() - if buf.Len() != int(size) { + if size != int64(buf.Len()) { // Full object not available hence do not save buf to object cache. - c.currentSize -= uint64(size) - return + return io.ErrShortBuffer + } + if c.maxSize > 0 && c.currentSize+valueLen > c.maxSize { + return ErrExcessData } // Full object available in buf, save it to cache. - c.entries[key] = buf.Bytes() - return + c.entries[key] = &buffer{ + value: buf.Bytes(), + lastAccessed: time.Now().UTC(), // Save last accessed time. + } + // Account for the memory allocated above. + c.currentSize += uint64(size) + return nil } // Object contents that is written - cacheBuffer.Write(data) @@ -138,35 +164,77 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) { // Open - open the in-memory file, returns an in memory read seeker. // returns an error ErrNotFoundInCache, if the key does not exist. func (c *Cache) Open(key string) (io.ReadSeeker, error) { - c.mutex.RLock() - defer c.mutex.RUnlock() - // Entry exists, return the readable buffer. - buffer, ok := c.entries[key] + c.mutex.Lock() + defer c.mutex.Unlock() + buf, ok := c.entries[key] if !ok { return nil, ErrKeyNotFoundInCache } - return bytes.NewReader(buffer), nil + buf.lastAccessed = time.Now().UTC() + return bytes.NewReader(buf.value), nil } -// Delete - delete deletes an entry from in-memory fs. +// Delete - delete deletes an entry from the cache. func (c *Cache) Delete(key string) { c.mutex.Lock() - defer c.mutex.Unlock() - - // Delete an entry. - buffer, ok := c.entries[key] - if ok { - c.deleteEntry(key, int64(len(buffer))) - } -} - -// Deletes the entry that was found. -func (c *Cache) deleteEntry(key string, size int64) { - delete(c.entries, key) - c.currentSize -= uint64(size) - c.totalEvicted++ + c.delete(key) + c.mutex.Unlock() if c.OnEviction != nil { c.OnEviction(key) } } + +// gc - garbage collect all the expired entries from the cache. +func (c *Cache) gc() { + var evictedEntries []string + c.mutex.Lock() + for k, v := range c.entries { + if c.expiry > 0 && time.Now().UTC().Sub(v.lastAccessed) > c.expiry { + c.delete(k) + evictedEntries = append(evictedEntries, k) + } + } + c.mutex.Unlock() + for _, k := range evictedEntries { + if c.OnEviction != nil { + c.OnEviction(k) + } + } +} + +// StopGC sends a message to the expiry routine to stop +// expiring cached entries. NOTE: once this is called, cached +// entries will not be expired if the consumer has called this. +func (c *Cache) StopGC() { + if c.stopGC != nil { + c.stopGC <- struct{}{} + } +} + +// startGC starts running a routine ticking at expiry interval, on each interval +// this routine does a sweep across the cache entries and garbage collects all the +// expired entries. +func (c *Cache) startGC() { + go func() { + for { + select { + // Wait till cleanup interval and initiate delete expired entries. + case <-time.After(c.expiry / 4): + c.gc() + // Stop the routine, usually called by the user of object cache during cleanup. + case <-c.stopGC: + return + } + } + }() +} + +// Deletes a requested entry from the cache. +func (c *Cache) delete(key string) { + if buf, ok := c.entries[key]; ok { + delete(c.entries, key) + c.currentSize -= uint64(len(buf.value)) + c.totalEvicted++ + } +} diff --git a/pkg/objcache/objcache_test.go b/pkg/objcache/objcache_test.go new file mode 100644 index 000000000..91207a31c --- /dev/null +++ b/pkg/objcache/objcache_test.go @@ -0,0 +1,172 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package objcache + +import ( + "bytes" + "io" + "io/ioutil" + "testing" + "time" +) + +// TestObjCache - tests various cases for object cache behavior. +func TestObjCache(t *testing.T) { + // Non exhaustive list of all object cache behavior cases. + testCases := []struct { + expiry time.Duration + cacheSize uint64 + err error + closeErr error + }{ + // Validate if a key is not found in cache and Open fails. + { + expiry: NoExpiry, + cacheSize: 1024, + err: ErrKeyNotFoundInCache, + }, + // Validate if cache indicates that it is full and Create fails. + { + expiry: NoExpiry, + cacheSize: 1, + err: ErrCacheFull, + }, + // Validate if Create succeeds but Close fails to write to buffer. + { + expiry: NoExpiry, + cacheSize: 2, + closeErr: io.ErrShortBuffer, + }, + // Validate that Create and Close succeed, making sure to update the cache. + { + expiry: NoExpiry, + cacheSize: 1024, + }, + // Validate that Delete succeeds and Open fails with key not found in cache. + { + expiry: NoExpiry, + cacheSize: 1024, + err: ErrKeyNotFoundInCache, + }, + // Validate OnEviction function is called upon entry delete. + { + expiry: NoExpiry, + cacheSize: 1024, + }, + } + + // Test 1 validating Open failure. + testCase := testCases[0] + cache := New(testCase.cacheSize, testCase.expiry) + _, err := cache.Open("test") + if testCase.err != err { + t.Errorf("Test case 2 expected to pass, failed instead %s", err) + } + + // Test 2 validating Create failure. + testCase = testCases[1] + cache = New(testCase.cacheSize, testCase.expiry) + _, err = cache.Create("test", 2) + if testCase.err != err { + t.Errorf("Test case 2 expected to pass, failed instead %s", err) + } + + // Test 3 validating Create succeeds and returns a writer. + // Subsequently we Close() without writing any data, to receive + // `io.ErrShortBuffer` + testCase = testCases[2] + cache = New(testCase.cacheSize, testCase.expiry) + w, err := cache.Create("test", 1) + if testCase.err != err { + t.Errorf("Test case 3 expected to pass, failed instead %s", err) + } + if err = w.Close(); err != testCase.closeErr { + t.Errorf("Test case 3 expected to pass, failed instead %s", err) + } + + // Test 4 validates Create and Close succeeds successfully caching + // the writes. + testCase = testCases[3] + cache = New(testCase.cacheSize, testCase.expiry) + w, err = cache.Create("test", 5) + if testCase.err != err { + t.Errorf("Test case 4 expected to pass, failed instead %s", err) + } + // Write '5' bytes. + w.Write([]byte("Hello")) + // Close to successfully save into cache. + if err = w.Close(); err != nil { + t.Errorf("Test case 4 expected to pass, failed instead %s", err) + } + r, err := cache.Open("test") + if err != nil { + t.Errorf("Test case 4 expected to pass, failed instead %s", err) + } + // Reads everything stored for key "test". + cbytes, err := ioutil.ReadAll(r) + if err != nil { + t.Errorf("Test case 4 expected to pass, failed instead %s", err) + } + // Validate if read bytes match. + if !bytes.Equal(cbytes, []byte("Hello")) { + t.Errorf("Test case 4 expected to pass. wanted \"Hello\", got %s", string(cbytes)) + } + + // Test 5 validates Delete succeeds and Open fails with err + testCase = testCases[4] + cache = New(testCase.cacheSize, testCase.expiry) + w, err = cache.Create("test", 5) + if err != nil { + t.Errorf("Test case 5 expected to pass, failed instead %s", err) + } + // Write '5' bytes. + w.Write([]byte("Hello")) + // Close to successfully save into cache. + if err = w.Close(); err != nil { + t.Errorf("Test case 5 expected to pass, failed instead %s", err) + } + // Delete the cache entry. + cache.Delete("test") + _, err = cache.Open("test") + if testCase.err != err { + t.Errorf("Test case 5 expected to pass, failed instead %s", err) + } + + // Test 6 validates OnEviction being called upon Delete is being invoked. + testCase = testCases[5] + cache = New(testCase.cacheSize, testCase.expiry) + w, err = cache.Create("test", 5) + if err != nil { + t.Errorf("Test case 6 expected to pass, failed instead %s", err) + } + // Write '5' bytes. + w.Write([]byte("Hello")) + // Close to successfully save into cache. + if err = w.Close(); err != nil { + t.Errorf("Test case 6 expected to pass, failed instead %s", err) + } + var deleteKey string + cache.OnEviction = func(key string) { + deleteKey = key + } + // Delete the cache entry. + cache.Delete("test") + if deleteKey != "test" { + t.Errorf("Test case 6 expected to pass, wanted \"test\", got %s", deleteKey) + } +} diff --git a/server-main.go b/server-main.go index adbdfde35..524e5debb 100644 --- a/server-main.go +++ b/server-main.go @@ -149,6 +149,14 @@ func initServerConfig(c *cli.Context) { fatalIf(err, "Unable to convert MINIO_CACHE_SIZE=%s environment variable into its integer value.", maxCacheSizeStr) } + // Fetch cache expiry from environment variable. + if cacheExpiryStr := os.Getenv("MINIO_CACHE_EXPIRY"); cacheExpiryStr != "" { + // We need to parse cache expiry to its time.Duration value. + var err error + globalCacheExpiry, err = time.ParseDuration(cacheExpiryStr) + fatalIf(err, "Unable to convert MINIO_CACHE_EXPIRY=%s environment variable into its time.Duration value.", cacheExpiryStr) + } + // Fetch access keys from environment variables if any and update the config. accessKey := os.Getenv("MINIO_ACCESS_KEY") secretKey := os.Getenv("MINIO_SECRET_KEY") diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index e418f7cc3..7c2116200 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -21,6 +21,7 @@ import ( "encoding/hex" "fmt" "io" + "io/ioutil" "path" "path/filepath" "strings" @@ -683,7 +684,19 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload } // Hold write lock on the destination before rename. nsMutex.Lock(bucket, object) - defer nsMutex.Unlock(bucket, object) + defer func() { + // A new complete multipart upload invalidates any + // previously cached object in memory. + xl.objCache.Delete(path.Join(bucket, object)) + + // This lock also protects the cache namespace. + nsMutex.Unlock(bucket, object) + + // Prefetch the object from disk by triggerring a fake GetObject call + // Unlike a regular single PutObject, multipart PutObject is comes in + // stages and it is harder to cache. + go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard) + }() // Rename if an object already exists to temporary location. uniqueID := getUUID() diff --git a/xl-v1-object.go b/xl-v1-object.go index f5718e0e2..06163bfa0 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -114,7 +114,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i mw := writer // Object cache enabled block. - if xl.objCacheEnabled { + if xlMeta.Stat.Size > 0 && xl.objCacheEnabled { // Validate if we have previous cache. cachedBuffer, err := xl.objCache.Open(path.Join(bucket, object)) if err == nil { // Cache hit. @@ -144,9 +144,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i mw = io.MultiWriter(newBuffer, writer) defer newBuffer.Close() } + // Ignore error if cache is full, proceed to write the object. if err != nil && err != objcache.ErrCacheFull { - // Perhaps cache is full, returns here. - return err + // For any other error return here. + return toObjectErr(err, bucket, object) } } } @@ -170,7 +171,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i // Start reading the part name. n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) if err != nil { - return err + return toObjectErr(err, bucket, object) } totalBytesRead += n @@ -340,7 +341,7 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri // until EOF, erasure codes the data across all disk and additionally // writes `xl.json` which carries the necessary metadata for future // object operations. -func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { +func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (md5Sum string, err error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { return "", BucketNameInvalid{Bucket: bucket} @@ -359,14 +360,15 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. if metadata == nil { metadata = make(map[string]string) } - nsMutex.Lock(bucket, object) - defer nsMutex.Unlock(bucket, object) - uniqueID := getUUID() tempErasureObj := path.Join(tmpMetaPrefix, uniqueID, "part.1") minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix) tempObj := uniqueID + // Lock the object. + nsMutex.Lock(bucket, object) + defer nsMutex.Unlock(bucket, object) + // Initialize xl meta. xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks) @@ -388,9 +390,33 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. higherVersion++ } + var mw io.Writer // Initialize md5 writer. md5Writer := md5.New() + // Proceed to set the cache. + var newBuffer io.WriteCloser + + // If caching is enabled, proceed to set the cache. + if size > 0 && xl.objCacheEnabled { + // PutObject invalidates any previously cached object in memory. + xl.objCache.Delete(path.Join(bucket, object)) + + // Create a new entry in memory of size. + newBuffer, err = xl.objCache.Create(path.Join(bucket, object), size) + if err == nil { + // Create a multi writer to write to both memory and client response. + mw = io.MultiWriter(newBuffer, md5Writer) + } + // Ignore error if cache is full, proceed to write the object. + if err != nil && err != objcache.ErrCacheFull { + // For any other error return here. + return "", toObjectErr(err, bucket, object) + } + } else { + mw = md5Writer + } + // Limit the reader to its provided size if specified. var limitDataReader io.Reader if size > 0 { @@ -403,7 +429,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. } // Tee reader combines incoming data stream and md5, data read from input stream is written to md5. - teeReader := io.TeeReader(limitDataReader, md5Writer) + teeReader := io.TeeReader(limitDataReader, mw) // Collect all the previous erasure infos across the disk. var eInfos []erasureInfo @@ -505,6 +531,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io. // Delete the temporary object. xl.deleteObject(minioMetaTmpBucket, newUniqueID) + // Once we have successfully renamed the object, Close the buffer which would + // save the object on cache. + if size > 0 && xl.objCacheEnabled && newBuffer != nil { + newBuffer.Close() + } + // Return md5sum, successfully wrote object. return newMD5Hex, nil } diff --git a/xl-v1.go b/xl-v1.go index d5b4993ac..a626d9cc6 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -158,16 +158,20 @@ func newXLObjects(disks []string) (ObjectLayer, error) { // Calculate data and parity blocks. dataBlocks, parityBlocks := len(newPosixDisks)/2, len(newPosixDisks)/2 + // Initialize object cache. + objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry) + + // Initialize list pool. + listPool := newTreeWalkPool(globalLookupTimeout) + // Initialize xl objects. xl := xlObjects{ - physicalDisks: disks, - storageDisks: newPosixDisks, - dataBlocks: dataBlocks, - parityBlocks: parityBlocks, - // Inititalize list pool. - listPool: newTreeWalkPool(globalLookupTimeout), - // Initialize object caching, FIXME: support auto cache expiration. - objCache: objcache.New(globalMaxCacheSize, objcache.NoExpiration), + physicalDisks: disks, + storageDisks: newPosixDisks, + dataBlocks: dataBlocks, + parityBlocks: parityBlocks, + listPool: listPool, + objCache: objCache, objCacheEnabled: globalMaxCacheSize > 0, }