diff --git a/cmd/erasure-createfile.go b/cmd/erasure-createfile.go
index 62bd21715..18797ba5f 100644
--- a/cmd/erasure-createfile.go
+++ b/cmd/erasure-createfile.go
@@ -29,7 +29,7 @@ import (
 // all the disks, writes also calculate individual block's checksum
 // for future bit-rot protection.
 func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, algo string, writeQuorum int) (bytesWritten int64, checkSums []string, err error) {
-	// Allocated blockSized buffer for reading.
+	// Allocate blockSized buffer for reading from the incoming stream.
 	buf := make([]byte, blockSize)
 
 	hashWriters := newHashWriters(len(disks), algo)
diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go
index 2f05f027a..0b8c6f47a 100644
--- a/cmd/erasure-utils.go
+++ b/cmd/erasure-utils.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"hash"
 	"io"
+	"sync"
 
 	"github.com/klauspost/reedsolomon"
 	"github.com/minio/blake2b-simd"
@@ -47,13 +48,23 @@ func newHash(algo string) hash.Hash {
 	}
 }
 
+// Hash buffer pool is a pool of reusable
+// buffers used while checksumming a stream.
+var hashBufferPool = sync.Pool{
+	New: func() interface{} {
+		b := make([]byte, readSizeV1)
+		return &b
+	},
+}
+
 // hashSum calculates the hash of the entire path and returns.
 func hashSum(disk StorageAPI, volume, path string, writer hash.Hash) ([]byte, error) {
-	// Allocate staging buffer of 128KiB for copyBuffer.
-	buf := make([]byte, readSizeV1)
+	// Fetch a new staging buffer from the pool.
+	bufp := hashBufferPool.Get().(*[]byte)
+	defer hashBufferPool.Put(bufp)
 
 	// Copy entire buffer to writer.
-	if err := copyBuffer(writer, disk, volume, path, buf); err != nil {
+	if err := copyBuffer(writer, disk, volume, path, *bufp); err != nil {
 		return nil, err
 	}
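Reviewer note: hashBufferPool above uses the standard trick of pooling pointers to slices; storing a bare []byte in a sync.Pool would allocate a fresh slice header on every Put. A minimal, self-contained sketch of the same pattern (the buffer size and names are illustrative stand-ins, not the real readSizeV1):

package main

import (
	"fmt"
	"sync"
)

// Pool *[]byte rather than []byte: the interface conversion on
// Put would otherwise heap-allocate the slice header each time.
var stagingPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 128*1024) // stand-in for readSizeV1
		return &b
	},
}

func main() {
	bufp := stagingPool.Get().(*[]byte)
	defer stagingPool.Put(bufp)
	fmt.Println("staging buffer:", len(*bufp), "bytes") // reused across calls
}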
diff --git a/cmd/fs-v1-background-append.go b/cmd/fs-v1-background-append.go
index 06f8a2702..83634d680 100644
--- a/cmd/fs-v1-background-append.go
+++ b/cmd/fs-v1-background-append.go
@@ -129,6 +129,8 @@ func (b *backgroundAppend) abort(uploadID string) {
 func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID string, info bgAppendPartsInfo) {
 	// Holds the list of parts that is already appended to the "append" file.
 	appendMeta := fsMetaV1{}
+	// Allocate staging read buffer.
+	buf := make([]byte, readSizeV1)
 	for {
 		select {
 		case input := <-info.inputCh:
@@ -151,7 +153,7 @@
 				}
 				break
 			}
-			if err := appendPart(disk, bucket, object, uploadID, part); err != nil {
+			if err := appendPart(disk, bucket, object, uploadID, part, buf); err != nil {
 				disk.DeleteFile(minioMetaTmpBucket, uploadID)
 				appendMeta.Parts = nil
 				input.errCh <- err
@@ -183,12 +185,11 @@ func (b *backgroundAppend) appendParts(disk StorageAPI, bucket, object, uploadID
 // Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location
 // upon complete-multipart-upload.
-func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo) error {
+func appendPart(disk StorageAPI, bucket, object, uploadID string, part objectPartInfo, buf []byte) error {
 	partPath := pathJoin(bucket, object, uploadID, part.Name)
 	offset := int64(0)
 	totalLeft := part.Size
-	buf := make([]byte, readSizeV1)
 	for totalLeft > 0 {
 		curLeft := int64(readSizeV1)
 		if totalLeft < readSizeV1 {
diff --git a/cmd/globals.go b/cmd/globals.go
index 2eccb62f3..0223766b6 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -61,8 +61,10 @@ var (
 	globalIsDistXL = false // "Is Distributed?" flag.
 
-	// Maximum cache size.
-	globalMaxCacheSize = uint64(maxCacheSize)
+	// Maximum cache size. Defaults to disabled.
+	// Caching is enabled only for RAM size > 8GiB.
+	globalMaxCacheSize = uint64(0)
+
 	// Cache expiry.
 	globalCacheExpiry = objcache.DefaultExpiry
 
 	// Minio local server address (in `host:port` format)
diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go
index ab6192e09..a15672d55 100644
--- a/cmd/prepare-storage.go
+++ b/cmd/prepare-storage.go
@@ -253,11 +253,9 @@ func retryFormattingDisks(firstDisk bool, endpoints []*url.URL, storageDisks []S
 			// Print configuration errors.
 			printConfigErrMsg(storageDisks, sErrs, printOnceFn())
 		case WaitForAll:
-			console.Printf("Initializing data volume for first time. Waiting for other servers to come online (elapsed %s)\n",
-				getElapsedTime())
+			console.Printf("Initializing data volume for first time. Waiting for other servers to come online (elapsed %s)\n", getElapsedTime())
 		case WaitForFormatting:
-			console.Println("Initializing data volume for first time. Waiting for first server to come online (elapsed %s)\n",
-				getElapsedTime())
+			console.Printf("Initializing data volume for first time. Waiting for first server to come online (elapsed %s)\n", getElapsedTime())
 		}
 		continue
 	} // else We have FS backend now. Check fs format as well now.
diff --git a/cmd/server-rlimit-nix.go b/cmd/server-rlimit-nix.go
index ff8bf1c65..9c0b73f00 100644
--- a/cmd/server-rlimit-nix.go
+++ b/cmd/server-rlimit-nix.go
@@ -66,18 +66,18 @@ func setMaxMemory() error {
 	// Validate if rlimit memory is set to lower
 	// than max cache size. Then we should use such value.
 	if uint64(rLimit.Cur) < globalMaxCacheSize {
-		globalMaxCacheSize = (80 / 100) * uint64(rLimit.Cur)
+		globalMaxCacheSize = uint64(float64(50*rLimit.Cur) / 100)
 	}
 
 	// Make sure globalMaxCacheSize is less than RAM size.
 	stats, err := sys.GetStats()
 	if err != nil && err != sys.ErrNotImplemented {
-		// sys.GetStats() is implemented only on linux. Ignore errors
-		// from other OSes.
 		return err
 	}
-	if err == nil && stats.TotalRAM < globalMaxCacheSize {
-		globalMaxCacheSize = uint64(float64(80*stats.TotalRAM) / 100)
+	// If TotalRAM is >= minRAMSize we proceed to enable cache.
+	// cache is always 50% of the totalRAM.
+	if err == nil && stats.TotalRAM >= minRAMSize {
+		globalMaxCacheSize = uint64(float64(50*stats.TotalRAM) / 100)
 	}
 	return nil
 }
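Reviewer note: both platform-specific setMaxMemory implementations now converge on one rule: caching stays disabled below 8GiB of total RAM, and otherwise gets 50% of it (further clamped by RLIMIT_AS on Unix). A hedged sketch of just that arithmetic, with illustrative names:

package main

import "fmt"

const gib = uint64(1) << 30

// cacheSize mirrors the sizing rule in this patch: zero (disabled)
// below 8GiB of total RAM, otherwise half of total RAM.
func cacheSize(totalRAM uint64) uint64 {
	minRAMSize := 8 * gib
	if totalRAM < minRAMSize {
		return 0
	}
	return uint64(float64(50*totalRAM) / 100)
}

func main() {
	fmt.Println(cacheSize(4 * gib))  // 0: caching disabled
	fmt.Println(cacheSize(16 * gib)) // 8589934592: half of 16GiB
}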
diff --git a/cmd/server-rlimit-win.go b/cmd/server-rlimit-win.go
index 8dd2a6c70..954e0f734 100644
--- a/cmd/server-rlimit-win.go
+++ b/cmd/server-rlimit-win.go
@@ -18,6 +18,8 @@
 
 package cmd
 
+import "github.com/minio/minio/pkg/sys"
+
 func setMaxOpenFiles() error {
 	// Golang uses Win32 file API (CreateFile, WriteFile, ReadFile,
 	// CloseHandle, etc.), then you don't have a limit on open files
@@ -26,6 +28,15 @@ func setMaxOpenFiles() error {
 }
 
 func setMaxMemory() error {
-	// TODO: explore if Win32 API's provide anything special here.
+	// Make sure globalMaxCacheSize is less than RAM size.
+	stats, err := sys.GetStats()
+	if err != nil && err != sys.ErrNotImplemented {
+		return err
+	}
+	// If TotalRAM is >= minRAMSize we proceed to enable cache.
+	// cache is always 50% of the totalRAM.
+	if err == nil && stats.TotalRAM >= minRAMSize {
+		globalMaxCacheSize = uint64(float64(50*stats.TotalRAM) / 100)
+	}
 	return nil
 }
diff --git a/cmd/storage-rpc-client.go b/cmd/storage-rpc-client.go
index e90bc903d..4ed639c2f 100644
--- a/cmd/storage-rpc-client.go
+++ b/cmd/storage-rpc-client.go
@@ -17,6 +17,7 @@
 package cmd
 
 import (
+	"bytes"
 	"io"
 	"net"
 	"net/rpc"
@@ -366,6 +367,13 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff
 		}
 	}()
 
+	defer func() {
+		if r := recover(); r != nil {
+			// Recover any panic from allocation, and return error.
+			err = bytes.ErrTooLarge
+		}
+	}() // Do not crash the server.
+
 	// Take remote disk offline if the total network errors.
 	// are more than maximum allowable IO error limit.
 	if n.networkIOErrCount > maxAllowedNetworkIOError {
@@ -377,10 +385,12 @@ func (n *networkStorage) ReadFile(volume string, path string, offset int64, buff
 		Vol:    volume,
 		Path:   path,
 		Offset: offset,
-		Size:   len(buffer),
+		Buffer: buffer,
 	}, &result)
 
+	// Copy results to buffer.
 	copy(buffer, result)
 
+	// Return length of result, err if any.
 	return int64(len(result)), toStorageErr(err)
 }
diff --git a/cmd/storage-rpc-server-datatypes.go b/cmd/storage-rpc-server-datatypes.go
index f0b75d153..8f474feff 100644
--- a/cmd/storage-rpc-server-datatypes.go
+++ b/cmd/storage-rpc-server-datatypes.go
@@ -57,8 +57,8 @@ type ReadFileArgs struct {
 	// Starting offset to start reading into Buffer.
 	Offset int64
 
-	// Data size read from the path at offset.
-	Size int
+	// Data buffer read from the path at offset.
+	Buffer []byte
 }
 
 // PrepareFileArgs represents append file RPC arguments.
diff --git a/cmd/storage-rpc-server.go b/cmd/storage-rpc-server.go
index ce20ed5f5..11658a299 100644
--- a/cmd/storage-rpc-server.go
+++ b/cmd/storage-rpc-server.go
@@ -17,7 +17,6 @@
 package cmd
 
 import (
-	"bytes"
 	"io"
 	"net/rpc"
 	"path"
@@ -156,19 +155,12 @@ func (s *storageServer) ReadAllHandler(args *ReadFileArgs, reply *[]byte) error
 
 // ReadFileHandler - read file handler is rpc wrapper to read file.
 func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err error) {
-	defer func() {
-		if r := recover(); r != nil {
-			// Recover any panic and return ErrCacheFull.
-			err = bytes.ErrTooLarge
-		}
-	}() // Do not crash the server.
 	if !isRPCTokenValid(args.Token) {
 		return errInvalidToken
 	}
-	// Allocate the requested buffer from the client.
-	*reply = make([]byte, args.Size)
+
 	var n int64
-	n, err = s.storage.ReadFile(args.Vol, args.Path, args.Offset, *reply)
+	n, err = s.storage.ReadFile(args.Vol, args.Path, args.Offset, args.Buffer)
 	// Sending an error over the rpc layer, would cause unmarshalling to fail. In situations
 	// when we have short read i.e `io.ErrUnexpectedEOF` treat it as good condition and copy
 	// the buffer properly.
@@ -176,7 +168,7 @@ func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err
 		// Reset to nil as good condition.
 		err = nil
 	}
-	*reply = (*reply)[0:n]
+	*reply = args.Buffer[0:n]
 	return err
 }
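Reviewer note: the net effect of the three RPC changes above is that the server no longer blindly allocates args.Size bytes on the client's behalf; the client ships its own buffer, and any oversized-allocation panic is now recovered on the client side and surfaced as bytes.ErrTooLarge. A sketch of that recover guard in isolation (the make call is a hypothetical stand-in for whatever panics inside the real read path):

package main

import (
	"bytes"
	"fmt"
)

// guardedRead converts a recoverable allocation panic into an
// error instead of crashing the process, as ReadFile now does.
func guardedRead(size int64) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = bytes.ErrTooLarge
		}
	}() // Do not crash the server.
	_ = make([]byte, size) // panics for absurd sizes
	return nil
}

func main() {
	fmt.Println(guardedRead(1024))    // <nil>
	fmt.Println(guardedRead(1 << 62)) // bytes.Buffer: too large
}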
diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go
index 962a10a1f..01746bd94 100644
--- a/cmd/test-utils_test.go
+++ b/cmd/test-utils_test.go
@@ -62,6 +62,9 @@ func init() {
 	// Disable printing console messages during tests.
 	color.Output = ioutil.Discard
+
+	// Enable caching.
+	setMaxMemory()
 }
 
 func prepareFS() (ObjectLayer, string, error) {
diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go
index e22c60a33..496e2672f 100644
--- a/cmd/xl-v1-multipart.go
+++ b/cmd/xl-v1-multipart.go
@@ -708,17 +708,21 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
 	destLock := nsMutex.NewNSLock(bucket, object)
 	destLock.Lock()
 	defer func() {
-		// A new complete multipart upload invalidates any
-		// previously cached object in memory.
-		xl.objCache.Delete(path.Join(bucket, object))
+		if xl.objCacheEnabled {
+			// A new complete multipart upload invalidates any
+			// previously cached object in memory.
+			xl.objCache.Delete(path.Join(bucket, object))
+		}
 
 		// This lock also protects the cache namespace.
 		destLock.Unlock()
 
-		// Prefetch the object from disk by triggering a fake GetObject call
-		// Unlike a regular single PutObject, multipart PutObject is comes in
-		// stages and it is harder to cache.
-		go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard)
+		if xl.objCacheEnabled {
+			// Prefetch the object from disk by triggering a fake GetObject call.
+			// Unlike a regular single PutObject, multipart PutObject comes in
+			// stages and it is harder to cache.
+			go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard)
+		}
 	}()
 
 	// Rename if an object already exists to temporary location.
diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go
index 71564d839..a8ed0f7fe 100644
--- a/cmd/xl-v1-object.go
+++ b/cmd/xl-v1-object.go
@@ -621,8 +621,10 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
 		return toObjectErr(err, bucket, object)
 	}
 
-	// Delete from the cache.
-	xl.objCache.Delete(pathJoin(bucket, object))
+	if xl.objCacheEnabled {
+		// Delete from the cache.
+		xl.objCache.Delete(pathJoin(bucket, object))
+	}
 
 	// Success.
 	return nil
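Reviewer note: the CompleteMultipartUpload defer above is an invalidate-then-warm pattern: drop the stale cache entry while the namespace lock is still held, then repopulate asynchronously, since multipart uploads never pass through the cache on the way in. A hypothetical miniature of that sequencing (none of these types or names are from the codebase):

package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu      sync.Mutex
	entries map[string][]byte
}

// finishWrite deletes any stale cached copy under the lock, then
// warms the cache in the background, mirroring the deferred
// objCache.Delete + go GetObject(...) sequence above.
func (c *cache) finishWrite(key string, enabled bool, prefetch func()) {
	c.mu.Lock()
	if enabled {
		delete(c.entries, key)
	}
	c.mu.Unlock()
	if enabled {
		go prefetch()
	}
}

func main() {
	c := &cache{entries: map[string][]byte{"bucket/object": []byte("stale")}}
	done := make(chan struct{})
	c.finishWrite("bucket/object", true, func() { close(done) })
	<-done
	fmt.Println("cached entries:", len(c.entries)) // 0 until the prefetch refills it
}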
diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go
index 490f9f732..e6affaa23 100644
--- a/cmd/xl-v1.go
+++ b/cmd/xl-v1.go
@@ -19,6 +19,7 @@ package cmd
 import (
 	"fmt"
 	"os"
+	"runtime/debug"
 	"sort"
 	"strings"
 	"sync"
@@ -42,8 +43,9 @@ const (
 	// Uploads metadata file carries per multipart object metadata.
 	uploadsJSONFile = "uploads.json"
 
-	// 8GiB cache by default.
-	maxCacheSize = 8 * humanize.GiByte
+	// Represents the minimum required RAM size before
+	// we enable caching.
+	minRAMSize = 8 * humanize.GiByte
 
 	// Maximum erasure blocks.
 	maxErasureBlocks = 16
@@ -92,9 +94,6 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
 	// Calculate data and parity blocks.
 	dataBlocks, parityBlocks := len(newStorageDisks)/2, len(newStorageDisks)/2
 
-	// Initialize object cache.
-	objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry)
-
 	// Initialize list pool.
 	listPool := newTreeWalkPool(globalLookupTimeout)
 
@@ -103,13 +102,25 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
 
 	// Initialize xl objects.
 	xl := &xlObjects{
-		mutex:           &sync.Mutex{},
-		storageDisks:    newStorageDisks,
-		dataBlocks:      dataBlocks,
-		parityBlocks:    parityBlocks,
-		listPool:        listPool,
-		objCache:        objCache,
-		objCacheEnabled: !objCacheDisabled,
+		mutex:        &sync.Mutex{},
+		storageDisks: newStorageDisks,
+		dataBlocks:   dataBlocks,
+		parityBlocks: parityBlocks,
+		listPool:     listPool,
+	}
+
+	// Object cache is enabled when _MINIO_CACHE env is missing
+	// and cache size is > 0.
+	xl.objCacheEnabled = !objCacheDisabled && globalMaxCacheSize > 0
+
+	// Check if object cache is enabled.
+	if xl.objCacheEnabled {
+		// Initialize object cache.
+		objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry)
+		objCache.OnEviction = func(key string) {
+			debug.FreeOSMemory()
+		}
+		xl.objCache = objCache
 	}
 
 	// Initialize meta volume, if volume already exists ignores it.
diff --git a/pkg/objcache/capped-writer.go b/pkg/objcache/capped-writer.go
new file mode 100644
index 000000000..485896e0b
--- /dev/null
+++ b/pkg/objcache/capped-writer.go
@@ -0,0 +1,50 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package objcache implements in memory caching methods.
+package objcache
+
+// Used for adding entry to the object cache.
+// Implements io.WriteCloser
+type cappedWriter struct {
+	offset  int64
+	cap     int64
+	buffer  []byte
+	onClose func() error
+}
+
+// Write implements a limited writer, returns error
+// if the writes go beyond allocated size.
+func (c *cappedWriter) Write(b []byte) (n int, err error) {
+	if c.offset+int64(len(b)) > c.cap {
+		return 0, ErrExcessData
+	}
+	n = copy(c.buffer[int(c.offset):int(c.offset)+len(b)], b)
+	c.offset = c.offset + int64(n)
+	return n, nil
+}
+
+// Reset relinquishes the allocated underlying buffer.
+func (c *cappedWriter) Reset() {
+	c.buffer = nil
+}
+
+// On close, onClose() is called which checks if all object contents
+// have been written so that it can save the buffer to the cache.
+func (c cappedWriter) Close() (err error) {
+	return c.onClose()
+}
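Reviewer note: compared to the bytes.Buffer-backed cacheBuffer it replaces, cappedWriter pre-allocates exactly size bytes and makes over-writes fail fast instead of growing. A standalone copy of its Write semantics, so the boundary behaviour can be exercised directly (errExcessData stands in for the package's ErrExcessData):

package main

import (
	"errors"
	"fmt"
)

var errExcessData = errors.New("Attempted excess write on cache")

type capped struct {
	offset int64
	cap    int64
	buffer []byte
}

// Write rejects any write that would run past the declared size,
// rather than growing the buffer the way bytes.Buffer would.
func (c *capped) Write(b []byte) (int, error) {
	if c.offset+int64(len(b)) > c.cap {
		return 0, errExcessData
	}
	n := copy(c.buffer[int(c.offset):int(c.offset)+len(b)], b)
	c.offset += int64(n)
	return n, nil
}

func main() {
	w := &capped{cap: 5, buffer: make([]byte, 5)}
	n, err := w.Write([]byte("Hello"))
	fmt.Println(n, err) // 5 <nil>
	n, err = w.Write([]byte("!"))
	fmt.Println(n, err) // 0 Attempted excess write on cache
}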
diff --git a/pkg/objcache/objcache.go b/pkg/objcache/objcache.go
index 11cf3cc88..3a63a68de 100644
--- a/pkg/objcache/objcache.go
+++ b/pkg/objcache/objcache.go
@@ -22,6 +22,7 @@ import (
 	"bytes"
 	"errors"
 	"io"
+	"runtime/debug"
 	"sync"
 	"time"
 )
@@ -32,6 +33,13 @@ var NoExpiry = time.Duration(0)
 // DefaultExpiry represents default time duration value when individual entries will be expired.
 var DefaultExpiry = time.Duration(72 * time.Hour) // 72hrs.
 
+// DefaultBufferRatio represents default ratio used to calculate the
+// individual cache entry buffer size.
+var DefaultBufferRatio = uint64(10)
+
+// DefaultGCPercent represents default garbage collection target percentage.
+var DefaultGCPercent = 20
+
 // buffer represents the in memory cache of a single entry.
 // buffer carries value of the data and last accessed time.
 type buffer struct {
@@ -46,9 +54,16 @@ type Cache struct {
 	// read/write requests for cache
 	mutex sync.Mutex
 
+	// Once is used for resetting GC once after
+	// peak cache usage.
+	onceGC sync.Once
+
 	// maxSize is a total size for overall cache
 	maxSize uint64
 
+	// maxCacheEntrySize is a total size per key buffer.
+	maxCacheEntrySize uint64
+
 	// currentSize is a current size in memory
 	currentSize uint64
 
@@ -68,27 +83,58 @@ type Cache struct {
 	stopGC chan struct{}
 }
 
-// New - Return a new cache with a given default expiry duration.
-// If the expiry duration is less than one (or NoExpiry),
-// the items in the cache never expire (by default), and must be deleted
-// manually.
+// New - Return a new cache with a given default expiry
+// duration. If the expiry duration is less than one
+// (or NoExpiry), the items in the cache never expire
+// (by default), and must be deleted manually.
 func New(maxSize uint64, expiry time.Duration) *Cache {
 	if maxSize == 0 {
 		panic("objcache: setting maximum cache size to zero is forbidden.")
 	}
-	C := &Cache{
-		maxSize: maxSize,
-		entries: make(map[string]*buffer),
-		expiry:  expiry,
+
+	// A garbage collection is triggered when the ratio
+	// of freshly allocated data to live data remaining
+	// after the previous collection reaches this percentage.
+	//
+	// - https://golang.org/pkg/runtime/debug/#SetGCPercent
+	//
+	// This means that by default GC is triggered after
+	// we've allocated an extra amount of memory proportional
+	// to the amount already in use.
+	//
+	// If gcpercent=100 and we're using 4M, we'll gc again
+	// when we get to 8M.
+	//
+	// Set this value to 20% if caching is enabled.
+	debug.SetGCPercent(DefaultGCPercent)
+
+	// Max cache entry size - indicates the
+	// maximum buffer per key that can be held in
+	// memory. Currently this value is 1/10th
+	// the size of requested cache size.
+	maxCacheEntrySize := func() uint64 {
+		i := maxSize / DefaultBufferRatio
+		if i == 0 {
+			i = maxSize
+		}
+		return i
+	}()
+	c := &Cache{
+		onceGC:            sync.Once{},
+		maxSize:           maxSize,
+		maxCacheEntrySize: maxCacheEntrySize,
+		entries:           make(map[string]*buffer),
+		expiry:            expiry,
 	}
 	// We have expiry start the janitor routine.
 	if expiry > 0 {
-		C.stopGC = make(chan struct{})
+		// Initialize a new stop GC channel.
+		c.stopGC = make(chan struct{})
 		// Start garbage collection routine to expire objects.
-		C.startGC()
+		c.StartGC()
 	}
-	return C
+	return c
 }
 
 // ErrKeyNotFoundInCache - key not found in cache.
@@ -100,18 +146,6 @@ var ErrCacheFull = errors.New("Not enough space in cache")
 // ErrExcessData - excess data was attempted to be written on cache.
 var ErrExcessData = errors.New("Attempted excess write on cache")
 
-// Used for adding entry to the object cache. Implements io.WriteCloser
-type cacheBuffer struct {
-	*bytes.Buffer // Implements io.Writer
-	onClose func() error
-}
-
-// On close, onClose() is called which checks if all object contents
-// have been written so that it can save the buffer to the cache.
-func (c cacheBuffer) Close() (err error) {
-	return c.onClose()
-}
-
 // Create - validates if object size fits with in cache size limit and returns a io.WriteCloser
 // to which object contents can be written and finally Close()'d. During Close() we
 // checks if the amount of data written is equal to the size of the object, in which
@@ -126,29 +160,46 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) {
 	}() // Do not crash the server.
 
 	valueLen := uint64(size)
-	// Check if the size of the object is not bigger than the capacity of the cache.
-	if c.maxSize > 0 && valueLen > c.maxSize {
+	// Check if the size of the object is > 1/10th the size
+	// of the cache, if yes then we ignore it.
+	if valueLen > c.maxCacheEntrySize {
 		return nil, ErrCacheFull
 	}
 
-	// Will hold the object contents.
-	buf := bytes.NewBuffer(make([]byte, 0, size))
+	// Check if the incoming size is going to exceed
+	// the effective cache size, if yes return error
+	// instead.
+	c.mutex.Lock()
+	if c.currentSize+valueLen > c.maxSize {
+		c.mutex.Unlock()
+		return nil, ErrCacheFull
+	}
+	// Change GC percent if the current cache usage
+	// is already 75% of the maximum allowed usage.
+	if c.currentSize > (75 * c.maxSize / 100) {
+		c.onceGC.Do(func() { debug.SetGCPercent(DefaultGCPercent - 10) })
+	}
+	c.mutex.Unlock()
+
+	cbuf := &cappedWriter{
+		offset: 0,
+		cap:    size,
+		buffer: make([]byte, size),
+	}
 
 	// Function called on close which saves the object contents
 	// to the object cache.
 	onClose := func() error {
 		c.mutex.Lock()
 		defer c.mutex.Unlock()
-		if size != int64(buf.Len()) {
+		if size != cbuf.offset {
+			cbuf.Reset() // Reset resets the buffer to be empty.
 			// Full object not available hence do not save buf to object cache.
 			return io.ErrShortBuffer
 		}
-		if c.maxSize > 0 && c.currentSize+valueLen > c.maxSize {
-			return ErrExcessData
-		}
 		// Full object available in buf, save it to cache.
 		c.entries[key] = &buffer{
-			value: buf.Bytes(),
+			value:        cbuf.buffer,
 			lastAccessed: time.Now().UTC(), // Save last accessed time.
 		}
 		// Account for the memory allocated above.
@@ -156,12 +207,10 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) {
 		return nil
 	}
 
-	// Object contents that is written - cacheBuffer.Write(data)
+	// Object contents that is written - cappedWriter.Write(data)
 	// will be accumulated in buf which implements io.Writer.
-	return cacheBuffer{
-		buf,
-		onClose,
-	}, nil
+	cbuf.onClose = onClose
+	return cbuf, nil
 }
 
 // Open - open the in-memory file, returns an in memory read seeker.
@@ -215,17 +264,17 @@ func (c *Cache) gc() {
 
 // StopGC sends a message to the expiry routine to stop
 // expiring cached entries. NOTE: once this is called, cached
-// entries will not be expired if the consumer has called this.
+// entries will not be expired, be careful if you are using this.
func (c *Cache) StopGC() {
 	if c.stopGC != nil {
 		c.stopGC <- struct{}{}
 	}
 }
 
-// startGC starts running a routine ticking at expiry interval, on each interval
-// this routine does a sweep across the cache entries and garbage collects all the
-// expired entries.
-func (c *Cache) startGC() {
+// StartGC starts running a routine ticking at expiry interval,
+// on each interval this routine does a sweep across the cache
+// entries and garbage collects all the expired entries.
+func (c *Cache) StartGC() {
 	go func() {
 		for {
 			select {
@@ -242,9 +291,10 @@ func (c *Cache) StartGC() {
 
 // Deletes a requested entry from the cache.
 func (c *Cache) delete(key string) {
-	if buf, ok := c.entries[key]; ok {
+	if _, ok := c.entries[key]; ok {
+		deletedSize := uint64(len(c.entries[key].value))
 		delete(c.entries, key)
-		c.currentSize -= uint64(len(buf.value))
+		c.currentSize -= deletedSize
 		c.totalEvicted++
 	}
 }
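Reviewer note: Create now fails fast with two up-front admission checks: a per-entry cap (1/10th of the cache via DefaultBufferRatio) and the whole-cache cap, which previously was not enforced until Close. The decision logic extracted into a hedged sketch:

package main

import (
	"errors"
	"fmt"
)

var errCacheFull = errors.New("Not enough space in cache")

// admit mirrors the two checks at the top of Create: reject entries
// larger than the per-entry cap, and reject entries that would push
// the cache past its total size.
func admit(size, current, maxSize, maxEntrySize uint64) error {
	if size > maxEntrySize {
		return errCacheFull
	}
	if current+size > maxSize {
		return errCacheFull
	}
	return nil
}

func main() {
	// 1000-byte cache with a 100-byte per-entry cap (ratio of 10).
	fmt.Println(admit(300, 0, 1000, 100))  // per-entry cap exceeded
	fmt.Println(admit(50, 980, 1000, 100)) // would overflow the cache
	fmt.Println(admit(50, 0, 1000, 100))   // <nil>: admitted
}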
diff --git a/pkg/objcache/objcache_test.go b/pkg/objcache/objcache_test.go
index 31d96eab7..f1eed32d3 100644
--- a/pkg/objcache/objcache_test.go
+++ b/pkg/objcache/objcache_test.go
@@ -117,6 +117,12 @@ func TestObjCache(t *testing.T) {
 			cacheSize: 5,
 			closeErr:  ErrExcessData,
 		},
+		// Validate error excess data during write.
+		{
+			expiry:    NoExpiry,
+			cacheSize: 2048,
+			err:       ErrExcessData,
+		},
 	}
 
 	// Test 1 validating Open failure.
@@ -232,14 +238,30 @@ func TestObjCache(t *testing.T) {
 	if err = w.Close(); err != nil {
 		t.Errorf("Test case 7 expected to pass, failed instead %s", err)
 	}
-	w, err = cache.Create("test2", 1)
-	if err != nil {
+	_, err = cache.Create("test2", 1)
+	if err != ErrCacheFull {
 		t.Errorf("Test case 7 expected to pass, failed instead %s", err)
 	}
-	// Write '1' byte.
-	w.Write([]byte("H"))
-	if err = w.Close(); err != testCase.closeErr {
-		t.Errorf("Test case 7 expected to fail, passed instead")
+
+	// Test 8 validates rejecting Writes which write excess data.
+	testCase = testCases[7]
+	cache = New(testCase.cacheSize, testCase.expiry)
+	w, err = cache.Create("test1", 5)
+	if err != nil {
+		t.Errorf("Test case 8 expected to pass, failed instead %s", err)
+	}
+	// Write '5' bytes.
+	n, err := w.Write([]byte("Hello"))
+	if err != nil {
+		t.Errorf("Test case 8 expected to pass, failed instead %s", err)
+	}
+	if n != 5 {
+		t.Errorf("Test case 8 expected 5 bytes written, instead found %d", n)
+	}
+	// Write '1' more byte, should return error.
+	n, err = w.Write([]byte("W"))
+	if n == 0 && err != testCase.err {
+		t.Errorf("Test case 8 expected to fail with ErrExcessData, but failed with %s instead", err)
 	}
 }
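Reviewer note: from a caller's point of view, the behaviour exercised by the new test case 8 looks like the sketch below, written against the objcache API as it appears in this patch (error handling abbreviated):

package main

import (
	"fmt"

	"github.com/minio/minio/pkg/objcache"
)

func main() {
	// 2KiB cache with no expiry; the per-entry cap is 1/10th of it.
	c := objcache.New(2048, objcache.NoExpiry)

	w, err := c.Create("test1", 5) // reserves exactly 5 bytes
	if err != nil {
		fmt.Println(err) // ErrCacheFull when the entry cannot be admitted
		return
	}
	w.Write([]byte("Hello")) // fills the reservation
	if _, err := w.Write([]byte("W")); err == objcache.ErrExcessData {
		fmt.Println("sixth byte rejected:", err)
	}
	// Close commits the entry only if exactly 5 bytes were written.
	if err := w.Close(); err != nil {
		fmt.Println("not cached:", err)
	}
}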