From 5392eee250ad3d3fb7321c5d9fde2a010043f57e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 16 Sep 2019 17:38:37 -0700 Subject: [PATCH] Avoid recursion and use a simple loop to merge entries (#8239) This avoids stack overflows when there are lot of entries to be skipped, this PR also optimizes the code to reuse the buffers. --- cmd/xl-sets.go | 115 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 39 deletions(-) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index b25b2cd6e..5a0f3889e 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -821,13 +821,16 @@ func (f *FileInfoCh) Push(fi FileInfo) { f.Valid = true } -// Calculate least entry across multiple FileInfo channels, additionally -// returns a boolean to indicate if the caller needs to call again. -func leastEntry(entriesCh []FileInfoCh, totalDrives int, heal bool) (FileInfo, bool) { - var entriesValid = make([]bool, len(entriesCh)) - var entries = make([]FileInfo, len(entriesCh)) - for i := range entriesCh { - entries[i], entriesValid[i] = entriesCh[i].Pop() +// Calculate least entry across multiple FileInfo channels, +// returns the least common entry and the total number of times +// we found this entry. Additionally also returns a boolean +// to indicate if the caller needs to call this function +// again to list the next entry. It is callers responsibility +// if the caller wishes to list N entries to call leastEntry +// N times until this boolean is 'false'. +func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) { + for i := range entryChs { + entries[i], entriesValid[i] = entryChs[i].Pop() } var isTruncated = false @@ -856,9 +859,9 @@ func leastEntry(entriesCh []FileInfoCh, totalDrives int, heal bool) (FileInfo, b } // We haven't been able to find any least entry, - // this would mean that we don't have valid. + // this would mean that we don't have valid entry. if !found { - return lentry, isTruncated + return lentry, 0, isTruncated } leastEntryCount := 0 @@ -876,53 +879,76 @@ func leastEntry(entriesCh []FileInfoCh, totalDrives int, heal bool) (FileInfo, b // Push all entries which are lexically higher // and will be returned later in Pop() - entriesCh[i].Push(entries[i]) + entryChs[i].Push(entries[i]) } - quorum := lentry.Quorum - if quorum == 0 { - quorum = totalDrives / 2 - } - - if heal { - // When healing is enabled, we should - // list only objects which need healing. - if leastEntryCount != totalDrives { - return lentry, isTruncated - } - } else { - if leastEntryCount >= quorum { - return lentry, isTruncated - } - } - return leastEntry(entriesCh, totalDrives, heal) + return lentry, leastEntryCount, isTruncated } // mergeEntriesCh - merges FileInfo channel to entries upto maxKeys. -func mergeEntriesCh(entriesCh []FileInfoCh, maxKeys int, totalDrives int, heal bool) (entries FilesInfo) { +func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, heal bool) (entries FilesInfo) { var i = 0 + entriesInfos := make([]FileInfo, len(entryChs)) + entriesValid := make([]bool, len(entryChs)) for { - fi, valid := leastEntry(entriesCh, totalDrives, heal) + fi, quorumCount, valid := leastEntry(entryChs, entriesInfos, entriesValid) if !valid { + // We have reached EOF across all entryChs, break the loop. break } - if i == maxKeys { - entries.IsTruncated = true - // Re-insert the last entry so it can be - // listed in the next listing iteration. - for j := range entriesCh { - if !entriesCh[j].Valid { - entriesCh[j].Push(fi) - } + + rquorum := fi.Quorum + // Quorum is zero for all directories. + if rquorum == 0 { + // Choose N/2 quoroum for directory entries. + rquorum = totalDrives / 2 + } + + if heal { + // When healing is enabled, we should + // list only objects which need healing. + if quorumCount == totalDrives { + // Skip good entries. + continue + } + } else { + // Regular listing, we skip entries not in quorum. + if quorumCount < rquorum { + // Skip entries which do not have quorum. + continue } - break } entries.Files = append(entries.Files, fi) i++ + if i == maxKeys { + entries.IsTruncated = isTruncated(entryChs, entriesInfos, entriesValid) + break + } } return entries } +func isTruncated(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) bool { + for i := range entryChs { + entries[i], entriesValid[i] = entryChs[i].Pop() + } + + var isTruncated = false + for _, valid := range entriesValid { + if !valid { + continue + } + isTruncated = true + break + } + for i := range entryChs { + if entriesValid[i] { + entryChs[i].Push(entries[i]) + } + } + return isTruncated +} + // Starts a walk channel across all disks and returns a slice. func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh chan struct{}) []FileInfoCh { var entryChs []FileInfoCh @@ -955,15 +981,26 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker var eof bool var prevPrefix string + entriesValid := make([]bool, len(entryChs)) + entries := make([]FileInfo, len(entryChs)) for { if len(objInfos) == maxKeys { break } - result, ok := leastEntry(entryChs, s.drivesPerSet, false) + result, quorumCount, ok := leastEntry(entryChs, entries, entriesValid) if !ok { eof = true break } + rquorum := result.Quorum + // Quorum is zero for all directories. + if rquorum == 0 { + // Choose N/2 quorum for directory entries. + rquorum = s.drivesPerSet / 2 + } + if quorumCount < rquorum { + continue + } var objInfo ObjectInfo