mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
cache usage, prefix-usage, and buckets for AccountInfo up to 10 secs (#18051)
AccountInfo is quite frequently called by the Console UI login attempts, when many users are logging in it is important that we provide them with better responsiveness. - ListBuckets information is cached every second - Bucket usage info is cached for up to 10 seconds - Prefix usage (optional) info is cached for up to 10 secs Failure to update after cache expiration, would still allow login which would end up providing information previously cached. This allows for seamless responsiveness for the Console UI logins, and overall responsiveness on a heavily loaded system.
This commit is contained in:
parent
8c4561b8da
commit
c3d70e0795
@ -1,4 +1,4 @@
|
||||
// Copyright (c) 2015-2021 MinIO, Inc.
|
||||
// Copyright (c) 2015-2023 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
@ -19,6 +19,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -1192,7 +1193,7 @@ func (a adminAPIHandlers) DeleteServiceAccount(w http.ResponseWriter, r *http.Re
|
||||
writeSuccessNoContent(w)
|
||||
}
|
||||
|
||||
// AccountInfoHandler returns usage
|
||||
// AccountInfoHandler returns usage, permissions and other bucket metadata for incoming us
|
||||
func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
@ -1261,12 +1262,30 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
|
||||
return rd, wr
|
||||
}
|
||||
|
||||
// Load the latest calculated data usage
|
||||
dataUsageInfo, _ := loadDataUsageFromBackend(ctx, objectAPI)
|
||||
bucketStorageCache.Once.Do(func() {
|
||||
// Set this to 10 secs since its enough, as scanner
|
||||
// does not update the bucket usage values frequently.
|
||||
bucketStorageCache.TTL = 10 * time.Second
|
||||
|
||||
// Rely on older value if usage loading fails from disk.
|
||||
bucketStorageCache.Relax = true
|
||||
bucketStorageCache.Update = func() (interface{}, error) {
|
||||
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer done()
|
||||
|
||||
return loadDataUsageFromBackend(ctx, objectAPI)
|
||||
}
|
||||
})
|
||||
|
||||
var dataUsageInfo DataUsageInfo
|
||||
v, _ := bucketStorageCache.Get()
|
||||
if v != nil {
|
||||
dataUsageInfo, _ = v.(DataUsageInfo)
|
||||
}
|
||||
|
||||
// If etcd, dns federation configured list buckets from etcd.
|
||||
var buckets []BucketInfo
|
||||
var err error
|
||||
var buckets []BucketInfo
|
||||
if globalDNSConfig != nil && globalBucketFederation {
|
||||
dnsBuckets, err := globalDNSConfig.List()
|
||||
if err != nil && !IsErrIgnored(err,
|
||||
@ -1285,7 +1304,7 @@ func (a adminAPIHandlers) AccountInfoHandler(w http.ResponseWriter, r *http.Requ
|
||||
return buckets[i].Name < buckets[j].Name
|
||||
})
|
||||
} else {
|
||||
buckets, err = objectAPI.ListBuckets(ctx, BucketOptions{})
|
||||
buckets, err = objectAPI.ListBuckets(ctx, BucketOptions{Cached: true})
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
|
@ -29,9 +29,7 @@ import (
|
||||
)
|
||||
|
||||
// BucketQuotaSys - map of bucket and quota configuration.
|
||||
type BucketQuotaSys struct {
|
||||
bucketStorageCache timedValue
|
||||
}
|
||||
type BucketQuotaSys struct{}
|
||||
|
||||
// Get - Get quota configuration.
|
||||
func (sys *BucketQuotaSys) Get(ctx context.Context, bucketName string) (*madmin.BucketQuota, error) {
|
||||
@ -44,16 +42,18 @@ func NewBucketQuotaSys() *BucketQuotaSys {
|
||||
return &BucketQuotaSys{}
|
||||
}
|
||||
|
||||
var bucketStorageCache timedValue
|
||||
|
||||
// Init initialize bucket quota.
|
||||
func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
|
||||
sys.bucketStorageCache.Once.Do(func() {
|
||||
bucketStorageCache.Once.Do(func() {
|
||||
// Set this to 10 secs since its enough, as scanner
|
||||
// does not update the bucket usage values frequently.
|
||||
sys.bucketStorageCache.TTL = 10 * time.Second
|
||||
bucketStorageCache.TTL = 10 * time.Second
|
||||
// Rely on older value if usage loading fails from disk.
|
||||
sys.bucketStorageCache.Relax = true
|
||||
sys.bucketStorageCache.Update = func() (interface{}, error) {
|
||||
ctx, done := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
bucketStorageCache.Relax = true
|
||||
bucketStorageCache.Update = func() (interface{}, error) {
|
||||
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer done()
|
||||
|
||||
return loadDataUsageFromBackend(ctx, objAPI)
|
||||
@ -63,16 +63,17 @@ func (sys *BucketQuotaSys) Init(objAPI ObjectLayer) {
|
||||
|
||||
// GetBucketUsageInfo return bucket usage info for a given bucket
|
||||
func (sys *BucketQuotaSys) GetBucketUsageInfo(bucket string) (BucketUsageInfo, error) {
|
||||
v, err := sys.bucketStorageCache.Get()
|
||||
if err != nil && v != nil {
|
||||
logger.LogOnceIf(GlobalContext, fmt.Errorf("unable to retrieve usage information for bucket: %s, relying on older value cached in-memory: err(%v)", bucket, err), "bucket-usage-cache-"+bucket)
|
||||
}
|
||||
if v == nil {
|
||||
logger.LogOnceIf(GlobalContext, errors.New("unable to retrieve usage information for bucket: %s, no reliable usage value available - quota will not be enforced"), "bucket-usage-empty-"+bucket)
|
||||
v, err := bucketStorageCache.Get()
|
||||
timedout := OperationTimedOut{}
|
||||
if err != nil && !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &timedout) {
|
||||
if v != nil {
|
||||
logger.LogOnceIf(GlobalContext, fmt.Errorf("unable to retrieve usage information for bucket: %s, relying on older value cached in-memory: err(%v)", bucket, err), "bucket-usage-cache-"+bucket)
|
||||
} else {
|
||||
logger.LogOnceIf(GlobalContext, errors.New("unable to retrieve usage information for bucket: %s, no reliable usage value available - quota will not be enforced"), "bucket-usage-empty-"+bucket)
|
||||
}
|
||||
}
|
||||
|
||||
var bui BucketUsageInfo
|
||||
|
||||
dui, ok := v.(DataUsageInfo)
|
||||
if ok {
|
||||
bui = dui.BucketsUsage[bucket]
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
@ -61,6 +62,8 @@ func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dui <-chan
|
||||
}
|
||||
}
|
||||
|
||||
var prefixUsageCache timedValue
|
||||
|
||||
// loadPrefixUsageFromBackend returns prefix usages found in passed buckets
|
||||
//
|
||||
// e.g.: /testbucket/prefix => 355601334
|
||||
@ -73,28 +76,45 @@ func loadPrefixUsageFromBackend(ctx context.Context, objAPI ObjectLayer, bucket
|
||||
|
||||
cache := dataUsageCache{}
|
||||
|
||||
m := make(map[string]uint64)
|
||||
for _, pool := range z.serverPools {
|
||||
for _, er := range pool.sets {
|
||||
// Load bucket usage prefixes
|
||||
if err := cache.load(ctx, er, bucket+slashSeparator+dataUsageCacheName); err == nil {
|
||||
root := cache.find(bucket)
|
||||
if root == nil {
|
||||
// We dont have usage information for this bucket in this
|
||||
// set, go to the next set
|
||||
continue
|
||||
}
|
||||
prefixUsageCache.Once.Do(func() {
|
||||
prefixUsageCache.TTL = 30 * time.Second
|
||||
|
||||
for id, usageInfo := range cache.flattenChildrens(*root) {
|
||||
prefix := decodeDirObject(strings.TrimPrefix(id, bucket+slashSeparator))
|
||||
// decodeDirObject to avoid any __XLDIR__ objects
|
||||
m[prefix] += uint64(usageInfo.Size)
|
||||
// No need to fail upon Update() error, fallback to old value.
|
||||
prefixUsageCache.Relax = true
|
||||
prefixUsageCache.Update = func() (interface{}, error) {
|
||||
m := make(map[string]uint64)
|
||||
for _, pool := range z.serverPools {
|
||||
for _, er := range pool.sets {
|
||||
// Load bucket usage prefixes
|
||||
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
ok := cache.load(ctx, er, bucket+slashSeparator+dataUsageCacheName) == nil
|
||||
done()
|
||||
if ok {
|
||||
root := cache.find(bucket)
|
||||
if root == nil {
|
||||
// We dont have usage information for this bucket in this
|
||||
// set, go to the next set
|
||||
continue
|
||||
}
|
||||
|
||||
for id, usageInfo := range cache.flattenChildrens(*root) {
|
||||
prefix := decodeDirObject(strings.TrimPrefix(id, bucket+slashSeparator))
|
||||
// decodeDirObject to avoid any __XLDIR__ objects
|
||||
m[prefix] += uint64(usageInfo.Size)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
})
|
||||
|
||||
v, _ := prefixUsageCache.Get()
|
||||
if v != nil {
|
||||
return v.(map[string]uint64), nil
|
||||
}
|
||||
|
||||
return m, nil
|
||||
return map[string]uint64{}, nil
|
||||
}
|
||||
|
||||
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
|
||||
|
@ -1716,10 +1716,42 @@ func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix strin
|
||||
}
|
||||
}
|
||||
|
||||
var listBucketsCache timedValue
|
||||
|
||||
// List all buckets from one of the serverPools, we are not doing merge
|
||||
// sort here just for simplification. As per design it is assumed
|
||||
// that all buckets are present on all serverPools.
|
||||
func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
|
||||
if opts.Cached {
|
||||
listBucketsCache.Once.Do(func() {
|
||||
listBucketsCache.TTL = time.Second
|
||||
|
||||
listBucketsCache.Relax = true
|
||||
listBucketsCache.Update = func() (interface{}, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
buckets, err = z.s3Peer.ListBuckets(ctx, opts)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range buckets {
|
||||
createdAt, err := globalBucketMetadataSys.CreatedAt(buckets[i].Name)
|
||||
if err == nil {
|
||||
buckets[i].Created = createdAt
|
||||
}
|
||||
}
|
||||
return buckets, nil
|
||||
}
|
||||
})
|
||||
|
||||
v, _ := listBucketsCache.Get()
|
||||
if v != nil {
|
||||
return v.([]BucketInfo), nil
|
||||
}
|
||||
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
buckets, err = z.s3Peer.ListBuckets(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -146,6 +146,7 @@ type DeleteBucketOptions struct {
|
||||
// BucketOptions provides options for ListBuckets and GetBucketInfo call.
|
||||
type BucketOptions struct {
|
||||
Deleted bool // true only when site replication is enabled
|
||||
Cached bool // true only when we are requesting a cached response instead of hitting the disk for example ListBuckets() call.
|
||||
}
|
||||
|
||||
// SetReplicaStatus sets replica status and timestamp for delete operations in ObjectOptions
|
||||
|
@ -611,6 +611,8 @@ func (s *TestSuiteIAM) TestSTSForRoot(c *check) {
|
||||
}
|
||||
userAdmClient.SetCustomTransport(s.TestSuiteCommon.client.Transport)
|
||||
|
||||
time.Sleep(2 * time.Second) // wait for listbuckets cache to be invalidated
|
||||
|
||||
accInfo, err := userAdmClient.AccountInfo(ctx, madmin.AccountOpts{})
|
||||
if err != nil {
|
||||
c.Fatalf("root user STS should be able to get account info: %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user