mirror of https://github.com/minio/minio.git
cachevalue: simplify exported interface (#19137)
- Also add cache options type
parent 2bdb9511bd
commit 62ce52c8fd
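The change replaces the old pattern of configuring a Cache through exported fields inside Once.Do with a single InitOnce (or NewFromFunc) call that takes the TTL, an Opts value, and the update function. A minimal sketch of the new call shape, assuming code living inside the minio module (internal/cachevalue is not importable from other modules) and a hypothetical fetchValue loader:

package main

import (
	"fmt"
	"time"

	"github.com/minio/minio/internal/cachevalue"
)

// fetchValue is a hypothetical loader standing in for an expensive call
// such as loadDataUsageFromBackend in the hunks below.
func fetchValue() (string, error) {
	return time.Now().Format(time.RFC3339), nil
}

func main() {
	cache := cachevalue.New[string]()

	// TTL, options and update function are now passed once to InitOnce
	// instead of being assigned to exported fields inside cache.Once.Do.
	cache.InitOnce(10*time.Second,
		cachevalue.Opts{ReturnLastGood: true, NoWait: true},
		func() (string, error) {
			return fetchValue()
		},
	)

	v, err := cache.Get() // serves the cached value until the TTL expires
	fmt.Println(v, err)
}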
@@ -33,6 +33,7 @@ import (
     "github.com/klauspost/compress/zip"
     "github.com/minio/madmin-go/v3"
     "github.com/minio/minio/internal/auth"
+    "github.com/minio/minio/internal/cachevalue"
     "github.com/minio/minio/internal/config/dns"
     "github.com/minio/minio/internal/logger"
     "github.com/minio/mux"
@@ -1197,21 +1198,15 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Request) {
         return rd, wr
     }
 
-    bucketStorageCache.Once.Do(func() {
-        // Set this to 10 secs since its enough, as scanner
-        // does not update the bucket usage values frequently.
-        bucketStorageCache.TTL = 10 * time.Second
-
-        // Rely on older value if usage loading fails from disk.
-        bucketStorageCache.ReturnLastGood = true
-        bucketStorageCache.NoWait = true
-        bucketStorageCache.Update = func() (DataUsageInfo, error) {
+    bucketStorageCache.InitOnce(10*time.Second,
+        cachevalue.Opts{ReturnLastGood: true, NoWait: true},
+        func() (DataUsageInfo, error) {
             ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
             defer done()
 
             return loadDataUsageFromBackend(ctx, objectAPI)
-        }
-    })
+        },
+    )
 
     dataUsageInfo, _ := bucketStorageCache.Get()
 
@@ -47,20 +47,15 @@ var bucketStorageCache = cachevalue.New[DataUsageInfo]()
 
 // Init initialize bucket quota.
 func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
-    bucketStorageCache.Once.Do(func() {
-        // Set this to 10 secs since its enough, as scanner
-        // does not update the bucket usage values frequently.
-        bucketStorageCache.TTL = 10 * time.Second
-        // Rely on older value if usage loading fails from disk.
-        bucketStorageCache.ReturnLastGood = true
-        bucketStorageCache.NoWait = true
-        bucketStorageCache.Update = func() (DataUsageInfo, error) {
+    bucketStorageCache.InitOnce(10*time.Second,
+        cachevalue.Opts{ReturnLastGood: true, NoWait: true},
+        func() (DataUsageInfo, error) {
             ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
             defer done()
 
             return loadDataUsageFromBackend(ctx, objAPI)
-        }
-    })
+        },
+    )
 }
 
 // GetBucketUsageInfo return bucket usage info for a given bucket
@@ -77,13 +77,10 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
 
     cache := dataUsageCache{}
 
-    prefixUsageCache.Once.Do(func() {
-        prefixUsageCache.TTL = 30 * time.Second
-
-        // No need to fail upon Update() error, fallback to old value.
-        prefixUsageCache.ReturnLastGood = true
-        prefixUsageCache.NoWait = true
-        prefixUsageCache.Update = func() (map[string]uint64, error) {
+    prefixUsageCache.InitOnce(30*time.Second,
+        // No need to fail upon Update() error, fallback to old value.
+        cachevalue.Opts{ReturnLastGood: true, NoWait: true},
+        func() (map[string]uint64, error) {
             m := make(map[string]uint64)
             for _, pool := range z.serverPools {
                 for _, er := range pool.sets {
@@ -108,8 +105,8 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
                 }
             }
             return m, nil
-        }
-    })
+        },
+    )
 
     return prefixUsageCache.Get()
 }
@@ -1869,15 +1869,13 @@ var listBucketsCache = cachevalue.New[[]BucketInfo]()
 // that all buckets are present on all serverPools.
 func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
     if opts.Cached {
-        listBucketsCache.Once.Do(func() {
-            listBucketsCache.TTL = time.Second
-
-            listBucketsCache.ReturnLastGood = true
-            listBucketsCache.NoWait = true
-            listBucketsCache.Update = func() ([]BucketInfo, error) {
+        listBucketsCache.InitOnce(time.Second,
+            cachevalue.Opts{ReturnLastGood: true, NoWait: true},
+            func() ([]BucketInfo, error) {
                 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+                defer cancel()
+
                 buckets, err = z.s3Peer.ListBuckets(ctx, opts)
-                cancel()
                 if err != nil {
                     return nil, err
                 }
@@ -1888,8 +1886,8 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
                 }
             }
             return buckets, nil
-            }
-        })
+            },
+        )
 
         return listBucketsCache.Get()
     }
@@ -354,12 +354,10 @@ type MetricsGroupOpts struct {
 
 // RegisterRead register the metrics populator function to be used
 // to populate new values upon cache invalidation.
-func (g *MetricsGroup) RegisterRead(read func(ctx context.Context) []Metric) {
-    g.metricsCache = cachevalue.New[[]Metric]()
-    g.metricsCache.Once.Do(func() {
-        g.metricsCache.ReturnLastGood = true
-        g.metricsCache.TTL = g.cacheInterval
-        g.metricsCache.Update = func() ([]Metric, error) {
+func (g *MetricsGroup) RegisterRead(read func(context.Context) []Metric) {
+    g.metricsCache = cachevalue.NewFromFunc(g.cacheInterval,
+        cachevalue.Opts{ReturnLastGood: true},
+        func() ([]Metric, error) {
             if g.metricsGroupOpts.dependGlobalObjectAPI {
                 objLayer := newObjectLayerFn()
                 // Service not initialized yet
@@ -418,8 +416,8 @@ func (g *MetricsGroup) RegisterRead(read func(ctx context.Context) []Metric) {
             }
         }
         return read(GlobalContext), nil
-        }
-    })
+        },
+    )
 }
 
 func (m *Metric) clone() Metric {
@@ -322,10 +322,9 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOptions
         return info, nil
     } // In all other cases cache the value upto 1sec.
 
-    client.diskInfoCache.Once.Do(func() {
-        client.diskInfoCache.TTL = time.Second
-        client.diskInfoCache.CacheError = true
-        client.diskInfoCache.Update = func() (info DiskInfo, err error) {
+    client.diskInfoCache.InitOnce(time.Second,
+        cachevalue.Opts{CacheError: true},
+        func() (info DiskInfo, err error) {
             ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
             defer cancel()
 
@@ -339,8 +338,8 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOptions
             return info, toStorageErr(errors.New(info.Error))
         }
         return info, nil
-        }
-    })
+        },
+    )
 
     return client.diskInfoCache.Get()
 }
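The remote DiskInfo cache above opts into CacheError. Going by the Get logic in the internal/cachevalue changes further down in this diff, a failed update is then served from the cache for the TTL window rather than retried by every caller. A short sketch of that behavior under assumed names (probeRemoteDisk is hypothetical, and the internal import only works inside the minio module):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/minio/minio/internal/cachevalue"
)

// probeRemoteDisk is a hypothetical stand-in for the storage REST call.
func probeRemoteDisk() (string, error) {
	return "", errors.New("drive offline")
}

func main() {
	c := cachevalue.New[string]()
	c.InitOnce(time.Second,
		// CacheError keeps the error for the TTL, so repeated Get calls
		// within one second can reuse it instead of probing again.
		cachevalue.Opts{CacheError: true},
		probeRemoteDisk,
	)

	for i := 0; i < 3; i++ {
		_, err := c.Get()
		fmt.Println(err)
	}
}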
@@ -96,9 +96,9 @@ type xlStorageDiskIDCheck struct {
 }
 
 func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
-    p.metricsCache.Once.Do(func() {
-        p.metricsCache.TTL = 5 * time.Second
-        p.metricsCache.Update = func() (DiskMetrics, error) {
+    p.metricsCache.InitOnce(5*time.Second,
+        cachevalue.Opts{},
+        func() (DiskMetrics, error) {
             diskMetric := DiskMetrics{
                 LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
                 APICalls:   make(map[string]uint64, len(p.apiCalls)),
@@ -110,8 +110,8 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
             diskMetric.APICalls[storageMetric(i).String()] = atomic.LoadUint64(&p.apiCalls[i])
         }
         return diskMetric, nil
-        }
-    })
+        },
+    )
 
     diskMetric, _ := p.metricsCache.Get()
     // Do not need this value to be cached.
@@ -732,9 +732,8 @@ func (s *xlStorage) setWriteAttribute(writeCount uint64) error {
 // DiskInfo provides current information about disk space usage,
 // total free inodes and underlying filesystem.
 func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
-    s.diskInfoCache.Once.Do(func() {
-        s.diskInfoCache.TTL = time.Second
-        s.diskInfoCache.Update = func() (DiskInfo, error) {
+    s.diskInfoCache.InitOnce(time.Second, cachevalue.Opts{},
+        func() (DiskInfo, error) {
             dcinfo := DiskInfo{}
             di, err := getDiskInfo(s.drivePath)
             if err != nil {
@@ -757,8 +756,8 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
         dcinfo.Healing = errors.Is(err, errUnformattedDisk) || (s.Healing() != nil)
         dcinfo.ID = diskID
         return dcinfo, err
-        }
-    })
+        },
+    )
 
     info, err = s.diskInfoCache.Get()
     info.MountPath = s.drivePath
@@ -23,22 +23,8 @@ import (
     "time"
 )
 
-// Cache contains a synchronized value that is considered valid
-// for a specific amount of time.
-// An Update function must be set to provide an updated value when needed.
-type Cache[T any] struct {
-    // Update must return an updated value.
-    // If an error is returned the cached value is not set.
-    // Only one caller will call this function at any time, others will be blocking.
-    // The returned value can no longer be modified once returned.
-    // Should be set before calling Get().
-    Update func() (T, error)
-
-    // TTL for a cached value.
-    // If not set 1 second TTL is assumed.
-    // Should be set before calling Get().
-    TTL time.Duration
-
+// Opts contains options for the cache.
+type Opts struct {
     // When set to true, return the last cached value
     // even if updating the value errors out.
     // Returns the last good value AND the error.
@@ -53,6 +39,23 @@ type Cache[T any] struct {
     // if TTL has expired but 2x TTL has not yet passed,
     // but will fetch a new value in the background.
     NoWait bool
+}
+
+// Cache contains a synchronized value that is considered valid
+// for a specific amount of time.
+// An Update function must be set to provide an updated value when needed.
+type Cache[T any] struct {
+    // updateFn must return an updated value.
+    // If an error is returned the cached value is not set.
+    // Only one caller will call this function at any time, others will be blocking.
+    // The returned value can no longer be modified once returned.
+    // Should be set before calling Get().
+    updateFn func() (T, error)
+
+    // ttl for a cached value.
+    ttl time.Duration
+
+    opts Opts
 
     // Once can be used to initialize values for lazy initialization.
     // Should be set before calling Get().
@@ -67,29 +70,50 @@ type Cache[T any] struct {
     updating sync.Mutex
 }
 
-// New initializes a new cached value instance.
+// New allocates a new cached value instance. It must be initialized with
+// `.InitOnce`.
 func New[I any]() *Cache[I] {
     return &Cache[I]{}
 }
 
+// NewFromFunc allocates a new cached value instance and initializes it with an
+// update function, making it ready for use.
+func NewFromFunc[I any](ttl time.Duration, opts Opts, update func() (I, error)) *Cache[I] {
+    return &Cache[I]{
+        ttl:      ttl,
+        updateFn: update,
+        opts:     opts,
+    }
+}
+
+// InitOnce initializes the cache with a TTL and an update function. It is
+// guaranteed to be called only once.
+func (t *Cache[I]) InitOnce(ttl time.Duration, opts Opts, update func() (I, error)) {
+    t.Once.Do(func() {
+        t.ttl = ttl
+        t.updateFn = update
+        t.opts = opts
+    })
+}
+
 // Get will return a cached value or fetch a new one.
 // If the Update function returns an error the value is forwarded as is and not cached.
-func (t *Cache[I]) Get() (item I, err error) {
+func (t *Cache[I]) Get() (I, error) {
     v := t.valErr.Load()
-    ttl := t.ttl()
+    ttl := t.ttl
     vTime := t.lastUpdateMs.Load()
     tNow := time.Now().UnixMilli()
     if v != nil && tNow-vTime < ttl.Milliseconds() {
         if v.e == nil {
             return v.v, nil
         }
-        if v.e != nil && t.CacheError || t.ReturnLastGood {
+        if v.e != nil && t.opts.CacheError || t.opts.ReturnLastGood {
             return v.v, v.e
         }
     }
 
     // Fetch new value.
-    if t.NoWait && v != nil && tNow-vTime < ttl.Milliseconds()*2 && (v.e == nil || t.CacheError) {
+    if t.opts.NoWait && v != nil && tNow-vTime < ttl.Milliseconds()*2 && (v.e == nil || t.opts.CacheError) {
         if t.updating.TryLock() {
             go func() {
                 defer t.updating.Unlock()
@@ -113,18 +137,10 @@ func (t *Cache[I]) Get() (item I, err error) {
     return v.v, v.e
 }
 
-func (t *Cache[_]) ttl() time.Duration {
-    ttl := t.TTL
-    if ttl <= 0 {
-        ttl = time.Second
-    }
-    return ttl
-}
-
 func (t *Cache[T]) update() {
-    val, err := t.Update()
+    val, err := t.updateFn()
     if err != nil {
-        if t.ReturnLastGood {
+        if t.opts.ReturnLastGood {
             // Keep last good value.
             v := t.valErr.Load()
             if v != nil {
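For callers that own the cache themselves, the NewFromFunc constructor shown above builds a ready-to-use cache in a single expression, as RegisterRead now does for the metrics cache. A small sketch, with collectStats as an assumed placeholder for the real loader and the usual caveat that internal/cachevalue is only importable from within the minio module:

package main

import (
	"fmt"
	"time"

	"github.com/minio/minio/internal/cachevalue"
)

// collectStats is an assumed placeholder for an expensive collection step,
// similar to the metrics read function in RegisterRead above.
func collectStats() ([]int, error) {
	return []int{1, 2, 3}, nil
}

func main() {
	stats := cachevalue.NewFromFunc(30*time.Second,
		// ReturnLastGood keeps serving the previous value (together with
		// the error) if a later refresh fails.
		cachevalue.Opts{ReturnLastGood: true},
		collectStats,
	)

	vals, err := stats.Get()
	fmt.Println(vals, err)
}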
@@ -25,12 +25,11 @@ import (
 func TestCache(t *testing.T) {
     cache := New[time.Time]()
     t.Parallel()
-    cache.Once.Do(func() {
-        cache.TTL = 2 * time.Second
-        cache.Update = func() (time.Time, error) {
+    cache.InitOnce(2*time.Second, Opts{},
+        func() (time.Time, error) {
             return time.Now(), nil
-        }
-    })
+        },
+    )
 
     t1, _ := cache.Get()
 
@@ -50,12 +49,11 @@ func TestCache(t *testing.T) {
 
 func BenchmarkCache(b *testing.B) {
     cache := New[time.Time]()
-    cache.Once.Do(func() {
-        cache.TTL = 1 * time.Millisecond
-        cache.Update = func() (time.Time, error) {
+    cache.InitOnce(1*time.Millisecond, Opts{},
+        func() (time.Time, error) {
             return time.Now(), nil
-        }
-    })
+        },
+    )
 
     b.ReportAllocs()
     b.ResetTimer()