mirror of
https://github.com/minio/minio.git
synced 2025-01-23 20:53:18 -05:00
fix: allow diskInfo at storageRPC to be cached (#19112)
Bonus: convert timedValue into a typed implementation
This commit is contained in:
parent
ee158e1610
commit
2faba02d6b
@ -1204,7 +1204,7 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
|
||||
|
||||
// Rely on older value if usage loading fails from disk.
|
||||
bucketStorageCache.Relax = true
|
||||
bucketStorageCache.Update = func() (interface{}, error) {
|
||||
bucketStorageCache.Update = func() (DataUsageInfo, error) {
|
||||
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer done()
|
||||
|
||||
@ -1212,11 +1212,7 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
|
||||
}
|
||||
})
|
||||
|
||||
var dataUsageInfo DataUsageInfo
|
||||
v, _ := bucketStorageCache.Get()
|
||||
if v != nil {
|
||||
dataUsageInfo, _ = v.(DataUsageInfo)
|
||||
}
|
||||
dataUsageInfo, _ := bucketStorageCache.Get()
|
||||
|
||||
// If etcd, dns federation configured list buckets from etcd.
|
||||
var err error
|
||||
|
@ -42,7 +42,7 @@ func NewBucketQuotaSys() *BucketQuotaSys {
|
||||
return &BucketQuotaSys{}
|
||||
}
|
||||
|
||||
var bucketStorageCache timedValue
|
||||
var bucketStorageCache = newTimedValue[DataUsageInfo]()
|
||||
|
||||
// Init initialize bucket quota.
|
||||
func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
|
||||
@ -52,7 +52,7 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
|
||||
bucketStorageCache.TTL = 10 * time.Second
|
||||
// Rely on older value if usage loading fails from disk.
|
||||
bucketStorageCache.Relax = true
|
||||
bucketStorageCache.Update = func() (interface{}, error) {
|
||||
bucketStorageCache.Update = func() (DataUsageInfo, error) {
|
||||
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer done()
|
||||
|
||||
@ -63,23 +63,23 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
|
||||
|
||||
// GetBucketUsageInfo return bucket usage info for a given bucket
|
||||
func (sys *BucketQuotaSys) GetBucketUsageInfo(bucket string) (BucketUsageInfo, error) {
|
||||
v, err := bucketStorageCache.Get()
|
||||
dui, err := bucketStorageCache.Get()
|
||||
timedout := OperationTimedOut{}
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &timedout) {
|
||||
if v != nil {
|
||||
if len(dui.BucketsUsage) > 0 {
|
||||
logger.LogOnceIf(GlobalContext, fmt.Errorf("unable to retrieve usage information for bucket: %s, relying on older value cached in-memory: err(%v)", bucket, err), "bucket-usage-cache-"+bucket)
|
||||
} else {
|
||||
logger.LogOnceIf(GlobalContext, errors.New("unable to retrieve usage information for bucket: %s, no reliable usage value available - quota will not be enforced"), "bucket-usage-empty-"+bucket)
|
||||
}
|
||||
}
|
||||
|
||||
var bui BucketUsageInfo
|
||||
dui, ok := v.(DataUsageInfo)
|
||||
if len(dui.BucketsUsage) > 0 {
|
||||
bui, ok := dui.BucketsUsage[bucket]
|
||||
if ok {
|
||||
bui = dui.BucketsUsage[bucket]
|
||||
}
|
||||
|
||||
return bui, nil
|
||||
}
|
||||
}
|
||||
return BucketUsageInfo{}, nil
|
||||
}
|
||||
|
||||
// parseBucketQuota parses BucketQuota from json
|
||||
|
@ -62,7 +62,7 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan
|
||||
}
|
||||
}
|
||||
|
||||
var prefixUsageCache timedValue
|
||||
var prefixUsageCache = newTimedValue[map[string]uint64]()
|
||||
|
||||
// loadPrefixUsageFromBackend returns prefix usages found in passed buckets
|
||||
//
|
||||
@ -81,7 +81,7 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
|
||||
|
||||
// No need to fail upon Update() error, fallback to old value.
|
||||
prefixUsageCache.Relax = true
|
||||
prefixUsageCache.Update = func() (interface{}, error) {
|
||||
prefixUsageCache.Update = func() (map[string]uint64, error) {
|
||||
m := make(map[string]uint64)
|
||||
for _, pool := range z.serverPools {
|
||||
for _, er := range pool.sets {
|
||||
@ -109,12 +109,7 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
|
||||
}
|
||||
})
|
||||
|
||||
v, _ := prefixUsageCache.Get()
|
||||
if v != nil {
|
||||
return v.(map[string]uint64), nil
|
||||
}
|
||||
|
||||
return map[string]uint64{}, nil
|
||||
return prefixUsageCache.Get()
|
||||
}
|
||||
|
||||
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
|
||||
|
@ -1001,20 +1001,10 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
|
||||
}
|
||||
|
||||
object = encodeDirObject(object)
|
||||
|
||||
if z.SinglePool() {
|
||||
if !isMinioMetaBucketName(bucket) {
|
||||
avail, err := hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), data.Size())
|
||||
if err != nil {
|
||||
logger.LogOnceIf(ctx, err, "erasure-write-quorum")
|
||||
return ObjectInfo{}, toObjectErr(errErasureWriteQuorum)
|
||||
}
|
||||
if !avail {
|
||||
return ObjectInfo{}, toObjectErr(errDiskFull)
|
||||
}
|
||||
}
|
||||
return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
|
||||
}
|
||||
|
||||
if !opts.NoLock {
|
||||
ns := z.NewNSLock(bucket, object)
|
||||
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
|
||||
@ -1586,16 +1576,6 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
|
||||
}
|
||||
|
||||
if z.SinglePool() {
|
||||
if !isMinioMetaBucketName(bucket) {
|
||||
avail, err := hasSpaceFor(getDiskInfos(ctx, z.serverPools[0].getHashedSet(object).getDisks()...), -1)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, toObjectErr(errErasureWriteQuorum)
|
||||
}
|
||||
if !avail {
|
||||
return nil, toObjectErr(errDiskFull)
|
||||
}
|
||||
}
|
||||
return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
@ -1860,7 +1840,7 @@ func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix strin
|
||||
}
|
||||
}
|
||||
|
||||
var listBucketsCache timedValue
|
||||
var listBucketsCache = newTimedValue[[]BucketInfo]()
|
||||
|
||||
// List all buckets from one of the serverPools, we are not doing merge
|
||||
// sort here just for simplification. As per design it is assumed
|
||||
@ -1871,7 +1851,7 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
|
||||
listBucketsCache.TTL = time.Second
|
||||
|
||||
listBucketsCache.Relax = true
|
||||
listBucketsCache.Update = func() (interface{}, error) {
|
||||
listBucketsCache.Update = func() ([]BucketInfo, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
buckets, err = z.s3Peer.ListBuckets(ctx, opts)
|
||||
cancel()
|
||||
@ -1888,12 +1868,7 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
|
||||
}
|
||||
})
|
||||
|
||||
v, _ := listBucketsCache.Get()
|
||||
if v != nil {
|
||||
return v.([]BucketInfo), nil
|
||||
}
|
||||
|
||||
return buckets, nil
|
||||
return listBucketsCache.Get()
|
||||
}
|
||||
|
||||
buckets, err = z.s3Peer.ListBuckets(ctx, opts)
|
||||
|
@ -330,7 +330,7 @@ type Metric struct {
|
||||
|
||||
// MetricsGroup are a group of metrics that are initialized together.
|
||||
type MetricsGroup struct {
|
||||
metricsCache timedValue `msg:"-"`
|
||||
metricsCache *timedValue[[]Metric] `msg:"-"`
|
||||
cacheInterval time.Duration
|
||||
metricsGroupOpts MetricsGroupOpts
|
||||
}
|
||||
@ -354,10 +354,11 @@ type MetricsGroupOpts struct {
|
||||
// RegisterRead register the metrics populator function to be used
|
||||
// to populate new values upon cache invalidation.
|
||||
func (g *MetricsGroup) RegisterRead(read func(ctx context.Context) []Metric) {
|
||||
g.metricsCache = newTimedValue[[]Metric]()
|
||||
g.metricsCache.Once.Do(func() {
|
||||
g.metricsCache.Relax = true
|
||||
g.metricsCache.TTL = g.cacheInterval
|
||||
g.metricsCache.Update = func() (interface{}, error) {
|
||||
g.metricsCache.Update = func() ([]Metric, error) {
|
||||
if g.metricsGroupOpts.dependGlobalObjectAPI {
|
||||
objLayer := newObjectLayerFn()
|
||||
// Service not initialized yet
|
||||
@ -445,9 +446,8 @@ func (m *Metric) clone() Metric {
|
||||
// once the TTL expires "read()" registered function is called
|
||||
// to return the new values and updated.
|
||||
func (g *MetricsGroup) Get() (metrics []Metric) {
|
||||
c, _ := g.metricsCache.Get()
|
||||
m, ok := c.([]Metric)
|
||||
if !ok {
|
||||
m, _ := g.metricsCache.Get()
|
||||
if len(m) == 0 {
|
||||
return []Metric{}
|
||||
}
|
||||
|
||||
|
@ -166,6 +166,8 @@ type storageRESTClient struct {
|
||||
formatData []byte
|
||||
formatMutex sync.RWMutex
|
||||
|
||||
diskInfoCache *timedValue[DiskInfo]
|
||||
|
||||
// Indexes, will be -1 until assigned a set.
|
||||
poolIndex, setIndex, diskIndex int
|
||||
}
|
||||
@ -306,6 +308,9 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
|
||||
// transport is already down.
|
||||
return info, errDiskNotFound
|
||||
}
|
||||
|
||||
// if metrics was asked, or it was a NoOp we do not need to cache the value.
|
||||
if opts.Metrics || opts.NoOp {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@ -321,6 +326,30 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context, opts DiskInfoOpti
|
||||
}
|
||||
info.Scanning = atomic.LoadInt32(&client.scanning) == 1
|
||||
return info, nil
|
||||
} // In all other cases cache the value upto 1sec.
|
||||
|
||||
client.diskInfoCache.Once.Do(func() {
|
||||
client.diskInfoCache.TTL = time.Second
|
||||
client.diskInfoCache.Update = func() (info DiskInfo, err error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
nopts := DiskInfoOptions{DiskID: client.diskID}
|
||||
infop, err := storageDiskInfoRPC.Call(ctx, client.gridConn, &nopts)
|
||||
if err != nil {
|
||||
return info, toStorageErr(err)
|
||||
}
|
||||
info = *infop
|
||||
if info.Error != "" {
|
||||
return info, toStorageErr(errors.New(info.Error))
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
})
|
||||
|
||||
info, err = client.diskInfoCache.Get()
|
||||
info.Scanning = atomic.LoadInt32(&client.scanning) == 1
|
||||
return info, err
|
||||
}
|
||||
|
||||
// MakeVolBulk - create multiple volumes in a bulk operation.
|
||||
@ -864,5 +893,6 @@ func newStorageRESTClient(endpoint Endpoint, healthCheck bool, gm *grid.Manager)
|
||||
return &storageRESTClient{
|
||||
endpoint: endpoint, restClient: restClient, poolIndex: -1, setIndex: -1, diskIndex: -1,
|
||||
gridConn: conn,
|
||||
diskInfoCache: newTimedValue[DiskInfo](),
|
||||
}, nil
|
||||
}
|
||||
|
53
cmd/utils.go
53
cmd/utils.go
@ -929,13 +929,13 @@ func iamPolicyClaimNameSA() string {
|
||||
// timedValue contains a synchronized value that is considered valid
|
||||
// for a specific amount of time.
|
||||
// An Update function must be set to provide an updated value when needed.
|
||||
type timedValue struct {
|
||||
type timedValue[I any] struct {
|
||||
// Update must return an updated value.
|
||||
// If an error is returned the cached value is not set.
|
||||
// Only one caller will call this function at any time, others will be blocking.
|
||||
// The returned value can no longer be modified once returned.
|
||||
// Should be set before calling Get().
|
||||
Update func() (interface{}, error)
|
||||
Update func() (item I, err error)
|
||||
|
||||
// TTL for a cached value.
|
||||
// If not set 1 second TTL is assumed.
|
||||
@ -951,20 +951,26 @@ type timedValue struct {
|
||||
Once sync.Once
|
||||
|
||||
// Managed values.
|
||||
value interface{}
|
||||
value I
|
||||
valueSet bool
|
||||
lastUpdate time.Time
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// newTimedValue
|
||||
func newTimedValue[I any]() *timedValue[I] {
|
||||
return &timedValue[I]{}
|
||||
}
|
||||
|
||||
// Get will return a cached value or fetch a new one.
|
||||
// If the Update function returns an error the value is forwarded as is and not cached.
|
||||
func (t *timedValue) Get() (interface{}, error) {
|
||||
v := t.get(t.ttl())
|
||||
if v != nil {
|
||||
return v, nil
|
||||
func (t *timedValue[I]) Get() (item I, err error) {
|
||||
item, ok := t.get(t.ttl())
|
||||
if ok {
|
||||
return item, nil
|
||||
}
|
||||
|
||||
v, err := t.Update()
|
||||
item, err = t.Update()
|
||||
if err != nil {
|
||||
if t.Relax {
|
||||
// if update fails, return current
|
||||
@ -973,17 +979,19 @@ func (t *timedValue) Get() (interface{}, error) {
|
||||
// Let the caller decide if they want
|
||||
// to use the returned value based
|
||||
// on error.
|
||||
v = t.get(0)
|
||||
return v, err
|
||||
item, ok = t.get(0)
|
||||
if ok {
|
||||
return item, err
|
||||
}
|
||||
return v, err
|
||||
}
|
||||
return item, err
|
||||
}
|
||||
|
||||
t.update(v)
|
||||
return v, nil
|
||||
t.update(item)
|
||||
return item, nil
|
||||
}
|
||||
|
||||
func (t *timedValue) ttl() time.Duration {
|
||||
func (t *timedValue[_]) ttl() time.Duration {
|
||||
ttl := t.TTL
|
||||
if ttl <= 0 {
|
||||
ttl = time.Second
|
||||
@ -991,23 +999,26 @@ func (t *timedValue) ttl() time.Duration {
|
||||
return ttl
|
||||
}
|
||||
|
||||
func (t *timedValue) get(ttl time.Duration) (v interface{}) {
|
||||
func (t *timedValue[I]) get(ttl time.Duration) (item I, ok bool) {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
v = t.value
|
||||
if t.valueSet {
|
||||
item = t.value
|
||||
if ttl <= 0 {
|
||||
return v
|
||||
return item, true
|
||||
}
|
||||
if time.Since(t.lastUpdate) < ttl {
|
||||
return v
|
||||
return item, true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return item, false
|
||||
}
|
||||
|
||||
func (t *timedValue) update(v interface{}) {
|
||||
func (t *timedValue[I]) update(item I) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
t.value = v
|
||||
t.value = item
|
||||
t.valueSet = true
|
||||
t.lastUpdate = time.Now()
|
||||
}
|
||||
|
||||
|
@ -402,28 +402,25 @@ func TestGetMinioMode(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTimedValue(t *testing.T) {
|
||||
var cache timedValue
|
||||
cache := newTimedValue[time.Time]()
|
||||
t.Parallel()
|
||||
cache.Once.Do(func() {
|
||||
cache.TTL = 2 * time.Second
|
||||
cache.Update = func() (interface{}, error) {
|
||||
cache.Update = func() (time.Time, error) {
|
||||
return time.Now(), nil
|
||||
}
|
||||
})
|
||||
|
||||
i, _ := cache.Get()
|
||||
t1 := i.(time.Time)
|
||||
t1, _ := cache.Get()
|
||||
|
||||
j, _ := cache.Get()
|
||||
t2 := j.(time.Time)
|
||||
t2, _ := cache.Get()
|
||||
|
||||
if !t1.Equal(t2) {
|
||||
t.Fatalf("expected time to be equal: %s != %s", t1, t2)
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
k, _ := cache.Get()
|
||||
t3 := k.(time.Time)
|
||||
t3, _ := cache.Get()
|
||||
|
||||
if t1.Equal(t3) {
|
||||
t.Fatalf("expected time to be un-equal: %s == %s", t1, t3)
|
||||
|
@ -89,7 +89,7 @@ type xlStorageDiskIDCheck struct {
|
||||
health *diskHealthTracker
|
||||
healthCheck bool
|
||||
|
||||
metricsCache timedValue
|
||||
metricsCache *timedValue[DiskMetrics]
|
||||
diskCtx context.Context
|
||||
diskCancel context.CancelFunc
|
||||
}
|
||||
@ -97,7 +97,7 @@ type xlStorageDiskIDCheck struct {
|
||||
func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
|
||||
p.metricsCache.Once.Do(func() {
|
||||
p.metricsCache.TTL = 5 * time.Second
|
||||
p.metricsCache.Update = func() (interface{}, error) {
|
||||
p.metricsCache.Update = func() (DiskMetrics, error) {
|
||||
diskMetric := DiskMetrics{
|
||||
LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
|
||||
APICalls: make(map[string]uint64, len(p.apiCalls)),
|
||||
@ -111,12 +111,8 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
|
||||
return diskMetric, nil
|
||||
}
|
||||
})
|
||||
m, _ := p.metricsCache.Get()
|
||||
diskMetric := DiskMetrics{}
|
||||
if m != nil {
|
||||
diskMetric = m.(DiskMetrics)
|
||||
}
|
||||
|
||||
diskMetric, _ := p.metricsCache.Get()
|
||||
// Do not need this value to be cached.
|
||||
diskMetric.TotalErrorsTimeout = p.totalErrsTimeout.Load()
|
||||
diskMetric.TotalErrorsAvailability = p.totalErrsAvailability.Load()
|
||||
@ -183,6 +179,7 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis
|
||||
storage: storage,
|
||||
health: newDiskHealthTracker(),
|
||||
healthCheck: healthCheck && globalDriveMonitoring,
|
||||
metricsCache: newTimedValue[DiskMetrics](),
|
||||
}
|
||||
|
||||
xl.totalWrites.Store(xl.storage.getWriteAttribute())
|
||||
|
@ -112,7 +112,7 @@ type xlStorage struct {
|
||||
formatLegacy bool
|
||||
formatLastCheck time.Time
|
||||
|
||||
diskInfoCache timedValue
|
||||
diskInfoCache *timedValue[DiskInfo]
|
||||
sync.RWMutex
|
||||
|
||||
formatData []byte
|
||||
@ -233,6 +233,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
|
||||
drivePath: ep.Path,
|
||||
endpoint: ep,
|
||||
globalSync: globalFSOSync,
|
||||
diskInfoCache: newTimedValue[DiskInfo](),
|
||||
poolIndex: -1,
|
||||
setIndex: -1,
|
||||
diskIndex: -1,
|
||||
@ -732,7 +733,7 @@ func (s *xlStorage) setWriteAttribute(writeCount uint64) error {
|
||||
func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInfo, err error) {
|
||||
s.diskInfoCache.Once.Do(func() {
|
||||
s.diskInfoCache.TTL = time.Second
|
||||
s.diskInfoCache.Update = func() (interface{}, error) {
|
||||
s.diskInfoCache.Update = func() (DiskInfo, error) {
|
||||
dcinfo := DiskInfo{}
|
||||
di, err := getDiskInfo(s.drivePath)
|
||||
if err != nil {
|
||||
@ -758,11 +759,7 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ DiskInfoOptions) (info DiskInf
|
||||
}
|
||||
})
|
||||
|
||||
v, err := s.diskInfoCache.Get()
|
||||
if v != nil {
|
||||
info = v.(DiskInfo)
|
||||
}
|
||||
|
||||
info, err = s.diskInfoCache.Get()
|
||||
info.MountPath = s.drivePath
|
||||
info.Endpoint = s.endpoint.String()
|
||||
info.Scanning = atomic.LoadInt32(&s.scanning) == 1
|
||||
|
Loading…
x
Reference in New Issue
Block a user