When listing, do not count delete markers (#15689)

When limiting listing do not count delete, since they may be discarded.

Extend limit, since we may be discarding the forward-to marker.

Fix directories always being sent to resolve, since they didn't return as match.
This commit is contained in:
Klaus Post 2022-09-14 21:11:27 +02:00 committed by GitHub
parent 5c61c3ccdc
commit eee1ce305c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 12 deletions

View File

@ -85,6 +85,12 @@ func (e *metaCacheEntry) matches(other *metaCacheEntry, strict bool) (prefer *me
return other, false
}
if other.isDir() || e.isDir() {
if e.isDir() {
return e, other.isDir() == e.isDir()
}
return other, other.isDir() == e.isDir()
}
eVers, eErr := e.xlmeta()
oVers, oErr := other.xlmeta()
if eErr != nil || oErr != nil {

View File

@ -768,8 +768,8 @@ func (es *erasureSingle) listPathInner(ctx context.Context, o listPathOptions, r
var limit int
if o.Limit > 0 && o.StopDiskAtLimit {
// Over-read by 1 to know if we truncate results.
limit = o.Limit + 1
// Over-read by 2 to know if we truncate results and not reach false EOF.
limit = o.Limit + 2
}
ctxDone := ctx.Done()
@ -842,9 +842,9 @@ func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions, resul
}
var limit int
if o.Limit > 0 && o.StopDiskAtLimit {
// Over-read by 2 + 1 for every 16 in limit to give some space for resolver
// And know if we have truncated.
limit = o.Limit + 2 + (o.Limit / 16)
// Over-read by 4 + 1 for every 16 in limit to give some space for resolver,
// allow for truncating the list and know if we have more results.
limit = o.Limit + 4 + (o.Limit / 16)
}
ctxDone := ctx.Done()
return listPathRaw(ctx, listPathRawOptions{

View File

@ -89,6 +89,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
defer close(out)
var objsReturned int
objReturned := func(metadata []byte) {
if opts.Limit <= 0 {
return
}
if m, _, _ := isIndexedMetaV2(metadata); m != nil && !m.IsLatestDeleteMarker() {
objsReturned++
}
}
// Fast exit track to check if we are listing an object with
// a trailing slash, this will avoid to list the object content.
if HasSuffix(opts.BaseDir, SlashSeparator) {
@ -103,7 +112,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
name: opts.BaseDir,
metadata: metadata,
}
objsReturned++
objReturned(metadata)
} else {
st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile))
if sterr == nil && st.Mode().IsRegular() {
@ -156,13 +165,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
return nil
}
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
// Do do not retain the file, since it doesn't
// Do not retain the file, since it doesn't
// match the prefix.
entries[i] = ""
continue
}
if len(forward) > 0 && entry < forward {
// Do do not retain the file, since its
// Do not retain the file, since its
// lexially smaller than 'forward'
entries[i] = ""
continue
@ -205,7 +214,8 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
meta.name = pathJoin(current, meta.name)
meta.name = decodeDirObject(meta.name)
objsReturned++
objReturned(meta.metadata)
out <- meta
return nil
}
@ -225,7 +235,8 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1)
meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
meta.name = pathJoin(current, meta.name)
objsReturned++
objReturned(meta.metadata)
out <- meta
return nil
}
@ -289,14 +300,16 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
if isDirObj {
meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator
}
objsReturned++
objReturned(meta.metadata)
out <- meta
case osIsNotExist(err), isSysErrIsDir(err):
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
diskHealthCheckOK(ctx, err)
if err == nil {
// It was an object
objsReturned++
objReturned(meta.metadata)
out <- meta
continue
}