Avoid select inside a recursive function to avoid CPU spikes (#8923)

Additionally also allow configurable go-routines
This commit is contained in:
Harshavardhana
2020-02-04 06:15:59 +05:30
committed by GitHub
parent 9bbf5cb74f
commit 2d295a31de
4 changed files with 54 additions and 97 deletions

View File

@@ -22,6 +22,7 @@ import (
"encoding/json"
"os"
"path/filepath"
"sync"
"time"
jsoniter "github.com/json-iterator/go"
@@ -150,77 +151,55 @@ type Item struct {
}
type getSizeFn func(item Item) (int64, error)
type activeIOFn func() error
func updateUsage(basePath string, endCh <-chan struct{}, waitForLowActiveIO activeIOFn, getSize getSizeFn) DataUsageInfo {
func updateUsage(basePath string, doneCh <-chan struct{}, waitForLowActiveIO func(), getSize getSizeFn) DataUsageInfo {
var dataUsageInfo = DataUsageInfo{
BucketsSizes: make(map[string]uint64),
ObjectsSizesHistogram: make(map[string]uint64),
}
itemCh := make(chan Item)
skipCh := make(chan error)
defer close(skipCh)
numWorkers := 4
go func() {
defer close(itemCh)
fastWalk(basePath, func(path string, typ os.FileMode) error {
if err := waitForLowActiveIO(); err != nil {
return filepath.SkipDir
}
var mutex sync.Mutex // Mutex to update dataUsageInfo
select {
case <-endCh:
return filepath.SkipDir
case itemCh <- Item{path, typ}:
}
return <-skipCh
})
}()
fastWalk(basePath, numWorkers, doneCh, func(path string, typ os.FileMode) error {
// Wait for I/O to go down.
waitForLowActiveIO()
for {
select {
case <-endCh:
return dataUsageInfo
case item, ok := <-itemCh:
if !ok {
return dataUsageInfo
}
bucket, entry := path2BucketObjectWithBasePath(basePath, item.Path)
if bucket == "" {
skipCh <- nil
continue
}
if isReservedOrInvalidBucket(bucket, false) {
skipCh <- filepath.SkipDir
continue
}
if entry == "" && item.Typ&os.ModeDir != 0 {
dataUsageInfo.BucketsCount++
dataUsageInfo.BucketsSizes[bucket] = 0
skipCh <- nil
continue
}
if item.Typ&os.ModeDir != 0 {
skipCh <- nil
continue
}
size, err := getSize(item)
if err != nil {
skipCh <- errSkipFile
continue
}
dataUsageInfo.ObjectsCount++
dataUsageInfo.ObjectsTotalSize += uint64(size)
dataUsageInfo.BucketsSizes[bucket] += uint64(size)
dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++
skipCh <- nil
bucket, entry := path2BucketObjectWithBasePath(basePath, path)
if bucket == "" {
return nil
}
}
if isReservedOrInvalidBucket(bucket, false) {
return filepath.SkipDir
}
if entry == "" && typ&os.ModeDir != 0 {
mutex.Lock()
dataUsageInfo.BucketsCount++
dataUsageInfo.BucketsSizes[bucket] = 0
mutex.Unlock()
return nil
}
if typ&os.ModeDir != 0 {
return nil
}
size, err := getSize(Item{path, typ})
if err != nil {
return errSkipFile
}
mutex.Lock()
dataUsageInfo.ObjectsCount++
dataUsageInfo.ObjectsTotalSize += uint64(size)
dataUsageInfo.BucketsSizes[bucket] += uint64(size)
dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++
mutex.Unlock()
return nil
})
return dataUsageInfo
}