diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 2acd0e211..1cfbc7542 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -673,17 +673,17 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool return err } + vc, _ := globalBucketVersioningSys.Get(bi.Name) + + // Check if the current bucket has a configured lifecycle policy + lc, _ := globalLifecycleSys.Get(bi.Name) + + // Check if bucket is object locked. + lr, _ := globalBucketObjectLockSys.Get(bi.Name) + for _, set := range pool.sets { set := set - vc, _ := globalBucketVersioningSys.Get(bi.Name) - - // Check if the current bucket has a configured lifecycle policy - lc, _ := globalLifecycleSys.Get(bi.Name) - - // Check if bucket is object locked. - lr, _ := globalBucketObjectLockSys.Get(bi.Name) - filterLifecycle := func(bucket, object string, fi FileInfo) bool { if lc == nil { return false @@ -728,22 +728,33 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool continue } + // any object with only single DEL marker we don't need + // to decommission, just skip it. + if version.Deleted && len(fivs.Versions) == 1 { + decommissionedCount++ + continue + } + + versionID := version.VersionID + if versionID == "" { + versionID = nullVersionID + } + if version.Deleted { _, err := z.DeleteObject(ctx, bi.Name, version.Name, ObjectOptions{ - Versioned: vc.PrefixEnabled(version.Name), - VersionID: version.VersionID, + // Since we are preserving a delete marker, we have to make sure this is always true. + // regardless of the current configuration of the bucket we must preserve all versions + // on the pool being decommissioned. + Versioned: true, + VersionID: versionID, MTime: version.ModTime, DeleteReplication: version.ReplicationState, DeleteMarker: true, // make sure we create a delete marker SkipDecommissioned: true, // make sure we skip the decommissioned pool }) - if isErrObjectNotFound(err) || isErrVersionNotFound(err) { - // object/version already deleted by the application, nothing to do here we move on. - continue - } var failure bool if err != nil { logger.LogIf(ctx, err) @@ -765,7 +776,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool if version.IsRemote() { stopFn := globalDecommissionMetrics.log(decomMetricDecommissionObject, idx, bi.Name, version.Name, version.VersionID) if err := z.DecomTieredObject(ctx, bi.Name, version.Name, version, ObjectOptions{ - VersionID: version.VersionID, + VersionID: versionID, MTime: version.ModTime, UserDefined: version.Metadata, }); err != nil { @@ -785,7 +796,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool nil, http.Header{}, ObjectOptions{ - VersionID: version.VersionID, + VersionID: versionID, NoDecryption: true, NoLock: true, })