expire ILM all versions verify quorum on action (#19359)

This commit is contained in:
Harshavardhana 2024-03-27 23:44:52 -07:00 committed by GitHub
parent c61dd16a1e
commit 289223b6de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1821,10 +1821,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
auditObjectErasureSet(ctx, object, &er)
}
if opts.DeletePrefix {
return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object)
}
var lc *lifecycle.Lifecycle
var rcfg lock.Retention
var replcfg *replication.Config
@ -1851,12 +1847,63 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
}
}
if opts.DeletePrefix {
if opts.Expiration.Expire {
// Expire all versions expiration must still verify the state() on disk
// via a getObjectInfo() call as follows, any read quorum issues we
// must not proceed further for safety reasons. attempt a MRF heal
// while we see such quorum errors.
goi, _, gerr := er.getObjectInfoAndQuorum(ctx, bucket, object, opts)
if gerr != nil && goi.Name == "" {
if _, ok := gerr.(InsufficientReadQuorum); ok {
// Add an MRF heal for next time.
er.addPartial(bucket, object, opts.VersionID)
return objInfo, InsufficientWriteQuorum{}
}
return objInfo, gerr
}
// Add protection and re-verify the ILM rules for qualification
// based on the latest objectInfo and see if the object still
// qualifies for deletion.
if gerr == nil {
evt := evalActionFromLifecycle(ctx, *lc, rcfg, replcfg, goi)
var isErr bool
switch evt.Action {
case lifecycle.NoneAction:
isErr = true
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
isErr = true
}
if isErr {
if goi.VersionID != "" {
return goi, VersionNotFound{
Bucket: bucket,
Object: object,
VersionID: goi.VersionID,
}
}
return goi, ObjectNotFound{
Bucket: bucket,
Object: object,
}
}
}
} // Delete marker and any latest that qualifies shall be expired permanently.
return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object)
}
storageDisks := er.getDisks()
versionFound := true
objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
goi, _, gerr := er.getObjectInfoAndQuorum(ctx, bucket, object, opts)
if gerr != nil && goi.Name == "" {
if _, ok := gerr.(InsufficientReadQuorum); ok {
// Add an MRF heal for next time.
er.addPartial(bucket, object, opts.VersionID)
return objInfo, InsufficientWriteQuorum{}
}
// For delete marker replication, versionID being replicated will not exist on disk
@ -1866,6 +1913,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
return objInfo, gerr
}
}
if opts.EvalMetadataFn != nil {
dsc, err := opts.EvalMetadataFn(&goi, err)
if err != nil {