avoid source index being the same as destination index (#20238)

During rebalance stop, Put() can race with the rebalance workers and
overwrite the same object again.

If such an overwrite appears to "succeed", the worker can then proceed
to delete the object from the pool, causing data loss.

This PR extends #20233 to handle more scenarios like these.
Harshavardhana 2024-08-09 19:30:44 -07:00 committed by GitHub
parent 4e67a4027e
commit 909b169593
3 changed files with 81 additions and 28 deletions
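
The gist of the fix: every write issued on behalf of data movement (decommission or rebalance) now carries the index of the pool the data is being moved away from (`SrcPoolIdx`) together with a `DataMovement` flag, and the write/delete paths refuse to operate when the selected destination pool equals that source pool. A minimal, self-contained sketch of that invariant — types and names are simplified; only `DataMovement`, `SrcPoolIdx`, and `errDataMovementSrcDstPoolSame` come from the diffs below:

```go
package main

import (
	"errors"
	"fmt"
)

// Mirrors errDataMovementSrcDstPoolSame from the diffs below.
var errDataMovementSrcDstPoolSame = errors.New("source and destination pool are the same")

// moveOpts keeps only the two ObjectOptions fields this fix relies on;
// the real struct carries many more.
type moveOpts struct {
	DataMovement bool // true only for decommission/rebalance writes
	SrcPoolIdx   int  // pool the object is being moved away from
}

// guardDataMovement rejects a data-movement write whose destination pool
// is the pool the object is being drained from. Without this check the
// write "succeeds" in place and the follow-up delete on the source pool
// removes the only copy.
func guardDataMovement(opts moveOpts, dstPoolIdx int) error {
	if opts.DataMovement && opts.SrcPoolIdx == dstPoolIdx {
		return errDataMovementSrcDstPoolSame
	}
	return nil
}

func main() {
	fmt.Println(guardDataMovement(moveOpts{DataMovement: true, SrcPoolIdx: 0}, 0))  // rejected
	fmt.Println(guardDataMovement(moveOpts{DataMovement: true, SrcPoolIdx: 0}, 1))  // <nil>
	fmt.Println(guardDataMovement(moveOpts{DataMovement: false, SrcPoolIdx: 0}, 0)) // <nil>, normal writes unaffected
}
```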


@@ -608,7 +608,7 @@ func (z *erasureServerPools) IsDecommissionRunning() bool {
     return false
 }
 
-func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
+func (z *erasureServerPools) decommissionObject(ctx context.Context, idx int, bucket string, gr *GetObjectReader) (err error) {
     objInfo := gr.ObjInfo
     defer func() {
@@ -623,9 +623,11 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
     if objInfo.isMultipart() {
         res, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{
             VersionID:    objInfo.VersionID,
             UserDefined:  objInfo.UserDefined,
             NoAuditLog:   true,
+            SrcPoolIdx:   idx,
+            DataMovement: true,
         })
         if err != nil {
             return fmt.Errorf("decommissionObject: NewMultipartUpload() %w", err)
@@ -660,6 +662,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
         }
     }
     _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{
+        SrcPoolIdx:   idx,
         DataMovement: true,
         MTime:        objInfo.ModTime,
         NoAuditLog:   true,
@@ -681,6 +684,7 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
     NewPutObjReader(hr),
     ObjectOptions{
         DataMovement: true,
+        SrcPoolIdx:   idx,
         VersionID:    objInfo.VersionID,
         MTime:        objInfo.ModTime,
         UserDefined:  objInfo.UserDefined,
@@ -855,6 +859,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
             versionID = nullVersionID
         }
 
+        var failure, ignore bool
         if version.Deleted {
             _, err := z.DeleteObject(ctx,
                 bi.Name,
@@ -867,14 +872,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
                 VersionID:          versionID,
                 MTime:              version.ModTime,
                 DeleteReplication:  version.ReplicationState,
+                SrcPoolIdx:         idx,
+                DataMovement:       true,
                 DeleteMarker:       true, // make sure we create a delete marker
                 SkipDecommissioned: true, // make sure we skip the decommissioned pool
                 NoAuditLog:         true,
             })
-        var failure bool
         if err != nil {
-            if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
-                err = nil
+            // This can happen when rebalance stop races with ongoing rebalance workers.
+            // These rebalance failures can be ignored.
+            if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
+                ignore = true
+                stopFn(0, nil)
+                continue
             }
         }
         stopFn(version.Size, err)
@@ -893,22 +903,26 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
             continue
         }
 
-        var failure, ignore bool
         // gr.Close() is ensured by decommissionObject().
         for try := 0; try < 3; try++ {
             if version.IsRemote() {
                 if err := z.DecomTieredObject(ctx, bi.Name, version.Name, version, ObjectOptions{
                     VersionID:    versionID,
                     MTime:        version.ModTime,
                     UserDefined:  version.Metadata,
+                    SrcPoolIdx:   idx,
+                    DataMovement: true,
                 }); err != nil {
-                    stopFn(version.Size, err)
-                    failure = true
-                    decomLogIf(ctx, err)
-                    continue
+                    if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
+                        ignore = true
+                        stopFn(0, nil)
+                    }
+                }
+                if !ignore {
+                    stopFn(version.Size, err)
+                    failure = err != nil
+                    decomLogIf(ctx, err)
                 }
-                stopFn(version.Size, nil)
-                failure = false
                 break
             }
             gr, err := set.GetObjectNInfo(ctx,
@@ -925,7 +939,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
             if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
                 // object deleted by the application, nothing to do here we move on.
                 ignore = true
-                stopFn(version.Size, nil)
+                stopFn(0, nil)
                 break
             }
             if err != nil && !ignore {
@@ -943,7 +957,12 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
             stopFn(version.Size, err)
             continue
         }
-        if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
+        if err = z.decommissionObject(ctx, idx, bi.Name, gr); err != nil {
+            if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
+                ignore = true
+                stopFn(0, nil)
+                break
+            }
             stopFn(version.Size, err)
             failure = true
             decomLogIf(ctx, err)


@@ -626,14 +626,18 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
     var rebalanced, expired int
     for _, version := range fivs.Versions {
+        stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)
+
         // Skip transitioned objects for now. TBD
         if version.IsRemote() {
+            stopFn(version.Size, errors.New("ILM Tiered version will be skipped for now"))
             continue
         }
 
         // Apply lifecycle rules on the objects that are expired.
         if filterLifecycle(bucket, version.Name, version) {
             expired++
+            stopFn(version.Size, errors.New("ILM expired object/version will be skipped"))
             continue
         }
@@ -643,6 +647,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
         remainingVersions := len(fivs.Versions) - expired
         if version.Deleted && remainingVersions == 1 {
             rebalanced++
+            stopFn(version.Size, errors.New("DELETE marked object with no other non-current versions will be skipped"))
             continue
         }
@@ -651,6 +656,7 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
             versionID = nullVersionID
         }
 
+        var failure, ignore bool
         if version.Deleted {
             _, err := z.DeleteObject(ctx,
                 bucket,
@@ -660,16 +666,26 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
                 VersionID:         versionID,
                 MTime:             version.ModTime,
                 DeleteReplication: version.ReplicationState,
+                SrcPoolIdx:        poolIdx,
+                DataMovement:      true,
                 DeleteMarker:      true, // make sure we create a delete marker
                 SkipRebalancing:   true, // make sure we skip the decommissioned pool
                 NoAuditLog:        true,
             })
-        var failure bool
-        if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
-            rebalanceLogIf(ctx, err)
-            failure = true
+        if err != nil {
+            // This can happen when rebalance stop races with ongoing rebalance workers.
+            // These rebalance failures can be ignored.
+            if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
+                ignore = true
+                stopFn(0, nil)
+                continue
+            }
         }
+        stopFn(version.Size, err)
+        rebalanceLogIf(ctx, err)
+        failure = err != nil
 
         if !failure {
             z.updatePoolStats(poolIdx, bucket, version)
             rebalanced++
@@ -678,10 +694,8 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
             continue
         }
 
-        var failure, ignore bool
         for try := 0; try < 3; try++ {
             // GetObjectReader.Close is called by rebalanceObject
-            stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name, version.VersionID)
             gr, err := set.GetObjectNInfo(ctx,
                 bucket,
                 encodeDirObject(version.Name),
@@ -709,9 +723,10 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
             if err = z.rebalanceObject(ctx, poolIdx, bucket, gr); err != nil {
                 // This can happen when rebalance stop races with ongoing rebalance workers.
                 // These rebalance failures can be ignored.
-                if isDataMovementOverWriteErr(err) {
+                if isErrObjectNotFound(err) || isErrVersionNotFound(err) || isDataMovementOverWriteErr(err) {
                     ignore = true
-                    continue
+                    stopFn(0, nil)
+                    break
                 }
                 failure = true
                 rebalanceLogIf(ctx, err)


@@ -1147,6 +1147,16 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
         return pinfo.ObjInfo, nil
     }
 
+    // Datamovement must never be allowed on the same pool.
+    if opts.DataMovement && opts.SrcPoolIdx == pinfo.Index {
+        return pinfo.ObjInfo, DataMovementOverwriteErr{
+            Bucket:    bucket,
+            Object:    object,
+            VersionID: opts.VersionID,
+            Err:       errDataMovementSrcDstPoolSame,
+        }
+    }
+
     // Delete concurrently in all server pools with read quorum error for unversioned objects.
     if len(noReadQuorumPools) > 0 && !opts.Versioned && !opts.VersionSuspended {
         return z.deleteObjectFromAllPools(ctx, bucket, object, opts, noReadQuorumPools)
@@ -2797,5 +2807,14 @@ func (z *erasureServerPools) DecomTieredObject(ctx context.Context, bucket, obje
         return err
     }
 
+    if opts.DataMovement && idx == opts.SrcPoolIdx {
+        return DataMovementOverwriteErr{
+            Bucket:    bucket,
+            Object:    object,
+            VersionID: opts.VersionID,
+            Err:       errDataMovementSrcDstPoolSame,
+        }
+    }
+
     return z.serverPools[idx].DecomTieredObject(ctx, bucket, object, fi, opts)
 }