mirror of
https://github.com/minio/minio.git
synced 2025-01-24 13:13:16 -05:00
498389123e
The data usage tracker and crawler log non-actionable information to the console. This output is not useful, and the conditions it reports resolve on their own in almost all deployments, so let's keep this logging to a minimum.
510 lines
14 KiB
Go
510 lines
14 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"os"
|
|
"path"
|
|
"strconv"
|
|
"time"
|
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
"github.com/minio/minio/cmd/config"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/color"
|
|
"github.com/minio/minio/pkg/env"
|
|
"github.com/minio/minio/pkg/hash"
|
|
"github.com/willf/bloom"
|
|
)
|
|
|
|
const (
|
|
envDataUsageCrawlConf = "MINIO_DISK_USAGE_CRAWL_ENABLE"
|
|
envDataUsageCrawlDelay = "MINIO_DISK_USAGE_CRAWL_DELAY"
|
|
envDataUsageCrawlDebug = "MINIO_DISK_USAGE_CRAWL_DEBUG"
|
|
|
|
dataUsageRoot = SlashSeparator
|
|
dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix
|
|
|
|
dataUsageObjName = ".usage.json"
|
|
dataUsageCacheName = ".usage-cache.bin"
|
|
dataUsageBloomName = ".bloomcycle.bin"
|
|
dataUsageSleepPerFolder = 1 * time.Millisecond
|
|
dataUsageSleepDefMult = 10.0
|
|
dataUsageUpdateDirCycles = 16
|
|
dataUsageStartDelay = 5 * time.Minute // Time to wait on startup and between cycles.
|
|
)
|
|
|
|
// initDataUsageStats will start the crawler unless disabled.
|
|
func initDataUsageStats(ctx context.Context, objAPI ObjectLayer) {
|
|
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
|
|
go runDataUsageInfo(ctx, objAPI)
|
|
}
|
|
}
|
|
|
|
func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) {
|
|
// Load current bloom cycle
|
|
nextBloomCycle := intDataUpdateTracker.current() + 1
|
|
var buf bytes.Buffer
|
|
err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageBloomName, 0, -1, &buf, "", ObjectOptions{})
|
|
if err != nil {
|
|
if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
} else {
|
|
if buf.Len() == 8 {
|
|
nextBloomCycle = binary.LittleEndian.Uint64(buf.Bytes())
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.NewTimer(dataUsageStartDelay).C:
|
|
// Wait before starting next cycle and wait on startup.
|
|
results := make(chan DataUsageInfo, 1)
|
|
go storeDataUsageInBackend(ctx, objAPI, results)
|
|
bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle)
|
|
logger.LogIf(ctx, err)
|
|
err = objAPI.CrawlAndGetDataUsage(ctx, bf, results)
|
|
close(results)
|
|
logger.LogIf(ctx, err)
|
|
if err == nil {
|
|
// Store new cycle...
|
|
nextBloomCycle++
|
|
if nextBloomCycle%dataUpdateTrackerResetEvery == 0 {
|
|
if intDataUpdateTracker.debug {
|
|
logger.Info(color.Green("runDataUsageInfo:") + " Resetting bloom filter for next runs.")
|
|
}
|
|
nextBloomCycle++
|
|
}
|
|
var tmp [8]byte
|
|
binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle)
|
|
r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
|
|
_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r, nil, nil), ObjectOptions{})
|
|
if !isErrBucketNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// storeDataUsageInBackend will store all objects sent on the gui channel until closed.
|
|
func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, gui <-chan DataUsageInfo) {
|
|
for dataUsageInfo := range gui {
|
|
dataUsageJSON, err := json.Marshal(dataUsageInfo)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
size := int64(len(dataUsageJSON))
|
|
r, err := hash.NewReader(bytes.NewReader(dataUsageJSON), size, "", "", size, false)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
|
|
_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageObjName, NewPutObjReader(r, nil, nil), ObjectOptions{})
|
|
if !isErrBucketNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
|
|
var dataUsageInfoJSON bytes.Buffer
|
|
|
|
err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageObjName, 0, -1, &dataUsageInfoJSON, "", ObjectOptions{})
|
|
if err != nil {
|
|
if isErrObjectNotFound(err) || isErrBucketNotFound(err) {
|
|
return DataUsageInfo{}, nil
|
|
}
|
|
return DataUsageInfo{}, toObjectErr(err, dataUsageBucket, dataUsageObjName)
|
|
}
|
|
|
|
var dataUsageInfo DataUsageInfo
|
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
|
err = json.Unmarshal(dataUsageInfoJSON.Bytes(), &dataUsageInfo)
|
|
if err != nil {
|
|
return DataUsageInfo{}, err
|
|
}
|
|
|
|
return dataUsageInfo, nil
|
|
}
|
|
|
|
// Item describes a single file handed to a getSizeFn while walking.
type Item struct {
	// Path is the entry's path, built by joining the scanner root with the
	// entry name.
	Path string
	// Typ carries the file mode bits reported for the entry.
	Typ os.FileMode
}
|
|
|
|
// getSizeFn returns the size to account for the given item.
// When it returns errSkipFile the crawler leaves the item out of its totals;
// any other error is logged and the returned size is still counted.
type getSizeFn func(item Item) (int64, error)
|
|
|
|
type cachedFolder struct {
|
|
name string
|
|
parent *dataUsageHash
|
|
}
|
|
|
|
type folderScanner struct {
|
|
root string
|
|
getSize getSizeFn
|
|
oldCache dataUsageCache
|
|
newCache dataUsageCache
|
|
withFilter *bloomFilter
|
|
waitForLowActiveIO func()
|
|
|
|
dataUsageCrawlMult float64
|
|
dataUsageCrawlDebug bool
|
|
|
|
newFolders []cachedFolder
|
|
existingFolders []cachedFolder
|
|
}
|
|
|
|
// sleepDuration multiplies the duration d by x and sleeps for the result.
// Nothing happens unless the result is more than 100 microseconds, and the
// sleep is limited to max 1 second.
func sleepDuration(d time.Duration, x float64) {
	if pause := scaledSleep(d, x); pause > 0 {
		time.Sleep(pause)
	}
}

// scaledSleep returns how long sleepDuration should pause for d scaled by x:
// 0 when the product is not above 100 microseconds, otherwise the product
// capped at one second.
func scaledSleep(d time.Duration, x float64) time.Duration {
	const minSleep = 100 * time.Microsecond

	scaled := float64(d) * x
	// Written as a negated comparison so NaN is rejected here as well.
	if !(scaled > float64(minSleep)) {
		return 0
	}
	// Clamp before converting: a product outside the int64 range has an
	// implementation-dependent conversion result (typically negative),
	// which would skip the sleep instead of capping it.
	if scaled > float64(time.Second) {
		return time.Second
	}
	// Don't sleep for really small amount of time. The check is repeated
	// after truncation so fractional nanoseconds cannot pass the threshold.
	if pause := time.Duration(scaled); pause > minSleep {
		return pause
	}
	return 0
}
|
|
|
|
// scanQueuedLevels will scan the provided folders.
|
|
// Files found in the folders will be added to f.newCache.
|
|
// If final is provided folders will be put into f.newFolders or f.existingFolders.
|
|
// If final is not provided the folders found are returned from the function.
|
|
func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool) ([]cachedFolder, error) {
|
|
var nextFolders []cachedFolder
|
|
done := ctx.Done()
|
|
for _, folder := range folders {
|
|
select {
|
|
case <-done:
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
thisHash := hashPath(folder.name)
|
|
|
|
if _, ok := f.oldCache.Cache[thisHash]; f.withFilter != nil && ok {
|
|
// If folder isn't in filter and we have data, skip it completely.
|
|
if folder.name != dataUsageRoot && !f.withFilter.containsDir(folder.name) {
|
|
f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
|
|
if f.dataUsageCrawlDebug {
|
|
logger.Info(color.Green("data-usage:")+" Skipping non-updated folder: %v", folder.name)
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
f.waitForLowActiveIO()
|
|
sleepDuration(dataUsageSleepPerFolder, f.dataUsageCrawlMult)
|
|
|
|
cache := dataUsageEntry{}
|
|
|
|
err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error {
|
|
// Parse
|
|
entName = path.Clean(path.Join(folder.name, entName))
|
|
bucket, _ := path2BucketObjectWithBasePath(f.root, entName)
|
|
if bucket == "" {
|
|
if f.dataUsageCrawlDebug {
|
|
logger.Info(color.Green("data-usage:")+" no bucket (%s,%s)", f.root, entName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if isReservedOrInvalidBucket(bucket, false) {
|
|
if f.dataUsageCrawlDebug {
|
|
logger.Info(color.Green("data-usage:")+" invalid bucket: %v, entry: %v", bucket, entName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if typ&os.ModeDir != 0 {
|
|
h := hashPath(entName)
|
|
_, exists := f.oldCache.Cache[h]
|
|
cache.addChildString(entName)
|
|
|
|
this := cachedFolder{name: entName, parent: &thisHash}
|
|
cache.addChild(h)
|
|
if final {
|
|
if exists {
|
|
f.existingFolders = append(f.existingFolders, this)
|
|
} else {
|
|
f.newFolders = append(f.newFolders, this)
|
|
}
|
|
} else {
|
|
nextFolders = append(nextFolders, this)
|
|
}
|
|
return nil
|
|
}
|
|
f.waitForLowActiveIO()
|
|
// Dynamic time delay.
|
|
t := UTCNow()
|
|
|
|
// Get file size, ignore errors.
|
|
size, err := f.getSize(Item{Path: path.Join(f.root, entName), Typ: typ})
|
|
|
|
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
|
|
if err == errSkipFile {
|
|
return nil
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
cache.Size += size
|
|
cache.Objects++
|
|
cache.ObjSizes.add(size)
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
f.newCache.replaceHashed(thisHash, folder.parent, cache)
|
|
}
|
|
return nextFolders, nil
|
|
}
|
|
|
|
// deepScanFolder will deep scan a folder and return the size if no error occurs.
|
|
func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dataUsageEntry, error) {
|
|
var cache dataUsageEntry
|
|
|
|
done := ctx.Done()
|
|
|
|
var addDir func(entName string, typ os.FileMode) error
|
|
var dirStack = []string{f.root, folder}
|
|
|
|
addDir = func(entName string, typ os.FileMode) error {
|
|
select {
|
|
case <-done:
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
f.waitForLowActiveIO()
|
|
if typ&os.ModeDir != 0 {
|
|
dirStack = append(dirStack, entName)
|
|
err := readDirFn(path.Join(dirStack...), addDir)
|
|
dirStack = dirStack[:len(dirStack)-1]
|
|
sleepDuration(dataUsageSleepPerFolder, f.dataUsageCrawlMult)
|
|
return err
|
|
}
|
|
|
|
// Dynamic time delay.
|
|
t := UTCNow()
|
|
|
|
// Get file size, ignore errors.
|
|
dirStack = append(dirStack, entName)
|
|
fileName := path.Join(dirStack...)
|
|
dirStack = dirStack[:len(dirStack)-1]
|
|
|
|
size, err := f.getSize(Item{Path: fileName, Typ: typ})
|
|
|
|
// Don't sleep for really small amount of time
|
|
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
|
|
|
|
if err == errSkipFile {
|
|
return nil
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
cache.Size += size
|
|
cache.Objects++
|
|
cache.ObjSizes.add(size)
|
|
return nil
|
|
}
|
|
err := readDirFn(path.Join(dirStack...), addDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &cache, nil
|
|
}
|
|
|
|
// updateUsage will crawl the basepath+cache.Info.Name and return an updated cache.
|
|
// The returned cache will always be valid, but may not be updated from the existing.
|
|
// Before each operation waitForLowActiveIO is called which can be used to temporarily halt the crawler.
|
|
// If the supplied context is canceled the function will return at the first chance.
|
|
func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, waitForLowActiveIO func(), getSize getSizeFn) (dataUsageCache, error) {
|
|
t := UTCNow()
|
|
|
|
dataUsageDebug := env.Get(envDataUsageCrawlDebug, config.EnableOff) == config.EnableOn
|
|
logPrefix := color.Green("data-usage: ")
|
|
logSuffix := color.Blue(" - %v + %v", basePath, cache.Info.Name)
|
|
if dataUsageDebug {
|
|
defer func() {
|
|
logger.Info(logPrefix+" Crawl time: %v"+logSuffix, time.Since(t))
|
|
}()
|
|
|
|
}
|
|
|
|
if cache.Info.Name == "" {
|
|
cache.Info.Name = dataUsageRoot
|
|
}
|
|
|
|
delayMult, err := strconv.ParseFloat(env.Get(envDataUsageCrawlDelay, "10.0"), 64)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
delayMult = dataUsageSleepDefMult
|
|
}
|
|
|
|
s := folderScanner{
|
|
root: basePath,
|
|
getSize: getSize,
|
|
oldCache: cache,
|
|
newCache: dataUsageCache{Info: cache.Info},
|
|
waitForLowActiveIO: waitForLowActiveIO,
|
|
newFolders: nil,
|
|
existingFolders: nil,
|
|
dataUsageCrawlMult: delayMult,
|
|
dataUsageCrawlDebug: dataUsageDebug,
|
|
}
|
|
|
|
if len(cache.Info.BloomFilter) > 0 {
|
|
s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
|
|
_, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter))
|
|
if err != nil {
|
|
logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter")
|
|
s.withFilter = nil
|
|
}
|
|
}
|
|
if s.dataUsageCrawlDebug {
|
|
logger.Info(logPrefix+"Start crawling. Bloom filter: %v"+logSuffix, s.withFilter != nil)
|
|
}
|
|
|
|
done := ctx.Done()
|
|
var flattenLevels = 3
|
|
|
|
// If we are scanning inside a bucket reduce depth by 1.
|
|
if cache.Info.Name != dataUsageRoot {
|
|
flattenLevels--
|
|
}
|
|
|
|
if s.dataUsageCrawlDebug {
|
|
logger.Info(logPrefix+"Cycle: %v, Entries: %v"+logSuffix, cache.Info.NextCycle, len(cache.Cache))
|
|
}
|
|
|
|
// Always scan flattenLevels deep. Cache root is level 0.
|
|
todo := []cachedFolder{{name: cache.Info.Name}}
|
|
for i := 0; i < flattenLevels; i++ {
|
|
if s.dataUsageCrawlDebug {
|
|
logger.Info(logPrefix+"Level %v, scanning %v directories."+logSuffix, i, len(todo))
|
|
}
|
|
select {
|
|
case <-done:
|
|
return cache, ctx.Err()
|
|
default:
|
|
}
|
|
var err error
|
|
todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1)
|
|
if err != nil {
|
|
// No useful information...
|
|
return cache, err
|
|
}
|
|
}
|
|
|
|
if s.dataUsageCrawlDebug {
|
|
logger.Info(logPrefix+"New folders: %v"+logSuffix, s.newFolders)
|
|
}
|
|
|
|
// Add new folders first
|
|
for _, folder := range s.newFolders {
|
|
select {
|
|
case <-done:
|
|
return s.newCache, ctx.Err()
|
|
default:
|
|
}
|
|
du, err := s.deepScanFolder(ctx, folder.name)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
if du == nil {
|
|
logger.Info(logPrefix + "no disk usage provided" + logSuffix)
|
|
continue
|
|
}
|
|
|
|
s.newCache.replace(folder.name, "", *du)
|
|
// Add to parent manually
|
|
if folder.parent != nil {
|
|
parent := s.newCache.Cache[*folder.parent]
|
|
parent.addChildString(folder.name)
|
|
}
|
|
}
|
|
|
|
if s.dataUsageCrawlDebug {
|
|
logger.Info(logPrefix+"Existing folders: %v"+logSuffix, len(s.existingFolders))
|
|
}
|
|
|
|
// Do selective scanning of existing folders.
|
|
for _, folder := range s.existingFolders {
|
|
select {
|
|
case <-done:
|
|
return s.newCache, ctx.Err()
|
|
default:
|
|
}
|
|
h := hashPath(folder.name)
|
|
if !h.mod(s.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
|
|
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h])
|
|
continue
|
|
}
|
|
|
|
if s.withFilter != nil {
|
|
// If folder isn't in filter, skip it completely.
|
|
if !s.withFilter.containsDir(folder.name) {
|
|
if s.dataUsageCrawlDebug {
|
|
logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder)
|
|
}
|
|
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h])
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Update on this cycle...
|
|
du, err := s.deepScanFolder(ctx, folder.name)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
if du == nil {
|
|
logger.LogIf(ctx, errors.New("data-usage: no disk usage provided"))
|
|
continue
|
|
}
|
|
s.newCache.replaceHashed(h, folder.parent, *du)
|
|
}
|
|
if s.dataUsageCrawlDebug {
|
|
logger.Info(logPrefix+"Finished crawl, %v entries"+logSuffix, len(s.newCache.Cache))
|
|
}
|
|
s.newCache.Info.LastUpdate = UTCNow()
|
|
s.newCache.Info.NextCycle++
|
|
return s.newCache, nil
|
|
}
|