re-implement data usage crawler to be more efficient (#9075)
Implementation overview: https://gist.github.com/klauspost/1801c858d5e0df391114436fdad6987b
@@ -20,23 +20,36 @@ import (
    "bytes"
    "context"
    "encoding/json"
    "errors"
    "os"
    "path/filepath"
    "path"
    "strconv"
    "time"

    jsoniter "github.com/json-iterator/go"
    "github.com/minio/minio/cmd/config"
    "github.com/minio/minio/cmd/logger"
    "github.com/minio/minio/pkg/color"
    "github.com/minio/minio/pkg/env"
    "github.com/minio/minio/pkg/hash"
)

const (
    dataUsageObjName         = "data-usage"
    dataUsageCrawlInterval   = 12 * time.Hour
    dataUsageCrawlConf       = "MINIO_DISK_USAGE_CRAWL"
    dataUsageObjName         = "usage.json"
    dataUsageCacheName       = "usage-cache.bin"
    dataUsageBucketCacheDir  = "usage-caches"
    dataUsageCrawlConf       = "MINIO_DISK_USAGE_CRAWL"
    dataUsageCrawlDelay      = "MINIO_DISK_USAGE_CRAWL_DELAY"
    dataUsageDebug           = true
    dataUsageSleepPerFolder  = 1 * time.Millisecond
    dataUsageSleepDefMult    = 10.0
    dataUsageUpdateDirCycles = 16
    dataUsageRoot            = SlashSeparator
    dataUsageBucket          = minioMetaBucket + SlashSeparator + bucketMetaPrefix
    dataUsageStartDelay      = 5 * time.Minute // Time to wait on startup and between cycles.
)
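
The crawl can be disabled via MINIO_DISK_USAGE_CRAWL, and MINIO_DISK_USAGE_CRAWL_DELAY scales every sleep the scanner takes. A minimal standalone sketch of that multiplier logic (the helper name and the defaults are illustrative, mirroring dataUsageSleepDefMult and the strconv.ParseFloat parsing used in scanQueuedLevels below):

    package main

    import (
        "fmt"
        "os"
        "strconv"
        "time"
    )

    // Defaults mirroring dataUsageSleepDefMult and dataUsageSleepPerFolder.
    const (
        defaultDelayMult = 10.0
        sleepPerFolder   = 1 * time.Millisecond
    )

    // crawlDelayMult is a hypothetical helper: it reads the delay multiplier
    // from the environment and falls back to the default when unset or
    // unparsable, as the scanner functions below do.
    func crawlDelayMult() float64 {
        if mult := os.Getenv("MINIO_DISK_USAGE_CRAWL_DELAY"); mult != "" {
            if d, err := strconv.ParseFloat(mult, 64); err == nil {
                return d
            }
        }
        return defaultDelayMult
    }

    func main() {
        os.Setenv("MINIO_DISK_USAGE_CRAWL_DELAY", "2.5")
        mult := crawlDelayMult()
        fmt.Println(mult) // 2.5
        // Per-folder pause the scanner would take with this multiplier.
        fmt.Println(time.Duration(float64(sleepPerFolder) * mult)) // 2.5ms
    }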

// initDataUsageStats will start the crawler unless disabled.
func initDataUsageStats() {
    dataUsageEnabled, err := config.ParseBool(env.Get(dataUsageCrawlConf, config.EnableOn))
    if err == nil && !dataUsageEnabled {
@@ -45,6 +58,7 @@ func initDataUsageStats() {
    go runDataUsageInfoUpdateRoutine()
}

// runDataUsageInfoUpdateRoutine will contain the main crawler.
func runDataUsageInfoUpdateRoutine() {
    // Wait until the object layer is ready.
    var objAPI ObjectLayer
@@ -57,37 +71,13 @@ func runDataUsageInfoUpdateRoutine() {
        break
    }

    runDataUsageInfo(context.Background(), objAPI, GlobalServiceDoneCh)
}

// timeToNextCrawl returns the duration until the next crawl should occur;
// this is validated by verifying the LastUpdate time.
func timeToCrawl(ctx context.Context, objAPI ObjectLayer) time.Duration {
    dataUsageInfo, err := loadDataUsageFromBackend(ctx, objAPI)
    if err != nil {
        // Upon an error, wait about 10
        // seconds before starting the crawler.
        return 10 * time.Second
    }
    // The file doesn't exist when LastUpdate is zero,
    // so we have never crawled; start the crawl right away.
    if dataUsageInfo.LastUpdate.IsZero() {
        return 1 * time.Second
    }
    timeSinceLastUpdate := UTCNow().Sub(dataUsageInfo.LastUpdate)
    if timeSinceLastUpdate > dataUsageCrawlInterval {
        // Waited long enough; start the crawl in 1 second.
        return 1 * time.Second
    }
    // No crawling needed; ask the routine to wait for
    // the crawl interval (12h) minus the time since the last update.
    return dataUsageCrawlInterval - timeSinceLastUpdate
    runDataUsageInfo(GlobalContext, objAPI)
}
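
The removed timeToCrawl scheduler derived the wait from the stored LastUpdate; the new loop simply sleeps dataUsageStartDelay between cycles. A sketch of the retired computation, with the MinIO types replaced by hypothetical stand-ins:

    package main

    import (
        "fmt"
        "time"
    )

    const crawlInterval = 12 * time.Hour // dataUsageCrawlInterval

    // timeToNext is a stand-in for the removed timeToCrawl: a zero or stale
    // LastUpdate triggers an almost-immediate crawl; otherwise wait out the
    // remainder of the interval.
    func timeToNext(lastUpdate time.Time) time.Duration {
        if lastUpdate.IsZero() {
            return time.Second
        }
        since := time.Now().UTC().Sub(lastUpdate)
        if since > crawlInterval {
            return time.Second
        }
        return crawlInterval - since
    }

    func main() {
        fmt.Println(timeToNext(time.Time{}))                         // 1s, never crawled
        fmt.Println(timeToNext(time.Now().Add(-13 * time.Hour)))     // 1s, overdue
        fmt.Println(timeToNext(time.Now().Add(-10 * time.Hour)) > 0) // true, ~2h remaining
    }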

var dataUsageLockTimeout = lifecycleLockTimeout

func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer, endCh <-chan struct{}) {
func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) {
    // Make sure only 1 crawler is running on the cluster.
    locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info")
    for {
        err := locker.GetLock(dataUsageLockTimeout)
@@ -99,47 +89,57 @@ func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer, endCh <-chan stru
        // data usage calculator role for its lifetime.
        break
    }

    if dataUsageDebug {
        logger.Info(color.Green("runDataUsageInfo:") + " Starting crawler master")
    }
    for {
        wait := timeToCrawl(ctx, objAPI)
        select {
        case <-endCh:
        case <-ctx.Done():
            locker.Unlock()
            return
        case <-time.NewTimer(wait).C:
            // Crawl only when no previous crawl has occurred,
            // or it's been too long since the last crawl.
            err := storeDataUsageInBackend(ctx, objAPI, objAPI.CrawlAndGetDataUsage(ctx, endCh))
        // Wait before starting the next cycle and wait on startup.
        case <-time.NewTimer(dataUsageStartDelay).C:
            results := make(chan DataUsageInfo, 1)
            go storeDataUsageInBackend(ctx, objAPI, results)
            err := objAPI.CrawlAndGetDataUsage(ctx, results)
            close(results)
            logger.LogIf(ctx, err)
        }
    }
}
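
The new flow decouples crawling from persistence: storeDataUsageInBackend drains a channel in its own goroutine while the crawler publishes snapshots, and closing the channel ends the writer. A self-contained sketch of the same handoff pattern, with types and names simplified:

    package main

    import (
        "context"
        "fmt"
    )

    // usageInfo is a stand-in for DataUsageInfo.
    type usageInfo struct{ objects int }

    // store drains updates until the channel is closed, like
    // storeDataUsageInBackend does for its input channel.
    func store(ctx context.Context, updates <-chan usageInfo, done chan<- struct{}) {
        defer close(done)
        for u := range updates {
            fmt.Println("persisting snapshot with", u.objects, "objects")
        }
    }

    func main() {
        ctx := context.Background()
        results := make(chan usageInfo, 1) // buffered, as in runDataUsageInfo
        done := make(chan struct{})
        go store(ctx, results, done)

        // The crawler side publishes intermediate and final results...
        for i := 1; i <= 3; i++ {
            results <- usageInfo{objects: i * 100}
        }
        close(results) // ...and closing the channel stops the writer.
        <-done
    }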

func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dataUsageInfo DataUsageInfo) error {
    dataUsageJSON, err := json.Marshal(dataUsageInfo)
    if err != nil {
        return err
    }
// storeDataUsageInBackend will store all objects sent on the gui channel until closed.
func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, gui <-chan DataUsageInfo) {
    for dataUsageInfo := range gui {
        dataUsageJSON, err := json.MarshalIndent(dataUsageInfo, "", " ")
        if err != nil {
            logger.LogIf(ctx, err)
            continue
        }
        if dataUsageDebug {
            logger.Info(color.Green("data-usage:")+" Received update: %s", string(dataUsageJSON))
        }
        size := int64(len(dataUsageJSON))
        r, err := hash.NewReader(bytes.NewReader(dataUsageJSON), size, "", "", size, false)
        if err != nil {
            logger.LogIf(ctx, err)
            continue
        }

    size := int64(len(dataUsageJSON))
    r, err := hash.NewReader(bytes.NewReader(dataUsageJSON), size, "", "", size, false)
    if err != nil {
        return err
        _, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageObjName, NewPutObjReader(r, nil, nil), ObjectOptions{})
        logger.LogIf(ctx, err)
    }

    _, err = objAPI.PutObject(ctx, minioMetaBackgroundOpsBucket, dataUsageObjName, NewPutObjReader(r, nil, nil), ObjectOptions{})
    return err
}

func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
    var dataUsageInfoJSON bytes.Buffer

    err := objAPI.GetObject(ctx, minioMetaBackgroundOpsBucket, dataUsageObjName, 0, -1, &dataUsageInfoJSON, "", ObjectOptions{})
    err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageObjName, 0, -1, &dataUsageInfoJSON, "", ObjectOptions{})
    if err != nil {
        if isErrObjectNotFound(err) {
            return DataUsageInfo{}, nil
        }
        return DataUsageInfo{}, toObjectErr(err, minioMetaBackgroundOpsBucket, dataUsageObjName)
        return DataUsageInfo{}, toObjectErr(err, dataUsageBucket, dataUsageObjName)
    }

    var dataUsageInfo DataUsageInfo
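
jsoniter is added to the imports above; a common reason is to unmarshal stored JSON with a faster drop-in replacement for encoding/json. The snippet below is a sketch of that round trip under that assumption, not the exact code from this file (the dataUsage type is a hypothetical stand-in):

    package main

    import (
        "fmt"

        jsoniter "github.com/json-iterator/go"
    )

    // dataUsage is a hypothetical stand-in for DataUsageInfo.
    type dataUsage struct {
        ObjectsCount uint64 `json:"objectsCount"`
    }

    // ConfigCompatibleWithStandardLibrary behaves like encoding/json,
    // so it can substitute for json.Marshal/json.Unmarshal.
    var json = jsoniter.ConfigCompatibleWithStandardLibrary

    func main() {
        out, err := json.MarshalIndent(dataUsage{ObjectsCount: 42}, "", " ")
        if err != nil {
            panic(err)
        }
        var in dataUsage
        if err := json.Unmarshal(out, &in); err != nil {
            panic(err)
        }
        fmt.Println(in.ObjectsCount) // 42
    }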

@@ -160,52 +160,295 @@ type Item struct {

type getSizeFn func(item Item) (int64, error)

func updateUsage(basePath string, doneCh <-chan struct{}, waitForLowActiveIO func(), getSize getSizeFn) DataUsageInfo {
    var dataUsageInfo = DataUsageInfo{
        BucketsSizes:          make(map[string]uint64),
        ObjectsSizesHistogram: make(map[string]uint64),
type cachedFolder struct {
    name   string
    parent *dataUsageHash
}

type folderScanner struct {
    root               string
    getSize            getSizeFn
    oldCache           dataUsageCache
    newCache           dataUsageCache
    waitForLowActiveIO func()

    newFolders      []cachedFolder
    existingFolders []cachedFolder
}

// sleepDuration multiplies the duration d by x and sleeps if the result is more than 100 microseconds.
// The sleep is limited to a max of 1 second.
func sleepDuration(d time.Duration, x float64) {
    // Don't sleep for a really small amount of time.
    if d := time.Duration(float64(d) * x); d > time.Microsecond*100 {
        if d > time.Second {
            d = time.Second
        }
        time.Sleep(d)
    }
}
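
sleepDuration is the heart of the load-adaptive throttling: each disk operation is timed, and the crawler then sleeps a multiple of the measured latency, so a slow (busy) disk automatically slows the crawl. A runnable sketch of the pattern, with the timed work simulated:

    package main

    import (
        "fmt"
        "time"
    )

    // sleepDuration as defined above: sleep x times d, capped at 1s,
    // skipping sleeps under 100µs.
    func sleepDuration(d time.Duration, x float64) {
        if d := time.Duration(float64(d) * x); d > time.Microsecond*100 {
            if d > time.Second {
                d = time.Second
            }
            time.Sleep(d)
        }
    }

    func main() {
        const delayMult = 10.0 // dataUsageSleepDefMult

        t := time.Now()
        time.Sleep(2 * time.Millisecond) // stands in for a getSize disk call
        elapsed := time.Since(t)

        start := time.Now()
        sleepDuration(elapsed, delayMult) // sleep ~10x the measured latency
        fmt.Printf("op took %v, crawler then slept %v\n", elapsed, time.Since(start))
    }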

// scanQueuedLevels will scan the provided folders.
// Files found in the folders will be added to f.newCache.
// If final is true, folders will be put into f.newFolders or f.existingFolders.
// If final is false, the folders found are returned from the function.
func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool) ([]cachedFolder, error) {
    var nextFolders []cachedFolder
    delayMult := dataUsageSleepDefMult
    if mult := os.Getenv(dataUsageCrawlDelay); mult != "" {
        if d, err := strconv.ParseFloat(mult, 64); err == nil {
            delayMult = d
        }
    }
    done := ctx.Done()
    for _, folder := range folders {
        select {
        case <-done:
            return nil, ctx.Err()
        default:
        }
        f.waitForLowActiveIO()
        sleepDuration(dataUsageSleepPerFolder, delayMult)

        cache := dataUsageEntry{}
        thisHash := hashPath(folder.name)

        err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error {
            // Parse
            entName = path.Clean(path.Join(folder.name, entName))
            bucket, _ := path2BucketObjectWithBasePath(f.root, entName)
            if bucket == "" {
                if dataUsageDebug {
                    logger.Info(color.Green("data-usage:")+" no bucket (%s,%s)", f.root, entName)
                }
                return nil
            }

            if isReservedOrInvalidBucket(bucket, false) {
                if dataUsageDebug {
                    logger.Info(color.Green("data-usage:")+" invalid bucket: %v, entry: %v", bucket, entName)
                }
                return nil
            }

            select {
            case <-done:
                return ctx.Err()
            default:
            }

            if typ&os.ModeDir != 0 {
                h := hashPath(entName)
                _, exists := f.oldCache.Cache[h]
                cache.addChildString(entName)

                this := cachedFolder{name: entName, parent: &thisHash}
                cache.addChild(h)
                if final {
                    if exists {
                        f.existingFolders = append(f.existingFolders, this)
                    } else {
                        f.newFolders = append(f.newFolders, this)
                    }
                } else {
                    nextFolders = append(nextFolders, this)
                }
                return nil
            }
            f.waitForLowActiveIO()
            // Dynamic time delay.
            t := time.Now()

            // Get file size, ignore errors.
            size, err := f.getSize(Item{Path: path.Join(f.root, entName), Typ: typ})

            sleepDuration(time.Since(t), delayMult)
            if err == errSkipFile {
                return nil
            }
            logger.LogIf(ctx, err)
            cache.Size += size
            cache.Objects++
            cache.ObjSizes.add(size)

            return nil
        })
        if err != nil {
            return nil, err
        }
        f.newCache.replaceHashed(thisHash, folder.parent, cache)
    }
    return nextFolders, nil
}
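
scanQueuedLevels is effectively a breadth-first scan: each call consumes one level of the queue and returns the next level, and on the final level directories are classified as new or already cached. A simplified, standalone sketch of that level-by-level walk using only the standard library (readDirFn, the cache bookkeeping, and the throttling are deliberately left out):

    package main

    import (
        "fmt"
        "os"
        "path/filepath"
    )

    // scanLevel lists the direct subdirectories of each queued folder and
    // returns them as the next level, mimicking scanQueuedLevels.
    func scanLevel(root string, folders []string) ([]string, error) {
        var next []string
        for _, folder := range folders {
            entries, err := os.ReadDir(filepath.Join(root, folder))
            if err != nil {
                return nil, err
            }
            for _, e := range entries {
                if e.IsDir() {
                    next = append(next, filepath.Join(folder, e.Name()))
                }
            }
        }
        return next, nil
    }

    func main() {
        const flattenLevels = 3 // as in updateUsage below
        todo := []string{"."}
        for i := 0; i < flattenLevels; i++ {
            var err error
            todo, err = scanLevel(".", todo)
            if err != nil {
                panic(err)
            }
            fmt.Printf("level %d: %d directories\n", i+1, len(todo))
        }
    }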

// deepScanFolder will deep scan a folder and return the size if no error occurs.
func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dataUsageEntry, error) {
    var cache dataUsageEntry
    delayMult := dataUsageSleepDefMult
    if mult := os.Getenv(dataUsageCrawlDelay); mult != "" {
        if d, err := strconv.ParseFloat(mult, 64); err == nil {
            delayMult = d
        }
    }
    done := ctx.Done()

    var addDir func(entName string, typ os.FileMode) error
    var dirStack = []string{f.root, folder}

    addDir = func(entName string, typ os.FileMode) error {
        select {
        case <-done:
            return ctx.Err()
        default:
        }

        f.waitForLowActiveIO()
        if typ&os.ModeDir != 0 {
            dirStack = append(dirStack, entName)
            err := readDirFn(path.Join(dirStack...), addDir)
            dirStack = dirStack[:len(dirStack)-1]
            sleepDuration(dataUsageSleepPerFolder, delayMult)
            return err
        }
        // Dynamic time delay.
        t := time.Now()

        // Get file size, ignore errors.
        dirStack = append(dirStack, entName)
        fileName := path.Join(dirStack...)
        dirStack = dirStack[:len(dirStack)-1]

        size, err := f.getSize(Item{Path: fileName, Typ: typ})

        // Don't sleep for a really small amount of time.
        sleepDuration(time.Since(t), delayMult)

        if err == errSkipFile {
            return nil
        }
        logger.LogIf(ctx, err)
        cache.Size += size
        cache.Objects++
        cache.ObjSizes.add(size)
        return nil
    }
    err := readDirFn(path.Join(dirStack...), addDir)
    if err != nil {
        return nil, err
    }
    return &cache, nil
}
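
deepScanFolder recurses with an explicit dirStack plus a self-referential closure; with the standard library the same size/object/histogram totals can be gathered with filepath.WalkDir. A rough equivalent, minus the throttling and cancellation hooks (the histogram bucketing here is an illustrative guess, not MinIO's exact intervals):

    package main

    import (
        "fmt"
        "io/fs"
        "path/filepath"
    )

    // usageEntry mirrors the totals dataUsageEntry accumulates.
    type usageEntry struct {
        Size     int64
        Objects  uint64
        ObjSizes map[string]uint64 // size-range label -> count
    }

    func sizeRange(n int64) string {
        switch {
        case n < 1024:
            return "LESS_THAN_1_KiB"
        case n < 1024*1024:
            return "LESS_THAN_1_MiB"
        default:
            return "1_MiB_OR_MORE"
        }
    }

    func deepScan(folder string) (*usageEntry, error) {
        e := &usageEntry{ObjSizes: map[string]uint64{}}
        err := filepath.WalkDir(folder, func(p string, d fs.DirEntry, err error) error {
            if err != nil || d.IsDir() {
                return err
            }
            info, err := d.Info()
            if err != nil {
                return nil // ignore stat errors, as the crawler ignores getSize errors
            }
            e.Size += info.Size()
            e.Objects++
            e.ObjSizes[sizeRange(info.Size())]++
            return nil
        })
        return e, err
    }

    func main() {
        du, err := deepScan(".")
        if err != nil {
            panic(err)
        }
        fmt.Printf("%d objects, %d bytes, histogram: %v\n", du.Objects, du.Size, du.ObjSizes)
    }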

// updateUsage will crawl basePath+cache.Info.Name and return an updated cache.
// The returned cache will always be valid, but may not be updated from the existing one.
// Before each operation waitForLowActiveIO is called, which can be used to temporarily halt the crawler.
// If the supplied context is canceled the function will return at the first chance.
func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, waitForLowActiveIO func(), getSize getSizeFn) (dataUsageCache, error) {
    if cache.Info.Name == "" {
        cache.Info.Name = dataUsageRoot
    }
    var logPrefix, logSuffix string
    if dataUsageDebug {
        logPrefix = color.Green("data-usage: ")
        logSuffix = color.Blue(" - %v + %v", basePath, cache.Info.Name)
    }
    s := folderScanner{
        root:               basePath,
        getSize:            getSize,
        oldCache:           cache,
        newCache:           dataUsageCache{Info: cache.Info},
        waitForLowActiveIO: waitForLowActiveIO,
        newFolders:         nil,
        existingFolders:    nil,
    }

    fastWalk(basePath, 1, doneCh, func(path string, typ os.FileMode) error {
        // Wait for I/O to go down.
        waitForLowActiveIO()
    done := ctx.Done()
    var flattenLevels = 3

        bucket, entry := path2BucketObjectWithBasePath(basePath, path)
        if bucket == "" {
            return nil
        }
    // If we are scanning inside a bucket, reduce depth by 1.
    if cache.Info.Name != dataUsageRoot {
        flattenLevels--
    }
    if dataUsageDebug {
        logger.Info(logPrefix+"Cycle: %v"+logSuffix, cache.Info.NextCycle)
    }

        if isReservedOrInvalidBucket(bucket, false) {
            return filepath.SkipDir
    // Always scan flattenLevels deep. Cache root is level 0.
    todo := []cachedFolder{{name: cache.Info.Name}}
    for i := 0; i < flattenLevels; i++ {
        if dataUsageDebug {
            logger.Info(logPrefix+"Level %v, scanning %v directories."+logSuffix, i, len(todo))
        }

        if entry == "" && typ&os.ModeDir != 0 {
            dataUsageInfo.BucketsCount++
            dataUsageInfo.BucketsSizes[bucket] = 0
            return nil
        }

        if typ&os.ModeDir != 0 {
            return nil
        }

        t := time.Now()
        size, err := getSize(Item{path, typ})
        // Use the response time of the getSize call to guess system load.
        // Sleep an equivalent time.
        if d := time.Since(t); d > 100*time.Microsecond {
            time.Sleep(d)
        select {
        case <-done:
            return cache, ctx.Err()
        default:
        }
        var err error
        todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1)
        if err != nil {
            return errSkipFile
            // No useful information...
            return cache, err
        }
    }

    if dataUsageDebug {
        logger.Info(logPrefix+"New folders: %v"+logSuffix, s.newFolders)
    }
    // Add new folders first
    for _, folder := range s.newFolders {
        select {
        case <-done:
            return s.newCache, ctx.Err()
        default:
        }
        du, err := s.deepScanFolder(ctx, folder.name)
        if err != nil {
            logger.LogIf(ctx, err)
            continue
        }
        if du == nil {
            logger.LogIf(ctx, errors.New("data-usage: no disk usage provided"))
            continue
        }
        s.newCache.replace(folder.name, "", *du)
        // Add to parent manually
        if folder.parent != nil {
            parent := s.newCache.Cache[*folder.parent]
            parent.addChildString(folder.name)
        }
    }

    if dataUsageDebug {
        logger.Info(logPrefix+"Existing folders: %v"+logSuffix, len(s.existingFolders))
    }
    // Do selective scanning of existing folders.
    for _, folder := range s.existingFolders {
        select {
        case <-done:
            return s.newCache, ctx.Err()
        default:
        }
        h := hashPath(folder.name)
        if !h.mod(s.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
            s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h])
            continue
        }

            dataUsageInfo.ObjectsCount++
            dataUsageInfo.ObjectsTotalSize += uint64(size)
            dataUsageInfo.BucketsSizes[bucket] += uint64(size)
            dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++
            return nil
        })
        // Update on this cycle...
        du, err := s.deepScanFolder(ctx, folder.name)
        if err != nil {
            logger.LogIf(ctx, err)
            continue
        }
        if du == nil {
            logger.LogIf(ctx, errors.New("data-usage: no disk usage provided"))
            continue
        }
        s.newCache.replaceHashed(h, folder.parent, *du)
    }

    return dataUsageInfo
    s.newCache.Info.LastUpdate = time.Now()
    s.newCache.Info.NextCycle++
    return s.newCache, nil
}
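
The selective rescan above is driven by h.mod(NextCycle, dataUsageUpdateDirCycles): hashing a folder's path into the cycle count means each existing folder gets a deep rescan once every 16 cycles, while the other 15 cycles reuse the cached entry. A toy illustration of that spreading (the FNV hash stands in for MinIO's hashPath, whose real implementation differs):

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    const updateDirCycles = 16 // dataUsageUpdateDirCycles

    // mustRescan mimics h.mod(cycle, cycles): a folder is rescanned only on
    // the cycle its path hash maps to.
    func mustRescan(folder string, cycle uint32) bool {
        h := fnv.New32a()
        h.Write([]byte(folder))
        return h.Sum32()%updateDirCycles == cycle%updateDirCycles
    }

    func main() {
        folders := []string{"bucket/a", "bucket/b", "bucket/c", "bucket/d"}
        for cycle := uint32(0); cycle < 3; cycle++ {
            for _, f := range folders {
                if mustRescan(f, cycle) {
                    fmt.Printf("cycle %d: deep rescan %s\n", cycle, f)
                }
            }
        }
        // Over any window of 16 cycles every folder is rescanned exactly once.
    }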