mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
Export bucket usage counts as part of bucket metrics (#9710)
Bonus fixes in quota enforcement to use the new datastructure and use timedValue to cache a value/reload automatically avoids one less global variable.
This commit is contained in:
@@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/config"
|
||||
@@ -31,7 +30,9 @@ import (
|
||||
)
|
||||
|
||||
// BucketQuotaSys - map of bucket and quota configuration.
|
||||
type BucketQuotaSys struct{}
|
||||
type BucketQuotaSys struct {
|
||||
bucketStorageCache timedValue
|
||||
}
|
||||
|
||||
// Get - Get quota configuration.
|
||||
func (sys *BucketQuotaSys) Get(bucketName string) (*madmin.BucketQuota, error) {
|
||||
@@ -63,40 +64,13 @@ func parseBucketQuota(bucket string, data []byte) (quotaCfg *madmin.BucketQuota,
|
||||
return
|
||||
}
|
||||
|
||||
type bucketStorageCache struct {
|
||||
bucketsSizes map[string]uint64
|
||||
lastUpdate time.Time
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (b *bucketStorageCache) check(ctx context.Context, q *madmin.BucketQuota, bucket string, size int64) error {
|
||||
if q.Quota == 0 {
|
||||
// No quota set return quickly.
|
||||
return nil
|
||||
func (sys *BucketQuotaSys) check(ctx context.Context, bucket string, size int64) error {
|
||||
objAPI := newObjectLayerWithoutSafeModeFn()
|
||||
if objAPI == nil {
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if time.Since(b.lastUpdate) > 10*time.Second {
|
||||
dui, err := loadDataUsageFromBackend(ctx, newObjectLayerWithoutSafeModeFn())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.lastUpdate = time.Now()
|
||||
b.bucketsSizes = dui.BucketsSizes
|
||||
}
|
||||
currUsage := b.bucketsSizes[bucket]
|
||||
if (currUsage + uint64(size)) > q.Quota {
|
||||
return BucketQuotaExceeded{Bucket: bucket}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
|
||||
if size < 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
q, err := globalBucketQuotaSys.Get(bucket)
|
||||
q, err := sys.Get(bucket)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
@@ -105,7 +79,45 @@ func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
return globalBucketStorageCache.check(ctx, q, bucket, size)
|
||||
if q.Quota == 0 {
|
||||
// No quota set return quickly.
|
||||
return nil
|
||||
}
|
||||
|
||||
sys.bucketStorageCache.Once.Do(func() {
|
||||
sys.bucketStorageCache.TTL = 10 * time.Second
|
||||
sys.bucketStorageCache.Update = func() (interface{}, error) {
|
||||
return loadDataUsageFromBackend(ctx, objAPI)
|
||||
}
|
||||
})
|
||||
|
||||
v, err := sys.bucketStorageCache.Get()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dui := v.(DataUsageInfo)
|
||||
|
||||
bui, ok := dui.BucketsUsage[bucket]
|
||||
if !ok {
|
||||
// bucket not found, cannot enforce quota
|
||||
// call will fail anyways later.
|
||||
return nil
|
||||
}
|
||||
|
||||
if (bui.Size + uint64(size)) > q.Quota {
|
||||
return BucketQuotaExceeded{Bucket: bucket}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
|
||||
if size < 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return globalBucketQuotaSys.check(ctx, bucket, size)
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -139,29 +151,40 @@ func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error {
|
||||
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
|
||||
return nil
|
||||
}
|
||||
|
||||
buckets, err := objectAPI.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, binfo := range buckets {
|
||||
bucket := binfo.Name
|
||||
|
||||
bui, ok := dataUsageInfo.BucketsUsage[bucket]
|
||||
if !ok {
|
||||
// bucket doesn't exist anymore, or we
|
||||
// do not have any information to proceed.
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the current bucket has quota restrictions, if not skip it
|
||||
cfg, err := globalBucketQuotaSys.Get(bucket)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if cfg.Type != madmin.FIFOQuota {
|
||||
continue
|
||||
}
|
||||
|
||||
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var toFree uint64
|
||||
if dataUsageInfo.BucketsSizes[bucket] > cfg.Quota && cfg.Quota > 0 {
|
||||
toFree = dataUsageInfo.BucketsSizes[bucket] - cfg.Quota
|
||||
if bui.Size > cfg.Quota && cfg.Quota > 0 {
|
||||
toFree = bui.Size - cfg.Quota
|
||||
}
|
||||
|
||||
if toFree == 0 {
|
||||
@@ -175,6 +198,7 @@ func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error {
|
||||
if err := objectAPI.Walk(ctx, bucket, "", objInfoCh); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// reuse the fileScorer used by disk cache to score entries by
|
||||
// ModTime to find the oldest objects in bucket to delete. In
|
||||
// the context of bucket quota enforcement - number of hits are
|
||||
|
||||
Reference in New Issue
Block a user