diff --git a/cmd/bucket-quota.go b/cmd/bucket-quota.go index 6ce4e0a14..30ed713cd 100644 --- a/cmd/bucket-quota.go +++ b/cmd/bucket-quota.go @@ -22,9 +22,7 @@ import ( "fmt" "time" - "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/logger" - "github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/madmin" ) @@ -120,149 +118,112 @@ func enforceBucketQuota(ctx context.Context, bucket string, size int64) error { return globalBucketQuotaSys.check(ctx, bucket, size) } -const ( - bgQuotaInterval = 1 * time.Hour -) - -// initQuotaEnforcement starts the routine that deletes objects in bucket -// that exceeds the FIFO quota -func initQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) { - if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn { - go startBucketQuotaEnforcement(ctx, objAPI) - } -} - -func startBucketQuotaEnforcement(ctx context.Context, objAPI ObjectLayer) { - for { - select { - case <-ctx.Done(): - return - case <-time.NewTimer(bgQuotaInterval).C: - enforceFIFOQuota(ctx, objAPI) - } - - } -} - // enforceFIFOQuota deletes objects in FIFO order until sufficient objects -// have been deleted so as to bring bucket usage within quota -func enforceFIFOQuota(ctx context.Context, objectAPI ObjectLayer) { - // Turn off quota enforcement if data usage info is unavailable. - if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff { +// have been deleted so as to bring bucket usage within quota. +func enforceFIFOQuotaBucket(ctx context.Context, objectAPI ObjectLayer, bucket string, bui BucketUsageInfo) { + // Check if the current bucket has quota restrictions, if not skip it + cfg, err := globalBucketQuotaSys.Get(bucket) + if err != nil { return } - buckets, err := objectAPI.ListBuckets(ctx) + if cfg.Type != madmin.FIFOQuota { + return + } + + var toFree uint64 + if bui.Size > cfg.Quota && cfg.Quota > 0 { + toFree = bui.Size - cfg.Quota + } + + if toFree <= 0 { + return + } + + // Allocate new results channel to receive ObjectInfo. + objInfoCh := make(chan ObjectInfo) + + versioned := globalBucketVersioningSys.Enabled(bucket) + + // Walk through all objects + if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkVersions: versioned}); err != nil { + logger.LogIf(ctx, err) + return + } + + // reuse the fileScorer used by disk cache to score entries by + // ModTime to find the oldest objects in bucket to delete. In + // the context of bucket quota enforcement - number of hits are + // irrelevant. + scorer, err := newFileScorer(toFree, time.Now().Unix(), 1) if err != nil { logger.LogIf(ctx, err) return } - dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI) - if err != nil { - logger.LogIf(ctx, err) - return - } - - for _, binfo := range buckets { - bucket := binfo.Name - - bui, ok := dataUsageInfo.BucketsUsage[bucket] - if !ok { - // bucket doesn't exist anymore, or we - // do not have any information to proceed. - continue - } - - // Check if the current bucket has quota restrictions, if not skip it - cfg, err := globalBucketQuotaSys.Get(bucket) - if err != nil { - continue - } - - if cfg.Type != madmin.FIFOQuota { - continue - } - - var toFree uint64 - if bui.Size > cfg.Quota && cfg.Quota > 0 { - toFree = bui.Size - cfg.Quota - } - - if toFree == 0 { - continue - } - - // Allocate new results channel to receive ObjectInfo. 
- objInfoCh := make(chan ObjectInfo) - - versioned := globalBucketVersioningSys.Enabled(bucket) - - // Walk through all objects - if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkVersions: versioned}); err != nil { - logger.LogIf(ctx, err) - continue - } - - // reuse the fileScorer used by disk cache to score entries by - // ModTime to find the oldest objects in bucket to delete. In - // the context of bucket quota enforcement - number of hits are - // irrelevant. - scorer, err := newFileScorer(toFree, time.Now().Unix(), 1) - if err != nil { - logger.LogIf(ctx, err) - continue - } - - rcfg, _ := globalBucketObjectLockSys.Get(bucket) - for obj := range objInfoCh { - if obj.DeleteMarker { - // Delete markers are automatically added for FIFO purge. - scorer.addFileWithObjInfo(obj, 1) - continue - } - // skip objects currently under retention - if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) { - continue - } + rcfg, _ := globalBucketObjectLockSys.Get(bucket) + for obj := range objInfoCh { + if obj.DeleteMarker { + // Delete markers are automatically added for FIFO purge. scorer.addFileWithObjInfo(obj, 1) + continue + } + // skip objects currently under retention + if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) { + continue + } + scorer.addFileWithObjInfo(obj, 1) + } + + // If we saw less than quota we are good. + if scorer.seenBytes <= cfg.Quota { + return + } + // Calculate how much we want to delete now. + toFreeNow := scorer.seenBytes - cfg.Quota + // We were less over quota than we thought. Adjust so we delete less. + // If we are more over, leave it for the next run to pick up. + if toFreeNow < toFree { + if !scorer.adjustSaveBytes(int64(toFreeNow) - int64(toFree)) { + // We got below or at quota. + return + } + } + + var objects []ObjectToDelete + numKeys := len(scorer.fileObjInfos()) + for i, obj := range scorer.fileObjInfos() { + objects = append(objects, ObjectToDelete{ + ObjectName: obj.Name, + VersionID: obj.VersionID, + }) + if len(objects) < maxDeleteList && (i < numKeys-1) { + // skip deletion until maxDeleteList or end of slice + continue } - var objects []ObjectToDelete - numKeys := len(scorer.fileObjInfos()) - for i, obj := range scorer.fileObjInfos() { - objects = append(objects, ObjectToDelete{ - ObjectName: obj.Name, - VersionID: obj.VersionID, - }) - if len(objects) < maxDeleteList && (i < numKeys-1) { - // skip deletion until maxDeleteList or end of slice + if len(objects) == 0 { + break + } + + // Deletes a list of objects. + _, deleteErrs := objectAPI.DeleteObjects(ctx, bucket, objects, ObjectOptions{ + Versioned: versioned, + }) + for i := range deleteErrs { + if deleteErrs[i] != nil { + logger.LogIf(ctx, deleteErrs[i]) continue } - if len(objects) == 0 { - break - } - - // Deletes a list of objects. - _, deleteErrs := objectAPI.DeleteObjects(ctx, bucket, objects, ObjectOptions{ - Versioned: versioned, + // Notify object deleted event. + sendEvent(eventArgs{ + EventName: event.ObjectRemovedDelete, + BucketName: bucket, + Object: obj, + Host: "Internal: [FIFO-QUOTA-EXPIRY]", }) - for i := range deleteErrs { - if deleteErrs[i] != nil { - logger.LogIf(ctx, deleteErrs[i]) - continue - } - - // Notify object deleted event. 
- sendEvent(eventArgs{ - EventName: event.ObjectRemovedDelete, - BucketName: bucket, - Object: obj, - Host: "Internal: [FIFO-QUOTA-EXPIRY]", - }) - } - objects = nil } + objects = nil } } diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index 667a60a20..d53cd94ec 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -335,6 +335,21 @@ func (d *dataUsageCache) bucketsUsageInfo(buckets []BucketInfo) map[string]Bucke return dst } +// bucketUsageInfo returns the buckets usage info. +// If not found all values returned are zero values. +func (d *dataUsageCache) bucketUsageInfo(bucket string) BucketUsageInfo { + e := d.find(bucket) + if e == nil { + return BucketUsageInfo{} + } + flat := d.flatten(*e) + return BucketUsageInfo{ + Size: uint64(flat.Size), + ObjectsCount: uint64(flat.Objects), + ObjectSizesHistogram: flat.ObjSizes.toMap(), + } +} + // sizeRecursive returns the path as a flattened entry. func (d *dataUsageCache) sizeRecursive(path string) *dataUsageEntry { root := d.find(path) diff --git a/cmd/disk-cache-utils.go b/cmd/disk-cache-utils.go index e927dd4f7..de77d41ff 100644 --- a/cmd/disk-cache-utils.go +++ b/cmd/disk-cache-utils.go @@ -295,6 +295,7 @@ type fileScorer struct { // The list is kept sorted according to score, highest at top, lowest at bottom. queue list.List queuedBytes uint64 + seenBytes uint64 } type queuedFile struct { @@ -337,6 +338,7 @@ func (f *fileScorer) addFileWithObjInfo(objInfo ObjectInfo, hits int) { versionID: objInfo.VersionID, size: uint64(objInfo.Size), } + f.seenBytes += uint64(objInfo.Size) var score float64 if objInfo.ModTime.IsZero() { @@ -369,9 +371,14 @@ func (f *fileScorer) addFileWithObjInfo(objInfo ObjectInfo, hits int) { // adjustSaveBytes allows to adjust the number of bytes to save. // This can be used to adjust the count on the fly. -// Returns true if there still is a need to delete files (saveBytes >0), +// Returns true if there still is a need to delete files (n+saveBytes >0), // false if no more bytes needs to be saved. func (f *fileScorer) adjustSaveBytes(n int64) bool { + if int64(f.saveBytes)+n <= 0 { + f.saveBytes = 0 + f.trimQueue() + return false + } if n < 0 { f.saveBytes -= ^uint64(n - 1) } else { diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index 719baa3a7..9b0b17fda 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -337,13 +337,15 @@ func (z *erasureZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter updateTicker := time.NewTicker(30 * time.Second) defer updateTicker.Stop() var lastUpdate time.Time + + // We need to merge since we will get the same buckets from each zone. + // Therefore to get the exact bucket sizes we must merge before we can convert. + allMerged := dataUsageCache{Info: dataUsageCacheInfo{Name: dataUsageRoot}} + update := func() { mu.Lock() defer mu.Unlock() - // We need to merge since we will get the same buckets from each zone. - // Therefore to get the exact bucket sizes we must merge before we can convert. - allMerged := dataUsageCache{Info: dataUsageCacheInfo{Name: dataUsageRoot}} for _, info := range results { if info.Info.LastUpdate.IsZero() { // Not filled yet. @@ -362,6 +364,10 @@ func (z *erasureZones) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter return case v := <-updateCloser: update() + // Enforce quotas when all is done. 
+ for _, b := range allBuckets { + enforceFIFOQuotaBucket(ctx, z, b.Name, allMerged.bucketUsageInfo(b.Name)) + } close(v) return case <-updateTicker.C: diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index a54a75844..3bf03798f 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -301,6 +301,7 @@ func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, logger.LogIf(ctx, totalCache.save(ctx, fs, dataUsageCacheName)) cloned := totalCache.clone() updates <- cloned.dui(dataUsageRoot, buckets) + enforceFIFOQuotaBucket(ctx, fs, b.Name, cloned.bucketUsageInfo(b.Name)) } return nil diff --git a/cmd/server-main.go b/cmd/server-main.go index 438c8f1e0..11e43f964 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -360,7 +360,6 @@ func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) { } initDataCrawler(ctx, objAPI) - initQuotaEnforcement(ctx, objAPI) } // serverMain handler called for 'minio server' command.
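
This change removes the standalone hourly quota-enforcement loop (initQuotaEnforcement / startBucketQuotaEnforcement) and instead has the data crawler, which already computes per-bucket usage, call enforceFIFOQuotaBucket directly: erasureZones.CrawlAndGetDataUsage merges the per-zone caches and then invokes it once per bucket, and FSObjects.CrawlAndGetDataUsage does the same for each crawled bucket. The sketch below illustrates the oldest-first selection idea behind the FIFO purge; it is a simplified, standalone approximation of the behavior, and the type and helper names (objectMeta, pickFIFOVictims) are hypothetical, not identifiers from the MinIO code above.

// Minimal standalone sketch of FIFO quota selection: pick the oldest
// objects until enough bytes would be freed to bring usage under quota.
// Simplified stand-in for the fileScorer-based logic; not MinIO code.
package main

import (
	"fmt"
	"sort"
	"time"
)

// objectMeta is a simplified stand-in for ObjectInfo.
type objectMeta struct {
	Name    string
	Size    uint64
	ModTime time.Time
}

// pickFIFOVictims returns the oldest objects whose combined size covers
// usage-quota. A quota of zero means "no limit", mirroring the cfg.Quota > 0
// check in enforceFIFOQuotaBucket.
func pickFIFOVictims(objs []objectMeta, usage, quota uint64) []objectMeta {
	if quota == 0 || usage <= quota {
		return nil // within quota, nothing to delete
	}
	toFree := usage - quota

	// Oldest first, analogous to scoring entries by ModTime.
	sort.Slice(objs, func(i, j int) bool {
		return objs[i].ModTime.Before(objs[j].ModTime)
	})

	var victims []objectMeta
	var freed uint64
	for _, o := range objs {
		if freed >= toFree {
			break
		}
		victims = append(victims, o)
		freed += o.Size
	}
	return victims
}

func main() {
	now := time.Now()
	objs := []objectMeta{
		{Name: "new.bin", Size: 40, ModTime: now},
		{Name: "old.bin", Size: 30, ModTime: now.Add(-48 * time.Hour)},
		{Name: "older.bin", Size: 50, ModTime: now.Add(-96 * time.Hour)},
	}
	// Bucket uses 120 bytes against an 80-byte quota: the two oldest
	// objects (older.bin, old.bin) are selected to free at least 40 bytes.
	fmt.Println(pickFIFOVictims(objs, 120, 80))
}

Driving enforcement from the crawler also avoids the separate loadDataUsageFromBackend pass the old loop performed, so the usage figures used for the purge are the same ones the crawl just produced.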