mirror of https://github.com/minio/minio.git
feat: add idempotent delete marker support (#15521)
The bottom line is that delete markers are a nuisance: most applications are not version-aware, and delete markers have only complicated version management. AWS S3 saddled customers with unnecessary overhead here; they now have to manage these markers with ILM settings and clean them up on a regular basis. To make matters worse, in a replicated setup all of these delete markers get replicated as well, requiring ILM settings on each site. This PR addresses that inferior behavior by moving MinIO to an idempotent delete marker implementation, i.e. MinIO will never create more than a single consecutive delete marker for an object. This significantly reduces operational overhead and keeps versioning useful for real data. It is a deliberate deviation from the S3 spec, made for pragmatic reasons.
parent 21831b3fe2
commit d350b666ff
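The behavioral change is easiest to see from a client: with this patch, repeating an unversioned DELETE of the same key leaves exactly one delete marker instead of stacking a new marker per request. Below is a minimal sketch of that check using the minio-go v7 SDK; the endpoint, credentials and bucket/object names are placeholders assuming a local server with a versioned bucket, they are not part of this commit.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	ctx := context.Background()

	// Assumes a local MinIO server with default credentials and a
	// versioned bucket named "bucket" that already contains "marker".
	client, err := minio.New("127.0.0.1:9000", &minio.Options{
		Creds:  credentials.NewStaticV4("minioadmin", "minioadmin", ""),
		Secure: false,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Issue the same unversioned delete twice.
	for i := 0; i < 2; i++ {
		if err := client.RemoveObject(ctx, "bucket", "marker", minio.RemoveObjectOptions{}); err != nil {
			log.Fatal(err)
		}
	}

	// Count delete markers among the versions of the key.
	markers := 0
	for obj := range client.ListObjects(ctx, "bucket", minio.ListObjectsOptions{
		Prefix:       "marker",
		WithVersions: true,
	}) {
		if obj.Err != nil {
			log.Fatal(obj.Err)
		}
		if obj.IsDeleteMarker {
			markers++
		}
	}
	// With idempotent delete markers this prints 1; stock S3 semantics would print 2.
	fmt.Println("delete markers:", markers)
}
```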
@@ -20,7 +20,7 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        go-version: [1.17.11b7, 1.18.3b7]
+        go-version: [1.18.5b7]
         os: [ubuntu-latest]
     steps:
       - uses: actions/checkout@v2

@@ -448,15 +448,26 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
 		}
 		defer NSUpdated(bucket, object)
 
-		if opts.VersionID != "" {
-			er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
-				VersionID: opts.VersionID,
-			}, false)
-		} else {
-			er.deleteObjectVersion(ctx, bucket, object, 1, FileInfo{
-				VersionID: m.VersionID,
-			}, false)
+		fi := FileInfo{
+			VersionID: m.VersionID,
 		}
+		if opts.VersionID != "" {
+			fi.VersionID = opts.VersionID
+		}
+
+		disks := er.getDisks()
+		g := errgroup.WithNErrs(len(disks))
+		for index := range disks {
+			index := index
+			g.Go(func() error {
+				if disks[index] == nil {
+					return errDiskNotFound
+				}
+				return disks[index].DeleteVersion(ctx, bucket, object, fi, false)
+			}, index)
+		}
+
+		g.Wait()
 	}
 	return m, err
 }

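deleteIfDangling now fans a best-effort DeleteVersion out to every drive through MinIO's internal errgroup helper and does not act on per-drive errors. The shape of that fan-out can be sketched with the stock golang.org/x/sync/errgroup package; the drive interface and names below are illustrative stand-ins, not the server's storage API, and unlike MinIO's WithNErrs helper the stock errgroup only reports the first error.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// drive is a stand-in for the per-drive storage API; only the call shape matters here.
type drive interface {
	DeleteVersion(ctx context.Context, bucket, object, versionID string) error
}

type fakeDrive struct{ name string }

func (d fakeDrive) DeleteVersion(ctx context.Context, bucket, object, versionID string) error {
	fmt.Printf("%s: deleted %s/%s (version %q)\n", d.name, bucket, object, versionID)
	return nil
}

// deleteOnAllDrives issues the delete to every drive in parallel and, like the
// patched deleteIfDangling, treats the result as best effort.
func deleteOnAllDrives(ctx context.Context, drives []drive, bucket, object, versionID string) {
	var g errgroup.Group
	for _, d := range drives {
		d := d // capture the loop variable, as the original does with `index := index`
		g.Go(func() error {
			if d == nil {
				return errors.New("drive not found")
			}
			return d.DeleteVersion(ctx, bucket, object, versionID)
		})
	}
	_ = g.Wait() // best effort: individual drive errors are ignored
}

func main() {
	drives := []drive{fakeDrive{"drive-1"}, fakeDrive{"drive-2"}, nil, fakeDrive{"drive-4"}}
	deleteOnAllDrives(context.Background(), drives, "bucket", "dangling-object", "null")
}
```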
@@ -1172,8 +1183,16 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
 }
 
-func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
+func (er erasureObjects) deleteObjectVersion(ctx context.Context, bucket, object string, fi FileInfo, forceDelMarker bool) error {
 	disks := er.getDisks()
+	// Assume (N/2 + 1) quorum for Delete()
+	// this is a theoretical assumption such that
+	// for delete's we do not need to honor storage
+	// class for objects that have reduced quorum
+	// due to storage class - this only needs to be honored
+	// for Read() requests alone that we already do.
+	writeQuorum := len(disks)/2 + 1
+
 	g := errgroup.WithNErrs(len(disks))
 	for index := range disks {
 		index := index

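The refactored deleteObjectVersion drops the writeQuorum parameter and always assumes a simple majority of the drives, as the new comment explains. The expression is plain integer arithmetic; a throwaway sketch of what it yields for a few common set sizes:

```go
package main

import "fmt"

func main() {
	// Same expression as the patched deleteObjectVersion: a simple majority of drives.
	for _, drives := range []int{4, 6, 8, 12, 16} {
		fmt.Printf("%2d drives -> delete write quorum %d\n", drives, drives/2+1)
	}
}
```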
@@ -1423,69 +1442,49 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 		}
 	}
 
-	// Acquire a write lock before deleting the object.
-	lk := er.NewNSLock(bucket, object)
-	lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
-	if err != nil {
-		return ObjectInfo{}, err
-	}
-	ctx = lkctx.Context()
-	defer lk.Unlock(lkctx.Cancel)
-
-	versionFound := true
 	objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
-	goi, writeQuorum, gerr := er.getObjectInfoAndQuorum(ctx, bucket, object, opts)
-	if gerr != nil && goi.Name == "" {
-		switch gerr.(type) {
-		case InsufficientReadQuorum:
-			return objInfo, InsufficientWriteQuorum{}
-		}
-		// For delete marker replication, versionID being replicated will not exist on disk
-		if opts.DeleteMarker {
-			versionFound = false
-		} else {
-			return objInfo, gerr
-		}
-	}
-
-	if opts.Expiration.Expire {
-		action := evalActionFromLifecycle(ctx, *lc, rcfg, goi, false)
-		var isErr bool
-		switch action {
-		case lifecycle.NoneAction:
-			isErr = true
-		case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
-			isErr = true
-		}
-		if isErr {
-			if goi.VersionID != "" {
-				return goi, VersionNotFound{
-					Bucket:    bucket,
-					Object:    object,
-					VersionID: goi.VersionID,
-				}
-			}
-			return goi, ObjectNotFound{
-				Bucket: bucket,
-				Object: object,
-			}
-		}
-	}
 
 	defer NSUpdated(bucket, object)
 
 	storageDisks := er.getDisks()
 
-	var markDelete bool
-	// Determine whether to mark object deleted for replication
-	if goi.VersionID != "" {
-		markDelete = true
+	if opts.Expiration.Expire {
+		goi, _, err := er.getObjectInfoAndQuorum(ctx, bucket, object, opts)
+		if err == nil {
+			action := evalActionFromLifecycle(ctx, *lc, rcfg, goi, false)
+			var isErr bool
+			switch action {
+			case lifecycle.NoneAction:
+				isErr = true
+			case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
+				isErr = true
+			}
+			if isErr {
+				if goi.VersionID != "" {
+					return goi, VersionNotFound{
+						Bucket:    bucket,
+						Object:    object,
+						VersionID: goi.VersionID,
+					}
+				}
+				return goi, ObjectNotFound{
+					Bucket: bucket,
+					Object: object,
+				}
+			}
+		}
 	}
 
-	// Default deleteMarker to true if object is under versioning
-	deleteMarker := opts.Versioned
+	versionFound := !(opts.DeleteMarker && opts.VersionID != "")
 
-	if opts.VersionID != "" {
+	// Determine whether to mark object deleted for replication
+	markDelete := !opts.DeleteMarker && opts.VersionID != ""
+
+	// Default deleteMarker to true if object is under versioning
+	// versioning suspended means we add `null` version as
+	// delete marker, if its not decided already.
+	deleteMarker := (opts.Versioned || opts.VersionSuspended) && opts.VersionID == "" || (opts.DeleteMarker && opts.VersionID != "")
+
+	if markDelete {
 		// case where replica version needs to be deleted on target cluster
 		if versionFound && opts.DeleteMarkerReplicationStatus() == replication.Replica {
 			markDelete = false

@@ -1496,22 +1495,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 		if opts.VersionPurgeStatus() == Complete {
 			markDelete = false
 		}
-
-		// Version is found but we do not wish to create more delete markers
-		// now, since VersionPurgeStatus() is already set, we can let the
-		// lower layers decide this. This fixes a regression that was introduced
-		// in PR #14555 where !VersionPurgeStatus.Empty() is automatically
-		// considered as Delete marker true to avoid listing such objects by
-		// regular ListObjects() calls. However for delete replication this
-		// ends up being a problem because "upon" a successful delete this
-		// ends up creating a new delete marker that is spurious and unnecessary.
-		if versionFound {
-			if !goi.VersionPurgeStatus.Empty() {
-				deleteMarker = false
-			} else if !goi.DeleteMarker { // implies a versioned delete of object
-				deleteMarker = false
-			}
-		}
 	}
 
 	modTime := opts.MTime

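The rewritten block above computes markDelete and deleteMarker once from the request options instead of mutating them through nested branches. Below is a standalone sketch that evaluates the same two expressions for a few representative request shapes; the opts struct is a stripped-down stand-in for the server's ObjectOptions, not the real type.

```go
package main

import "fmt"

// opts is a stripped-down stand-in for the server's ObjectOptions.
type opts struct {
	Versioned        bool
	VersionSuspended bool
	VersionID        string
	DeleteMarker     bool // set when replicating a delete marker
}

func main() {
	cases := []struct {
		name string
		o    opts
	}{
		{"unversioned delete, versioning enabled", opts{Versioned: true}},
		{"delete of a specific version", opts{Versioned: true, VersionID: "v1"}},
		{"replicated delete marker", opts{Versioned: true, VersionID: "v1", DeleteMarker: true}},
		{"unversioned delete, versioning suspended", opts{VersionSuspended: true}},
	}
	for _, c := range cases {
		// Same boolean expressions as the patched DeleteObject.
		markDelete := !c.o.DeleteMarker && c.o.VersionID != ""
		deleteMarker := (c.o.Versioned || c.o.VersionSuspended) && c.o.VersionID == "" || (c.o.DeleteMarker && c.o.VersionID != "")
		fmt.Printf("%-42s markDelete=%-5v deleteMarker=%v\n", c.name, markDelete, deleteMarker)
	}
}
```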
@@ -1519,38 +1502,31 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 		modTime = UTCNow()
 	}
 	fvID := mustGetUUID()
-	if markDelete {
-		if opts.Versioned || opts.VersionSuspended {
-			if !deleteMarker {
-				// versioning suspended means we add `null` version as
-				// delete marker, if its not decided already.
-				deleteMarker = opts.VersionSuspended && opts.VersionID == ""
-			}
-			fi := FileInfo{
-				Name:             object,
-				Deleted:          deleteMarker,
-				MarkDeleted:      markDelete,
-				ModTime:          modTime,
-				ReplicationState: opts.DeleteReplication,
-				TransitionStatus: opts.Transition.Status,
-				ExpireRestored:   opts.Transition.ExpireRestored,
-			}
-			fi.SetTierFreeVersionID(fvID)
-			if opts.Versioned {
-				fi.VersionID = mustGetUUID()
-				if opts.VersionID != "" {
-					fi.VersionID = opts.VersionID
-				}
-			}
-			// versioning suspended means we add `null` version as
-			// delete marker. Add delete marker, since we don't have
-			// any version specified explicitly. Or if a particular
-			// version id needs to be replicated.
-			if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
-				return objInfo, toObjectErr(err, bucket, object)
-			}
-			return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
-		}
+	if markDelete && (opts.Versioned || opts.VersionSuspended) {
+		fi := FileInfo{
+			Name:             object,
+			Deleted:          deleteMarker,
+			MarkDeleted:      markDelete,
+			ModTime:          modTime,
+			ReplicationState: opts.DeleteReplication,
+			TransitionStatus: opts.Transition.Status,
+			ExpireRestored:   opts.Transition.ExpireRestored,
+		}
+		fi.SetTierFreeVersionID(fvID)
+		if opts.Versioned {
+			fi.VersionID = mustGetUUID()
+			if opts.VersionID != "" {
+				fi.VersionID = opts.VersionID
+			}
+		}
+		// versioning suspended means we add `null` version as
+		// delete marker. Add delete marker, since we don't have
+		// any version specified explicitly. Or if a particular
+		// version id needs to be replicated.
+		if err = er.deleteObjectVersion(ctx, bucket, object, fi, opts.DeleteMarker); err != nil {
+			return objInfo, toObjectErr(err, bucket, object)
+		}
+		return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
 	}
 
 	// Delete the object version on all disks.

@@ -1565,7 +1541,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 		ExpireRestored:   opts.Transition.ExpireRestored,
 	}
 	dfi.SetTierFreeVersionID(fvID)
-	if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, dfi, opts.DeleteMarker); err != nil {
+	if err = er.deleteObjectVersion(ctx, bucket, object, dfi, opts.DeleteMarker); err != nil {
 		return objInfo, toObjectErr(err, bucket, object)
 	}
 

@@ -1577,13 +1553,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
 		break
 	}
 
-	return ObjectInfo{
-		Bucket:                     bucket,
-		Name:                       object,
-		VersionID:                  opts.VersionID,
-		VersionPurgeStatusInternal: opts.DeleteReplication.VersionPurgeStatusInternal,
-		ReplicationStatusInternal:  opts.DeleteReplication.ReplicationStatusInternal,
-	}, nil
+	return dfi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
 }
 
 // Send the successful but partial upload/delete, however ignore

@@ -1853,14 +1823,8 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st
 	eventName := event.ObjectTransitionComplete
 
 	storageDisks := er.getDisks()
-	// we now know the number of blocks this object needs for data and parity.
-	// writeQuorum is dataBlocks + 1
-	writeQuorum := fi.Erasure.DataBlocks
-	if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
-		writeQuorum++
-	}
-
-	if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil {
+	if err = er.deleteObjectVersion(ctx, bucket, object, fi, false); err != nil {
 		eventName = event.ObjectTransitionFailed
 	}
 

@@ -380,19 +380,15 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
 	return serverPools
 }
 
-// poolObjInfo represents the state of an object per pool
-type poolObjInfo struct {
-	PoolIndex int
+// PoolObjInfo represents the state of current object version per pool
+type PoolObjInfo struct {
+	Index   int
 	ObjInfo ObjectInfo
 	Err     error
 }
 
-func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, bucket, object string, opts ObjectOptions) (idx int, err error) {
-	if z.SinglePool() {
-		return 0, nil
-	}
-
-	poolObjInfos := make([]poolObjInfo, len(z.serverPools))
+func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bucket, object string, opts ObjectOptions) (PoolObjInfo, error) {
+	poolObjInfos := make([]PoolObjInfo, len(z.serverPools))
 	poolOpts := make([]ObjectOptions, len(z.serverPools))
 	for i := range z.serverPools {
 		poolOpts[i] = opts

@@ -404,8 +400,8 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
 		go func(i int, pool *erasureSets, opts ObjectOptions) {
 			defer wg.Done()
 			// remember the pool index, we may sort the slice original index might be lost.
-			pinfo := poolObjInfo{
-				PoolIndex: i,
+			pinfo := PoolObjInfo{
+				Index: i,
 			}
 			// do not remove this check as it can lead to inconsistencies
 			// for all callers of bucket replication.

@@ -429,19 +425,19 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
 	for _, pinfo := range poolObjInfos {
 		// skip all objects from suspended pools if asked by the
 		// caller.
-		if z.IsSuspended(pinfo.PoolIndex) && opts.SkipDecommissioned {
+		if z.IsSuspended(pinfo.Index) && opts.SkipDecommissioned {
 			continue
 		}
 
 		if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
-			return -1, pinfo.Err
+			return pinfo, pinfo.Err
 		}
 
 		if isErrObjectNotFound(pinfo.Err) {
 			// No object exists or its a delete marker,
 			// check objInfo to confirm.
 			if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
-				return pinfo.PoolIndex, nil
+				return pinfo, nil
 			}
 
 			// objInfo is not valid, truly the object doesn't

@@ -449,10 +445,18 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
 			continue
 		}
 
-		return pinfo.PoolIndex, nil
+		return pinfo, nil
 	}
 
-	return -1, toObjectErr(errFileNotFound, bucket, object)
+	return PoolObjInfo{}, toObjectErr(errFileNotFound, bucket, object)
+}
+
+func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, bucket, object string, opts ObjectOptions) (idx int, err error) {
+	pinfo, err := z.getPoolInfoExistingWithOpts(ctx, bucket, object, opts)
+	if err != nil {
+		return -1, err
+	}
+	return pinfo.Index, nil
 }
 
 // getPoolIdxExistingNoLock returns the (first) found object pool index containing an object.

@@ -999,16 +1003,36 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
 	}
 
 	object = encodeDirObject(object)
-	if z.SinglePool() {
-		return z.serverPools[0].DeleteObject(ctx, bucket, object, opts)
-	}
 
-	idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
+	// Acquire a write lock before deleting the object.
+	lk := z.NewNSLock(bucket, object)
+	lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
 	if err != nil {
+		return ObjectInfo{}, err
+	}
+	ctx = lkctx.Context()
+	defer lk.Unlock(lkctx.Cancel)
+
+	gopts := opts
+	gopts.NoLock = true
+	pinfo, err := z.getPoolInfoExistingWithOpts(ctx, bucket, object, gopts)
+	if err != nil {
+		switch err.(type) {
+		case InsufficientReadQuorum:
+			return objInfo, InsufficientWriteQuorum{}
+		}
 		return objInfo, err
 	}
 
-	return z.serverPools[idx].DeleteObject(ctx, bucket, object, opts)
+	// Delete marker already present we are not going to create new delete markers.
+	if pinfo.ObjInfo.DeleteMarker && opts.VersionID == "" {
+		pinfo.ObjInfo.Name = decodeDirObject(object)
+		return pinfo.ObjInfo, nil
+	}
+
+	objInfo, err = z.serverPools[pinfo.Index].DeleteObject(ctx, bucket, object, opts)
+	objInfo.Name = decodeDirObject(object)
+	return objInfo, err
 }
 
 func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {

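At the pool layer DeleteObject now takes the per-object namespace lock once, probes the pools with NoLock set so the probe does not lock again, and returns the existing delete marker instead of creating another one. The locking hand-off can be sketched in isolation; the lock map and type names below are illustrative only, the real server uses its distributed namespace lock.

```go
package main

import (
	"fmt"
	"sync"
)

// nsLock hands out one mutex per bucket/object pair. It is a toy stand-in for
// the server's distributed namespace lock.
type nsLock struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func (n *nsLock) forObject(bucket, object string) *sync.Mutex {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.locks == nil {
		n.locks = make(map[string]*sync.Mutex)
	}
	key := bucket + "/" + object
	if n.locks[key] == nil {
		n.locks[key] = &sync.Mutex{}
	}
	return n.locks[key]
}

type options struct{ NoLock bool }

type pools struct{ locker *nsLock }

// probe stands in for getPoolInfoExistingWithOpts: it only locks when the
// caller has not already taken the lock.
func (p *pools) probe(bucket, object string, opts options) {
	if !opts.NoLock {
		lk := p.locker.forObject(bucket, object)
		lk.Lock()
		defer lk.Unlock()
	}
	fmt.Printf("probed %s/%s (NoLock=%v)\n", bucket, object, opts.NoLock)
}

// deleteObject mirrors the patched flow: lock once at the top, then pass
// NoLock down so the probe does not deadlock on the same entry.
func (p *pools) deleteObject(bucket, object string, opts options) {
	lk := p.locker.forObject(bucket, object)
	lk.Lock()
	defer lk.Unlock()

	gopts := opts
	gopts.NoLock = true
	p.probe(bucket, object, gopts)
	fmt.Printf("deleted %s/%s\n", bucket, object)
}

func main() {
	p := &pools{locker: &nsLock{}}
	p.deleteObject("bucket", "marker", options{})
}
```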
@@ -1034,14 +1058,6 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
 	ctx = lkctx.Context()
 	defer multiDeleteLock.Unlock(lkctx.Cancel)
 
-	if z.SinglePool() {
-		deleteObjects, dErrs := z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts)
-		for i := range deleteObjects {
-			deleteObjects[i].ObjectName = decodeDirObject(deleteObjects[i].ObjectName)
-		}
-		return deleteObjects, dErrs
-	}
-
 	// Fetch location of up to 10 objects concurrently.
 	poolObjIdxMap := map[int][]ObjectToDelete{}
 	origIndexMap := map[int][]int{}

@@ -1060,46 +1076,66 @@ func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, o
 		j := j
 		obj := obj
 		eg.Go(func() error {
-			idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, obj.ObjectName, ObjectOptions{
+			pinfo, err := z.getPoolInfoExistingWithOpts(ctx, bucket, obj.ObjectName, ObjectOptions{
 				NoLock: true,
 			})
 			if err != nil {
 				derrs[j] = err
+				dobjects[j] = DeletedObject{
+					ObjectName: obj.ObjectName,
+				}
 				return nil
 			}
 
+			// Delete marker already present we are not going to create new delete markers.
+			if pinfo.ObjInfo.DeleteMarker && obj.VersionID == "" {
+				dobjects[j] = DeletedObject{
+					DeleteMarker:          pinfo.ObjInfo.DeleteMarker,
+					DeleteMarkerVersionID: pinfo.ObjInfo.VersionID,
+					DeleteMarkerMTime:     DeleteMarkerMTime{pinfo.ObjInfo.ModTime},
+					ObjectName:            pinfo.ObjInfo.Name,
+				}
+				return nil
+			}
+
+			idx := pinfo.Index
+
 			mu.Lock()
+			defer mu.Unlock()
+
 			poolObjIdxMap[idx] = append(poolObjIdxMap[idx], obj)
 			origIndexMap[idx] = append(origIndexMap[idx], j)
-			mu.Unlock()
 			return nil
 		}, j)
 	}
 
 	eg.Wait() // wait to check all the pools.
 
-	// Delete concurrently in all server pools.
-	var wg sync.WaitGroup
-	wg.Add(len(z.serverPools))
-	for idx, pool := range z.serverPools {
-		go func(idx int, pool *erasureSets) {
-			defer wg.Done()
-			objs := poolObjIdxMap[idx]
-			if len(objs) > 0 {
-				orgIndexes := origIndexMap[idx]
-				deletedObjects, errs := pool.DeleteObjects(ctx, bucket, objs, opts)
-				mu.Lock()
-				for i, derr := range errs {
-					if derr != nil {
-						derrs[orgIndexes[i]] = derr
-					}
-					deletedObjects[i].ObjectName = decodeDirObject(deletedObjects[i].ObjectName)
-					dobjects[orgIndexes[i]] = deletedObjects[i]
-				}
-				mu.Unlock()
-			}
-		}(idx, pool)
-	}
-	wg.Wait()
+	if len(poolObjIdxMap) > 0 {
+		// Delete concurrently in all server pools.
+		var wg sync.WaitGroup
+		wg.Add(len(z.serverPools))
+		for idx, pool := range z.serverPools {
+			go func(idx int, pool *erasureSets) {
+				defer wg.Done()
+				objs := poolObjIdxMap[idx]
+				if len(objs) > 0 {
+					orgIndexes := origIndexMap[idx]
+					deletedObjects, errs := pool.DeleteObjects(ctx, bucket, objs, opts)
+					mu.Lock()
+					for i, derr := range errs {
+						if derr != nil {
+							derrs[orgIndexes[i]] = derr
+						}
+						deletedObjects[i].ObjectName = decodeDirObject(deletedObjects[i].ObjectName)
+						dobjects[orgIndexes[i]] = deletedObjects[i]
+					}
+					mu.Unlock()
+				}
+			}(idx, pool)
+		}
+		wg.Wait()
+	}
 
 	return dobjects, derrs
 }

@@ -1223,7 +1223,7 @@ func (es *erasureSingle) putObject(ctx context.Context, bucket string, object st
 	return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
 }
 
-func (es *erasureSingle) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
+func (es *erasureSingle) deleteObjectVersion(ctx context.Context, bucket, object string, fi FileInfo, forceDelMarker bool) error {
 	return es.disk.DeleteVersion(ctx, bucket, object, fi, forceDelMarker)
 }
 

@@ -1447,7 +1447,7 @@ func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string
 
 	versionFound := true
 	objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
-	goi, writeQuorum, gerr := es.getObjectInfoAndQuorum(ctx, bucket, object, opts)
+	goi, _, gerr := es.getObjectInfoAndQuorum(ctx, bucket, object, opts)
 	if gerr != nil && goi.Name == "" {
 		switch gerr.(type) {
 		case InsufficientReadQuorum:

@@ -1461,6 +1461,11 @@ func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string
 		}
 	}
 
+	// Do not create new delete markers.
+	if goi.DeleteMarker && opts.VersionID == "" {
+		return goi, nil
+	}
+
 	if opts.Expiration.Expire {
 		action := evalActionFromLifecycle(ctx, *lc, rcfg, goi, false)
 		var isErr bool

@@ -1557,7 +1562,7 @@ func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string
 		// delete marker. Add delete marker, since we don't have
 		// any version specified explicitly. Or if a particular
 		// version id needs to be replicated.
-		if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
+		if err = es.deleteObjectVersion(ctx, bucket, object, fi, opts.DeleteMarker); err != nil {
 			return objInfo, toObjectErr(err, bucket, object)
 		}
 		return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil

@@ -1576,7 +1581,7 @@ func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string
 		ExpireRestored:   opts.Transition.ExpireRestored,
 	}
 	dfi.SetTierFreeVersionID(fvID)
-	if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, dfi, opts.DeleteMarker); err != nil {
+	if err = es.deleteObjectVersion(ctx, bucket, object, dfi, opts.DeleteMarker); err != nil {
 		return objInfo, toObjectErr(err, bucket, object)
 	}
 

@@ -1813,14 +1818,7 @@ func (es *erasureSingle) TransitionObject(ctx context.Context, bucket, object st
 	fi.TransitionVersionID = string(rv)
 	eventName := event.ObjectTransitionComplete
 
-	// we now know the number of blocks this object needs for data and parity.
-	// writeQuorum is dataBlocks + 1
-	writeQuorum := fi.Erasure.DataBlocks
-	if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
-		writeQuorum++
-	}
-
-	if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil {
+	if err = es.deleteObjectVersion(ctx, bucket, object, fi, false); err != nil {
 		eventName = event.ObjectTransitionFailed
 	}
 

@@ -1,5 +1,7 @@
 #!/usr/bin/env bash
 
+set -x
+
 trap 'catch $LINENO' ERR
 
 # shellcheck disable=SC2120

@@ -37,8 +39,10 @@ unset MINIO_KMS_KES_KEY_FILE
 unset MINIO_KMS_KES_ENDPOINT
 unset MINIO_KMS_KES_KEY_NAME
 
-wget -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc \
-	&& chmod +x mc
+if [ ! -f ./mc ]; then
+	wget --quiet -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc && \
+		chmod +x mc
+fi
 
 minio server --address 127.0.0.1:9001 "http://127.0.0.1:9001/tmp/multisitea/data/disterasure/xl{1...4}" \
 	"http://127.0.0.1:9002/tmp/multisitea/data/disterasure/xl{5...8}" >/tmp/sitea_1.log 2>&1 &

@@ -66,6 +70,9 @@ done
 ./mc mirror /tmp/data sitea/bucket/
 ./mc version enable sitea/bucket
 
+./mc cp /tmp/data/file_1.txt sitea/bucket/marker
+./mc rm sitea/bucket/marker
+
 ./mc mb siteb/bucket/
 ./mc version enable siteb/bucket/
 

@@ -83,11 +90,49 @@ sleep 1
 sleep 10s ## sleep for 10s idea is that we give 100ms per object.
 
 count=$(./mc replicate resync status sitea/bucket --remote-bucket "${remote_arn}" --json | jq .resyncInfo.target[].replicationCount)
-./mc ls --versions sitea/bucket
-./mc ls --versions siteb/bucket
-if [ $count -ne 10 ]; then
-	echo "resync not complete after 100s unexpected failure"
+
+./mc ls -r --versions sitea/bucket > /tmp/sitea.txt
+./mc ls -r --versions siteb/bucket > /tmp/siteb.txt
+
+out=$(diff -qpruN /tmp/sitea.txt /tmp/siteb.txt)
+ret=$?
+if [ $ret -ne 0 ]; then
+	echo "BUG: expected no missing entries after replication: $out"
+	exit 1
+fi
+
+if [ $count -ne 12 ]; then
+	echo "resync not complete after 10s unexpected failure"
 	./mc diff sitea/bucket siteb/bucket
+	exit 1
+fi
+
+./mc cp /tmp/data/file_1.txt sitea/bucket/marker_new
+./mc rm sitea/bucket/marker_new
+
+sleep 12s ## sleep for 12s idea is that we give 100ms per object.
+
+./mc ls -r --versions sitea/bucket > /tmp/sitea.txt
+./mc ls -r --versions siteb/bucket > /tmp/siteb.txt
+
+out=$(diff -qpruN /tmp/sitea.txt /tmp/siteb.txt)
+ret=$?
+if [ $ret -ne 0 ]; then
+	echo "BUG: expected no 'diff' after replication: $out"
+	exit 1
+fi
+
+./mc rm -r --force --versions sitea/bucket/marker
+sleep 14s ## sleep for 14s idea is that we give 100ms per object.
+
+./mc ls -r --versions sitea/bucket > /tmp/sitea.txt
+./mc ls -r --versions siteb/bucket > /tmp/siteb.txt
+
+out=$(diff -qpruN /tmp/sitea.txt /tmp/siteb.txt)
+ret=$?
+if [ $ret -ne 0 ]; then
+	echo "BUG: expected no 'diff' after replication: $out"
+	exit 1
 fi
 
 catch

@@ -1,5 +1,9 @@
 #!/usr/bin/env bash
 
+if [ -n "$TEST_DEBUG" ]; then
+	set -x
+fi
+
 trap 'catch $LINENO' ERR
 
 # shellcheck disable=SC2120