Avoid recursion and use a simple loop to merge entries (#8239)

This avoids stack overflows when there are
lot of entries to be skipped, this PR also
optimizes the code to reuse the buffers.
This commit is contained in:
Harshavardhana 2019-09-16 17:38:37 -07:00 committed by kannappanr
parent fa32c71a56
commit 5392eee250

View File

@ -821,13 +821,16 @@ func (f *FileInfoCh) Push(fi FileInfo) {
f.Valid = true
}
// Calculate least entry across multiple FileInfo channels, additionally
// returns a boolean to indicate if the caller needs to call again.
func leastEntry(entriesCh []FileInfoCh, totalDrives int, heal bool) (FileInfo, bool) {
var entriesValid = make([]bool, len(entriesCh))
var entries = make([]FileInfo, len(entriesCh))
for i := range entriesCh {
entries[i], entriesValid[i] = entriesCh[i].Pop()
// Calculate least entry across multiple FileInfo channels,
// returns the least common entry and the total number of times
// we found this entry. Additionally also returns a boolean
// to indicate if the caller needs to call this function
// again to list the next entry. It is callers responsibility
// if the caller wishes to list N entries to call leastEntry
// N times until this boolean is 'false'.
func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) {
for i := range entryChs {
entries[i], entriesValid[i] = entryChs[i].Pop()
}
var isTruncated = false
@ -856,9 +859,9 @@ func leastEntry(entriesCh []FileInfoCh, totalDrives int, heal bool) (FileInfo, b
}
// We haven't been able to find any least entry,
// this would mean that we don't have valid.
// this would mean that we don't have valid entry.
if !found {
return lentry, isTruncated
return lentry, 0, isTruncated
}
leastEntryCount := 0
@ -876,53 +879,76 @@ func leastEntry(entriesCh []FileInfoCh, totalDrives int, heal bool) (FileInfo, b
// Push all entries which are lexically higher
// and will be returned later in Pop()
entriesCh[i].Push(entries[i])
entryChs[i].Push(entries[i])
}
quorum := lentry.Quorum
if quorum == 0 {
quorum = totalDrives / 2
}
if heal {
// When healing is enabled, we should
// list only objects which need healing.
if leastEntryCount != totalDrives {
return lentry, isTruncated
}
} else {
if leastEntryCount >= quorum {
return lentry, isTruncated
}
}
return leastEntry(entriesCh, totalDrives, heal)
return lentry, leastEntryCount, isTruncated
}
// mergeEntriesCh - merges FileInfo channel to entries upto maxKeys.
func mergeEntriesCh(entriesCh []FileInfoCh, maxKeys int, totalDrives int, heal bool) (entries FilesInfo) {
func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, heal bool) (entries FilesInfo) {
var i = 0
entriesInfos := make([]FileInfo, len(entryChs))
entriesValid := make([]bool, len(entryChs))
for {
fi, valid := leastEntry(entriesCh, totalDrives, heal)
fi, quorumCount, valid := leastEntry(entryChs, entriesInfos, entriesValid)
if !valid {
// We have reached EOF across all entryChs, break the loop.
break
}
if i == maxKeys {
entries.IsTruncated = true
// Re-insert the last entry so it can be
// listed in the next listing iteration.
for j := range entriesCh {
if !entriesCh[j].Valid {
entriesCh[j].Push(fi)
}
rquorum := fi.Quorum
// Quorum is zero for all directories.
if rquorum == 0 {
// Choose N/2 quoroum for directory entries.
rquorum = totalDrives / 2
}
if heal {
// When healing is enabled, we should
// list only objects which need healing.
if quorumCount == totalDrives {
// Skip good entries.
continue
}
} else {
// Regular listing, we skip entries not in quorum.
if quorumCount < rquorum {
// Skip entries which do not have quorum.
continue
}
break
}
entries.Files = append(entries.Files, fi)
i++
if i == maxKeys {
entries.IsTruncated = isTruncated(entryChs, entriesInfos, entriesValid)
break
}
}
return entries
}
func isTruncated(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) bool {
for i := range entryChs {
entries[i], entriesValid[i] = entryChs[i].Pop()
}
var isTruncated = false
for _, valid := range entriesValid {
if !valid {
continue
}
isTruncated = true
break
}
for i := range entryChs {
if entriesValid[i] {
entryChs[i].Push(entries[i])
}
}
return isTruncated
}
// Starts a walk channel across all disks and returns a slice.
func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh chan struct{}) []FileInfoCh {
var entryChs []FileInfoCh
@ -955,15 +981,26 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
var eof bool
var prevPrefix string
entriesValid := make([]bool, len(entryChs))
entries := make([]FileInfo, len(entryChs))
for {
if len(objInfos) == maxKeys {
break
}
result, ok := leastEntry(entryChs, s.drivesPerSet, false)
result, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
if !ok {
eof = true
break
}
rquorum := result.Quorum
// Quorum is zero for all directories.
if rquorum == 0 {
// Choose N/2 quorum for directory entries.
rquorum = s.drivesPerSet / 2
}
if quorumCount < rquorum {
continue
}
var objInfo ObjectInfo