mirror of
https://github.com/minio/minio.git
synced 2025-01-12 07:23:23 -05:00
2165d45d3f
Remove the random sleep. This is running in 4 goroutines, so mostly doing nothing. We use the getSize latency to estimate system load, meaning when there is little load on the system and we get the result fast we sleep a little. If it took a long time we have high load and release ourselves longer. We are sleeping inside the mutex so this affects all goroutines doing IO.
212 lines
5.6 KiB
Go
212 lines
5.6 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/hash"
|
|
)
|
|
|
|
// Settings for the persisted data usage report.
const (
	// dataUsageObjName is the object name under which the serialized
	// DataUsageInfo is stored and loaded.
	dataUsageObjName = "data-usage"

	// dataUsageCrawlInterval is the target spacing between two crawls.
	dataUsageCrawlInterval = 12 * time.Hour
)
|
|
|
|
func initDataUsageStats() {
|
|
go runDataUsageInfoUpdateRoutine()
|
|
}
|
|
|
|
func runDataUsageInfoUpdateRoutine() {
|
|
// Wait until the object layer is ready
|
|
var objAPI ObjectLayer
|
|
for {
|
|
objAPI = newObjectLayerWithoutSafeModeFn()
|
|
if objAPI == nil {
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
|
|
runDataUsageInfo(context.Background(), objAPI, GlobalServiceDoneCh)
|
|
}
|
|
|
|
// timeToNextCrawl returns the duration until next crawl should occur
|
|
// this is validated by verifying the LastUpdate time.
|
|
func timeToCrawl(ctx context.Context, objAPI ObjectLayer) time.Duration {
|
|
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objAPI)
|
|
if err != nil {
|
|
// Upon an error wait for like 10
|
|
// seconds to start the crawler.
|
|
return 10 * time.Second
|
|
}
|
|
// File indeed doesn't exist when LastUpdate is zero
|
|
// so we have never crawled, start crawl right away.
|
|
if dataUsageInfo.LastUpdate.IsZero() {
|
|
return 1 * time.Second
|
|
}
|
|
waitDuration := dataUsageInfo.LastUpdate.Sub(UTCNow())
|
|
if waitDuration > dataUsageCrawlInterval {
|
|
// Waited long enough start crawl in a 1 second
|
|
return 1 * time.Second
|
|
}
|
|
// No crawling needed, ask the routine to wait until
|
|
// the daily interval 12hrs - delta between last update
|
|
// with current time.
|
|
return dataUsageCrawlInterval - waitDuration
|
|
}
|
|
|
|
func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer, endCh <-chan struct{}) {
|
|
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info")
|
|
for {
|
|
err := locker.GetLock(newDynamicTimeout(time.Millisecond, time.Millisecond))
|
|
if err != nil {
|
|
time.Sleep(5 * time.Minute)
|
|
continue
|
|
}
|
|
// Break without unlocking, this node will acquire
|
|
// data usage calculator role for its lifetime.
|
|
break
|
|
}
|
|
|
|
for {
|
|
wait := timeToCrawl(ctx, objAPI)
|
|
select {
|
|
case <-endCh:
|
|
locker.Unlock()
|
|
return
|
|
case <-time.NewTimer(wait).C:
|
|
// Crawl only when no previous crawl has occurred,
|
|
// or its been too long since last crawl.
|
|
err := storeDataUsageInBackend(ctx, objAPI, objAPI.CrawlAndGetDataUsage(ctx, endCh))
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dataUsageInfo DataUsageInfo) error {
|
|
dataUsageJSON, err := json.Marshal(dataUsageInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
size := int64(len(dataUsageJSON))
|
|
r, err := hash.NewReader(bytes.NewReader(dataUsageJSON), size, "", "", size, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = objAPI.PutObject(ctx, minioMetaBackgroundOpsBucket, dataUsageObjName, NewPutObjReader(r, nil, nil), ObjectOptions{})
|
|
return err
|
|
}
|
|
|
|
func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
|
|
var dataUsageInfoJSON bytes.Buffer
|
|
|
|
err := objAPI.GetObject(ctx, minioMetaBackgroundOpsBucket, dataUsageObjName, 0, -1, &dataUsageInfoJSON, "", ObjectOptions{})
|
|
if err != nil {
|
|
if isErrObjectNotFound(err) {
|
|
return DataUsageInfo{}, nil
|
|
}
|
|
return DataUsageInfo{}, toObjectErr(err, minioMetaBackgroundOpsBucket, dataUsageObjName)
|
|
}
|
|
|
|
var dataUsageInfo DataUsageInfo
|
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
|
err = json.Unmarshal(dataUsageInfoJSON.Bytes(), &dataUsageInfo)
|
|
if err != nil {
|
|
return DataUsageInfo{}, err
|
|
}
|
|
|
|
return dataUsageInfo, nil
|
|
}
|
|
|
|
// Item describes a single entry handed to a getSizeFn during a usage walk.
type Item struct {
	// Path is the path of the entry as reported by the walk.
	Path string
	// Typ holds the file mode bits of the entry as reported by the walk.
	Typ os.FileMode
}
|
|
|
|
// getSizeFn reports the size of the given Item, or an error when the size
// cannot be determined.
type getSizeFn func(item Item) (int64, error)
|
|
|
|
func updateUsage(basePath string, doneCh <-chan struct{}, waitForLowActiveIO func(), getSize getSizeFn) DataUsageInfo {
|
|
var dataUsageInfo = DataUsageInfo{
|
|
BucketsSizes: make(map[string]uint64),
|
|
ObjectsSizesHistogram: make(map[string]uint64),
|
|
}
|
|
|
|
numWorkers := 4
|
|
var mutex sync.Mutex // Mutex to update dataUsageInfo
|
|
|
|
fastWalk(basePath, numWorkers, doneCh, func(path string, typ os.FileMode) error {
|
|
// Wait for I/O to go down.
|
|
waitForLowActiveIO()
|
|
|
|
bucket, entry := path2BucketObjectWithBasePath(basePath, path)
|
|
if bucket == "" {
|
|
return nil
|
|
}
|
|
|
|
if isReservedOrInvalidBucket(bucket, false) {
|
|
return filepath.SkipDir
|
|
}
|
|
|
|
if entry == "" && typ&os.ModeDir != 0 {
|
|
mutex.Lock()
|
|
dataUsageInfo.BucketsCount++
|
|
dataUsageInfo.BucketsSizes[bucket] = 0
|
|
mutex.Unlock()
|
|
return nil
|
|
}
|
|
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
|
|
if typ&os.ModeDir != 0 {
|
|
return nil
|
|
}
|
|
|
|
t := time.Now()
|
|
size, err := getSize(Item{path, typ})
|
|
// Use the response time of the getSize call to guess system load.
|
|
// Sleep equivalent time.
|
|
if d := time.Since(t); d > 100*time.Microsecond {
|
|
time.Sleep(d)
|
|
}
|
|
if err != nil {
|
|
return errSkipFile
|
|
}
|
|
|
|
dataUsageInfo.ObjectsCount++
|
|
dataUsageInfo.ObjectsTotalSize += uint64(size)
|
|
dataUsageInfo.BucketsSizes[bucket] += uint64(size)
|
|
dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++
|
|
return nil
|
|
})
|
|
|
|
return dataUsageInfo
|
|
}
|