diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go index 8d82341fd..21386f8ae 100644 --- a/cmd/erasure-server-sets.go +++ b/cmd/erasure-server-sets.go @@ -24,7 +24,6 @@ import ( "math/rand" "net/http" "strconv" - "strings" "sync" "time" @@ -658,97 +657,6 @@ func (z *erasureServerSets) ListObjectsV2(ctx context.Context, bucket, prefix, c return listObjectsV2Info, err } -// Calculate least entry across serverSets and across multiple FileInfoVersions -// channels, returns the least common entry and the total number of times -// we found this entry. Additionally also returns a boolean -// to indicate if the caller needs to call this function -// again to list the next entry. It is callers responsibility -// if the caller wishes to list N entries to call lexicallySortedEntry -// N times until this boolean is 'false'. -func lexicallySortedEntryZoneVersions(zoneEntryChs [][]FileInfoVersionsCh, zoneEntries [][]FileInfoVersions, zoneEntriesValid [][]bool) (FileInfoVersions, int, int, bool) { - for i, entryChs := range zoneEntryChs { - for j := range entryChs { - zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() - } - } - - var isTruncated = false - for _, entriesValid := range zoneEntriesValid { - for _, valid := range entriesValid { - if !valid { - continue - } - isTruncated = true - break - } - if isTruncated { - break - } - } - - var lentry FileInfoVersions - var found bool - var zoneIndex = -1 - for i, entriesValid := range zoneEntriesValid { - for j, valid := range entriesValid { - if !valid { - continue - } - if !found { - lentry = zoneEntries[i][j] - found = true - zoneIndex = i - continue - } - str1 := zoneEntries[i][j].Name - str2 := lentry.Name - if HasSuffix(str1, globalDirSuffix) { - str1 = strings.TrimSuffix(str1, globalDirSuffix) + slashSeparator - } - if HasSuffix(str2, globalDirSuffix) { - str2 = strings.TrimSuffix(str2, globalDirSuffix) + slashSeparator - } - - if str1 < str2 { - lentry = zoneEntries[i][j] - zoneIndex = i - } - } - } - - // We haven't been able to find any least entry, - // this would mean that we don't have valid entry. - if !found { - return lentry, 0, zoneIndex, isTruncated - } - - lexicallySortedEntryCount := 0 - for i, entriesValid := range zoneEntriesValid { - for j, valid := range entriesValid { - if !valid { - continue - } - - // Entries are duplicated across disks, - // we should simply skip such entries. - if lentry.Name == zoneEntries[i][j].Name && lentry.LatestModTime.Equal(zoneEntries[i][j].LatestModTime) { - lexicallySortedEntryCount++ - continue - } - - // Push all entries which are lexically higher - // and will be returned later in Pop() - zoneEntryChs[i][j].Push(zoneEntries[i][j]) - } - } - - if HasSuffix(lentry.Name, globalDirSuffix) { - lentry.Name = strings.TrimSuffix(lentry.Name, globalDirSuffix) + slashSeparator - } - - return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated -} - func (z *erasureServerSets) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (ListObjectVersionsInfo, error) { loi := ListObjectVersionsInfo{} if marker == "" && versionMarker != "" { @@ -1294,51 +1202,56 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res type HealObjectFn func(bucket, object, versionID string) error func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error { - endWalkCh := make(chan struct{}) - defer close(endWalkCh) - - serverSetsEntryChs := make([][]FileInfoVersionsCh, 0, len(z.serverSets)) - zoneDrivesPerSet := make([]int, 0, len(z.serverSets)) - - for _, zone := range z.serverSets { - serverSetsEntryChs = append(serverSetsEntryChs, - zone.startMergeWalksVersions(ctx, bucket, prefix, "", true, endWalkCh)) - zoneDrivesPerSet = append(zoneDrivesPerSet, zone.setDriveCount) - } - - serverSetsEntriesInfos := make([][]FileInfoVersions, 0, len(serverSetsEntryChs)) - serverSetsEntriesValid := make([][]bool, 0, len(serverSetsEntryChs)) - for _, entryChs := range serverSetsEntryChs { - serverSetsEntriesInfos = append(serverSetsEntriesInfos, make([]FileInfoVersions, len(entryChs))) - serverSetsEntriesValid = append(serverSetsEntriesValid, make([]bool, len(entryChs))) - } - // If listing did not return any entries upon first attempt, we // return `ObjectNotFound`, to indicate the caller for any // actions they may want to take as if `prefix` is missing. err := toObjectErr(errFileNotFound, bucket, prefix) - for { - entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZoneVersions(serverSetsEntryChs, serverSetsEntriesInfos, serverSetsEntriesValid) - if !ok { - break - } + for _, erasureSet := range z.serverSets { + for _, set := range erasureSet.sets { + var entryChs []FileInfoVersionsCh + var mu sync.Mutex + var wg sync.WaitGroup + for _, disk := range set.getOnlineDisks() { + disk := disk + wg.Add(1) + go func() { + defer wg.Done() + entryCh, err := disk.WalkVersions(ctx, bucket, prefix, "", true, ctx.Done()) + if err != nil { + // Disk walk returned error, ignore it. + return + } + mu.Lock() + entryChs = append(entryChs, FileInfoVersionsCh{ + Ch: entryCh, + }) + mu.Unlock() + }() + } + wg.Wait() - // Indicate that first attempt was a success and subsequent loop - // knows that its not our first attempt at 'prefix' - err = nil + entriesValid := make([]bool, len(entryChs)) + entries := make([]FileInfoVersions, len(entryChs)) - if zoneIndex >= len(zoneDrivesPerSet) || zoneIndex < 0 { - return fmt.Errorf("invalid zone index returned: %d", zoneIndex) - } + for { + entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) + if !ok { + break + } - if quorumCount == zoneDrivesPerSet[zoneIndex] && opts.ScanMode == madmin.HealNormalScan { - // Skip good entries. - continue - } + // Indicate that first attempt was a success and subsequent loop + // knows that its not our first attempt at 'prefix' + err = nil - for _, version := range entry.Versions { - if err := healObject(bucket, version.Name, version.VersionID); err != nil { - return toObjectErr(err, bucket, version.Name) + if quorumCount == z.SetDriveCount() && opts.ScanMode == madmin.HealNormalScan { + continue + } + + for _, version := range entry.Versions { + if err := healObject(bucket, version.Name, version.VersionID); err != nil { + return toObjectErr(err, bucket, version.Name) + } + } } } }