simplify further bucket configuration properly (#9650)

This PR is a continuation from #9586, now the
entire parsing logic is fully merged into
bucket metadata sub-system, simplify the
quota API further by reducing the remove
quota handler implementation.
This commit is contained in:
Harshavardhana
2020-05-20 10:18:15 -07:00
committed by GitHub
parent 0cc2ed04f5
commit 6656fa3066
22 changed files with 372 additions and 594 deletions

View File

@@ -19,7 +19,6 @@ package cmd
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
@@ -32,95 +31,34 @@ import (
)
// BucketQuotaSys - map of bucket and quota configuration.
type BucketQuotaSys struct {
quotaMap map[string]madmin.BucketQuota
}
type BucketQuotaSys struct{}
// Get - Get quota configuration.
func (sys *BucketQuotaSys) Get(bucketName string) (q madmin.BucketQuota, err error) {
func (sys *BucketQuotaSys) Get(bucketName string) (*madmin.BucketQuota, error) {
if globalIsGateway {
objAPI := newObjectLayerFn()
if objAPI == nil {
return q, errServerNotInitialized
return nil, errServerNotInitialized
}
return q, BucketQuotaConfigNotFound{Bucket: bucketName}
return &madmin.BucketQuota{}, nil
}
q, ok := sys.quotaMap[bucketName]
if !ok {
configData, err := globalBucketMetadataSys.GetConfig(bucketName, bucketQuotaConfigFile)
if err != nil {
if errors.Is(err, errConfigNotFound) {
return q, BucketQuotaConfigNotFound{Bucket: bucketName}
}
return q, err
}
return parseBucketQuota(bucketName, configData)
}
return q, nil
}
// Buckets - list of buckets with quota configuration
func (sys *BucketQuotaSys) Buckets() []string {
var buckets []string
for k := range sys.quotaMap {
buckets = append(buckets, k)
}
return buckets
}
// Init initialize bucket quota sys configuration with all buckets.
func (sys *BucketQuotaSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error {
if objAPI == nil {
return errServerNotInitialized
}
// In gateway mode, we do not support bucket quota.
// So, this is a no-op for gateway servers.
if globalIsGateway {
return nil
}
return sys.load(buckets, objAPI)
}
func (sys *BucketQuotaSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
for _, bucket := range buckets {
ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name})
configData, err := globalBucketMetadataSys.GetConfig(bucket.Name, bucketQuotaConfigFile)
if err != nil {
if errors.Is(err, errConfigNotFound) {
continue
}
return err
}
quotaCfg, err := parseBucketQuota(bucket.Name, configData)
if err != nil {
if _, ok := err.(BucketQuotaConfigNotFound); !ok {
logger.LogIf(ctx, err)
}
continue
}
sys.quotaMap[bucket.Name] = quotaCfg
}
return nil
return globalBucketMetadataSys.GetQuotaConfig(bucketName)
}
// NewBucketQuotaSys returns initialized BucketQuotaSys
func NewBucketQuotaSys() *BucketQuotaSys {
return &BucketQuotaSys{quotaMap: map[string]madmin.BucketQuota{}}
return &BucketQuotaSys{}
}
// parseBucketQuota parses BucketQuota from json
func parseBucketQuota(bucket string, data []byte) (quotaCfg madmin.BucketQuota, err error) {
if len(data) == 0 {
return quotaCfg, BucketQuotaConfigNotFound{Bucket: bucket}
}
if err = json.Unmarshal(data, &quotaCfg); err != nil {
func parseBucketQuota(bucket string, data []byte) (quotaCfg *madmin.BucketQuota, err error) {
quotaCfg = &madmin.BucketQuota{}
if err = json.Unmarshal(data, quotaCfg); err != nil {
return quotaCfg, err
}
if !quotaCfg.Type.IsValid() {
return quotaCfg, fmt.Errorf("Invalid quota type %s", quotaCfg.Type)
if !quotaCfg.IsValid() {
return quotaCfg, fmt.Errorf("Invalid quota config %#v", quotaCfg)
}
return
}
@@ -131,7 +69,12 @@ type bucketStorageCache struct {
mu sync.Mutex
}
func (b *bucketStorageCache) check(ctx context.Context, q madmin.BucketQuota, bucket string, size int64) error {
func (b *bucketStorageCache) check(ctx context.Context, q *madmin.BucketQuota, bucket string, size int64) error {
if q.Quota == 0 {
// No quota set return quickly.
return nil
}
b.mu.Lock()
defer b.mu.Unlock()
if time.Since(b.lastUpdate) > 10*time.Second {
@@ -143,7 +86,7 @@ func (b *bucketStorageCache) check(ctx context.Context, q madmin.BucketQuota, bu
b.bucketsSizes = dui.BucketsSizes
}
currUsage := b.bucketsSizes[bucket]
if (currUsage+uint64(size)) > q.Quota && q.Quota > 0 {
if (currUsage + uint64(size)) > q.Quota {
return BucketQuotaExceeded{Bucket: bucket}
}
return nil
@@ -155,15 +98,13 @@ func enforceBucketQuota(ctx context.Context, bucket string, size int64) error {
q, err := globalBucketQuotaSys.Get(bucket)
if err != nil {
if _, ok := err.(BucketQuotaConfigNotFound); !ok {
return err
}
return nil
}
if q.Type == madmin.FIFOQuota {
return nil
}
return globalBucketStorageCache.check(ctx, q, bucket, size)
}
@@ -198,7 +139,12 @@ func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) error {
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
return nil
}
for _, bucket := range globalBucketQuotaSys.Buckets() {
buckets, err := objectAPI.ListBuckets(ctx)
if err != nil {
return err
}
for _, binfo := range buckets {
bucket := binfo.Name
// Check if the current bucket has quota restrictions, if not skip it
cfg, err := globalBucketQuotaSys.Get(bucket)
if err != nil {