From 909b169593a856ded7632573643b3fa1067c3d81 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 9 Aug 2024 19:30:44 -0700
Subject: [PATCH] avoid source index to be same as destination index (#20238)

during rebalance stop, it can possibly happen that Put()
would race by overwriting the same object again.

If that racing write is done "successfully", the worker can
then proceed to delete the object from the pool, causing
data loss.

This PR enhances #20233 to handle more scenarios such as
these.
---
 cmd/erasure-server-pool-decom.go     | 57 ++++++++++++++++++----------
 cmd/erasure-server-pool-rebalance.go | 33 +++++++++++-----
 cmd/erasure-server-pool.go           | 19 ++++++++++
 3 files changed, 81 insertions(+), 28 deletions(-)

diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
index 5e758d848..1a82e1005 100644
--- a/cmd/erasure-server-pool-decom.go
+++ b/cmd/erasure-server-pool-decom.go
@@ -608,7 +608,7 @@ func (z *erasureServerPools) IsDecommissionRunning() bool {
 	return false
 }
 
-func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
+func (z *erasureServerPools) decommissionObject(ctx context.Context, idx int, bucket string, gr *GetObjectReader) (err error) {
 	objInfo := gr.ObjInfo
 
 	defer func() {
@@ -623,9 +623,11 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
 
 	if objInfo.isMultipart() {
 		res, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{
-			VersionID:   objInfo.VersionID,
-			UserDefined: objInfo.UserDefined,
-			NoAuditLog:  true,
+			VersionID:    objInfo.VersionID,
+			UserDefined:  objInfo.UserDefined,
+			NoAuditLog:   true,
+			SrcPoolIdx:   idx,
+			DataMovement: true,
 		})
 		if err != nil {
 			return fmt.Errorf("decommissionObject: NewMultipartUpload() %w", err)
@@ -660,6 +662,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
 			}
 		}
 		_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{
+			SrcPoolIdx:   idx,
 			DataMovement: true,
 			MTime:        objInfo.ModTime,
 			NoAuditLog:   true,
@@ -681,6 +684,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
 		NewPutObjReader(hr),
 		ObjectOptions{
 			DataMovement: true,
+			SrcPoolIdx:   idx,
 			VersionID:    objInfo.VersionID,
 			MTime:        objInfo.ModTime,
 			UserDefined:  objInfo.UserDefined,
@@ -855,6 +859,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 						versionID = nullVersionID
 					}
 
+					var failure, ignore bool
 					if version.Deleted {
 						_, err := z.DeleteObject(ctx,
 							bi.Name,
@@ -867,14 +872,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 								VersionID:          versionID,
 								MTime:              version.ModTime,
 								DeleteReplication:  version.ReplicationState,
+								SrcPoolIdx:         idx,
+								DataMovement:       true,
 								DeleteMarker:       true, // make sure we create a delete marker
 								SkipDecommissioned: true, // make sure we skip the decommissioned pool
 								NoAuditLog:         true,
 							})
-						var failure bool
 						if err != nil {
-							if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
-								err = nil
+							// This can happen when rebalance stop races with ongoing rebalance workers.
+							// These rebalance failures can be ignored.
+							if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
+								ignore = true
+								stopFn(0, nil)
+								continue
 							}
 						}
 						stopFn(version.Size, err)
@@ -893,22 +903,26 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 						continue
 					}
 
-					var failure, ignore bool
 					// gr.Close() is ensured by decommissionObject().
 					for try := 0; try < 3; try++ {
 						if version.IsRemote() {
 							if err := z.DecomTieredObject(ctx, bi.Name, version.Name, version, ObjectOptions{
-								VersionID:   versionID,
-								MTime:       version.ModTime,
-								UserDefined: version.Metadata,
+								VersionID:    versionID,
+								MTime:        version.ModTime,
+								UserDefined:  version.Metadata,
+								SrcPoolIdx:   idx,
+								DataMovement: true,
 							}); err != nil {
-								stopFn(version.Size, err)
-								failure = true
-								decomLogIf(ctx, err)
-								continue
+								if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
+									ignore = true
+									stopFn(0, nil)
+								}
+							}
+							if !ignore {
+								stopFn(version.Size, err)
+								failure = err != nil
+								decomLogIf(ctx, err)
 							}
-							stopFn(version.Size, nil)
-							failure = false
 							break
 						}
 						gr, err := set.GetObjectNInfo(ctx,
@@ -925,7 +939,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 						if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
 							// object deleted by the application, nothing to do here we move on.
 							ignore = true
-							stopFn(version.Size, nil)
+							stopFn(0, nil)
 							break
 						}
 						if err != nil && !ignore {
@@ -943,7 +957,12 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 							stopFn(version.Size, err)
 							continue
 						}
-						if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
+						if err = z.decommissionObject(ctx, idx, bi.Name, gr); err != nil {
+							if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
+								ignore = true
+								stopFn(0, nil)
+								break
+							}
 							stopFn(version.Size, err)
 							failure = true
 							decomLogIf(ctx, err)
diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go
index d13456b48..749cf4605 100644
--- a/cmd/erasure-server-pool-rebalance.go
+++ b/cmd/erasure-server-pool-rebalance.go
@@ -626,14 +626,18 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
 
 			var rebalanced, expired int
 			for _, version := range fivs.Versions {
+				stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)
+
 				// Skip transitioned objects for now. TBD
 				if version.IsRemote() {
+					stopFn(version.Size, errors.New("ILM Tiered version will be skipped for now"))
 					continue
 				}
 
 				// Apply lifecycle rules on the objects that are expired.
 				if filterLifecycle(bucket, version.Name, version) {
 					expired++
+					stopFn(version.Size, errors.New("ILM expired object/version will be skipped"))
 					continue
 				}
 
@@ -643,6 +647,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
 
 				remainingVersions := len(fivs.Versions) - expired
 				if version.Deleted && remainingVersions == 1 {
 					rebalanced++
+					stopFn(version.Size, errors.New("DELETE marked object with no other non-current versions will be skipped"))
 					continue
 				}
@@ -651,6 +656,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
 					versionID = nullVersionID
 				}
 
+				var failure, ignore bool
 				if version.Deleted {
 					_, err := z.DeleteObject(ctx,
 						bucket,
@@ -660,16 +666,26 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
 							VersionID:         versionID,
 							MTime:             version.ModTime,
 							DeleteReplication: version.ReplicationState,
+							SrcPoolIdx:        poolIdx,
+							DataMovement:      true,
 							DeleteMarker:      true, // make sure we create a delete marker
 							SkipRebalancing:   true, // make sure we skip the decommissioned pool
 							NoAuditLog:        true,
 						})
-					var failure bool
-					if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
-						rebalanceLogIf(ctx, err)
-						failure = true
+					// This can happen when rebalance stop races with ongoing rebalance workers.
+					// These rebalance failures can be ignored.
+					if err != nil {
+						// This can happen when rebalance stop races with ongoing rebalance workers.
+						// These rebalance failures can be ignored.
+						if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
+							ignore = true
+							stopFn(0, nil)
+							continue
+						}
 					}
-
+					stopFn(version.Size, err)
+					rebalanceLogIf(ctx, err)
+					failure = err != nil
 					if !failure {
 						z.updatePoolStats(poolIdx, bucket, version)
 						rebalanced++
@@ -678,10 +694,8 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
 					continue
 				}
 
-				var failure, ignore bool
 				for try := 0; try < 3; try++ {
 					// GetObjectReader.Close is called by rebalanceObject
-					stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)
 					gr, err := set.GetObjectNInfo(ctx,
 						bucket,
 						encodeDirObject(version.Name),
@@ -709,9 +723,10 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
 					if err = z.rebalanceObject(ctx, poolIdx, bucket, gr); err != nil {
 						// This can happen when rebalance stop races with ongoing rebalance workers.
 						// These rebalance failures can be ignored.
-						if isDataMovementOverWriteErr(err) {
+						if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
 							ignore = true
-							continue
+							stopFn(0, nil)
+							break
 						}
 						failure = true
 						rebalanceLogIf(ctx, err)
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index cf41b128d..55dc5f890 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -1147,6 +1147,16 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
 		return pinfo.ObjInfo, nil
 	}
 
+	// Datamovement must never be allowed on the same pool.
+	if opts.DataMovement && opts.SrcPoolIdx == pinfo.Index {
+		return pinfo.ObjInfo, DataMovementOverwriteErr{
+			Bucket:    bucket,
+			Object:    object,
+			VersionID: opts.VersionID,
+			Err:       errDataMovementSrcDstPoolSame,
+		}
+	}
+
 	// Delete concurrently in all server pools with read quorum error for unversioned objects.
 	if len(noReadQuorumPools) > 0 && !opts.Versioned && !opts.VersionSuspended {
 		return z.deleteObjectFromAllPools(ctx, bucket, object, opts, noReadQuorumPools)
@@ -2797,5 +2807,14 @@ func (z *erasureServerPools) DecomTieredObject(ctx context.Context, bucket, obje
 		return err
 	}
 
+	if opts.DataMovement && idx == opts.SrcPoolIdx {
+		return DataMovementOverwriteErr{
+			Bucket:    bucket,
+			Object:    object,
+			VersionID: opts.VersionID,
+			Err:       errDataMovementSrcDstPoolSame,
+		}
+	}
+
 	return z.serverPools[idx].DecomTieredObject(ctx, bucket, object, fi, opts)
 }
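
Reviewer's note: the hunks above all enforce the same invariant, so here is a minimal, self-contained Go sketch of that guard for reference. This is not MinIO code; objectOptions, putForDataMovement and errSamePool are hypothetical stand-ins for ObjectOptions, the data-movement call sites and errDataMovementSrcDstPoolSame. The idea is that a write or delete flagged as data movement is refused when the resolved destination pool equals the source pool, and callers treat that error like "object not found" (skip the follow-up delete) rather than as a failure, which is what prevents the lost-object scenario described in the commit message.

// guard_sketch.go: illustrative only, with hypothetical names.
package main

import (
	"errors"
	"fmt"
)

// errSamePool stands in for errDataMovementSrcDstPoolSame.
var errSamePool = errors.New("data movement on the same pool is not allowed")

// objectOptions mirrors the two fields this patch threads through every
// data-movement call: the originating pool index and a data-movement flag.
type objectOptions struct {
	SrcPoolIdx   int
	DataMovement bool
}

// putForDataMovement stands in for a write/delete issued by a decommission or
// rebalance worker: it refuses to act when the resolved destination pool is
// the same pool the object is being moved away from.
func putForDataMovement(dstPoolIdx int, opts objectOptions) error {
	if opts.DataMovement && opts.SrcPoolIdx == dstPoolIdx {
		// Surface a detectable error; the caller skips the follow-up delete
		// instead of removing what may be the only remaining copy.
		return fmt.Errorf("pool %d: %w", dstPoolIdx, errSamePool)
	}
	// ... perform the actual write/delete against dstPoolIdx ...
	return nil
}

func main() {
	opts := objectOptions{SrcPoolIdx: 1, DataMovement: true}

	// Normal move: source pool 1, destination pool 0 -> allowed.
	fmt.Println(putForDataMovement(0, opts)) // <nil>

	// Race after a rebalance/decommission stop: the destination resolves back
	// to the source pool -> rejected, so the worker must not delete the object.
	err := putForDataMovement(1, opts)
	fmt.Println(errors.Is(err, errSamePool)) // true
}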