ListObjects: Filter lifecycle expired objects (#14606)

For ListObjects and ListObjectsV2 perform lifecycle checks on 
all objects before returning. This will filter out objects that are 
pending lifecycle expiration.

Bonus: Cheaper server pool conflict resolution by not converting to FileInfo.
This commit is contained in:
Klaus Post 2022-03-22 12:39:45 -07:00 committed by GitHub
parent 8eecdc6d1f
commit 2ac54e5a7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 7 deletions

View File

@ -33,6 +33,7 @@ import (
"github.com/minio/madmin-go" "github.com/minio/madmin-go"
"github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/config/storageclass" "github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup" "github.com/minio/minio/internal/sync/errgroup"
@ -1152,6 +1153,14 @@ func maxKeysPlusOne(maxKeys int, addOne bool) int {
func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
var loi ListObjectsInfo var loi ListObjectsInfo
// Automatically remove the object/version if an expiry lifecycle rule can be applied
lc, _ := globalLifecycleSys.Get(bucket)
if lc != nil {
if !lc.HasActiveRules(prefix, true) {
lc = nil
}
}
if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" { if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" {
// Optimization for certain applications like // Optimization for certain applications like
// - Cohesity // - Cohesity
@ -1162,6 +1171,13 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
// to avoid the need for ListObjects(). // to avoid the need for ListObjects().
objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true}) objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true})
if err == nil { if err == nil {
if lc != nil {
action := evalActionFromLifecycle(ctx, *lc, objInfo, false)
switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
return loi, nil
}
}
loi.Objects = append(loi.Objects, objInfo) loi.Objects = append(loi.Objects, objInfo)
return loi, nil return loi, nil
} }
@ -1176,6 +1192,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
InclDeleted: false, InclDeleted: false,
AskDisks: globalAPIConfig.getListQuorum(), AskDisks: globalAPIConfig.getListQuorum(),
} }
merged, err := z.listPath(ctx, &opts) merged, err := z.listPath(ctx, &opts)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
if !isErrBucketNotFound(err) { if !isErrBucketNotFound(err) {

View File

@ -498,7 +498,7 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft
return versions return versions
} }
// fileInfoVersions converts the metadata to FileInfoVersions where possible. // fileInfos converts the metadata to ObjectInfo where possible.
// Metadata that cannot be decoded is skipped. // Metadata that cannot be decoded is skipped.
func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (objects []ObjectInfo) { func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (objects []ObjectInfo) {
objects = make([]ObjectInfo, 0, m.len()) objects = make([]ObjectInfo, 0, m.len())

View File

@ -28,6 +28,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
) )
@ -289,6 +290,14 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
} }
mu.Unlock() mu.Unlock()
// Do lifecycle filtering.
if o.lcFilter != nil {
filterIn := make(chan metaCacheEntry, 10)
go filterLifeCycle(ctx, o.Bucket, o.lcFilter, filterIn, results)
// Replace results.
results = filterIn
}
// Gather results to a single channel. // Gather results to a single channel.
err := mergeEntryChannels(ctx, inputs, results, func(existing, other *metaCacheEntry) (replace bool) { err := mergeEntryChannels(ctx, inputs, results, func(existing, other *metaCacheEntry) (replace bool) {
// Pick object over directory // Pick object over directory
@ -298,21 +307,20 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
if !existing.isDir() && other.isDir() { if !existing.isDir() && other.isDir() {
return false return false
} }
eMeta, err := existing.xlmeta()
eFIV, err := existing.fileInfo(o.Bucket)
if err != nil { if err != nil {
return true return true
} }
oFIV, err := other.fileInfo(o.Bucket) oMeta, err := other.xlmeta()
if err != nil { if err != nil {
return false return false
} }
// Replace if modtime is newer // Replace if modtime is newer
if !oFIV.ModTime.Equal(eFIV.ModTime) { if !oMeta.latestModtime().Equal(oMeta.latestModtime()) {
return oFIV.ModTime.After(eFIV.ModTime) return oMeta.latestModtime().After(eMeta.latestModtime())
} }
// Use NumVersions as a final tiebreaker. // Use NumVersions as a final tiebreaker.
return oFIV.NumVersions > eFIV.NumVersions return len(oMeta.versions) > len(eMeta.versions)
}) })
cancelList() cancelList()
@ -346,6 +354,44 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
return nil return nil
} }
// filterLifeCycle copies entries from 'in' to 'out', dropping every entry
// for which lifecycle evaluation of its most recent version yields a
// delete action (DeleteAction or DeleteVersionAction).
//
// Entries whose metadata cannot be converted to a FileInfo are dropped too.
// 'out' is always closed on return; the function returns as soon as 'in'
// is closed or ctx is canceled.
func filterLifeCycle(ctx context.Context, bucket string, lc *lifecycle.Lifecycle, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
	defer close(out)

	for {
		// Wait for the next entry, giving up if the caller cancels.
		var entry metaCacheEntry
		select {
		case <-ctx.Done():
			return
		case next, more := <-in:
			if !more {
				// Producer is done; nothing left to filter.
				return
			}
			entry = next
		}

		info, err := entry.fileInfo(bucket)
		if err != nil {
			// Undecodable metadata is not forwarded.
			continue
		}

		act := evalActionFromLifecycle(ctx, *lc, info.ToObjectInfo(bucket, entry.name), false)
		if act == lifecycle.DeleteVersionAction || act == lifecycle.DeleteAction {
			// Pending expiration: hide the entry from the listing.
			continue
		}

		// Forward the surviving entry, again honoring cancellation so a
		// stalled consumer cannot block this goroutine forever.
		select {
		case <-ctx.Done():
			return
		case out <- entry:
		}
	}
}
func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) { func (z *erasureServerPools) listAndSave(ctx context.Context, o *listPathOptions) (entries metaCacheEntriesSorted, err error) {
// Use ID as the object name... // Use ID as the object name...
o.pool = z.getAvailablePoolIdx(ctx, minioMetaBucket, o.ID, 10<<20) o.pool = z.getAvailablePoolIdx(ctx, minioMetaBucket, o.ID, 10<<20)

View File

@ -32,6 +32,7 @@ import (
"time" "time"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/color" "github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
@ -96,6 +97,11 @@ type listPathOptions struct {
// pool and set of where the cache is located. // pool and set of where the cache is located.
pool, set int pool, set int
// lcFilter performs filtering based on lifecycle.
// This will filter out objects if the most recent version should be deleted by lifecycle.
// Is not transferred across request calls.
lcFilter *lifecycle.Lifecycle
} }
func init() { func init() {