caching: Optimize memory allocations. (#3405)

This change brings in changes at multiple places

 - Reuse buffers at almost all locations ranging
   from rpc, fs, xl, checksum etc.
 - Change caching behavior to disable itself
   under low memory conditions i.e < 8GB of RAM.
 - Only objects cached are of size 1/10th the size
   of the cache for example if 4GB is the cache size
   the maximum object size which will be cached
   is going to be 400MB. This change is an
   optimization to cache more objects rather
   than few larger objects.
 - If object cache is enabled default GC
   percent has been reduced to 20% in lieu
   with newly found behavior of GC. If the cache
   utilization reaches 75% of the maximum value
   GC percent is reduced to 10% to make GC
   more aggressive.
 - Do not use *bytes.Buffer* due to its growth
   requirements. For every allocation *bytes.Buffer*
   allocates an additional buffer for its internal
   purposes. This is undesirable for us, so
   implemented a new cappedWriter which is capped to a
   desired size, beyond this all writes rejected.

Possible fix for #3403.
This commit is contained in:
Harshavardhana 2016-12-08 20:35:07 -08:00 committed by GitHub
parent 410b579e87
commit b363709c11
17 changed files with 271 additions and 104 deletions

View File

@ -29,7 +29,7 @@ import (
// all the disks, writes also calculate individual block's checksum // all the disks, writes also calculate individual block's checksum
// for future bit-rot protection. // for future bit-rot protection.
func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, algo string, writeQuorum int) (bytesWritten int64, checkSums []string, err error) { func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, algo string, writeQuorum int) (bytesWritten int64, checkSums []string, err error) {
// Allocated blockSized buffer for reading. // Allocated blockSized buffer for reading from incoming stream.
buf := make([]byte, blockSize) buf := make([]byte, blockSize)
hashWriters := newHashWriters(len(disks), algo) hashWriters := newHashWriters(len(disks), algo)

View File

@ -21,6 +21,7 @@ import (
"errors" "errors"
"hash" "hash"
"io" "io"
"sync"
"github.com/klauspost/reedsolomon" "github.com/klauspost/reedsolomon"
"github.com/minio/blake2b-simd" "github.com/minio/blake2b-simd"
@ -47,13 +48,23 @@ func newHash(algo string) hash.Hash {
} }
} }
// Hash buffer pool is a pool of reusable
// buffers used while checksumming a stream.
var hashBufferPool = sync.Pool{
New: func() interface{} {
b := make([]byte, readSizeV1)
return &b
},
}
// hashSum calculates the hash of the entire path and returns. // hashSum calculates the hash of the entire path and returns.
func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) { func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) {
// Allocate staging buffer of 128KiB for copyBuffer. // Fetch staging a new staging buffer from the pool.
buf := make([]byte, readSizeV1) bufp := hashBufferPool.Get().(*[]byte)
defer hashBufferPool.Put(bufp)
// Copy entire buffer to writer. // Copy entire buffer to writer.
if err := copyBuffer(writer, disk, volume, path, buf); err != nil { if err := copyBuffer(writer, disk, volume, path, *bufp); err != nil {
return nil, err return nil, err
} }

View File

@ -129,6 +129,8 @@ func (b *backgroundAppend) abort(uploadID string) {
func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID string, info bgAppendPartsInfo) { func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID string, info bgAppendPartsInfo) {
// Holds the list of parts that is already appended to the "append" file. // Holds the list of parts that is already appended to the "append" file.
appendMeta := fsMetaV1{} appendMeta := fsMetaV1{}
// Allocate staging read buffer.
buf := make([]byte, readSizeV1)
for { for {
select { select {
case input := <-info.inputCh: case input := <-info.inputCh:
@ -151,7 +153,7 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID
} }
break break
} }
if err := appendPart(disk, bucket, object, uploadID, part); err != nil { if err := appendPart(disk, bucket, object, uploadID, part, buf); err != nil {
disk.DeleteFile(minioMetaTmpBucket, uploadID) disk.DeleteFile(minioMetaTmpBucket, uploadID)
appendMeta.Parts = nil appendMeta.Parts = nil
input.errCh <- err input.errCh <- err
@ -183,12 +185,11 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID
// Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location // Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location
// upon complete-multipart-upload. // upon complete-multipart-upload.
func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo) error { func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo, buf []byte) error {
partPath := pathJoin(bucket, object, uploadID, part.Name) partPath := pathJoin(bucket, object, uploadID, part.Name)
offset := int64(0) offset := int64(0)
totalLeft := part.Size totalLeft := part.Size
buf := make([]byte, readSizeV1)
for totalLeft > 0 { for totalLeft > 0 {
curLeft := int64(readSizeV1) curLeft := int64(readSizeV1)
if totalLeft < readSizeV1 { if totalLeft < readSizeV1 {

View File

@ -61,8 +61,10 @@ var (
globalIsDistXL = false // "Is Distributed?" flag. globalIsDistXL = false // "Is Distributed?" flag.
// Maximum cache size. // Maximum cache size. Defaults to disabled.
globalMaxCacheSize = uint64(maxCacheSize) // Caching is enabled only for RAM size > 8GiB.
globalMaxCacheSize = uint64(0)
// Cache expiry. // Cache expiry.
globalCacheExpiry = objcache.DefaultExpiry globalCacheExpiry = objcache.DefaultExpiry
// Minio local server address (in `host:port` format) // Minio local server address (in `host:port` format)

View File

@ -253,11 +253,9 @@ func retryFormattingDisks(firstDisk bool, endpoints []*url.URL, storageDisks []S
// Print configuration errors. // Print configuration errors.
printConfigErrMsg(storageDisks, sErrs, printOnceFn()) printConfigErrMsg(storageDisks, sErrs, printOnceFn())
case WaitForAll: case WaitForAll:
console.Printf("Initializing data volume for first time. Waiting for other servers to come online (elapsed %s)\n", console.Printf("Initializing data volume for first time. Waiting for other servers to come online (elapsed %s)\n", getElapsedTime())
getElapsedTime())
case WaitForFormatting: case WaitForFormatting:
console.Println("Initializing data volume for first time. Waiting for first server to come online (elapsed %s)\n", console.Printf("Initializing data volume for first time. Waiting for first server to come online (elapsed %s)\n", getElapsedTime())
getElapsedTime())
} }
continue continue
} // else We have FS backend now. Check fs format as well now. } // else We have FS backend now. Check fs format as well now.

View File

@ -66,18 +66,18 @@ func setMaxMemory() error {
// Validate if rlimit memory is set to lower // Validate if rlimit memory is set to lower
// than max cache size. Then we should use such value. // than max cache size. Then we should use such value.
if uint64(rLimit.Cur) < globalMaxCacheSize { if uint64(rLimit.Cur) < globalMaxCacheSize {
globalMaxCacheSize = (80 / 100) * uint64(rLimit.Cur) globalMaxCacheSize = uint64(float64(50*rLimit.Cur) / 100)
} }
// Make sure globalMaxCacheSize is less than RAM size. // Make sure globalMaxCacheSize is less than RAM size.
stats, err := sys.GetStats() stats, err := sys.GetStats()
if err != nil && err != sys.ErrNotImplemented { if err != nil && err != sys.ErrNotImplemented {
// sys.GetStats() is implemented only on linux. Ignore errors
// from other OSes.
return err return err
} }
if err == nil && stats.TotalRAM < globalMaxCacheSize { // If TotalRAM is >= minRAMSize we proceed to enable cache.
globalMaxCacheSize = uint64(float64(80*stats.TotalRAM) / 100) // cache is always 50% of the totalRAM.
if err == nil && stats.TotalRAM >= minRAMSize {
globalMaxCacheSize = uint64(float64(50*stats.TotalRAM) / 100)
} }
return nil return nil
} }

View File

@ -18,6 +18,8 @@
package cmd package cmd
import "github.com/minio/minio/pkg/sys"
func setMaxOpenFiles() error { func setMaxOpenFiles() error {
// Golang uses Win32 file API (CreateFile, WriteFile, ReadFile, // Golang uses Win32 file API (CreateFile, WriteFile, ReadFile,
// CloseHandle, etc.), then you don't have a limit on open files // CloseHandle, etc.), then you don't have a limit on open files
@ -26,6 +28,15 @@ func setMaxOpenFiles() error {
} }
func setMaxMemory() error { func setMaxMemory() error {
// TODO: explore if Win32 API's provide anything special here. // Make sure globalMaxCacheSize is less than RAM size.
stats, err := sys.GetStats()
if err != nil && err != sys.ErrNotImplemented {
return err
}
// If TotalRAM is <= minRAMSize we proceed to enable cache.
// cache is always 50% of the totalRAM.
if err == nil && stats.TotalRAM >= minRAMSize {
globalMaxCacheSize = uint64(float64(50*stats.TotalRAM) / 100)
}
return nil return nil
} }

View File

@ -17,6 +17,7 @@
package cmd package cmd
import ( import (
"bytes"
"io" "io"
"net" "net"
"net/rpc" "net/rpc"
@ -366,6 +367,13 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff
} }
}() }()
defer func() {
if r := recover(); r != nil {
// Recover any panic from allocation, and return error.
err = bytes.ErrTooLarge
}
}() // Do not crash the server.
// Take remote disk offline if the total network errors. // Take remote disk offline if the total network errors.
// are more than maximum allowable IO error limit. // are more than maximum allowable IO error limit.
if n.networkIOErrCount > maxAllowedNetworkIOError { if n.networkIOErrCount > maxAllowedNetworkIOError {
@ -377,10 +385,12 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff
Vol: volume, Vol: volume,
Path: path, Path: path,
Offset: offset, Offset: offset,
Size: len(buffer), Buffer: buffer,
}, &result) }, &result)
// Copy results to buffer. // Copy results to buffer.
copy(buffer, result) copy(buffer, result)
// Return length of result, err if any. // Return length of result, err if any.
return int64(len(result)), toStorageErr(err) return int64(len(result)), toStorageErr(err)
} }

View File

@ -57,8 +57,8 @@ type ReadFileArgs struct {
// Starting offset to start reading into Buffer. // Starting offset to start reading into Buffer.
Offset int64 Offset int64
// Data size read from the path at offset. // Data buffer read from the path at offset.
Size int Buffer []byte
} }
// PrepareFileArgs represents append file RPC arguments. // PrepareFileArgs represents append file RPC arguments.

View File

@ -17,7 +17,6 @@
package cmd package cmd
import ( import (
"bytes"
"io" "io"
"net/rpc" "net/rpc"
"path" "path"
@ -156,19 +155,12 @@ func (s *storageServer) ReadAllHandler(args *ReadFileArgs, reply *[]byte) error
// ReadFileHandler - read file handler is rpc wrapper to read file. // ReadFileHandler - read file handler is rpc wrapper to read file.
func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err error) { func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err error) {
defer func() {
if r := recover(); r != nil {
// Recover any panic and return ErrCacheFull.
err = bytes.ErrTooLarge
}
}() // Do not crash the server.
if !isRPCTokenValid(args.Token) { if !isRPCTokenValid(args.Token) {
return errInvalidToken return errInvalidToken
} }
// Allocate the requested buffer from the client.
*reply = make([]byte, args.Size)
var n int64 var n int64
n, err = s.storage.ReadFile(args.Vol, args.Path, args.Offset, *reply) n, err = s.storage.ReadFile(args.Vol, args.Path, args.Offset, args.Buffer)
// Sending an error over the rpc layer, would cause unmarshalling to fail. In situations // Sending an error over the rpc layer, would cause unmarshalling to fail. In situations
// when we have short read i.e `io.ErrUnexpectedEOF` treat it as good condition and copy // when we have short read i.e `io.ErrUnexpectedEOF` treat it as good condition and copy
// the buffer properly. // the buffer properly.
@ -176,7 +168,7 @@ func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err
// Reset to nil as good condition. // Reset to nil as good condition.
err = nil err = nil
} }
*reply = (*reply)[0:n] *reply = args.Buffer[0:n]
return err return err
} }

View File

@ -62,6 +62,9 @@ func init() {
// Disable printing console messages during tests. // Disable printing console messages during tests.
color.Output = ioutil.Discard color.Output = ioutil.Discard
// Enable caching.
setMaxMemory()
} }
func prepareFS() (ObjectLayer, string, error) { func prepareFS() (ObjectLayer, string, error) {

View File

@ -708,17 +708,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
destLock := nsMutex.NewNSLock(bucket, object) destLock := nsMutex.NewNSLock(bucket, object)
destLock.Lock() destLock.Lock()
defer func() { defer func() {
// A new complete multipart upload invalidates any if xl.objCacheEnabled {
// previously cached object in memory. // A new complete multipart upload invalidates any
xl.objCache.Delete(path.Join(bucket, object)) // previously cached object in memory.
xl.objCache.Delete(path.Join(bucket, object))
}
// This lock also protects the cache namespace. // This lock also protects the cache namespace.
destLock.Unlock() destLock.Unlock()
// Prefetch the object from disk by triggering a fake GetObject call if xl.objCacheEnabled {
// Unlike a regular single PutObject, multipart PutObject is comes in // Prefetch the object from disk by triggering a fake GetObject call
// stages and it is harder to cache. // Unlike a regular single PutObject, multipart PutObject is comes in
go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard) // stages and it is harder to cache.
go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard)
}
}() }()
// Rename if an object already exists to temporary location. // Rename if an object already exists to temporary location.

View File

@ -621,8 +621,10 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
return toObjectErr(err, bucket, object) return toObjectErr(err, bucket, object)
} }
// Delete from the cache. if xl.objCacheEnabled {
xl.objCache.Delete(pathJoin(bucket, object)) // Delete from the cache.
xl.objCache.Delete(pathJoin(bucket, object))
}
// Success. // Success.
return nil return nil

View File

@ -19,6 +19,7 @@ package cmd
import ( import (
"fmt" "fmt"
"os" "os"
"runtime/debug"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -42,8 +43,9 @@ const (
// Uploads metadata file carries per multipart object metadata. // Uploads metadata file carries per multipart object metadata.
uploadsJSONFile = "uploads.json" uploadsJSONFile = "uploads.json"
// 8GiB cache by default. // Represents the minimum required RAM size before
maxCacheSize = 8 * humanize.GiByte // we enable caching.
minRAMSize = 8 * humanize.GiByte
// Maximum erasure blocks. // Maximum erasure blocks.
maxErasureBlocks = 16 maxErasureBlocks = 16
@ -92,9 +94,6 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
// Calculate data and parity blocks. // Calculate data and parity blocks.
dataBlocks, parityBlocks := len(newStorageDisks)/2, len(newStorageDisks)/2 dataBlocks, parityBlocks := len(newStorageDisks)/2, len(newStorageDisks)/2
// Initialize object cache.
objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry)
// Initialize list pool. // Initialize list pool.
listPool := newTreeWalkPool(globalLookupTimeout) listPool := newTreeWalkPool(globalLookupTimeout)
@ -103,13 +102,25 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
// Initialize xl objects. // Initialize xl objects.
xl := &xlObjects{ xl := &xlObjects{
mutex: &sync.Mutex{}, mutex: &sync.Mutex{},
storageDisks: newStorageDisks, storageDisks: newStorageDisks,
dataBlocks: dataBlocks, dataBlocks: dataBlocks,
parityBlocks: parityBlocks, parityBlocks: parityBlocks,
listPool: listPool, listPool: listPool,
objCache: objCache, }
objCacheEnabled: !objCacheDisabled,
// Object cache is enabled when _MINIO_CACHE env is missing.
// and cache size is > 0.
xl.objCacheEnabled = !objCacheDisabled && globalMaxCacheSize > 0
// Check if object cache is enabled.
if xl.objCacheEnabled {
// Initialize object cache.
objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry)
objCache.OnEviction = func(key string) {
debug.FreeOSMemory()
}
xl.objCache = objCache
} }
// Initialize meta volume, if volume already exists ignores it. // Initialize meta volume, if volume already exists ignores it.

View File

@ -0,0 +1,50 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package objcache implements in memory caching methods.
package objcache
// Used for adding entry to the object cache.
// Implements io.WriteCloser
type cappedWriter struct {
offset int64
cap int64
buffer []byte
onClose func() error
}
// Write implements a limited writer, returns error.
// if the writes go beyond allocated size.
func (c *cappedWriter) Write(b []byte) (n int, err error) {
if c.offset+int64(len(b)) > c.cap {
return 0, ErrExcessData
}
n = copy(c.buffer[int(c.offset):int(c.offset)+len(b)], b)
c.offset = c.offset + int64(n)
return n, nil
}
// Reset relinquishes the allocated underlying buffer.
func (c *cappedWriter) Reset() {
c.buffer = nil
}
// On close, onClose() is called which checks if all object contents
// have been written so that it can save the buffer to the cache.
func (c cappedWriter) Close() (err error) {
return c.onClose()
}

View File

@ -22,6 +22,7 @@ import (
"bytes" "bytes"
"errors" "errors"
"io" "io"
"runtime/debug"
"sync" "sync"
"time" "time"
) )
@ -32,6 +33,13 @@ var NoExpiry = time.Duration(0)
// DefaultExpiry represents default time duration value when individual entries will be expired. // DefaultExpiry represents default time duration value when individual entries will be expired.
var DefaultExpiry = time.Duration(72 * time.Hour) // 72hrs. var DefaultExpiry = time.Duration(72 * time.Hour) // 72hrs.
// DefaultBufferRatio represents default ratio used to calculate the
// individual cache entry buffer size.
var DefaultBufferRatio = uint64(10)
// DefaultGCPercent represents default garbage collection target percentage.
var DefaultGCPercent = 20
// buffer represents the in memory cache of a single entry. // buffer represents the in memory cache of a single entry.
// buffer carries value of the data and last accessed time. // buffer carries value of the data and last accessed time.
type buffer struct { type buffer struct {
@ -46,9 +54,16 @@ type Cache struct {
// read/write requests for cache // read/write requests for cache
mutex sync.Mutex mutex sync.Mutex
// Once is used for resetting GC once after
// peak cache usage.
onceGC sync.Once
// maxSize is a total size for overall cache // maxSize is a total size for overall cache
maxSize uint64 maxSize uint64
// maxCacheEntrySize is a total size per key buffer.
maxCacheEntrySize uint64
// currentSize is a current size in memory // currentSize is a current size in memory
currentSize uint64 currentSize uint64
@ -68,27 +83,58 @@ type Cache struct {
stopGC chan struct{} stopGC chan struct{}
} }
// New - Return a new cache with a given default expiry duration. // New - Return a new cache with a given default expiry
// If the expiry duration is less than one (or NoExpiry), // duration. If the expiry duration is less than one
// the items in the cache never expire (by default), and must be deleted // (or NoExpiry), the items in the cache never expire
// manually. // (by default), and must be deleted manually.
func New(maxSize uint64, expiry time.Duration) *Cache { func New(maxSize uint64, expiry time.Duration) *Cache {
if maxSize == 0 { if maxSize == 0 {
panic("objcache: setting maximum cache size to zero is forbidden.") panic("objcache: setting maximum cache size to zero is forbidden.")
} }
C := &Cache{
maxSize: maxSize, // A garbage collection is triggered when the ratio
entries: make(map[string]*buffer), // of freshly allocated data to live data remaining
expiry: expiry, // after the previous collection reaches this percentage.
//
// - https://golang.org/pkg/runtime/debug/#SetGCPercent
//
// This means that by default GC is triggered after
// we've allocated an extra amount of memory proportional
// to the amount already in use.
//
// If gcpercent=100 and we're using 4M, we'll gc again
// when we get to 8M.
//
// Set this value to 20% if caching is enabled.
debug.SetGCPercent(DefaultGCPercent)
// Max cache entry size - indicates the
// maximum buffer per key that can be held in
// memory. Currently this value is 1/10th
// the size of requested cache size.
maxCacheEntrySize := func() uint64 {
i := maxSize / DefaultBufferRatio
if i == 0 {
i = maxSize
}
return i
}()
c := &Cache{
onceGC: sync.Once{},
maxSize: maxSize,
maxCacheEntrySize: maxCacheEntrySize,
entries: make(map[string]*buffer),
expiry: expiry,
} }
// We have expiry start the janitor routine. // We have expiry start the janitor routine.
if expiry > 0 { if expiry > 0 {
C.stopGC = make(chan struct{}) // Initialize a new stop GC channel.
c.stopGC = make(chan struct{})
// Start garbage collection routine to expire objects. // Start garbage collection routine to expire objects.
C.startGC() c.StartGC()
} }
return C return c
} }
// ErrKeyNotFoundInCache - key not found in cache. // ErrKeyNotFoundInCache - key not found in cache.
@ -100,18 +146,6 @@ var ErrCacheFull = errors.New("Not enough space in cache")
// ErrExcessData - excess data was attempted to be written on cache. // ErrExcessData - excess data was attempted to be written on cache.
var ErrExcessData = errors.New("Attempted excess write on cache") var ErrExcessData = errors.New("Attempted excess write on cache")
// Used for adding entry to the object cache. Implements io.WriteCloser
type cacheBuffer struct {
*bytes.Buffer // Implements io.Writer
onClose func() error
}
// On close, onClose() is called which checks if all object contents
// have been written so that it can save the buffer to the cache.
func (c cacheBuffer) Close() (err error) {
return c.onClose()
}
// Create - validates if object size fits with in cache size limit and returns a io.WriteCloser // Create - validates if object size fits with in cache size limit and returns a io.WriteCloser
// to which object contents can be written and finally Close()'d. During Close() we // to which object contents can be written and finally Close()'d. During Close() we
// checks if the amount of data written is equal to the size of the object, in which // checks if the amount of data written is equal to the size of the object, in which
@ -126,29 +160,46 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) {
}() // Do not crash the server. }() // Do not crash the server.
valueLen := uint64(size) valueLen := uint64(size)
// Check if the size of the object is not bigger than the capacity of the cache. // Check if the size of the object is > 1/10th the size
if c.maxSize > 0 && valueLen > c.maxSize { // of the cache, if yes then we ignore it.
if valueLen > c.maxCacheEntrySize {
return nil, ErrCacheFull return nil, ErrCacheFull
} }
// Will hold the object contents. // Check if the incoming size is going to exceed
buf := bytes.NewBuffer(make([]byte, 0, size)) // the effective cache size, if yes return error
// instead.
c.mutex.Lock()
if c.currentSize+valueLen > c.maxSize {
c.mutex.Unlock()
return nil, ErrCacheFull
}
// Change GC percent if the current cache usage
// is already 75% of the maximum allowed usage.
if c.currentSize > (75 * c.maxSize / 100) {
c.onceGC.Do(func() { debug.SetGCPercent(DefaultGCPercent - 10) })
}
c.mutex.Unlock()
cbuf := &cappedWriter{
offset: 0,
cap: size,
buffer: make([]byte, size),
}
// Function called on close which saves the object contents // Function called on close which saves the object contents
// to the object cache. // to the object cache.
onClose := func() error { onClose := func() error {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
if size != int64(buf.Len()) { if size != cbuf.offset {
cbuf.Reset() // Reset resets the buffer to be empty.
// Full object not available hence do not save buf to object cache. // Full object not available hence do not save buf to object cache.
return io.ErrShortBuffer return io.ErrShortBuffer
} }
if c.maxSize > 0 && c.currentSize+valueLen > c.maxSize {
return ErrExcessData
}
// Full object available in buf, save it to cache. // Full object available in buf, save it to cache.
c.entries[key] = &buffer{ c.entries[key] = &buffer{
value: buf.Bytes(), value: cbuf.buffer,
lastAccessed: time.Now().UTC(), // Save last accessed time. lastAccessed: time.Now().UTC(), // Save last accessed time.
} }
// Account for the memory allocated above. // Account for the memory allocated above.
@ -156,12 +207,10 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) {
return nil return nil
} }
// Object contents that is written - cacheBuffer.Write(data) // Object contents that is written - cappedWriter.Write(data)
// will be accumulated in buf which implements io.Writer. // will be accumulated in buf which implements io.Writer.
return cacheBuffer{ cbuf.onClose = onClose
buf, return cbuf, nil
onClose,
}, nil
} }
// Open - open the in-memory file, returns an in memory read seeker. // Open - open the in-memory file, returns an in memory read seeker.
@ -215,17 +264,17 @@ func (c *Cache) gc() {
// StopGC sends a message to the expiry routine to stop // StopGC sends a message to the expiry routine to stop
// expiring cached entries. NOTE: once this is called, cached // expiring cached entries. NOTE: once this is called, cached
// entries will not be expired if the consumer has called this. // entries will not be expired, be careful if you are using this.
func (c *Cache) StopGC() { func (c *Cache) StopGC() {
if c.stopGC != nil { if c.stopGC != nil {
c.stopGC <- struct{}{} c.stopGC <- struct{}{}
} }
} }
// startGC starts running a routine ticking at expiry interval, on each interval // StartGC starts running a routine ticking at expiry interval,
// this routine does a sweep across the cache entries and garbage collects all the // on each interval this routine does a sweep across the cache
// expired entries. // entries and garbage collects all the expired entries.
func (c *Cache) startGC() { func (c *Cache) StartGC() {
go func() { go func() {
for { for {
select { select {
@ -242,9 +291,10 @@ func (c *Cache) startGC() {
// Deletes a requested entry from the cache. // Deletes a requested entry from the cache.
func (c *Cache) delete(key string) { func (c *Cache) delete(key string) {
if buf, ok := c.entries[key]; ok { if _, ok := c.entries[key]; ok {
deletedSize := uint64(len(c.entries[key].value))
delete(c.entries, key) delete(c.entries, key)
c.currentSize -= uint64(len(buf.value)) c.currentSize -= deletedSize
c.totalEvicted++ c.totalEvicted++
} }
} }

View File

@ -117,6 +117,12 @@ func TestObjCache(t *testing.T) {
cacheSize: 5, cacheSize: 5,
closeErr: ErrExcessData, closeErr: ErrExcessData,
}, },
// Validate error excess data during write.
{
expiry: NoExpiry,
cacheSize: 2048,
err: ErrExcessData,
},
} }
// Test 1 validating Open failure. // Test 1 validating Open failure.
@ -232,14 +238,30 @@ func TestObjCache(t *testing.T) {
if err = w.Close(); err != nil { if err = w.Close(); err != nil {
t.Errorf("Test case 7 expected to pass, failed instead %s", err) t.Errorf("Test case 7 expected to pass, failed instead %s", err)
} }
w, err = cache.Create("test2", 1) _, err = cache.Create("test2", 1)
if err != nil { if err != ErrCacheFull {
t.Errorf("Test case 7 expected to pass, failed instead %s", err) t.Errorf("Test case 7 expected to pass, failed instead %s", err)
} }
// Write '1' byte.
w.Write([]byte("H")) // Test 8 validates rejecting Writes which write excess data.
if err = w.Close(); err != testCase.closeErr { testCase = testCases[7]
t.Errorf("Test case 7 expected to fail, passed instead") cache = New(testCase.cacheSize, testCase.expiry)
w, err = cache.Create("test1", 5)
if err != nil {
t.Errorf("Test case 8 expected to pass, failed instead %s", err)
}
// Write '5' bytes.
n, err := w.Write([]byte("Hello"))
if err != nil {
t.Errorf("Test case 8 expected to pass, failed instead %s", err)
}
if n != 5 {
t.Errorf("Test case 8 expected 5 bytes written, instead found %d", n)
}
// Write '1' more byte, should return error.
n, err = w.Write([]byte("W"))
if n == 0 && err != testCase.err {
t.Errorf("Test case 8 expected to fail with ErrExcessData, but failed with %s instead", err)
} }
} }