mirror of
https://github.com/minio/minio.git
synced 2025-07-08 08:32:18 -04:00
S3 listing call is usually sent with a 'max-keys' parameter. This 'max-keys' will also be passed to WalkDir() call. However, when ILM is enabled in a bucket and some objects are skipped, the listing can return IsTruncated set to false even if there are more entries in the drives. The reason is that drives stop feeding the listing code because of the max-keys parameter, and the listing code thinks listing is finished because it is not being fed anymore. Ask the drives to not stop listing and rely on context cancellation to stop listing in the drives as fast as possible.
This commit is contained in:
parent
9ebe168782
commit
2c7fe094d1
@ -1530,10 +1530,8 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
|
||||
}
|
||||
|
||||
if loi.IsTruncated && merged.lastSkippedEntry > loi.NextMarker {
|
||||
// An object hidden by ILM was found during a truncated listing. Since the number of entries
|
||||
// fetched from drives is limited by max-keys, we should use the last ILM filtered entry
|
||||
// as a continuation token if it is lexically higher than the last visible object so that the
|
||||
// next call of WalkDir() with the max-keys can reach new objects not seen previously.
|
||||
// An object hidden by ILM was found during a truncated listing. Set the next marker
|
||||
// as the last skipped entry if it is lexically higher than loi.NextMarker as an optimization
|
||||
loi.NextMarker = merged.lastSkippedEntry
|
||||
}
|
||||
|
||||
|
@ -223,7 +223,11 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
|
||||
|
||||
go func(o listPathOptions) {
|
||||
defer wg.Done()
|
||||
o.StopDiskAtLimit = true
|
||||
if o.Lifecycle == nil {
|
||||
// No filtering ahead, ask drives to stop
|
||||
// listing exactly at a specific limit.
|
||||
o.StopDiskAtLimit = true
|
||||
}
|
||||
listErr = z.listMerged(listCtx, o, filterCh)
|
||||
o.debugln("listMerged returned with", listErr)
|
||||
}(*o)
|
||||
@ -422,6 +426,9 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions
|
||||
go func() {
|
||||
var returned bool
|
||||
for entry := range inCh {
|
||||
if o.shouldSkip(ctx, entry) {
|
||||
continue
|
||||
}
|
||||
if !returned {
|
||||
funcReturnedMu.Lock()
|
||||
returned = funcReturned
|
||||
|
@ -174,6 +174,31 @@ func (o *listPathOptions) debugln(data ...interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (o *listPathOptions) shouldSkip(ctx context.Context, entry metaCacheEntry) (yes bool) {
|
||||
if !o.IncludeDirectories && (entry.isDir() || (!o.Versioned && entry.isObjectDir() && entry.isLatestDeletemarker())) {
|
||||
return true
|
||||
}
|
||||
if o.Marker != "" && entry.name < o.Marker {
|
||||
return true
|
||||
}
|
||||
if !strings.HasPrefix(entry.name, o.Prefix) {
|
||||
return true
|
||||
}
|
||||
if o.Separator != "" && entry.isDir() && !strings.Contains(strings.TrimPrefix(entry.name, o.Prefix), o.Separator) {
|
||||
return true
|
||||
}
|
||||
if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
|
||||
return true
|
||||
}
|
||||
if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
|
||||
return true
|
||||
}
|
||||
if o.Lifecycle != nil || o.Replication.Config != nil {
|
||||
return triggerExpiryAndRepl(ctx, *o, entry)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// gatherResults will collect all results on the input channel and filter results according
|
||||
// to the options or to the current bucket ILM expiry rules.
|
||||
// Caller should close the channel when done.
|
||||
@ -199,27 +224,10 @@ func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCache
|
||||
resCh = nil
|
||||
continue
|
||||
}
|
||||
if !o.IncludeDirectories && (entry.isDir() || (!o.Versioned && entry.isObjectDir() && entry.isLatestDeletemarker())) {
|
||||
if yes := o.shouldSkip(ctx, entry); yes {
|
||||
results.lastSkippedEntry = entry.name
|
||||
continue
|
||||
}
|
||||
if o.Marker != "" && entry.name < o.Marker {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(entry.name, o.Prefix) {
|
||||
continue
|
||||
}
|
||||
if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
|
||||
continue
|
||||
}
|
||||
if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
|
||||
continue
|
||||
}
|
||||
if o.Lifecycle != nil || o.Replication.Config != nil {
|
||||
if skipped := triggerExpiryAndRepl(ctx, *o, entry); skipped {
|
||||
results.lastSkippedEntry = entry.name
|
||||
continue
|
||||
}
|
||||
}
|
||||
if o.Limit > 0 && results.len() >= o.Limit {
|
||||
// We have enough and we have more.
|
||||
// Do not return io.EOF
|
||||
|
@ -19,6 +19,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
@ -68,6 +69,7 @@ const (
|
||||
// WalkDir will traverse a directory and return all entries found.
|
||||
// On success a sorted meta cache stream will be returned.
|
||||
// Metadata has data stripped, if any.
|
||||
// The function tries to quit as fast as the context is canceled to avoid further drive IO
|
||||
func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
|
||||
legacyFS := s.fsType != xfs && s.fsType != ext4
|
||||
|
||||
@ -146,6 +148,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
||||
var scanDir func(path string) error
|
||||
|
||||
scanDir = func(current string) error {
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
if opts.Limit > 0 && objsReturned >= opts.Limit {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip forward, if requested...
|
||||
sb := bytebufferpool.Get()
|
||||
defer func() {
|
||||
@ -161,12 +170,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
||||
forward = forward[:idx]
|
||||
}
|
||||
}
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
if opts.Limit > 0 && objsReturned >= opts.Limit {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.walkMu != nil {
|
||||
s.walkMu.Lock()
|
||||
@ -197,6 +200,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
||||
// Avoid a bunch of cleanup when joining.
|
||||
current = strings.Trim(current, SlashSeparator)
|
||||
for i, entry := range entries {
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
if opts.Limit > 0 && objsReturned >= opts.Limit {
|
||||
return nil
|
||||
}
|
||||
@ -292,15 +298,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
if opts.Limit > 0 && objsReturned >= opts.Limit {
|
||||
return nil
|
||||
}
|
||||
if entry == "" {
|
||||
continue
|
||||
}
|
||||
if contextCanceled(ctx) {
|
||||
return ctx.Err()
|
||||
}
|
||||
meta := metaCacheEntry{name: pathJoinBuf(sb, current, entry)}
|
||||
|
||||
// If directory entry on stack before this, pop it now.
|
||||
@ -314,7 +320,10 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
|
||||
if opts.Recursive {
|
||||
// Scan folder we found. Should be in correct sort order where we are.
|
||||
err := scanDir(pop)
|
||||
if err != nil && !IsErrIgnored(err, context.Canceled) {
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return err
|
||||
}
|
||||
internalLogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,9 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
)
|
||||
|
||||
func TestListObjectsVersionedFolders(t *testing.T) {
|
||||
@ -1929,3 +1932,112 @@ func BenchmarkListObjects(b *testing.B) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListObjectsWithILM(t *testing.T) {
|
||||
ExecObjectLayerTest(t, testListObjectsWithILM)
|
||||
}
|
||||
|
||||
func testListObjectsWithILM(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
|
||||
t, _ := t1.(*testing.T)
|
||||
|
||||
objContent := "test-content"
|
||||
objMd5 := md5.Sum([]byte(objContent))
|
||||
|
||||
uploads := []struct {
|
||||
bucket string
|
||||
expired int
|
||||
notExpired int
|
||||
}{
|
||||
{"test-list-ilm-nothing-expired", 0, 6},
|
||||
{"test-list-ilm-all-expired", 6, 0},
|
||||
{"test-list-ilm-all-half-expired", 3, 3},
|
||||
}
|
||||
|
||||
oneWeekAgo := time.Now().Add(-7 * 24 * time.Hour)
|
||||
|
||||
lifecycleBytes := []byte(`
|
||||
<LifecycleConfiguration>
|
||||
<Rule>
|
||||
<Status>Enabled</Status>
|
||||
<Expiration>
|
||||
<Days>1</Days>
|
||||
</Expiration>
|
||||
</Rule>
|
||||
</LifecycleConfiguration>
|
||||
`)
|
||||
|
||||
lifecycleConfig, err := lifecycle.ParseLifecycleConfig(bytes.NewReader(lifecycleBytes))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for i, upload := range uploads {
|
||||
err := obj.MakeBucket(context.Background(), upload.bucket, MakeBucketOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("%s : %s", instanceType, err.Error())
|
||||
}
|
||||
|
||||
globalBucketMetadataSys.Set(upload.bucket, BucketMetadata{lifecycleConfig: lifecycleConfig})
|
||||
defer globalBucketMetadataSys.Remove(upload.bucket)
|
||||
|
||||
// Upload objects which modtime as one week ago, supposed to be expired by ILM
|
||||
for range upload.expired {
|
||||
_, err := obj.PutObject(context.Background(), upload.bucket, randString(32),
|
||||
mustGetPutObjReader(t,
|
||||
bytes.NewBufferString(objContent),
|
||||
int64(len(objContent)),
|
||||
hex.EncodeToString(objMd5[:]),
|
||||
""),
|
||||
ObjectOptions{MTime: oneWeekAgo},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Upload objects which current time as modtime, not expired by ILM
|
||||
for range upload.notExpired {
|
||||
_, err := obj.PutObject(context.Background(), upload.bucket, randString(32),
|
||||
mustGetPutObjReader(t,
|
||||
bytes.NewBufferString(objContent),
|
||||
int64(len(objContent)),
|
||||
hex.EncodeToString(objMd5[:]),
|
||||
""),
|
||||
ObjectOptions{},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, maxKeys := range []int{1, 10, 49} {
|
||||
// Test ListObjects V2
|
||||
totalObjs, didRuns := 0, 0
|
||||
marker := ""
|
||||
for {
|
||||
didRuns++
|
||||
if didRuns > 1000 {
|
||||
t.Fatal("too many runs")
|
||||
return
|
||||
}
|
||||
result, err := obj.ListObjectsV2(context.Background(), upload.bucket, "", marker, "", maxKeys, false, "")
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s: Expected to pass, but failed with: <ERROR> %s", i, instanceType, err.Error())
|
||||
}
|
||||
totalObjs += len(result.Objects)
|
||||
if !result.IsTruncated {
|
||||
break
|
||||
}
|
||||
if marker != "" && marker == result.NextContinuationToken {
|
||||
t.Fatalf("infinite loop marker: %s", result.NextContinuationToken)
|
||||
}
|
||||
marker = result.NextContinuationToken
|
||||
}
|
||||
|
||||
if totalObjs != upload.notExpired {
|
||||
t.Fatalf("Test %d: %s: max-keys=%d, %d objects are expected to be seen, but %d found instead (didRuns=%d)",
|
||||
i+1, instanceType, maxKeys, upload.notExpired, totalObjs, didRuns)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user