heal: Enable periodic bitrot scan configuration (#14464)

This commit is contained in:
Anis Elleuch 2022-04-07 16:10:40 +01:00 committed by GitHub
parent ee49a23220
commit 16431d222c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 192 additions and 50 deletions

View File

@ -701,7 +701,7 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
if source.opts != nil {
task.opts = *source.opts
} else {
task.opts.ScanMode = globalHealConfig.ScanMode()
task.opts.ScanMode = madmin.HealNormalScan
}
h.mutex.Lock()

View File

@ -21,6 +21,7 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io/fs"
@ -103,6 +104,63 @@ func (s *safeDuration) Get() time.Duration {
return s.t
}
func getCycleScanMode(currentCycle, bitrotStartCycle uint64, bitrotStartTime time.Time) madmin.HealScanMode {
bitrotCycle := globalHealConfig.BitrotScanCycle()
switch bitrotCycle {
case -1:
return madmin.HealNormalScan
case 0:
return madmin.HealDeepScan
}
if currentCycle-bitrotStartCycle < healObjectSelectProb {
return madmin.HealDeepScan
}
if time.Since(bitrotStartTime) > bitrotCycle {
return madmin.HealDeepScan
}
return madmin.HealNormalScan
}
type backgroundHealInfo struct {
BitrotStartTime time.Time `json:"bitrotStartTime"`
BitrotStartCycle uint64 `json:"bitrotStartCycle"`
CurrentScanMode madmin.HealScanMode `json:"currentScanMode"`
}
func readBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer) backgroundHealInfo {
// Get last healing information
buf, err := readConfig(ctx, objAPI, backgroundHealInfoPath)
if err != nil {
if !errors.Is(err, errConfigNotFound) {
logger.LogIf(ctx, err)
}
return backgroundHealInfo{}
}
var info backgroundHealInfo
err = json.Unmarshal(buf, &info)
if err != nil {
logger.LogIf(ctx, err)
return backgroundHealInfo{}
}
return info
}
func saveBackgroundHealInfo(ctx context.Context, objAPI ObjectLayer, info backgroundHealInfo) {
b, err := json.Marshal(info)
if err != nil {
logger.LogIf(ctx, err)
return
}
// Get last healing information
err = saveConfig(ctx, objAPI, backgroundHealInfoPath, b)
if err != nil {
logger.LogIf(ctx, err)
}
}
// runDataScanner will start a data scanner.
// The function will block until the context is canceled.
// There should only ever be one scanner running per cluster.
@ -145,12 +203,24 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
console.Debugln("starting scanner cycle")
}
bgHealInfo := readBackgroundHealInfo(ctx, objAPI)
scanMode := getCycleScanMode(nextBloomCycle, bgHealInfo.BitrotStartCycle, bgHealInfo.BitrotStartTime)
if bgHealInfo.CurrentScanMode != scanMode {
newHealInfo := bgHealInfo
newHealInfo.CurrentScanMode = scanMode
if scanMode == madmin.HealDeepScan {
newHealInfo.BitrotStartTime = time.Now().UTC()
newHealInfo.BitrotStartCycle = nextBloomCycle
}
saveBackgroundHealInfo(ctx, objAPI, newHealInfo)
}
// Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results)
bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle)
logger.LogIf(ctx, err)
err = objAPI.NSScanner(ctx, bf, results, uint32(nextBloomCycle))
err = objAPI.NSScanner(ctx, bf, results, uint32(nextBloomCycle), scanMode)
logger.LogIf(ctx, err)
if err == nil {
// Store new cycle...
@ -182,6 +252,7 @@ type folderScanner struct {
dataUsageScannerDebug bool
healFolderInclude uint32 // Include a clean folder one in n cycles.
healObjectSelect uint32 // Do a heal check on an object once every n cycles. Must divide into healFolderInclude
scanMode madmin.HealScanMode
disks []StorageAPI
disksQuorum int
@ -250,7 +321,7 @@ var globalScannerStats scannerStats
// The returned cache will always be valid, but may not be updated from the existing.
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
// If the supplied context is canceled the function will return at the first chance.
func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, cache dataUsageCache, getSize getSizeFn) (dataUsageCache, error) {
func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) {
t := UTCNow()
logPrefix := color.Green("data-usage: ")
@ -279,6 +350,7 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
dataUsageScannerDebug: intDataUpdateTracker.debug,
healFolderInclude: 0,
healObjectSelect: 0,
scanMode: scanMode,
updates: cache.Info.updates,
}
@ -482,12 +554,15 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
debug: f.dataUsageScannerDebug,
lifeCycle: activeLifeCycle,
replication: replicationCfg,
heal: thisHash.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure,
}
item.heal.enabled = thisHash.modAlt(f.oldCache.Info.NextCycle/folder.objectHealProbDiv, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure
item.heal.bitrot = f.scanMode == madmin.HealDeepScan
// if the drive belongs to an erasure set
// that is already being healed, skip the
// healing attempt on this drive.
item.heal = item.heal && f.healObjectSelect > 0
item.heal.enabled = item.heal.enabled && f.healObjectSelect > 0
sz, err := f.getSize(item)
if err != nil {
@ -821,8 +896,11 @@ type scannerItem struct {
replication replicationConfig
lifeCycle *lifecycle.Lifecycle
Typ fs.FileMode
heal bool // Has the object been selected for heal check?
debug bool
heal struct {
enabled bool
bitrot bool
} // Has the object been selected for heal check?
debug bool
}
type sizeSummary struct {
@ -874,9 +952,13 @@ func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi Object
console.Debugf(applyActionsLogPrefix+" heal checking: %v/%v\n", i.bucket, i.objectPath())
}
}
scanMode := madmin.HealNormalScan
if i.heal.bitrot {
scanMode = madmin.HealDeepScan
}
healOpts := madmin.HealOpts{
Remove: healDeleteDangling,
ScanMode: globalHealConfig.ScanMode(),
ScanMode: scanMode,
}
res, err := o.HealObject(ctx, i.bucket, i.objectPath(), oi.VersionID, healOpts)
if err != nil && !errors.Is(err, NotImplemented{}) {
@ -1040,7 +1122,7 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi Object
// from the current deployment, which means we don't have to call healing
// routine even if we are asked to do via heal flag.
if !applied {
if i.heal {
if i.heal.enabled {
size = i.applyHealing(ctx, o, oi)
}
// replicate only if lifecycle rules are not applied.

View File

@ -35,6 +35,8 @@ const (
dataUsageBloomName = ".bloomcycle.bin"
dataUsageBloomNamePath = bucketMetaPrefix + SlashSeparator + dataUsageBloomName
backgroundHealInfoPath = bucketMetaPrefix + SlashSeparator + ".background-heal.json"
dataUsageCacheName = ".usage-cache.bin"
)

View File

@ -67,7 +67,7 @@ func TestDataUsageUpdate(t *testing.T) {
return
}
got, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize)
got, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0)
if err != nil {
t.Fatal(err)
}
@ -178,7 +178,7 @@ func TestDataUsageUpdate(t *testing.T) {
}
// Changed dir must be picked up in this many cycles.
for i := 0; i < dataUsageUpdateDirCycles; i++ {
got, err = scanDataFolder(context.Background(), 0, 0, base, got, getSize)
got, err = scanDataFolder(context.Background(), 0, 0, base, got, getSize, 0)
got.Info.NextCycle++
if err != nil {
t.Fatal(err)
@ -289,7 +289,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
}
return
}
got, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize)
got, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: "bucket"}}, getSize, 0)
if err != nil {
t.Fatal(err)
}
@ -423,7 +423,7 @@ func TestDataUsageUpdatePrefix(t *testing.T) {
}
// Changed dir must be picked up in this many cycles.
for i := 0; i < dataUsageUpdateDirCycles; i++ {
got, err = scanDataFolder(context.Background(), 0, 0, base, got, getSize)
got, err = scanDataFolder(context.Background(), 0, 0, base, got, getSize, 0)
got.Info.NextCycle++
if err != nil {
t.Fatal(err)
@ -575,7 +575,7 @@ func TestDataUsageCacheSerialize(t *testing.T) {
}
return
}
want, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize)
want, err := scanDataFolder(context.Background(), 0, 0, base, dataUsageCache{Info: dataUsageCacheInfo{Name: bucket}}, getSize, 0)
if err != nil {
t.Fatal(err)
}

View File

@ -531,7 +531,7 @@ func (z *erasureServerPools) StorageInfo(ctx context.Context) (StorageInfo, []er
return storageInfo, errs
}
func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32) error {
func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error {
// Updates must be closed before we return.
defer close(updates)
@ -576,7 +576,7 @@ func (z *erasureServerPools) NSScanner(ctx context.Context, bf *bloomFilter, upd
}
}()
// Start scanner. Blocks until done.
err := erObj.nsScanner(ctx, allBuckets, bf, wantCycle, updates)
err := erObj.nsScanner(ctx, allBuckets, bf, wantCycle, updates, healScanMode)
if err != nil {
logger.LogIf(ctx, err)
mu.Lock()

View File

@ -344,7 +344,7 @@ func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
// nsScanner will start scanning buckets and send updated totals as they are traversed.
// Updates are sent on a regular basis and the caller *must* consume them.
func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, wantCycle uint32, updates chan<- dataUsageCache) error {
func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, wantCycle uint32, updates chan<- dataUsageCache, healScanMode madmin.HealScanMode) error {
if len(buckets) == 0 {
return nil
}
@ -490,7 +490,7 @@ func (er erasureObjects) nsScanner(ctx context.Context, buckets []BucketInfo, bf
// Calc usage
before := cache.Info.LastUpdate
var err error
cache, err = disk.NSScanner(ctx, cache, updates)
cache, err = disk.NSScanner(ctx, cache, updates, healScanMode)
cache.Info.BloomFilter = nil
if err != nil {
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {

View File

@ -235,7 +235,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) {
}
// NSScanner returns data usage stats of the current FS deployment
func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32) error {
func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, _ madmin.HealScanMode) error {
defer close(updates)
// Load bucket totals
var totalCache dataUsageCache
@ -396,7 +396,7 @@ func (fs *FSObjects) scanBucket(ctx context.Context, bucket string, cache dataUs
}
return sizeSummary{totalSize: fi.Size(), versions: 1}, nil
})
}, 0)
return cache, err
}

View File

@ -48,7 +48,7 @@ func (a GatewayUnsupported) LocalStorageInfo(ctx context.Context) (StorageInfo,
}
// NSScanner - scanner is not implemented for gateway
func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32) error {
func (a GatewayUnsupported) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, scanMode madmin.HealScanMode) error {
logger.CriticalIf(ctx, errors.New("not implemented"))
return NotImplemented{}
}

View File

@ -44,8 +44,7 @@ func newBgHealSequence() *healSequence {
hs := madmin.HealOpts{
// Remove objects that do not have read-quorum
Remove: healDeleteDangling,
ScanMode: globalHealConfig.ScanMode(),
Remove: healDeleteDangling,
}
return &healSequence{
@ -165,7 +164,7 @@ func mustGetHealSequence(ctx context.Context) *healSequence {
// healErasureSet lists and heals all objects in a specific erasure set
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
bgSeq := mustGetHealSequence(ctx)
scanMode := globalHealConfig.ScanMode()
scanMode := madmin.HealNormalScan
// Make sure to copy since `buckets slice`
// is modified in place by tracker.

View File

@ -185,7 +185,7 @@ func (m *mrfState) healRoutine() {
defer idler.Stop()
mrfHealingOpts := madmin.HealOpts{
ScanMode: globalHealConfig.ScanMode(),
ScanMode: madmin.HealNormalScan,
Remove: healDeleteDangling,
}

View File

@ -22,6 +22,8 @@ import (
"io"
"sync"
"time"
"github.com/minio/madmin-go"
)
// naughtyDisk wraps a POSIX disk and returns programmed errors
@ -110,8 +112,8 @@ func (d *naughtyDisk) SetDiskID(id string) {
d.disk.SetDiskID(id)
}
func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (info dataUsageCache, err error) {
return d.disk.NSScanner(ctx, cache, updates)
func (d *naughtyDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (info dataUsageCache, err error) {
return d.disk.NSScanner(ctx, cache, updates, scanMode)
}
func (d *naughtyDisk) DiskInfo(ctx context.Context) (info DiskInfo, err error) {

View File

@ -171,7 +171,7 @@ type ObjectLayer interface {
// Storage operations.
Shutdown(context.Context) error
NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32) error
NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, scanMode madmin.HealScanMode) error
BackendInfo() madmin.BackendInfo
StorageInfo(ctx context.Context) (StorageInfo, []error)
LocalStorageInfo(ctx context.Context) (StorageInfo, []error)

View File

@ -21,6 +21,8 @@ import (
"context"
"io"
"time"
"github.com/minio/madmin-go"
)
// StorageAPI interface.
@ -64,7 +66,7 @@ type StorageAPI interface {
// has never been replaced.
Healing() *healingTracker
DiskInfo(ctx context.Context) (info DiskInfo, err error)
NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error)
NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error)
// Volume operations.
MakeVol(ctx context.Context, volume string) (err error)
@ -142,7 +144,7 @@ func (p *unrecognizedDisk) Healing() *healingTracker {
return nil
}
func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
func (p *unrecognizedDisk) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
return dataUsageCache{}, errDiskNotFound
}

View File

@ -33,6 +33,7 @@ import (
"sync"
"time"
"github.com/minio/madmin-go"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/rest"
@ -207,12 +208,14 @@ func (client *storageRESTClient) Healing() *healingTracker {
return val.(*healingTracker)
}
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
defer close(updates)
pr, pw := io.Pipe()
go func() {
pw.CloseWithError(cache.serializeTo(pw))
}()
vals := make(url.Values)
vals.Set(storageRESTScanMode, strconv.Itoa(int(scanMode)))
respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
defer xhttp.DrainBody(respBody)
pr.CloseWithError(err)

View File

@ -18,7 +18,7 @@
package cmd
const (
storageRESTVersion = "v43" // Added DiskMTime field for FileInfo
storageRESTVersion = "v44" // Added heal scan mode in NSScanner
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)
@ -79,4 +79,5 @@ const (
storageRESTDiskID = "disk-id"
storageRESTForceDelete = "force-delete"
storageRESTGlob = "glob"
storageRESTScanMode = "scan-mode"
)

View File

@ -40,6 +40,7 @@ import (
jwtreq "github.com/golang-jwt/jwt/v4/request"
"github.com/gorilla/mux"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/config"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
@ -179,10 +180,17 @@ func (s *storageRESTServer) NSScannerHandler(w http.ResponseWriter, r *http.Requ
return
}
scanMode, err := strconv.Atoi(r.Form.Get(storageRESTScanMode))
if err != nil {
logger.LogIf(r.Context(), err)
s.writeErrorResponse(w, err)
return
}
setEventStreamHeaders(w)
var cache dataUsageCache
err := cache.deserialize(r.Body)
err = cache.deserialize(r.Body)
if err != nil {
logger.LogIf(r.Context(), err)
s.writeErrorResponse(w, err)
@ -220,7 +228,7 @@ func (s *storageRESTServer) NSScannerHandler(w http.ResponseWriter, r *http.Requ
}
}
}()
usageInfo, err := s.storage.NSScanner(ctx, cache, updates)
usageInfo, err := s.storage.NSScanner(ctx, cache, updates, madmin.HealScanMode(scanMode))
if err != nil {
respW.Flush()
resp.CloseWithError(err)

View File

@ -153,7 +153,7 @@ func (p *xlStorageDiskIDCheck) Healing() *healingTracker {
return p.storage.Healing()
}
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
if contextCanceled(ctx) {
return dataUsageCache{}, ctx.Err()
}
@ -161,7 +161,7 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac
if err := p.checkDiskStale(); err != nil {
return dataUsageCache{}, err
}
return p.storage.NSScanner(ctx, cache, updates)
return p.storage.NSScanner(ctx, cache, updates, scanMode)
}
func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {

View File

@ -38,6 +38,7 @@ import (
"github.com/dustin/go-humanize"
jsoniter "github.com/json-iterator/go"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/disk"
@ -409,7 +410,7 @@ func (s *xlStorage) readMetadata(ctx context.Context, itemPath string) ([]byte,
return buf, err
}
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry) (dataUsageCache, error) {
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates chan<- dataUsageEntry, scanMode madmin.HealScanMode) (dataUsageCache, error) {
// Updates must be closed before we return.
defer close(updates)
var lc *lifecycle.Lifecycle
@ -524,7 +525,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
item.applyTierObjSweep(ctx, objAPI, oi)
}
return sizeS, nil
})
}, scanMode)
if err != nil {
return dataUsageInfo, err
}

View File

@ -81,7 +81,7 @@ func ParseBool(str string) (bool, error) {
if strings.EqualFold(str, "disabled") {
return false, nil
}
return false, fmt.Errorf("ParseBool: parsing '%s': %s", str, strconv.ErrSyntax)
return false, fmt.Errorf("ParseBool: parsing '%s': %w", str, strconv.ErrSyntax)
}
// ParseBoolFlag - parses string into BoolFlag.

View File

@ -18,12 +18,13 @@
package heal
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/config"
"github.com/minio/pkg/env"
)
@ -44,20 +45,27 @@ var configMutex sync.RWMutex
// Config represents the heal settings.
type Config struct {
// Bitrot will perform bitrot scan on local disk when checking objects.
Bitrot bool `json:"bitrotscan"`
Bitrot string `json:"bitrotscan"`
// maximum sleep duration between objects to slow down heal operation.
Sleep time.Duration `json:"sleep"`
IOCount int `json:"iocount"`
// Cached value from Bitrot field
cache struct {
// -1: bitrot enabled, 0: bitrot disabled, > 0: bitrot cycle
bitrotCycle time.Duration
}
}
// ScanMode returns configured scan mode
func (opts Config) ScanMode() madmin.HealScanMode {
// BitrotScanCycle returns the configured cycle for the scanner healing
// -1 for not enabled
// 0 for contiunous bitrot scanning
// >0 interval duration between cycles
func (opts Config) BitrotScanCycle() (d time.Duration) {
configMutex.RLock()
defer configMutex.RUnlock()
if opts.Bitrot {
return madmin.HealDeepScan
}
return madmin.HealNormalScan
return opts.cache.bitrotCycle
}
// Wait waits for IOCount to go down or max sleep to elapse before returning.
@ -103,6 +111,8 @@ func (opts *Config) Update(nopts Config) {
opts.Bitrot = nopts.Bitrot
opts.IOCount = nopts.IOCount
opts.Sleep = nopts.Sleep
opts.cache.bitrotCycle, _ = parseBitrotConfig(nopts.Bitrot)
}
var (
@ -126,9 +136,9 @@ var (
Help = config.HelpKVS{
config.HelpKV{
Key: Bitrot,
Description: `perform bitrot scan on disks when checking objects during scanner`,
Description: `perform bitrot scan on disks when checking objects during scanner. e.g 6m`,
Optional: true,
Type: "on|off",
Type: "on|off|duration",
},
config.HelpKV{
Key: Sleep,
@ -145,12 +155,44 @@ var (
}
)
const minimumBitrotCycleInMonths = 1
func parseBitrotConfig(s string) (time.Duration, error) {
// Try to parse as a boolean
enabled, err := config.ParseBool(s)
if err == nil {
switch enabled {
case true:
return 0, nil
case false:
return -1, nil
}
}
// Try to parse as a number of months
if !strings.HasSuffix(s, "m") {
return -1, errors.New("unknown format")
}
months, err := strconv.Atoi(strings.TrimSuffix(s, "m"))
if err != nil {
return -1, err
}
if months < minimumBitrotCycleInMonths {
return -1, fmt.Errorf("minimum bitrot cycle is %d month(s)", minimumBitrotCycleInMonths)
}
return time.Duration(months) * 30 * 24 * time.Hour, nil
}
// LookupConfig - lookup config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err = config.CheckValidKeys(config.HealSubSys, kvs, DefaultKVS); err != nil {
return cfg, err
}
cfg.Bitrot, err = config.ParseBool(env.Get(EnvBitrot, kvs.GetWithDefault(Bitrot, DefaultKVS)))
cfg.Bitrot = env.Get(EnvBitrot, kvs.GetWithDefault(Bitrot, DefaultKVS))
_, err = parseBitrotConfig(cfg.Bitrot)
if err != nil {
return cfg, fmt.Errorf("'heal:bitrotscan' value invalid: %w", err)
}