mirror of https://github.com/minio/minio.git
fix: reduce crawler memory usage by orders of magnitude (#11556)
Currently the crawler waits for an entire readdir call to return before it processes usage, lifecycle, replication and healing. Instead, we should pass the applicator all the way down to avoid building any special stack for all the contents in a single directory. This means: - no need to remember the entire list of entries per directory before applying the required functions - no need to wait for the entire readdir() call to finish before applying the required functions
This commit is contained in:
parent
e07918abe3
commit
289e1d8b2a
|
@ -407,19 +407,19 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||
if f.dataUsageCrawlDebug {
|
||||
console.Debugf(scannerLogPrefix+" no bucket (%s,%s)\n", f.root, entName)
|
||||
}
|
||||
return nil
|
||||
return errDoneForNow
|
||||
}
|
||||
|
||||
if isReservedOrInvalidBucket(bucket, false) {
|
||||
if f.dataUsageCrawlDebug {
|
||||
console.Debugf(scannerLogPrefix+" invalid bucket: %v, entry: %v\n", bucket, entName)
|
||||
}
|
||||
return nil
|
||||
return errDoneForNow
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return ctx.Err()
|
||||
return errDoneForNow
|
||||
default:
|
||||
}
|
||||
|
||||
|
@ -682,7 +682,7 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder,
|
|||
addDir = func(entName string, typ os.FileMode) error {
|
||||
select {
|
||||
case <-done:
|
||||
return ctx.Err()
|
||||
return errDoneForNow
|
||||
default:
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -269,10 +268,6 @@ func (c *diskCache) toClear() uint64 {
|
|||
return bytesToClear(int64(di.Total), int64(di.Free), uint64(c.quotaPct), uint64(c.lowWatermark), uint64(c.highWatermark))
|
||||
}
|
||||
|
||||
var (
|
||||
errDoneForNow = errors.New("done for now")
|
||||
)
|
||||
|
||||
func (c *diskCache) purgeWait(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
|
@ -382,7 +377,7 @@ func (c *diskCache) purge(ctx context.Context) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if err := readDirFilterFn(c.dir, filterFn); err != nil {
|
||||
if err := readDirFn(c.dir, filterFn); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
|
@ -1025,7 +1020,7 @@ func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if err := readDirFilterFn(c.dir, filterFn); err != nil {
|
||||
if err := readDirFn(c.dir, filterFn); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,38 +0,0 @@
|
|||
// Copyright 2016 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// This code is imported from "golang.org/x/tools/internal/fastwalk",
|
||||
// only fastwalk.go is imported since we already implement readDir()
|
||||
// with some little tweaks.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var errSkipFile = errors.New("fastwalk: skip this file")
|
||||
|
||||
func readDirFn(dirName string, fn func(entName string, typ os.FileMode) error) error {
|
||||
fis, err := readDir(dirName)
|
||||
if err != nil {
|
||||
if osIsNotExist(err) || err == errFileNotFound {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
for _, fi := range fis {
|
||||
var mode os.FileMode
|
||||
if strings.HasSuffix(fi, SlashSeparator) {
|
||||
mode |= os.ModeDir
|
||||
}
|
||||
|
||||
if err = fn(fi, mode); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -33,7 +33,7 @@ import (
|
|||
func renameAllBucketMetacache(epPath string) error {
|
||||
// Rename all previous `.minio.sys/buckets/<bucketname>/.metacache` to
|
||||
// `.minio.sys/tmp/` for deletion.
|
||||
return readDirFilterFn(pathJoin(epPath, minioMetaBucket, bucketMetaPrefix), func(name string, typ os.FileMode) error {
|
||||
return readDirFn(pathJoin(epPath, minioMetaBucket, bucketMetaPrefix), func(name string, typ os.FileMode) error {
|
||||
if typ == os.ModeDir {
|
||||
tmpMetacacheOld := pathJoin(epPath, minioMetaTmpBucket+"-old", mustGetUUID())
|
||||
if err := renameAll(pathJoin(epPath, minioMetaBucket, metacachePrefixForID(name, slashSeparator)),
|
||||
|
|
|
@ -29,11 +29,15 @@ func readDir(dirPath string) (entries []string, err error) {
|
|||
return readDirN(dirPath, -1)
|
||||
}
|
||||
|
||||
// readDir applies the filter function on each entries at dirPath, doesn't recurse into
|
||||
// the directory itself.
|
||||
func readDirFilterFn(dirPath string, filter func(name string, typ os.FileMode) error) error {
|
||||
// readDirFn applies the fn() function to each entry at dirPath; it doesn't recurse into
|
||||
// the directory itself. If dirPath doesn't exist, this function doesn't return
|
||||
// an error.
|
||||
func readDirFn(dirPath string, filter func(name string, typ os.FileMode) error) error {
|
||||
d, err := os.Open(dirPath)
|
||||
if err != nil {
|
||||
if osErrToFileErr(err) == errFileNotFound {
|
||||
return nil
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
defer d.Close()
|
||||
|
@ -46,6 +50,9 @@ func readDirFilterFn(dirPath string, filter func(name string, typ os.FileMode) e
|
|||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if osErrToFileErr(err) == errFileNotFound {
|
||||
return nil
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
for _, fi := range fis {
|
||||
|
|
|
@ -84,11 +84,15 @@ func readDir(dirPath string) (entries []string, err error) {
|
|||
return readDirN(dirPath, -1)
|
||||
}
|
||||
|
||||
// readDir applies the filter function on each entries at dirPath, doesn't recurse into
|
||||
// the directory itself.
|
||||
func readDirFilterFn(dirPath string, filter func(name string, typ os.FileMode) error) error {
|
||||
// readDirFn applies the fn() function to each entry at dirPath; it doesn't recurse into
|
||||
// the directory itself. If dirPath doesn't exist, this function doesn't return
|
||||
// an error.
|
||||
func readDirFn(dirPath string, fn func(name string, typ os.FileMode) error) error {
|
||||
f, err := os.Open(dirPath)
|
||||
if err != nil {
|
||||
if osErrToFileErr(err) == errFileNotFound {
|
||||
return nil
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
@ -103,7 +107,7 @@ func readDirFilterFn(dirPath string, filter func(name string, typ os.FileMode) e
|
|||
nbuf, err = syscall.ReadDirent(int(f.Fd()), buf)
|
||||
if err != nil {
|
||||
if isSysErrNotDir(err) {
|
||||
return errFileNotFound
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -122,8 +126,8 @@ func readDirFilterFn(dirPath string, filter func(name string, typ os.FileMode) e
|
|||
if typ&os.ModeSymlink == os.ModeSymlink {
|
||||
continue
|
||||
}
|
||||
if err = filter(string(name), typ); err == errDoneForNow {
|
||||
// filtering requested to return by caller.
|
||||
if err = fn(string(name), typ); err == errDoneForNow {
|
||||
// fn() requested to return by caller.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,11 +29,15 @@ func readDir(dirPath string) (entries []string, err error) {
|
|||
return readDirN(dirPath, -1)
|
||||
}
|
||||
|
||||
// readDir applies the filter function on each entries at dirPath, doesn't recurse into
|
||||
// the directory itself.
|
||||
func readDirFilterFn(dirPath string, filter func(name string, typ os.FileMode) error) error {
|
||||
// readDirFn applies the fn() function to each entry at dirPath; it doesn't recurse into
|
||||
// the directory itself. If dirPath doesn't exist, this function doesn't return
|
||||
// an error.
|
||||
func readDirFn(dirPath string, filter func(name string, typ os.FileMode) error) error {
|
||||
f, err := os.Open(dirPath)
|
||||
if err != nil {
|
||||
if osErrToFileErr(err) == errFileNotFound {
|
||||
return nil
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
@ -45,6 +49,9 @@ func readDirFilterFn(dirPath string, filter func(name string, typ os.FileMode) e
|
|||
if e == syscall.ERROR_NO_MORE_FILES {
|
||||
break
|
||||
} else {
|
||||
if isSysErrPathNotFound(e) {
|
||||
return nil
|
||||
}
|
||||
return osErrToFileErr(&os.PathError{
|
||||
Op: "FindNextFile",
|
||||
Path: dirPath,
|
||||
|
@ -69,7 +76,7 @@ func readDirFilterFn(dirPath string, filter func(name string, typ os.FileMode) e
|
|||
}
|
||||
}
|
||||
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
// Return N entries at the directory dirPath. If count is -1, return all entries
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
package cmd
|
||||
|
||||
import "errors"
|
||||
|
||||
// errUnexpected - unexpected error, requires manual intervention.
|
||||
var errUnexpected = StorageErr("unexpected error, please report this issue at https://github.com/minio/minio/issues")
|
||||
|
||||
|
@ -104,6 +106,13 @@ var errLessData = StorageErr("less data available than what was requested")
|
|||
// errMoreData = returned when more data was sent by the caller than what it was supposed to.
|
||||
var errMoreData = StorageErr("more data was sent than what was advertised")
|
||||
|
||||
// errDoneForNow tells readDirFn to return without applying fn() to any further entries.
|
||||
var errDoneForNow = errors.New("done for now")
|
||||
|
||||
// errSkipFile is returned by fn() to readDirFn() when it needs
|
||||
// to proceed to the next entry.
|
||||
var errSkipFile = errors.New("skip this file")
|
||||
|
||||
// StorageErr represents error generated by xlStorage call.
|
||||
type StorageErr string
|
||||
|
||||
|
|
Loading…
Reference in New Issue