merge object lifecycle checks into usage crawler (#9579)

Klaus Post
2020-06-12 10:28:21 -07:00
committed by GitHub
parent 225b812b5e
commit 43d6e3ae06
18 changed files with 1400 additions and 852 deletions
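Before this commit, FS mode recomputed usage in one monolithic scan and lifecycle checks ran separately from the crawler. The diff below crawls buckets one at a time, evaluates lifecycle rules on each object as it is visited, and merges each bucket's result into a running totals cache. A toy sketch of that flow, assuming simplified stand-in types (object, rule, crawlBucket are illustrative, not the repository's types):

// Toy sketch of the new flow: per-bucket crawl that applies lifecycle
// expiry while accumulating usage, then merges into a running total.
// All names here are illustrative stand-ins, not MinIO's actual types.
package main

import (
	"fmt"
	"time"
)

type object struct {
	name    string
	size    int64
	modTime time.Time
}

// rule is a stand-in for a bucket lifecycle configuration: expire
// objects older than maxAge.
type rule struct{ maxAge time.Duration }

// crawlBucket walks one bucket's objects, skipping (and notionally
// deleting) expired ones, and returns the bucket's live usage.
func crawlBucket(objs []object, r *rule) (usage int64) {
	now := time.Now()
	for _, o := range objs {
		if r != nil && now.Sub(o.modTime) > r.maxAge {
			continue // expired by lifecycle; contributes nothing
		}
		usage += o.size
	}
	return usage
}

func main() {
	buckets := map[string][]object{
		"photos": {{"a.jpg", 100, time.Now()}, {"old.jpg", 400, time.Now().Add(-48 * time.Hour)}},
		"logs":   {{"l.txt", 50, time.Now()}},
	}
	rules := map[string]*rule{"photos": {maxAge: 24 * time.Hour}}

	var total int64
	for name, objs := range buckets {
		u := crawlBucket(objs, rules[name])
		total += u // merge each bucket's result into the running total
		fmt.Println(name, u)
	}
	fmt.Println("total:", total) // 150: old.jpg is expired and not counted
}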


@@ -19,7 +19,6 @@ package cmd
 import (
 	"bytes"
 	"context"
-	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -239,66 +238,123 @@ func (fs *FSObjects) waitForLowActiveIO() {
 // CrawlAndGetDataUsage returns data usage stats of the current FS deployment
 func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
 	// Load bucket totals
-	var oldCache dataUsageCache
-	err := oldCache.load(ctx, fs, dataUsageCacheName)
+	var totalCache dataUsageCache
+	err := totalCache.load(ctx, fs, dataUsageCacheName)
 	if err != nil {
 		return err
 	}
-	if oldCache.Info.Name == "" {
-		oldCache.Info.Name = dataUsageRoot
-	}
+	totalCache.Info.Name = dataUsageRoot
 	buckets, err := fs.ListBuckets(ctx)
 	if err != nil {
 		return err
 	}
-	oldCache.Info.BloomFilter = nil
+	totalCache.Info.BloomFilter = nil
 	if bf != nil {
-		oldCache.Info.BloomFilter = bf.bytes()
+		totalCache.Info.BloomFilter = bf.bytes()
 	}
-	if false && intDataUpdateTracker.debug {
-		b, _ := json.MarshalIndent(bf, "", "  ")
-		logger.Info("Bloom filter: %v", string(b))
+
+	// Clear totals.
+	var root dataUsageEntry
+	if r := totalCache.root(); r != nil {
+		root.Children = r.Children
 	}
-	cache, err := updateUsage(ctx, fs.fsPath, oldCache, fs.waitForLowActiveIO, func(item Item) (int64, error) {
-		bucket, object := path2BucketObject(strings.TrimPrefix(item.Path, fs.fsPath))
+	totalCache.replace(dataUsageRoot, "", root)
+
+	// Delete all buckets that does not exist anymore.
+	totalCache.keepBuckets(buckets)
+
+	for _, b := range buckets {
+		// Load bucket cache.
+		var bCache dataUsageCache
+		err := bCache.load(ctx, fs, path.Join(b.Name, dataUsageCacheName))
+		if err != nil {
+			return err
+		}
+		if bCache.Info.Name == "" {
+			bCache.Info.Name = b.Name
+		}
+		bCache.Info.BloomFilter = totalCache.Info.BloomFilter
+
+		cache, err := fs.crawlBucket(ctx, b.Name, bCache)
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		logger.LogIf(ctx, err)
+		cache.Info.BloomFilter = nil
+
+		if cache.Info.LastUpdate.After(bCache.Info.LastUpdate) {
+			if intDataUpdateTracker.debug {
+				logger.Info(color.Green("CrawlAndGetDataUsage:")+" Saving bucket %q cache with %d entries", b.Name, len(cache.Cache))
+			}
+			logger.LogIf(ctx, cache.save(ctx, fs, path.Join(b.Name, dataUsageCacheName)))
+		}
+
+		// Merge, save and send update.
+		// We do it even if unchanged.
+		cl := cache.clone()
+		entry := cl.flatten(*cl.root())
+		totalCache.replace(cl.Info.Name, dataUsageRoot, entry)
+		if intDataUpdateTracker.debug {
+			logger.Info(color.Green("CrawlAndGetDataUsage:")+" Saving totals cache with %d entries", len(totalCache.Cache))
+		}
+		logger.LogIf(ctx, totalCache.save(ctx, fs, dataUsageCacheName))
+		cloned := totalCache.clone()
+		updates <- cloned.dui(dataUsageRoot, buckets)
+	}
+
+	return nil
+}
+
+// crawlBucket crawls a single bucket in FS mode.
+// The updated cache for the bucket is returned.
+// A partially updated bucket may be returned.
+func (fs *FSObjects) crawlBucket(ctx context.Context, bucket string, cache dataUsageCache) (dataUsageCache, error) {
+	// Get bucket policy
+	// Check if the current bucket has a configured lifecycle policy
+	lc, err := globalLifecycleSys.Get(bucket)
+	if err == nil && lc.HasActiveRules("", true) {
+		if intDataUpdateTracker.debug {
+			logger.Info(color.Green("crawlBucket:") + " lifecycle: Active rules found")
+		}
+		cache.Info.lifeCycle = lc
+	}
+
+	// Load bucket info.
+	cache, err = crawlDataFolder(ctx, fs.fsPath, cache, fs.waitForLowActiveIO, func(item crawlItem) (int64, error) {
+		bucket, object := item.bucket, item.objectPath()
 		fsMetaBytes, err := ioutil.ReadFile(pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile))
 		if err != nil && !os.IsNotExist(err) {
 			return 0, errSkipFile
 		}
-
-		// Get file size, symlinks which cannot be
-		// followed are automatically filtered by fastwalk.
-		fi, err := os.Stat(item.Path)
-		if err != nil {
+		fsMeta := newFSMetaV1()
+		metaOk := false
+		if len(fsMetaBytes) > 0 {
+			var json = jsoniter.ConfigCompatibleWithStandardLibrary
+			if err = json.Unmarshal(fsMetaBytes, &fsMeta); err == nil {
+				metaOk = true
+			}
+		}
+		if !metaOk {
+			fsMeta = fs.defaultFsJSON(object)
+		}
+
+		// Stat the file.
+		fi, fiErr := os.Stat(item.Path)
+		if fiErr != nil {
 			return 0, errSkipFile
 		}
-		if len(fsMetaBytes) > 0 {
-			fsMeta := newFSMetaV1()
-			var json = jsoniter.ConfigCompatibleWithStandardLibrary
-			if err = json.Unmarshal(fsMetaBytes, &fsMeta); err != nil {
-				return 0, errSkipFile
-			}
-			return fsMeta.ToObjectInfo(bucket, object, fi).GetActualSize()
+
+		oi := fsMeta.ToObjectInfo(bucket, object, fi)
+		sz := item.applyActions(ctx, fs, actionMeta{oi: oi, meta: fsMeta.Meta})
+		if sz >= 0 {
+			return sz, nil
 		}
 		return fi.Size(), nil
 	})
-	cache.Info.BloomFilter = nil
 
-	// Even if there was an error, the new cache may have better info.
-	if cache.Info.LastUpdate.After(oldCache.Info.LastUpdate) {
-		if intDataUpdateTracker.debug {
-			logger.Info(color.Green("CrawlAndGetDataUsage:")+" Saving cache with %d entries", len(cache.Cache))
-		}
-		logger.LogIf(ctx, cache.save(ctx, fs, dataUsageCacheName))
-		updates <- cache.dui(dataUsageRoot, buckets)
-	} else {
-		if intDataUpdateTracker.debug {
-			logger.Info(color.Green("CrawlAndGetDataUsage:")+" Cache not updated, %d entries", len(cache.Cache))
-		}
-	}
-	return err
+	return cache, err
 }
 
 /// Bucket operations
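The crawl callback above sizes each object via item.applyActions, which evaluates the bucket's lifecycle (when cache.Info.lifeCycle is set) and returns the size to account for; a negative return tells the callback to fall back to the plain stat size. A simplified sketch of that contract under those assumptions; accountedSize and objectUsage are hypothetical stand-ins for item.applyActions and the callback, not the repository's code:

// Sketch of the sizing contract used by the crawl callback above:
// a helper returns the size to account for after lifecycle evaluation,
// or a negative value meaning "no opinion, use the stat size".
package main

import "fmt"

// accountedSize (hypothetical) returns 0 for objects the lifecycle
// would expire (they are about to be deleted, so they count for
// nothing), the actual decoded size when known, and -1 otherwise.
func accountedSize(actualSize int64, expired bool) int64 {
	if expired {
		return 0
	}
	if actualSize >= 0 {
		return actualSize
	}
	return -1
}

// objectUsage mirrors the callback's shape: prefer the lifecycle-aware
// size, fall back to what os.Stat reported.
func objectUsage(statSize, actualSize int64, expired bool) int64 {
	if sz := accountedSize(actualSize, expired); sz >= 0 {
		return sz
	}
	return statSize
}

func main() {
	fmt.Println(objectUsage(2048, 1900, false)) // 1900: actual size known
	fmt.Println(objectUsage(2048, -1, false))   // 2048: fall back to stat
	fmt.Println(objectUsage(2048, 1900, true))  // 0: expired by lifecycle
}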
@@ -381,6 +437,7 @@ func (fs *FSObjects) SetBucketPolicy(ctx context.Context, bucket string, p *poli
 		return err
 	}
 
+	var json = jsoniter.ConfigCompatibleWithStandardLibrary
 	configData, err := json.Marshal(p)
 	if err != nil {
 		return err