mirror of
https://github.com/minio/minio.git
synced 2024-12-25 06:35:56 -05:00
1443 lines
45 KiB
Go
1443 lines
45 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io/fs"
|
|
"math"
|
|
"math/rand"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/bits-and-blooms/bloom/v3"
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/minio/madmin-go"
|
|
"github.com/minio/minio/internal/bucket/lifecycle"
|
|
"github.com/minio/minio/internal/bucket/object/lock"
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
"github.com/minio/minio/internal/color"
|
|
"github.com/minio/minio/internal/config/heal"
|
|
"github.com/minio/minio/internal/event"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/console"
|
|
uatomic "go.uber.org/atomic"
|
|
)
|
|
|
|
// Pacing and cache-compaction settings of the data scanner.
const (
	// Time to wait between folders.
	dataScannerSleepPerFolder = time.Millisecond
	// Visit all folders every n cycles.
	dataUsageUpdateDirCycles = 16
	// Compact when there is less than this many objects in a branch.
	dataScannerCompactLeastObject = 500
	// Compact when there are this many children in a branch.
	dataScannerCompactAtChildren = 10000
	// Compact when this many subfolders in a single folder.
	dataScannerCompactAtFolders = dataScannerCompactAtChildren / 4
	// Compact when this many subfolders in a single folder (even top level).
	dataScannerForceCompactAtFolders = 1_000_000
	// Time to wait on startup and between cycles.
	dataScannerStartDelay = 1 * time.Minute
)

// Heal selection settings used while scanning.
const (
	// Passed as the Remove option of heal requests issued by the scanner.
	healDeleteDangling = true
	// Include a clean folder one in n cycles.
	healFolderIncludeProb = 32
	// Overall probability of a file being scanned; one in n.
	healObjectSelectProb = 512
)
|
|
|
|
var (
|
|
globalHealConfig heal.Config
|
|
|
|
// Sleeper values are updated when config is loaded.
|
|
scannerSleeper = newDynamicSleeper(10, 10*time.Second, true)
|
|
scannerCycle = uatomic.NewDuration(dataScannerStartDelay)
|
|
)
|
|
|
|
// initDataScanner will start the scanner in the background.
|
|
func initDataScanner(ctx context.Context, objAPI ObjectLayer) {
|
|
go func() {
|
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
// Run the data scanner in a loop
|
|
for {
|
|
runDataScanner(ctx, objAPI)
|
|
duration := time.Duration(r.Float64() * float64(scannerCycle.Load()))
|
|
if duration < time.Second {
|
|
// Make sure to sleep atleast a second to avoid high CPU ticks.
|
|
duration = time.Second
|
|
}
|
|
time.Sleep(duration)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func getCycleScanMode(currentCycle, bitrotStartCycle uint64, bitrotStartTime time.Time) madmin.HealScanMode {
|
|
bitrotCycle := globalHealConfig.BitrotScanCycle()
|
|
switch bitrotCycle {
|
|
case -1:
|
|
return madmin.HealNormalScan
|
|
case 0:
|
|
return madmin.HealDeepScan
|
|
}
|
|
|
|
if currentCycle-bitrotStartCycle < healObjectSelectProb {
|
|
return madmin.HealDeepScan
|
|
}
|
|
|
|
if time.Since(bitrotStartTime) > bitrotCycle {
|
|
return madmin.HealDeepScan
|
|
}
|
|
|
|
return madmin.HealNormalScan
|
|
}
|
|
|
|
type backgroundHealInfo struct {
|
|
BitrotStartTime time.Time `json:"bitrotStartTime"`
|
|
BitrotStartCycle uint64 `json:"bitrotStartCycle"`
|
|
CurrentScanMode madmin.HealScanMode `json:"currentScanMode"`
|
|
}
|
|
|
|
func readBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer) backgroundHealInfo {
|
|
if globalIsErasureSD {
|
|
return backgroundHealInfo{}
|
|
}
|
|
|
|
// Get last healing information
|
|
buf, err := readConfig(ctx, objAPI, backgroundHealInfoPath)
|
|
if err != nil {
|
|
if !errors.Is(err, errConfigNotFound) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
return backgroundHealInfo{}
|
|
}
|
|
var info backgroundHealInfo
|
|
if err = json.Unmarshal(buf, &info); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
return info
|
|
}
|
|
|
|
func saveBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer, info backgroundHealInfo) {
|
|
if globalIsErasureSD {
|
|
return
|
|
}
|
|
|
|
b, err := json.Marshal(info)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return
|
|
}
|
|
// Get last healing information
|
|
err = saveConfig(ctx, objAPI, backgroundHealInfoPath, b)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
|
|
// runDataScanner will start a data scanner.
|
|
// The function will block until the context is canceled.
|
|
// There should only ever be one scanner running per cluster.
|
|
func runDataScanner(ctx context.Context, objAPI ObjectLayer) {
|
|
ctx, cancel := globalLeaderLock.GetLock(ctx)
|
|
defer cancel()
|
|
|
|
// Load current bloom cycle
|
|
var cycleInfo currentScannerCycle
|
|
cycleInfo.next = intDataUpdateTracker.current() + 1
|
|
|
|
buf, _ := readConfig(ctx, objAPI, dataUsageBloomNamePath)
|
|
if len(buf) == 8 {
|
|
cycleInfo.next = binary.LittleEndian.Uint64(buf)
|
|
} else if len(buf) > 8 {
|
|
cycleInfo.next = binary.LittleEndian.Uint64(buf[:8])
|
|
buf = buf[8:]
|
|
_, err := cycleInfo.UnmarshalMsg(buf)
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
|
|
scannerTimer := time.NewTimer(scannerCycle.Load())
|
|
defer scannerTimer.Stop()
|
|
defer globalScannerMetrics.setCycle(nil)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-scannerTimer.C:
|
|
// Reset the timer for next cycle.
|
|
// If scanner takes longer we start at once.
|
|
scannerTimer.Reset(scannerCycle.Load())
|
|
|
|
stopFn := globalScannerMetrics.log(scannerMetricScanCycle)
|
|
cycleInfo.current = cycleInfo.next
|
|
cycleInfo.started = time.Now()
|
|
globalScannerMetrics.setCycle(&cycleInfo)
|
|
|
|
bgHealInfo := readBackgroundHealInfo(ctx, objAPI)
|
|
scanMode := getCycleScanMode(cycleInfo.current, bgHealInfo.BitrotStartCycle, bgHealInfo.BitrotStartTime)
|
|
if bgHealInfo.CurrentScanMode != scanMode {
|
|
newHealInfo := bgHealInfo
|
|
newHealInfo.CurrentScanMode = scanMode
|
|
if scanMode == madmin.HealDeepScan {
|
|
newHealInfo.BitrotStartTime = time.Now().UTC()
|
|
newHealInfo.BitrotStartCycle = cycleInfo.current
|
|
}
|
|
saveBackgroundHealInfo(ctx, objAPI, newHealInfo)
|
|
}
|
|
|
|
// Wait before starting next cycle and wait on startup.
|
|
results := make(chan DataUsageInfo, 1)
|
|
go storeDataUsageInBackend(ctx, objAPI, results)
|
|
bf, err := globalNotificationSys.updateBloomFilter(ctx, cycleInfo.current)
|
|
logger.LogIf(ctx, err)
|
|
err = objAPI.NSScanner(ctx, bf, results, uint32(cycleInfo.current), scanMode)
|
|
logger.LogIf(ctx, err)
|
|
stopFn()
|
|
if err == nil {
|
|
// Store new cycle...
|
|
cycleInfo.next++
|
|
cycleInfo.current = 0
|
|
cycleInfo.cycleCompleted = append(cycleInfo.cycleCompleted, time.Now())
|
|
if len(cycleInfo.cycleCompleted) > dataUsageUpdateDirCycles {
|
|
cycleInfo.cycleCompleted = cycleInfo.cycleCompleted[len(cycleInfo.cycleCompleted)-dataUsageUpdateDirCycles:]
|
|
}
|
|
globalScannerMetrics.setCycle(&cycleInfo)
|
|
tmp := make([]byte, 8, 8+cycleInfo.Msgsize())
|
|
// Cycle for backward compat.
|
|
binary.LittleEndian.PutUint64(tmp, cycleInfo.next)
|
|
tmp, _ = cycleInfo.MarshalMsg(tmp)
|
|
err = saveConfig(ctx, objAPI, dataUsageBloomNamePath, tmp)
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type cachedFolder struct {
|
|
name string
|
|
parent *dataUsageHash
|
|
objectHealProbDiv uint32
|
|
}
|
|
|
|
type folderScanner struct {
|
|
root string
|
|
getSize getSizeFn
|
|
oldCache dataUsageCache
|
|
newCache dataUsageCache
|
|
updateCache dataUsageCache
|
|
withFilter *bloomFilter
|
|
|
|
dataUsageScannerDebug bool
|
|
healFolderInclude uint32 // Include a clean folder one in n cycles.
|
|
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
|
scanMode madmin.HealScanMode
|
|
|
|
disks []StorageAPI
|
|
disksQuorum int
|
|
|
|
// If set updates will be sent regularly to this channel.
|
|
// Will not be closed when returned.
|
|
updates chan<- dataUsageEntry
|
|
lastUpdate time.Time
|
|
|
|
// updateCurrentPath should be called whenever a new path is scanned.
|
|
updateCurrentPath func(string)
|
|
}
|
|
|
|
// Cache structure and compaction:
|
|
//
|
|
// A cache structure will be kept with a tree of usages.
|
|
// The cache is a tree structure where each node keeps track of its children.
|
|
//
|
|
// An uncompacted branch contains a count of the files only directly at the
|
|
// branch level, and contains link to children branches or leaves.
|
|
//
|
|
// The leaves are "compacted" based on a number of properties.
|
|
// A compacted leaf contains the totals of all files beneath it.
|
|
//
|
|
// A leaf is only scanned once every dataUsageUpdateDirCycles,
|
|
// rarer if the bloom filter for the path is clean and no lifecycles are applied.
|
|
// Skipped leaves have their totals transferred from the previous cycle.
|
|
//
|
|
// A clean leaf will be included once every healFolderIncludeProb for partial heal scans.
|
|
// When selected there is a one in healObjectSelectProb that any object will be chosen for heal scan.
|
|
//
|
|
// Compaction happens when either:
|
|
//
|
|
// 1) The folder (and subfolders) contains less than dataScannerCompactLeastObject objects.
|
|
// 2) The folder itself contains more than dataScannerCompactAtFolders folders.
|
|
// 3) The folder only contains objects and no subfolders.
|
|
//
|
|
// A bucket root will never be compacted.
|
|
//
|
|
// Furthermore, if a folder has more than dataScannerCompactAtChildren recursive children (uncompacted folders)
|
|
// the tree will be recursively scanned and the branches with the least number of objects will be
|
|
// compacted until the limit is reached.
|
|
//
|
|
// This ensures that any branch will never contain an unreasonable amount of other branches,
|
|
// and also that small branches with few objects don't take up unreasonable amounts of space.
|
|
// This keeps the cache size at a reasonable size for all buckets.
|
|
//
|
|
// Whenever a branch is scanned, it is assumed that it will be un-compacted
|
|
// before it hits any of the above limits.
|
|
// This will make the branch rebalance itself when scanned if the distribution of objects has changed.
|
|
|
|
// scanDataFolder will scanner the basepath+cache.Info.Name and return an updated cache.
|
|
// The returned cache will always be valid, but may not be updated from the existing.
|
|
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
|
|
// If the supplied context is canceled the function will return at the first chance.
|
|
func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) {
|
|
logPrefix := color.Green("data-usage: ")
|
|
|
|
switch cache.Info.Name {
|
|
case "", dataUsageRoot:
|
|
return cache, errors.New("internal error: root scan attempted")
|
|
}
|
|
updatePath, closeDisk := globalScannerMetrics.currentPathUpdater(basePath, cache.Info.Name)
|
|
defer closeDisk()
|
|
|
|
s := folderScanner{
|
|
root: basePath,
|
|
getSize: getSize,
|
|
oldCache: cache,
|
|
newCache: dataUsageCache{Info: cache.Info},
|
|
updateCache: dataUsageCache{Info: cache.Info},
|
|
dataUsageScannerDebug: intDataUpdateTracker.debug,
|
|
healFolderInclude: 0,
|
|
healObjectSelect: 0,
|
|
scanMode: scanMode,
|
|
updates: cache.Info.updates,
|
|
updateCurrentPath: updatePath,
|
|
}
|
|
|
|
// Add disks for set healing.
|
|
if poolIdx >= 0 && setIdx >= 0 {
|
|
objAPI, ok := newObjectLayerFn().(*erasureServerPools)
|
|
if ok {
|
|
if poolIdx < len(objAPI.serverPools) && setIdx < len(objAPI.serverPools[poolIdx].sets) {
|
|
// Pass the disks belonging to the set.
|
|
s.disks = objAPI.serverPools[poolIdx].sets[setIdx].getDisks()
|
|
s.disksQuorum = len(s.disks) / 2
|
|
} else {
|
|
logger.LogIf(ctx, fmt.Errorf("Matching pool %s, set %s not found", humanize.Ordinal(poolIdx+1), humanize.Ordinal(setIdx+1)))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enable healing in XL mode.
|
|
if globalIsErasure && !cache.Info.SkipHealing {
|
|
// Include a clean folder one in n cycles.
|
|
s.healFolderInclude = healFolderIncludeProb
|
|
// Do a heal check on an object once every n cycles. Must divide into healFolderInclude
|
|
s.healObjectSelect = healObjectSelectProb
|
|
}
|
|
if len(cache.Info.BloomFilter) > 0 {
|
|
s.withFilter = &bloomFilter{BloomFilter: &bloom.BloomFilter{}}
|
|
_, err := s.withFilter.ReadFrom(bytes.NewReader(cache.Info.BloomFilter))
|
|
if err != nil {
|
|
logger.LogIf(ctx, err, logPrefix+"Error reading bloom filter")
|
|
s.withFilter = nil
|
|
}
|
|
}
|
|
|
|
done := ctx.Done()
|
|
|
|
// Read top level in bucket.
|
|
select {
|
|
case <-done:
|
|
return cache, ctx.Err()
|
|
default:
|
|
}
|
|
root := dataUsageEntry{}
|
|
folder := cachedFolder{name: cache.Info.Name, objectHealProbDiv: 1}
|
|
err := s.scanFolder(ctx, folder, &root)
|
|
if err != nil {
|
|
// No useful information...
|
|
return cache, err
|
|
}
|
|
|
|
s.newCache.Info.LastUpdate = UTCNow()
|
|
s.newCache.Info.NextCycle = cache.Info.NextCycle
|
|
return s.newCache, nil
|
|
}
|
|
|
|
// sendUpdate() should be called on a regular basis when the newCache contains more recent total than previously.
|
|
// May or may not send an update upstream.
|
|
func (f *folderScanner) sendUpdate() {
|
|
// Send at most an update every minute.
|
|
if f.updates == nil || time.Since(f.lastUpdate) < time.Minute {
|
|
return
|
|
}
|
|
if flat := f.updateCache.sizeRecursive(f.newCache.Info.Name); flat != nil {
|
|
select {
|
|
case f.updates <- *flat:
|
|
default:
|
|
}
|
|
f.lastUpdate = time.Now()
|
|
}
|
|
}
|
|
|
|
// scanFolder will scan the provided folder.
|
|
// Files found in the folders will be added to f.newCache.
|
|
// If final is provided folders will be put into f.newFolders or f.existingFolders.
|
|
// If final is not provided the folders found are returned from the function.
|
|
func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, into *dataUsageEntry) error {
|
|
done := ctx.Done()
|
|
scannerLogPrefix := color.Green("folder-scanner:")
|
|
|
|
thisHash := hashPath(folder.name)
|
|
// Store initial compaction state.
|
|
wasCompacted := into.Compacted
|
|
|
|
for {
|
|
select {
|
|
case <-done:
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
existing, ok := f.oldCache.Cache[thisHash.Key()]
|
|
var abandonedChildren dataUsageHashMap
|
|
if !into.Compacted {
|
|
abandonedChildren = f.oldCache.findChildrenCopy(thisHash)
|
|
}
|
|
|
|
// If there are lifecycle rules for the prefix, remove the filter.
|
|
filter := f.withFilter
|
|
_, prefix := path2BucketObjectWithBasePath(f.root, folder.name)
|
|
var activeLifeCycle *lifecycle.Lifecycle
|
|
if f.oldCache.Info.lifeCycle != nil && f.oldCache.Info.lifeCycle.HasActiveRules(prefix) {
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(scannerLogPrefix+" Prefix %q has active rules\n", prefix)
|
|
}
|
|
activeLifeCycle = f.oldCache.Info.lifeCycle
|
|
filter = nil
|
|
}
|
|
// If there are replication rules for the prefix, remove the filter.
|
|
var replicationCfg replicationConfig
|
|
if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) {
|
|
replicationCfg = f.oldCache.Info.replication
|
|
filter = nil
|
|
}
|
|
// Check if we can skip it due to bloom filter...
|
|
if filter != nil && ok && existing.Compacted {
|
|
// If folder isn't in filter and we have data, skip it completely.
|
|
if folder.name != dataUsageRoot && !filter.containsDir(folder.name) {
|
|
if f.healObjectSelect == 0 || !thisHash.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healFolderInclude/folder.objectHealProbDiv) {
|
|
f.newCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
|
|
f.updateCache.copyWithChildren(&f.oldCache, thisHash, folder.parent)
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(scannerLogPrefix+" Skipping non-updated folder: %v\n", folder.name)
|
|
}
|
|
return nil
|
|
}
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(scannerLogPrefix+" Adding non-updated folder to heal check: %v\n", folder.name)
|
|
}
|
|
// If probability was already scannerHealFolderInclude, keep it.
|
|
folder.objectHealProbDiv = f.healFolderInclude
|
|
}
|
|
}
|
|
scannerSleeper.Sleep(ctx, dataScannerSleepPerFolder)
|
|
|
|
var existingFolders, newFolders []cachedFolder
|
|
var foundObjects bool
|
|
err := readDirFn(path.Join(f.root, folder.name), func(entName string, typ os.FileMode) error {
|
|
// Parse
|
|
entName = pathClean(path.Join(folder.name, entName))
|
|
if entName == "" || entName == folder.name {
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(scannerLogPrefix+" no entity (%s,%s)\n", f.root, entName)
|
|
}
|
|
return nil
|
|
}
|
|
bucket, prefix := path2BucketObjectWithBasePath(f.root, entName)
|
|
if bucket == "" {
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(scannerLogPrefix+" no bucket (%s,%s)\n", f.root, entName)
|
|
}
|
|
return errDoneForNow
|
|
}
|
|
|
|
if isReservedOrInvalidBucket(bucket, false) {
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(scannerLogPrefix+" invalid bucket: %v, entry: %v\n", bucket, entName)
|
|
}
|
|
return errDoneForNow
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
return errDoneForNow
|
|
default:
|
|
}
|
|
|
|
if typ&os.ModeDir != 0 {
|
|
h := hashPath(entName)
|
|
_, exists := f.oldCache.Cache[h.Key()]
|
|
if h == thisHash {
|
|
return nil
|
|
}
|
|
this := cachedFolder{name: entName, parent: &thisHash, objectHealProbDiv: folder.objectHealProbDiv}
|
|
delete(abandonedChildren, h.Key()) // h.Key() already accounted for.
|
|
if exists {
|
|
existingFolders = append(existingFolders, this)
|
|
f.updateCache.copyWithChildren(&f.oldCache, h, &thisHash)
|
|
} else {
|
|
newFolders = append(newFolders, this)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Dynamic time delay.
|
|
wait := scannerSleeper.Timer(ctx)
|
|
|
|
// Get file size, ignore errors.
|
|
item := scannerItem{
|
|
Path: path.Join(f.root, entName),
|
|
Typ: typ,
|
|
bucket: bucket,
|
|
prefix: path.Dir(prefix),
|
|
objectName: path.Base(entName),
|
|
debug: f.dataUsageScannerDebug,
|
|
lifeCycle: activeLifeCycle,
|
|
replication: replicationCfg,
|
|
}
|
|
|
|
item.heal.enabled = thisHash.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure
|
|
item.heal.bitrot = f.scanMode == madmin.HealDeepScan
|
|
|
|
// if the drive belongs to an erasure set
|
|
// that is already being healed, skip the
|
|
// healing attempt on this drive.
|
|
item.heal.enabled = item.heal.enabled && f.healObjectSelect > 0
|
|
|
|
sz, err := f.getSize(item)
|
|
if err != nil {
|
|
wait() // wait to proceed to next entry.
|
|
if err != errSkipFile && f.dataUsageScannerDebug {
|
|
console.Debugf(scannerLogPrefix+" getSize \"%v/%v\" returned err: %v\n", bucket, item.objectPath(), err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// successfully read means we have a valid object.
|
|
foundObjects = true
|
|
// Remove filename i.e is the meta file to construct object name
|
|
item.transformMetaDir()
|
|
|
|
// Object already accounted for, remove from heal map,
|
|
// simply because getSize() function already heals the
|
|
// object.
|
|
delete(abandonedChildren, path.Join(item.bucket, item.objectPath()))
|
|
|
|
into.addSizes(sz)
|
|
into.Objects++
|
|
|
|
wait() // wait to proceed to next entry.
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if foundObjects && globalIsErasure {
|
|
// If we found an object in erasure mode, we skip subdirs (only datadirs)...
|
|
break
|
|
}
|
|
|
|
// If we have many subfolders, compact ourself.
|
|
shouldCompact := f.newCache.Info.Name != folder.name &&
|
|
len(existingFolders)+len(newFolders) >= dataScannerCompactAtFolders ||
|
|
len(existingFolders)+len(newFolders) >= dataScannerForceCompactAtFolders
|
|
|
|
if !into.Compacted && shouldCompact {
|
|
into.Compacted = true
|
|
newFolders = append(newFolders, existingFolders...)
|
|
existingFolders = nil
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(scannerLogPrefix+" Preemptively compacting: %v, entries: %v\n", folder.name, len(existingFolders)+len(newFolders))
|
|
}
|
|
}
|
|
|
|
scanFolder := func(folder cachedFolder) {
|
|
if contextCanceled(ctx) {
|
|
return
|
|
}
|
|
dst := into
|
|
if !into.Compacted {
|
|
dst = &dataUsageEntry{Compacted: false}
|
|
}
|
|
if err := f.scanFolder(ctx, folder, dst); err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return
|
|
}
|
|
if !into.Compacted {
|
|
h := dataUsageHash(folder.name)
|
|
into.addChild(h)
|
|
// We scanned a folder, optionally send update.
|
|
f.updateCache.deleteRecursive(h)
|
|
f.updateCache.copyWithChildren(&f.newCache, h, folder.parent)
|
|
f.sendUpdate()
|
|
}
|
|
}
|
|
|
|
// Transfer existing
|
|
if !into.Compacted {
|
|
for _, folder := range existingFolders {
|
|
h := hashPath(folder.name)
|
|
f.updateCache.copyWithChildren(&f.oldCache, h, folder.parent)
|
|
}
|
|
}
|
|
// Scan new...
|
|
for _, folder := range newFolders {
|
|
h := hashPath(folder.name)
|
|
// Add new folders to the update tree so totals update for these.
|
|
if !into.Compacted {
|
|
var foundAny bool
|
|
parent := thisHash
|
|
for parent != hashPath(f.updateCache.Info.Name) {
|
|
e := f.updateCache.find(parent.Key())
|
|
if e == nil || e.Compacted {
|
|
foundAny = true
|
|
break
|
|
}
|
|
if next := f.updateCache.searchParent(parent); next == nil {
|
|
foundAny = true
|
|
break
|
|
} else {
|
|
parent = *next
|
|
}
|
|
}
|
|
if !foundAny {
|
|
// Add non-compacted empty entry.
|
|
f.updateCache.replaceHashed(h, &thisHash, dataUsageEntry{})
|
|
}
|
|
}
|
|
f.updateCurrentPath(folder.name)
|
|
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, folder.name)
|
|
scanFolder(folder)
|
|
stopFn()
|
|
// Add new folders if this is new and we don't have existing.
|
|
if !into.Compacted {
|
|
parent := f.updateCache.find(thisHash.Key())
|
|
if parent != nil && !parent.Compacted {
|
|
f.updateCache.deleteRecursive(h)
|
|
f.updateCache.copyWithChildren(&f.newCache, h, &thisHash)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Scan existing...
|
|
for _, folder := range existingFolders {
|
|
h := hashPath(folder.name)
|
|
// Check if we should skip scanning folder...
|
|
// We can only skip if we are not indexing into a compacted destination
|
|
// and the entry itself is compacted.
|
|
if !into.Compacted && f.oldCache.isCompacted(h) {
|
|
if !h.mod(f.oldCache.Info.NextCycle, dataUsageUpdateDirCycles) {
|
|
if f.healObjectSelect == 0 || !h.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healFolderInclude/folder.objectHealProbDiv) {
|
|
// Transfer and add as child...
|
|
f.newCache.copyWithChildren(&f.oldCache, h, folder.parent)
|
|
into.addChild(h)
|
|
continue
|
|
}
|
|
folder.objectHealProbDiv = f.healFolderInclude
|
|
}
|
|
}
|
|
f.updateCurrentPath(folder.name)
|
|
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, folder.name, "EXISTING")
|
|
scanFolder(folder)
|
|
stopFn()
|
|
}
|
|
|
|
// Scan for healing
|
|
if f.healObjectSelect == 0 || len(abandonedChildren) == 0 {
|
|
// If we are not heal scanning, return now.
|
|
break
|
|
}
|
|
|
|
objAPI, ok := newObjectLayerFn().(*erasureServerPools)
|
|
if !ok || len(f.disks) == 0 || f.disksQuorum == 0 {
|
|
break
|
|
}
|
|
|
|
bgSeq, found := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
|
|
if !found {
|
|
break
|
|
}
|
|
|
|
// Whatever remains in 'abandonedChildren' are folders at this level
|
|
// that existed in the previous run but wasn't found now.
|
|
//
|
|
// This may be because of 2 reasons:
|
|
//
|
|
// 1) The folder/object was deleted.
|
|
// 2) We come from another disk and this disk missed the write.
|
|
//
|
|
// We therefore perform a heal check.
|
|
// If that doesn't bring it back we remove the folder and assume it was deleted.
|
|
// This means that the next run will not look for it.
|
|
// How to resolve results.
|
|
resolver := metadataResolutionParams{
|
|
dirQuorum: f.disksQuorum,
|
|
objQuorum: f.disksQuorum,
|
|
bucket: "",
|
|
strict: false,
|
|
}
|
|
|
|
healObjectsPrefix := color.Green("healObjects:")
|
|
for k := range abandonedChildren {
|
|
bucket, prefix := path2BucketObject(k)
|
|
stopFn := globalScannerMetrics.time(scannerMetricCheckMissing)
|
|
f.updateCurrentPath(k)
|
|
|
|
if bucket != resolver.bucket {
|
|
// Bucket might be missing as well with abandoned children.
|
|
// make sure it is created first otherwise healing won't proceed
|
|
// for objects.
|
|
_, _ = objAPI.HealBucket(ctx, bucket, madmin.HealOpts{})
|
|
}
|
|
|
|
resolver.bucket = bucket
|
|
|
|
foundObjs := false
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
err := listPathRaw(ctx, listPathRawOptions{
|
|
disks: f.disks,
|
|
bucket: bucket,
|
|
path: prefix,
|
|
recursive: true,
|
|
reportNotFound: true,
|
|
minDisks: f.disksQuorum,
|
|
agreed: func(entry metaCacheEntry) {
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(healObjectsPrefix+" got agreement: %v\n", entry.name)
|
|
}
|
|
},
|
|
// Some disks have data for this.
|
|
partial: func(entries metaCacheEntries, errs []error) {
|
|
entry, ok := entries.resolve(&resolver)
|
|
if !ok {
|
|
// check if we can get one entry atleast
|
|
// proceed to heal nonetheless, since
|
|
// this object might be dangling.
|
|
entry, _ = entries.firstFound()
|
|
}
|
|
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(healObjectsPrefix+" resolved to: %v, dir: %v\n", entry.name, entry.isDir())
|
|
}
|
|
|
|
if entry.isDir() {
|
|
return
|
|
}
|
|
|
|
// wait on timer per object.
|
|
wait := scannerSleeper.Timer(ctx)
|
|
|
|
// We got an entry which we should be able to heal.
|
|
fiv, err := entry.fileInfoVersions(bucket)
|
|
if err != nil {
|
|
wait()
|
|
err := bgSeq.queueHealTask(healSource{
|
|
bucket: bucket,
|
|
object: entry.name,
|
|
versionID: "",
|
|
}, madmin.HealItemObject)
|
|
logger.LogIf(ctx, err)
|
|
foundObjs = foundObjs || err == nil
|
|
return
|
|
}
|
|
|
|
for _, ver := range fiv.Versions {
|
|
// Sleep and reset.
|
|
wait()
|
|
wait = scannerSleeper.Timer(ctx)
|
|
|
|
err := bgSeq.queueHealTask(healSource{
|
|
bucket: bucket,
|
|
object: fiv.Name,
|
|
versionID: ver.VersionID,
|
|
}, madmin.HealItemObject)
|
|
logger.LogIf(ctx, err)
|
|
foundObjs = foundObjs || err == nil
|
|
}
|
|
},
|
|
// Too many disks failed.
|
|
finished: func(errs []error) {
|
|
if f.dataUsageScannerDebug {
|
|
console.Debugf(healObjectsPrefix+" too many errors: %v\n", errs)
|
|
}
|
|
cancel()
|
|
},
|
|
})
|
|
|
|
stopFn()
|
|
if f.dataUsageScannerDebug && err != nil && err != errFileNotFound {
|
|
console.Debugf(healObjectsPrefix+" checking returned value %v (%T)\n", err, err)
|
|
}
|
|
|
|
// Add unless healing returned an error.
|
|
if foundObjs {
|
|
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: 1}
|
|
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, this.name, "HEALED")
|
|
scanFolder(this)
|
|
stopFn()
|
|
}
|
|
}
|
|
break
|
|
}
|
|
if !wasCompacted {
|
|
f.newCache.replaceHashed(thisHash, folder.parent, *into)
|
|
}
|
|
|
|
if !into.Compacted && f.newCache.Info.Name != folder.name {
|
|
flat := f.newCache.sizeRecursive(thisHash.Key())
|
|
flat.Compacted = true
|
|
var compact bool
|
|
if flat.Objects < dataScannerCompactLeastObject {
|
|
if f.dataUsageScannerDebug && flat.Objects > 1 {
|
|
// Disabled, rather chatty:
|
|
// console.Debugf(scannerLogPrefix+" Only %d objects, compacting %s -> %+v\n", flat.Objects, folder.name, flat)
|
|
}
|
|
compact = true
|
|
} else {
|
|
// Compact if we only have objects as children...
|
|
compact = true
|
|
for k := range into.Children {
|
|
if v, ok := f.newCache.Cache[k]; ok {
|
|
if len(v.Children) > 0 || v.Objects > 1 {
|
|
compact = false
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if f.dataUsageScannerDebug && compact {
|
|
// Disabled, rather chatty:
|
|
// console.Debugf(scannerLogPrefix+" Only objects (%d), compacting %s -> %+v\n", flat.Objects, folder.name, flat)
|
|
}
|
|
}
|
|
if compact {
|
|
f.newCache.deleteRecursive(thisHash)
|
|
f.newCache.replaceHashed(thisHash, folder.parent, *flat)
|
|
}
|
|
|
|
}
|
|
// Compact if too many children...
|
|
if !into.Compacted {
|
|
f.newCache.reduceChildrenOf(thisHash, dataScannerCompactAtChildren, f.newCache.Info.Name != folder.name)
|
|
}
|
|
if _, ok := f.updateCache.Cache[thisHash.Key()]; !wasCompacted && ok {
|
|
// Replace if existed before.
|
|
if flat := f.newCache.sizeRecursive(thisHash.Key()); flat != nil {
|
|
f.updateCache.deleteRecursive(thisHash)
|
|
f.updateCache.replaceHashed(thisHash, folder.parent, *flat)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// scannerItem represents each file while walking.
|
|
type scannerItem struct {
|
|
Path string
|
|
bucket string // Bucket.
|
|
prefix string // Only the prefix if any, does not have final object name.
|
|
objectName string // Only the object name without prefixes.
|
|
replication replicationConfig
|
|
lifeCycle *lifecycle.Lifecycle
|
|
Typ fs.FileMode
|
|
heal struct {
|
|
enabled bool
|
|
bitrot bool
|
|
} // Has the object been selected for heal check?
|
|
debug bool
|
|
}
|
|
|
|
type sizeSummary struct {
|
|
totalSize int64
|
|
versions uint64
|
|
replicatedSize int64
|
|
pendingSize int64
|
|
failedSize int64
|
|
replicaSize int64
|
|
pendingCount uint64
|
|
failedCount uint64
|
|
replTargetStats map[string]replTargetSizeSummary
|
|
tiers map[string]tierStats
|
|
}
|
|
|
|
// replTargetSizeSummary holds summary of replication stats by target:
// byte totals per replication state, plus pending and failed counts.
type replTargetSizeSummary struct {
	replicatedSize, pendingSize, failedSize int64
	pendingCount, failedCount               uint64
}
|
|
|
|
// getSizeFn computes the size summary of a scanned item. The folder scanner
// skips the item when an error is returned; errSkipFile is skipped without a
// debug message.
type getSizeFn func(item scannerItem) (sizeSummary, error)
|
|
|
|
// transformMetaDir will transform a directory to prefix/file.ext
|
|
func (i *scannerItem) transformMetaDir() {
|
|
split := strings.Split(i.prefix, SlashSeparator)
|
|
if len(split) > 1 {
|
|
i.prefix = path.Join(split[:len(split)-1]...)
|
|
} else {
|
|
i.prefix = ""
|
|
}
|
|
// Object name is last element
|
|
i.objectName = split[len(split)-1]
|
|
}
|
|
|
|
var (
|
|
applyActionsLogPrefix = color.Green("applyActions:")
|
|
applyVersionActionsLogPrefix = color.Green("applyVersionActions:")
|
|
)
|
|
|
|
func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi ObjectInfo) (size int64) {
|
|
if i.debug {
|
|
if oi.VersionID != "" {
|
|
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v v(%s)\n", i.bucket, i.objectPath(), oi.VersionID)
|
|
} else {
|
|
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
|
|
}
|
|
}
|
|
scanMode := madmin.HealNormalScan
|
|
if i.heal.bitrot {
|
|
scanMode = madmin.HealDeepScan
|
|
}
|
|
healOpts := madmin.HealOpts{
|
|
Remove: healDeleteDangling,
|
|
ScanMode: scanMode,
|
|
}
|
|
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), oi.VersionID, healOpts)
|
|
if err != nil {
|
|
if errors.Is(err, NotImplemented{}) || isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
err = nil
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
return 0
|
|
}
|
|
return res.ObjectSize
|
|
}
|
|
|
|
func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi ObjectInfo) (applied bool, size int64) {
|
|
size, err := oi.GetActualSize()
|
|
if i.debug {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
if i.lifeCycle == nil {
|
|
if i.debug {
|
|
// disabled, very chatty:
|
|
// console.Debugf(applyActionsLogPrefix+" no lifecycle rules to apply: %q\n", i.objectPath())
|
|
}
|
|
return false, size
|
|
}
|
|
|
|
versionID := oi.VersionID
|
|
rCfg, _ := globalBucketObjectLockSys.Get(i.bucket)
|
|
lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, rCfg, oi)
|
|
if i.debug {
|
|
if versionID != "" {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: %q (version-id=%s), Initial scan: %v\n", i.objectPath(), versionID, lcEvt.Action)
|
|
} else {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: %q Initial scan: %v\n", i.objectPath(), lcEvt.Action)
|
|
}
|
|
}
|
|
defer globalScannerMetrics.timeILM(lcEvt.Action)
|
|
|
|
switch lcEvt.Action {
|
|
case lifecycle.DeleteAction, lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
|
|
return applyLifecycleAction(lcEvt.Action, oi, ""), 0
|
|
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
|
|
return applyLifecycleAction(lcEvt.Action, oi, lcEvt.StorageClass), size
|
|
default:
|
|
// No action.
|
|
return false, size
|
|
}
|
|
}
|
|
|
|
// applyTierObjSweep removes remote object pending deletion and the free-version
|
|
// tracking this information.
|
|
func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
|
|
if !oi.TransitionedObject.FreeVersion {
|
|
// nothing to be done
|
|
return
|
|
}
|
|
|
|
ignoreNotFoundErr := func(err error) error {
|
|
switch {
|
|
case isErrVersionNotFound(err), isErrObjectNotFound(err):
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
// Remove the remote object
|
|
err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
|
|
if ignoreNotFoundErr(err) != nil {
|
|
logger.LogIf(ctx, err)
|
|
return
|
|
}
|
|
|
|
// Remove this free version
|
|
_, err = o.DeleteObject(ctx, oi.Bucket, oi.Name, ObjectOptions{
|
|
VersionID: oi.VersionID,
|
|
})
|
|
if err == nil {
|
|
auditLogLifecycle(ctx, oi, ILMFreeVersionDelete)
|
|
}
|
|
if ignoreNotFoundErr(err) != nil {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
|
|
// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured.
|
|
// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return.
|
|
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
|
|
if i.lifeCycle == nil {
|
|
return fivs, nil
|
|
}
|
|
|
|
_, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
|
|
if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions
|
|
return fivs, nil
|
|
}
|
|
|
|
overflowVersions := fivs[lim+1:]
|
|
// current version + most recent lim noncurrent versions
|
|
fivs = fivs[:lim+1]
|
|
|
|
rcfg, _ := globalBucketObjectLockSys.Get(i.bucket)
|
|
vcfg, _ := globalBucketVersioningSys.Get(i.bucket)
|
|
|
|
versioned := vcfg != nil && vcfg.Versioned(i.objectPath())
|
|
|
|
toDel := make([]ObjectToDelete, 0, len(overflowVersions))
|
|
for _, fi := range overflowVersions {
|
|
obj := fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)
|
|
// skip versions with object locking enabled
|
|
if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
|
|
if i.debug {
|
|
if obj.VersionID != "" {
|
|
console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID)
|
|
} else {
|
|
console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
|
|
}
|
|
}
|
|
// add this version back to remaining versions for
|
|
// subsequent lifecycle policy applications
|
|
fivs = append(fivs, fi)
|
|
continue
|
|
}
|
|
|
|
// NoncurrentDays not passed yet.
|
|
if time.Now().UTC().Before(lifecycle.ExpectedExpiryTime(obj.SuccessorModTime, days)) {
|
|
// add this version back to remaining versions for
|
|
// subsequent lifecycle policy applications
|
|
fivs = append(fivs, fi)
|
|
continue
|
|
}
|
|
|
|
toDel = append(toDel, ObjectToDelete{
|
|
ObjectV: ObjectV{
|
|
ObjectName: fi.Name,
|
|
VersionID: fi.VersionID,
|
|
},
|
|
})
|
|
}
|
|
|
|
globalExpiryState.enqueueByNewerNoncurrent(i.bucket, toDel)
|
|
return fivs, nil
|
|
}
|
|
|
|
// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain
|
|
// after applying lifecycle checks configured.
|
|
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
|
|
return i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
|
|
}
|
|
|
|
// applyActions will apply lifecycle checks on to a scanned item.
|
|
// The resulting size on disk will always be returned.
|
|
// The metadata will be compared to consensus on the object layer before any changes are applied.
|
|
// If no metadata is supplied, -1 is returned if no action is taken.
|
|
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) int64 {
|
|
done := globalScannerMetrics.time(scannerMetricILM)
|
|
applied, size := i.applyLifecycle(ctx, o, oi)
|
|
done()
|
|
|
|
// For instance, an applied lifecycle means we remove/transitioned an object
|
|
// from the current deployment, which means we don't have to call healing
|
|
// routine even if we are asked to do via heal flag.
|
|
if !applied {
|
|
if i.heal.enabled {
|
|
done := globalScannerMetrics.time(scannerMetricHealCheck)
|
|
size = i.applyHealing(ctx, o, oi)
|
|
done()
|
|
}
|
|
// replicate only if lifecycle rules are not applied.
|
|
done := globalScannerMetrics.time(scannerMetricCheckReplication)
|
|
i.healReplication(ctx, o, oi.Clone(), sizeS)
|
|
done()
|
|
}
|
|
return size
|
|
}
|
|
|
|
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr lock.Retention, obj ObjectInfo) lifecycle.Event {
|
|
event := lc.Eval(obj.ToLifecycleOpts())
|
|
if serverDebugLog {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: Secondary scan: %v\n", event.Action)
|
|
}
|
|
|
|
if event.Action == lifecycle.NoneAction {
|
|
return event
|
|
}
|
|
|
|
switch event.Action {
|
|
case lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredVersionAction:
|
|
// Defensive code, should never happen
|
|
if obj.VersionID == "" {
|
|
return lifecycle.Event{Action: lifecycle.NoneAction}
|
|
}
|
|
if lr.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
|
|
if serverDebugLog {
|
|
if obj.VersionID != "" {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID)
|
|
} else {
|
|
console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
|
|
}
|
|
}
|
|
return lifecycle.Event{Action: lifecycle.NoneAction}
|
|
}
|
|
}
|
|
|
|
return event
|
|
}
|
|
|
|
func applyTransitionRule(obj ObjectInfo, storageClass string) bool {
|
|
if obj.DeleteMarker {
|
|
return false
|
|
}
|
|
globalTransitionState.queueTransitionTask(obj, storageClass)
|
|
return true
|
|
}
|
|
|
|
func applyExpiryOnTransitionedObject(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, restoredObject bool) bool {
|
|
action := expireObj
|
|
if restoredObject {
|
|
action = expireRestoredObj
|
|
}
|
|
if err := expireTransitionedObject(ctx, objLayer, &obj, obj.ToLifecycleOpts(), action); err != nil {
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
return false
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
// Notification already sent in *expireTransitionedObject*, just return 'true' here.
|
|
return true
|
|
}
|
|
|
|
func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLayer, obj ObjectInfo, applyOnVersion bool) bool {
|
|
opts := ObjectOptions{
|
|
Expiration: ExpirationOptions{Expire: true},
|
|
}
|
|
|
|
if applyOnVersion {
|
|
opts.VersionID = obj.VersionID
|
|
}
|
|
if opts.VersionID == "" {
|
|
opts.Versioned = globalBucketVersioningSys.PrefixEnabled(obj.Bucket, obj.Name)
|
|
opts.VersionSuspended = globalBucketVersioningSys.PrefixSuspended(obj.Bucket, obj.Name)
|
|
}
|
|
|
|
obj, err := objLayer.DeleteObject(ctx, obj.Bucket, obj.Name, opts)
|
|
if err != nil {
|
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
|
return false
|
|
}
|
|
// Assume it is still there.
|
|
logger.LogIf(ctx, err)
|
|
return false
|
|
}
|
|
|
|
// Send audit for the lifecycle delete operation
|
|
auditLogLifecycle(ctx, obj, ILMExpiry)
|
|
|
|
eventName := event.ObjectRemovedDelete
|
|
if obj.DeleteMarker {
|
|
eventName = event.ObjectRemovedDeleteMarkerCreated
|
|
}
|
|
|
|
// Notify object deleted event.
|
|
sendEvent(eventArgs{
|
|
EventName: eventName,
|
|
BucketName: obj.Bucket,
|
|
Object: obj,
|
|
Host: "Internal: [ILM-EXPIRY]",
|
|
})
|
|
|
|
return true
|
|
}
|
|
|
|
// Apply object, object version, restored object or restored object version action on the given object
|
|
func applyExpiryRule(obj ObjectInfo, restoredObject, applyOnVersion bool) bool {
|
|
globalExpiryState.enqueueByDays(obj, restoredObject, applyOnVersion)
|
|
return true
|
|
}
|
|
|
|
// Perform actions (removal or transitioning of objects), return true the action is successfully performed
|
|
func applyLifecycleAction(action lifecycle.Action, obj ObjectInfo, storageClass string) (success bool) {
|
|
switch action {
|
|
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
|
|
success = applyExpiryRule(obj, false, action == lifecycle.DeleteVersionAction)
|
|
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
|
|
success = applyExpiryRule(obj, true, action == lifecycle.DeleteRestoredVersionAction)
|
|
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
|
|
success = applyTransitionRule(obj, storageClass)
|
|
}
|
|
return
|
|
}
|
|
|
|
// objectPath returns the prefix and object name.
|
|
func (i *scannerItem) objectPath() string {
|
|
return path.Join(i.prefix, i.objectName)
|
|
}
|
|
|
|
// healReplication will heal a scanned item that has failed replication.
|
|
func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) {
|
|
if oi.VersionID == "" {
|
|
return
|
|
}
|
|
if i.replication.Config == nil {
|
|
return
|
|
}
|
|
roi := queueReplicationHeal(ctx, oi.Bucket, oi, i.replication)
|
|
if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() {
|
|
return
|
|
}
|
|
|
|
if sizeS.replTargetStats == nil && len(roi.TargetStatuses) > 0 {
|
|
sizeS.replTargetStats = make(map[string]replTargetSizeSummary)
|
|
}
|
|
|
|
for arn, tgtStatus := range roi.TargetStatuses {
|
|
tgtSizeS, ok := sizeS.replTargetStats[arn]
|
|
if !ok {
|
|
tgtSizeS = replTargetSizeSummary{}
|
|
}
|
|
switch tgtStatus {
|
|
case replication.Pending:
|
|
tgtSizeS.pendingCount++
|
|
tgtSizeS.pendingSize += oi.Size
|
|
sizeS.pendingCount++
|
|
sizeS.pendingSize += oi.Size
|
|
case replication.Failed:
|
|
tgtSizeS.failedSize += oi.Size
|
|
tgtSizeS.failedCount++
|
|
sizeS.failedSize += oi.Size
|
|
sizeS.failedCount++
|
|
case replication.Completed, replication.CompletedLegacy:
|
|
tgtSizeS.replicatedSize += oi.Size
|
|
sizeS.replicatedSize += oi.Size
|
|
}
|
|
sizeS.replTargetStats[arn] = tgtSizeS
|
|
}
|
|
|
|
switch oi.ReplicationStatus {
|
|
case replication.Replica:
|
|
sizeS.replicaSize += oi.Size
|
|
}
|
|
}
|
|
|
|
// dynamicSleeper paces work by sleeping for a multiple of a given or
// measured duration. factor, maxSleep and cycle are guarded by mu and can be
// replaced while sleepers are waiting.
type dynamicSleeper struct {
	mu sync.RWMutex

	factor float64 // Sleep factor

	maxSleep time.Duration // maximum sleep cap, set to <= 0 to disable.

	// Sleeps at or below this value are skipped altogether.
	// This is to avoid too small costly sleeps.
	minSleep time.Duration

	cycle chan struct{} // closed whenever the settings are updated

	// isScanner should be set when this is used by the scanner
	// to record metrics.
	isScanner bool
}
|
|
|
|
// newDynamicSleeper
|
|
func newDynamicSleeper(factor float64, maxWait time.Duration, isScanner bool) *dynamicSleeper {
|
|
return &dynamicSleeper{
|
|
factor: factor,
|
|
cycle: make(chan struct{}),
|
|
maxSleep: maxWait,
|
|
minSleep: 100 * time.Microsecond,
|
|
isScanner: isScanner,
|
|
}
|
|
}
|
|
|
|
// Timer returns a timer that has started.
|
|
// When the returned function is called it will wait.
|
|
func (d *dynamicSleeper) Timer(ctx context.Context) func() {
|
|
t := time.Now()
|
|
return func() {
|
|
doneAt := time.Now()
|
|
for {
|
|
// Grab current values
|
|
d.mu.RLock()
|
|
minWait, maxWait := d.minSleep, d.maxSleep
|
|
factor := d.factor
|
|
cycle := d.cycle
|
|
d.mu.RUnlock()
|
|
elapsed := doneAt.Sub(t)
|
|
// Don't sleep for really small amount of time
|
|
wantSleep := time.Duration(float64(elapsed) * factor)
|
|
if wantSleep <= minWait {
|
|
return
|
|
}
|
|
if maxWait > 0 && wantSleep > maxWait {
|
|
wantSleep = maxWait
|
|
}
|
|
timer := time.NewTimer(wantSleep)
|
|
select {
|
|
case <-ctx.Done():
|
|
if !timer.Stop() {
|
|
if d.isScanner {
|
|
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
|
|
}
|
|
<-timer.C
|
|
}
|
|
return
|
|
case <-timer.C:
|
|
if d.isScanner {
|
|
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
|
|
}
|
|
return
|
|
case <-cycle:
|
|
if !timer.Stop() {
|
|
// We expired.
|
|
<-timer.C
|
|
if d.isScanner {
|
|
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sleep sleeps the specified time multiplied by the sleep factor.
|
|
// If the factor is updated the sleep will be done again with the new factor.
|
|
func (d *dynamicSleeper) Sleep(ctx context.Context, base time.Duration) {
|
|
for {
|
|
// Grab current values
|
|
d.mu.RLock()
|
|
minWait, maxWait := d.minSleep, d.maxSleep
|
|
factor := d.factor
|
|
cycle := d.cycle
|
|
d.mu.RUnlock()
|
|
// Don't sleep for really small amount of time
|
|
wantSleep := time.Duration(float64(base) * factor)
|
|
if wantSleep <= minWait {
|
|
return
|
|
}
|
|
if maxWait > 0 && wantSleep > maxWait {
|
|
wantSleep = maxWait
|
|
}
|
|
timer := time.NewTimer(wantSleep)
|
|
select {
|
|
case <-ctx.Done():
|
|
if !timer.Stop() {
|
|
<-timer.C
|
|
if d.isScanner {
|
|
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
|
|
}
|
|
}
|
|
return
|
|
case <-timer.C:
|
|
if d.isScanner {
|
|
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
|
|
}
|
|
return
|
|
case <-cycle:
|
|
if !timer.Stop() {
|
|
// We expired.
|
|
<-timer.C
|
|
if d.isScanner {
|
|
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update the current settings and cycle all waiting.
|
|
// Parameters are the same as in the contructor.
|
|
func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
if math.Abs(d.factor-factor) < 1e-10 && d.maxSleep == maxWait {
|
|
return nil
|
|
}
|
|
// Update values and cycle waiting.
|
|
close(d.cycle)
|
|
d.factor = factor
|
|
d.maxSleep = maxWait
|
|
d.cycle = make(chan struct{})
|
|
return nil
|
|
}
|
|
|
|
// Event names used in the audit trail of ILM operations.
const (
	// ILMExpiry - audit trail for ILM expiry
	ILMExpiry = "ilm:expiry"
	// ILMFreeVersionDelete - audit trail for ILM free-version delete
	ILMFreeVersionDelete = "ilm:free-version-delete"
	// ILMTransition - audit trail for ILM transitioning.
	// The value carries no leading space, in line with the other event names.
	ILMTransition = "ilm:transition"
)
|
|
|
|
func auditLogLifecycle(ctx context.Context, oi ObjectInfo, event string) {
|
|
var apiName string
|
|
switch event {
|
|
case ILMExpiry:
|
|
apiName = "ILMExpiry"
|
|
case ILMFreeVersionDelete:
|
|
apiName = "ILMFreeVersionDelete"
|
|
case ILMTransition:
|
|
apiName = "ILMTransition"
|
|
}
|
|
auditLogInternal(ctx, AuditLogOptions{
|
|
Event: event,
|
|
APIName: apiName,
|
|
Bucket: oi.Bucket,
|
|
Object: oi.Name,
|
|
VersionID: oi.VersionID,
|
|
})
|
|
}
|