From 236ef03dbdaf0bf22e20d13c205a3a453179e763 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 14 Jul 2022 16:47:09 -0700 Subject: [PATCH] fix: skip objects expired via lifecycle rules during decommission (#15300) --- cmd/erasure-multipart.go | 7 ++++--- cmd/erasure-server-pool-decom.go | 36 ++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index fe7100c53..12465be38 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -19,6 +19,7 @@ package cmd import ( "context" + "errors" "fmt" "io" "os" @@ -48,7 +49,7 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string { // checkUploadIDExists - verify if a given uploadID exists and is valid. func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) { defer func() { - if err == errFileNotFound { + if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) { err = errUploadIDNotFound } }() @@ -208,10 +209,10 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec for _, disk = range er.getLoadBalancedDisks(true) { uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1) if err != nil { - if err == errDiskNotFound { + if errors.Is(err, errDiskNotFound) { continue } - if err == errFileNotFound { + if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) { return result, nil } logger.LogIf(ctx, err) diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 98b7c2c57..28b68811a 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -31,6 +31,7 @@ import ( "time" "github.com/dustin/go-humanize" + "github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/hash" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/console" @@ -660,6 +661,33 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool } vc, _ := globalBucketVersioningSys.Get(bi.Name) + + // Check if the current bucket has a configured lifecycle policy + lc, _ := globalLifecycleSys.Get(bi.Name) + + // Check if bucket is object locked. + lr, _ := globalBucketObjectLockSys.Get(bi.Name) + + filterLifecycle := func(bucket, object string, fi FileInfo) bool { + if lc == nil { + return false + } + versioned := vc != nil && vc.Versioned(object) + objInfo := fi.ToObjectInfo(bucket, object, versioned) + action := evalActionFromLifecycle(ctx, *lc, lr, objInfo, false) + switch action { + case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: + globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction) + // Skip this entry. + return true + case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: + globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction) + // Skip this entry. + return true + } + return false + } + decommissionEntry := func(entry metaCacheEntry) { defer func() { <-parallelWorkers @@ -686,6 +714,13 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool logger.LogIf(ctx, fmt.Errorf("found %s/%s transitioned object, transitioned object won't be decommissioned", bi.Name, version.Name)) continue } + + // Apply lifecycle rules on the objects that are expired. + if filterLifecycle(bi.Name, version.Name, version) { + logger.LogIf(ctx, fmt.Errorf("found %s/%s (%s) expired object based on ILM rules, skipping and scheduled for deletion", bi.Name, version.Name, version.VersionID)) + continue + } + // We will skip decommissioning delete markers // with single version, its as good as there // is no data associated with the object. @@ -693,6 +728,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bi.Name, version.Name)) continue } + if version.Deleted { _, err := z.DeleteObject(ctx, bi.Name,