Add fast max-keys=1 support for Listing (#15670)

Add a listing option to stop when the limit is reached.  
This can be used by stateless listings for fast results.
This commit is contained in:
Klaus Post 2022-09-09 17:13:06 +02:00 committed by GitHub
parent b579163802
commit ff9a74b91f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 4 deletions

View File

@ -231,7 +231,7 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
go func(o listPathOptions) { go func(o listPathOptions) {
defer wg.Done() defer wg.Done()
o.Limit = 0 o.StopDiskAtLimit = true
listErr = z.listMerged(listCtx, o, filterCh) listErr = z.listMerged(listCtx, o, filterCh)
o.debugln("listMerged returned with", listErr) o.debugln("listMerged returned with", listErr)
}(*o) }(*o)

View File

@ -104,6 +104,10 @@ type listPathOptions struct {
// Replication configuration // Replication configuration
Replication replicationConfig Replication replicationConfig
// StopDiskAtLimit will stop listing on each disk when limit number off objects has been returned.
StopDiskAtLimit bool
// pool and set of where the cache is located. // pool and set of where the cache is located.
pool, set int pool, set int
} }
@ -762,6 +766,12 @@ func (es *erasureSingle) listPathInner(ctx context.Context, o listPathOptions, r
resolver.requestedVersions = 1 resolver.requestedVersions = 1
} }
var limit int
if o.Limit > 0 && o.StopDiskAtLimit {
// Over-read by 1 to know if we truncate results.
limit = o.Limit + 1
}
ctxDone := ctx.Done() ctxDone := ctx.Done()
return listPathRaw(ctx, listPathRawOptions{ return listPathRaw(ctx, listPathRawOptions{
disks: []StorageAPI{es.disk}, disks: []StorageAPI{es.disk},
@ -771,6 +781,7 @@ func (es *erasureSingle) listPathInner(ctx context.Context, o listPathOptions, r
filterPrefix: o.FilterPrefix, filterPrefix: o.FilterPrefix,
minDisks: 1, minDisks: 1,
forwardTo: o.Marker, forwardTo: o.Marker,
perDiskLimit: limit,
agreed: func(entry metaCacheEntry) { agreed: func(entry metaCacheEntry) {
select { select {
case <-ctxDone: case <-ctxDone:
@ -829,7 +840,12 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
if !o.Versioned { if !o.Versioned {
resolver.requestedVersions = 1 resolver.requestedVersions = 1
} }
var limit int
if o.Limit > 0 && o.StopDiskAtLimit {
// Over-read by 2 + 1 for every 16 in limit to give some space for resolver
// And know if we have truncated.
limit = o.Limit + 2 + (o.Limit / 16)
}
ctxDone := ctx.Done() ctxDone := ctx.Done()
return listPathRaw(ctx, listPathRawOptions{ return listPathRaw(ctx, listPathRawOptions{
disks: disks, disks: disks,
@ -840,6 +856,7 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
filterPrefix: o.FilterPrefix, filterPrefix: o.FilterPrefix,
minDisks: listingQuorum, minDisks: listingQuorum,
forwardTo: o.Marker, forwardTo: o.Marker,
perDiskLimit: limit,
agreed: func(entry metaCacheEntry) { agreed: func(entry metaCacheEntry) {
select { select {
case <-ctxDone: case <-ctxDone:
@ -1153,6 +1170,10 @@ type listPathRawOptions struct {
minDisks int minDisks int
reportNotFound bool reportNotFound bool
// perDiskLimit will limit each disk to return n objects.
// If <= 0 all results will be returned until canceled.
perDiskLimit int
// Callbacks with results: // Callbacks with results:
// If set to nil, it will not be called. // If set to nil, it will not be called.
@ -1227,6 +1248,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
werr = errDiskNotFound werr = errDiskNotFound
} else { } else {
werr = d.WalkDir(ctx, WalkDirOptions{ werr = d.WalkDir(ctx, WalkDirOptions{
Limit: opts.perDiskLimit,
Bucket: opts.bucket, Bucket: opts.bucket,
BaseDir: opts.path, BaseDir: opts.path,
Recursive: opts.recursive, Recursive: opts.recursive,
@ -1246,6 +1268,7 @@ func listPathRaw(ctx context.Context, opts listPathRawOptions) (err error) {
// askDisks is less than total // askDisks is less than total
// number of disks per set. // number of disks per set.
werr = fd.WalkDir(ctx, WalkDirOptions{ werr = fd.WalkDir(ctx, WalkDirOptions{
Limit: opts.perDiskLimit,
Bucket: opts.bucket, Bucket: opts.bucket,
BaseDir: opts.path, BaseDir: opts.path,
Recursive: opts.recursive, Recursive: opts.recursive,

View File

@ -53,6 +53,9 @@ type WalkDirOptions struct {
// ForwardTo will forward to the given object path. // ForwardTo will forward to the given object path.
ForwardTo string ForwardTo string
// Limit the number of returned objects if > 0.
Limit int
} }
// WalkDir will traverse a directory and return all entries found. // WalkDir will traverse a directory and return all entries found.
@ -84,6 +87,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
return err return err
} }
defer close(out) defer close(out)
var objsReturned int
// Fast exit track to check if we are listing an object with // Fast exit track to check if we are listing an object with
// a trailing slash, this will avoid to list the object content. // a trailing slash, this will avoid to list the object content.
@ -93,12 +97,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
xlStorageFormatFile)) xlStorageFormatFile))
if err == nil { if err == nil {
// if baseDir is already a directory object, consider it // if baseDir is already a directory object, consider it
// as part of the list call, this is a AWS S3 specific // as part of the list call, this is AWS S3 specific
// behavior. // behavior.
out <- metaCacheEntry{ out <- metaCacheEntry{
name: opts.BaseDir, name: opts.BaseDir,
metadata: metadata, metadata: metadata,
} }
objsReturned++
} else { } else {
st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)) st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile))
if sterr == nil && st.Mode().IsRegular() { if sterr == nil && st.Mode().IsRegular() {
@ -123,6 +128,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if contextCanceled(ctx) { if contextCanceled(ctx) {
return ctx.Err() return ctx.Err()
} }
if opts.Limit > 0 && objsReturned >= opts.Limit {
return nil
}
s.walkMu.Lock() s.walkMu.Lock()
entries, err := s.ListDir(ctx, opts.Bucket, current, -1) entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
@ -144,6 +152,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
dirObjects := make(map[string]struct{}) dirObjects := make(map[string]struct{})
for i, entry := range entries { for i, entry := range entries {
if opts.Limit > 0 && objsReturned >= opts.Limit {
return nil
}
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) { if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
// Do do not retain the file, since it doesn't // Do do not retain the file, since it doesn't
// match the prefix. // match the prefix.
@ -194,6 +205,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
meta.name = pathJoin(current, meta.name) meta.name = pathJoin(current, meta.name)
meta.name = decodeDirObject(meta.name) meta.name = decodeDirObject(meta.name)
objsReturned++
out <- meta out <- meta
return nil return nil
} }
@ -213,6 +225,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1) meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1)
meta.name = strings.TrimSuffix(meta.name, SlashSeparator) meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
meta.name = pathJoin(current, meta.name) meta.name = pathJoin(current, meta.name)
objsReturned++
out <- meta out <- meta
return nil return nil
} }
@ -234,6 +247,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
} }
for _, entry := range entries { for _, entry := range entries {
if opts.Limit > 0 && objsReturned >= opts.Limit {
return nil
}
if entry == "" { if entry == "" {
continue continue
} }
@ -273,12 +289,14 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if isDirObj { if isDirObj {
meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator
} }
objsReturned++
out <- meta out <- meta
case osIsNotExist(err), isSysErrIsDir(err): case osIsNotExist(err), isSysErrIsDir(err):
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1)) meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
diskHealthCheckOK(ctx, err) diskHealthCheckOK(ctx, err)
if err == nil { if err == nil {
// It was an object // It was an object
objsReturned++
out <- meta out <- meta
continue continue
} }
@ -299,6 +317,12 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// If directory entry left on stack, pop it now. // If directory entry left on stack, pop it now.
for len(dirStack) > 0 { for len(dirStack) > 0 {
if opts.Limit > 0 && objsReturned >= opts.Limit {
return nil
}
if contextCanceled(ctx) {
return ctx.Err()
}
pop := dirStack[len(dirStack)-1] pop := dirStack[len(dirStack)-1]
out <- metaCacheEntry{name: pop} out <- metaCacheEntry{name: pop}
if opts.Recursive { if opts.Recursive {

View File

@ -1769,7 +1769,7 @@ func testListObjectsContinuation(obj ObjectLayer, instanceType string, t1 TestEr
} }
// Formualting the result data set to be expected from ListObjects call inside the tests, // Formulating the result data set to be expected from ListObjects call inside the tests,
// This will be used in testCases and used for asserting the correctness of ListObjects output in the tests. // This will be used in testCases and used for asserting the correctness of ListObjects output in the tests.
resultCases := []ListObjectsInfo{ resultCases := []ListObjectsInfo{