Fetch fileinfo concurrently (#11700)

For non-erasure setups fetch up to 10 fileinfos concurrently.

Fixes #11625
This commit is contained in:
Klaus Post 2021-03-08 11:30:43 -08:00 committed by GitHub
parent 097e5eba9f
commit 3ff5f55dcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -19,12 +19,12 @@ package cmd
import (
"context"
"errors"
"sync"
"strings"
"sync"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/sync/errgroup"
)
const (
@ -340,58 +340,83 @@ func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, d
walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
}
var objInfos []ObjectInfo
var eof bool
var nextMarker string
// List until maxKeys requested.
for i := 0; i < maxKeys; {
g := errgroup.WithNErrs(maxKeys).WithConcurrency(10)
ctx, cancel := g.WithCancelOnError(ctx)
defer cancel()
objInfoFound := make([]*ObjectInfo, maxKeys)
var i int
for i = 0; i < maxKeys; i++ {
i := i
walkResult, ok := <-walkResultCh
if !ok {
// Closed channel.
eof = true
break
}
var objInfo ObjectInfo
var err error
if HasSuffix(walkResult.entry, SlashSeparator) {
for _, getObjectInfoDir := range getObjectInfoDirs {
objInfo, err = getObjectInfoDir(ctx, bucket, walkResult.entry)
if err == nil {
break
}
if err == errFileNotFound {
err = nil
objInfo = ObjectInfo{
Bucket: bucket,
Name: walkResult.entry,
IsDir: true,
g.Go(func() error {
for _, getObjectInfoDir := range getObjectInfoDirs {
objInfo, err := getObjectInfoDir(ctx, bucket, walkResult.entry)
if err == nil {
objInfoFound[i] = &objInfo
// Done...
return nil
}
// Add temp, may be overridden,
if err == errFileNotFound {
objInfoFound[i] = &ObjectInfo{
Bucket: bucket,
Name: walkResult.entry,
IsDir: true,
}
continue
}
return toObjectErr(err, bucket, prefix)
}
}
return nil
}, i)
} else {
objInfo, err = getObjInfo(ctx, bucket, walkResult.entry)
g.Go(func() error {
objInfo, err := getObjInfo(ctx, bucket, walkResult.entry)
if err != nil {
// Ignore errFileNotFound as the object might have got
// deleted in the interim period of listing and getObjectInfo(),
// ignore quorum error as it might be an entry from an outdated disk.
if IsErrIgnored(err, []error{
errFileNotFound,
errErasureReadQuorum,
}...) {
return nil
}
return toObjectErr(err, bucket, prefix)
}
objInfoFound[i] = &objInfo
return nil
}, i)
}
if err != nil {
// Ignore errFileNotFound as the object might have got
// deleted in the interim period of listing and getObjectInfo(),
// ignore quorum error as it might be an entry from an outdated disk.
if IsErrIgnored(err, []error{
errFileNotFound,
errErasureReadQuorum,
}...) {
continue
}
return loi, toObjectErr(err, bucket, prefix)
}
nextMarker = objInfo.Name
objInfos = append(objInfos, objInfo)
if walkResult.end {
eof = true
break
}
i++
}
if err := g.WaitErr(); err != nil {
return loi, err
}
// Copy found objects
objInfos := make([]ObjectInfo, 0, i+1)
for _, objInfo := range objInfoFound {
if objInfo == nil {
continue
}
objInfos = append(objInfos, *objInfo)
nextMarker = objInfo.Name
}
// Save list routine for the next marker if we haven't reached EOF.