mirror of
https://github.com/minio/minio.git
synced 2025-01-24 21:23:15 -05:00
b87fae0049
This change moves away from a unified constructor for plaintext and encrypted usage. NewPutObjReader is simplified for the plain-text reader use. For encrypted reader use, WithEncryption should be called on an initialized PutObjReader. Plaintext: func NewPutObjReader(rawReader *hash.Reader) *PutObjReader The hash.Reader is used to provide payload size and md5sum to the downstream consumers. This is different from the previous version in that there is no need to pass nil values for unused parameters. Encrypted: func WithEncryption(encReader *hash.Reader, key *crypto.ObjectKey) (*PutObjReader, error) This method sets up encrypted reader along with the key to seal the md5sum produced by the plain-text reader (already setup when NewPutObjReader was called). Usage: ``` pReader := NewPutObjReader(rawReader) // ... other object handler code goes here // Prepare the encrypted hashed reader pReader, err = pReader.WithEncryption(encReader, objEncKey) ```
1232 lines
36 KiB
Go
1232 lines
36 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"math"
|
|
"math/rand"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/minio/minio/cmd/config/heal"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/bucket/lifecycle"
|
|
"github.com/minio/minio/pkg/bucket/replication"
|
|
"github.com/minio/minio/pkg/color"
|
|
"github.com/minio/minio/pkg/console"
|
|
"github.com/minio/minio/pkg/event"
|
|
"github.com/minio/minio/pkg/hash"
|
|
"github.com/minio/minio/pkg/madmin"
|
|
"github.com/willf/bloom"
|
|
)
|
|
|
|
const (
|
|
dataCrawlSleepPerFolder = time.Millisecond // Time to wait between folders.
|
|
dataCrawlStartDelay = 1 * time.Minute // Time to wait on startup and between cycles.
|
|
dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles.
|
|
|
|
healDeleteDangling = true
|
|
healFolderIncludeProb = 32 // Include a clean folder one in n cycles.
|
|
healObjectSelectProb = 512 // Overall probability of a file being scanned; one in n.
|
|
)
|
|
|
|
var (
|
|
globalHealConfig heal.Config
|
|
globalHealConfigMu sync.Mutex
|
|
|
|
dataCrawlerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
|
|
// Sleeper values are updated when config is loaded.
|
|
crawlerSleeper = newDynamicSleeper(10, 10*time.Second)
|
|
)
|
|
|
|
// initDataCrawler will start the crawler in the background.
|
|
func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
|
go runDataCrawler(ctx, objAPI)
|
|
}
|
|
|
|
// runDataCrawler will start a data crawler.
|
|
// The function will block until the context is canceled.
|
|
// There should only ever be one crawler running per cluster.
|
|
func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
|
// Make sure only 1 crawler is running on the cluster.
|
|
locker := objAPI.NewNSLock(minioMetaBucket, "runDataCrawler.lock")
|
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
for {
|
|
err := locker.GetLock(ctx, dataCrawlerLeaderLockTimeout)
|
|
if err != nil {
|
|
time.Sleep(time.Duration(r.Float64() * float64(dataCrawlStartDelay)))
|
|
continue
|
|
}
|
|
break
|
|
// No unlock for "leader" lock.
|
|
}
|
|
|
|
// Load current bloom cycle
|
|
nextBloomCycle := intDataUpdateTracker.current() + 1
|
|
|
|
br, err := objAPI.GetObjectNInfo(ctx, dataUsageBucket, dataUsageBloomName, nil, http.Header{}, readLock, ObjectOptions{})
|
|
if err != nil {
|
|
if !isErrObjectNotFound(err) && !isErrBucketNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
} else {
|
|
if br.ObjInfo.Size == 8 {
|
|
if err = binary.Read(br, binary.LittleEndian, &nextBloomCycle); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
br.Close()
|
|
}
|
|
|
|
crawlTimer := time.NewTimer(dataCrawlStartDelay)
|
|
defer crawlTimer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-crawlTimer.C:
|
|
// Reset the timer for next cycle.
|
|
crawlTimer.Reset(dataCrawlStartDelay)
|
|
|
|
if intDataUpdateTracker.debug {
|
|
console.Debugln("starting crawler cycle")
|
|
}
|
|
|
|
// Wait before starting next cycle and wait on startup.
|
|
results := make(chan DataUsageInfo, 1)
|
|
go storeDataUsageInBackend(ctx, objAPI, results)
|
|
bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle)
|
|
logger.LogIf(ctx, err)
|
|
err = objAPI.CrawlAndGetDataUsage(ctx, bf, results)
|
|
close(results)
|
|
logger.LogIf(ctx, err)
|
|
if err == nil {
|
|
// Store new cycle...
|
|
nextBloomCycle++
|
|
var tmp [8]byte
|
|
binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle)
|
|
r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
|
|
_, err = objAPI.PutObject(ctx, dataUsageBucket, dataUsageBloomName, NewPutObjReader(r), ObjectOptions{})
|
|
if !isErrBucketNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type cachedFolder struct {
|
|
name string
|
|
parent *dataUsageHash
|
|
objectHealProbDiv uint32
|
|
}
|
|
|
|
type folderScanner struct {
|
|
root string
|
|
getSize getSizeFn
|
|
oldCache dataUsageCache
|
|
newCache dataUsageCache
|
|
withFilter *bloomFilter
|
|
|
|
dataUsageCrawlDebug bool
|
|
healFolderInclude uint32 // Include a clean folder one in n cycles.
|
|
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
|
|
|
newFolders []cachedFolder
|
|
existingFolders []cachedFolder
|
|
disks []StorageAPI
|
|
}
|
|
|
|
// crawlDataFolder will crawl the basepath+cache.Info.Name and return an updated cache.
|
|
// The returned cache will always be valid, but may not be updated from the existing.
|
|
// Before each operation sleepDuration is called which can be used to temporarily halt the crawler.
|
|
// If the supplied context is canceled the function will return at the first chance.
|
|
func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache, getSize getSizeFn) (dataUsageCache, error) {
|
|
t := UTCNow()
|
|
|
|
logPrefix := color.Green("data-usage: ")
|
|
logSuffix := color.Blue("- %v + %v", basePath, cache.Info.Name)
|
|
if intDataUpdateTracker.debug {
|
|
defer func() {
|
|
console.Debugf(logPrefix+" Crawl time: %v %s\n", time.Since(t), logSuffix)
|
|
}()
|
|
|
|
}
|
|
|
|
switch cache.Info.Name {
|
|
case "", dataUsageRoot:
|
|
return cache, errors.New("internal error: root scan attempted")
|
|
}
|
|
|
|
skipHeal := cache.Info.SkipHealing
|
|
|
|
s := folderScanner{
|
|
root: basePath,
|
|
getSize: getSize,
|
|
oldCache: cache,
|
|
newCache: dataUsageCache{Info: cache.Info},
|
|
newFolders: nil,
|
|
existingFolders: nil,
|
|
dataUsageCrawlDebug: intDataUpdateTracker.debug,
|
|
healFolderInclude: 0,
|
|
healObjectSelect: 0,
|
|
}
|
|
|
|
// Add disks for set healing.
|
|
if len(cache.Disks) > 0 {
|
|
objAPI, ok := newObjectLayerFn().(*erasureServerPools)
|
|
if ok {
|
|
s.disks = objAPI.GetDisksID(cache.Disks...)
|
|
if len(s.disks) != len(cache.Disks) {
|
|
console.Debugf(logPrefix+"Missing disks, want %d, found %d. Cannot heal. %s\n", len(cache.Disks), len(s.disks), logSuffix)
|
|
s.disks = s.disks[:0]
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enable healing in XL mode.
|
|
if globalIsErasure {
|
|
// Include a clean folder one in n cycles.
|
|
s.healFolderInclude = healFolderIncludeProb
|
|
// Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
|
s.healObjectSelect = healObjectSelectProb
|
|
}
|
|
if len(cache.Info.BloomFilter) > 0 {
|
|
s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
|
|
_, err := s.withFilter.ReadFrom(bytes.NewReader(cache.Info.BloomFilter))
|
|
if err != nil {
|
|
logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter")
|
|
s.withFilter = nil
|
|
}
|
|
}
|
|
if s.dataUsageCrawlDebug {
|
|
console.Debugf(logPrefix+"Start crawling. Bloom filter: %v %s\n", s.withFilter != nil, logSuffix)
|
|
}
|
|
|
|
done := ctx.Done()
|
|
var flattenLevels = 2
|
|
|
|
if s.dataUsageCrawlDebug {
|
|
console.Debugf(logPrefix+"Cycle: %v, Entries: %v %s\n", cache.Info.NextCycle, len(cache.Cache), logSuffix)
|
|
}
|
|
|
|
// Always scan flattenLevels deep. Cache root is level 0.
|
|
todo := []cachedFolder{{name: cache.Info.Name, objectHealProbDiv: 1}}
|
|
for i := 0; i < flattenLevels; i++ {
|
|
if s.dataUsageCrawlDebug {
|
|
console.Debugf(logPrefix+"Level %v, scanning %v directories. %s\n", i, len(todo), logSuffix)
|
|
}
|
|
select {
|
|
case <-done:
|
|
return cache, ctx.Err()
|
|
default:
|
|
}
|
|
var err error
|
|
todo, err = s.scanQueuedLevels(ctx, todo, i == flattenLevels-1, skipHeal)
|
|
if err != nil {
|
|
// No useful information...
|
|
return cache, err
|
|
}
|
|
}
|
|
|
|
if s.dataUsageCrawlDebug {
|
|
console.Debugf(logPrefix+"New folders: %v %s\n", s.newFolders, logSuffix)
|
|
}
|
|
|
|
// Add new folders first
|
|
for _, folder := range s.newFolders {
|
|
select {
|
|
case <-done:
|
|
return s.newCache, ctx.Err()
|
|
default:
|
|
}
|
|
du, err := s.deepScanFolder(ctx, folder, skipHeal)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
if du == nil {
|
|
console.Debugln(logPrefix + "no disk usage provided" + logSuffix)
|
|
continue
|
|
}
|
|
|
|
s.newCache.replace(folder.name, "", *du)
|
|
// Add to parent manually
|
|
if folder.parent != nil {
|
|
parent := s.newCache.Cache[folder.parent.Key()]
|
|
parent.addChildString(folder.name)
|
|
}
|
|
}
|
|
|
|
if s.dataUsageCrawlDebug {
|
|
console.Debugf(logPrefix+"Existing folders: %v %s\n", len(s.existingFolders), logSuffix)
|
|
}
|
|
|
|
// Do selective scanning of existing folders.
|
|
for _, folder := range s.existingFolders {
|
|
select {
|
|
case <-done:
|
|
return s.newCache, ctx.Err()
|
|
default:
|
|
}
|
|
h := hashPath(folder.name)
|
|
if !h.mod(s.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
|
|
if !h.mod(s.oldCache.Info.NextCycle, s.healFolderInclude/folder.objectHealProbDiv) {
|
|
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
|
|
continue
|
|
} else {
|
|
folder.objectHealProbDiv = s.healFolderInclude
|
|
}
|
|
folder.objectHealProbDiv = dataUsageUpdateDirCycles
|
|
}
|
|
if s.withFilter != nil {
|
|
_, prefix := path2BucketObjectWithBasePath(basePath, folder.name)
|
|
if s.oldCache.Info.lifeCycle == nil || !s.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) {
|
|
// If folder isn't in filter, skip it completely.
|
|
if !s.withFilter.containsDir(folder.name) {
|
|
if !h.mod(s.oldCache.Info.NextCycle, s.healFolderInclude/folder.objectHealProbDiv) {
|
|
if s.dataUsageCrawlDebug {
|
|
console.Debugf(logPrefix+"Skipping non-updated folder: %v %s\n", folder, logSuffix)
|
|
}
|
|
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
|
|
continue
|
|
} else {
|
|
if s.dataUsageCrawlDebug {
|
|
console.Debugf(logPrefix+"Adding non-updated folder to heal check: %v %s\n", folder.name, logSuffix)
|
|
}
|
|
// Update probability of including objects
|
|
folder.objectHealProbDiv = s.healFolderInclude
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update on this cycle...
|
|
du, err := s.deepScanFolder(ctx, folder, skipHeal)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
continue
|
|
}
|
|
if du == nil {
|
|
logger.LogIf(ctx, errors.New("data-usage: no disk usage provided"))
|
|
continue
|
|
}
|
|
s.newCache.replaceHashed(h, folder.parent, *du)
|
|
}
|
|
if s.dataUsageCrawlDebug {
|
|
console.Debugf(logPrefix+"Finished crawl, %v entries %s\n", len(s.newCache.Cache), logSuffix)
|
|
}
|
|
s.newCache.Info.LastUpdate = UTCNow()
|
|
s.newCache.Info.NextCycle++
|
|
return s.newCache, nil
|
|
}
|
|
|
|
// scanQueuedLevels will scan the provided folders.
|
|
// Files found in the folders will be added to f.newCache.
|
|
// If final is provided folders will be put into f.newFolders or f.existingFolders.
|
|
// If final is not provided the folders found are returned from the function.
|
|
func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFolder, final bool, skipHeal bool) ([]cachedFolder, error) {
|
|
var nextFolders []cachedFolder
|
|
done := ctx.Done()
|
|
scannerLogPrefix := color.Green("folder-scanner:")
|
|
for _, folder := range folders {
|
|
select {
|
|
case <-done:
|
|
return nil, ctx.Err()
|
|
default:
|
|
}
|
|
thisHash := hashPath(folder.name)
|
|
existing := f.oldCache.findChildrenCopy(thisHash)
|
|
|
|
// If there are lifecycle rules for the prefix, remove the filter.
|
|
filter := f.withFilter
|
|
_, prefix := path2BucketObjectWithBasePath(f.root, folder.name)
|
|
var activeLifeCycle *lifecycle.Lifecycle
|
|
if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(scannerLogPrefix+" Prefix %q has active rules\n", prefix)
|
|
}
|
|
activeLifeCycle = f.oldCache.Info.lifeCycle
|
|
filter = nil
|
|
}
|
|
if _, ok := f.oldCache.Cache[thisHash.Key()]; filter != nil && ok {
|
|
// If folder isn't in filter and we have data, skip it completely.
|
|
if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
|
|
if !thisHash.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
|
|
f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(scannerLogPrefix+" Skipping non-updated folder: %v\n", folder.name)
|
|
}
|
|
continue
|
|
} else {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(scannerLogPrefix+" Adding non-updated folder to heal check: %v\n", folder.name)
|
|
}
|
|
// If probability was already crawlerHealFolderInclude, keep it.
|
|
folder.objectHealProbDiv = f.healFolderInclude
|
|
}
|
|
}
|
|
}
|
|
crawlerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
|
|
|
|
cache := dataUsageEntry{}
|
|
|
|
err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error {
|
|
// Parse
|
|
entName = path.Clean(path.Join(folder.name, entName))
|
|
bucket, prefix := path2BucketObjectWithBasePath(f.root, entName)
|
|
if bucket == "" {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(scannerLogPrefix+" no bucket (%s,%s)\n", f.root, entName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if isReservedOrInvalidBucket(bucket, false) {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(scannerLogPrefix+" invalid bucket: %v, entry: %v\n", bucket, entName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if typ&os.ModeDir != 0 {
|
|
h := hashPath(entName)
|
|
_, exists := f.oldCache.Cache[h.Key()]
|
|
cache.addChildString(entName)
|
|
|
|
this := cachedFolder{name: entName, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
|
|
|
|
delete(existing, h.Key()) // h.Key() already accounted for.
|
|
|
|
cache.addChild(h)
|
|
|
|
if final {
|
|
if exists {
|
|
f.existingFolders = append(f.existingFolders, this)
|
|
} else {
|
|
f.newFolders = append(f.newFolders, this)
|
|
}
|
|
} else {
|
|
nextFolders = append(nextFolders, this)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Dynamic time delay.
|
|
wait := crawlerSleeper.Timer(ctx)
|
|
|
|
// Get file size, ignore errors.
|
|
item := crawlItem{
|
|
Path: path.Join(f.root, entName),
|
|
Typ: typ,
|
|
bucket: bucket,
|
|
prefix: path.Dir(prefix),
|
|
objectName: path.Base(entName),
|
|
debug: f.dataUsageCrawlDebug,
|
|
lifeCycle: activeLifeCycle,
|
|
heal: thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
|
|
}
|
|
|
|
// if the drive belongs to an erasure set
|
|
// that is already being healed, skip the
|
|
// healing attempt on this drive.
|
|
item.heal = item.heal && !skipHeal
|
|
|
|
sizeSummary, err := f.getSize(item)
|
|
if err == errSkipFile {
|
|
wait() // wait to proceed to next entry.
|
|
|
|
return nil
|
|
}
|
|
|
|
// successfully read means we have a valid object.
|
|
|
|
// Remove filename i.e is the meta file to construct object name
|
|
item.transformMetaDir()
|
|
|
|
// Object already accounted for, remove from heal map,
|
|
// simply because getSize() function already heals the
|
|
// object.
|
|
delete(existing, path.Join(item.bucket, item.objectPath()))
|
|
|
|
cache.addSizes(sizeSummary)
|
|
cache.Objects++
|
|
cache.ObjSizes.add(sizeSummary.totalSize)
|
|
|
|
wait() // wait to proceed to next entry.
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if f.healObjectSelect == 0 {
|
|
// If we are not scanning, return now.
|
|
f.newCache.replaceHashed(thisHash, folder.parent, cache)
|
|
continue
|
|
}
|
|
|
|
objAPI, ok := newObjectLayerFn().(*erasureServerPools)
|
|
if !ok || len(f.disks) == 0 {
|
|
continue
|
|
}
|
|
|
|
bgSeq, found := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
|
if !found {
|
|
continue
|
|
}
|
|
|
|
// Whatever remains in 'existing' are folders at this level
|
|
// that existed in the previous run but wasn't found now.
|
|
//
|
|
// This may be because of 2 reasons:
|
|
//
|
|
// 1) The folder/object was deleted.
|
|
// 2) We come from another disk and this disk missed the write.
|
|
//
|
|
// We therefore perform a heal check.
|
|
// If that doesn't bring it back we remove the folder and assume it was deleted.
|
|
// This means that the next run will not look for it.
|
|
// How to resolve results.
|
|
resolver := metadataResolutionParams{
|
|
dirQuorum: getReadQuorum(len(f.disks)),
|
|
objQuorum: getReadQuorum(len(f.disks)),
|
|
bucket: "",
|
|
}
|
|
|
|
healObjectsPrefix := color.Green("healObjects:")
|
|
for k := range existing {
|
|
bucket, prefix := path2BucketObject(k)
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix)
|
|
}
|
|
|
|
// Dynamic time delay.
|
|
wait := crawlerSleeper.Timer(ctx)
|
|
resolver.bucket = bucket
|
|
|
|
foundObjs := false
|
|
dangling := false
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
err := listPathRaw(ctx, listPathRawOptions{
|
|
disks: f.disks,
|
|
bucket: bucket,
|
|
path: prefix,
|
|
recursive: true,
|
|
reportNotFound: true,
|
|
minDisks: len(f.disks), // We want full consistency.
|
|
// Weird, maybe transient error.
|
|
agreed: func(entry metaCacheEntry) {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(healObjectsPrefix+" got agreement: %v\n", entry.name)
|
|
}
|
|
},
|
|
// Some disks have data for this.
|
|
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(healObjectsPrefix+" got partial, %d agreed, errs: %v\n", nAgreed, errs)
|
|
}
|
|
|
|
// agreed value less than expected quorum
|
|
dangling = nAgreed < resolver.objQuorum || nAgreed < resolver.dirQuorum
|
|
|
|
// Sleep and reset.
|
|
wait()
|
|
wait = crawlerSleeper.Timer(ctx)
|
|
entry, ok := entries.resolve(&resolver)
|
|
if !ok {
|
|
for _, err := range errs {
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
// If no errors, queue it for healing.
|
|
entry, _ = entries.firstFound()
|
|
}
|
|
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(healObjectsPrefix+" resolved to: %v, dir: %v\n", entry.name, entry.isDir())
|
|
}
|
|
|
|
if entry.isDir() {
|
|
return
|
|
}
|
|
// We got an entry which we should be able to heal.
|
|
fiv, err := entry.fileInfoVersions(bucket)
|
|
if err != nil {
|
|
err := bgSeq.queueHealTask(healSource{
|
|
bucket: bucket,
|
|
object: entry.name,
|
|
versionID: "",
|
|
}, madmin.HealItemObject)
|
|
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
foundObjs = foundObjs || err == nil
|
|
return
|
|
}
|
|
for _, ver := range fiv.Versions {
|
|
// Sleep and reset.
|
|
wait()
|
|
wait = crawlerSleeper.Timer(ctx)
|
|
err := bgSeq.queueHealTask(healSource{
|
|
bucket: bucket,
|
|
object: fiv.Name,
|
|
versionID: ver.VersionID,
|
|
}, madmin.HealItemObject)
|
|
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
foundObjs = foundObjs || err == nil
|
|
}
|
|
},
|
|
// Too many disks failed.
|
|
finished: func(errs []error) {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(healObjectsPrefix+" too many errors: %v\n", errs)
|
|
}
|
|
cancel()
|
|
},
|
|
})
|
|
|
|
if f.dataUsageCrawlDebug && err != nil && err != errFileNotFound {
|
|
console.Debugf(healObjectsPrefix+" checking returned value %v (%T)\n", err, err)
|
|
}
|
|
|
|
// If we found one or more disks with this folder, delete it.
|
|
if err == nil && dangling {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(healObjectsPrefix+" deleting dangling directory %s\n", prefix)
|
|
}
|
|
|
|
objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{
|
|
Recursive: true,
|
|
Remove: true,
|
|
},
|
|
func(bucket, object, versionID string) error {
|
|
// Wait for each heal as per crawler frequency.
|
|
wait()
|
|
wait = crawlerSleeper.Timer(ctx)
|
|
return bgSeq.queueHealTask(healSource{
|
|
bucket: bucket,
|
|
object: object,
|
|
versionID: versionID,
|
|
}, madmin.HealItemObject)
|
|
})
|
|
}
|
|
|
|
wait()
|
|
|
|
// Add unless healing returned an error.
|
|
if foundObjs {
|
|
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
|
|
cache.addChild(hashPath(k))
|
|
if final {
|
|
f.existingFolders = append(f.existingFolders, this)
|
|
} else {
|
|
nextFolders = append(nextFolders, this)
|
|
}
|
|
}
|
|
}
|
|
f.newCache.replaceHashed(thisHash, folder.parent, cache)
|
|
}
|
|
return nextFolders, nil
|
|
}
|
|
|
|
// deepScanFolder will deep scan a folder and return the size if no error occurs.
|
|
func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder, skipHeal bool) (*dataUsageEntry, error) {
|
|
var cache dataUsageEntry
|
|
|
|
done := ctx.Done()
|
|
|
|
var addDir func(entName string, typ os.FileMode) error
|
|
var dirStack = []string{f.root, folder.name}
|
|
|
|
deepScannerLogPrefix := color.Green("deep-scanner:")
|
|
addDir = func(entName string, typ os.FileMode) error {
|
|
select {
|
|
case <-done:
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if typ&os.ModeDir != 0 {
|
|
dirStack = append(dirStack, entName)
|
|
err := readDirFn(path.Join(dirStack...), addDir)
|
|
dirStack = dirStack[:len(dirStack)-1]
|
|
crawlerSleeper.Sleep(ctx, dataCrawlSleepPerFolder)
|
|
return err
|
|
}
|
|
|
|
// Dynamic time delay.
|
|
wait := crawlerSleeper.Timer(ctx)
|
|
|
|
// Get file size, ignore errors.
|
|
dirStack = append(dirStack, entName)
|
|
fileName := path.Join(dirStack...)
|
|
dirStack = dirStack[:len(dirStack)-1]
|
|
|
|
bucket, prefix := path2BucketObjectWithBasePath(f.root, fileName)
|
|
var activeLifeCycle *lifecycle.Lifecycle
|
|
if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix, false) {
|
|
if f.dataUsageCrawlDebug {
|
|
console.Debugf(deepScannerLogPrefix+" Prefix %q has active rules\n", prefix)
|
|
}
|
|
activeLifeCycle = f.oldCache.Info.lifeCycle
|
|
}
|
|
|
|
item := crawlItem{
|
|
Path: fileName,
|
|
Typ: typ,
|
|
bucket: bucket,
|
|
prefix: path.Dir(prefix),
|
|
objectName: path.Base(entName),
|
|
debug: f.dataUsageCrawlDebug,
|
|
lifeCycle: activeLifeCycle,
|
|
heal: hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
|
|
}
|
|
|
|
// if the drive belongs to an erasure set
|
|
// that is already being healed, skip the
|
|
// healing attempt on this drive.
|
|
item.heal = item.heal && !skipHeal
|
|
|
|
sizeSummary, err := f.getSize(item)
|
|
if err == errSkipFile {
|
|
// Wait to throttle IO
|
|
wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
logger.LogIf(ctx, err)
|
|
cache.addSizes(sizeSummary)
|
|
cache.Objects++
|
|
cache.ObjSizes.add(sizeSummary.totalSize)
|
|
|
|
// Wait to throttle IO
|
|
wait()
|
|
|
|
return nil
|
|
}
|
|
err := readDirFn(path.Join(dirStack...), addDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &cache, nil
|
|
}
|
|
|
|
// crawlItem represents each file while walking.
|
|
type crawlItem struct {
|
|
Path string
|
|
Typ os.FileMode
|
|
|
|
bucket string // Bucket.
|
|
prefix string // Only the prefix if any, does not have final object name.
|
|
objectName string // Only the object name without prefixes.
|
|
lifeCycle *lifecycle.Lifecycle
|
|
heal bool // Has the object been selected for heal check?
|
|
debug bool
|
|
}
|
|
|
|
type sizeSummary struct {
|
|
totalSize int64
|
|
replicatedSize int64
|
|
pendingSize int64
|
|
failedSize int64
|
|
replicaSize int64
|
|
}
|
|
|
|
type getSizeFn func(item crawlItem) (sizeSummary, error)
|
|
|
|
// transformMetaDir will transform a directory to prefix/file.ext
|
|
func (i *crawlItem) transformMetaDir() {
|
|
split := strings.Split(i.prefix, SlashSeparator)
|
|
if len(split) > 1 {
|
|
i.prefix = path.Join(split[:len(split)-1]...)
|
|
} else {
|
|
i.prefix = ""
|
|
}
|
|
// Object name is last element
|
|
i.objectName = split[len(split)-1]
|
|
}
|
|
|
|
// actionMeta contains information used to apply actions.
|
|
type actionMeta struct {
|
|
oi ObjectInfo
|
|
bitRotScan bool // indicates if bitrot check was requested.
|
|
}
|
|
|
|
var applyActionsLogPrefix = color.Green("applyActions:")
|
|
|
|
// applyActions will apply lifecycle checks on to a scanned item.
|
|
// The resulting size on disk will always be returned.
|
|
// The metadata will be compared to consensus on the object layer before any changes are applied.
|
|
// If no metadata is supplied, -1 is returned if no action is taken.
|
|
func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta actionMeta) (size int64) {
|
|
size, err := meta.oi.GetActualSize()
|
|
if i.debug {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
if i.heal {
|
|
if i.debug {
|
|
if meta.oi.VersionID != "" {
|
|
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), meta.oi.VersionID)
|
|
} else {
|
|
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
|
|
}
|
|
}
|
|
healOpts := madmin.HealOpts{Remove: healDeleteDangling}
|
|
if meta.bitRotScan {
|
|
healOpts.ScanMode = madmin.HealDeepScan
|
|
}
|
|
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, healOpts)
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
return 0
|
|
}
|
|
if err != nil && !errors.Is(err, NotImplemented{}) {
|
|
logger.LogIf(ctx, err)
|
|
return 0
|
|
}
|
|
size = res.ObjectSize
|
|
}
|
|
if i.lifeCycle == nil {
|
|
if i.debug {
|
|
console.Debugf(applyActionsLogPrefix+" no lifecycle rules to apply: %q\n", i.objectPath())
|
|
}
|
|
return size
|
|
}
|
|
|
|
versionID := meta.oi.VersionID
|
|
action := i.lifeCycle.ComputeAction(
|
|
lifecycle.ObjectOpts{
|
|
Name: i.objectPath(),
|
|
UserTags: meta.oi.UserTags,
|
|
ModTime: meta.oi.ModTime,
|
|
VersionID: meta.oi.VersionID,
|
|
DeleteMarker: meta.oi.DeleteMarker,
|
|
IsLatest: meta.oi.IsLatest,
|
|
NumVersions: meta.oi.NumVersions,
|
|
SuccessorModTime: meta.oi.SuccessorModTime,
|
|
RestoreOngoing: meta.oi.RestoreOngoing,
|
|
RestoreExpires: meta.oi.RestoreExpires,
|
|
TransitionStatus: meta.oi.TransitionStatus,
|
|
})
|
|
if i.debug {
|
|
if versionID != "" {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: %q (version-id=%s), Initial scan: %v\n", i.objectPath(), versionID, action)
|
|
} else {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: %q Initial scan: %v\n", i.objectPath(), action)
|
|
}
|
|
}
|
|
switch action {
|
|
case lifecycle.DeleteAction, lifecycle.DeleteVersionAction:
|
|
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
|
|
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
|
|
default:
|
|
// No action.
|
|
if i.debug {
|
|
console.Debugf(applyActionsLogPrefix+" object not expirable: %q\n", i.objectPath())
|
|
}
|
|
return size
|
|
}
|
|
|
|
obj, err := o.GetObjectInfo(ctx, i.bucket, i.objectPath(), ObjectOptions{
|
|
VersionID: versionID,
|
|
})
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case MethodNotAllowed: // This happens usually for a delete marker
|
|
if !obj.DeleteMarker { // if this is not a delete marker log and return
|
|
// Do nothing - heal in the future.
|
|
logger.LogIf(ctx, err)
|
|
return size
|
|
}
|
|
case ObjectNotFound, VersionNotFound:
|
|
// object not found or version not found return 0
|
|
return 0
|
|
default:
|
|
// All other errors proceed.
|
|
logger.LogIf(ctx, err)
|
|
return size
|
|
}
|
|
}
|
|
|
|
var applied bool
|
|
action = evalActionFromLifecycle(ctx, *i.lifeCycle, obj, i.debug)
|
|
if action != lifecycle.NoneAction {
|
|
applied = applyLifecycleAction(ctx, action, o, obj)
|
|
}
|
|
|
|
if applied {
|
|
return 0
|
|
}
|
|
return size
|
|
}
|
|
|
|
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, obj ObjectInfo, debug bool) (action lifecycle.Action) {
|
|
lcOpts := lifecycle.ObjectOpts{
|
|
Name: obj.Name,
|
|
UserTags: obj.UserTags,
|
|
ModTime: obj.ModTime,
|
|
VersionID: obj.VersionID,
|
|
DeleteMarker: obj.DeleteMarker,
|
|
IsLatest: obj.IsLatest,
|
|
NumVersions: obj.NumVersions,
|
|
SuccessorModTime: obj.SuccessorModTime,
|
|
RestoreOngoing: obj.RestoreOngoing,
|
|
RestoreExpires: obj.RestoreExpires,
|
|
TransitionStatus: obj.TransitionStatus,
|
|
}
|
|
|
|
action = lc.ComputeAction(lcOpts)
|
|
if debug {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: Secondary scan: %v\n", action)
|
|
}
|
|
|
|
if action == lifecycle.NoneAction {
|
|
return action
|
|
}
|
|
|
|
switch action {
|
|
case lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredVersionAction:
|
|
// Defensive code, should never happen
|
|
if obj.VersionID == "" {
|
|
return lifecycle.NoneAction
|
|
}
|
|
if rcfg, _ := globalBucketObjectLockSys.Get(obj.Bucket); rcfg.LockEnabled {
|
|
locked := enforceRetentionForDeletion(ctx, obj)
|
|
if locked {
|
|
if debug {
|
|
if obj.VersionID != "" {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID)
|
|
} else {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
|
|
}
|
|
}
|
|
return lifecycle.NoneAction
|
|
}
|
|
}
|
|
}
|
|
|
|
return action
|
|
}
|
|
|
|
func applyTransitionAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) bool {
|
|
opts := ObjectOptions{}
|
|
if obj.TransitionStatus == "" {
|
|
opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket)
|
|
opts.VersionID = obj.VersionID
|
|
opts.TransitionStatus = lifecycle.TransitionPending
|
|
if _, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts); err != nil {
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
return false
|
|
}
|
|
// Assume it is still there.
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
}
|
|
globalTransitionState.queueTransitionTask(obj)
|
|
return true
|
|
|
|
}
|
|
|
|
func applyExpiryOnTransitionedObject(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject bool) bool {
|
|
lcOpts := lifecycle.ObjectOpts{
|
|
Name: obj.Name,
|
|
UserTags: obj.UserTags,
|
|
ModTime: obj.ModTime,
|
|
VersionID: obj.VersionID,
|
|
DeleteMarker: obj.DeleteMarker,
|
|
IsLatest: obj.IsLatest,
|
|
NumVersions: obj.NumVersions,
|
|
SuccessorModTime: obj.SuccessorModTime,
|
|
RestoreOngoing: obj.RestoreOngoing,
|
|
RestoreExpires: obj.RestoreExpires,
|
|
TransitionStatus: obj.TransitionStatus,
|
|
}
|
|
|
|
if err := deleteTransitionedObject(ctx, objLayer, obj.Bucket, obj.Name, lcOpts, restoredObject, false); err != nil {
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
return false
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
// Notification already sent at *deleteTransitionedObject*, just return 'true' here.
|
|
return true
|
|
}
|
|
|
|
func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, applyOnVersion bool) bool {
|
|
opts := ObjectOptions{}
|
|
|
|
if applyOnVersion {
|
|
opts.VersionID = obj.VersionID
|
|
}
|
|
if opts.VersionID == "" {
|
|
opts.Versioned = globalBucketVersioningSys.Enabled(obj.Bucket)
|
|
}
|
|
|
|
obj, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts)
|
|
if err != nil {
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
return false
|
|
}
|
|
// Assume it is still there.
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
|
|
eventName := event.ObjectRemovedDelete
|
|
if obj.DeleteMarker {
|
|
eventName = event.ObjectRemovedDeleteMarkerCreated
|
|
}
|
|
|
|
// Notify object deleted event.
|
|
sendEvent(eventArgs{
|
|
EventName: eventName,
|
|
BucketName: obj.Bucket,
|
|
Object: obj,
|
|
Host: "Internal: [ILM-EXPIRY]",
|
|
})
|
|
|
|
return true
|
|
}
|
|
|
|
// Apply object, object version, restored object or restored object version action on the given object
|
|
func applyExpiryRule(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject, applyOnVersion bool) bool {
|
|
if obj.TransitionStatus != "" {
|
|
return applyExpiryOnTransitionedObject(ctx, objLayer, obj, restoredObject)
|
|
}
|
|
return applyExpiryOnNonTransitionedObjects(ctx, objLayer, obj, applyOnVersion)
|
|
}
|
|
|
|
// Perform actions (removal of transitioning of objects), return true the action is successfully performed
|
|
func applyLifecycleAction(ctx context.Context, action lifecycle.Action, objLayer ObjectLayer, obj ObjectInfo) (success bool) {
|
|
switch action {
|
|
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
|
|
success = applyExpiryRule(ctx, objLayer, obj, false, action == lifecycle.DeleteVersionAction)
|
|
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
|
|
success = applyExpiryRule(ctx, objLayer, obj, true, action == lifecycle.DeleteRestoredVersionAction)
|
|
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
|
|
success = applyTransitionAction(ctx, action, objLayer, obj)
|
|
}
|
|
return
|
|
}
|
|
|
|
// objectPath returns the prefix and object name.
|
|
func (i *crawlItem) objectPath() string {
|
|
return path.Join(i.prefix, i.objectName)
|
|
}
|
|
|
|
// healReplication will heal a scanned item that has failed replication.
|
|
func (i *crawlItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) {
|
|
if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
|
|
// heal delete marker replication failure or versioned delete replication failure
|
|
if oi.ReplicationStatus == replication.Pending ||
|
|
oi.ReplicationStatus == replication.Failed ||
|
|
oi.VersionPurgeStatus == Failed || oi.VersionPurgeStatus == Pending {
|
|
i.healReplicationDeletes(ctx, o, oi)
|
|
return
|
|
}
|
|
}
|
|
switch oi.ReplicationStatus {
|
|
case replication.Pending:
|
|
sizeS.pendingSize += oi.Size
|
|
globalReplicationState.queueReplicaTask(oi)
|
|
case replication.Failed:
|
|
sizeS.failedSize += oi.Size
|
|
globalReplicationState.queueReplicaTask(oi)
|
|
case replication.Completed, "COMPLETE":
|
|
sizeS.replicatedSize += oi.Size
|
|
case replication.Replica:
|
|
sizeS.replicaSize += oi.Size
|
|
}
|
|
}
|
|
|
|
// healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes.
|
|
func (i *crawlItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
|
|
// handle soft delete and permanent delete failures here.
|
|
if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
|
|
versionID := ""
|
|
dmVersionID := ""
|
|
if oi.VersionPurgeStatus.Empty() {
|
|
dmVersionID = oi.VersionID
|
|
} else {
|
|
versionID = oi.VersionID
|
|
}
|
|
globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{
|
|
DeletedObject: DeletedObject{
|
|
ObjectName: oi.Name,
|
|
DeleteMarkerVersionID: dmVersionID,
|
|
VersionID: versionID,
|
|
DeleteMarkerReplicationStatus: string(oi.ReplicationStatus),
|
|
DeleteMarkerMTime: DeleteMarkerMTime{oi.ModTime},
|
|
DeleteMarker: oi.DeleteMarker,
|
|
VersionPurgeStatus: oi.VersionPurgeStatus,
|
|
},
|
|
Bucket: oi.Bucket,
|
|
})
|
|
}
|
|
}
|
|
|
|
type dynamicSleeper struct {
|
|
mu sync.RWMutex
|
|
|
|
// Sleep factor
|
|
factor float64
|
|
|
|
// maximum sleep cap,
|
|
// set to <= 0 to disable.
|
|
maxSleep time.Duration
|
|
|
|
// Don't sleep at all, if time taken is below this value.
|
|
// This is to avoid too small costly sleeps.
|
|
minSleep time.Duration
|
|
|
|
// cycle will be closed
|
|
cycle chan struct{}
|
|
}
|
|
|
|
// newDynamicSleeper
|
|
func newDynamicSleeper(factor float64, maxWait time.Duration) *dynamicSleeper {
|
|
return &dynamicSleeper{
|
|
factor: factor,
|
|
cycle: make(chan struct{}),
|
|
maxSleep: maxWait,
|
|
minSleep: 100 * time.Microsecond,
|
|
}
|
|
}
|
|
|
|
// Timer returns a timer that has started.
|
|
// When the returned function is called it will wait.
|
|
func (d *dynamicSleeper) Timer(ctx context.Context) func() {
|
|
t := time.Now()
|
|
return func() {
|
|
doneAt := time.Now()
|
|
for {
|
|
// Grab current values
|
|
d.mu.RLock()
|
|
minWait, maxWait := d.minSleep, d.maxSleep
|
|
factor := d.factor
|
|
cycle := d.cycle
|
|
d.mu.RUnlock()
|
|
elapsed := doneAt.Sub(t)
|
|
// Don't sleep for really small amount of time
|
|
wantSleep := time.Duration(float64(elapsed) * factor)
|
|
if wantSleep <= minWait {
|
|
return
|
|
}
|
|
if maxWait > 0 && wantSleep > maxWait {
|
|
wantSleep = maxWait
|
|
}
|
|
timer := time.NewTimer(wantSleep)
|
|
select {
|
|
case <-ctx.Done():
|
|
if !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
return
|
|
case <-timer.C:
|
|
return
|
|
case <-cycle:
|
|
if !timer.Stop() {
|
|
// We expired.
|
|
<-timer.C
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sleep sleeps the specified time multiplied by the sleep factor.
|
|
// If the factor is updated the sleep will be done again with the new factor.
|
|
func (d *dynamicSleeper) Sleep(ctx context.Context, base time.Duration) {
|
|
for {
|
|
// Grab current values
|
|
d.mu.RLock()
|
|
minWait, maxWait := d.minSleep, d.maxSleep
|
|
factor := d.factor
|
|
cycle := d.cycle
|
|
d.mu.RUnlock()
|
|
// Don't sleep for really small amount of time
|
|
wantSleep := time.Duration(float64(base) * factor)
|
|
if wantSleep <= minWait {
|
|
return
|
|
}
|
|
if maxWait > 0 && wantSleep > maxWait {
|
|
wantSleep = maxWait
|
|
}
|
|
timer := time.NewTimer(wantSleep)
|
|
select {
|
|
case <-ctx.Done():
|
|
if !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
return
|
|
case <-timer.C:
|
|
return
|
|
case <-cycle:
|
|
if !timer.Stop() {
|
|
// We expired.
|
|
<-timer.C
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update the current settings and cycle all waiting.
|
|
// Parameters are the same as in the contructor.
|
|
func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
if math.Abs(d.factor-factor) < 1e-10 && d.maxSleep == maxWait {
|
|
return nil
|
|
}
|
|
// Update values and cycle waiting.
|
|
close(d.cycle)
|
|
d.factor = factor
|
|
d.maxSleep = maxWait
|
|
d.cycle = make(chan struct{})
|
|
return nil
|
|
}
|