fetch bucket retention config once for ILM evalAction (#14727)

This is mainly an optimization, does not change any
existing functionality.
This commit is contained in:
Harshavardhana 2022-04-11 13:25:32 -07:00 committed by GitHub
parent 1a1b55e133
commit 153a612253
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 56 additions and 42 deletions

View File

@ -38,6 +38,7 @@ import (
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"github.com/minio/madmin-go" "github.com/minio/madmin-go"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/color" "github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config/heal" "github.com/minio/minio/internal/config/heal"
@ -1131,7 +1132,7 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi Object
return size return size
} }
func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, obj ObjectInfo, debug bool) (action lifecycle.Action) { func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr lock.Retention, obj ObjectInfo, debug bool) (action lifecycle.Action) {
action = lc.ComputeAction(obj.ToLifecycleOpts()) action = lc.ComputeAction(obj.ToLifecycleOpts())
if debug { if debug {
console.Debugf(applyActionsLogPrefix+" lifecycle: Secondary scan: %v\n", action) console.Debugf(applyActionsLogPrefix+" lifecycle: Secondary scan: %v\n", action)
@ -1147,18 +1148,15 @@ func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, obj Ob
if obj.VersionID == "" { if obj.VersionID == "" {
return lifecycle.NoneAction return lifecycle.NoneAction
} }
if rcfg, _ := globalBucketObjectLockSys.Get(obj.Bucket); rcfg.LockEnabled { if lr.LockEnabled && enforceRetentionForDeletion(ctx, obj) {
locked := enforceRetentionForDeletion(ctx, obj) if debug {
if locked { if obj.VersionID != "" {
if debug { console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID)
if obj.VersionID != "" { } else {
console.Debugf(applyActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID) console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
} else {
console.Debugf(applyActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name)
}
} }
return lifecycle.NoneAction
} }
return lifecycle.NoneAction
} }
} }
@ -1277,32 +1275,31 @@ func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi Obj
roi.OpType = replication.ExistingObjectReplicationType roi.OpType = replication.ExistingObjectReplicationType
} }
if roi.TargetStatuses != nil { if sizeS.replTargetStats == nil && len(roi.TargetStatuses) > 0 {
if sizeS.replTargetStats == nil { sizeS.replTargetStats = make(map[string]replTargetSizeSummary)
sizeS.replTargetStats = make(map[string]replTargetSizeSummary) }
for arn, tgtStatus := range roi.TargetStatuses {
tgtSizeS, ok := sizeS.replTargetStats[arn]
if !ok {
tgtSizeS = replTargetSizeSummary{}
} }
for arn, tgtStatus := range roi.TargetStatuses { switch tgtStatus {
tgtSizeS, ok := sizeS.replTargetStats[arn] case replication.Pending:
if !ok { tgtSizeS.pendingCount++
tgtSizeS = replTargetSizeSummary{} tgtSizeS.pendingSize += oi.Size
} sizeS.pendingCount++
switch tgtStatus { sizeS.pendingSize += oi.Size
case replication.Pending: case replication.Failed:
tgtSizeS.pendingCount++ tgtSizeS.failedSize += oi.Size
tgtSizeS.pendingSize += oi.Size tgtSizeS.failedCount++
sizeS.pendingCount++ sizeS.failedSize += oi.Size
sizeS.pendingSize += oi.Size sizeS.failedCount++
case replication.Failed: case replication.Completed, replication.CompletedLegacy:
tgtSizeS.failedSize += oi.Size tgtSizeS.replicatedSize += oi.Size
tgtSizeS.failedCount++ sizeS.replicatedSize += oi.Size
sizeS.failedSize += oi.Size
sizeS.failedCount++
case replication.Completed, "COMPLETE":
tgtSizeS.replicatedSize += oi.Size
sizeS.replicatedSize += oi.Size
}
sizeS.replTargetStats[arn] = tgtSizeS
} }
sizeS.replTargetStats[arn] = tgtSizeS
} }
switch oi.ReplicationStatus { switch oi.ReplicationStatus {

View File

@ -33,6 +33,7 @@ import (
"github.com/minio/madmin-go" "github.com/minio/madmin-go"
"github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/hash"
@ -1222,9 +1223,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
} }
var lc *lifecycle.Lifecycle var lc *lifecycle.Lifecycle
var rcfg lock.Retention
if opts.Expiration.Expire { if opts.Expiration.Expire {
// Check if the current bucket has a configured lifecycle policy // Check if the current bucket has a configured lifecycle policy
lc, _ = globalLifecycleSys.Get(bucket) lc, _ = globalLifecycleSys.Get(bucket)
rcfg, _ = globalBucketObjectLockSys.Get(bucket)
} }
// expiration attempted on a bucket with no lifecycle // expiration attempted on a bucket with no lifecycle
@ -1269,7 +1272,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
} }
if opts.Expiration.Expire { if opts.Expiration.Expire {
action := evalActionFromLifecycle(ctx, *lc, goi, false) action := evalActionFromLifecycle(ctx, *lc, rcfg, goi, false)
var isErr bool var isErr bool
switch action { switch action {
case lifecycle.NoneAction: case lifecycle.NoneAction:

View File

@ -1161,6 +1161,9 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
// Automatically remove the object/version is an expiry lifecycle rule can be applied // Automatically remove the object/version is an expiry lifecycle rule can be applied
lc, _ := globalLifecycleSys.Get(bucket) lc, _ := globalLifecycleSys.Get(bucket)
// Check if bucket is object locked.
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" { if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" {
// Optimization for certain applications like // Optimization for certain applications like
// - Cohesity // - Cohesity
@ -1172,7 +1175,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true}) objInfo, err := z.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true})
if err == nil { if err == nil {
if lc != nil { if lc != nil {
action := evalActionFromLifecycle(ctx, *lc, objInfo, false) action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
switch action { switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
fallthrough fallthrough
@ -1194,6 +1197,7 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
InclDeleted: false, InclDeleted: false,
AskDisks: globalAPIConfig.getListQuorum(), AskDisks: globalAPIConfig.getListQuorum(),
Lifecycle: lc, Lifecycle: lc,
Retention: rcfg,
} }
merged, err := z.listPath(ctx, &opts) merged, err := z.listPath(ctx, &opts)

View File

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
) )
@ -293,7 +294,7 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
// Do lifecycle filtering. // Do lifecycle filtering.
if o.Lifecycle != nil { if o.Lifecycle != nil {
filterIn := make(chan metaCacheEntry, 10) filterIn := make(chan metaCacheEntry, 10)
go filterLifeCycle(ctx, o.Bucket, o.Lifecycle, filterIn, results) go filterLifeCycle(ctx, o.Bucket, *o.Lifecycle, o.Retention, filterIn, results)
// Replace results. // Replace results.
results = filterIn results = filterIn
} }
@ -359,7 +360,7 @@ func (z *erasureServerPools) listMerged(ctx context.Context, o listPathOptions,
// out will be closed when there are no more results. // out will be closed when there are no more results.
// When 'in' is closed or the context is canceled the // When 'in' is closed or the context is canceled the
// function closes 'out' and exits. // function closes 'out' and exits.
func filterLifeCycle(ctx context.Context, bucket string, lc *lifecycle.Lifecycle, in <-chan metaCacheEntry, out chan<- metaCacheEntry) { func filterLifeCycle(ctx context.Context, bucket string, lc lifecycle.Lifecycle, lr lock.Retention, in <-chan metaCacheEntry, out chan<- metaCacheEntry) {
defer close(out) defer close(out)
for { for {
var obj metaCacheEntry var obj metaCacheEntry
@ -378,7 +379,7 @@ func filterLifeCycle(ctx context.Context, bucket string, lc *lifecycle.Lifecycle
continue continue
} }
objInfo := fi.ToObjectInfo(bucket, obj.name) objInfo := fi.ToObjectInfo(bucket, obj.name)
action := evalActionFromLifecycle(ctx, *lc, objInfo, false) action := evalActionFromLifecycle(ctx, lc, lr, objInfo, false)
switch action { switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
fallthrough fallthrough

View File

@ -33,6 +33,7 @@ import (
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/color" "github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
@ -98,6 +99,9 @@ type listPathOptions struct {
// Is not transferred across request calls. // Is not transferred across request calls.
Lifecycle *lifecycle.Lifecycle Lifecycle *lifecycle.Lifecycle
// Retention configuration, needed to be passed along with lifecycle if set.
Retention lock.Retention
// pool and set of where the cache is located. // pool and set of where the cache is located.
pool, set int pool, set int
} }

View File

@ -464,7 +464,8 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
// Automatically remove the object/version is an expiry lifecycle rule can be applied // Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil { if lc, err := globalLifecycleSys.Get(bucket); err == nil {
action := evalActionFromLifecycle(ctx, *lc, objInfo, false) rcfg, _ := globalBucketObjectLockSys.Get(bucket)
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
var success bool var success bool
switch action { switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
@ -686,7 +687,8 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
// Automatically remove the object/version is an expiry lifecycle rule can be applied // Automatically remove the object/version is an expiry lifecycle rule can be applied
if lc, err := globalLifecycleSys.Get(bucket); err == nil { if lc, err := globalLifecycleSys.Get(bucket); err == nil {
action := evalActionFromLifecycle(ctx, *lc, objInfo, false) rcfg, _ := globalBucketObjectLockSys.Get(bucket)
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
var success bool var success bool
switch action { switch action {
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:

View File

@ -29,6 +29,9 @@ const (
// Completed - replication completed ok. // Completed - replication completed ok.
Completed StatusType = "COMPLETED" Completed StatusType = "COMPLETED"
// CompletedLegacy was called "COMPLETE" incorrectly.
CompletedLegacy StatusType = "COMPLETE"
// Failed - replication failed. // Failed - replication failed.
Failed StatusType = "FAILED" Failed StatusType = "FAILED"