diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go
index b61e2c780..dbc901124 100644
--- a/cmd/admin-heal-ops.go
+++ b/cmd/admin-heal-ops.go
@@ -683,13 +683,6 @@ func (h *healSequence) healSequenceStart(objAPI ObjectLayer) {
     }
 }
 
-func (h *healSequence) logHeal(healType madmin.HealItemType) {
-    h.mutex.Lock()
-    h.scannedItemsMap[healType]++
-    h.lastHealActivity = UTCNow()
-    h.mutex.Unlock()
-}
-
 func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItemType) error {
     // Send heal request
     task := healTask{
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index 1ef7aa1bf..90842bd31 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -20,6 +20,7 @@ package cmd
 import (
     "context"
     "fmt"
+    "runtime"
     "sort"
     "time"
 
@@ -30,6 +31,7 @@ import (
     "github.com/minio/minio/internal/logger"
     "github.com/minio/pkg/v2/console"
     "github.com/minio/pkg/v2/wildcard"
+    "github.com/minio/pkg/v2/workers"
 )
 
 const (
@@ -132,30 +134,8 @@ func getLocalBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.Bg
     return status, true
 }
 
-func mustGetHealSequence(ctx context.Context) *healSequence {
-    // Get background heal sequence to send elements to heal
-    for {
-        globalHealStateLK.RLock()
-        hstate := globalBackgroundHealState
-        globalHealStateLK.RUnlock()
-
-        if hstate == nil {
-            time.Sleep(time.Second)
-            continue
-        }
-
-        bgSeq, ok := hstate.getHealSequenceByToken(bgHealingUUID)
-        if !ok {
-            time.Sleep(time.Second)
-            continue
-        }
-        return bgSeq
-    }
-}
-
 // healErasureSet lists and heals all objects in a specific erasure set
 func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
-    bgSeq := mustGetHealSequence(ctx)
     scanMode := madmin.HealNormalScan
 
     // Make sure to copy since `buckets slice`
@@ -173,6 +153,30 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
         }
     }
 
+    info, err := tracker.disk.DiskInfo(ctx, false)
+    if err != nil {
+        return fmt.Errorf("unable to get disk information before healing it: %w", err)
+    }
+
+    var numHealers uint64
+
+    if numCores := uint64(runtime.GOMAXPROCS(0)); info.NRRequests > numCores {
+        numHealers = numCores / 4
+    } else {
+        numHealers = info.NRRequests / 4
+    }
+    if numHealers < 4 {
+        numHealers = 4
+    }
+    // allow overriding this value as well..
+    if v := globalHealConfig.GetWorkers(); v > 0 {
+        numHealers = uint64(v)
+    }
+
+    logger.Info(fmt.Sprintf("Healing drive '%s' - use %d parallel workers.", tracker.disk.String(), numHealers))
+
+    jt, _ := workers.New(int(numHealers))
+
     var retErr error
     // Heal all buckets with all objects
     for _, bucket := range healBuckets {
@@ -267,6 +271,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 
         // Note: updates from healEntry to tracker must be sent on results channel.
         healEntry := func(bucket string, entry metaCacheEntry) {
+            defer jt.Give()
+
             if entry.name == "" && len(entry.metadata) == 0 {
                 // ignore entries that don't have metadata.
                return
@@ -291,14 +297,17 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
             }
         }
 
+        // erasureObjects layer needs object names to be encoded
+        encodedEntryName := encodeDirObject(entry.name)
+
         var result healEntryResult
         fivs, err := entry.fileInfoVersions(bucket)
         if err != nil {
-            err := bgSeq.queueHealTask(healSource{
-                bucket:    bucket,
-                object:    entry.name,
-                versionID: "",
-            }, madmin.HealItemObject)
+            _, err := er.HealObject(ctx, bucket, encodedEntryName, "",
+                madmin.HealOpts{
+                    ScanMode: scanMode,
+                    Remove:   healDeleteDangling,
+                })
             if err != nil {
                 if isErrObjectNotFound(err) {
                     // queueing happens across namespace, ignore
@@ -321,11 +330,11 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
             if version.ModTime.After(tracker.Started) {
                 continue
             }
-            if err := bgSeq.queueHealTask(healSource{
-                bucket:    bucket,
-                object:    version.Name,
-                versionID: version.VersionID,
-            }, madmin.HealItemObject); err != nil {
+            if _, err := er.HealObject(ctx, bucket, encodedEntryName,
+                version.VersionID, madmin.HealOpts{
+                    ScanMode: scanMode,
+                    Remove:   healDeleteDangling,
+                }); err != nil {
                 if isErrObjectNotFound(err) {
                     // queueing happens across namespace, ignore
                     // objects that are not found.
@@ -344,7 +353,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
             } else {
                 result = healEntrySuccess(uint64(version.Size))
             }
-            bgSeq.logHeal(madmin.HealItemObject)
 
             if !send(result) {
                 return
@@ -382,7 +390,8 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
             minDisks:       1,
             reportNotFound: false,
             agreed: func(entry metaCacheEntry) {
-                healEntry(actualBucket, entry)
+                jt.Take()
+                go healEntry(actualBucket, entry)
             },
             partial: func(entries metaCacheEntries, _ []error) {
                 entry, ok := entries.resolve(&resolver)
@@ -391,10 +400,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
                 // proceed to heal nonetheless.
                    entry, _ = entries.firstFound()
                 }
-                healEntry(actualBucket, *entry)
+                jt.Take()
+                go healEntry(actualBucket, *entry)
             },
             finished: nil,
         })
+        jt.Wait() // synchronize all the concurrent heal jobs
         close(results)
         if err != nil {
             // Set this such that when we return this function
diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go
index 96ebaa227..7595ddadb 100644
--- a/cmd/storage-datatypes.go
+++ b/cmd/storage-datatypes.go
@@ -46,6 +46,7 @@ type DiskInfo struct {
     FreeInodes uint64
     Major      uint32
     Minor      uint32
+    NRRequests uint64
     FSType     string
     RootDisk   bool
     Healing    bool
diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go
index f590df115..508fbd3b1 100644
--- a/cmd/storage-datatypes_gen.go
+++ b/cmd/storage-datatypes_gen.go
@@ -14,8 +14,8 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
         err = msgp.WrapError(err)
         return
     }
-    if zb0001 != 17 {
-        err = msgp.ArrayError{Wanted: 17, Got: zb0001}
+    if zb0001 != 18 {
+        err = msgp.ArrayError{Wanted: 18, Got: zb0001}
         return
     }
     z.Total, err = dc.ReadUint64()
@@ -53,6 +53,11 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
         err = msgp.WrapError(err, "Minor")
         return
     }
+    z.NRRequests, err = dc.ReadUint64()
+    if err != nil {
+        err = msgp.WrapError(err, "NRRequests")
+        return
+    }
     z.FSType, err = dc.ReadString()
     if err != nil {
         err = msgp.WrapError(err, "FSType")
@@ -108,8 +113,8 @@ func (z *DiskInfo) DecodeMsg(dc *msgp.Reader) (err error) {
 
 // EncodeMsg implements msgp.Encodable
 func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
-    // array header, size 17
-    err = en.Append(0xdc, 0x0, 0x11)
+    // array header, size 18
+    err = en.Append(0xdc, 0x0, 0x12)
     if err != nil {
         return
     }
@@ -148,6 +153,11 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
         err = msgp.WrapError(err, "Minor")
         return
     }
+    err = en.WriteUint64(z.NRRequests)
+    if err != nil {
+        err = msgp.WrapError(err, "NRRequests")
+        return
+    }
     err = en.WriteString(z.FSType)
     if err != nil {
         err = msgp.WrapError(err, "FSType")
@@ -204,8 +214,8 @@ func (z *DiskInfo) EncodeMsg(en *msgp.Writer) (err error) {
 // MarshalMsg implements msgp.Marshaler
 func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) {
     o = msgp.Require(b, z.Msgsize())
-    // array header, size 17
-    o = append(o, 0xdc, 0x0, 0x11)
+    // array header, size 18
+    o = append(o, 0xdc, 0x0, 0x12)
     o = msgp.AppendUint64(o, z.Total)
     o = msgp.AppendUint64(o, z.Free)
     o = msgp.AppendUint64(o, z.Used)
@@ -213,6 +223,7 @@ func (z *DiskInfo) MarshalMsg(b []byte) (o []byte, err error) {
     o = msgp.AppendUint64(o, z.FreeInodes)
     o = msgp.AppendUint32(o, z.Major)
     o = msgp.AppendUint32(o, z.Minor)
+    o = msgp.AppendUint64(o, z.NRRequests)
     o = msgp.AppendString(o, z.FSType)
     o = msgp.AppendBool(o, z.RootDisk)
     o = msgp.AppendBool(o, z.Healing)
@@ -238,8 +249,8 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
         err = msgp.WrapError(err)
         return
     }
-    if zb0001 != 17 {
-        err = msgp.ArrayError{Wanted: 17, Got: zb0001}
+    if zb0001 != 18 {
+        err = msgp.ArrayError{Wanted: 18, Got: zb0001}
         return
     }
     z.Total, bts, err = msgp.ReadUint64Bytes(bts)
@@ -277,6 +288,11 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
         err = msgp.WrapError(err, "Minor")
         return
     }
+    z.NRRequests, bts, err = msgp.ReadUint64Bytes(bts)
+    if err != nil {
+        err = msgp.WrapError(err, "NRRequests")
+        return
+    }
     z.FSType, bts, err = msgp.ReadStringBytes(bts)
     if err != nil {
         err = msgp.WrapError(err, "FSType")
@@ -333,7 +349,7 @@ func (z *DiskInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *DiskInfo) Msgsize() (s int) {
-    s = 3 + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint32Size + msgp.Uint32Size + msgp.StringPrefixSize + len(z.FSType) + msgp.BoolSize + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.Endpoint) + msgp.StringPrefixSize + len(z.MountPath) + msgp.StringPrefixSize + len(z.ID) + msgp.BoolSize + z.Metrics.Msgsize() + msgp.StringPrefixSize + len(z.Error)
+    s = 3 + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint32Size + msgp.Uint32Size + msgp.Uint64Size + msgp.StringPrefixSize + len(z.FSType) + msgp.BoolSize + msgp.BoolSize + msgp.BoolSize + msgp.StringPrefixSize + len(z.Endpoint) + msgp.StringPrefixSize + len(z.MountPath) + msgp.StringPrefixSize + len(z.ID) + msgp.BoolSize + z.Metrics.Msgsize() + msgp.StringPrefixSize + len(z.Error)
     return
 }
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 6401f55a9..6b49b9486 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -114,6 +114,8 @@ type xlStorage struct {
 
     formatData []byte
 
+    nrRequests uint64
+
     // mutex to prevent concurrent read operations overloading walks.
     rotational bool
     walkMu     *sync.Mutex
@@ -244,6 +246,11 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
         diskIndex: -1,
     }
 
+    // Sanitize before setting it
+    if info.NRRequests > 0 {
+        s.nrRequests = info.NRRequests
+    }
+
     // We stagger listings only on HDDs.
     if info.Rotational == nil || *info.Rotational {
         s.rotational = true
@@ -658,6 +665,7 @@ func (s *xlStorage) DiskInfo(_ context.Context, _ bool) (info DiskInfo, err erro
     dcinfo.UsedInodes = di.Files - di.Ffree
     dcinfo.FreeInodes = di.Ffree
     dcinfo.FSType = di.FSType
+    dcinfo.NRRequests = s.nrRequests
     dcinfo.Rotational = s.rotational
     diskID, err := s.GetDiskID()
     // Healing is 'true' when
diff --git a/docs/config/README.md b/docs/config/README.md
index 158250ffe..5336b1fe5 100644
--- a/docs/config/README.md
+++ b/docs/config/README.md
@@ -273,19 +273,23 @@ Once set the scanner settings are automatically applied without the need for ser
 
 ### Healing
 
-Healing is enabled by default. The following configuration settings allow for more staggered delay in terms of healing. The healing system by default adapts to the system speed and pauses up to '1sec' per object when the system has `max_io` number of concurrent requests. It is possible to adjust the `max_sleep` and `max_io` values thereby increasing the healing speed. The delays between each operation of the healer can be adjusted by the `mc admin config set alias/ heal max_sleep=1s` and maximum concurrent requests allowed before we start slowing things down can be configured with `mc admin config set alias/ heal max_io=30` . By default the wait delay is `1sec` beyond 10 concurrent operations. This means the healer will sleep *1 second* at max for each heal operation if there are more than *10* concurrent client requests.
+Healing is enabled by default. The following configuration settings control how healing is staggered. By default the healing system adapts to the system speed and pauses up to '250ms' per object when the system has `max_io` number of concurrent requests. It is possible to adjust the `max_sleep` and `max_io` values to increase the healing speed. The delay between each heal operation can be adjusted with `mc admin config set alias/ heal max_sleep=1s`, and the maximum number of concurrent requests allowed before healing is slowed down can be configured with `mc admin config set alias/ heal max_io=30`. By default the wait delay is `250ms` beyond 100 concurrent operations, meaning the healer will sleep at most *250 milliseconds* per heal operation if there are more than *100* concurrent client requests.
 
 In most setups this is sufficient to heal the content after drive replacements. Setting `max_sleep` to a *lower* value and setting `max_io` to a *higher* value would make heal go faster.
 
+Each node is responsible for healing its local drives; each drive gets multiple heal workers, a quarter of the node's CPU cores or a quarter of the drive's configured nr_requests (https://www.kernel.org/doc/Documentation/block/queue-sysfs.txt), whichever is smaller, with a floor of four workers. It is also possible to set a custom number of workers with this command: `mc admin config set alias/ heal drive_workers=100`.
+
+
 ```
 ~ mc admin config set alias/ heal
 KEY:
 heal  manage object healing frequency and bitrot verification checks
 
 ARGS:
-bitrotscan     (on|off)    perform bitrot scan on drives when checking objects during scanner
-max_sleep      (duration)  maximum sleep duration between objects to slow down heal operation. eg. 2s
-max_io         (int)       maximum IO requests allowed between objects to slow down heal operation. eg. 3
+bitrotscan     (on|off)    perform bitrot scan on drives when checking objects during scanner
+max_sleep      (duration)  maximum sleep duration between objects to slow down heal operation. eg. 2s
+max_io         (int)       maximum IO requests allowed between objects to slow down heal operation. eg. 3
+drive_workers  (int)       the number of workers per drive to heal a new disk replacement.
 ```
 
 Example: The following settings will increase the heal operation speed by allowing healing operation to run without delay up to `100` concurrent requests, and the maximum delay between each heal operation is set to `300ms`.
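For clarity, the worker-sizing heuristic that `cmd/global-heal.go` applies above can be read in isolation. The sketch below is a minimal, self-contained restatement of that logic; `healWorkersPerDrive` and its parameters are hypothetical stand-ins for `info.NRRequests` and `globalHealConfig.GetWorkers()`, not identifiers from the patch:

```go
package main

import (
	"fmt"
	"runtime"
)

// healWorkersPerDrive restates the heuristic from healErasureSet: a quarter
// of GOMAXPROCS or a quarter of the drive's nr_requests, whichever is
// smaller, never fewer than 4; an explicit drive_workers setting wins.
func healWorkersPerDrive(nrRequests uint64, driveWorkers int) uint64 {
	numCores := uint64(runtime.GOMAXPROCS(0))
	var numHealers uint64
	if nrRequests > numCores {
		numHealers = numCores / 4
	} else {
		numHealers = nrRequests / 4
	}
	if numHealers < 4 {
		numHealers = 4
	}
	if driveWorkers > 0 {
		numHealers = uint64(driveWorkers)
	}
	return numHealers
}

func main() {
	fmt.Println(healWorkersPerDrive(256, -1))  // e.g. 16 cores, nr_requests=256 -> 4
	fmt.Println(healWorkersPerDrive(256, 100)) // drive_workers=100 overrides -> 100
}
```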
diff --git a/internal/config/heal/heal.go b/internal/config/heal/heal.go
index 279bff4b3..5c57820a7 100644
--- a/internal/config/heal/heal.go
+++ b/internal/config/heal/heal.go
@@ -31,13 +31,15 @@ import (
 
 // Compression environment variables
 const (
-    Bitrot  = "bitrotscan"
-    Sleep   = "max_sleep"
-    IOCount = "max_io"
+    Bitrot       = "bitrotscan"
+    Sleep        = "max_sleep"
+    IOCount      = "max_io"
+    DriveWorkers = "drive_workers"
 
-    EnvBitrot  = "MINIO_HEAL_BITROTSCAN"
-    EnvSleep   = "MINIO_HEAL_MAX_SLEEP"
-    EnvIOCount = "MINIO_HEAL_MAX_IO"
+    EnvBitrot       = "MINIO_HEAL_BITROTSCAN"
+    EnvSleep        = "MINIO_HEAL_MAX_SLEEP"
+    EnvIOCount      = "MINIO_HEAL_MAX_IO"
+    EnvDriveWorkers = "MINIO_HEAL_DRIVE_WORKERS"
 )
 
 var configMutex sync.RWMutex
@@ -51,6 +53,8 @@ type Config struct {
     Sleep   time.Duration `json:"sleep"`
     IOCount int           `json:"iocount"`
 
+    DriveWorkers int `json:"drive_workers"`
+
     // Cached value from Bitrot field
     cache struct {
         // -1: bitrot enabled, 0: bitrot disabled, > 0: bitrot cycle
@@ -77,6 +81,13 @@ func (opts Config) Clone() (int, time.Duration, string) {
     return opts.IOCount, opts.Sleep, opts.Bitrot
 }
 
+// GetWorkers returns the number of workers; -1 if none is configured
+func (opts Config) GetWorkers() int {
+    configMutex.RLock()
+    defer configMutex.RUnlock()
+    return opts.DriveWorkers
+}
+
 // Update updates opts with nopts
 func (opts *Config) Update(nopts Config) {
     configMutex.Lock()
@@ -85,6 +96,7 @@ func (opts *Config) Update(nopts Config) {
     opts.Bitrot = nopts.Bitrot
     opts.IOCount = nopts.IOCount
     opts.Sleep = nopts.Sleep
+    opts.DriveWorkers = nopts.DriveWorkers
 
     opts.cache.bitrotCycle, _ = parseBitrotConfig(nopts.Bitrot)
 }
@@ -103,6 +115,10 @@ var DefaultKVS = config.KVS{
         Key:   IOCount,
         Value: "100",
     },
+    config.KV{
+        Key:   DriveWorkers,
+        Value: "",
+    },
 }
 
 const minimumBitrotCycleInMonths = 1
@@ -154,5 +170,18 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
     if err != nil {
         return cfg, fmt.Errorf("'heal:max_io' value invalid: %w", err)
     }
+    if ws := env.Get(EnvDriveWorkers, kvs.GetWithDefault(DriveWorkers, DefaultKVS)); ws != "" {
+        w, err := strconv.Atoi(ws)
+        if err != nil {
+            return cfg, fmt.Errorf("'heal:drive_workers' value invalid: %w", err)
+        }
+        if w < 1 {
+            return cfg, fmt.Errorf("'heal:drive_workers' value invalid: zero or negative integer unsupported")
+        }
+        cfg.DriveWorkers = w
+    } else {
+        cfg.DriveWorkers = -1
+    }
+
     return cfg, nil
 }
diff --git a/internal/config/heal/help.go b/internal/config/heal/help.go
index 5d873e0b1..5c743e316 100644
--- a/internal/config/heal/help.go
+++ b/internal/config/heal/help.go
@@ -45,5 +45,11 @@ var (
             Optional:    true,
             Type:        "int",
         },
+        config.HelpKV{
+            Key:         DriveWorkers,
+            Description: `the number of workers per drive to heal a new disk replacement` + defaultHelpPostfix(DriveWorkers),
+            Optional:    true,
+            Type:        "int",
+        },
     }
 )
diff --git a/internal/disk/disk.go b/internal/disk/disk.go
index d028df6d1..3b891edf3 100644
--- a/internal/disk/disk.go
+++ b/internal/disk/disk.go
@@ -37,6 +37,7 @@ type Info struct {
     Minor      uint32
     Name       string
     Rotational *bool
+    NRRequests uint64
 }
 
 // DevID is the drive major and minor ids
diff --git a/internal/disk/stat_linux.go b/internal/disk/stat_linux.go
index ce08ea940..0409f359b 100644
--- a/internal/disk/stat_linux.go
+++ b/internal/disk/stat_linux.go
@@ -98,6 +98,7 @@ func GetInfo(path string, firstTime bool) (info Info, err error) {
         }
     }
     if err == nil {
+        info.NRRequests = qst.NRRequests
         rot := qst.Rotational == 1 // Rotational is '1' if the device is HDD
         info.Rotational = &rot
     }
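A note on the concurrency pattern used throughout `healErasureSet`: the bounded pool from `github.com/minio/pkg/v2/workers` reserves a slot with `Take()` before each goroutine is spawned, the goroutine releases it with a deferred `Give()`, and `Wait()` blocks until all in-flight jobs drain. A minimal sketch of the same pattern, with a hypothetical `processEntry` standing in for `healEntry`:

```go
package main

import (
	"fmt"

	"github.com/minio/pkg/v2/workers"
)

func main() {
	// Bound concurrency to 4 jobs, as healErasureSet does with numHealers.
	jt, err := workers.New(4)
	if err != nil {
		panic(err)
	}

	processEntry := func(name string) {
		defer jt.Give() // release the slot once the job finishes
		fmt.Println("healing", name)
	}

	for i := 0; i < 16; i++ {
		jt.Take() // blocks while 4 jobs are already in flight
		go processEntry(fmt.Sprintf("entry-%d", i))
	}

	jt.Wait() // synchronize all the concurrent heal jobs
}
```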