mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
continous healing based on crawler (#10103)
Design: https://gist.github.com/klauspost/792fe25c315caf1dd15c8e79df124914
This commit is contained in:
parent
caad314faa
commit
c097ce9c32
@ -654,9 +654,11 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
|
||||
if !h.reportProgress {
|
||||
// Object might have been deleted, by the time heal
|
||||
// was attempted, we should ignore this object and
|
||||
// return success.
|
||||
// return the error and not calculate this object
|
||||
// as part of the metrics.
|
||||
if isErrObjectNotFound(res.err) || isErrVersionNotFound(res.err) {
|
||||
return nil
|
||||
// Return the error so that caller can handle it.
|
||||
return res.err
|
||||
}
|
||||
|
||||
h.mutex.Lock()
|
||||
@ -720,6 +722,8 @@ func (h *healSequence) healItemsFromSourceCh() error {
|
||||
if err := h.queueHealTask(source, itemType); err != nil {
|
||||
switch err.(type) {
|
||||
case ObjectExistsAsDirectory:
|
||||
case ObjectNotFound:
|
||||
case VersionNotFound:
|
||||
default:
|
||||
logger.LogIf(h.ctx, fmt.Errorf("Heal attempt failed for %s: %w",
|
||||
pathJoin(source.bucket, source.object), err))
|
||||
@ -793,17 +797,17 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
|
||||
return errHealStopSignalled
|
||||
}
|
||||
|
||||
herr := h.queueHealTask(healSource{
|
||||
err := h.queueHealTask(healSource{
|
||||
bucket: bucket,
|
||||
object: object,
|
||||
versionID: versionID,
|
||||
}, madmin.HealItemBucketMetadata)
|
||||
// Object might have been deleted, by the time heal
|
||||
// was attempted we ignore this object an move on.
|
||||
if isErrObjectNotFound(herr) || isErrVersionNotFound(herr) {
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return herr
|
||||
return err
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -904,9 +908,15 @@ func (h *healSequence) healObject(bucket, object, versionID string) error {
|
||||
return errHealStopSignalled
|
||||
}
|
||||
|
||||
return h.queueHealTask(healSource{
|
||||
err := h.queueHealTask(healSource{
|
||||
bucket: bucket,
|
||||
object: object,
|
||||
versionID: versionID,
|
||||
}, madmin.HealItemObject)
|
||||
// Object might have been deleted, by the time heal
|
||||
// was attempted we ignore this object an move on.
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -55,19 +55,20 @@ func (h *healRoutine) queueHealTask(task healTask) {
|
||||
h.tasks <- task
|
||||
}
|
||||
|
||||
func waitForLowHTTPReq(tolerance int32) {
|
||||
func waitForLowHTTPReq(tolerance int32, maxWait time.Duration) {
|
||||
const wait = 10 * time.Millisecond
|
||||
waitCount := maxWait / wait
|
||||
|
||||
// Bucket notification and http trace are not costly, it is okay to ignore them
|
||||
// while counting the number of concurrent connections
|
||||
tolerance += int32(globalHTTPListen.NumSubscribers() + globalHTTPTrace.NumSubscribers())
|
||||
|
||||
if httpServer := newHTTPServerFn(); httpServer != nil {
|
||||
// Wait at max 10 minute for an inprogress request before proceeding to heal
|
||||
waitCount := 600
|
||||
// Any requests in progress, delay the heal.
|
||||
for (httpServer.GetRequestCount() >= tolerance) &&
|
||||
waitCount > 0 {
|
||||
waitCount--
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(wait)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -82,7 +83,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
||||
}
|
||||
|
||||
// Wait and proceed if there are active requests
|
||||
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()))
|
||||
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
|
||||
|
||||
var res madmin.HealResultItem
|
||||
var err error
|
||||
|
@ -27,6 +27,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
|
||||
"github.com/minio/minio/cmd/config"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/bucket/lifecycle"
|
||||
@ -41,9 +43,10 @@ import (
|
||||
const (
|
||||
dataCrawlSleepPerFolder = time.Millisecond // Time to wait between folders.
|
||||
dataCrawlSleepDefMult = 10.0 // Default multiplier for waits between operations.
|
||||
dataCrawlStartDelay = 5 * time.Minute // Time to wait on startup and between cycles.
|
||||
dataCrawlStartDelay = 5 * time.Second // Time to wait on startup and between cycles.
|
||||
dataUsageUpdateDirCycles = 16 // Visit all folders every n cycles.
|
||||
|
||||
healDeleteDangling = true
|
||||
)
|
||||
|
||||
// initDataCrawler will start the crawler unless disabled.
|
||||
@ -87,12 +90,6 @@ func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
||||
if err == nil {
|
||||
// Store new cycle...
|
||||
nextBloomCycle++
|
||||
if nextBloomCycle%dataUpdateTrackerResetEvery == 0 {
|
||||
if intDataUpdateTracker.debug {
|
||||
logger.Info(color.Green("runDataCrawler:") + " Resetting bloom filter for next runs.")
|
||||
}
|
||||
nextBloomCycle++
|
||||
}
|
||||
var tmp [8]byte
|
||||
binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle)
|
||||
r, err := hash.NewReader(bytes.NewReader(tmp[:]), int64(len(tmp)), "", "", int64(len(tmp)), false)
|
||||
@ -111,8 +108,9 @@ func runDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
||||
}
|
||||
|
||||
type cachedFolder struct {
|
||||
name string
|
||||
parent *dataUsageHash
|
||||
name string
|
||||
parent *dataUsageHash
|
||||
objectHealProbDiv uint32
|
||||
}
|
||||
|
||||
type folderScanner struct {
|
||||
@ -125,6 +123,8 @@ type folderScanner struct {
|
||||
|
||||
dataUsageCrawlMult float64
|
||||
dataUsageCrawlDebug bool
|
||||
healFolderInclude uint32 // Include a clean folder one in n cycles.
|
||||
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
||||
|
||||
newFolders []cachedFolder
|
||||
existingFolders []cachedFolder
|
||||
@ -167,8 +167,17 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
existingFolders: nil,
|
||||
dataUsageCrawlMult: delayMult,
|
||||
dataUsageCrawlDebug: intDataUpdateTracker.debug,
|
||||
healFolderInclude: 0,
|
||||
healObjectSelect: 0,
|
||||
}
|
||||
|
||||
// Enable healing in XL mode.
|
||||
if globalIsErasure {
|
||||
// Include a clean folder one in n cycles.
|
||||
s.healFolderInclude = 32
|
||||
// Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
||||
s.healObjectSelect = 512
|
||||
}
|
||||
if len(cache.Info.BloomFilter) > 0 {
|
||||
s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
|
||||
_, err := s.withFilter.ReadFrom(bytes.NewBuffer(cache.Info.BloomFilter))
|
||||
@ -189,7 +198,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
}
|
||||
|
||||
// Always scan flattenLevels deep. Cache root is level 0.
|
||||
todo := []cachedFolder{{name: cache.Info.Name}}
|
||||
todo := []cachedFolder{{name: cache.Info.Name, objectHealProbDiv: 1}}
|
||||
for i := 0; i < flattenLevels; i++ {
|
||||
if s.dataUsageCrawlDebug {
|
||||
logger.Info(logPrefix+"Level %v, scanning %v directories."+logSuffix, i, len(todo))
|
||||
@ -218,7 +227,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
return s.newCache, ctx.Err()
|
||||
default:
|
||||
}
|
||||
du, err := s.deepScanFolder(ctx, folder.name)
|
||||
du, err := s.deepScanFolder(ctx, folder)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
@ -249,26 +258,38 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
||||
}
|
||||
h := hashPath(folder.name)
|
||||
if !h.mod(s.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
|
||||
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
|
||||
continue
|
||||
if !h.mod(s.oldCache.Info.NextCycle, s.healFolderInclude/folder.objectHealProbDiv) {
|
||||
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
|
||||
continue
|
||||
} else {
|
||||
folder.objectHealProbDiv = s.healFolderInclude
|
||||
}
|
||||
folder.objectHealProbDiv = dataUsageUpdateDirCycles
|
||||
}
|
||||
|
||||
if s.withFilter != nil {
|
||||
_, prefix := path2BucketObjectWithBasePath(basePath, folder.name)
|
||||
if s.oldCache.Info.lifeCycle == nil || !s.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) {
|
||||
// If folder isn't in filter, skip it completely.
|
||||
if !s.withFilter.containsDir(folder.name) {
|
||||
if s.dataUsageCrawlDebug {
|
||||
logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder)
|
||||
if !h.mod(s.oldCache.Info.NextCycle, s.healFolderInclude/folder.objectHealProbDiv) {
|
||||
if s.dataUsageCrawlDebug {
|
||||
logger.Info(logPrefix+"Skipping non-updated folder: %v"+logSuffix, folder)
|
||||
}
|
||||
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
|
||||
continue
|
||||
} else {
|
||||
if s.dataUsageCrawlDebug {
|
||||
logger.Info(logPrefix+"Adding non-updated folder to heal check: %v"+logSuffix, folder.name)
|
||||
}
|
||||
// Update probability of including objects
|
||||
folder.objectHealProbDiv = s.healFolderInclude
|
||||
}
|
||||
s.newCache.replaceHashed(h, folder.parent, s.oldCache.Cache[h.Key()])
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update on this cycle...
|
||||
du, err := s.deepScanFolder(ctx, folder.name)
|
||||
du, err := s.deepScanFolder(ctx, folder)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
@ -301,6 +322,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
default:
|
||||
}
|
||||
thisHash := hashPath(folder.name)
|
||||
existing := f.oldCache.findChildrenCopy(thisHash)
|
||||
|
||||
// If there are lifecycle rules for the prefix, remove the filter.
|
||||
filter := f.withFilter
|
||||
@ -309,7 +331,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
_, prefix := path2BucketObjectWithBasePath(f.root, folder.name)
|
||||
if f.oldCache.Info.lifeCycle.HasActiveRules(prefix, true) {
|
||||
if f.dataUsageCrawlDebug {
|
||||
logger.Info(color.Green("data-usage:")+" Prefix %q has active rules", prefix)
|
||||
logger.Info(color.Green("folder-scanner:")+" Prefix %q has active rules", prefix)
|
||||
}
|
||||
activeLifeCycle = f.oldCache.Info.lifeCycle
|
||||
filter = nil
|
||||
@ -318,11 +340,19 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
if _, ok := f.oldCache.Cache[thisHash.Key()]; filter != nil && ok {
|
||||
// If folder isn't in filter and we have data, skip it completely.
|
||||
if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
|
||||
f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
|
||||
if f.dataUsageCrawlDebug {
|
||||
logger.Info(color.Green("data-usage:")+" Skipping non-updated folder: %v", folder.name)
|
||||
if !thisHash.mod(f.oldCache.Info.NextCycle, f.healFolderInclude/folder.objectHealProbDiv) {
|
||||
f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
|
||||
if f.dataUsageCrawlDebug {
|
||||
logger.Info(color.Green("folder-scanner:")+" Skipping non-updated folder: %v", folder.name)
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
if f.dataUsageCrawlDebug {
|
||||
logger.Info(color.Green("folder-scanner:")+" Adding non-updated folder to heal check: %v", folder.name)
|
||||
}
|
||||
// If probability was already crawlerHealFolderInclude, keep it.
|
||||
folder.objectHealProbDiv = f.healFolderInclude
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
f.waitForLowActiveIO()
|
||||
@ -336,14 +366,14 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
bucket, prefix := path2BucketObjectWithBasePath(f.root, entName)
|
||||
if bucket == "" {
|
||||
if f.dataUsageCrawlDebug {
|
||||
logger.Info(color.Green("data-usage:")+" no bucket (%s,%s)", f.root, entName)
|
||||
logger.Info(color.Green("folder-scanner:")+" no bucket (%s,%s)", f.root, entName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if isReservedOrInvalidBucket(bucket, false) {
|
||||
if f.dataUsageCrawlDebug {
|
||||
logger.Info(color.Green("data-usage:")+" invalid bucket: %v, entry: %v", bucket, entName)
|
||||
logger.Info(color.Green("folder-scanner:")+" invalid bucket: %v, entry: %v", bucket, entName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -359,7 +389,8 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
_, exists := f.oldCache.Cache[h.Key()]
|
||||
cache.addChildString(entName)
|
||||
|
||||
this := cachedFolder{name: entName, parent: &thisHash}
|
||||
this := cachedFolder{name: entName, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
|
||||
delete(existing, h.Key())
|
||||
cache.addChild(h)
|
||||
if final {
|
||||
if exists {
|
||||
@ -385,11 +416,12 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
objectName: path.Base(entName),
|
||||
debug: f.dataUsageCrawlDebug,
|
||||
lifeCycle: activeLifeCycle,
|
||||
heal: thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv),
|
||||
}
|
||||
size, err := f.getSize(item)
|
||||
|
||||
sleepDuration(time.Since(t), f.dataUsageCrawlMult)
|
||||
if err == errSkipFile || err == errFileNotFound {
|
||||
if err == errSkipFile {
|
||||
return nil
|
||||
}
|
||||
logger.LogIf(ctx, err)
|
||||
@ -402,19 +434,78 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if f.healObjectSelect == 0 {
|
||||
// If we are not scanning, return now.
|
||||
f.newCache.replaceHashed(thisHash, folder.parent, cache)
|
||||
continue
|
||||
}
|
||||
|
||||
objAPI := newObjectLayerWithoutSafeModeFn()
|
||||
if objAPI == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
bgSeq, found := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
|
||||
// Whatever remains in 'existing' are folders at this level
|
||||
// that existed in the previous run but wasn't found now.
|
||||
//
|
||||
// This may be because of 2 reasons:
|
||||
//
|
||||
// 1) The folder/object was deleted.
|
||||
// 2) We come from another disk and this disk missed the write.
|
||||
//
|
||||
// We therefore perform a heal check.
|
||||
// If that doesn't bring it back we remove the folder and assume it was deleted.
|
||||
// This means that the next run will not look for it.
|
||||
for k := range existing {
|
||||
f.waitForLowActiveIO()
|
||||
bucket, prefix := path2BucketObject(k)
|
||||
if f.dataUsageCrawlDebug {
|
||||
logger.Info(color.Green("folder-scanner:")+" checking disappeared folder: %v/%v", bucket, prefix)
|
||||
}
|
||||
|
||||
err = objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: healDeleteDangling},
|
||||
func(bucket, object, versionID string) error {
|
||||
return bgSeq.queueHealTask(healSource{
|
||||
bucket: bucket,
|
||||
object: object,
|
||||
versionID: versionID,
|
||||
}, madmin.HealItemObject)
|
||||
})
|
||||
|
||||
if f.dataUsageCrawlDebug && err != nil {
|
||||
logger.Info(color.Green("healObjects:")+" checking returned value %v", err)
|
||||
}
|
||||
|
||||
// Add unless healing returned an error.
|
||||
if err == nil {
|
||||
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
|
||||
cache.addChild(hashPath(k))
|
||||
if final {
|
||||
f.existingFolders = append(f.existingFolders, this)
|
||||
} else {
|
||||
nextFolders = append(nextFolders, this)
|
||||
}
|
||||
}
|
||||
}
|
||||
f.newCache.replaceHashed(thisHash, folder.parent, cache)
|
||||
}
|
||||
return nextFolders, nil
|
||||
}
|
||||
|
||||
// deepScanFolder will deep scan a folder and return the size if no error occurs.
|
||||
func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dataUsageEntry, error) {
|
||||
func (f *folderScanner) deepScanFolder(ctx context.Context, folder cachedFolder) (*dataUsageEntry, error) {
|
||||
var cache dataUsageEntry
|
||||
|
||||
done := ctx.Done()
|
||||
|
||||
var addDir func(entName string, typ os.FileMode) error
|
||||
var dirStack = []string{f.root, folder}
|
||||
var dirStack = []string{f.root, folder.name}
|
||||
|
||||
addDir = func(entName string, typ os.FileMode) error {
|
||||
select {
|
||||
@ -445,7 +536,7 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dat
|
||||
if f.oldCache.Info.lifeCycle != nil {
|
||||
if f.oldCache.Info.lifeCycle.HasActiveRules(prefix, false) {
|
||||
if f.dataUsageCrawlDebug {
|
||||
logger.Info(color.Green("data-usage:")+" Prefix %q has active rules", prefix)
|
||||
logger.Info(color.Green("folder-scanner:")+" Prefix %q has active rules", prefix)
|
||||
}
|
||||
activeLifeCycle = f.oldCache.Info.lifeCycle
|
||||
}
|
||||
@ -460,6 +551,7 @@ func (f *folderScanner) deepScanFolder(ctx context.Context, folder string) (*dat
|
||||
objectName: path.Base(entName),
|
||||
debug: f.dataUsageCrawlDebug,
|
||||
lifeCycle: activeLifeCycle,
|
||||
heal: hashPath(path.Join(prefix, entName)).mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv),
|
||||
})
|
||||
|
||||
// Don't sleep for really small amount of time
|
||||
@ -490,6 +582,7 @@ type crawlItem struct {
|
||||
prefix string // Only the prefix if any, does not have final object name.
|
||||
objectName string // Only the object name without prefixes.
|
||||
lifeCycle *lifecycle.Lifecycle
|
||||
heal bool // Has the object been selected for heal check?
|
||||
debug bool
|
||||
}
|
||||
|
||||
@ -522,6 +615,20 @@ func (i *crawlItem) applyActions(ctx context.Context, o ObjectLayer, meta action
|
||||
if i.debug {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
if i.heal {
|
||||
if i.debug {
|
||||
logger.Info(color.Green("applyActions:")+" heal checking: %v/%v v%s", i.bucket, i.objectPath(), meta.oi.VersionID)
|
||||
}
|
||||
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), meta.oi.VersionID, madmin.HealOpts{Remove: healDeleteDangling})
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
return 0
|
||||
}
|
||||
if !errors.Is(err, NotImplemented{}) {
|
||||
logger.LogIf(ctx, err)
|
||||
return 0
|
||||
}
|
||||
size = res.ObjectSize
|
||||
}
|
||||
if i.lifeCycle == nil {
|
||||
return size
|
||||
}
|
||||
|
@ -48,9 +48,6 @@ const (
|
||||
dataUpdateTrackerFilename = dataUsageBucket + SlashSeparator + ".tracker.bin"
|
||||
dataUpdateTrackerVersion = 2
|
||||
dataUpdateTrackerSaveInterval = 5 * time.Minute
|
||||
|
||||
// Reset bloom filters every n cycle
|
||||
dataUpdateTrackerResetEvery = 1000
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -85,7 +85,12 @@ func (e *dataUsageEntry) merge(other dataUsageEntry) {
|
||||
}
|
||||
|
||||
// mod returns true if the hash mod cycles == cycle.
|
||||
// If cycles is 0 false is always returned.
|
||||
// If cycles is 1 true is always returned (as expected).
|
||||
func (h dataUsageHash) mod(cycle uint32, cycles uint32) bool {
|
||||
if cycles <= 1 {
|
||||
return cycles == 1
|
||||
}
|
||||
return uint32(xxhash.Sum64String(string(h)))%cycles == cycle%cycles
|
||||
}
|
||||
|
||||
@ -117,6 +122,16 @@ func (d *dataUsageCache) find(path string) *dataUsageEntry {
|
||||
return &due
|
||||
}
|
||||
|
||||
// findChildrenCopy returns a copy of the children of the supplied hash.
|
||||
func (d *dataUsageCache) findChildrenCopy(h dataUsageHash) dataUsageHashMap {
|
||||
ch := d.Cache[h.String()].Children
|
||||
res := make(dataUsageHashMap, len(ch))
|
||||
for k := range ch {
|
||||
res[k] = struct{}{}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// Returns nil if not found.
|
||||
func (d *dataUsageCache) subCache(path string) dataUsageCache {
|
||||
dst := dataUsageCache{Info: dataUsageCacheInfo{
|
||||
|
@ -360,6 +360,13 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got.root() == nil {
|
||||
t.Log("cached folders:")
|
||||
for folder := range got.Cache {
|
||||
t.Log("folder:", folder)
|
||||
}
|
||||
t.Fatal("got nil root.")
|
||||
}
|
||||
|
||||
// Test dirs
|
||||
var want = []struct {
|
||||
|
@ -1611,10 +1611,10 @@ func (s *erasureSets) HealObjects(ctx context.Context, bucket, prefix string, op
|
||||
continue
|
||||
}
|
||||
|
||||
// Wait and proceed if there are active requests
|
||||
waitForLowHTTPReq(int32(s.drivesPerSet))
|
||||
|
||||
for _, version := range entry.Versions {
|
||||
// Wait and proceed if there are active requests
|
||||
waitForLowHTTPReq(int32(s.drivesPerSet), time.Second)
|
||||
|
||||
if err := healObject(bucket, version.Name, version.VersionID); err != nil {
|
||||
return toObjectErr(err, bucket, version.Name)
|
||||
}
|
||||
|
@ -1903,7 +1903,7 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
|
||||
}
|
||||
|
||||
// HealObjectFn closure function heals the object.
|
||||
type HealObjectFn func(string, string, string) error
|
||||
type HealObjectFn func(bucket, object, versionID string) error
|
||||
|
||||
func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
|
||||
var zonesEntryChs [][]FileInfoVersionsCh
|
||||
@ -1928,28 +1928,37 @@ func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, o
|
||||
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
|
||||
}
|
||||
|
||||
// If listing did not return any entries upon first attempt, we
|
||||
// return `ObjectNotFound`, to indicate the caller for any
|
||||
// actions they may want to take as if `prefix` is missing.
|
||||
err := toObjectErr(errFileNotFound, bucket, prefix)
|
||||
for {
|
||||
entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
// Indicate that first attempt was a success and subsequent loop
|
||||
// knows that its not our first attempt at 'prefix'
|
||||
err = nil
|
||||
|
||||
if zoneIndex >= len(zoneDrivesPerSet) || zoneIndex < 0 {
|
||||
return fmt.Errorf("invalid zone index returned: %d", zoneIndex)
|
||||
}
|
||||
if quorumCount == zoneDrivesPerSet[zoneIndex] && opts.ScanMode == madmin.HealNormalScan {
|
||||
// Skip good entries.
|
||||
continue
|
||||
}
|
||||
|
||||
// Wait and proceed if there are active requests
|
||||
waitForLowHTTPReq(int32(zoneDrivesPerSet[zoneIndex]))
|
||||
|
||||
for _, version := range entry.Versions {
|
||||
// Wait and proceed if there are active requests
|
||||
waitForLowHTTPReq(int32(zoneDrivesPerSet[zoneIndex]), time.Second)
|
||||
if err := healObject(bucket, version.Name, version.VersionID); err != nil {
|
||||
return toObjectErr(err, bucket, version.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (z *erasureZones) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/bpool"
|
||||
"github.com/minio/minio/pkg/color"
|
||||
"github.com/minio/minio/pkg/dsync"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
"github.com/minio/minio/pkg/sync/errgroup"
|
||||
@ -294,20 +293,12 @@ func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []Buc
|
||||
}
|
||||
}
|
||||
|
||||
// Add existing buckets if changes or lifecycles.
|
||||
// Add existing buckets.
|
||||
for _, b := range buckets {
|
||||
e := oldCache.find(b.Name)
|
||||
if e != nil {
|
||||
cache.replace(b.Name, dataUsageRoot, *e)
|
||||
lc, err := globalLifecycleSys.Get(b.Name)
|
||||
activeLC := err == nil && lc.HasActiveRules("", true)
|
||||
if activeLC || bf == nil || bf.containsDir(b.Name) {
|
||||
bucketCh <- b
|
||||
} else {
|
||||
if intDataUpdateTracker.debug {
|
||||
logger.Info(color.Green("crawlAndGetDataUsage:")+" Skipping bucket %v, not updated", b.Name)
|
||||
}
|
||||
}
|
||||
bucketCh <- b
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -352,6 +352,8 @@ func (fs *FSObjects) crawlBucket(ctx context.Context, bucket string, cache dataU
|
||||
if fiErr != nil {
|
||||
return 0, errSkipFile
|
||||
}
|
||||
// We cannot heal in FS mode.
|
||||
item.heal = false
|
||||
|
||||
oi := fsMeta.ToObjectInfo(bucket, object, fi)
|
||||
sz := item.applyActions(ctx, fs, actionMeta{oi: oi})
|
||||
|
@ -29,8 +29,6 @@ const (
|
||||
// sleep for an hour after a lock timeout
|
||||
// before retrying to acquire lock again.
|
||||
leaderLockTimeoutSleepInterval = time.Hour
|
||||
// heal entire namespace once in 30 days
|
||||
healInterval = 30 * 24 * time.Hour
|
||||
)
|
||||
|
||||
var leaderLockTimeout = newDynamicTimeout(30*time.Second, time.Minute)
|
||||
@ -86,7 +84,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
|
||||
ScannedItemsCount: bgSeq.getScannedItemsCount(),
|
||||
LastHealActivity: bgSeq.lastHealActivity,
|
||||
HealDisks: healDisks,
|
||||
NextHealRound: UTCNow().Add(durationToNextHealRound(bgSeq.lastHealActivity)),
|
||||
NextHealRound: UTCNow().Add(dataCrawlStartDelay),
|
||||
}, true
|
||||
}
|
||||
|
||||
@ -184,68 +182,3 @@ func deepHealObject(bucket, object, versionID string) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the duration to the next background healing round
|
||||
func durationToNextHealRound(lastHeal time.Time) time.Duration {
|
||||
if lastHeal.IsZero() {
|
||||
lastHeal = globalBootTime
|
||||
}
|
||||
|
||||
d := lastHeal.Add(healInterval).Sub(UTCNow())
|
||||
if d < 0 {
|
||||
return time.Second
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// Healing leader will take the charge of healing all erasure sets
|
||||
func execLeaderTasks(ctx context.Context, z *erasureZones) {
|
||||
// So that we don't heal immediately, but after one month.
|
||||
lastScanTime := UTCNow()
|
||||
// Get background heal sequence to send elements to heal
|
||||
var bgSeq *healSequence
|
||||
var ok bool
|
||||
for {
|
||||
bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
||||
if ok {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.NewTimer(durationToNextHealRound(lastScanTime)).C:
|
||||
bgSeq.resetHealStatusCounters()
|
||||
for _, zone := range z.zones {
|
||||
// Heal set by set
|
||||
for i, set := range zone.sets {
|
||||
if err := healErasureSet(ctx, i, set, zone.drivesPerSet); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
lastScanTime = UTCNow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startGlobalHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
zones, ok := objAPI.(*erasureZones)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
execLeaderTasks(ctx, zones)
|
||||
}
|
||||
|
||||
func initGlobalHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
go startGlobalHeal(ctx, objAPI)
|
||||
}
|
||||
|
@ -379,10 +379,6 @@ func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
|
||||
// No unlock for "leader" lock.
|
||||
}
|
||||
|
||||
if globalIsErasure {
|
||||
initGlobalHeal(ctx, objAPI)
|
||||
}
|
||||
|
||||
initDataCrawler(ctx, objAPI)
|
||||
}
|
||||
|
||||
|
@ -334,6 +334,7 @@ func (client *storageRESTClient) CheckParts(volume, path string, fi FileInfo) er
|
||||
|
||||
var reader bytes.Buffer
|
||||
if err := gob.NewEncoder(&reader).Encode(fi); err != nil {
|
||||
logger.LogIf(context.Background(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user