mirror of
https://github.com/minio/minio.git
synced 2025-04-17 01:10:29 -04:00
New disk healing goes through the healing workers (#16568)
This commit is contained in:
parent
1828fb212a
commit
b1d98febfd
@ -19,9 +19,13 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/minio/madmin-go/v2"
|
"github.com/minio/madmin-go/v2"
|
||||||
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/pkg/env"
|
||||||
)
|
)
|
||||||
|
|
||||||
// healTask represents what to heal along with options
|
// healTask represents what to heal along with options
|
||||||
@ -111,9 +115,19 @@ func (h *healRoutine) AddWorker(ctx context.Context, objAPI ObjectLayer) {
|
|||||||
|
|
||||||
func newHealRoutine() *healRoutine {
|
func newHealRoutine() *healRoutine {
|
||||||
workers := runtime.GOMAXPROCS(0) / 2
|
workers := runtime.GOMAXPROCS(0) / 2
|
||||||
|
|
||||||
|
if envHealWorkers := env.Get("_MINIO_HEAL_WORKERS", ""); envHealWorkers != "" {
|
||||||
|
if numHealers, err := strconv.Atoi(envHealWorkers); err != nil {
|
||||||
|
logger.LogIf(context.Background(), fmt.Errorf("invalid _MINIO_HEAL_WORKERS value: %w", err))
|
||||||
|
} else {
|
||||||
|
workers = numHealers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if workers == 0 {
|
if workers == 0 {
|
||||||
workers = 4
|
workers = 4
|
||||||
}
|
}
|
||||||
|
|
||||||
return &healRoutine{
|
return &healRoutine{
|
||||||
tasks: make(chan healTask),
|
tasks: make(chan healTask),
|
||||||
workers: workers,
|
workers: workers,
|
||||||
|
@ -21,17 +21,14 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
"github.com/minio/madmin-go/v2"
|
"github.com/minio/madmin-go/v2"
|
||||||
"github.com/minio/minio/internal/color"
|
"github.com/minio/minio/internal/color"
|
||||||
"github.com/minio/minio/internal/config/storageclass"
|
"github.com/minio/minio/internal/config/storageclass"
|
||||||
"github.com/minio/minio/internal/jobtokens"
|
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/pkg/console"
|
"github.com/minio/pkg/console"
|
||||||
"github.com/minio/pkg/env"
|
|
||||||
"github.com/minio/pkg/wildcard"
|
"github.com/minio/pkg/wildcard"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -163,8 +160,6 @@ func mustGetHealSequence(ctx context.Context) *healSequence {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const envHealWorkers = "_MINIO_HEAL_WORKERS"
|
|
||||||
|
|
||||||
// healErasureSet lists and heals all objects in a specific erasure set
|
// healErasureSet lists and heals all objects in a specific erasure set
|
||||||
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
|
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, tracker *healingTracker) error {
|
||||||
bgSeq := mustGetHealSequence(ctx)
|
bgSeq := mustGetHealSequence(ctx)
|
||||||
@ -185,16 +180,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// numHealers - number of concurrent heal jobs, defaults to 1
|
|
||||||
numHealers, err := strconv.Atoi(env.Get(envHealWorkers, "1"))
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("invalid %s value %v, defaulting to 1", envHealWorkers, err))
|
|
||||||
}
|
|
||||||
if numHealers < 1 {
|
|
||||||
numHealers = 1
|
|
||||||
}
|
|
||||||
// jt will never be nil since we ensure that numHealers > 0
|
|
||||||
jt, _ := jobtokens.New(numHealers)
|
|
||||||
var retErr error
|
var retErr error
|
||||||
// Heal all buckets with all objects
|
// Heal all buckets with all objects
|
||||||
for _, bucket := range healBuckets {
|
for _, bucket := range healBuckets {
|
||||||
@ -290,8 +275,6 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
|||||||
|
|
||||||
// Note: updates from healEntry to tracker must be sent on results channel.
|
// Note: updates from healEntry to tracker must be sent on results channel.
|
||||||
healEntry := func(entry metaCacheEntry) {
|
healEntry := func(entry metaCacheEntry) {
|
||||||
defer jt.Give()
|
|
||||||
|
|
||||||
if entry.name == "" && len(entry.metadata) == 0 {
|
if entry.name == "" && len(entry.metadata) == 0 {
|
||||||
// ignore entries that don't have metadata.
|
// ignore entries that don't have metadata.
|
||||||
return
|
return
|
||||||
@ -339,20 +322,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// erasureObjects layer needs object names to be encoded
|
|
||||||
encodedEntryName := encodeDirObject(entry.name)
|
|
||||||
if healDeleteDangling {
|
|
||||||
err := er.checkAbandonedParts(ctx, bucket, encodedEntryName, madmin.HealOpts{Remove: healDeleteDangling})
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", bucket, entry.name, err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, version := range fivs.Versions {
|
for _, version := range fivs.Versions {
|
||||||
if _, err := er.HealObject(ctx, bucket, encodedEntryName,
|
if err := bgSeq.queueHealTask(healSource{
|
||||||
version.VersionID, madmin.HealOpts{
|
bucket: bucket,
|
||||||
ScanMode: scanMode,
|
object: version.Name,
|
||||||
Remove: healDeleteDangling,
|
versionID: version.VersionID,
|
||||||
}); err != nil {
|
}, madmin.HealItemObject); err != nil {
|
||||||
// If not deleted, assume they failed.
|
// If not deleted, assume they failed.
|
||||||
result = healEntryFailure(uint64(version.Size))
|
result = healEntryFailure(uint64(version.Size))
|
||||||
if version.VersionID != "" {
|
if version.VersionID != "" {
|
||||||
@ -388,7 +363,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
|||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = listPathRaw(ctx, listPathRawOptions{
|
err := listPathRaw(ctx, listPathRawOptions{
|
||||||
disks: disks,
|
disks: disks,
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
recursive: true,
|
recursive: true,
|
||||||
@ -396,8 +371,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
|||||||
minDisks: 1,
|
minDisks: 1,
|
||||||
reportNotFound: false,
|
reportNotFound: false,
|
||||||
agreed: func(entry metaCacheEntry) {
|
agreed: func(entry metaCacheEntry) {
|
||||||
jt.Take()
|
healEntry(entry)
|
||||||
go healEntry(entry)
|
|
||||||
},
|
},
|
||||||
partial: func(entries metaCacheEntries, _ []error) {
|
partial: func(entries metaCacheEntries, _ []error) {
|
||||||
entry, ok := entries.resolve(&resolver)
|
entry, ok := entries.resolve(&resolver)
|
||||||
@ -406,12 +380,10 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
|||||||
// proceed to heal nonetheless.
|
// proceed to heal nonetheless.
|
||||||
entry, _ = entries.firstFound()
|
entry, _ = entries.firstFound()
|
||||||
}
|
}
|
||||||
jt.Take()
|
healEntry(*entry)
|
||||||
go healEntry(*entry)
|
|
||||||
},
|
},
|
||||||
finished: nil,
|
finished: nil,
|
||||||
})
|
})
|
||||||
jt.Wait() // synchronize all the concurrent heal jobs
|
|
||||||
close(results)
|
close(results)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Set this such that when we return this function
|
// Set this such that when we return this function
|
||||||
|
Loading…
x
Reference in New Issue
Block a user