mirror of
https://github.com/minio/minio.git
synced 2025-11-29 13:28:17 -05:00
Allow caching based on a configurable number of hits. (#8891)
Co-authored-by: Harshavardhana <harsha@minio.io>
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
||||
* MinIO Cloud Storage, (C) 2019,2020 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -67,20 +67,16 @@ type cacheObjects struct {
|
||||
cache []*diskCache
|
||||
// file path patterns to exclude from cache
|
||||
exclude []string
|
||||
|
||||
// number of accesses after which to cache an object
|
||||
after int
|
||||
// if true migration is in progress from v1 to v2
|
||||
migrating bool
|
||||
// mutex to protect migration bool
|
||||
migMutex sync.Mutex
|
||||
|
||||
// nsMutex namespace lock
|
||||
nsMutex *nsLockMap
|
||||
|
||||
// Cache stats
|
||||
cacheStats *CacheStats
|
||||
|
||||
// Object functions pointing to the corresponding functions of backend implementation.
|
||||
NewNSLockFn func(ctx context.Context, bucket, object string) RWLocker
|
||||
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
|
||||
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
DeleteObjectFn func(ctx context.Context, bucket, object string) error
|
||||
@@ -88,53 +84,39 @@ type cacheObjects struct {
|
||||
PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
}
|
||||
|
||||
func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, object string) (err error) {
|
||||
cLock := c.NewNSLockFn(ctx, bucket, object)
|
||||
if err := cLock.GetLock(globalObjectTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer cLock.Unlock()
|
||||
return dcache.Delete(ctx, bucket, object)
|
||||
func (c *cacheObjects) incHitsToMeta(ctx context.Context, dcache *diskCache, bucket, object string, size int64, eTag string) error {
|
||||
metadata := make(map[string]string)
|
||||
metadata["etag"] = eTag
|
||||
return dcache.SaveMetadata(ctx, bucket, object, metadata, size, nil, "", true)
|
||||
}
|
||||
|
||||
func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions) error {
|
||||
cLock := c.NewNSLockFn(ctx, bucket, object)
|
||||
if err := cLock.GetLock(globalObjectTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
defer cLock.Unlock()
|
||||
return dcache.Put(ctx, bucket, object, data, size, rs, opts)
|
||||
}
|
||||
// Backend metadata could have changed through server side copy - reset cache metadata if that is the case
|
||||
func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *diskCache, bucket, object string, bkObjectInfo, cacheObjInfo ObjectInfo) error {
|
||||
|
||||
func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
cLock := c.NewNSLockFn(ctx, bucket, object)
|
||||
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
|
||||
return nil, err
|
||||
bkMeta := make(map[string]string)
|
||||
cacheMeta := make(map[string]string)
|
||||
for k, v := range bkObjectInfo.UserDefined {
|
||||
if HasPrefix(k, ReservedMetadataPrefix) {
|
||||
// Do not need to send any internal metadata
|
||||
continue
|
||||
}
|
||||
bkMeta[http.CanonicalHeaderKey(k)] = v
|
||||
}
|
||||
for k, v := range cacheObjInfo.UserDefined {
|
||||
if HasPrefix(k, ReservedMetadataPrefix) {
|
||||
// Do not need to send any internal metadata
|
||||
continue
|
||||
}
|
||||
cacheMeta[http.CanonicalHeaderKey(k)] = v
|
||||
}
|
||||
|
||||
defer cLock.RUnlock()
|
||||
return dcache.Get(ctx, bucket, object, rs, h, opts)
|
||||
}
|
||||
|
||||
func (c *cacheObjects) stat(ctx context.Context, dcache *diskCache, bucket, object string) (oi ObjectInfo, err error) {
|
||||
cLock := c.NewNSLockFn(ctx, bucket, object)
|
||||
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
|
||||
return oi, err
|
||||
if !isMetadataSame(bkMeta, cacheMeta) ||
|
||||
bkObjectInfo.ETag != cacheObjInfo.ETag ||
|
||||
bkObjectInfo.ContentType != cacheObjInfo.ContentType ||
|
||||
!bkObjectInfo.Expires.Equal(cacheObjInfo.Expires) {
|
||||
return dcache.SaveMetadata(ctx, bucket, object, getMetadata(bkObjectInfo), bkObjectInfo.Size, nil, "", false)
|
||||
}
|
||||
|
||||
defer cLock.RUnlock()
|
||||
return dcache.Stat(ctx, bucket, object)
|
||||
}
|
||||
|
||||
func (c *cacheObjects) statRange(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, err error) {
|
||||
cLock := c.NewNSLockFn(ctx, bucket, object)
|
||||
if err := cLock.GetRLock(globalObjectTimeout); err != nil {
|
||||
return oi, err
|
||||
}
|
||||
|
||||
defer cLock.RUnlock()
|
||||
oi, _, err = dcache.statRange(ctx, bucket, object, rs)
|
||||
return oi, err
|
||||
return c.incHitsToMeta(ctx, dcache, bucket, object, cacheObjInfo.Size, cacheObjInfo.ETag)
|
||||
}
|
||||
|
||||
// DeleteObject clears cache entry if backend delete operation succeeds
|
||||
@@ -150,9 +132,7 @@ func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string)
|
||||
if cerr != nil {
|
||||
return
|
||||
}
|
||||
if dcache.Exists(ctx, bucket, object) {
|
||||
c.delete(ctx, dcache, bucket, object)
|
||||
}
|
||||
dcache.Delete(ctx, bucket, object)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -200,7 +180,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
}
|
||||
|
||||
cacheReader, cacheErr := c.get(ctx, dcache, bucket, object, rs, h, opts)
|
||||
cacheReader, numCacheHits, cacheErr := dcache.Get(ctx, bucket, object, rs, h, opts)
|
||||
if cacheErr == nil {
|
||||
cacheObjSize = cacheReader.ObjInfo.Size
|
||||
if rs != nil {
|
||||
@@ -220,6 +200,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
}
|
||||
c.cacheStats.incHit()
|
||||
c.cacheStats.incBytesServed(bytesServed)
|
||||
c.incHitsToMeta(ctx, dcache, bucket, object, cacheReader.ObjInfo.Size, cacheReader.ObjInfo.ETag)
|
||||
return cacheReader, nil
|
||||
}
|
||||
if cc.noStore {
|
||||
@@ -263,13 +244,13 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
// if ETag matches for stale cache entry, serve from cache
|
||||
if cacheReader.ObjInfo.ETag == objInfo.ETag {
|
||||
// Update metadata in case server-side copy might have changed object metadata
|
||||
dcache.updateMetadataIfChanged(ctx, bucket, object, objInfo, cacheReader.ObjInfo)
|
||||
c.updateMetadataIfChanged(ctx, dcache, bucket, object, objInfo, cacheReader.ObjInfo)
|
||||
c.incCacheStats(cacheObjSize)
|
||||
return cacheReader, nil
|
||||
}
|
||||
cacheReader.Close()
|
||||
// Object is stale, so delete from cache
|
||||
c.delete(ctx, dcache, bucket, object)
|
||||
dcache.Delete(ctx, bucket, object)
|
||||
}
|
||||
|
||||
// Reaching here implies cache miss
|
||||
@@ -284,9 +265,17 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
}
|
||||
|
||||
bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
|
||||
if bkErr != nil {
|
||||
return bkReader, bkErr
|
||||
}
|
||||
// If object has less hits than configured cache after, just increment the hit counter
|
||||
// but do not cache it.
|
||||
if numCacheHits < c.after {
|
||||
c.incHitsToMeta(ctx, dcache, bucket, object, objInfo.Size, objInfo.ETag)
|
||||
return bkReader, bkErr
|
||||
}
|
||||
|
||||
// Record if cache has a hit that was invalidated by ETag verification
|
||||
if cacheErr == nil {
|
||||
bkReader.ObjInfo.CacheLookupStatus = CacheHit
|
||||
@@ -303,10 +292,10 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
return
|
||||
}
|
||||
defer bReader.Close()
|
||||
oi, err := c.statRange(ctx, dcache, bucket, object, rs)
|
||||
oi, _, _, err := dcache.statRange(ctx, bucket, object, rs)
|
||||
// avoid cache overwrite if another background routine filled cache
|
||||
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
|
||||
c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
|
||||
dcache.Put(ctx, bucket, object, bReader, bReader.ObjInfo.Size, rs, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, true)
|
||||
}
|
||||
}()
|
||||
return bkReader, bkErr
|
||||
@@ -316,7 +305,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
pipeReader, pipeWriter := io.Pipe()
|
||||
teeReader := io.TeeReader(bkReader, pipeWriter)
|
||||
go func() {
|
||||
putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)})
|
||||
putErr := dcache.Put(ctx, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)}, false)
|
||||
// close the write end of the pipe, so the error gets
|
||||
// propagated to getObjReader
|
||||
pipeWriter.CloseWithError(putErr)
|
||||
@@ -341,7 +330,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
|
||||
}
|
||||
var cc cacheControl
|
||||
// if cache control setting is valid, avoid HEAD operation to backend
|
||||
cachedObjInfo, cerr := c.stat(ctx, dcache, bucket, object)
|
||||
cachedObjInfo, _, cerr := dcache.Stat(ctx, bucket, object)
|
||||
if cerr == nil {
|
||||
cc = cacheControlOpts(cachedObjInfo)
|
||||
if !cc.isStale(cachedObjInfo.ModTime) {
|
||||
@@ -355,7 +344,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
|
||||
if err != nil {
|
||||
if _, ok := err.(ObjectNotFound); ok {
|
||||
// Delete the cached entry if backend object was deleted.
|
||||
c.delete(ctx, dcache, bucket, object)
|
||||
dcache.Delete(ctx, bucket, object)
|
||||
c.cacheStats.incMiss()
|
||||
return ObjectInfo{}, err
|
||||
}
|
||||
@@ -379,7 +368,7 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
|
||||
}
|
||||
if cachedObjInfo.ETag != objInfo.ETag {
|
||||
// Delete the cached entry if the backend object was replaced.
|
||||
c.delete(ctx, dcache, bucket, object)
|
||||
dcache.Delete(ctx, bucket, object)
|
||||
}
|
||||
return objInfo, nil
|
||||
}
|
||||
@@ -508,7 +497,7 @@ func newCache(config cache.Config) ([]*diskCache, bool, error) {
|
||||
quota = config.Quota
|
||||
}
|
||||
|
||||
cache, err := newDiskCache(dir, config.Expiry, quota)
|
||||
cache, err := newDiskCache(dir, config.Expiry, quota, config.After)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
@@ -516,7 +505,6 @@ func newCache(config cache.Config) ([]*diskCache, bool, error) {
|
||||
if !migrating {
|
||||
go cache.purge()
|
||||
}
|
||||
|
||||
caches = append(caches, cache)
|
||||
}
|
||||
return caches, migrating, nil
|
||||
@@ -630,10 +618,10 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
|
||||
return
|
||||
}
|
||||
defer bReader.Close()
|
||||
oi, err := c.stat(ctx, dcache, bucket, object)
|
||||
oi, _, err := dcache.Stat(ctx, bucket, object)
|
||||
// avoid cache overwrite if another background routine filled cache
|
||||
if err != nil || oi.ETag != bReader.ObjInfo.ETag {
|
||||
c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
|
||||
dcache.Put(ctx, bucket, object, bReader, bReader.ObjInfo.Size, nil, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)}, true)
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -649,13 +637,12 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &cacheObjects{
|
||||
cache: cache,
|
||||
exclude: config.Exclude,
|
||||
after: config.After,
|
||||
migrating: migrateSw,
|
||||
migMutex: sync.Mutex{},
|
||||
nsMutex: newNSLock(false),
|
||||
cacheStats: newCacheStats(),
|
||||
GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
|
||||
@@ -677,9 +664,6 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
|
||||
return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
|
||||
},
|
||||
}
|
||||
c.NewNSLockFn = func(ctx context.Context, bucket, object string) RWLocker {
|
||||
return c.nsMutex.NewNSLock(ctx, nil, bucket, object)
|
||||
}
|
||||
|
||||
if migrateSw {
|
||||
go c.migrateCacheFromV1toV2(ctx)
|
||||
|
||||
Reference in New Issue
Block a user