minio/cmd/data-usage.go
Harshavardhana eba423bb9d
Disable crawler in FS/NAS gateway mode (#9695)
No one really uses FS for large scale accounting
usage, neither we crawl in NAS gateway mode. It is
worthwhile to simply disable this feature as its
not useful for anyone.

Bonus disable bucket quota ops as well in, FS
and gateway mode
2020-05-25 00:17:52 -07:00

514 lines
14 KiB
Go

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"os"
"path"
"strconv"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/env"
"github.com/minio/minio/pkg/hash"
"github.com/willf/bloom"
)
const (
envDataUsageCrawlConf = "MINIO_DISK_USAGE_CRAWL_ENABLE"
envDataUsageCrawlDelay = "MINIO_DISK_USAGE_CRAWL_DELAY"
envDataUsageCrawlDebug = "MINIO_DISK_USAGE_CRAWL_DEBUG"
dataUsageRoot = SlashSeparator
dataUsageBucket = minioMetaBucket + SlashSeparator + bucketMetaPrefix
dataUsageObjName = ".usage.json"
dataUsageCacheName = ".usage-cache.bin"
dataUsageBloomName = ".bloomcycle.bin"
dataUsageSleepPerFolder = 1 * time.Millisecond
dataUsageSleepDefMult = 10.0
dataUsageUpdateDirCycles = 16
dataUsageStartDelay = 5 * time.Minute // Time to wait on startup and between cycles.
)
// initDataUsageStats will start the crawler unless disabled.
func initDataUsageStats(ctx context.Context, objAPI ObjectLayer) {
// data usage stats are only available erasure
// coded mode
if globalIsXL || globalIsDistXL {
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
go runDataUsageInfo(ctx, objAPI)
}
}
}
func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) {
// Load current bloom cycle
nextBloomCycle := intDataUpdateTracker.current() + 1
var buf bytes.Buffer
err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageBloomName, 0, -1, &buf, "", ObjectOptions{})
if err != nil {
if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) {
logger.LogIf(ctx, err)
}
} else {
if buf.Len() == 8 {
nextBloomCycle = binary.LittleEndian.Uint64(buf.Bytes())
}
}
for {
select {
case <-ctx.Done():
return
case <-time.NewTimer(dataUsageStartDelay).C:
// Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results)
bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle)
logger.LogIf(ctx, err)
err = objAPI.CrawlAndGetDataUsage(ctx, bf, results)
close(results)
logger.LogIf(ctx, err)
if err == nil {
// Store new cycle...
nextBloomCycle++
if nextBloomCycle%dataUpdateTrackerResetEvery == 0 {
if intDataUpdateTracker.debug {
logger.Info(color.Green("runDataUsageInfo:") + " Resetting bloom filter for next runs.")
}
nextBloomCycle++
}
var tmp [8]byte
binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle)
r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false)
if err != nil {
logger.LogIf(ctx, err)
continue
}
_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r, nil, nil), ObjectOptions{})
if !isErrBucketNotFound(err) {
logger.LogIf(ctx, err)
}
}
}
}
}
// storeDataUsageInBackend will store all objects sent on the gui channel until closed.
func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, gui <-chan DataUsageInfo) {
for dataUsageInfo := range gui {
dataUsageJSON, err := json.Marshal(dataUsageInfo)
if err != nil {
logger.LogIf(ctx, err)
continue
}
size := int64(len(dataUsageJSON))
r, err := hash.NewReader(bytes.NewReader(dataUsageJSON), size, "", "", size, false)
if err != nil {
logger.LogIf(ctx, err)
continue
}
_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageObjName, NewPutObjReader(r, nil, nil), ObjectOptions{})
if !isErrBucketNotFound(err) {
logger.LogIf(ctx, err)
}
}
}
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
var dataUsageInfoJSON bytes.Buffer
err := objAPI.GetObject(ctx, dataUsageBucket, dataUsageObjName, 0, -1, &dataUsageInfoJSON, "", ObjectOptions{})
if err != nil {
if isErrObjectNotFound(err) || isErrBucketNotFound(err) {
return DataUsageInfo{}, nil
}
return DataUsageInfo{}, toObjectErr(err, dataUsageBucket, dataUsageObjName)
}
var dataUsageInfo DataUsageInfo
var json = jsoniter.ConfigCompatibleWithStandardLibrary
err = json.Unmarshal(dataUsageInfoJSON.Bytes(), &dataUsageInfo)
if err != nil {
return DataUsageInfo{}, err
}
return dataUsageInfo, nil
}
// Item represents each file while walking.
type Item struct {
Path string
Typ os.FileMode
}
type getSizeFn func(item Item) (int64, error)
type cachedFolder struct {
name string
parent *dataUsageHash
}
type folderScanner struct {
root string
getSize getSizeFn
oldCache dataUsageCache
newCache dataUsageCache
withFilter *bloomFilter
waitForLowActiveIO func()
dataUsageCrawlMult float64
dataUsageCrawlDebug bool
newFolders []cachedFolder
existingFolders []cachedFolder
}
// sleepDuration multiplies the duration d by x and sleeps if is more than 100 micro seconds.
// sleep is limited to max 1 second.
func sleepDuration(d time.Duration, x float64) {
// Don't sleep for really small amount of time
if d := time.Duration(float64(d) * x); d > time.Microsecond*100 {
if d > time.Second {
d = time.Second
}
time.Sleep(d)
}
}
// scanQueuedLevels will scan the provided folders.
// Files found in the folders will be added to f.newCache.
// If final is provided folders will be put into f.newFolders or f.existingFolders.
// If final is not provided the folders found are returned from the function.
func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool) ([]cachedFolder, error) {
var nextFolders []cachedFolder
done := ctx.Done()
for _, folder := range folders {
select {
case <-done:
return nil, ctx.Err()
default:
}
thisHash := hashPath(folder.name)
if _, ok := f.oldCache.Cache[thisHash]; f.withFilter != nil && ok {
// If folder isn't in filter and we have data, skip it completely.
if folder.name != dataUsageRoot && !f.withFilter.containsDir(folder.name) {
f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
if f.dataUsageCrawlDebug {
logger.Info(color.Green("data-usage:")+" Skipping non-updated folder: %v", folder.name)
}
continue
}
}
f.waitForLowActiveIO()
sleepDuration(dataUsageSleepPerFolder, f.dataUsageCrawlMult)
cache := dataUsageEntry{}
err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error {
// Parse
entName = path.Clean(path.Join(folder.name, entName))
bucket, _ := path2BucketObjectWithBasePath(f.root, entName)
if bucket == "" {
if f.dataUsageCrawlDebug {
logger.Info(color.Green("data-usage:")+" no bucket (%s,%s)", f.root, entName)
}
return nil
}
if isReservedOrInvalidBucket(bucket, false) {
if f.dataUsageCrawlDebug {
logger.Info(color.Green("data-usage:")+" invalid bucket: %v, entry: %v", bucket, entName)
}
return nil
}
select {
case <-done:
return ctx.Err()
default:
}
if typ&os.ModeDir != 0 {
h := hashPath(entName)
_, exists := f.oldCache.Cache[h]
cache.addChildString(entName)
this := cachedFolder{name: entName, parent: &thisHash}
cache.addChild(h)
if final {
if exists {
f.existingFolders = append(f.existingFolders, this)
} else {
f.newFolders = append(f.newFolders, this)
}
} else {
nextFolders = append(nextFolders, this)
}
return nil
}
f.waitForLowActiveIO()
// Dynamic time delay.
t := UTCNow()
// Get file size, ignore errors.
size, err := f.getSize(Item{Path: path.Join(f.root, entName), Typ: typ})
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
if err == errSkipFile || err == errFileNotFound {
return nil
}
logger.LogIf(ctx, err)
cache.Size += size
cache.Objects++
cache.ObjSizes.add(size)
return nil
})
if err != nil {
return nil, err
}
f.newCache.replaceHashed(thisHash, folder.parent, cache)
}
return nextFolders, nil
}
// deepScanFolder will deep scan a folder and return the size if no error occurs.
func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dataUsageEntry, error) {
var cache dataUsageEntry
done := ctx.Done()
var addDir func(entName string, typ os.FileMode) error
var dirStack = []string{f.root, folder}
addDir = func(entName string, typ os.FileMode) error {
select {
case <-done:
return ctx.Err()
default:
}
f.waitForLowActiveIO()
if typ&os.ModeDir != 0 {
dirStack = append(dirStack, entName)
err := readDirFn(path.Join(dirStack...), addDir)
dirStack = dirStack[:len(dirStack)-1]
sleepDuration(dataUsageSleepPerFolder, f.dataUsageCrawlMult)
return err
}
// Dynamic time delay.
t := UTCNow()
// Get file size, ignore errors.
dirStack = append(dirStack, entName)
fileName := path.Join(dirStack...)
dirStack = dirStack[:len(dirStack)-1]
size, err := f.getSize(Item{Path: fileName, Typ: typ})
// Don't sleep for really small amount of time
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
if err == errSkipFile {
return nil
}
logger.LogIf(ctx, err)
cache.Size += size
cache.Objects++
cache.ObjSizes.add(size)
return nil
}
err := readDirFn(path.Join(dirStack...), addDir)
if err != nil {
return nil, err
}
return &cache, nil
}
// updateUsage will crawl the basepath+cache.Info.Name and return an updated cache.
// The returned cache will always be valid, but may not be updated from the existing.
// Before each operation waitForLowActiveIO is called which can be used to temporarily halt the crawler.
// If the supplied context is canceled the function will return at the first chance.
func updateUsage(ctx context.Context, basePath string, cache dataUsageCache, waitForLowActiveIO func(), getSize getSizeFn) (dataUsageCache, error) {
t := UTCNow()
dataUsageDebug := env.Get(envDataUsageCrawlDebug, config.EnableOff) == config.EnableOn
logPrefix := color.Green("data-usage: ")
logSuffix := color.Blue(" - %v + %v", basePath, cache.Info.Name)
if dataUsageDebug {
defer func() {
logger.Info(logPrefix+" Crawl time: %v"+logSuffix, time.Since(t))
}()
}
if cache.Info.Name == "" {
cache.Info.Name = dataUsageRoot
}
delayMult, err := strconv.ParseFloat(env.Get(envDataUsageCrawlDelay, "10.0"), 64)
if err != nil {
logger.LogIf(ctx, err)
delayMult = dataUsageSleepDefMult
}
s := folderScanner{
root: basePath,
getSize: getSize,
oldCache: cache,
newCache: dataUsageCache{Info: cache.Info},
waitForLowActiveIO: waitForLowActiveIO,
newFolders: nil,
existingFolders: nil,
dataUsageCrawlMult: delayMult,
dataUsageCrawlDebug: dataUsageDebug,
}
if len(cache.Info.BloomFilter) > 0 {
s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
_, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter))
if err != nil {
logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter")
s.withFilter = nil
}
}
if s.dataUsageCrawlDebug {
logger.Info(logPrefix+"Start crawling. Bloom filter: %v"+logSuffix, s.withFilter != nil)
}
done := ctx.Done()
var flattenLevels = 3
// If we are scanning inside a bucket reduce depth by 1.
if cache.Info.Name != dataUsageRoot {
flattenLevels--
}
if s.dataUsageCrawlDebug {
logger.Info(logPrefix+"Cycle: %v, Entries: %v"+logSuffix, cache.Info.NextCycle, len(cache.Cache))
}
// Always scan flattenLevels deep. Cache root is level 0.
todo := []cachedFolder{{name: cache.Info.Name}}
for i := 0; i < flattenLevels; i++ {
if s.dataUsageCrawlDebug {
logger.Info(logPrefix+"Level %v, scanning %v directories."+logSuffix, i, len(todo))
}
select {
case <-done:
return cache, ctx.Err()
default:
}
var err error
todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1)
if err != nil {
// No useful information...
return cache, err
}
}
if s.dataUsageCrawlDebug {
logger.Info(logPrefix+"New folders: %v"+logSuffix, s.newFolders)
}
// Add new folders first
for _, folder := range s.newFolders {
select {
case <-done:
return s.newCache, ctx.Err()
default:
}
du, err := s.deepScanFolder(ctx, folder.name)
if err != nil {
logger.LogIf(ctx, err)
continue
}
if du == nil {
logger.Info(logPrefix + "no disk usage provided" + logSuffix)
continue
}
s.newCache.replace(folder.name, "", *du)
// Add to parent manually
if folder.parent != nil {
parent := s.newCache.Cache[*folder.parent]
parent.addChildString(folder.name)
}
}
if s.dataUsageCrawlDebug {
logger.Info(logPrefix+"Existing folders: %v"+logSuffix, len(s.existingFolders))
}
// Do selective scanning of existing folders.
for _, folder := range s.existingFolders {
select {
case <-done:
return s.newCache, ctx.Err()
default:
}
h := hashPath(folder.name)
if !h.mod(s.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h])
continue
}
if s.withFilter != nil {
// If folder isn't in filter, skip it completely.
if !s.withFilter.containsDir(folder.name) {
if s.dataUsageCrawlDebug {
logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder)
}
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h])
continue
}
}
// Update on this cycle...
du, err := s.deepScanFolder(ctx, folder.name)
if err != nil {
logger.LogIf(ctx, err)
continue
}
if du == nil {
logger.LogIf(ctx, errors.New("data-usage: no disk usage provided"))
continue
}
s.newCache.replaceHashed(h, folder.parent, *du)
}
if s.dataUsageCrawlDebug {
logger.Info(logPrefix+"Finished crawl, %v entries"+logSuffix, len(s.newCache.Cache))
}
s.newCache.Info.LastUpdate = UTCNow()
s.newCache.Info.NextCycle++
return s.newCache, nil
}