mirror of
https://github.com/minio/minio.git
synced 2025-11-20 18:06:10 -05:00
Add cache eviction low and high watermarks (#8958)
To allow better control the cache eviction process. Introduce MINIO_CACHE_WATERMARK_LOW and MINIO_CACHE_WATERMARK_HIGH env. variables to specify when to stop/start cache eviction process. Deprecate MINIO_CACHE_EXPIRY environment variable. Cache gc sweeps at 30 minute intervals whenever high watermark is reached to clear least recently accessed entries in the cache until sufficient space is cleared to reach the low watermark. Garbage collection uses an adaptive file scoring approach based on last access time, with greater weights assigned to larger objects and those with more hits to find the candidates for eviction. Thanks to @klauspost for this file scoring algorithm Co-authored-by: Klaus Post <klauspost@minio.io>
This commit is contained in:
@@ -27,6 +27,7 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -37,6 +38,7 @@ import (
|
||||
"github.com/minio/minio/pkg/disk"
|
||||
"github.com/minio/sio"
|
||||
"github.com/ncw/directio"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -44,7 +46,7 @@ const (
|
||||
cacheMetaJSONFile = "cache.json"
|
||||
cacheDataFile = "part.1"
|
||||
cacheMetaVersion = "1.0.0"
|
||||
|
||||
cacheExpiryDays = time.Duration(90 * time.Hour * 24) // defaults to 90 days
|
||||
// SSECacheEncrypted is the metadata key indicating that the object
|
||||
// is a cache entry encrypted with cache KMS master key in globalCacheKMS.
|
||||
SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache"
|
||||
@@ -126,15 +128,15 @@ func (m *cacheMeta) ToObjectInfo(bucket, object string) (o ObjectInfo) {
|
||||
type diskCache struct {
|
||||
dir string // caching directory
|
||||
quotaPct int // max usage in %
|
||||
expiry int // cache expiry in days
|
||||
// mark false if drive is offline
|
||||
online bool
|
||||
// mutex to protect updates to online variable
|
||||
onlineMutex *sync.RWMutex
|
||||
// purge() listens on this channel to start the cache-purge process
|
||||
purgeChan chan struct{}
|
||||
pool sync.Pool
|
||||
after int // minimum accesses before an object is cached.
|
||||
onlineMutex *sync.RWMutex
|
||||
pool sync.Pool
|
||||
after int // minimum accesses before an object is cached.
|
||||
lowWatermark int
|
||||
highWatermark int
|
||||
gcCounter atomic.Uint64
|
||||
// nsMutex namespace lock
|
||||
nsMutex *nsLockMap
|
||||
// Object functions pointing to the corresponding functions of backend implementation.
|
||||
@@ -142,18 +144,18 @@ type diskCache struct {
|
||||
}
|
||||
|
||||
// Inits the disk cache dir if it is not initialized already.
|
||||
func newDiskCache(dir string, expiry int, quotaPct, after int) (*diskCache, error) {
|
||||
func newDiskCache(dir string, quotaPct, after, lowWatermark, highWatermark int) (*diskCache, error) {
|
||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||
return nil, fmt.Errorf("Unable to initialize '%s' dir, %w", dir, err)
|
||||
}
|
||||
cache := diskCache{
|
||||
dir: dir,
|
||||
expiry: expiry,
|
||||
quotaPct: quotaPct,
|
||||
after: after,
|
||||
purgeChan: make(chan struct{}),
|
||||
online: true,
|
||||
onlineMutex: &sync.RWMutex{},
|
||||
dir: dir,
|
||||
quotaPct: quotaPct,
|
||||
after: after,
|
||||
lowWatermark: lowWatermark,
|
||||
highWatermark: highWatermark,
|
||||
online: true,
|
||||
onlineMutex: &sync.RWMutex{},
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := directio.AlignedBlock(int(cacheBlkSize))
|
||||
@@ -168,12 +170,12 @@ func newDiskCache(dir string, expiry int, quotaPct, after int) (*diskCache, erro
|
||||
return &cache, nil
|
||||
}
|
||||
|
||||
// Returns if the disk usage is low.
|
||||
// Disk usage is low if usage is < 80% of cacheMaxDiskUsagePct
|
||||
// Ex. for a 100GB disk, if maxUsage is configured as 70% then cacheMaxDiskUsagePct is 70G
|
||||
// hence disk usage is low if the disk usage is less than 56G (because 80% of 70G is 56G)
|
||||
// diskUsageLow() returns true if disk usage falls below the low watermark w.r.t configured cache quota.
|
||||
// Ex. for a 100GB disk, if quota is configured as 70% and watermark_low = 80% and
|
||||
// watermark_high = 90% then garbage collection starts when 63% of disk is used and
|
||||
// stops when disk usage drops to 56%
|
||||
func (c *diskCache) diskUsageLow() bool {
|
||||
minUsage := c.quotaPct * 80 / 100
|
||||
gcStopPct := c.quotaPct * c.lowWatermark / 100
|
||||
di, err := disk.GetInfo(c.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
||||
@@ -182,21 +184,22 @@ func (c *diskCache) diskUsageLow() bool {
|
||||
return false
|
||||
}
|
||||
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
||||
return int(usedPercent) < minUsage
|
||||
return int(usedPercent) < gcStopPct
|
||||
}
|
||||
|
||||
// Return if the disk usage is high.
|
||||
// Disk usage is high if disk used is > cacheMaxDiskUsagePct
|
||||
// Returns if the disk usage reaches high water mark w.r.t the configured cache quota.
|
||||
// gc starts if high water mark reached.
|
||||
func (c *diskCache) diskUsageHigh() bool {
|
||||
gcTriggerPct := c.quotaPct * c.highWatermark / 100
|
||||
di, err := disk.GetInfo(c.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return true
|
||||
return false
|
||||
}
|
||||
usedPercent := (di.Total - di.Free) * 100 / di.Total
|
||||
return int(usedPercent) > c.quotaPct
|
||||
return int(usedPercent) >= gcTriggerPct
|
||||
}
|
||||
|
||||
// Returns if size space can be allocated without exceeding
|
||||
@@ -213,12 +216,42 @@ func (c *diskCache) diskAvailable(size int64) bool {
|
||||
return int(usedPercent) < c.quotaPct
|
||||
}
|
||||
|
||||
// toClear returns how many bytes should be cleared to reach the low watermark quota.
|
||||
// returns 0 if below quota.
|
||||
func (c *diskCache) toClear() uint64 {
|
||||
di, err := disk.GetInfo(c.dir)
|
||||
if err != nil {
|
||||
reqInfo := (&logger.ReqInfo{}).AppendTags("cachePath", c.dir)
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
logger.LogIf(ctx, err)
|
||||
return 0
|
||||
}
|
||||
return bytesToClear(int64(di.Total), int64(di.Free), uint64(c.quotaPct), uint64(c.lowWatermark))
|
||||
}
|
||||
|
||||
// Purge cache entries that were not accessed.
|
||||
func (c *diskCache) purge() {
|
||||
func (c *diskCache) purge(ctx context.Context, doneCh <-chan struct{}) {
|
||||
if c.diskUsageLow() {
|
||||
return
|
||||
}
|
||||
toFree := c.toClear()
|
||||
if toFree == 0 {
|
||||
return
|
||||
}
|
||||
// expiry for cleaning up old cache.json files that
|
||||
// need to be cleaned up.
|
||||
expiry := UTCNow().Add(-cacheExpiryDays)
|
||||
// defaulting max hits count to 100
|
||||
scorer, err := newFileScorer(int64(toFree), time.Now().Unix(), 100)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
// this function returns FileInfo for cached range files and cache data file.
|
||||
fiStatFn := func(ranges map[string]string, dataFile, pathPrefix string) map[string]os.FileInfo {
|
||||
fm := make(map[string]os.FileInfo)
|
||||
fname := pathJoin(pathPrefix, cacheDataFile)
|
||||
fname := pathJoin(pathPrefix, dataFile)
|
||||
if fi, err := os.Stat(fname); err == nil {
|
||||
fm[fname] = fi
|
||||
}
|
||||
@@ -231,63 +264,73 @@ func (c *diskCache) purge() {
|
||||
}
|
||||
return fm
|
||||
}
|
||||
ctx := context.Background()
|
||||
for {
|
||||
olderThan := c.expiry * 24
|
||||
for !c.diskUsageLow() {
|
||||
// delete unaccessed objects older than expiry duration
|
||||
expiry := UTCNow().Add(time.Hour * time.Duration(-1*olderThan))
|
||||
olderThan /= 2
|
||||
if olderThan < 1 {
|
||||
break
|
||||
}
|
||||
deletedCount := 0
|
||||
objDirs, err := ioutil.ReadDir(c.dir)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
objDirs, err := ioutil.ReadDir(c.dir)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
for _, obj := range objDirs {
|
||||
if obj.Name() == minioMetaBucket {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, obj := range objDirs {
|
||||
if obj.Name() == minioMetaBucket {
|
||||
continue
|
||||
}
|
||||
meta, _, _, err := c.statCachedMeta(context.Background(), pathJoin(c.dir, obj.Name()))
|
||||
if err != nil {
|
||||
// delete any partially filled cache entry left behind.
|
||||
removeAll(pathJoin(c.dir, obj.Name()))
|
||||
continue
|
||||
}
|
||||
// stat all cached file ranges and cacheDataFile.
|
||||
fis := fiStatFn(meta.Ranges, cacheDataFile, pathJoin(c.dir, obj.Name()))
|
||||
objInfo := meta.ToObjectInfo("", "")
|
||||
cc := cacheControlOpts(objInfo)
|
||||
|
||||
for fname, fi := range fis {
|
||||
if atime.Get(fi).Before(expiry) ||
|
||||
cc.isStale(objInfo.ModTime) {
|
||||
if err = removeAll(fname); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
deletedCount++
|
||||
// break early if sufficient disk space reclaimed.
|
||||
if !c.diskUsageLow() {
|
||||
break
|
||||
}
|
||||
cacheDir := pathJoin(c.dir, obj.Name())
|
||||
meta, _, numHits, err := c.statCachedMeta(ctx, cacheDir)
|
||||
if err != nil {
|
||||
// delete any partially filled cache entry left behind.
|
||||
removeAll(cacheDir)
|
||||
continue
|
||||
}
|
||||
// stat all cached file ranges and cacheDataFile.
|
||||
cachedFiles := fiStatFn(meta.Ranges, cacheDataFile, pathJoin(c.dir, obj.Name()))
|
||||
objInfo := meta.ToObjectInfo("", "")
|
||||
cc := cacheControlOpts(objInfo)
|
||||
for fname, fi := range cachedFiles {
|
||||
if cc != nil {
|
||||
if cc.isStale(objInfo.ModTime) {
|
||||
if err = removeAll(fname); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
scorer.adjustSaveBytes(-fi.Size())
|
||||
// break early if sufficient disk space reclaimed.
|
||||
if c.diskUsageLow() {
|
||||
return
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
if deletedCount == 0 {
|
||||
break
|
||||
}
|
||||
scorer.addFile(fname, atime.Get(fi), fi.Size(), numHits)
|
||||
}
|
||||
for {
|
||||
<-c.purgeChan
|
||||
if c.diskUsageHigh() {
|
||||
break
|
||||
}
|
||||
// clean up stale cache.json files for objects that never got cached but access count was maintained in cache.json
|
||||
fi, err := os.Stat(pathJoin(cacheDir, cacheMetaJSONFile))
|
||||
if err != nil || (fi.ModTime().Before(expiry) && len(cachedFiles) == 0) {
|
||||
removeAll(cacheDir)
|
||||
scorer.adjustSaveBytes(-fi.Size())
|
||||
continue
|
||||
}
|
||||
if c.diskUsageLow() {
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, path := range scorer.fileNames() {
|
||||
removeAll(path)
|
||||
slashIdx := strings.LastIndex(path, SlashSeparator)
|
||||
pathPrefix := path[0:slashIdx]
|
||||
fname := path[slashIdx+1:]
|
||||
if fname == cacheDataFile {
|
||||
removeAll(pathPrefix)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *diskCache) incGCCounter() {
|
||||
c.gcCounter.Add(uint64(1))
|
||||
}
|
||||
func (c *diskCache) resetGCCounter() {
|
||||
c.gcCounter.Store(uint64(0))
|
||||
}
|
||||
func (c *diskCache) gcCount() uint64 {
|
||||
return c.gcCounter.Load()
|
||||
}
|
||||
|
||||
// sets cache drive status
|
||||
@@ -378,6 +421,9 @@ func (c *diskCache) statRange(ctx context.Context, bucket, object string, rs *HT
|
||||
if !ok {
|
||||
return oi, rngInfo, numHits, ObjectNotFound{Bucket: bucket, Object: object}
|
||||
}
|
||||
if _, err = os.Stat(pathJoin(cacheObjPath, rngFile)); err != nil {
|
||||
return oi, rngInfo, numHits, ObjectNotFound{Bucket: bucket, Object: object}
|
||||
}
|
||||
rngInfo = RangeInfo{Range: rng, File: rngFile, Size: int64(actualRngSize)}
|
||||
|
||||
err = decryptCacheObjectETag(&oi)
|
||||
@@ -568,10 +614,8 @@ func newCacheEncryptMetadata(bucket, object string, metadata map[string]string)
|
||||
// Caches the object to disk
|
||||
func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Reader, size int64, rs *HTTPRangeSpec, opts ObjectOptions, incHitsOnly bool) error {
|
||||
if c.diskUsageHigh() {
|
||||
select {
|
||||
case c.purgeChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
c.incGCCounter()
|
||||
io.Copy(ioutil.Discard, data)
|
||||
return errDiskFull
|
||||
}
|
||||
cachePath := getCacheSHADir(c.dir, bucket, object)
|
||||
@@ -622,11 +666,11 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
|
||||
if IsErr(err, baseErrs...) {
|
||||
c.setOnline(false)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
removeAll(cachePath)
|
||||
return err
|
||||
}
|
||||
|
||||
if actualSize != uint64(n) {
|
||||
removeAll(cachePath)
|
||||
return IncompleteBody{}
|
||||
|
||||
Reference in New Issue
Block a user