fix: decommission delete markers for non-current objects (#15225)

versioned buckets were not creating the delete markers
present in the versioned stack of an object, this essentially
would stop decommission to succeed.

This PR fixes creating such delete markers properly during
a decommissioning process, adds tests as well.
This commit is contained in:
Harshavardhana
2022-07-05 07:37:24 -07:00
committed by GitHub
parent 39b3941892
commit 9d80ff5a05
3 changed files with 26 additions and 9 deletions

View File

@@ -119,7 +119,7 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
}
if deploymentID == "" {
// all zones should have same deployment ID
// all pools should have same deployment ID
deploymentID = formats[i].ID
}
@@ -457,7 +457,7 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
// If any other error is found, it is returned.
// The check is skipped if there is only one zone, and 0, nil is always returned in that case.
// The check is skipped if there is only one pool, and 0, nil is always returned in that case.
func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) {
return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{
NoLock: true,
@@ -881,7 +881,7 @@ func (z *erasureServerPools) getLatestObjectInfoWithIdx(ctx context.Context, buc
sort.Slice(results, func(i, j int) bool {
a, b := results[i], results[j]
if a.oi.ModTime.Equal(b.oi.ModTime) {
// On tiebreak, select the lowest zone index.
// On tiebreak, select the lowest pool index.
return a.zIdx < b.zIdx
}
return a.oi.ModTime.After(b.oi.ModTime)
@@ -975,12 +975,12 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
}
func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error {
for idx, zone := range z.serverPools {
for idx, pool := range z.serverPools {
if z.IsSuspended(idx) {
logger.LogIf(ctx, fmt.Errorf("pool %d is suspended, all writes are suspended", idx+1))
continue
}
_, err := zone.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true})
_, err := pool.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true})
if err != nil {
return err
}