Revert "fix: remove persistence layer for metacache store in memory (#11538)"

This reverts commit b23659927cb40a864556da5082ecb0c811ad714e.
This commit is contained in:
Harshavardhana 2021-02-24 22:24:38 -08:00
parent ca5c6e3160
commit a8e4f64ff3
14 changed files with 601 additions and 679 deletions

View File

@ -857,6 +857,7 @@ func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, pre
ri := logger.GetReqInfo(ctx)
if ri != nil && strings.Contains(ri.UserAgent, `1.0 Veeam/1.0 Backup`) && strings.HasSuffix(prefix, ".blk") {
opts.discardResult = true
opts.Transient = true
}
merged, err := z.listPath(ctx, opts)
@ -1214,6 +1215,31 @@ func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, fo
return nil
}
// deleteAll will delete a bucket+prefix unconditionally across all disks.
// Note that set distribution is ignored so it should only be used in cases where
// data is not distributed across sets.
// Errors are logged but individual disk failures are not returned.
func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix string) {
for _, servers := range z.serverPools {
for _, set := range servers.sets {
set.deleteAll(ctx, bucket, prefix)
}
}
}
// renameAll will rename bucket+prefix unconditionally across all disks to
// minioMetaTmpBucket + unique uuid,
// Note that set distribution is ignored so it should only be used in cases where
// data is not distributed across sets. Errors are logged but individual
// disk failures are not returned.
func (z *erasureServerPools) renameAll(ctx context.Context, bucket, prefix string) {
for _, servers := range z.serverPools {
for _, set := range servers.sets {
set.renameAll(ctx, bucket, prefix)
}
}
}
// This function is used to undo a successful DeleteBucket operation.
func undoDeleteBucketServerPools(ctx context.Context, bucket string, serverPools []*erasureSets, errs []error) {
g := errgroup.WithNErrs(len(serverPools))

View File

@ -29,7 +29,6 @@ import (
"time"
"github.com/dchest/siphash"
"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
@ -38,7 +37,6 @@ import (
"github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/objcache"
"github.com/minio/minio/pkg/sync/errgroup"
)
@ -368,11 +366,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
// setCount * setDriveCount with each memory upto blockSizeV1.
bp := bpool.NewBytePoolCap(n, blockSizeV1, blockSizeV1*2)
mcache, err := objcache.New(1*humanize.GiByte, objcache.DefaultExpiry)
if err != nil {
return nil, err
}
for i := 0; i < setCount; i++ {
s.erasureDisks[i] = make([]StorageAPI, setDriveCount)
}
@ -419,7 +412,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
getEndpoints: s.GetEndpoints(i),
nsMutex: mutex,
bp: bp,
metaCache: mcache,
mrfOpCh: make(chan partialOperation, 10000),
}
}

View File

@ -30,7 +30,6 @@ import (
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/dsync"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/objcache"
"github.com/minio/minio/pkg/sync/errgroup"
)
@ -71,9 +70,6 @@ type erasureObjects struct {
// Byte pools used for temporary i/o buffers.
bp *bpool.BytePoolCap
// holds current list cache.
metaCache *objcache.Cache
mrfOpCh chan partialOperation
}

View File

@ -17,6 +17,11 @@
package cmd
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"path"
"runtime/debug"
"sort"
@ -24,8 +29,11 @@ import (
"sync"
"time"
"github.com/klauspost/compress/s2"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/hash"
"github.com/tinylib/msgp/msgp"
)
//go:generate msgp -file $GOFILE -unexported
@ -49,7 +57,16 @@ type bucketMetacache struct {
// newBucketMetacache creates a new bucketMetacache.
// Optionally remove all existing caches.
func newBucketMetacache(bucket string) *bucketMetacache {
func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
if cleanup {
// Recursively delete all caches.
objAPI := newObjectLayerFn()
ez, ok := objAPI.(*erasureServerPools)
if ok {
ctx := context.Background()
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
}
}
return &bucketMetacache{
bucket: bucket,
caches: make(map[string]metacache, 10),
@ -63,6 +80,111 @@ func (b *bucketMetacache) debugf(format string, data ...interface{}) {
}
}
// loadBucketMetaCache will load the cache from the object layer.
// If the cache cannot be found a new one is created.
func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
objAPI := newObjectLayerFn()
for objAPI == nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
time.Sleep(250 * time.Millisecond)
}
objAPI = newObjectLayerFn()
if objAPI == nil {
logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
}
}
var meta bucketMetacache
var decErr error
// Use global context for this.
r, err := objAPI.GetObjectNInfo(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), nil, http.Header{}, readLock, ObjectOptions{})
if err == nil {
dec := s2DecPool.Get().(*s2.Reader)
dec.Reset(r)
decErr = meta.DecodeMsg(msgp.NewReader(dec))
dec.Reset(nil)
r.Close()
s2DecPool.Put(dec)
}
if err != nil {
switch err.(type) {
case ObjectNotFound:
err = nil
case InsufficientReadQuorum:
// Cache is likely lost. Clean up and return new.
return newBucketMetacache(bucket, true), nil
default:
logger.LogIf(ctx, err)
}
return newBucketMetacache(bucket, false), err
}
if decErr != nil {
if errors.Is(err, context.Canceled) {
return newBucketMetacache(bucket, false), err
}
// Log the error, but assume the data is lost and return a fresh bucket.
// Otherwise a broken cache will never recover.
logger.LogIf(ctx, decErr)
return newBucketMetacache(bucket, true), nil
}
// Sanity check...
if meta.bucket != bucket {
logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
return newBucketMetacache(bucket, true), nil
}
meta.cachesRoot = make(map[string][]string, len(meta.caches)/10)
// Index roots
for id, cache := range meta.caches {
meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id)
}
return &meta, nil
}
// save the bucket cache to the object storage.
func (b *bucketMetacache) save(ctx context.Context) error {
if b.transient {
return nil
}
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
// Keep lock while we marshal.
// We need a write lock since we update 'updated'
b.mu.Lock()
if !b.updated {
b.mu.Unlock()
return nil
}
// Save as s2 compressed msgpack
tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
enc := s2.NewWriter(tmp)
err := msgp.Encode(enc, b)
if err != nil {
b.mu.Unlock()
return err
}
err = enc.Close()
if err != nil {
b.mu.Unlock()
return err
}
b.updated = false
b.mu.Unlock()
hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()))
if err != nil {
return err
}
_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr), ObjectOptions{})
logger.LogIf(ctx, err)
return err
}
// findCache will attempt to find a matching cache for the provided options.
// If a cache with the same ID exists already it will be returned.
// If none can be found a new is created with the provided ID.
@ -326,6 +448,41 @@ func (b *bucketMetacache) getCache(id string) *metacache {
return &c
}
// deleteAll will delete all on disk data for ALL caches.
// Deletes are performed concurrently.
func (b *bucketMetacache) deleteAll() {
ctx := context.Background()
ez, ok := newObjectLayerFn().(*erasureServerPools)
if !ok {
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasurePools"))
return
}
b.mu.Lock()
defer b.mu.Unlock()
b.updated = true
if !b.transient {
// Delete all.
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
b.caches = make(map[string]metacache, 10)
b.cachesRoot = make(map[string][]string, 10)
return
}
// Transient are in different buckets.
var wg sync.WaitGroup
for id := range b.caches {
wg.Add(1)
go func(cache metacache) {
defer wg.Done()
ez.renameAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
}(b.caches[id])
}
wg.Wait()
b.caches = make(map[string]metacache, 10)
}
// deleteCache will delete a specific cache and all files related to it across the cluster.
func (b *bucketMetacache) deleteCache(id string) {
b.mu.Lock()
@ -344,4 +501,7 @@ func (b *bucketMetacache) deleteCache(id string) {
b.updated = true
}
b.mu.Unlock()
if ok {
c.delete(context.Background())
}
}

View File

@ -6,7 +6,7 @@ import (
)
func Benchmark_bucketMetacache_findCache(b *testing.B) {
bm := newBucketMetacache("")
bm := newBucketMetacache("", false)
const elements = 50000
const paths = 100
if elements%paths != 0 {

View File

@ -18,6 +18,7 @@ package cmd
import (
"context"
"fmt"
"runtime/debug"
"sync"
"time"
@ -41,13 +42,25 @@ type metacacheManager struct {
trash map[string]metacache // Recently deleted lists.
}
const metacacheManagerTransientBucket = "**transient**"
const metacacheMaxEntries = 5000
// initManager will start async saving the cache.
func (m *metacacheManager) initManager() {
// Add a transient bucket.
tb := newBucketMetacache(metacacheManagerTransientBucket, false)
tb.transient = true
m.buckets[metacacheManagerTransientBucket] = tb
// Start saver when object layer is ready.
go func() {
objAPI := newObjectLayerFn()
for objAPI == nil {
time.Sleep(time.Second)
objAPI = newObjectLayerFn()
}
if !globalIsErasure {
logger.Info("metacacheManager was initialized in non-erasure mode, skipping save")
return
}
@ -55,6 +68,7 @@ func (m *metacacheManager) initManager() {
defer t.Stop()
var exit bool
bg := context.Background()
for !exit {
select {
case <-t.C:
@ -66,11 +80,13 @@ func (m *metacacheManager) initManager() {
if !exit {
v.cleanup()
}
logger.LogIf(bg, v.save(bg))
}
m.mu.RUnlock()
m.mu.Lock()
for k, v := range m.trash {
if time.Since(v.lastUpdate) > metacacheMaxRunningAge {
v.delete(context.Background())
delete(m.trash, k)
}
}
@ -81,6 +97,9 @@ func (m *metacacheManager) initManager() {
// findCache will get a metacache.
func (m *metacacheManager) findCache(ctx context.Context, o listPathOptions) metacache {
if o.Transient || isReservedOrInvalidBucket(o.Bucket, false) {
return m.getTransient().findCache(o)
}
m.mu.RLock()
b, ok := m.buckets[o.Bucket]
if ok {
@ -117,6 +136,10 @@ func (m *metacacheManager) updateCacheEntry(update metacache) (metacache, error)
func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucketMetacache {
m.init.Do(m.initManager)
// Return a transient bucket for invalid or system buckets.
if isReservedOrInvalidBucket(bucket, false) {
return m.getTransient()
}
m.mu.RLock()
b, ok := m.buckets[bucket]
m.mu.RUnlock()
@ -140,7 +163,16 @@ func (m *metacacheManager) getBucket(ctx context.Context, bucket string) *bucket
return b
}
b = newBucketMetacache(bucket)
// Load bucket. If we fail return the transient bucket.
b, err := loadBucketMetaCache(ctx, bucket)
if err != nil {
m.mu.Unlock()
logger.LogIf(ctx, err)
return m.getTransient()
}
if b.bucket != bucket {
logger.LogIf(ctx, fmt.Errorf("getBucket: loaded bucket %s does not match this bucket %s", b.bucket, bucket))
}
m.buckets[bucket] = b
m.mu.Unlock()
return b
@ -163,6 +195,10 @@ func (m *metacacheManager) deleteBucketCache(bucket string) {
b.mu.Lock()
defer b.mu.Unlock()
for k, v := range b.caches {
if time.Since(v.lastUpdate) > metacacheMaxRunningAge {
v.delete(context.Background())
continue
}
v.error = "Bucket deleted"
v.status = scanStateError
m.mu.Lock()
@ -176,7 +212,59 @@ func (m *metacacheManager) deleteAll() {
m.init.Do(m.initManager)
m.mu.Lock()
defer m.mu.Unlock()
for bucket := range m.buckets {
delete(m.buckets, bucket)
for bucket, b := range m.buckets {
b.deleteAll()
if !b.transient {
delete(m.buckets, bucket)
}
}
}
// getTransient will return a transient bucket.
func (m *metacacheManager) getTransient() *bucketMetacache {
m.init.Do(m.initManager)
m.mu.RLock()
bmc := m.buckets[metacacheManagerTransientBucket]
m.mu.RUnlock()
return bmc
}
// checkMetacacheState should be used if data is not updating.
// Should only be called if a failure occurred.
func (o listPathOptions) checkMetacacheState(ctx context.Context, rpc *peerRESTClient) error {
// We operate on a copy...
o.Create = false
var cache metacache
if rpc == nil || o.Transient {
cache = localMetacacheMgr.findCache(ctx, o)
} else {
c, err := rpc.GetMetacacheListing(ctx, o)
if err != nil {
return err
}
cache = *c
}
if cache.status == scanStateNone || cache.fileNotFound {
return errFileNotFound
}
if cache.status == scanStateSuccess || cache.status == scanStateStarted {
if time.Since(cache.lastUpdate) > metacacheMaxRunningAge {
// We got a stale entry, mark error on handling server.
err := fmt.Errorf("timeout: list %s not updated", cache.id)
cache.error = err.Error()
cache.status = scanStateError
if rpc == nil || o.Transient {
localMetacacheMgr.updateCacheEntry(cache)
} else {
rpc.UpdateMetacacheListing(ctx, cache)
}
return err
}
return nil
}
if cache.error != "" {
return fmt.Errorf("async cache listing failed with: %s", cache.error)
}
return nil
}

View File

@ -116,6 +116,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
// will be generated due to the marker without ID and this check failing.
if o.Limit < 10 && o.Marker == "" && o.Create && o.Recursive {
o.discardResult = true
o.Transient = true
}
var cache metacache
@ -126,13 +127,12 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
var cache metacache
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
if isReservedOrInvalidBucket(o.Bucket, false) {
// discard all list caches for reserved buckets.
o.discardResult = true
rpc = nil
o.Transient = true
}
// Apply prefix filter if enabled.
o.SetFilter()
if rpc == nil {
if rpc == nil || o.Transient {
// Local
cache = localMetacacheMgr.findCache(ctx, o)
} else {
@ -148,6 +148,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (e
if !errors.Is(err, context.DeadlineExceeded) {
logger.LogIf(ctx, err)
}
o.Transient = true
cache = localMetacacheMgr.findCache(ctx, o)
} else {
cache = *c

View File

@ -17,18 +17,23 @@
package cmd
import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/hash"
)
type listPathOptions struct {
@ -86,6 +91,11 @@ type listPathOptions struct {
// Include pure directories.
IncludeDirectories bool
// Transient is set if the cache is transient due to an error or being a reserved bucket.
// This means the cache metadata will not be persisted on disk.
// A transient result will never be returned from the cache so knowing the list id is required.
Transient bool
// discardResult will not persist the cache to storage.
// When the initial results are returned listing will be canceled.
discardResult bool
@ -186,14 +196,74 @@ func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCa
}
}
// findFirstPart will find the part with 0 being the first that corresponds to the marker in the options.
// io.ErrUnexpectedEOF is returned if the place containing the marker hasn't been scanned yet.
// io.EOF indicates the marker is beyond the end of the stream and does not exist.
func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) {
search := o.Marker
if search == "" {
search = o.Prefix
}
if search == "" {
return 0, nil
}
o.debugln("searching for ", search)
var tmp metacacheBlock
var json = jsoniter.ConfigCompatibleWithStandardLibrary
i := 0
for {
partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, i)
v, ok := fi.Metadata[partKey]
if !ok {
o.debugln("no match in metadata, waiting")
return -1, io.ErrUnexpectedEOF
}
err := json.Unmarshal([]byte(v), &tmp)
if !ok {
logger.LogIf(context.Background(), err)
return -1, err
}
if tmp.First == "" && tmp.Last == "" && tmp.EOS {
return 0, errFileNotFound
}
if tmp.First >= search {
o.debugln("First >= search", v)
return i, nil
}
if tmp.Last >= search {
o.debugln("Last >= search", v)
return i, nil
}
if tmp.EOS {
o.debugln("no match, at EOS", v)
return -3, io.EOF
}
o.debugln("First ", tmp.First, "<", search, " search", i)
i++
}
}
// updateMetacacheListing will update the metacache listing.
func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) {
if o.Transient {
return localMetacacheMgr.getTransient().updateCacheEntry(m)
}
if rpc == nil {
return localMetacacheMgr.updateCacheEntry(m)
}
return rpc.UpdateMetacacheListing(context.Background(), m)
}
func getMetacacheBlockInfo(fi FileInfo, block int) (*metacacheBlock, error) {
var tmp metacacheBlock
partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, block)
v, ok := fi.Metadata[partKey]
if !ok {
return nil, io.ErrUnexpectedEOF
}
return &tmp, json.Unmarshal([]byte(v), &tmp)
}
const metacachePrefix = ".metacache"
func metacachePrefixForID(bucket, id string) string {
@ -201,8 +271,8 @@ func metacachePrefixForID(bucket, id string) string {
}
// objectPath returns the object path of the cache.
func (o *listPathOptions) objectPath() string {
return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block.s2")
func (o *listPathOptions) objectPath(block int) string {
return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2")
}
func (o *listPathOptions) SetFilter() {
@ -287,33 +357,187 @@ func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSor
}
func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
r, err := er.metaCache.Open(pathJoin(minioMetaBucket, o.objectPath()), time.Now().Add(-time.Hour))
if err != nil {
return entries, io.EOF
}
retries := 0
rpc := globalNotificationSys.restClientFromHash(o.Bucket)
tmp, err := newMetacacheReader(r)
if err != nil {
return entries, err
}
for {
select {
case <-ctx.Done():
return entries, ctx.Err()
default:
}
e, err := tmp.filter(o)
entries.o = append(entries.o, e.o...)
if o.Limit > 0 && entries.len() > o.Limit {
entries.truncate(o.Limit)
return entries, nil
}
// If many failures, check the cache state.
if retries > 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("remote listing canceled: %w", err)
}
retries = 1
}
if err == nil {
// We stopped within the listing, we are done for now...
return entries, nil
}
const retryDelay = 500 * time.Millisecond
// Load first part metadata...
// All operations are performed without locks, so we must be careful and allow for failures.
// Read metadata associated with the object from a disk.
if retries > 0 {
disks := er.getOnlineDisks()
if len(disks) == 0 {
time.Sleep(retryDelay)
retries++
continue
}
if !errors.Is(err, io.EOF) {
logger.LogIf(ctx, err)
}
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
}
return entries, err
// Read metadata associated with the object from all disks.
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{}, true)
if err != nil {
switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
case ObjectNotFound:
retries++
time.Sleep(retryDelay)
continue
case InsufficientReadQuorum:
retries++
time.Sleep(retryDelay)
continue
default:
return entries, fmt.Errorf("reading first part metadata: %w", err)
}
}
partN, err := o.findFirstPart(fi)
switch {
case err == nil:
case errors.Is(err, io.ErrUnexpectedEOF):
if retries == 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("remote listing canceled: %w", err)
}
retries = -1
}
retries++
time.Sleep(retryDelay)
continue
case errors.Is(err, io.EOF):
return entries, io.EOF
}
// We got a stream to start at.
loadedPart := 0
buf := bufferPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
bufferPool.Put(buf)
}()
for {
select {
case <-ctx.Done():
return entries, ctx.Err()
default:
}
if partN != loadedPart {
if retries > 10 {
err := o.checkMetacacheState(ctx, rpc)
if err != nil {
return entries, fmt.Errorf("waiting for next part %d: %w", partN, err)
}
retries = 1
}
if retries > 0 {
// Load from one disk only
disks := er.getOnlineDisks()
if len(disks) == 0 {
time.Sleep(retryDelay)
retries++
continue
}
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
}
// Load first part metadata...
fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{}, true)
if err != nil {
time.Sleep(retryDelay)
retries++
continue
}
loadedPart = partN
bi, err := getMetacacheBlockInfo(fi, partN)
logger.LogIf(ctx, err)
if err == nil {
if bi.pastPrefix(o.Prefix) {
return entries, io.EOF
}
}
}
buf.Reset()
err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, buf, fi, metaArr, onlineDisks)
if err != nil {
switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
case ObjectNotFound:
retries++
time.Sleep(retryDelay)
continue
case InsufficientReadQuorum:
retries++
time.Sleep(retryDelay)
continue
default:
logger.LogIf(ctx, err)
return entries, err
}
}
tmp, err := newMetacacheReader(buf)
if err != nil {
return entries, err
}
e, err := tmp.filter(o)
entries.o = append(entries.o, e.o...)
if o.Limit > 0 && entries.len() > o.Limit {
entries.truncate(o.Limit)
return entries, nil
}
if err == nil {
// We stopped within the listing, we are done for now...
return entries, nil
}
if !errors.Is(err, io.EOF) {
logger.LogIf(ctx, err)
return entries, err
}
// We finished at the end of the block.
// And should not expect any more results.
bi, err := getMetacacheBlockInfo(fi, partN)
logger.LogIf(ctx, err)
if err != nil || bi.EOS {
// We are done and there are no more parts.
return entries, io.EOF
}
if bi.endedPrefix(o.Prefix) {
// Nothing more for prefix.
return entries, io.EOF
}
partN++
retries = 0
}
}
}
// Will return io.EOF if continuing would not yield more results.
@ -434,14 +658,28 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
}
}()
wc := er.metaCache.Create(pathJoin(minioMetaBucket, o.objectPath()))
const retryDelay = 200 * time.Millisecond
const maxTries = 5
var bw *metacacheBlockWriter
// Don't save single object listings.
if !o.discardResult {
// Write results to disk.
bw = newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
n, err := wc.Write(b.data)
// if the block is 0 bytes and its a first block skip it.
// skip only this for Transient caches.
if len(b.data) == 0 && b.n == 0 && o.Transient {
return nil
}
o.debugln(color.Green("listPath:")+" saving block", b.n, "to", o.objectPath(b.n))
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
logger.LogIf(ctx, err)
custom := b.headerKV()
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
UserDefined: custom,
NoLock: true, // No need to hold namespace lock, each prefix caches uniquely.
ParentIsObject: nil,
})
if err != nil {
metaMu.Lock()
if meta.error != "" {
@ -452,17 +690,29 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
cancel()
return err
}
if n != len(b.data) {
metaMu.Lock()
if meta.error != "" {
meta.status = scanStateError
meta.error = io.ErrShortWrite.Error()
}
metaMu.Unlock()
cancel()
return io.ErrShortWrite
if b.n == 0 {
return nil
}
// Update block 0 metadata.
var retries int
for {
err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{})
if err == nil {
break
}
switch err.(type) {
case ObjectNotFound:
return err
case InsufficientReadQuorum:
default:
logger.LogIf(ctx, err)
}
if retries >= maxTries {
return err
}
retries++
time.Sleep(retryDelay)
}
o.debugln(color.Green("listPath:")+" saving block to", o.objectPath())
return nil
})
}
@ -522,13 +772,6 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entr
meta, _ = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
}
if err := wc.Close(); err != nil {
metaMu.Lock()
meta.error = err.Error()
meta.status = scanStateError
meta, _ = o.updateMetacacheListing(meta, rpc)
metaMu.Unlock()
}
}
}()

View File

@ -17,9 +17,14 @@
package cmd
import (
"context"
"errors"
"fmt"
"path"
"strings"
"time"
"github.com/minio/minio/cmd/logger"
)
type scanStatus uint8
@ -226,3 +231,21 @@ func (m *metacache) update(update metacache) {
}
m.fileNotFound = m.fileNotFound || update.fileNotFound
}
// delete all cache data on disks.
func (m *metacache) delete(ctx context.Context) {
if m.bucket == "" || m.id == "" {
logger.LogIf(ctx, fmt.Errorf("metacache.delete: bucket (%s) or id (%s) empty", m.bucket, m.id))
}
objAPI := newObjectLayerFn()
if objAPI == nil {
logger.LogIf(ctx, errors.New("metacache.delete: no object layer"))
return
}
ez, ok := objAPI.(*erasureServerPools)
if !ok {
logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerPools"))
return
}
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
}

3
go.mod
View File

@ -70,6 +70,7 @@ require (
github.com/prometheus/client_golang v1.8.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/procfs v0.2.0
github.com/quasilyte/go-ruleguard v0.2.1 // indirect
github.com/rjeczalik/notify v0.9.2
github.com/rs/cors v1.7.0
github.com/secure-io/sio-go v0.3.1
@ -80,7 +81,7 @@ require (
github.com/tidwall/gjson v1.6.7
github.com/tidwall/sjson v1.0.4
github.com/tinylib/msgp v1.1.3
github.com/valyala/bytebufferpool v1.0.0
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 // indirect
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
github.com/willf/bitset v1.1.11 // indirect
github.com/willf/bloom v2.0.3+incompatible

9
go.sum
View File

@ -527,6 +527,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4=
github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/quasilyte/go-ruleguard v0.2.1 h1:56eRm0daAyny9UhJnmtJW/UyLZQusukBAB8oT8AHKHo=
github.com/quasilyte/go-ruleguard v0.2.1/go.mod h1:hN2rVc/uS4bQhQKTio2XaSJSafJwqBUWWwtssT3cQmc=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
@ -595,11 +597,11 @@ github.com/tinylib/msgp v1.1.3 h1:3giwAkmtaEDLSV0MdO1lDLuPgklgPzmk8H9+So2BVfA=
github.com/tinylib/msgp v1.1.3/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31 h1:OXcKh35JaYsGMRzpvFkLv/MEyPuL49CThT1pZ8aSml4=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a h1:0R4NLDRDZX6JcmhJgXi5E4b8Wg84ihbmUKp/GvSPEzc=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE=
@ -612,6 +614,7 @@ github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
@ -703,6 +706,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -774,6 +778,7 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200812195022-5ae4c3c160a0/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=

View File

@ -1,270 +0,0 @@
/*
* Minio Cloud Storage, (C) 2021 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package objcache implements in memory caching methods.
package objcache
import (
"bytes"
"errors"
"io"
"sync"
"time"
"github.com/valyala/bytebufferpool"
)
const (
// NoExpiry represents caches to be permanent and can only be deleted.
NoExpiry = time.Duration(0)
// DefaultExpiry represents 1 hour time duration when all entries shall be expired.
DefaultExpiry = time.Hour
// defaultBufferRatio represents default ratio used to calculate the
// individual cache entry buffer size.
defaultBufferRatio = uint64(10)
)
var (
// ErrKeyNotFoundInCache - key not found in cache.
ErrKeyNotFoundInCache = errors.New("Key not found in cache")
// ErrCacheFull - cache is full.
ErrCacheFull = errors.New("Not enough space in cache")
// ErrExcessData - excess data was attempted to be written on cache.
ErrExcessData = errors.New("Attempted excess write on cache")
)
// buffer represents the in memory cache of a single entry.
// buffer carries value of the data and last accessed time.
type buffer struct {
buf *bytebufferpool.ByteBuffer
lastAccessed time.Time // Represents time when value was last accessed.
}
// Cache holds the required variables to compose an in memory cache system
// which also provides expiring key mechanism and also maxSize.
type Cache struct {
// Mutex is used for handling the concurrent
// read/write requests for cache
mutex sync.Mutex
// Once is used for resetting GC once after
// peak cache usage.
onceGC sync.Once
// maxSize is a total size for overall cache
maxSize uint64
// maxCacheEntrySize is a total size per key buffer.
maxCacheEntrySize uint64
// currentSize is a current size in memory
currentSize uint64
// OnEviction - callback function for eviction
OnEviction func(key string)
// totalEvicted counter to keep track of total expirys
totalEvicted int
// map of cached keys and its values
entries map[string]*buffer
// Expiry in time duration.
expiry time.Duration
// Stop garbage collection routine, stops any running GC routine.
stopGC chan struct{}
}
// New - Return a new cache with a given default expiry
// duration. If the expiry duration is less than one
// (or NoExpiry), the items in the cache never expire
// (by default), and must be deleted manually.
func New(maxSize uint64, expiry time.Duration) (c *Cache, err error) {
if maxSize == 0 {
err = errors.New("invalid maximum cache size")
return c, err
}
// Max cache entry size - indicates the
// maximum buffer per key that can be held in
// memory. Currently this value is 1/10th
// the size of requested cache size.
maxCacheEntrySize := func() uint64 {
i := maxSize / defaultBufferRatio
if i == 0 {
i = maxSize
}
return i
}()
c = &Cache{
onceGC: sync.Once{},
maxSize: maxSize,
maxCacheEntrySize: maxCacheEntrySize,
entries: make(map[string]*buffer),
expiry: expiry,
}
// We have expiry start the janitor routine.
if expiry > 0 {
// Initialize a new stop GC channel.
c.stopGC = make(chan struct{})
// Start garbage collection routine to expire objects.
c.StartGC()
}
return c, nil
}
// Create - validates if object size fits with in cache size limit and returns a io.WriteCloser
// to which object contents can be written and finally Close()'d. During Close() we
// checks if the amount of data written is equal to the size of the object, in which
// case it saves the contents to object cache.
func (c *Cache) Create(key string) (wc io.WriteCloser) {
buf := bytebufferpool.Get()
// Function called on close which saves the object contents
// to the object cache.
onClose := func() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if buf.Len() == 0 {
buf.Reset()
bytebufferpool.Put(buf)
// If nothing is written in the buffer
// the key is not stored.
return nil
}
if uint64(buf.Len()) > c.maxCacheEntrySize {
buf.Reset()
bytebufferpool.Put(buf)
return ErrCacheFull
}
// Full object available in buf, save it to cache.
c.entries[key] = &buffer{
buf: buf,
lastAccessed: time.Now().UTC(), // Save last accessed time.
}
// Account for the memory allocated above.
c.currentSize += uint64(buf.Len())
return nil
}
return &writeCloser{ByteBuffer: buf, onClose: onClose}
}
// Open - open the in-memory file, returns an in memory reader.
// returns an error ErrKeyNotFoundInCache, if the key does not
// exist. ErrKeyNotFoundInCache is also returned if lastAccessed
// is older than input atime.
func (c *Cache) Open(key string, atime time.Time) (io.Reader, error) {
// Entry exists, return the readable buffer.
c.mutex.Lock()
defer c.mutex.Unlock()
b, ok := c.entries[key]
if !ok {
return nil, ErrKeyNotFoundInCache
}
// Check if buf was recently accessed.
if b.lastAccessed.Before(atime) {
c.delete(key)
return nil, ErrKeyNotFoundInCache
}
b.lastAccessed = time.Now()
return bytes.NewReader(b.buf.Bytes()), nil
}
// Delete - delete deletes an entry from the cache.
func (c *Cache) Delete(key string) {
c.mutex.Lock()
c.delete(key)
c.mutex.Unlock()
if c.OnEviction != nil {
c.OnEviction(key)
}
}
// gc - garbage collect all the expired entries from the cache.
func (c *Cache) gc() {
var evictedEntries []string
c.mutex.Lock()
for k, v := range c.entries {
if c.expiry > 0 && time.Now().UTC().Sub(v.lastAccessed) > c.expiry {
c.delete(k)
evictedEntries = append(evictedEntries, k)
}
}
c.mutex.Unlock()
for _, k := range evictedEntries {
if c.OnEviction != nil {
c.OnEviction(k)
}
}
}
// StopGC sends a message to the expiry routine to stop
// expiring cached entries. NOTE: once this is called, cached
// entries will not be expired, be careful if you are using this.
func (c *Cache) StopGC() {
if c.stopGC != nil {
c.stopGC <- struct{}{}
}
}
// StartGC starts running a routine ticking at expiry interval,
// on each interval this routine does a sweep across the cache
// entries and garbage collects all the expired entries.
func (c *Cache) StartGC() {
go func() {
for {
select {
// Wait till cleanup interval and initiate delete expired entries.
case <-time.After(c.expiry / 4):
c.gc()
// Stop the routine, usually called by the user of object cache during cleanup.
case <-c.stopGC:
return
}
}
}()
}
// Deletes a requested entry from the cache.
func (c *Cache) delete(key string) {
if _, ok := c.entries[key]; ok {
deletedSize := uint64(c.entries[key].buf.Len())
c.entries[key].buf.Reset()
bytebufferpool.Put(c.entries[key].buf)
delete(c.entries, key)
c.currentSize -= deletedSize
c.totalEvicted++
}
}

View File

@ -1,309 +0,0 @@
/*
* Minio Cloud Storage, (C) 2021 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package objcache
import (
"bytes"
"io"
"testing"
"time"
)
// TestObjectCache tests cases of object cache with expiry.
func TestObjExpiry(t *testing.T) {
// Non exhaustive list of all object cache behavior cases.
testCases := []struct {
expiry time.Duration
cacheSize uint64
err error
closeErr error
}{
{
expiry: 100 * time.Millisecond,
cacheSize: 1024,
err: ErrKeyNotFoundInCache,
closeErr: nil,
},
}
// Test case 1 validates running of GC.
testCase := testCases[0]
cache, err := New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
cache.OnEviction = func(key string) {}
w := cache.Create("test")
// Write a byte.
w.Write([]byte("1"))
if err = w.Close(); err != nil {
t.Errorf("Test case 1 expected to pass, failed instead %s", err)
}
// Wait for 500 millisecond.
time.Sleep(500 * time.Millisecond)
// Setting objModTime to the beginning of golang's time.Time to avoid deletion of stale entry.
fakeObjModTime := time.Time{}
_, err = cache.Open("test", fakeObjModTime)
if err != testCase.err {
t.Errorf("Test case 1 expected %s, got instead %s", testCase.err, err)
}
}
// TestObjCache - tests various cases for object cache behavior.
func TestObjCache(t *testing.T) {
// Setting objModTime to the beginning of golang's time.Time to avoid deletion of stale entry.
fakeObjModTime := time.Time{}
// Non exhaustive list of all object cache behavior cases.
testCases := []struct {
expiry time.Duration
cacheSize uint64
err error
closeErr error
}{
// Validate if a key is not found in cache and Open fails.
{
expiry: NoExpiry,
cacheSize: 1024,
err: ErrKeyNotFoundInCache,
},
// Validate if cache indicates that it is full and Create fails.
{
expiry: NoExpiry,
cacheSize: 1,
err: ErrCacheFull,
},
// Validate if Create succeeds but Close fails to write to buffer.
{
expiry: NoExpiry,
cacheSize: 2,
closeErr: io.ErrShortBuffer,
},
// Validate that Create and Close succeed, making sure to update the cache.
{
expiry: NoExpiry,
cacheSize: 1024,
},
// Validate that Delete succeeds and Open fails with key not found in cache.
{
expiry: NoExpiry,
cacheSize: 1024,
err: ErrKeyNotFoundInCache,
},
// Validate OnEviction function is called upon entry delete.
{
expiry: NoExpiry,
cacheSize: 1024,
},
// Validate error excess data.
{
expiry: NoExpiry,
cacheSize: 5,
closeErr: ErrExcessData,
},
// Validate error excess data during write.
{
expiry: NoExpiry,
cacheSize: 2048,
err: ErrExcessData,
},
}
// Test 1 validating Open failure.
testCase := testCases[0]
cache, err := New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
_, err = cache.Open("test", fakeObjModTime)
if testCase.err != err {
t.Errorf("Test case 2 expected to pass, failed instead %s", err)
}
// Test 2 validating Create failure.
testCase = testCases[1]
cache, err = New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
w := cache.Create("test")
if w == nil {
t.Errorf("Test case 2 expected to pass, but returned nil")
}
w.Close()
// Test 3 validating Create succeeds and returns a writer.
// Subsequently we Close() without writing any data, to receive
// `io.ErrShortBuffer`
testCase = testCases[2]
cache, err = New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
w = cache.Create("test")
// nothing is stored in the key, upon Close() the buf is discarded.
if err = w.Close(); err != nil {
t.Errorf("Test case 3 expected to pass, failed instead %s", err)
}
// Test 4 validates Create and Close succeeds successfully caching
// the writes.
testCase = testCases[3]
cache, err = New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
w = cache.Create("test")
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case 4 expected to pass, failed instead %s", err)
}
r, err := cache.Open("test", fakeObjModTime)
if err != nil {
t.Errorf("Test case 4 expected to pass, failed instead %s", err)
}
// Reads everything stored for key "test".
cbytes := make([]byte, 5)
rat := r.(io.ReaderAt)
_, err = rat.ReadAt(cbytes, 0)
if err != nil {
t.Errorf("Test case 4 expected to pass, failed instead %s", err)
}
// Validate if read bytes match.
if !bytes.Equal(cbytes, []byte("Hello")) {
t.Errorf("Test case 4 expected to pass. wanted \"Hello\", got %s", string(cbytes))
}
// Test 5 validates Delete succeeds and Open fails with err
testCase = testCases[4]
cache, err = New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
w = cache.Create("test")
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case 5 expected to pass, failed instead %s", err)
}
// Delete the cache entry.
cache.Delete("test")
_, err = cache.Open("test", fakeObjModTime)
if testCase.err != err {
t.Errorf("Test case 5 expected to pass, failed instead %s", err)
}
// Test 6 validates OnEviction being called upon Delete is being invoked.
testCase = testCases[5]
cache, err = New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
w = cache.Create("test")
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case 6 expected to pass, failed instead %s", err)
}
var deleteKey string
cache.OnEviction = func(key string) {
deleteKey = key
}
// Delete the cache entry.
cache.Delete("test")
if deleteKey != "test" {
t.Errorf("Test case 6 expected to pass, wanted \"test\", got %s", deleteKey)
}
// Test 7 validates rejecting requests when excess data is being saved.
testCase = testCases[6]
cache, err = New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
w = cache.Create("test1")
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case 7 expected to pass, failed instead %s", err)
}
w = cache.Create("test2")
// nothing got written, Close() will return success.
if err = w.Close(); err != nil {
t.Errorf("Test case 7 expected to pass, failed instead %s", err)
}
// Test 8 validates rejecting Writes which write excess data.
testCase = testCases[7]
cache, err = New(testCase.cacheSize, testCase.expiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
w = cache.Create("test1")
defer w.Close()
// Write '5' bytes.
n, err := w.Write([]byte("Hello"))
if err != nil {
t.Errorf("Test case 8 expected to pass, failed instead %s", err)
}
if n != 5 {
t.Errorf("Test case 8 expected 5 bytes written, instead found %d", n)
}
// Write '1' more byte, should return error.
n, err = w.Write([]byte("W"))
if n == 0 && err != testCase.err {
t.Errorf("Test case 8 expected to fail with ErrExcessData, but failed with %s instead", err)
}
}
// TestStateEntryPurge - tests if objCache purges stale entry and returns ErrKeyNotFoundInCache.
func TestStaleEntryPurge(t *testing.T) {
cache, err := New(1024, NoExpiry)
if err != nil {
t.Fatalf("Unable to create new objcache")
}
w := cache.Create("test")
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case expected to pass, failed instead %s", err)
}
_, err = cache.Open("test", time.Now().AddDate(0, 0, 1).UTC())
if err != ErrKeyNotFoundInCache {
t.Errorf("Test case expected to return ErrKeyNotFoundInCache, instead returned %s", err)
}
}

View File

@ -1,34 +0,0 @@
/*
* Minio Cloud Storage, (C) 2021 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package objcache implements in memory caching methods.
package objcache
import "github.com/valyala/bytebufferpool"
// Is an Closer wrapper for bytebufferpool, upon close
// calls the defined onClose function.
type writeCloser struct {
*bytebufferpool.ByteBuffer
onClose func() error
}
// On close, onClose() is called which checks if all object contents
// have been written so that it can save the buffer to the cache.
func (c writeCloser) Close() (err error) {
return c.onClose()
}