change healObjects to heal one set at time

This commit is contained in:
Harshavardhana 2021-07-06 12:46:01 -07:00
parent 51ad1d983d
commit 1c2f82938f
2 changed files with 33 additions and 42 deletions

View File

@ -614,7 +614,10 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
// Dynamic time delay. // Dynamic time delay.
t := UTCNow() t := UTCNow()
err = objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{Recursive: true, Remove: healDeleteDangling}, err = objAPI.HealObjects(ctx, bucket, prefix, madmin.HealOpts{
Recursive: true,
Remove: healDeleteDangling,
},
func(bucket, object, versionID string) error { func(bucket, object, versionID string) error {
// Wait for each heal as per crawler frequency. // Wait for each heal as per crawler frequency.
sleepDuration(time.Since(t), f.dataUsageCrawlMult) sleepDuration(time.Since(t), f.dataUsageCrawlMult)

View File

@ -1836,56 +1836,44 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res
type HealObjectFn func(bucket, object, versionID string) error type HealObjectFn func(bucket, object, versionID string) error
func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error { func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
endWalkCh := make(chan struct{})
defer close(endWalkCh)
serverSetsEntryChs := make([][]FileInfoVersionsCh, 0, len(z.serverSets))
zoneDrivesPerSet := make([]int, 0, len(z.serverSets))
var skipped int
for _, zone := range z.serverSets { for _, zone := range z.serverSets {
serverSetsEntryChs = append(serverSetsEntryChs, entryChs := zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, true, ctx.Done())
zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, true, endWalkCh)) entriesInfos := make([]FileInfoVersions, 0, len(entryChs))
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) entriesValid := make([]bool, 0, len(entryChs))
}
serverSetsEntriesInfos := make([][]FileInfoVersions, 0, len(serverSetsEntryChs)) for {
serverSetsEntriesValid := make([][]bool, 0, len(serverSetsEntryChs)) entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entriesInfos, entriesValid)
for _, entryChs := range serverSetsEntryChs { if !ok {
serverSetsEntriesInfos = append(serverSetsEntriesInfos, make([]FileInfoVersions, len(entryChs))) skipped++
serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs))) // calculate number of skips to return
} // "NotFound" error at the end.
break
}
// If listing did not return any entries upon first attempt, we drivesPerSet := zone.setDriveCount
// return `ObjectNotFound`, to indicate the caller for any if quorumCount == drivesPerSet && opts.ScanMode == madmin.HealNormalScan {
// actions they may want to take as if `prefix` is missing. // Skip good entries.
err := toObjectErr(errFileNotFound, bucket, prefix) continue
for { }
entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid)
if !ok {
break
}
// Indicate that first attempt was a success and subsequent loop for _, version := range entry.Versions {
// knows that its not our first attempt at 'prefix' if err := healObject(bucket, version.Name, version.VersionID); err != nil {
err = nil return toObjectErr(err, bucket, version.Name)
}
if zoneIndex >= len(zoneDrivesPerSet) || zoneIndex < 0 {
return fmt.Errorf("invalid zone index returned: %d", zoneIndex)
}
if quorumCount == zoneDrivesPerSet[zoneIndex] && opts.ScanMode == madmin.HealNormalScan {
// Skip good entries.
continue
}
for _, version := range entry.Versions {
if err := healObject(bucket, version.Name, version.VersionID); err != nil {
return toObjectErr(err, bucket, version.Name)
} }
} }
} }
return err if skipped == len(z.serverSets) {
// If listing did not return any entries upon first attempt, we
// return `ObjectNotFound`, to indicate the caller for any
// actions they may want to take as if `prefix` is missing.
return toObjectErr(errFileNotFound, bucket, prefix)
}
return nil
} }
func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) { func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {