Deprecate and remove in-memory object caching (#5481)

In-memory caching cannot be cleanly implemented
without access to the GC, which Go doesn't naturally
provide. In practice we have seen that object caching
is more of a hindrance than a boon for
our use cases.

Remove it completely from our implementation.

Related to #5160 and #5182
Author: Harshavardhana
Date: 2018-02-02 10:17:13 -08:00
Committed by: kannappanr
Parent: 1ebbc2ce88
Commit: 0c880bb852
15 changed files with 4 additions and 1022 deletions


@@ -27,7 +27,6 @@ import (
     "github.com/minio/minio/pkg/errors"
     "github.com/minio/minio/pkg/hash"
     "github.com/minio/minio/pkg/mimedb"
-    "github.com/minio/minio/pkg/objcache"
 )
 // list all errors which can be ignored in object operations.
@@ -289,52 +288,6 @@ func (xl xlObjects) getObject(bucket, object string, startOffset int64, length i
         return errors.Trace(InvalidRange{startOffset, length, xlMeta.Stat.Size})
     }
-    // Save the writer.
-    mw := writer
-    // Object cache enabled block.
-    if xlMeta.Stat.Size > 0 && xl.objCacheEnabled {
-        // Validate if we have previous cache.
-        var cachedBuffer io.ReaderAt
-        cachedBuffer, err = xl.objCache.Open(path.Join(bucket, object), modTime)
-        if err == nil { // Cache hit
-            // Create a new section reader, starting at an offset with length.
-            reader := io.NewSectionReader(cachedBuffer, startOffset, length)
-            // Copy the data out.
-            if _, err = io.Copy(writer, reader); err != nil {
-                return errors.Trace(err)
-            }
-            // Success.
-            return nil
-        } // Cache miss.
-        // For unknown error, return and error out.
-        if err != objcache.ErrKeyNotFoundInCache {
-            return errors.Trace(err)
-        } // Cache has not been found, fill the cache.
-        // Cache is only set if whole object is being read.
-        if startOffset == 0 && length == xlMeta.Stat.Size {
-            // Proceed to set the cache.
-            var newBuffer io.WriteCloser
-            // Create a new entry in memory of length.
-            newBuffer, err = xl.objCache.Create(path.Join(bucket, object), length)
-            if err == nil {
-                // Create a multi writer to write to both memory and client response.
-                mw = io.MultiWriter(newBuffer, writer)
-                defer newBuffer.Close()
-            }
-            // Ignore error if cache is full, proceed to write the object.
-            if err != nil && err != objcache.ErrCacheFull {
-                // For any other error return here.
-                return toObjectErr(errors.Trace(err), bucket, object)
-            }
-        }
-    }
     var totalBytesRead int64
     storage, err := NewErasureStorage(onlineDisks, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xlMeta.Erasure.BlockSize)
     if err != nil {
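For context, the block deleted above was a read-through cache: on a hit the object was served straight from memory via io.NewSectionReader, and on a miss a whole-object read was teed into the cache. Below is a minimal standalone sketch of that pattern using only the standard library; the memCache type and serve function are illustrative stand-ins, not MinIO or pkg/objcache APIs.

package main

import (
    "bytes"
    "fmt"
    "io"
    "os"
    "strings"
)

// memCache is a stand-in for the removed in-memory cache: one byte slice per key.
type memCache struct{ entries map[string][]byte }

// serve copies [offset, offset+length) of an object to w, preferring the cache.
func (c *memCache) serve(w io.Writer, key string, backend io.Reader, offset, length, size int64) error {
    if b, ok := c.entries[key]; ok { // cache hit: serve directly from memory
        _, err := io.Copy(w, io.NewSectionReader(bytes.NewReader(b), offset, length))
        return err
    }
    if offset == 0 && length == size { // cache miss: only whole-object reads populate the cache
        buf := new(bytes.Buffer)
        if _, err := io.Copy(io.MultiWriter(buf, w), backend); err != nil {
            return err
        }
        c.entries[key] = buf.Bytes()
        return nil
    }
    // Partial read on a miss: stream the requested range without caching.
    if _, err := io.CopyN(io.Discard, backend, offset); err != nil {
        return err
    }
    _, err := io.CopyN(w, backend, length)
    return err
}

func main() {
    c := &memCache{entries: map[string][]byte{}}
    obj := "hello, object cache"

    // First read fills the cache; the second is served entirely from memory.
    c.serve(os.Stdout, "bucket/object", strings.NewReader(obj), 0, int64(len(obj)), int64(len(obj)))
    fmt.Println()
    c.serve(os.Stdout, "bucket/object", nil, 7, 6, int64(len(obj)))
    fmt.Println()
}

The sketch also shows why the commit message calls this a hindrance: every cached object is pinned in a Go map, and nothing short of explicit deletion releases that memory back to the runtime.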
@@ -366,7 +319,7 @@ func (xl xlObjects) getObject(bucket, object string, startOffset int64, length i
             checksums[index] = checksumInfo.Hash
         }
-        file, err := storage.ReadFile(mw, bucket, pathJoin(object, partName), partOffset, readSize, partSize, checksums, algorithm, xlMeta.Erasure.BlockSize)
+        file, err := storage.ReadFile(writer, bucket, pathJoin(object, partName), partOffset, readSize, partSize, checksums, algorithm, xlMeta.Erasure.BlockSize)
         if err != nil {
             return toObjectErr(err, bucket, object)
         }
@@ -663,27 +616,6 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m
     // Limit the reader to its provided size if specified.
     var reader io.Reader = data
-    // Proceed to set the cache.
-    var newBuffer io.WriteCloser
-    // If caching is enabled, proceed to set the cache.
-    if data.Size() > 0 && xl.objCacheEnabled {
-        // PutObject invalidates any previously cached object in memory.
-        xl.objCache.Delete(path.Join(bucket, object))
-        // Create a new entry in memory of size.
-        newBuffer, err = xl.objCache.Create(path.Join(bucket, object), data.Size())
-        if err == nil {
-            // Cache incoming data into a buffer
-            reader = io.TeeReader(data, newBuffer)
-        } else {
-            // Return errors other than ErrCacheFull
-            if err != objcache.ErrCacheFull {
-                return ObjectInfo{}, toObjectErr(errors.Trace(err), bucket, object)
-            }
-        }
-    }
     // Initialize parts metadata
     partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
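The write path removed above used the tee pattern: io.TeeReader copies every byte read from the client into a cache buffer while the upload streams to its real destination. A minimal standalone sketch of that idea; the names here are illustrative, not MinIO APIs.

package main

import (
    "bytes"
    "fmt"
    "io"
    "strings"
)

func main() {
    upload := strings.NewReader("incoming object data")

    cacheBuf := new(bytes.Buffer)            // stand-in for the removed xl.objCache.Create(...) entry
    reader := io.TeeReader(upload, cacheBuf) // everything read from upload is also written to cacheBuf

    var disk bytes.Buffer // stand-in for the erasure-coded write path
    if _, err := io.Copy(&disk, reader); err != nil {
        panic(err)
    }

    fmt.Println("written to disk:", disk.String())
    fmt.Println("cached copy:    ", cacheBuf.String())
}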
@@ -811,12 +743,6 @@ func (xl xlObjects) putObject(bucket string, object string, data *hash.Reader, m
         return ObjectInfo{}, toObjectErr(err, bucket, object)
     }
-    // Once we have successfully renamed the object, Close the buffer which would
-    // save the object on cache.
-    if sizeWritten > 0 && xl.objCacheEnabled && newBuffer != nil {
-        newBuffer.Close()
-    }
     // Object info is the same in all disks, so we can pick the first meta
     // of the first disk
     xlMeta = partsMetadata[0]
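The hunk above depended on commit-on-Close semantics: bytes teed into the cache entry only became visible once Close was called, which the put path did strictly after the rename to the final location succeeded, so a failed upload never left a cached copy behind. A hypothetical sketch of that behaviour; pendingEntry and memCache are illustrative only.

package main

import (
    "bytes"
    "fmt"
)

type memCache struct{ entries map[string][]byte }

// pendingEntry buffers writes and publishes them into the cache only on Close.
type pendingEntry struct {
    key   string
    buf   bytes.Buffer
    cache *memCache
}

func (p *pendingEntry) Write(b []byte) (int, error) { return p.buf.Write(b) }

func (p *pendingEntry) Close() error {
    p.cache.entries[p.key] = p.buf.Bytes()
    return nil
}

func main() {
    c := &memCache{entries: map[string][]byte{}}
    entry := &pendingEntry{key: "bucket/object", cache: c}

    entry.Write([]byte("object data"))
    fmt.Println("entries before Close:", len(c.entries)) // 0

    // ... rename to the final location succeeds ...
    entry.Close()
    fmt.Println("entries after Close: ", len(c.entries)) // 1
}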
@@ -916,11 +842,6 @@ func (xl xlObjects) DeleteObject(bucket, object string) (err error) {
         return toObjectErr(err, bucket, object)
     }
-    if xl.objCacheEnabled {
-        // Delete from the cache.
-        xl.objCache.Delete(pathJoin(bucket, object))
-    }
     // Success.
     return nil
 }
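The final hunk removes the invalidation rule the cache relied on: both overwrites and deletes had to drop any cached entry for the key so a stale copy could never be served. A tiny illustrative sketch of that rule, not MinIO code.

package main

import "fmt"

type memCache struct{ entries map[string][]byte }

func (c *memCache) Delete(key string) { delete(c.entries, key) }

func main() {
    c := &memCache{entries: map[string][]byte{"bucket/object": []byte("v1")}}

    // Deleting (or overwriting) the object must invalidate the cached copy first.
    c.Delete("bucket/object")

    _, ok := c.entries["bucket/object"]
    fmt.Println("still cached:", ok) // still cached: false
}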