fix: simplify loading IAM users to avoid using regular ListObjects() (#13392)

- avoids relying in listQuorum from the underlying listObjects()
  and potentially missing entries if any.

- avoid the entire merging logic etc, listing raw set by set
  and loading whatever is found is cleaner when dealing with
  a large cluster for IAM metadata.
This commit is contained in:
Harshavardhana 2021-10-12 09:53:17 -07:00 committed by GitHub
parent 1e117b780a
commit 13e41f2c68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 70 additions and 39 deletions

View File

@ -1603,51 +1603,82 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
return err
}
if opts.WalkVersions {
go func() {
defer close(results)
var marker, versionIDMarker string
for {
loi, err := z.ListObjectVersions(ctx, bucket, prefix, marker, versionIDMarker, "", 1000)
if err != nil {
break
}
for _, obj := range loi.Objects {
results <- obj
}
if !loi.IsTruncated {
break
}
marker = loi.NextMarker
versionIDMarker = loi.NextVersionIDMarker
}
}()
return nil
}
ctx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
defer close(results)
var marker string
for {
loi, err := z.ListObjects(ctx, bucket, prefix, marker, "", 1000)
if err != nil {
break
}
for _, erasureSet := range z.serverPools {
var wg sync.WaitGroup
for _, set := range erasureSet.sets {
set := set
wg.Add(1)
go func() {
defer wg.Done()
for _, obj := range loi.Objects {
results <- obj
}
disks, _ := set.getOnlineDisksWithHealing()
if len(disks) == 0 {
cancel()
return
}
if !loi.IsTruncated {
break
}
loadEntry := func(entry metaCacheEntry) {
if entry.isDir() {
return
}
marker = loi.NextMarker
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
cancel()
return
}
for _, version := range fivs.Versions {
results <- version.ToObjectInfo(bucket, version.Name)
}
}
// How to resolve partial results.
resolver := metadataResolutionParams{
dirQuorum: 1,
objQuorum: 1,
bucket: bucket,
}
path := baseDirFromPrefix(prefix)
if path == "" {
path = prefix
}
lopts := listPathRawOptions{
disks: disks,
bucket: bucket,
path: path,
recursive: true,
forwardTo: "",
minDisks: 1,
reportNotFound: false,
agreed: loadEntry,
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
entry, ok := entries.resolve(&resolver)
if !ok {
// check if we can get one entry atleast
// proceed to heal nonetheless.
entry, _ = entries.firstFound()
}
loadEntry(*entry)
},
finished: nil,
}
if err := listPathRaw(ctx, lopts); err != nil {
logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
return
}
}()
}
wg.Wait()
}
}()