diff --git a/cmd/bucket-quota.go b/cmd/bucket-quota.go index 5a8cf5977..c373db68d 100644 --- a/cmd/bucket-quota.go +++ b/cmd/bucket-quota.go @@ -25,6 +25,7 @@ import ( "time" "github.com/minio/madmin-go/v3" + "github.com/minio/minio/internal/cachevalue" "github.com/minio/minio/internal/logger" ) @@ -42,7 +43,7 @@ func NewBucketQuotaSys() *BucketQuotaSys { return &BucketQuotaSys{} } -var bucketStorageCache = newTimedValue[DataUsageInfo]() +var bucketStorageCache = cachevalue.New[DataUsageInfo]() // Init initialize bucket quota. func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) { diff --git a/cmd/data-usage.go b/cmd/data-usage.go index 95e3fb14f..65adb7c0e 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -24,6 +24,7 @@ import ( "time" jsoniter "github.com/json-iterator/go" + "github.com/minio/minio/internal/cachevalue" "github.com/minio/minio/internal/logger" ) @@ -62,7 +63,7 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan } } -var prefixUsageCache = newTimedValue[map[string]uint64]() +var prefixUsageCache = cachevalue.New[map[string]uint64]() // loadPrefixUsageFromBackend returns prefix usages found in passed buckets // diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 05569d134..e9c705552 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -39,6 +39,7 @@ import ( "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio/internal/bpool" + "github.com/minio/minio/internal/cachevalue" "github.com/minio/minio/internal/config/storageclass" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" @@ -1840,7 +1841,7 @@ func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix strin } } -var listBucketsCache = newTimedValue[[]BucketInfo]() +var listBucketsCache = cachevalue.New[[]BucketInfo]() // List all buckets from one of the serverPools, we are not doing merge // sort here just for simplification. As per design it is assumed diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index fe68835ff..5392ab1c4 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -32,6 +32,7 @@ import ( "github.com/minio/kes-go" "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/bucket/lifecycle" + "github.com/minio/minio/internal/cachevalue" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/mcontext" @@ -330,7 +331,7 @@ type Metric struct { // MetricsGroup are a group of metrics that are initialized together. type MetricsGroup struct { - metricsCache *timedValue[[]Metric] `msg:"-"` + metricsCache *cachevalue.Cache[[]Metric] `msg:"-"` cacheInterval time.Duration metricsGroupOpts MetricsGroupOpts } @@ -354,7 +355,7 @@ type MetricsGroupOpts struct { // RegisterRead register the metrics populator function to be used // to populate new values upon cache invalidation. func (g *MetricsGroup) RegisterRead(read func(ctx context.Context) []Metric) { - g.metricsCache = newTimedValue[[]Metric]() + g.metricsCache = cachevalue.New[[]Metric]() g.metricsCache.Once.Do(func() { g.metricsCache.Relax = true g.metricsCache.TTL = g.cacheInterval diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 75c352b88..5a5adbb0e 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -35,6 +35,7 @@ import ( "time" "github.com/minio/madmin-go/v3" + "github.com/minio/minio/internal/cachevalue" "github.com/minio/minio/internal/grid" xhttp "github.com/minio/minio/internal/http" xioutil "github.com/minio/minio/internal/ioutil" @@ -166,7 +167,7 @@ type storageRESTClient struct { formatData []byte formatMutex sync.RWMutex - diskInfoCache *timedValue[DiskInfo] + diskInfoCache *cachevalue.Cache[DiskInfo] // Indexes, will be -1 until assigned a set. poolIndex, setIndex, diskIndex int @@ -893,6 +894,6 @@ func newStorageRESTClient(endpoint Endpoint, healthCheck bool, gm *grid.Manager) return &storageRESTClient{ endpoint: endpoint, restClient: restClient, poolIndex: -1, setIndex: -1, diskIndex: -1, gridConn: conn, - diskInfoCache: newTimedValue[DiskInfo](), + diskInfoCache: cachevalue.New[DiskInfo](), }, nil } diff --git a/cmd/utils.go b/cmd/utils.go index 34fa64deb..3c13fe9e8 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -926,102 +926,6 @@ func iamPolicyClaimNameSA() string { return "sa-policy" } -// timedValue contains a synchronized value that is considered valid -// for a specific amount of time. -// An Update function must be set to provide an updated value when needed. -type timedValue[I any] struct { - // Update must return an updated value. - // If an error is returned the cached value is not set. - // Only one caller will call this function at any time, others will be blocking. - // The returned value can no longer be modified once returned. - // Should be set before calling Get(). - Update func() (item I, err error) - - // TTL for a cached value. - // If not set 1 second TTL is assumed. - // Should be set before calling Get(). - TTL time.Duration - - // When set to true, return the last cached value - // even if updating the value errors out - Relax bool - - // Once can be used to initialize values for lazy initialization. - // Should be set before calling Get(). - Once sync.Once - - // Managed values. - value I - valueSet bool - lastUpdate time.Time - mu sync.RWMutex -} - -// newTimedValue -func newTimedValue[I any]() *timedValue[I] { - return &timedValue[I]{} -} - -// Get will return a cached value or fetch a new one. -// If the Update function returns an error the value is forwarded as is and not cached. -func (t *timedValue[I]) Get() (item I, err error) { - item, ok := t.get(t.ttl()) - if ok { - return item, nil - } - - item, err = t.Update() - if err != nil { - if t.Relax { - // if update fails, return current - // cached value along with error. - // - // Let the caller decide if they want - // to use the returned value based - // on error. - item, ok = t.get(0) - if ok { - return item, err - } - } - return item, err - } - - t.update(item) - return item, nil -} - -func (t *timedValue[_]) ttl() time.Duration { - ttl := t.TTL - if ttl <= 0 { - ttl = time.Second - } - return ttl -} - -func (t *timedValue[I]) get(ttl time.Duration) (item I, ok bool) { - t.mu.RLock() - defer t.mu.RUnlock() - if t.valueSet { - item = t.value - if ttl <= 0 { - return item, true - } - if time.Since(t.lastUpdate) < ttl { - return item, true - } - } - return item, false -} - -func (t *timedValue[I]) update(item I) { - t.mu.Lock() - defer t.mu.Unlock() - t.value = item - t.valueSet = true - t.lastUpdate = time.Now() -} - // On MinIO a directory object is stored as a regular object with "__XLDIR__" suffix. // For ex. "prefix/" is stored as "prefix__XLDIR__" func encodeDirObject(object string) string { diff --git a/cmd/utils_test.go b/cmd/utils_test.go index 406319fbc..bdbf17f4d 100644 --- a/cmd/utils_test.go +++ b/cmd/utils_test.go @@ -26,7 +26,6 @@ import ( "reflect" "strings" "testing" - "time" ) // Tests maximum object size. @@ -400,29 +399,3 @@ func TestGetMinioMode(t *testing.T) { globalIsDistErasure, globalIsErasure = false, false testMinioMode(globalMinioModeFS) } - -func TestTimedValue(t *testing.T) { - cache := newTimedValue[time.Time]() - t.Parallel() - cache.Once.Do(func() { - cache.TTL = 2 * time.Second - cache.Update = func() (time.Time, error) { - return time.Now(), nil - } - }) - - t1, _ := cache.Get() - - t2, _ := cache.Get() - - if !t1.Equal(t2) { - t.Fatalf("expected time to be equal: %s != %s", t1, t2) - } - - time.Sleep(3 * time.Second) - t3, _ := cache.Get() - - if t1.Equal(t3) { - t.Fatalf("expected time to be un-equal: %s == %s", t1, t3) - } -} diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 6b18cb0de..d2a07aaf0 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -31,6 +31,7 @@ import ( "time" "github.com/minio/madmin-go/v3" + "github.com/minio/minio/internal/cachevalue" xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/minio/internal/logger" ) @@ -89,7 +90,7 @@ type xlStorageDiskIDCheck struct { health *diskHealthTracker healthCheck bool - metricsCache *timedValue[DiskMetrics] + metricsCache *cachevalue.Cache[DiskMetrics] diskCtx context.Context diskCancel context.CancelFunc } @@ -179,7 +180,7 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis storage: storage, health: newDiskHealthTracker(), healthCheck: healthCheck && globalDriveMonitoring, - metricsCache: newTimedValue[DiskMetrics](), + metricsCache: cachevalue.New[DiskMetrics](), } xl.totalWrites.Store(xl.storage.getWriteAttribute()) diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 7a1aa3815..f8b8f60bd 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -43,6 +43,7 @@ import ( "github.com/klauspost/filepathx" "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/bucket/lifecycle" + "github.com/minio/minio/internal/cachevalue" "github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/disk" xioutil "github.com/minio/minio/internal/ioutil" @@ -112,7 +113,7 @@ type xlStorage struct { formatLegacy bool formatLastCheck time.Time - diskInfoCache *timedValue[DiskInfo] + diskInfoCache *cachevalue.Cache[DiskInfo] sync.RWMutex formatData []byte @@ -233,7 +234,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) { drivePath: ep.Path, endpoint: ep, globalSync: globalFSOSync, - diskInfoCache: newTimedValue[DiskInfo](), + diskInfoCache: cachevalue.New[DiskInfo](), poolIndex: -1, setIndex: -1, diskIndex: -1, diff --git a/internal/cachevalue/cache.go b/internal/cachevalue/cache.go new file mode 100644 index 000000000..718676b68 --- /dev/null +++ b/internal/cachevalue/cache.go @@ -0,0 +1,119 @@ +// Copyright (c) 2015-2024 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cachevalue + +import ( + "sync" + "time" +) + +// Cache contains a synchronized value that is considered valid +// for a specific amount of time. +// An Update function must be set to provide an updated value when needed. +type Cache[I any] struct { + // Update must return an updated value. + // If an error is returned the cached value is not set. + // Only one caller will call this function at any time, others will be blocking. + // The returned value can no longer be modified once returned. + // Should be set before calling Get(). + Update func() (item I, err error) + + // TTL for a cached value. + // If not set 1 second TTL is assumed. + // Should be set before calling Get(). + TTL time.Duration + + // When set to true, return the last cached value + // even if updating the value errors out + Relax bool + + // Once can be used to initialize values for lazy initialization. + // Should be set before calling Get(). + Once sync.Once + + // Managed values. + value I // our cached value + valueSet bool // 'true' if the value 'I' has a value + lastUpdate time.Time // indicates when value 'I' was updated last, used for invalidation. + mu sync.RWMutex +} + +// New initializes a new cached value instance. +func New[I any]() *Cache[I] { + return &Cache[I]{} +} + +// Get will return a cached value or fetch a new one. +// If the Update function returns an error the value is forwarded as is and not cached. +func (t *Cache[I]) Get() (item I, err error) { + item, ok := t.get(t.ttl()) + if ok { + return item, nil + } + + item, err = t.Update() + if err != nil { + if t.Relax { + // if update fails, return current + // cached value along with error. + // + // Let the caller decide if they want + // to use the returned value based + // on error. + item, ok = t.get(0) + if ok { + return item, err + } + } + return item, err + } + + t.update(item) + return item, nil +} + +func (t *Cache[_]) ttl() time.Duration { + ttl := t.TTL + if ttl <= 0 { + ttl = time.Second + } + return ttl +} + +func (t *Cache[I]) get(ttl time.Duration) (item I, ok bool) { + t.mu.RLock() + defer t.mu.RUnlock() + if t.valueSet { + item = t.value + if ttl <= 0 { + return item, true + } + if time.Since(t.lastUpdate) < ttl { + return item, true + } + } + return item, false +} + +func (t *Cache[I]) update(item I) { + t.mu.Lock() + defer t.mu.Unlock() + t.value = item + t.valueSet = true + t.lastUpdate = time.Now() +} diff --git a/internal/cachevalue/cache_test.go b/internal/cachevalue/cache_test.go new file mode 100644 index 000000000..b4bdea0c8 --- /dev/null +++ b/internal/cachevalue/cache_test.go @@ -0,0 +1,68 @@ +// Copyright (c) 2015-2024 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cachevalue + +import ( + "testing" + "time" +) + +func TestCache(t *testing.T) { + cache := New[time.Time]() + t.Parallel() + cache.Once.Do(func() { + cache.TTL = 2 * time.Second + cache.Update = func() (time.Time, error) { + return time.Now(), nil + } + }) + + t1, _ := cache.Get() + + t2, _ := cache.Get() + + if !t1.Equal(t2) { + t.Fatalf("expected time to be equal: %s != %s", t1, t2) + } + + time.Sleep(3 * time.Second) + t3, _ := cache.Get() + + if t1.Equal(t3) { + t.Fatalf("expected time to be un-equal: %s == %s", t1, t3) + } +} + +func BenchmarkCache(b *testing.B) { + cache := New[time.Time]() + cache.Once.Do(func() { + cache.TTL = 1 * time.Microsecond + cache.Update = func() (time.Time, error) { + return time.Now(), nil + } + }) + + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.Get() + } + }) +}