fix: skip objects expired via lifecycle rules during decommission (#15300)

Author: Harshavardhana, 2022-07-14 16:47:09 -07:00 (committed by GitHub)
parent 53cc561048
commit 236ef03dbd
2 changed files with 40 additions and 3 deletions
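
The diff below adds a filterLifecycle closure to the decommission walker: before a version is copied to the remaining pools, the bucket's lifecycle (ILM) configuration is evaluated, and versions the rules have already expired are queued for deletion and skipped instead of migrated. A minimal, self-contained sketch of that evaluate/enqueue/skip pattern follows; the names here (version, evalAction, enqueueExpiry, migrate) are hypothetical stand-ins, not MinIO's internal API.

```go
package main

import "fmt"

// ilmAction is a hypothetical stand-in for the action a lifecycle
// evaluation returns for an object version.
type ilmAction int

const (
	actionNone ilmAction = iota
	actionDelete
	actionDeleteVersion
)

// version is a hypothetical, simplified object version record.
type version struct {
	Bucket, Name, VersionID string
	Expired                 bool // pretend the rule evaluation already happened
}

// evalAction stands in for evaluating the bucket's lifecycle rules
// (the diff below does this with evalActionFromLifecycle).
func evalAction(v version) ilmAction {
	if !v.Expired {
		return actionNone
	}
	if v.VersionID != "" {
		return actionDeleteVersion
	}
	return actionDelete
}

// enqueueExpiry stands in for handing the version to an asynchronous
// expiry worker rather than deleting it inline.
func enqueueExpiry(v version) {
	fmt.Printf("queued %s/%s (%s) for expiry\n", v.Bucket, v.Name, v.VersionID)
}

// migrate stands in for copying the version to the pools that remain
// after decommissioning.
func migrate(v version) {
	fmt.Printf("migrated %s/%s (%s)\n", v.Bucket, v.Name, v.VersionID)
}

func main() {
	versions := []version{
		{Bucket: "photos", Name: "old.jpg", VersionID: "v1", Expired: true},
		{Bucket: "photos", Name: "new.jpg", VersionID: "v1"},
	}
	for _, v := range versions {
		// Same shape as the filterLifecycle check in the diff: expired
		// versions are queued for deletion and skipped; everything else
		// is migrated as before.
		if evalAction(v) != actionNone {
			enqueueExpiry(v)
			continue
		}
		migrate(v)
	}
}
```

Queueing the delete and skipping, rather than deleting inline, keeps the decommission loop moving and reuses the existing expiry machinery.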

cmd/erasure-multipart.go

@@ -19,6 +19,7 @@ package cmd
import (
"context"
"errors"
"fmt"
"io"
"os"
@@ -48,7 +49,7 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string {
// checkUploadIDExists - verify if a given uploadID exists and is valid.
func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
defer func() {
- if err == errFileNotFound {
+ if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) {
err = errUploadIDNotFound
}
}()
@@ -208,10 +209,10 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
for _, disk = range er.getLoadBalancedDisks(true) {
uploadIDs, err = disk.ListDir(ctx, minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
if err != nil {
- if err == errDiskNotFound {
+ if errors.Is(err, errDiskNotFound) {
continue
}
- if err == errFileNotFound {
+ if errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound) {
return result, nil
}
logger.LogIf(ctx, err)
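
Two things change in the error handling above: a missing bucket volume (errVolumeNotFound) is now treated the same as a missing upload (errFileNotFound), and the plain == comparisons become errors.Is, which also matches when a sentinel error arrives wrapped (for example via fmt.Errorf with %w). A small standalone illustration of that second point, using local stand-in sentinels rather than MinIO's:

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for sentinel errors such as errFileNotFound and
// errVolumeNotFound in the diff above.
var (
	errFileNotFound   = errors.New("file not found")
	errVolumeNotFound = errors.New("volume not found")
)

// lookup simulates a lower layer that adds context while preserving
// the sentinel with %w.
func lookup() error {
	return fmt.Errorf("disk 3: %w", errFileNotFound)
}

func main() {
	err := lookup()

	// Plain equality does not see through the wrapping.
	fmt.Println(err == errFileNotFound) // false

	// errors.Is unwraps and matches; OR-ing in errVolumeNotFound lets the
	// caller treat "bucket volume missing" like "upload not found".
	fmt.Println(errors.Is(err, errFileNotFound) || errors.Is(err, errVolumeNotFound)) // true
}
```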

cmd/erasure-server-pool-decom.go

@@ -31,6 +31,7 @@ import (
"time"
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/hash"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console"
@@ -660,6 +661,33 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
}
vc, _ := globalBucketVersioningSys.Get(bi.Name)
+ // Check if the current bucket has a configured lifecycle policy
+ lc, _ := globalLifecycleSys.Get(bi.Name)
+ // Check if bucket is object locked.
+ lr, _ := globalBucketObjectLockSys.Get(bi.Name)
+ filterLifecycle := func(bucket, object string, fi FileInfo) bool {
+ 	if lc == nil {
+ 		return false
+ 	}
+ 	versioned := vc != nil && vc.Versioned(object)
+ 	objInfo := fi.ToObjectInfo(bucket, object, versioned)
+ 	action := evalActionFromLifecycle(ctx, *lc, lr, objInfo, false)
+ 	switch action {
+ 	case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
+ 		globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction)
+ 		// Skip this entry.
+ 		return true
+ 	case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
+ 		globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction)
+ 		// Skip this entry.
+ 		return true
+ 	}
+ 	return false
+ }
decommissionEntry := func(entry metaCacheEntry) {
defer func() {
<-parallelWorkers
@@ -686,6 +714,13 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
logger.LogIf(ctx, fmt.Errorf("found %s/%s transitioned object, transitioned object won't be decommissioned", bi.Name, version.Name))
continue
}
+ // Apply lifecycle rules on the objects that are expired.
+ if filterLifecycle(bi.Name, version.Name, version) {
+ 	logger.LogIf(ctx, fmt.Errorf("found %s/%s (%s) expired object based on ILM rules, skipping and scheduled for deletion", bi.Name, version.Name, version.VersionID))
+ 	continue
+ }
// We will skip decommissioning delete markers
// with single version, its as good as there
// is no data associated with the object.
@@ -693,6 +728,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bi.Name, version.Name))
continue
}
if version.Deleted {
_, err := z.DeleteObject(ctx,
bi.Name,