mirror of
https://github.com/minio/minio.git
synced 2025-07-14 03:11:52 -04:00
add throttling delays for HealObjects() based on crawler delay
This commit is contained in:
parent
472d78604b
commit
0e1dce37ad
@ -1298,7 +1298,7 @@ func (z *erasureServerSets) listObjectVersions(ctx context.Context, bucket, pref
|
||||
entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix})
|
||||
if entryChs == nil {
|
||||
endWalkCh = make(chan struct{})
|
||||
entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet)
|
||||
entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, false, endWalkCh, zone.listTolerancePerSet)
|
||||
}
|
||||
serverSetsEntryChs = append(serverSetsEntryChs, entryChs)
|
||||
serverSetsEndWalkCh = append(serverSetsEndWalkCh, endWalkCh)
|
||||
@ -1770,7 +1770,7 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res
|
||||
if opts.WalkVersions {
|
||||
var serverSetsEntryChs [][]FileInfoVersionsCh
|
||||
for _, zone := range z.serverSets {
|
||||
serverSetsEntryChs = append(serverSetsEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done()))
|
||||
serverSetsEntryChs = append(serverSetsEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, false, ctx.Done()))
|
||||
}
|
||||
|
||||
var serverSetsEntriesInfos [][]FileInfoVersions
|
||||
@ -1844,7 +1844,7 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri
|
||||
|
||||
for _, zone := range z.serverSets {
|
||||
serverSetsEntryChs = append(serverSetsEntryChs,
|
||||
zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, endWalkCh))
|
||||
zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, true, endWalkCh))
|
||||
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount)
|
||||
}
|
||||
|
||||
|
@ -982,13 +982,13 @@ func (s *erasureSets) startMergeWalks(ctx context.Context, bucket, prefix, marke
|
||||
return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1, false)
|
||||
}
|
||||
|
||||
func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh {
|
||||
return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1)
|
||||
func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive, healing bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh {
|
||||
return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, healing, endWalkCh, -1)
|
||||
}
|
||||
|
||||
// Starts a walk versions channel across N number of disks and returns a slice.
|
||||
// FileInfoCh which can be read from.
|
||||
func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoVersionsCh {
|
||||
func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, prefix, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoVersionsCh {
|
||||
var entryChs []FileInfoVersionsCh
|
||||
var wg sync.WaitGroup
|
||||
var mutex sync.Mutex
|
||||
@ -999,7 +999,7 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref
|
||||
go func(i int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
|
||||
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh)
|
||||
entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, healing, endWalkCh)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -161,7 +161,7 @@ func healErasureSet(ctx context.Context, prefix string, setIndex int, maxIO int,
|
||||
// disk is nil and not available.
|
||||
return
|
||||
}
|
||||
entryCh, err := disk.WalkVersions(ctx, bucket.Name, prefix, "", true, ctx.Done())
|
||||
entryCh, err := disk.WalkVersions(ctx, bucket.Name, prefix, "", true, false, ctx.Done())
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("%s returned %w - disk will be ignored and continued further", disk, err))
|
||||
// Disk walk returned error, ignore it.
|
||||
|
@ -49,7 +49,7 @@ type StorageAPI interface {
|
||||
DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error)
|
||||
|
||||
// WalkVersions in sorted order directly on disk.
|
||||
WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error)
|
||||
WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error)
|
||||
// Walk in sorted order directly on disk.
|
||||
Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error)
|
||||
// Walk in sorted order directly on disk.
|
||||
|
@ -484,12 +484,13 @@ func (client *storageRESTClient) WalkSplunk(ctx context.Context, volume, dirPath
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
|
||||
func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTDirPath, dirPath)
|
||||
values.Set(storageRESTMarkerPath, marker)
|
||||
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
|
||||
values.Set(storageRESTHealing, strconv.FormatBool(healing))
|
||||
respBody, err := client.call(ctx, storageRESTMethodWalkVersions, values, nil, -1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -17,7 +17,7 @@
|
||||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v21" // Add checkDataDir in ReadVersion API
|
||||
storageRESTVersion = "v22" // WalkVersions to throttle for healing
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
@ -72,6 +72,7 @@ const (
|
||||
storageRESTCount = "count"
|
||||
storageRESTMarkerPath = "marker"
|
||||
storageRESTRecursive = "recursive"
|
||||
storageRESTHealing = "healing"
|
||||
storageRESTBitrotAlgo = "bitrot-algo"
|
||||
storageRESTBitrotHash = "bitrot-hash"
|
||||
storageRESTDiskID = "disk-id"
|
||||
|
@ -560,11 +560,16 @@ func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.R
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
healing, err := strconv.ParseBool(vars[storageRESTHealing])
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
setEventStreamHeaders(w)
|
||||
encoder := gob.NewEncoder(w)
|
||||
|
||||
fch, err := s.storage.WalkVersions(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done())
|
||||
fch, err := s.storage.WalkVersions(r.Context(), volume, dirPath, markerPath, recursive, healing, r.Context().Done())
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -1107,7 +1112,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerSets Endpoint
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkSplunk).HandlerFunc(httpTraceHdrs(server.WalkSplunkHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkVersions).HandlerFunc(httpTraceHdrs(server.WalkVersionsHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...)
|
||||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTHealing)...)
|
||||
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(httpTraceHdrs(server.DeleteVersionsHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTTotalVersions)...)
|
||||
|
@ -142,11 +142,11 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for
|
||||
return p.storage.DeleteVol(ctx, volume, forceDelete)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
|
||||
func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) {
|
||||
if err := p.checkDiskStale(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.storage.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh)
|
||||
return p.storage.WalkVersions(ctx, volume, dirPath, marker, recursive, healing, endWalkCh)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) {
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
slashpath "path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -947,7 +948,13 @@ func (s *xlStorage) WalkSplunk(ctx context.Context, volume, dirPath, marker stri
|
||||
|
||||
// WalkVersions - is a sorted walker which returns file entries in lexically sorted order,
|
||||
// additionally along with metadata version info about each of those entries.
|
||||
func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (ch chan FileInfoVersions, err error) {
|
||||
func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (ch chan FileInfoVersions, err error) {
|
||||
delayMult, err := strconv.ParseFloat(env.Get(envDataUsageCrawlDelay, "10.0"), 64)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
delayMult = dataCrawlSleepDefMult
|
||||
}
|
||||
|
||||
atomic.AddInt32(&s.activeIOCount, 1)
|
||||
defer func() {
|
||||
atomic.AddInt32(&s.activeIOCount, -1)
|
||||
@ -982,10 +989,14 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st
|
||||
go func() {
|
||||
defer close(ch)
|
||||
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
|
||||
t := time.Now()
|
||||
entries, err := s.ListDir(ctx, volume, dirPath, -1)
|
||||
if err != nil {
|
||||
return false, nil, false
|
||||
}
|
||||
if healing {
|
||||
sleepDuration(time.Since(t), delayMult)
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return true, nil, false
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user