diff --git a/cmd/data-usage.go b/cmd/data-usage.go index 8240eb187..637437dbb 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -22,6 +22,7 @@ import ( "encoding/json" "os" "path/filepath" + "sync" "time" jsoniter "github.com/json-iterator/go" @@ -150,77 +151,55 @@ type Item struct { } type getSizeFn func(item Item) (int64, error) -type activeIOFn func() error -func updateUsage(basePath string, endCh <-chan struct{}, waitForLowActiveIO activeIOFn, getSize getSizeFn) DataUsageInfo { +func updateUsage(basePath string, doneCh <-chan struct{}, waitForLowActiveIO func(), getSize getSizeFn) DataUsageInfo { var dataUsageInfo = DataUsageInfo{ BucketsSizes: make(map[string]uint64), ObjectsSizesHistogram: make(map[string]uint64), } - itemCh := make(chan Item) - skipCh := make(chan error) - defer close(skipCh) + numWorkers := 4 - go func() { - defer close(itemCh) - fastWalk(basePath, func(path string, typ os.FileMode) error { - if err := waitForLowActiveIO(); err != nil { - return filepath.SkipDir - } + var mutex sync.Mutex // Mutex to update dataUsageInfo - select { - case <-endCh: - return filepath.SkipDir - case itemCh <- Item{path, typ}: - } - return <-skipCh - }) - }() + fastWalk(basePath, numWorkers, doneCh, func(path string, typ os.FileMode) error { + // Wait for I/O to go down. + waitForLowActiveIO() - for { - select { - case <-endCh: - return dataUsageInfo - case item, ok := <-itemCh: - if !ok { - return dataUsageInfo - } - - bucket, entry := path2BucketObjectWithBasePath(basePath, item.Path) - if bucket == "" { - skipCh <- nil - continue - } - - if isReservedOrInvalidBucket(bucket, false) { - skipCh <- filepath.SkipDir - continue - } - - if entry == "" && item.Typ&os.ModeDir != 0 { - dataUsageInfo.BucketsCount++ - dataUsageInfo.BucketsSizes[bucket] = 0 - skipCh <- nil - continue - } - - if item.Typ&os.ModeDir != 0 { - skipCh <- nil - continue - } - - size, err := getSize(item) - if err != nil { - skipCh <- errSkipFile - continue - } - - dataUsageInfo.ObjectsCount++ - dataUsageInfo.ObjectsTotalSize += uint64(size) - dataUsageInfo.BucketsSizes[bucket] += uint64(size) - dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++ - skipCh <- nil + bucket, entry := path2BucketObjectWithBasePath(basePath, path) + if bucket == "" { + return nil } - } + + if isReservedOrInvalidBucket(bucket, false) { + return filepath.SkipDir + } + + if entry == "" && typ&os.ModeDir != 0 { + mutex.Lock() + dataUsageInfo.BucketsCount++ + dataUsageInfo.BucketsSizes[bucket] = 0 + mutex.Unlock() + return nil + } + + if typ&os.ModeDir != 0 { + return nil + } + + size, err := getSize(Item{path, typ}) + if err != nil { + return errSkipFile + } + + mutex.Lock() + dataUsageInfo.ObjectsCount++ + dataUsageInfo.ObjectsTotalSize += uint64(size) + dataUsageInfo.BucketsSizes[bucket] += uint64(size) + dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++ + mutex.Unlock() + return nil + }) + + return dataUsageInfo } diff --git a/cmd/fastwalk.go b/cmd/fastwalk.go index 2da818ff8..cb911f01a 100644 --- a/cmd/fastwalk.go +++ b/cmd/fastwalk.go @@ -12,7 +12,6 @@ import ( "errors" "os" "path/filepath" - "runtime" "strings" "sync" ) @@ -44,16 +43,7 @@ var errSkipFile = errors.New("fastwalk: skip this file") // * fastWalk can follow symlinks if walkFn returns the TraverseLink // sentinel error. It is the walkFn's responsibility to prevent // fastWalk from going into symlink cycles. -func fastWalk(root string, walkFn func(path string, typ os.FileMode) error) error { - // TODO(bradfitz): make numWorkers configurable? We used a - // minimum of 4 to give the kernel more info about multiple - // things we want, in hopes its I/O scheduling can take - // advantage of that. Hopefully most are in cache. Maybe 4 is - // even too low of a minimum. Profile more. - numWorkers := 4 - if n := runtime.NumCPU(); n > numWorkers { - numWorkers = n - } +func fastWalk(root string, nworkers int, doneCh <-chan struct{}, walkFn func(path string, typ os.FileMode) error) error { // Make sure to wait for all workers to finish, otherwise // walkFn could still be called after returning. This Wait call @@ -63,19 +53,20 @@ func fastWalk(root string, walkFn func(path string, typ os.FileMode) error) erro w := &walker{ fn: walkFn, - enqueuec: make(chan walkItem, numWorkers), // buffered for performance - workc: make(chan walkItem, numWorkers), // buffered for performance + enqueuec: make(chan walkItem, nworkers), // buffered for performance + workc: make(chan walkItem, nworkers), // buffered for performance donec: make(chan struct{}), // buffered for correctness & not leaking goroutines: - resc: make(chan error, numWorkers), + resc: make(chan error, nworkers), } defer close(w.donec) - for i := 0; i < numWorkers; i++ { + for i := 0; i < nworkers; i++ { wg.Add(1) go w.doWork(&wg) } + todo := []walkItem{{dir: root}} out := 0 for { @@ -87,6 +78,8 @@ func fastWalk(root string, walkFn func(path string, typ os.FileMode) error) erro workItem = todo[len(todo)-1] } select { + case <-doneCh: + return nil case workc <- workItem: todo = todo[:len(todo)-1] out++ diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 376ce747f..727d25d43 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -19,7 +19,6 @@ package cmd import ( "bytes" "context" - "errors" "fmt" "io" "io/ioutil" @@ -226,18 +225,10 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) StorageInfo { return storageInfo } -func (fs *FSObjects) waitForLowActiveIO() error { +func (fs *FSObjects) waitForLowActiveIO() { for atomic.LoadInt64(&fs.activeIOCount) >= fs.maxActiveIOCount { - select { - case <-GlobalServiceDoneCh: - return errors.New("forced exit") - case <-time.NewTimer(lowActiveIOWaitTick).C: - continue - } + time.Sleep(lowActiveIOWaitTick) } - - return nil - } // CrawlAndGetDataUsage returns data usage stats of the current FS deployment diff --git a/cmd/posix.go b/cmd/posix.go index 9e24c0555..5a5c37942 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -333,16 +333,10 @@ func isQuitting(endCh chan struct{}) bool { } } -func (s *posix) waitForLowActiveIO() error { +func (s *posix) waitForLowActiveIO() { for atomic.LoadInt32(&s.activeIOCount) >= s.maxActiveIOCount { - select { - case <-GlobalServiceDoneCh: - return errors.New("forced exit") - case <-time.NewTimer(lowActiveIOWaitTick).C: - continue - } + time.Sleep(lowActiveIOWaitTick) } - return nil } func (s *posix) CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, error) {