diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go index 4c199f068..6f3e59057 100644 --- a/cmd/erasure-server-sets.go +++ b/cmd/erasure-server-sets.go @@ -1298,7 +1298,7 @@ func (z *erasureServerSets) listObjectVersions(ctx context.Context, bucket, pref entryChs, endWalkCh := zone.poolVersions.Release(listParams{bucket, recursive, marker, prefix}) if entryChs == nil { endWalkCh = make(chan struct{}) - entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, zone.listTolerancePerSet) + entryChs = zone.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, false, endWalkCh, zone.listTolerancePerSet) } serverSetsEntryChs = append(serverSetsEntryChs, entryChs) serverSetsEndWalkCh = append(serverSetsEndWalkCh, endWalkCh) @@ -1770,7 +1770,7 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res if opts.WalkVersions { var serverSetsEntryChs [][]FileInfoVersionsCh for _, zone := range z.serverSets { - serverSetsEntryChs = append(serverSetsEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, ctx.Done())) + serverSetsEntryChs = append(serverSetsEntryChs, zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, false, ctx.Done())) } var serverSetsEntriesInfos [][]FileInfoVersions @@ -1844,7 +1844,7 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri for _, zone := range z.serverSets { serverSetsEntryChs = append(serverSetsEntryChs, - zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, endWalkCh)) + zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, true, endWalkCh)) zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 816b1e8b9..2d3686776 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -982,13 +982,13 @@ func (s *erasureSets) startMergeWalks(ctx context.Context, bucket, prefix, marke return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1, false) } -func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh { - return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1) +func (s *erasureSets) startMergeWalksVersions(ctx context.Context, bucket, prefix, marker string, recursive, healing bool, endWalkCh <-chan struct{}) []FileInfoVersionsCh { + return s.startMergeWalksVersionsN(ctx, bucket, prefix, marker, recursive, healing, endWalkCh, -1) } // Starts a walk versions channel across N number of disks and returns a slice. // FileInfoCh which can be read from. -func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoVersionsCh { +func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, prefix, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoVersionsCh { var entryChs []FileInfoVersionsCh var wg sync.WaitGroup var mutex sync.Mutex @@ -999,7 +999,7 @@ func (s *erasureSets) startMergeWalksVersionsN(ctx context.Context, bucket, pref go func(i int, disk StorageAPI) { defer wg.Done() - entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, endWalkCh) + entryCh, err := disk.WalkVersions(GlobalContext, bucket, prefix, marker, recursive, healing, endWalkCh) if err != nil { return } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 477067acb..c49b15927 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -161,7 +161,7 @@ func healErasureSet(ctx context.Context, prefix string, setIndex int, maxIO int, // disk is nil and not available. return } - entryCh, err := disk.WalkVersions(ctx, bucket.Name, prefix, "", true, ctx.Done()) + entryCh, err := disk.WalkVersions(ctx, bucket.Name, prefix, "", true, false, ctx.Done()) if err != nil { logger.LogIf(ctx, fmt.Errorf("%s returned %w - disk will be ignored and continued further", disk, err)) // Disk walk returned error, ignore it. diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index da3bc1591..055c2dd44 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -49,7 +49,7 @@ type StorageAPI interface { DeleteVol(ctx context.Context, volume string, forceDelete bool) (err error) // WalkVersions in sorted order directly on disk. - WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) + WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) // Walk in sorted order directly on disk. Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) // Walk in sorted order directly on disk. diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 310af1a9f..e30687da6 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -484,12 +484,13 @@ func (client *storageRESTClient) WalkSplunk(ctx context.Context, volume, dirPath return ch, nil } -func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { +func (client *storageRESTClient) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTDirPath, dirPath) values.Set(storageRESTMarkerPath, marker) values.Set(storageRESTRecursive, strconv.FormatBool(recursive)) + values.Set(storageRESTHealing, strconv.FormatBool(healing)) respBody, err := client.call(ctx, storageRESTMethodWalkVersions, values, nil, -1) if err != nil { return nil, err diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 6145eb2c3..597d8222f 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v21" // Add checkDataDir in ReadVersion API + storageRESTVersion = "v22" // WalkVersions to throttle for healing storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -72,6 +72,7 @@ const ( storageRESTCount = "count" storageRESTMarkerPath = "marker" storageRESTRecursive = "recursive" + storageRESTHealing = "healing" storageRESTBitrotAlgo = "bitrot-algo" storageRESTBitrotHash = "bitrot-hash" storageRESTDiskID = "disk-id" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index fe8d23a7b..344d3ddf2 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -560,11 +560,16 @@ func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.R s.writeErrorResponse(w, err) return } + healing, err := strconv.ParseBool(vars[storageRESTHealing]) + if err != nil { + s.writeErrorResponse(w, err) + return + } setEventStreamHeaders(w) encoder := gob.NewEncoder(w) - fch, err := s.storage.WalkVersions(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done()) + fch, err := s.storage.WalkVersions(r.Context(), volume, dirPath, markerPath, recursive, healing, r.Context().Done()) if err != nil { s.writeErrorResponse(w, err) return @@ -1107,7 +1112,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerSets Endpoint subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkSplunk).HandlerFunc(httpTraceHdrs(server.WalkSplunkHandler)). Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkVersions).HandlerFunc(httpTraceHdrs(server.WalkVersionsHandler)). - Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive)...) + Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTHealing)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(httpTraceHdrs(server.DeleteVersionsHandler)). Queries(restQueries(storageRESTVolume, storageRESTTotalVersions)...) diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index 248905acd..32eb0a89a 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -142,11 +142,11 @@ func (p *xlStorageDiskIDCheck) DeleteVol(ctx context.Context, volume string, for return p.storage.DeleteVol(ctx, volume, forceDelete) } -func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { +func (p *xlStorageDiskIDCheck) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (chan FileInfoVersions, error) { if err := p.checkDiskStale(); err != nil { return nil, err } - return p.storage.WalkVersions(ctx, volume, dirPath, marker, recursive, endWalkCh) + return p.storage.WalkVersions(ctx, volume, dirPath, marker, recursive, healing, endWalkCh) } func (p *xlStorageDiskIDCheck) Walk(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (chan FileInfo, error) { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index f87d4da1a..37fcc3ae5 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -33,6 +33,7 @@ import ( slashpath "path" "path/filepath" "runtime" + "strconv" "strings" "sync" "sync/atomic" @@ -947,7 +948,13 @@ func (s *xlStorage) WalkSplunk(ctx context.Context, volume, dirPath, marker stri // WalkVersions - is a sorted walker which returns file entries in lexically sorted order, // additionally along with metadata version info about each of those entries. -func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, endWalkCh <-chan struct{}) (ch chan FileInfoVersions, err error) { +func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker string, recursive bool, healing bool, endWalkCh <-chan struct{}) (ch chan FileInfoVersions, err error) { + delayMult, err := strconv.ParseFloat(env.Get(envDataUsageCrawlDelay, "10.0"), 64) + if err != nil { + logger.LogIf(ctx, err) + delayMult = dataCrawlSleepDefMult + } + atomic.AddInt32(&s.activeIOCount, 1) defer func() { atomic.AddInt32(&s.activeIOCount, -1) @@ -982,10 +989,14 @@ func (s *xlStorage) WalkVersions(ctx context.Context, volume, dirPath, marker st go func() { defer close(ch) listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) { + t := time.Now() entries, err := s.ListDir(ctx, volume, dirPath, -1) if err != nil { return false, nil, false } + if healing { + sleepDuration(time.Since(t), delayMult) + } if len(entries) == 0 { return true, nil, false }