fix: multiple fixes in prefix exclude implementation (#14877)

- do not need to restrict prefix exclusions that do not
  have `/` as suffix, relax this requirement as spark may
  have staging folders with other autogenerated characters
  , so we are better off doing full prefix March and skip. 

- multiple delete objects was incorrectly creating a
  null delete marker on a versioned bucket instead of
  creating a proper versioned delete marker.

- do not suspend paths on the excluded prefixes during
  delete operations to avoid creating `null` delete markers,
  honor suspension of versioning only at bucket level for
  delete markers.
This commit is contained in:
Harshavardhana
2022-05-07 22:06:44 -07:00
committed by GitHub
parent def75ffcfe
commit 5cffd3780a
9 changed files with 25 additions and 48 deletions

View File

@@ -512,7 +512,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
opts := ObjectOptions{
VersionID: object.VersionID,
Versioned: vc.PrefixEnabled(object.ObjectName),
VersionSuspended: vc.PrefixSuspended(object.ObjectName),
VersionSuspended: vc.Suspended(),
}
if replicateDeletes || object.VersionID != "" && hasLockEnabled || !globalTierConfigMgr.Empty() {
@@ -578,8 +578,8 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
deleteList := toNames(objectsToDelete)
dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{
PrefixEnabledFn: vc.PrefixEnabled,
PrefixSuspendedFn: vc.PrefixSuspended,
PrefixEnabledFn: vc.PrefixEnabled,
VersionSuspended: vc.Suspended(),
})
for i := range errs {
@@ -635,22 +635,12 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
continue
}
if replicateDeletes {
if dobj.DeleteMarkerReplicationStatus() == replication.Pending || dobj.VersionPurgeStatus() == Pending {
dv := DeletedObjectReplicationInfo{
DeletedObject: dobj,
Bucket: bucket,
}
scheduleReplicationDelete(ctx, dv, objectAPI)
if replicateDeletes && (dobj.DeleteMarkerReplicationStatus() == replication.Pending || dobj.VersionPurgeStatus() == Pending) {
dv := DeletedObjectReplicationInfo{
DeletedObject: dobj,
Bucket: bucket,
}
}
}
// Notify deleted event for objects.
for _, dobj := range deletedObjects {
if dobj.ObjectName == "" {
continue
scheduleReplicationDelete(ctx, dv, objectAPI)
}
eventName := event.ObjectRemovedDelete

View File

@@ -271,7 +271,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
}
// Skip replication if this object's prefix is excluded from being
// versioned.
if delOpts.VersionSuspended {
if !delOpts.Versioned {
return
}
opts := replication.ObjectOpts{

View File

@@ -1189,9 +1189,6 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
if objects[i].VersionID == "" {
// MinIO extension to bucket version configuration
suspended := opts.VersionSuspended
if opts.PrefixSuspendedFn != nil {
suspended = opts.PrefixSuspendedFn(objects[i].ObjectName)
}
versioned := opts.Versioned
if opts.PrefixEnabledFn != nil {
versioned = opts.PrefixEnabledFn(objects[i].ObjectName)
@@ -1204,7 +1201,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
// Versioning suspended means that we add a `null` version
// delete marker, if not add a new version for this delete
// marker.
if opts.Versioned {
if versioned {
vr.VersionID = mustGetUUID()
}
}

View File

@@ -77,8 +77,7 @@ type ObjectOptions struct {
Mutate bool
WalkAscending bool // return Walk results in ascending order of versions
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
PrefixSuspendedFn func(prefix string) bool // function which returns true if versioning is suspended on prefix
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
}
// ExpirationOptions represents object options for object expiration at objectLayer.

View File

@@ -166,7 +166,7 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts
return opts, err
}
opts.Versioned = globalBucketVersioningSys.PrefixEnabled(bucket, object)
opts.VersionSuspended = globalBucketVersioningSys.PrefixSuspended(bucket, object)
opts.VersionSuspended = globalBucketVersioningSys.Suspended(bucket)
delMarker := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarker))
if delMarker != "" {
switch delMarker {

View File

@@ -275,8 +275,8 @@ func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toD
}
vc, _ := globalBucketVersioningSys.Get(bucket)
deletedObjs, errs := o.DeleteObjects(ctx, bucket, toDel, ObjectOptions{
PrefixEnabledFn: vc.PrefixEnabled,
PrefixSuspendedFn: vc.PrefixSuspended,
PrefixEnabledFn: vc.PrefixEnabled,
VersionSuspended: vc.Suspended(),
})
var logged bool
for i, err := range errs {