From 2c7fe094d11b023623f18b2a04cb1675587c3de6 Mon Sep 17 00:00:00 2001
From: Anis Eleuch
Date: Mon, 26 May 2025 08:06:43 +0100
Subject: [PATCH] s3: Fix early listing stopping when ILM is enabled (#472)
 (#21246)

An S3 listing call is usually sent with a 'max-keys' parameter. This
'max-keys' value is also passed down to the WalkDir() call.

However, when ILM is enabled in a bucket and some objects are skipped,
the listing can return IsTruncated set to false even if there are more
entries on the drives. The reason is that the drives stop feeding the
listing code once they reach the max-keys limit, and the listing code
concludes the listing is finished because it is no longer being fed.

Ask the drives not to stop listing, and rely on context cancellation to
stop listing in the drives as fast as possible.
---
 cmd/erasure-server-pool.go         |   6 +-
 cmd/metacache-server-pool.go       |   9 ++-
 cmd/metacache-set.go               |  46 +++++++-----
 cmd/metacache-walk.go              |  29 +++++---
 cmd/object-api-listobjects_test.go | 112 +++++++++++++++++++++++++++++
 5 files changed, 168 insertions(+), 34 deletions(-)

diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 3a93bed25..2589570bd 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1530,10 +1530,8 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
 	}
 
 	if loi.IsTruncated && merged.lastSkippedEntry > loi.NextMarker {
-		// An object hidden by ILM was found during a truncated listing. Since the number of entries
-		// fetched from drives is limited by max-keys, we should use the last ILM filtered entry
-		// as a continuation token if it is lexially higher than the last visible object so that the
-		// next call of WalkDir() with the max-keys can reach new objects not seen previously.
+		// An object hidden by ILM was found during a truncated listing. As an optimization, set
+		// the next marker to the last skipped entry if it is lexically higher than loi.NextMarker.
 		loi.NextMarker = merged.lastSkippedEntry
 	}
 
diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go
index c31c85bbc..ed324d90a 100644
--- a/cmd/metacache-server-pool.go
+++ b/cmd/metacache-server-pool.go
@@ -223,7 +223,11 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
 
 	go func(o listPathOptions) {
 		defer wg.Done()
-		o.StopDiskAtLimit = true
+		if o.Lifecycle == nil {
+			// No filtering ahead; ask the drives to stop
+			// listing exactly at the requested limit.
+			o.StopDiskAtLimit = true
+		}
 		listErr = z.listMerged(listCtx, o, filterCh)
 		o.debugln("listMerged returned with", listErr)
 	}(*o)
@@ -422,6 +426,9 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions
 	go func() {
 		var returned bool
 		for entry := range inCh {
+			if o.shouldSkip(ctx, entry) {
+				continue
+			}
 			if !returned {
 				funcReturnedMu.Lock()
 				returned = funcReturned
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index 9307fa3d6..6feabbea4 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -174,6 +174,31 @@ func (o *listPathOptions) debugln(data ...interface{}) {
 	}
 }
 
+func (o *listPathOptions) shouldSkip(ctx context.Context, entry metaCacheEntry) (yes bool) {
+	if !o.IncludeDirectories && (entry.isDir() || (!o.Versioned && entry.isObjectDir() && entry.isLatestDeletemarker())) {
+		return true
+	}
+	if o.Marker != "" && entry.name < o.Marker {
+		return true
+	}
+	if !strings.HasPrefix(entry.name, o.Prefix) {
+		return true
+	}
+	if o.Separator != "" && entry.isDir() && !strings.Contains(strings.TrimPrefix(entry.name, o.Prefix), o.Separator) {
+		return true
+	}
+	if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
+		return true
+	}
+	if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
+		return true
+	}
+	if o.Lifecycle != nil || o.Replication.Config != nil {
+		return triggerExpiryAndRepl(ctx, *o, entry)
+	}
+	return false
+}
+
 // gatherResults will collect all results on the input channel and filter results according
 // to the options or to the current bucket ILM expiry rules.
 // Caller should close the channel when done.
@@ -199,27 +224,10 @@ func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCache
 			resCh = nil
 			continue
 		}
-		if !o.IncludeDirectories && (entry.isDir() || (!o.Versioned && entry.isObjectDir() && entry.isLatestDeletemarker())) {
+		if yes := o.shouldSkip(ctx, entry); yes {
+			results.lastSkippedEntry = entry.name
 			continue
 		}
-		if o.Marker != "" && entry.name < o.Marker {
-			continue
-		}
-		if !strings.HasPrefix(entry.name, o.Prefix) {
-			continue
-		}
-		if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
-			continue
-		}
-		if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
-			continue
-		}
-		if o.Lifecycle != nil || o.Replication.Config != nil {
-			if skipped := triggerExpiryAndRepl(ctx, *o, entry); skipped {
-				results.lastSkippedEntry = entry.name
-				continue
-			}
-		}
 		if o.Limit > 0 && results.len() >= o.Limit {
 			// We have enough and we have more.
 			// Do not return io.EOF
diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go
index 9c6d48ff5..fa949d9ab 100644
--- a/cmd/metacache-walk.go
+++ b/cmd/metacache-walk.go
@@ -19,6 +19,7 @@ package cmd
 
 import (
 	"context"
+	"errors"
 	"io"
 	"sort"
 	"strings"
@@ -68,6 +69,7 @@ const (
 
 // WalkDir will traverse a directory and return all entries found.
 // On success a sorted meta cache stream will be returned.
 // Metadata has data stripped, if any.
+// The function quits as soon as the context is canceled, to avoid further drive IO.
 func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
 	legacyFS := s.fsType != xfs && s.fsType != ext4
@@ -146,6 +148,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 	var scanDir func(path string) error
 
 	scanDir = func(current string) error {
+		if contextCanceled(ctx) {
+			return ctx.Err()
+		}
+		if opts.Limit > 0 && objsReturned >= opts.Limit {
+			return nil
+		}
+
 		// Skip forward, if requested...
 		sb := bytebufferpool.Get()
 		defer func() {
@@ -161,12 +170,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				forward = forward[:idx]
 			}
 		}
-		if contextCanceled(ctx) {
-			return ctx.Err()
-		}
-		if opts.Limit > 0 && objsReturned >= opts.Limit {
-			return nil
-		}
 
 		if s.walkMu != nil {
 			s.walkMu.Lock()
@@ -197,6 +200,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 		// Avoid a bunch of cleanup when joining.
 		current = strings.Trim(current, SlashSeparator)
 		for i, entry := range entries {
+			if contextCanceled(ctx) {
+				return ctx.Err()
+			}
 			if opts.Limit > 0 && objsReturned >= opts.Limit {
 				return nil
 			}
@@ -292,15 +298,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 		}
 
 		for _, entry := range entries {
+			if contextCanceled(ctx) {
+				return ctx.Err()
+			}
 			if opts.Limit > 0 && objsReturned >= opts.Limit {
 				return nil
 			}
 			if entry == "" {
 				continue
 			}
-			if contextCanceled(ctx) {
-				return ctx.Err()
-			}
 			meta := metaCacheEntry{name: pathJoinBuf(sb, current, entry)}
 
 			// If directory entry on stack before this, pop it now.
@@ -314,7 +320,10 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			if opts.Recursive {
 				// Scan folder we found. Should be in correct sort order where we are.
 				err := scanDir(pop)
-				if err != nil && !IsErrIgnored(err, context.Canceled) {
+				if err != nil {
+					if errors.Is(err, context.Canceled) {
+						return err
+					}
 					internalLogIf(ctx, err)
 				}
 			}
diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go
index 30ea27f9a..b6f39d7ab 100644
--- a/cmd/object-api-listobjects_test.go
+++ b/cmd/object-api-listobjects_test.go
@@ -26,6 +26,9 @@ import (
 	"strconv"
 	"strings"
 	"testing"
+	"time"
+
+	"github.com/minio/minio/internal/bucket/lifecycle"
 )
 
 func TestListObjectsVersionedFolders(t *testing.T) {
@@ -1929,3 +1932,112 @@ func BenchmarkListObjects(b *testing.B) {
 		}
 	}
 }
+
+func TestListObjectsWithILM(t *testing.T) {
+	ExecObjectLayerTest(t, testListObjectsWithILM)
+}
+
+func testListObjectsWithILM(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
+	t, _ := t1.(*testing.T)
+
+	objContent := "test-content"
+	objMd5 := md5.Sum([]byte(objContent))
+
+	uploads := []struct {
+		bucket     string
+		expired    int
+		notExpired int
+	}{
+		{"test-list-ilm-nothing-expired", 0, 6},
+		{"test-list-ilm-all-expired", 6, 0},
+		{"test-list-ilm-all-half-expired", 3, 3},
+	}
+
+	oneWeekAgo := time.Now().Add(-7 * 24 * time.Hour)
+
+	lifecycleBytes := []byte(`
+<LifecycleConfiguration>
+	<Rule>
+		<Status>Enabled</Status>
+		<Expiration>
+			<Days>1</Days>
+		</Expiration>
+	</Rule>
+</LifecycleConfiguration>
+`)
+
+	lifecycleConfig, err := lifecycle.ParseLifecycleConfig(bytes.NewReader(lifecycleBytes))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i, upload := range uploads {
+		err := obj.MakeBucket(context.Background(), upload.bucket, MakeBucketOptions{})
+		if err != nil {
+			t.Fatalf("%s : %s", instanceType, err.Error())
+		}
+
+		globalBucketMetadataSys.Set(upload.bucket, BucketMetadata{lifecycleConfig: lifecycleConfig})
+		defer globalBucketMetadataSys.Remove(upload.bucket)
+
+		// Upload objects with a modtime of one week ago, expected to be expired by ILM.
+		for range upload.expired {
+			_, err := obj.PutObject(context.Background(), upload.bucket, randString(32),
+				mustGetPutObjReader(t,
+					bytes.NewBufferString(objContent),
+					int64(len(objContent)),
+					hex.EncodeToString(objMd5[:]),
+					""),
+				ObjectOptions{MTime: oneWeekAgo},
+			)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		// Upload objects with the current time as modtime, not expired by ILM.
+		for range upload.notExpired {
+			_, err := obj.PutObject(context.Background(), upload.bucket, randString(32),
+				mustGetPutObjReader(t,
+					bytes.NewBufferString(objContent),
+					int64(len(objContent)),
+					hex.EncodeToString(objMd5[:]),
+					""),
+				ObjectOptions{},
+			)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		for _, maxKeys := range []int{1, 10, 49} {
+			// Test ListObjects V2
+			totalObjs, didRuns := 0, 0
+			marker := ""
+			for {
+				didRuns++
+				if didRuns > 1000 {
+					t.Fatal("too many runs")
+					return
+				}
+				result, err := obj.ListObjectsV2(context.Background(), upload.bucket, "", marker, "", maxKeys, false, "")
+				if err != nil {
+					t.Fatalf("Test %d: %s: Expected to pass, but failed with: %s", i, instanceType, err.Error())
+				}
+				totalObjs += len(result.Objects)
+				if !result.IsTruncated {
+					break
+				}
+				if marker != "" && marker == result.NextContinuationToken {
+					t.Fatalf("infinite loop marker: %s", result.NextContinuationToken)
+				}
+				marker = result.NextContinuationToken
+			}
+
+			if totalObjs != upload.notExpired {
+				t.Fatalf("Test %d: %s: max-keys=%d, %d objects are expected to be seen, but %d found instead (didRuns=%d)",
+					i+1, instanceType, maxKeys, upload.notExpired, totalObjs, didRuns)
+			}
+		}
+	}
+}
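
To make the failure mode in the commit message concrete, here is a minimal, self-contained Go sketch. It is hypothetical illustration code, not MinIO's; the truncation check below is a simplified model of the real metacache logic. The drive stops feeding at exactly max-keys raw entries, an ILM-style filter then drops some of them, the page comes back "not full", and a naive caller concludes the listing is complete even though more objects remain on the drive.

package main

import "fmt"

// driveList mimics a drive feeding entries to the listing layer with
// StopDiskAtLimit behaviour: it stops after 'limit' raw entries.
func driveList(all []string, marker string, limit int) []string {
	var out []string
	for _, name := range all {
		if marker != "" && name <= marker {
			continue // resume after the marker
		}
		out = append(out, name)
		if limit > 0 && len(out) >= limit {
			break // the drive stops feeding here
		}
	}
	return out
}

// filterExpired stands in for ILM expiry filtering that runs after the
// drive has already produced its page of entries.
func filterExpired(entries []string, expired map[string]bool) []string {
	var out []string
	for _, name := range entries {
		if !expired[name] {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	all := []string{"a", "b", "c", "d", "e", "f"} // sorted names on the drive
	expired := map[string]bool{"a": true, "b": true, "c": true}
	maxKeys := 3

	raw := driveList(all, "", maxKeys)     // ["a" "b" "c"]: drive stops at max-keys
	visible := filterExpired(raw, expired) // []: every entry was ILM-filtered
	isTruncated := len(visible) >= maxKeys // false: the page looks "complete"

	// Before the fix, a caller keying truncation off the filtered page would
	// stop here and never see d, e, f. The fix keeps the drive uncapped when
	// a lifecycle filter is configured (o.Lifecycle != nil) and relies on
	// context cancellation to stop the walk instead.
	fmt.Println(visible, isTruncated)
}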