diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 3a93bed25..2589570bd 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1530,10 +1530,8 @@ func (z *erasureServerPools) listObjectsGeneric(ctx context.Context, bucket, pre
 	}
 
 	if loi.IsTruncated && merged.lastSkippedEntry > loi.NextMarker {
-		// An object hidden by ILM was found during a truncated listing. Since the number of entries
-		// fetched from drives is limited by max-keys, we should use the last ILM filtered entry
-		// as a continuation token if it is lexially higher than the last visible object so that the
-		// next call of WalkDir() with the max-keys can reach new objects not seen previously.
+		// An object hidden by ILM was found during a truncated listing. Set the next marker
+		// to the last skipped entry if it is lexically higher than loi.NextMarker, as an optimization.
		loi.NextMarker = merged.lastSkippedEntry
 	}
 
diff --git a/cmd/metacache-server-pool.go b/cmd/metacache-server-pool.go
index c31c85bbc..ed324d90a 100644
--- a/cmd/metacache-server-pool.go
+++ b/cmd/metacache-server-pool.go
@@ -223,7 +223,11 @@ func (z *erasureServerPools) listPath(ctx context.Context, o *listPathOptions) (
 	go func(o listPathOptions) {
 		defer wg.Done()
-		o.StopDiskAtLimit = true
+		if o.Lifecycle == nil {
+			// No filtering ahead; ask drives to stop
+			// listing exactly at a specific limit.
+			o.StopDiskAtLimit = true
+		}
 		listErr = z.listMerged(listCtx, o, filterCh)
 		o.debugln("listMerged returned with", listErr)
 	}(*o)
 
@@ -422,6 +426,9 @@ func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions
 	go func() {
 		var returned bool
 		for entry := range inCh {
+			if o.shouldSkip(ctx, entry) {
+				continue
+			}
 			if !returned {
 				funcReturnedMu.Lock()
 				returned = funcReturned
diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go
index 9307fa3d6..6feabbea4 100644
--- a/cmd/metacache-set.go
+++ b/cmd/metacache-set.go
@@ -174,6 +174,31 @@ func (o *listPathOptions) debugln(data ...interface{}) {
 	}
 }
 
+func (o *listPathOptions) shouldSkip(ctx context.Context, entry metaCacheEntry) (yes bool) {
+	if !o.IncludeDirectories && (entry.isDir() || (!o.Versioned && entry.isObjectDir() && entry.isLatestDeletemarker())) {
+		return true
+	}
+	if o.Marker != "" && entry.name < o.Marker {
+		return true
+	}
+	if !strings.HasPrefix(entry.name, o.Prefix) {
+		return true
+	}
+	if o.Separator != "" && entry.isDir() && !strings.Contains(strings.TrimPrefix(entry.name, o.Prefix), o.Separator) {
+		return true
+	}
+	if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
+		return true
+	}
+	if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
+		return true
+	}
+	if o.Lifecycle != nil || o.Replication.Config != nil {
+		return triggerExpiryAndRepl(ctx, *o, entry)
+	}
+	return false
+}
+
 // gatherResults will collect all results on the input channel and filter results according
 // to the options or to the current bucket ILM expiry rules.
 // Caller should close the channel when done.
@@ -199,27 +224,10 @@ func (o *listPathOptions) gatherResults(ctx context.Context, in <-chan metaCache
 				resCh = nil
 				continue
 			}
-			if !o.IncludeDirectories && (entry.isDir() || (!o.Versioned && entry.isObjectDir() && entry.isLatestDeletemarker())) {
+			if yes := o.shouldSkip(ctx, entry); yes {
+				results.lastSkippedEntry = entry.name
 				continue
 			}
-			if o.Marker != "" && entry.name < o.Marker {
-				continue
-			}
-			if !strings.HasPrefix(entry.name, o.Prefix) {
-				continue
-			}
-			if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
-				continue
-			}
-			if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() && !entry.isObjectDir() {
-				continue
-			}
-			if o.Lifecycle != nil || o.Replication.Config != nil {
-				if skipped := triggerExpiryAndRepl(ctx, *o, entry); skipped {
-					results.lastSkippedEntry = entry.name
-					continue
-				}
-			}
 			if o.Limit > 0 && results.len() >= o.Limit {
 				// We have enough and we have more.
 				// Do not return io.EOF
diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go
index 9c6d48ff5..fa949d9ab 100644
--- a/cmd/metacache-walk.go
+++ b/cmd/metacache-walk.go
@@ -19,6 +19,7 @@ package cmd
 
 import (
 	"context"
+	"errors"
 	"io"
 	"sort"
 	"strings"
@@ -68,6 +69,7 @@ const (
 // WalkDir will traverse a directory and return all entries found.
 // On success a sorted meta cache stream will be returned.
 // Metadata has data stripped, if any.
+// The function tries to quit as soon as the context is canceled to avoid further drive IO.
 func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
 	legacyFS := s.fsType != xfs && s.fsType != ext4
 
@@ -146,6 +148,13 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 	var scanDir func(path string) error
 
 	scanDir = func(current string) error {
+		if contextCanceled(ctx) {
+			return ctx.Err()
+		}
+		if opts.Limit > 0 && objsReturned >= opts.Limit {
+			return nil
+		}
+
 		// Skip forward, if requested...
 		sb := bytebufferpool.Get()
 		defer func() {
@@ -161,12 +170,6 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 				forward = forward[:idx]
 			}
 		}
-		if contextCanceled(ctx) {
-			return ctx.Err()
-		}
-		if opts.Limit > 0 && objsReturned >= opts.Limit {
-			return nil
-		}
 
 		if s.walkMu != nil {
 			s.walkMu.Lock()
@@ -197,6 +200,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 		// Avoid a bunch of cleanup when joining.
 		current = strings.Trim(current, SlashSeparator)
 		for i, entry := range entries {
+			if contextCanceled(ctx) {
+				return ctx.Err()
+			}
 			if opts.Limit > 0 && objsReturned >= opts.Limit {
 				return nil
 			}
@@ -292,15 +298,15 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 		}
 
 		for _, entry := range entries {
+			if contextCanceled(ctx) {
+				return ctx.Err()
+			}
 			if opts.Limit > 0 && objsReturned >= opts.Limit {
 				return nil
 			}
 			if entry == "" {
 				continue
 			}
-			if contextCanceled(ctx) {
-				return ctx.Err()
-			}
 			meta := metaCacheEntry{name: pathJoinBuf(sb, current, entry)}
 
 			// If directory entry on stack before this, pop it now.
@@ -314,7 +320,10 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
 			if opts.Recursive {
 				// Scan folder we found. Should be in correct sort order where we are.
 				err := scanDir(pop)
-				if err != nil && !IsErrIgnored(err, context.Canceled) {
+				if err != nil {
+					if errors.Is(err, context.Canceled) {
+						return err
+					}
 					internalLogIf(ctx, err)
 				}
 			}
diff --git a/cmd/object-api-listobjects_test.go b/cmd/object-api-listobjects_test.go
index 30ea27f9a..b6f39d7ab 100644
--- a/cmd/object-api-listobjects_test.go
+++ b/cmd/object-api-listobjects_test.go
@@ -26,6 +26,9 @@ import (
 	"strconv"
 	"strings"
 	"testing"
+	"time"
+
+	"github.com/minio/minio/internal/bucket/lifecycle"
 )
 
 func TestListObjectsVersionedFolders(t *testing.T) {
@@ -1929,3 +1932,112 @@ func BenchmarkListObjects(b *testing.B) {
 		}
 	}
 }
+
+func TestListObjectsWithILM(t *testing.T) {
+	ExecObjectLayerTest(t, testListObjectsWithILM)
+}
+
+func testListObjectsWithILM(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
+	t, _ := t1.(*testing.T)
+
+	objContent := "test-content"
+	objMd5 := md5.Sum([]byte(objContent))
+
+	uploads := []struct {
+		bucket     string
+		expired    int
+		notExpired int
+	}{
+		{"test-list-ilm-nothing-expired", 0, 6},
+		{"test-list-ilm-all-expired", 6, 0},
+		{"test-list-ilm-all-half-expired", 3, 3},
+	}
+
+	oneWeekAgo := time.Now().Add(-7 * 24 * time.Hour)
+
+	lifecycleBytes := []byte(`
+<LifecycleConfiguration>
+	<Rule>
+		<Status>Enabled</Status>
+		<Expiration>
+			<Days>1</Days>
+		</Expiration>
+	</Rule>
+</LifecycleConfiguration>`)
+
+	lifecycleConfig, err := lifecycle.ParseLifecycleConfig(bytes.NewReader(lifecycleBytes))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i, upload := range uploads {
+		err := obj.MakeBucket(context.Background(), upload.bucket, MakeBucketOptions{})
+		if err != nil {
+			t.Fatalf("%s : %s", instanceType, err.Error())
+		}
+
+		globalBucketMetadataSys.Set(upload.bucket, BucketMetadata{lifecycleConfig: lifecycleConfig})
+		defer globalBucketMetadataSys.Remove(upload.bucket)
+
+		// Upload objects with a modtime of one week ago; these should be expired by ILM.
+		for range upload.expired {
+			_, err := obj.PutObject(context.Background(), upload.bucket, randString(32),
+				mustGetPutObjReader(t,
+					bytes.NewBufferString(objContent),
+					int64(len(objContent)),
+					hex.EncodeToString(objMd5[:]),
+					""),
+				ObjectOptions{MTime: oneWeekAgo},
+			)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		// Upload objects with the current time as modtime; these are not expired by ILM.
+		for range upload.notExpired {
+			_, err := obj.PutObject(context.Background(), upload.bucket, randString(32),
+				mustGetPutObjReader(t,
+					bytes.NewBufferString(objContent),
+					int64(len(objContent)),
+					hex.EncodeToString(objMd5[:]),
+					""),
+				ObjectOptions{},
+			)
+			if err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		for _, maxKeys := range []int{1, 10, 49} {
+			// Test ListObjectsV2
+			totalObjs, didRuns := 0, 0
+			marker := ""
+			for {
+				didRuns++
+				if didRuns > 1000 {
+					t.Fatal("too many runs")
+					return
+				}
+				result, err := obj.ListObjectsV2(context.Background(), upload.bucket, "", marker, "", maxKeys, false, "")
+				if err != nil {
+					t.Fatalf("Test %d: %s: Expected to pass, but failed with: %s", i, instanceType, err.Error())
+				}
+				totalObjs += len(result.Objects)
+				if !result.IsTruncated {
+					break
+				}
+				if marker != "" && marker == result.NextContinuationToken {
+					t.Fatalf("infinite loop marker: %s", result.NextContinuationToken)
+				}
+				marker = result.NextContinuationToken
+			}
+
+			if totalObjs != upload.notExpired {
+				t.Fatalf("Test %d: %s: max-keys=%d, %d objects are expected to be seen, but %d found instead (didRuns=%d)",
+					i+1, instanceType, maxKeys, upload.notExpired, totalObjs, didRuns)
+			}
+		}
+	}
+}
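
For reference, a minimal sketch, not part of the patch, of the pagination contract this change protects. It assumes the cmd package context (ObjectLayer and ListObjectsV2Info as exercised by the test above) plus the standard errors package; listAllVisible is a hypothetical helper name. With the fix, NextContinuationToken also advances past ILM-skipped entries, so a truncated listing always makes forward progress and the stall guard below should never trip.

// listAllVisible (hypothetical) pages through a bucket with ListObjectsV2 and
// returns the names of all visible, i.e. non-ILM-expired, objects.
func listAllVisible(ctx context.Context, obj ObjectLayer, bucket string, maxKeys int) ([]string, error) {
	var names []string
	token := ""
	for {
		result, err := obj.ListObjectsV2(ctx, bucket, "", token, "", maxKeys, false, "")
		if err != nil {
			return nil, err
		}
		for _, o := range result.Objects {
			names = append(names, o.Name)
		}
		if !result.IsTruncated {
			return names, nil
		}
		// Guard against a stalled listing; with this patch the token is
		// guaranteed to move forward even when entries were skipped by ILM.
		if token != "" && token == result.NextContinuationToken {
			return names, errors.New("listing did not advance")
		}
		token = result.NextContinuationToken
	}
}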