diff --git a/.typos.toml b/.typos.toml index a5b06a509..3168e9dd9 100644 --- a/.typos.toml +++ b/.typos.toml @@ -14,6 +14,7 @@ extend-ignore-re = [ 'http\.Header\{"X-Amz-Server-Side-Encryptio":', "ZoEoZdLlzVbOlT9rbhD7ZN7TLyiYXSAlB79uGEge", "ERRO:", + "(?Rm)^.*(#|//)\\s*spellchecker:disable-line$", # ignore line ] [default.extend-words] diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 6463c4903..b67b7c36a 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -559,7 +559,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, }, goi, opts, gerr) if dsc.ReplicateAny() { if object.VersionID != "" { - object.VersionPurgeStatus = Pending + object.VersionPurgeStatus = replication.VersionPurgePending object.VersionPurgeStatuses = dsc.PendingStatus() } else { object.DeleteMarkerReplicationStatus = dsc.PendingStatus() @@ -669,7 +669,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, continue } - if replicateDeletes && (dobj.DeleteMarkerReplicationStatus() == replication.Pending || dobj.VersionPurgeStatus() == Pending) { + if replicateDeletes && (dobj.DeleteMarkerReplicationStatus() == replication.Pending || dobj.VersionPurgeStatus() == replication.VersionPurgePending) { // copy so we can re-add null ID. dobj := dobj if isDirObject(dobj.ObjectName) && dobj.VersionID == "" { diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 5a787f409..f57c2226e 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -73,6 +73,10 @@ func NewLifecycleSys() *LifecycleSys { func ilmTrace(startTime time.Time, duration time.Duration, oi ObjectInfo, event string, metadata map[string]string, err string) madmin.TraceInfo { sz, _ := oi.GetActualSize() + if metadata == nil { + metadata = make(map[string]string) + } + metadata["version-id"] = oi.VersionID return madmin.TraceInfo{ TraceType: madmin.TraceILM, Time: startTime, @@ -151,7 +155,7 @@ func (f freeVersionTask) OpHash() uint64 { return xxh3.HashString(f.TransitionedObject.Tier + f.TransitionedObject.Name) } -func (n newerNoncurrentTask) OpHash() uint64 { +func (n noncurrentVersionsTask) OpHash() uint64 { return xxh3.HashString(n.bucket + n.versions[0].ObjectName) } @@ -236,14 +240,16 @@ func (es *expiryState) enqueueByDays(oi ObjectInfo, event lifecycle.Event, src l } } -// enqueueByNewerNoncurrent enqueues object versions expired by -// NewerNoncurrentVersions limit for expiry. -func (es *expiryState) enqueueByNewerNoncurrent(bucket string, versions []ObjectToDelete, lcEvent lifecycle.Event) { +func (es *expiryState) enqueueNoncurrentVersions(bucket string, versions []ObjectToDelete, events []lifecycle.Event) { if len(versions) == 0 { return } - task := newerNoncurrentTask{bucket: bucket, versions: versions, event: lcEvent} + task := noncurrentVersionsTask{ + bucket: bucket, + versions: versions, + events: events, + } wrkr := es.getWorkerCh(task.OpHash()) if wrkr == nil { es.stats.missedExpiryTasks.Add(1) @@ -343,8 +349,8 @@ func (es *expiryState) Worker(input <-chan expiryOp) { } else { applyExpiryOnNonTransitionedObjects(es.ctx, es.objAPI, v.objInfo, v.event, v.src) } - case newerNoncurrentTask: - deleteObjectVersions(es.ctx, es.objAPI, v.bucket, v.versions, v.event) + case noncurrentVersionsTask: + deleteObjectVersions(es.ctx, es.objAPI, v.bucket, v.versions, v.events) case jentry: transitionLogIf(es.ctx, deleteObjectFromRemoteTier(es.ctx, v.ObjName, v.VersionID, v.TierName)) case freeVersionTask: @@ -392,12 +398,10 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { globalExpiryState = newExpiryState(ctx, objectAPI, globalILMConfig.getExpirationWorkers()) } -// newerNoncurrentTask encapsulates arguments required by worker to expire objects -// by NewerNoncurrentVersions -type newerNoncurrentTask struct { +type noncurrentVersionsTask struct { bucket string versions []ObjectToDelete - event lifecycle.Event + events []lifecycle.Event } type transitionTask struct { @@ -1104,17 +1108,20 @@ func isRestoredObjectOnDisk(meta map[string]string) (onDisk bool) { // ToLifecycleOpts returns lifecycle.ObjectOpts value for oi. func (oi ObjectInfo) ToLifecycleOpts() lifecycle.ObjectOpts { return lifecycle.ObjectOpts{ - Name: oi.Name, - UserTags: oi.UserTags, - VersionID: oi.VersionID, - ModTime: oi.ModTime, - Size: oi.Size, - IsLatest: oi.IsLatest, - NumVersions: oi.NumVersions, - DeleteMarker: oi.DeleteMarker, - SuccessorModTime: oi.SuccessorModTime, - RestoreOngoing: oi.RestoreOngoing, - RestoreExpires: oi.RestoreExpires, - TransitionStatus: oi.TransitionedObject.Status, + Name: oi.Name, + UserTags: oi.UserTags, + VersionID: oi.VersionID, + ModTime: oi.ModTime, + Size: oi.Size, + IsLatest: oi.IsLatest, + NumVersions: oi.NumVersions, + DeleteMarker: oi.DeleteMarker, + SuccessorModTime: oi.SuccessorModTime, + RestoreOngoing: oi.RestoreOngoing, + RestoreExpires: oi.RestoreExpires, + TransitionStatus: oi.TransitionedObject.Status, + UserDefined: oi.UserDefined, + VersionPurgeStatus: oi.VersionPurgeStatus, + ReplicationStatus: oi.ReplicationStatus, } } diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index e4de5742c..28bb7def1 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -125,16 +125,16 @@ func (ri replicatedInfos) VersionPurgeStatus() VersionPurgeStatusType { completed := 0 for _, v := range ri.Targets { switch v.VersionPurgeStatus { - case Failed: - return Failed - case Complete: + case replication.VersionPurgeFailed: + return replication.VersionPurgeFailed + case replication.VersionPurgeComplete: completed++ } } if completed == len(ri.Targets) { - return Complete + return replication.VersionPurgeComplete } - return Pending + return replication.VersionPurgePending } func (ri replicatedInfos) VersionPurgeStatusInternal() string { @@ -380,7 +380,7 @@ func (rs *ReplicationState) CompositeReplicationStatus() (st replication.StatusT // CompositeVersionPurgeStatus returns overall replication purge status for the permanent delete being replicated. func (rs *ReplicationState) CompositeVersionPurgeStatus() VersionPurgeStatusType { switch VersionPurgeStatusType(rs.VersionPurgeStatusInternal) { - case Pending, Complete, Failed: // for backward compatibility + case replication.VersionPurgePending, replication.VersionPurgeComplete, replication.VersionPurgeFailed: // for backward compatibility return VersionPurgeStatusType(rs.VersionPurgeStatusInternal) default: return getCompositeVersionPurgeStatus(rs.PurgeTargets) @@ -478,16 +478,16 @@ func getCompositeVersionPurgeStatus(m map[string]VersionPurgeStatusType) Version completed := 0 for _, v := range m { switch v { - case Failed: - return Failed - case Complete: + case replication.VersionPurgeFailed: + return replication.VersionPurgeFailed + case replication.VersionPurgeComplete: completed++ } } if completed == len(m) { - return Complete + return replication.VersionPurgeComplete } - return Pending + return replication.VersionPurgePending } // getHealReplicateObjectInfo returns info needed by heal replication in ReplicateObjectInfo @@ -635,28 +635,7 @@ type ResyncTarget struct { } // VersionPurgeStatusType represents status of a versioned delete or permanent delete w.r.t bucket replication -type VersionPurgeStatusType string - -const ( - // Pending - versioned delete replication is pending. - Pending VersionPurgeStatusType = "PENDING" - - // Complete - versioned delete replication is now complete, erase version on disk. - Complete VersionPurgeStatusType = "COMPLETE" - - // Failed - versioned delete replication failed. - Failed VersionPurgeStatusType = "FAILED" -) - -// Empty returns true if purge status was not set. -func (v VersionPurgeStatusType) Empty() bool { - return string(v) == "" -} - -// Pending returns true if the version is pending purge. -func (v VersionPurgeStatusType) Pending() bool { - return v == Pending || v == Failed -} +type VersionPurgeStatusType = replication.VersionPurgeStatusType type replicationResyncer struct { // map of bucket to their resync status diff --git a/cmd/bucket-replication-utils_gen.go b/cmd/bucket-replication-utils_gen.go index 5ed010d9b..a93e6bb4c 100644 --- a/cmd/bucket-replication-utils_gen.go +++ b/cmd/bucket-replication-utils_gen.go @@ -915,33 +915,29 @@ func (z *ReplicationState) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "PurgeTargets") return } - { - var zb0004 string - zb0004, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "PurgeTargets", za0003) - return - } - za0004 = VersionPurgeStatusType(zb0004) + err = za0004.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "PurgeTargets", za0003) + return } z.PurgeTargets[za0003] = za0004 } case "ResetStatusesMap": - var zb0005 uint32 - zb0005, err = dc.ReadMapHeader() + var zb0004 uint32 + zb0004, err = dc.ReadMapHeader() if err != nil { err = msgp.WrapError(err, "ResetStatusesMap") return } if z.ResetStatusesMap == nil { - z.ResetStatusesMap = make(map[string]string, zb0005) + z.ResetStatusesMap = make(map[string]string, zb0004) } else if len(z.ResetStatusesMap) > 0 { for key := range z.ResetStatusesMap { delete(z.ResetStatusesMap, key) } } - for zb0005 > 0 { - zb0005-- + for zb0004 > 0 { + zb0004-- var za0005 string var za0006 string za0005, err = dc.ReadString() @@ -1078,7 +1074,7 @@ func (z *ReplicationState) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "PurgeTargets") return } - err = en.WriteString(string(za0004)) + err = za0004.EncodeMsg(en) if err != nil { err = msgp.WrapError(err, "PurgeTargets", za0003) return @@ -1154,7 +1150,11 @@ func (z *ReplicationState) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendMapHeader(o, uint32(len(z.PurgeTargets))) for za0003, za0004 := range z.PurgeTargets { o = msgp.AppendString(o, za0003) - o = msgp.AppendString(o, string(za0004)) + o, err = za0004.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "PurgeTargets", za0003) + return + } } // string "ResetStatusesMap" o = append(o, 0xb0, 0x52, 0x65, 0x73, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x4d, 0x61, 0x70) @@ -1279,35 +1279,31 @@ func (z *ReplicationState) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "PurgeTargets") return } - { - var zb0004 string - zb0004, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "PurgeTargets", za0003) - return - } - za0004 = VersionPurgeStatusType(zb0004) + bts, err = za0004.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "PurgeTargets", za0003) + return } z.PurgeTargets[za0003] = za0004 } case "ResetStatusesMap": - var zb0005 uint32 - zb0005, bts, err = msgp.ReadMapHeaderBytes(bts) + var zb0004 uint32 + zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { err = msgp.WrapError(err, "ResetStatusesMap") return } if z.ResetStatusesMap == nil { - z.ResetStatusesMap = make(map[string]string, zb0005) + z.ResetStatusesMap = make(map[string]string, zb0004) } else if len(z.ResetStatusesMap) > 0 { for key := range z.ResetStatusesMap { delete(z.ResetStatusesMap, key) } } - for zb0005 > 0 { + for zb0004 > 0 { var za0005 string var za0006 string - zb0005-- + zb0004-- za0005, bts, err = msgp.ReadStringBytes(bts) if err != nil { err = msgp.WrapError(err, "ResetStatusesMap") @@ -1345,7 +1341,7 @@ func (z *ReplicationState) Msgsize() (s int) { if z.PurgeTargets != nil { for za0003, za0004 := range z.PurgeTargets { _ = za0004 - s += msgp.StringPrefixSize + len(za0003) + msgp.StringPrefixSize + len(string(za0004)) + s += msgp.StringPrefixSize + len(za0003) + za0004.Msgsize() } } s += 17 + msgp.MapHeaderSize @@ -2507,55 +2503,3 @@ func (z *TargetReplicationResyncStatus) Msgsize() (s int) { s = 1 + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 3 + msgp.StringPrefixSize + len(z.ResyncID) + 4 + msgp.TimeSize + 4 + msgp.IntSize + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object) return } - -// DecodeMsg implements msgp.Decodable -func (z *VersionPurgeStatusType) DecodeMsg(dc *msgp.Reader) (err error) { - { - var zb0001 string - zb0001, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err) - return - } - (*z) = VersionPurgeStatusType(zb0001) - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z VersionPurgeStatusType) EncodeMsg(en *msgp.Writer) (err error) { - err = en.WriteString(string(z)) - if err != nil { - err = msgp.WrapError(err) - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z VersionPurgeStatusType) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - o = msgp.AppendString(o, string(z)) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *VersionPurgeStatusType) UnmarshalMsg(bts []byte) (o []byte, err error) { - { - var zb0001 string - zb0001, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - (*z) = VersionPurgeStatusType(zb0001) - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z VersionPurgeStatusType) Msgsize() (s int) { - s = msgp.StringPrefixSize + len(string(z)) - return -} diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 7189309aa..e9b4f5f8d 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -390,7 +390,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet // can be the case that other cluster is down and duplicate `mc rm --vid` // is issued - this still needs to be replicated back to the other target if !oi.VersionPurgeStatus.Empty() { - replicate = oi.VersionPurgeStatus == Pending || oi.VersionPurgeStatus == Failed + replicate = oi.VersionPurgeStatus == replication.VersionPurgePending || oi.VersionPurgeStatus == replication.VersionPurgeFailed dsc.Set(newReplicateTargetDecision(tgtArn, replicate, sync)) } continue @@ -618,7 +618,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI rinfo.ReplicationStatus = rinfo.PrevReplicationStatus return } - if dobj.VersionID != "" && rinfo.VersionPurgeStatus == Complete { + if dobj.VersionID != "" && rinfo.VersionPurgeStatus == replication.VersionPurgeComplete { return } if globalBucketTargetSys.isOffline(tgt.EndpointURL()) { @@ -638,7 +638,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI if dobj.VersionID == "" { rinfo.ReplicationStatus = replication.Failed } else { - rinfo.VersionPurgeStatus = Failed + rinfo.VersionPurgeStatus = replication.VersionPurgeFailed } return } @@ -662,7 +662,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI case isErrObjectNotFound(serr), isErrVersionNotFound(serr): // version being purged is already not found on target. if !rinfo.VersionPurgeStatus.Empty() { - rinfo.VersionPurgeStatus = Complete + rinfo.VersionPurgeStatus = replication.VersionPurgeComplete return } case isErrReadQuorum(serr), isErrWriteQuorum(serr): @@ -695,7 +695,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI if dobj.VersionID == "" { rinfo.ReplicationStatus = replication.Failed } else { - rinfo.VersionPurgeStatus = Failed + rinfo.VersionPurgeStatus = replication.VersionPurgeFailed } replLogIf(ctx, fmt.Errorf("unable to replicate delete marker to %s: %s/%s(%s): %w", tgt.EndpointURL(), tgt.Bucket, dobj.ObjectName, versionID, rmErr)) if rmErr != nil && minio.IsNetworkOrHostDown(rmErr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { @@ -705,7 +705,7 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI if dobj.VersionID == "" { rinfo.ReplicationStatus = replication.Completed } else { - rinfo.VersionPurgeStatus = Complete + rinfo.VersionPurgeStatus = replication.VersionPurgeComplete } } return @@ -3363,7 +3363,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, } for arn, st := range roi.TargetPurgeStatuses { if opts.ARN == "" || opts.ARN == arn { - if !opts.Verbose && st == Complete { + if !opts.Verbose && st == replication.VersionPurgeComplete { continue } t, ok := tgtsMap[arn] @@ -3462,7 +3462,7 @@ func queueReplicationHeal(ctx context.Context, bucket string, oi ObjectInfo, rcf // heal delete marker replication failure or versioned delete replication failure if roi.ReplicationStatus == replication.Pending || roi.ReplicationStatus == replication.Failed || - roi.VersionPurgeStatus == Failed || roi.VersionPurgeStatus == Pending { + roi.VersionPurgeStatus == replication.VersionPurgeFailed || roi.VersionPurgeStatus == replication.VersionPurgePending { globalReplicationPool.Get().queueReplicaDeleteTask(dv) return } diff --git a/cmd/data-scanner-metric.go b/cmd/data-scanner-metric.go index 87c70de84..8e0990ee4 100644 --- a/cmd/data-scanner-metric.go +++ b/cmd/data-scanner-metric.go @@ -104,6 +104,22 @@ func (p *scannerMetrics) log(s scannerMetric, paths ...string) func(custom map[s } } +// time n scanner actions. +// Use for s < scannerMetricLastRealtime +func (p *scannerMetrics) timeN(s scannerMetric) func(n int) func() { + startTime := time.Now() + return func(n int) func() { + return func() { + duration := time.Since(startTime) + + atomic.AddUint64(&p.operations[s], uint64(n)) + if s < scannerMetricLastRealtime { + p.latency[s].add(duration) + } + } + } +} + // time a scanner action. // Use for s < scannerMetricLastRealtime func (p *scannerMetrics) time(s scannerMetric) func() { diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index b3b38009a..02ceed636 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -38,7 +38,6 @@ import ( "github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/object/lock" "github.com/minio/minio/internal/bucket/replication" - "github.com/minio/minio/internal/bucket/versioning" "github.com/minio/minio/internal/color" "github.com/minio/minio/internal/config/heal" "github.com/minio/minio/internal/event" @@ -950,10 +949,7 @@ func (i *scannerItem) transformMetaDir() { i.objectName = split[len(split)-1] } -var ( - applyActionsLogPrefix = color.Green("applyActions:") - applyVersionActionsLogPrefix = color.Green("applyVersionActions:") -) +var applyActionsLogPrefix = color.Green("applyActions:") func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi ObjectInfo) (size int64) { if i.debug { @@ -978,153 +974,8 @@ func (i *scannerItem) applyHealing(ctx context.Context, o ObjectLayer, oi Object return 0 } -func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi ObjectInfo) (action lifecycle.Action, size int64) { - size, err := oi.GetActualSize() - if i.debug { - scannerLogIf(ctx, err) - } - if i.lifeCycle == nil { - return action, size - } - - versionID := oi.VersionID - - var vc *versioning.Versioning - var lr lock.Retention - var rcfg *replication.Config - if !isMinioMetaBucketName(i.bucket) { - vc, err = globalBucketVersioningSys.Get(i.bucket) - if err != nil { - scannerLogOnceIf(ctx, err, i.bucket) - return - } - - // Check if bucket is object locked. - lr, err = globalBucketObjectLockSys.Get(i.bucket) - if err != nil { - scannerLogOnceIf(ctx, err, i.bucket) - return - } - - rcfg, err = getReplicationConfig(ctx, i.bucket) - if err != nil { - scannerLogOnceIf(ctx, err, i.bucket) - return - } - } - - lcEvt := evalActionFromLifecycle(ctx, *i.lifeCycle, lr, rcfg, oi) - if i.debug { - if versionID != "" { - console.Debugf(applyActionsLogPrefix+" lifecycle: %q (version-id=%s), Initial scan: %v\n", i.objectPath(), versionID, lcEvt.Action) - } else { - console.Debugf(applyActionsLogPrefix+" lifecycle: %q Initial scan: %v\n", i.objectPath(), lcEvt.Action) - } - } - - switch lcEvt.Action { - // This version doesn't contribute towards sizeS only when it is permanently deleted. - // This can happen when, - // - ExpireObjectAllVersions flag is enabled - // - NoncurrentVersionExpiration is applicable - case lifecycle.DeleteVersionAction, lifecycle.DeleteAllVersionsAction, lifecycle.DelMarkerDeleteAllVersionsAction: - size = 0 - case lifecycle.DeleteAction: - // On a non-versioned bucket, DeleteObject removes the only version permanently. - if !vc.PrefixEnabled(oi.Name) { - size = 0 - } - } - - applyLifecycleAction(lcEvt, lcEventSrc_Scanner, oi) - return lcEvt.Action, size -} - -// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured. -// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return. -func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) { - done := globalScannerMetrics.time(scannerMetricApplyNonCurrent) - defer done() - - rcfg, _ := globalBucketObjectLockSys.Get(i.bucket) - vcfg, _ := globalBucketVersioningSys.Get(i.bucket) - - versioned := vcfg != nil && vcfg.Versioned(i.objectPath()) - - objectInfos := make([]ObjectInfo, 0, len(fivs)) - - if i.lifeCycle == nil { - for _, fi := range fivs { - objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)) - } - return objectInfos, nil - } - - event := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()}) - lim := event.NewerNoncurrentVersions - if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions - for _, fi := range fivs { - objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)) - } - return objectInfos, nil - } - - overflowVersions := fivs[lim+1:] - // Retain the current version + most recent lim noncurrent versions - for _, fi := range fivs[:lim+1] { - objectInfos = append(objectInfos, fi.ToObjectInfo(i.bucket, i.objectPath(), versioned)) - } - - toDel := make([]ObjectToDelete, 0, len(overflowVersions)) - for _, fi := range overflowVersions { - obj := fi.ToObjectInfo(i.bucket, i.objectPath(), versioned) - // skip versions with object locking enabled - if rcfg.LockEnabled && enforceRetentionForDeletion(ctx, obj) { - if i.debug { - if obj.VersionID != "" { - console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s v(%s) is locked, not deleting\n", obj.Name, obj.VersionID) - } else { - console.Debugf(applyVersionActionsLogPrefix+" lifecycle: %s is locked, not deleting\n", obj.Name) - } - } - // add this version back to remaining versions for - // subsequent lifecycle policy applications - objectInfos = append(objectInfos, obj) - continue - } - - // NoncurrentDays not passed yet. - if time.Now().UTC().Before(lifecycle.ExpectedExpiryTime(obj.SuccessorModTime, event.NoncurrentDays)) { - // add this version back to remaining versions for - // subsequent lifecycle policy applications - objectInfos = append(objectInfos, obj) - continue - } - - toDel = append(toDel, ObjectToDelete{ - ObjectV: ObjectV{ - ObjectName: obj.Name, - VersionID: obj.VersionID, - }, - }) - } - - if len(toDel) > 0 { - expState.enqueueByNewerNoncurrent(i.bucket, toDel, event) - } - return objectInfos, nil -} - -// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain -// after applying lifecycle checks configured. -func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) { - objInfos, err := i.applyNewerNoncurrentVersionLimit(ctx, o, fivs, expState) - if err != nil { - return nil, err - } - - // Check if we have many versions after applyNewerNoncurrentVersionLimit. - if len(objInfos) >= int(scannerExcessObjectVersions.Load()) { +func (i *scannerItem) alertExcessiveVersions(remainingVersions int, cumulativeSize int64) { + if remainingVersions >= int(scannerExcessObjectVersions.Load()) { // Notify object accessed via a GET request. sendEvent(eventArgs{ EventName: event.ObjectManyVersions, @@ -1134,7 +985,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi }, UserAgent: "Scanner", Host: globalLocalNodeName, - RespElements: map[string]string{"x-minio-versions": strconv.Itoa(len(objInfos))}, + RespElements: map[string]string{"x-minio-versions": strconv.Itoa(remainingVersions)}, }) auditLogInternal(context.Background(), AuditLogOptions{ @@ -1143,15 +994,11 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi Bucket: i.bucket, Object: i.objectPath(), Tags: map[string]string{ - "x-minio-versions": strconv.Itoa(len(objInfos)), + "x-minio-versions": strconv.Itoa(remainingVersions), }, }) } - cumulativeSize := int64(0) - for _, objInfo := range objInfos { - cumulativeSize += objInfo.Size - } // Check if the cumulative size of all versions of this object is high. if cumulativeSize >= scannerExcessObjectVersionsTotalSize.Load() { // Notify object accessed via a GET request. @@ -1164,7 +1011,7 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi UserAgent: "Scanner", Host: globalLocalNodeName, RespElements: map[string]string{ - "x-minio-versions-count": strconv.Itoa(len(objInfos)), + "x-minio-versions-count": strconv.Itoa(remainingVersions), "x-minio-versions-size": strconv.FormatInt(cumulativeSize, 10), }, }) @@ -1175,43 +1022,30 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi Bucket: i.bucket, Object: i.objectPath(), Tags: map[string]string{ - "x-minio-versions-count": strconv.Itoa(len(objInfos)), + "x-minio-versions-count": strconv.Itoa(remainingVersions), "x-minio-versions-size": strconv.FormatInt(cumulativeSize, 10), }, }) } - - return objInfos, nil } +type actionsAccountingFn func(oi ObjectInfo, sz, actualSz int64, sizeS *sizeSummary) + // applyActions will apply lifecycle checks on to a scanned item. // The resulting size on disk will always be returned. // The metadata will be compared to consensus on the object layer before any changes are applied. // If no metadata is supplied, -1 is returned if no action is taken. -func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) (objDeleted bool, size int64) { - done := globalScannerMetrics.time(scannerMetricILM) - var action lifecycle.Action - action, size = i.applyLifecycle(ctx, o, oi) - done() - - // Note: objDeleted is true if and only if action == - // lifecycle.DeleteAllVersionsAction - if action.DeleteAll() { - return true, 0 - } - - // For instance, an applied lifecycle means we remove/transitioned an object - // from the current deployment, which means we don't have to call healing - // routine even if we are asked to do via heal flag. - if action == lifecycle.NoneAction { +func (i *scannerItem) applyActions(ctx context.Context, objAPI ObjectLayer, objInfos []ObjectInfo, lr lock.Retention, sizeS *sizeSummary, fn actionsAccountingFn) { + healActions := func(oi ObjectInfo, actualSz int64) int64 { + size := actualSz if i.heal.enabled { done := globalScannerMetrics.time(scannerMetricHealCheck) - size = i.applyHealing(ctx, o, oi) + size = i.applyHealing(ctx, objAPI, oi) done() if healDeleteDangling { done := globalScannerMetrics.time(scannerMetricCleanAbandoned) - err := o.CheckAbandonedParts(ctx, i.bucket, i.objectPath(), madmin.HealOpts{Remove: healDeleteDangling}) + err := objAPI.CheckAbandonedParts(ctx, i.bucket, i.objectPath(), madmin.HealOpts{Remove: healDeleteDangling}) done() if err != nil { healingLogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", i.bucket, i.objectPath(), err), i.objectPath()) @@ -1221,10 +1055,109 @@ func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi Object // replicate only if lifecycle rules are not applied. done := globalScannerMetrics.time(scannerMetricCheckReplication) - i.healReplication(ctx, o, oi.Clone(), sizeS) + i.healReplication(ctx, oi.Clone(), sizeS) done() + return size } - return false, size + + vc, err := globalBucketVersioningSys.Get(i.bucket) + if err != nil { + scannerLogOnceIf(ctx, err, i.bucket) + return + } + + // start ILM check timer + done := globalScannerMetrics.timeN(scannerMetricILM) + if i.lifeCycle == nil { // no ILM configured, apply healing and replication checks + var cumulativeSize int64 + for _, oi := range objInfos { + actualSz, err := oi.GetActualSize() + if err != nil { + scannerLogIf(ctx, err) + continue + } + size := healActions(oi, actualSz) + if fn != nil { // call accountingfn + fn(oi, size, actualSz, sizeS) + } + cumulativeSize += size + } + // end ILM check timer + done(len(objInfos)) + i.alertExcessiveVersions(len(objInfos), cumulativeSize) + return + } + objOpts := make([]lifecycle.ObjectOpts, len(objInfos)) + for i, oi := range objInfos { + objOpts[i] = oi.ToLifecycleOpts() + } + evaluator := lifecycle.NewEvaluator(*i.lifeCycle).WithLockRetention(&lr).WithReplicationConfig(i.replication.Config) + events, err := evaluator.Eval(objOpts) + if err != nil { + // This error indicates that the objOpts passed to Eval is invalid. + bugLogIf(ctx, err, i.bucket, i.objectPath()) + done(len(objInfos)) // end ILM check timer + return + } + done(len(objInfos)) // end ILM check timer + + var ( + toDel []ObjectToDelete + noncurrentEvents []lifecycle.Event + cumulativeSize int64 + ) + remainingVersions := len(objInfos) +eventLoop: + for idx, event := range events { + oi := objInfos[idx] + actualSz, err := oi.GetActualSize() + if i.debug { + scannerLogIf(ctx, err) + } + size := actualSz + switch event.Action { + case lifecycle.DeleteAllVersionsAction, lifecycle.DelMarkerDeleteAllVersionsAction: + remainingVersions = 0 + applyExpiryRule(event, lcEventSrc_Scanner, oi) + break eventLoop + + case lifecycle.DeleteAction, lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: + if !vc.PrefixEnabled(i.objectPath()) && event.Action == lifecycle.DeleteAction { + remainingVersions-- + size = 0 + } + applyExpiryRule(event, lcEventSrc_Scanner, oi) + + case lifecycle.DeleteVersionAction: // noncurrent versions expiration + opts := objOpts[idx] + remainingVersions-- + size = 0 + toDel = append(toDel, ObjectToDelete{ + ObjectV: ObjectV{ + ObjectName: opts.Name, + VersionID: opts.VersionID, + }, + }) + noncurrentEvents = append(noncurrentEvents, event) + + case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: + applyTransitionRule(event, lcEventSrc_Scanner, oi) + + case lifecycle.NoneAction: + size = healActions(oi, actualSz) + } + // NB fn must be called for every object version except if it is + // expired or was a dangling object. + if fn != nil { + fn(oi, size, actualSz, sizeS) + } + cumulativeSize += size + } + + if len(toDel) > 0 { + globalExpiryState.enqueueNoncurrentVersions(i.bucket, toDel, noncurrentEvents) + } + i.alertExcessiveVersions(remainingVersions, cumulativeSize) } func evalActionFromLifecycle(ctx context.Context, lc lifecycle.Lifecycle, lr lock.Retention, rcfg *replication.Config, obj ObjectInfo) lifecycle.Event { @@ -1371,22 +1304,8 @@ func applyExpiryOnNonTransitionedObjects(ctx context.Context, objLayer ObjectLay } // Apply object, object version, restored object or restored object version action on the given object -func applyExpiryRule(event lifecycle.Event, src lcEventSrc, obj ObjectInfo) bool { +func applyExpiryRule(event lifecycle.Event, src lcEventSrc, obj ObjectInfo) { globalExpiryState.enqueueByDays(obj, event, src) - return true -} - -// Perform actions (removal or transitioning of objects), return true the action is successfully performed -func applyLifecycleAction(event lifecycle.Event, src lcEventSrc, obj ObjectInfo) (success bool) { - switch action := event.Action; action { - case lifecycle.DeleteVersionAction, lifecycle.DeleteAction, - lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction, - lifecycle.DeleteAllVersionsAction, lifecycle.DelMarkerDeleteAllVersionsAction: - success = applyExpiryRule(event, src, obj) - case lifecycle.TransitionAction, lifecycle.TransitionVersionAction: - success = applyTransitionRule(event, src, obj) - } - return } // objectPath returns the prefix and object name. @@ -1395,7 +1314,7 @@ func (i *scannerItem) objectPath() string { } // healReplication will heal a scanned item that has failed replication. -func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) { +func (i *scannerItem) healReplication(ctx context.Context, oi ObjectInfo, sizeS *sizeSummary) { if oi.VersionID == "" { return } diff --git a/cmd/data-scanner_test.go b/cmd/data-scanner_test.go index 0f2344d85..367dfc666 100644 --- a/cmd/data-scanner_test.go +++ b/cmd/data-scanner_test.go @@ -21,58 +21,65 @@ import ( "context" "encoding/xml" "fmt" + "slices" "strings" "sync" "testing" "time" "github.com/google/uuid" + "github.com/minio/minio/internal/amztime" "github.com/minio/minio/internal/bucket/lifecycle" - "github.com/minio/minio/internal/bucket/object/lock" + objectlock "github.com/minio/minio/internal/bucket/object/lock" + "github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/bucket/versioning" + xhttp "github.com/minio/minio/internal/http" ) func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { + // Prepare object layer objAPI, disks, err := prepareErasure(context.Background(), 8) if err != nil { t.Fatalf("Failed to initialize object layer: %v", err) } defer removeRoots(disks) setObjectLayer(objAPI) + + // Prepare bucket metadata globalBucketMetadataSys = NewBucketMetadataSys() globalBucketObjectLockSys = &BucketObjectLockSys{} globalBucketVersioningSys = &BucketVersioningSys{} - es := newExpiryState(context.Background(), objAPI, 0) - workers := []chan expiryOp{make(chan expiryOp)} - es.workers.Store(&workers) - globalExpiryState = es - var wg sync.WaitGroup - wg.Add(1) - expired := make([]ObjectToDelete, 0, 5) - go func() { - defer wg.Done() - workers := globalExpiryState.workers.Load() - for t := range (*workers)[0] { - if t, ok := t.(newerNoncurrentTask); ok { - expired = append(expired, t.versions...) - } - } - }() - lc := lifecycle.Lifecycle{ - Rules: []lifecycle.Rule{ - { - ID: "max-versions", - Status: "Enabled", - NoncurrentVersionExpiration: lifecycle.NoncurrentVersionExpiration{ - NewerNoncurrentVersions: 1, - }, - }, - }, - } - lcXML, err := xml.Marshal(lc) + + lcXML := ` + + + max-versions + Enabled + + 2 + + + + delete-all-versions + Enabled + + + del-all + true + + + + 1 + true + + + +` + lc, err := lifecycle.ParseLifecycleConfig(strings.NewReader(lcXML)) if err != nil { - t.Fatalf("Failed to marshal lifecycle config: %v", err) + t.Fatalf("Failed to unmarshal lifecycle config: %v", err) } + vcfg := versioning.Versioning{ Status: "Enabled", } @@ -82,33 +89,45 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { } bucket := "bucket" - obj := "obj-1" now := time.Now() meta := BucketMetadata{ Name: bucket, Created: now, - LifecycleConfigXML: lcXML, + LifecycleConfigXML: []byte(lcXML), VersioningConfigXML: vcfgXML, VersioningConfigUpdatedAt: now, LifecycleConfigUpdatedAt: now, - lifecycleConfig: &lc, + lifecycleConfig: lc, versioningConfig: &vcfg, } globalBucketMetadataSys.Set(bucket, meta) - item := scannerItem{ - Path: obj, - bucket: bucket, - prefix: "", - objectName: obj, - lifeCycle: &lc, - } + // Prepare lifecycle expiration workers + es := newExpiryState(context.Background(), objAPI, 0) + globalExpiryState = es - modTime := time.Now() + // Prepare object versions + obj := "obj-1" + // Simulate objects uploaded 30 hours ago + modTime := now.Add(-48 * time.Hour) uuids := make([]uuid.UUID, 5) for i := range uuids { uuids[i] = uuid.UUID([16]byte{15: uint8(i + 1)}) } fivs := make([]FileInfo, 5) + objInfos := make([]ObjectInfo, 5) + objRetentionMeta := make(map[string]string) + objRetentionMeta[strings.ToLower(xhttp.AmzObjectLockMode)] = string(objectlock.RetCompliance) + // Set retain until date 12 hours into the future + objRetentionMeta[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = amztime.ISO8601Format(now.Add(12 * time.Hour)) + /* + objInfos: + version stack for obj-1 + v5 uuid-5 modTime + v4 uuid-4 modTime -1m + v3 uuid-3 modTime -2m + v2 uuid-2 modTime -3m + v1 uuid-1 modTime -4m + */ for i := 0; i < 5; i++ { fivs[i] = FileInfo{ Volume: bucket, @@ -119,41 +138,183 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) { Size: 1 << 10, NumVersions: 5, } + objInfos[i] = fivs[i].ToObjectInfo(bucket, obj, true) } - versioned := vcfg.Status == "Enabled" - wants := make([]ObjectInfo, 2) - for i, fi := range fivs[:2] { - wants[i] = fi.ToObjectInfo(bucket, obj, versioned) + /* + lrObjInfos: objInfos with following modifications + version stack for obj-1 + v2 uuid-2 modTime -3m objRetentionMeta + */ + lrObjInfos := slices.Clone(objInfos) + lrObjInfos[3].UserDefined = objRetentionMeta + var lrWants []ObjectInfo + lrWants = append(lrWants, lrObjInfos[:4]...) + + /* + replObjInfos: objInfos with following modifications + version stack for obj-1 + v1 uuid-1 modTime -4m "VersionPurgeStatus: replication.VersionPurgePending" + */ + replObjInfos := slices.Clone(objInfos) + replObjInfos[4].VersionPurgeStatus = replication.VersionPurgePending + var replWants []ObjectInfo + replWants = append(replWants, replObjInfos[:3]...) + replWants = append(replWants, replObjInfos[4]) + + allVersExpObjInfos := slices.Clone(objInfos) + allVersExpObjInfos[0].UserTags = "del-all=true" + + replCfg := replication.Config{ + Rules: []replication.Rule{ + { + ID: "", + Status: "Enabled", + Priority: 1, + Destination: replication.Destination{ + ARN: "arn:minio:replication:::dest-bucket", + Bucket: "dest-bucket", + }, + }, + }, } - gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, es) - if err != nil { - t.Fatalf("Failed with err: %v", err) - } - if len(gots) != len(wants) { - t.Fatalf("Expected %d objects but got %d", len(wants), len(gots)) + lr := objectlock.Retention{ + Mode: objectlock.RetCompliance, + Validity: 12 * time.Hour, + LockEnabled: true, } - // Close expiry state's channel to inspect object versions enqueued for expiration - close(workers[0]) - wg.Wait() - for _, obj := range expired { - switch obj.VersionID { - case uuids[2].String(), uuids[3].String(), uuids[4].String(): - default: - t.Errorf("Unexpected versionID being expired: %#v\n", obj) + expiryWorker := func(wg *sync.WaitGroup, readyCh chan<- struct{}, taskCh <-chan expiryOp, gotExpired *[]ObjectToDelete) { + defer wg.Done() + // signal the calling goroutine that the worker is ready tor receive tasks + close(readyCh) + var expired []ObjectToDelete + for t := range taskCh { + switch v := t.(type) { + case noncurrentVersionsTask: + expired = append(expired, v.versions...) + case expiryTask: + expired = append(expired, ObjectToDelete{ + ObjectV: ObjectV{ + ObjectName: v.objInfo.Name, + VersionID: v.objInfo.VersionID, + }, + }) + } } + if len(expired) > 0 { + *gotExpired = expired + } + } + tests := []struct { + replCfg replicationConfig + lr objectlock.Retention + objInfos []ObjectInfo + wants []ObjectInfo + wantExpired []ObjectToDelete + }{ + { + // With replication configured, version(s) with PENDING purge status + replCfg: replicationConfig{Config: &replCfg}, + objInfos: replObjInfos, + wants: replWants, + wantExpired: []ObjectToDelete{ + {ObjectV: ObjectV{ObjectName: obj, VersionID: objInfos[3].VersionID}}, + }, + }, + { + // With lock retention configured and version(s) with retention metadata + lr: lr, + objInfos: lrObjInfos, + wants: lrWants, + wantExpired: []ObjectToDelete{ + {ObjectV: ObjectV{ObjectName: obj, VersionID: objInfos[4].VersionID}}, + }, + }, + { + // With replication configured, but no versions with PENDING purge status + replCfg: replicationConfig{Config: &replCfg}, + objInfos: objInfos, + wants: objInfos[:3], + wantExpired: []ObjectToDelete{ + {ObjectV: ObjectV{ObjectName: obj, VersionID: objInfos[3].VersionID}}, + {ObjectV: ObjectV{ObjectName: obj, VersionID: objInfos[4].VersionID}}, + }, + }, + { + objInfos: allVersExpObjInfos, + wants: nil, + wantExpired: []ObjectToDelete{{ObjectV: ObjectV{ObjectName: obj, VersionID: allVersExpObjInfos[0].VersionID}}}, + }, + } + for i, test := range tests { + t.Run(fmt.Sprintf("TestApplyNewerNoncurrentVersionsLimit-%d", i), func(t *testing.T) { + workers := []chan expiryOp{make(chan expiryOp)} + es.workers.Store(&workers) + workerReady := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + var gotExpired []ObjectToDelete + go expiryWorker(&wg, workerReady, workers[0], &gotExpired) + <-workerReady + + item := scannerItem{ + Path: obj, + bucket: bucket, + prefix: "", + objectName: obj, + lifeCycle: lc, + replication: test.replCfg, + } + + var ( + sizeS sizeSummary + gots []ObjectInfo + ) + item.applyActions(context.TODO(), objAPI, test.objInfos, test.lr, &sizeS, func(oi ObjectInfo, sz, _ int64, _ *sizeSummary) { + if sz != 0 { + gots = append(gots, oi) + } + }) + + if len(gots) != len(test.wants) { + t.Fatalf("Expected %d objects but got %d", len(test.wants), len(gots)) + } + if slices.CompareFunc(gots, test.wants, func(g, w ObjectInfo) int { + if g.VersionID == w.VersionID { + return 0 + } + return -1 + }) != 0 { + t.Fatalf("Expected %v but got %v", test.wants, gots) + } + // verify the objects to be deleted + close(workers[0]) + wg.Wait() + if len(gotExpired) != len(test.wantExpired) { + t.Fatalf("Expected expiry of %d objects but got %d", len(test.wantExpired), len(gotExpired)) + } + if slices.CompareFunc(gotExpired, test.wantExpired, func(g, w ObjectToDelete) int { + if g.VersionID == w.VersionID { + return 0 + } + return -1 + }) != 0 { + t.Fatalf("Expected %v but got %v", test.wantExpired, gotExpired) + } + }) } } func TestEvalActionFromLifecycle(t *testing.T) { // Tests cover only ExpiredObjectDeleteAllVersions and DelMarkerExpiration actions + numVersions := 4 obj := ObjectInfo{ Name: "foo", ModTime: time.Now().Add(-31 * 24 * time.Hour), Size: 100 << 20, VersionID: uuid.New().String(), IsLatest: true, - NumVersions: 4, + NumVersions: numVersions, } delMarker := ObjectInfo{ Name: "foo-deleted", @@ -162,8 +323,9 @@ func TestEvalActionFromLifecycle(t *testing.T) { VersionID: uuid.New().String(), IsLatest: true, DeleteMarker: true, - NumVersions: 4, + NumVersions: numVersions, } + deleteAllILM := ` @@ -195,35 +357,35 @@ func TestEvalActionFromLifecycle(t *testing.T) { } tests := []struct { ilm lifecycle.Lifecycle - retention lock.Retention + retention *objectlock.Retention obj ObjectInfo want lifecycle.Action }{ { // with object locking ilm: *deleteAllLc, - retention: lock.Retention{LockEnabled: true}, + retention: &objectlock.Retention{LockEnabled: true}, obj: obj, want: lifecycle.NoneAction, }, { // without object locking ilm: *deleteAllLc, - retention: lock.Retention{}, + retention: &objectlock.Retention{}, obj: obj, want: lifecycle.DeleteAllVersionsAction, }, { // with object locking ilm: *delMarkerLc, - retention: lock.Retention{LockEnabled: true}, + retention: &objectlock.Retention{LockEnabled: true}, obj: delMarker, want: lifecycle.NoneAction, }, { // without object locking ilm: *delMarkerLc, - retention: lock.Retention{}, + retention: &objectlock.Retention{}, obj: delMarker, want: lifecycle.DelMarkerDeleteAllVersionsAction, }, @@ -231,8 +393,9 @@ func TestEvalActionFromLifecycle(t *testing.T) { for i, test := range tests { t.Run(fmt.Sprintf("TestEvalAction-%d", i), func(t *testing.T) { - if got := evalActionFromLifecycle(context.TODO(), test.ilm, test.retention, nil, test.obj); got.Action != test.want { - t.Fatalf("Expected %v but got %v", test.want, got) + gotEvent := evalActionFromLifecycle(context.Background(), test.ilm, *test.retention, nil, test.obj) + if gotEvent.Action != test.want { + t.Fatalf("Expected %v but got %v", test.want, gotEvent.Action) } }) } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 8a458c3ac..162f16209 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -2032,7 +2032,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string if opts.VersionPurgeStatus().Empty() && opts.DeleteMarkerReplicationStatus().Empty() { markDelete = false } - if opts.VersionPurgeStatus() == Complete { + if opts.VersionPurgeStatus() == replication.VersionPurgeComplete { markDelete = false } // now, since VersionPurgeStatus() is already set, we can let the diff --git a/cmd/object-handlers-common.go b/cmd/object-handlers-common.go index 4e351ba72..b530441ac 100644 --- a/cmd/object-handlers-common.go +++ b/cmd/object-handlers-common.go @@ -378,7 +378,7 @@ func setPutObjHeaders(w http.ResponseWriter, objInfo ObjectInfo, del bool, h htt hash.AddChecksumHeader(w, cs) } -func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toDel []ObjectToDelete, lcEvent lifecycle.Event) { +func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toDel []ObjectToDelete, lcEvent []lifecycle.Event) { for remaining := toDel; len(remaining) > 0; toDel = remaining { if len(toDel) > maxDeleteList { remaining = toDel[maxDeleteList:] @@ -399,8 +399,7 @@ func deleteObjectVersions(ctx context.Context, o ObjectLayer, bucket string, toD VersionID: dobj.VersionID, } traceFn := globalLifecycleSys.trace(oi) - // Note: NewerNoncurrentVersions action is performed only scanner today - tags := newLifecycleAuditEvent(lcEventSrc_Scanner, lcEvent).Tags() + tags := newLifecycleAuditEvent(lcEventSrc_Scanner, lcEvent[i]).Tags() // Send audit for the lifecycle delete operation auditLogLifecycle( diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 2270fb5a8..fc42d2feb 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -2672,7 +2672,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. Host: handlers.GetSourceIP(r), }) - if objInfo.ReplicationStatus == replication.Pending || objInfo.VersionPurgeStatus == Pending { + if objInfo.ReplicationStatus == replication.Pending || objInfo.VersionPurgeStatus == replication.VersionPurgePending { dmVersionID := "" versionID := "" if objInfo.DeleteMarker { diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 37f32b0fb..01ec63a5a 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -1384,13 +1384,13 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, error) { updateVersion = fi.MarkDeleted } else { // for replication scenario - if fi.Deleted && fi.VersionPurgeStatus() != Complete { + if fi.Deleted && fi.VersionPurgeStatus() != replication.VersionPurgeComplete { if !fi.VersionPurgeStatus().Empty() || fi.DeleteMarkerReplicationStatus().Empty() { updateVersion = true } } // object or delete-marker versioned delete is not complete - if !fi.VersionPurgeStatus().Empty() && fi.VersionPurgeStatus() != Complete { + if !fi.VersionPurgeStatus().Empty() && fi.VersionPurgeStatus() != replication.VersionPurgeComplete { updateVersion = true } } @@ -1458,7 +1458,7 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, error) { return "", err } x.versions = append(x.versions[:i], x.versions[i+1:]...) - if fi.MarkDeleted && (fi.VersionPurgeStatus().Empty() || (fi.VersionPurgeStatus() != Complete)) { + if fi.MarkDeleted && (fi.VersionPurgeStatus().Empty() || (fi.VersionPurgeStatus() != replication.VersionPurgeComplete)) { err = x.addVersion(ventry) } else if fi.Deleted && uv.String() == emptyUUID { return "", x.addVersion(ventry) diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 3d5defe83..5d263c632 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -43,6 +43,7 @@ import ( "github.com/klauspost/filepathx" "github.com/minio/madmin-go/v3" "github.com/minio/minio/internal/bucket/lifecycle" + "github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/cachevalue" "github.com/minio/minio/internal/config/storageclass" @@ -552,7 +553,8 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } // Check if the current bucket has replication configuration - if rcfg, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, cache.Info.Name); err == nil { + var rcfg *replication.Config + if rcfg, _, err = globalBucketMetadataSys.GetReplicationConfig(ctx, cache.Info.Name); err == nil { if rcfg.HasActiveRules("", true) { tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, cache.Info.Name) if err == nil { @@ -564,6 +566,13 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } } + // Check if bucket is object locked. + lr, err := globalBucketObjectLockSys.Get(cache.Info.Name) + if err != nil { + scannerLogOnceIf(ctx, err, cache.Info.Name) + return cache, err + } + vcfg, _ := globalBucketVersioningSys.Get(cache.Info.Name) // return initialized object layer @@ -614,6 +623,11 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates return sizeSummary{}, errSkipFile } + versioned := vcfg != nil && vcfg.Versioned(item.objectPath()) + objInfos := make([]ObjectInfo, len(fivs.Versions)) + for i, fi := range fivs.Versions { + objInfos[i] = fi.ToObjectInfo(item.bucket, item.objectPath(), versioned) + } sizeS := sizeSummary{} for _, tier := range globalTierConfigMgr.ListTiers() { if sizeS.tiers == nil { @@ -626,35 +640,14 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates sizeS.tiers[storageclass.RRS] = tierStats{} } - done := globalScannerMetrics.time(scannerMetricApplyAll) - objInfos, err := item.applyVersionActions(ctx, objAPI, fivs.Versions, globalExpiryState) - done() - if err != nil { res["err"] = err.Error() return sizeSummary{}, errSkipFile } - versioned := vcfg != nil && vcfg.Versioned(item.objectPath()) - - var objDeleted bool - for _, oi := range objInfos { - done = globalScannerMetrics.time(scannerMetricApplyVersion) - var sz int64 - objDeleted, sz = item.applyActions(ctx, objAPI, oi, &sizeS) - done() - - // DeleteAllVersionsAction: The object and all its - // versions are expired and - // doesn't contribute toward data usage. - if objDeleted { - break - } - actualSz, err := oi.GetActualSize() - if err != nil { - continue - } - + var objPresent bool + item.applyActions(ctx, objAPI, objInfos, lr, &sizeS, func(oi ObjectInfo, sz, actualSz int64, sizeS *sizeSummary) { + objPresent = true if oi.DeleteMarker { sizeS.deleteMarkers++ } @@ -667,7 +660,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates // tracking deleted transitioned objects switch { case oi.DeleteMarker, oi.TransitionedObject.FreeVersion: - continue + return } tier := oi.StorageClass if tier == "" { @@ -681,12 +674,12 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates sizeS.tiers[tier] = st.add(oi.tierStats()) } } - } + }) // apply tier sweep action on free versions for _, freeVersion := range fivs.FreeVersions { oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned) - done = globalScannerMetrics.time(scannerMetricTierObjSweep) + done := globalScannerMetrics.time(scannerMetricTierObjSweep) globalExpiryState.enqueueFreeVersion(oi) done() } @@ -722,7 +715,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } } } - if objDeleted { + if !objPresent { // we return errIgnoreFileContrib to signal this function's // callers to skip this object's contribution towards // usage. diff --git a/internal/bucket/lifecycle/evaluator.go b/internal/bucket/lifecycle/evaluator.go new file mode 100644 index 000000000..adc5ab233 --- /dev/null +++ b/internal/bucket/lifecycle/evaluator.go @@ -0,0 +1,153 @@ +// Copyright (c) 2015-2025 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package lifecycle + +import ( + "fmt" + "time" + + objlock "github.com/minio/minio/internal/bucket/object/lock" + "github.com/minio/minio/internal/bucket/replication" +) + +// Evaluator - evaluates lifecycle policy on objects for the given lifecycle +// configuration, lock retention configuration and replication configuration. +type Evaluator struct { + policy Lifecycle + lockRetention *objlock.Retention + replCfg *replication.Config +} + +// NewEvaluator - creates a new evaluator with the given lifecycle +func NewEvaluator(policy Lifecycle) *Evaluator { + return &Evaluator{ + policy: policy, + } +} + +// WithLockRetention - sets the lock retention configuration for the evaluator +func (e *Evaluator) WithLockRetention(lr *objlock.Retention) *Evaluator { + e.lockRetention = lr + return e +} + +// WithReplicationConfig - sets the replication configuration for the evaluator +func (e *Evaluator) WithReplicationConfig(rcfg *replication.Config) *Evaluator { + e.replCfg = rcfg + return e +} + +// IsPendingReplication checks if the object is pending replication. +func (e *Evaluator) IsPendingReplication(obj ObjectOpts) bool { + if e.replCfg == nil { + return false + } + if e.replCfg.HasActiveRules(obj.Name, true) && !obj.VersionPurgeStatus.Empty() { + return true + } + + return false +} + +// IsObjectLocked checks if it is appropriate to remove an +// object according to locking configuration when this is lifecycle/ bucket quota asking. +// (copied over from enforceRetentionForDeletion) +func (e *Evaluator) IsObjectLocked(obj ObjectOpts) bool { + if e.lockRetention == nil || !e.lockRetention.LockEnabled { + return false + } + + if obj.DeleteMarker { + return false + } + + lhold := objlock.GetObjectLegalHoldMeta(obj.UserDefined) + if lhold.Status.Valid() && lhold.Status == objlock.LegalHoldOn { + return true + } + + ret := objlock.GetObjectRetentionMeta(obj.UserDefined) + if ret.Mode.Valid() && (ret.Mode == objlock.RetCompliance || ret.Mode == objlock.RetGovernance) { + t, err := objlock.UTCNowNTP() + if err != nil { + // it is safe to assume that the object is locked when + // we can't get the current time + return true + } + if ret.RetainUntilDate.After(t) { + return true + } + } + return false +} + +// eval will return a lifecycle event for each object in objs for a given time. +func (e *Evaluator) eval(objs []ObjectOpts, now time.Time) []Event { + events := make([]Event, len(objs)) + var newerNoncurrentVersions int +loop: + for i, obj := range objs { + event := e.policy.eval(obj, now, newerNoncurrentVersions) + switch event.Action { + case DeleteAllVersionsAction, DelMarkerDeleteAllVersionsAction: + // Skip if bucket has object locking enabled; To prevent the + // possibility of violating an object retention on one of the + // noncurrent versions of this object. + if e.lockRetention != nil && e.lockRetention.LockEnabled { + event = Event{} + } else { + // No need to evaluate remaining versions' lifecycle + // events after DeleteAllVersionsAction* + events[i] = event + break loop + } + + case DeleteVersionAction, DeleteRestoredVersionAction: + // Defensive code, should never happen + if obj.VersionID == "" { + event.Action = NoneAction + } + if e.IsObjectLocked(obj) { + event = Event{} + } + + if e.IsPendingReplication(obj) { + event = Event{} + } + } + if !obj.IsLatest { + switch event.Action { + case DeleteVersionAction: + // this noncurrent version will be expired, nothing to add + default: + // this noncurrent version will be spared + newerNoncurrentVersions++ + } + } + events[i] = event + } + return events +} + +// Eval will return a lifecycle event for each object in objs +func (e *Evaluator) Eval(objs []ObjectOpts) ([]Event, error) { + if len(objs) != objs[0].NumVersions { + return nil, fmt.Errorf("number of versions mismatch, expected %d, got %d", objs[0].NumVersions, len(objs)) + } + return e.eval(objs, time.Now().UTC()), nil +} diff --git a/internal/bucket/lifecycle/evaluator_test.go b/internal/bucket/lifecycle/evaluator_test.go new file mode 100644 index 000000000..f85130bf1 --- /dev/null +++ b/internal/bucket/lifecycle/evaluator_test.go @@ -0,0 +1,177 @@ +// Copyright (c) 2015-2025 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package lifecycle + +import ( + "fmt" + "testing" + "time" + + "github.com/google/uuid" +) + +func TestNewerNoncurrentVersions(t *testing.T) { + prepLifecycleCfg := func(tagKeys []string, retainVersions []int) Lifecycle { + var lc Lifecycle + for i := range retainVersions { + ruleID := fmt.Sprintf("rule-%d", i) + tag := Tag{ + Key: tagKeys[i], + Value: "minio", + } + lc.Rules = append(lc.Rules, Rule{ + ID: ruleID, + Status: "Enabled", + Filter: Filter{ + Tag: tag, + set: true, + }, + NoncurrentVersionExpiration: NoncurrentVersionExpiration{ + NewerNoncurrentVersions: retainVersions[i], + set: true, + }, + }) + } + return lc + } + + lc := prepLifecycleCfg([]string{"tag3", "tag4", "tag5"}, []int{3, 4, 5}) + evaluator := NewEvaluator(lc) + tagKeys := []string{"tag3", "tag3", "tag3", "tag4", "tag4", "tag5", "tag5"} + verIDs := []string{ + "0NdAikoUVNGEpCUuB9vl.XyoMftMXCSg", "19M6Z405yFZuYygnnU9jKzsOBamTZK_7", "0PmlJdFWi_9d6l_dAkWrrhP.bBgtFk6V", // spellchecker:disable-line + ".MmRalFNNJyOLymgCtQ3.qsdoYpy8qkB", "Bjb4OlMW9Agx.Nrggh15iU6frGu2CLde", "ngBmUd_cVl6ckONI9XsKGpJjzimohrzZ", // spellchecker:disable-line + "T6m1heTHLUtnByW2IOWJ3zM4JP9xXt2O", // spellchecker:disable-line + } + wantEvents := []Event{ + {Action: NoneAction}, + {Action: NoneAction}, + {Action: NoneAction}, + {Action: NoneAction}, + {Action: NoneAction}, + {Action: NoneAction}, + {Action: DeleteVersionAction}, + } + var objs []ObjectOpts + curModTime := time.Date(2025, time.February, 10, 23, 0, 0, 0, time.UTC) + for i := range tagKeys { + obj := ObjectOpts{ + Name: "obj", + VersionID: verIDs[i], + ModTime: curModTime.Add(time.Duration(-i) * time.Second), + UserTags: fmt.Sprintf("%s=minio", tagKeys[i]), + NumVersions: len(verIDs), + } + if i == 0 { + obj.IsLatest = true + } else { + obj.SuccessorModTime = curModTime.Add(time.Duration(-i+1) * time.Second) + } + objs = append(objs, obj) + } + now := time.Date(2025, time.February, 10, 23, 0, 0, 0, time.UTC) + gotEvents := evaluator.eval(objs, now) + for i := range wantEvents { + if gotEvents[i].Action != wantEvents[i].Action { + t.Fatalf("got %v, want %v", gotEvents[i], wantEvents[i]) + } + } + + lc = prepLifecycleCfg([]string{"tag3", "tag4", "tag5"}, []int{1, 2, 3}) + objs = objs[:len(objs)-1] + wantEvents = []Event{ + {Action: NoneAction}, + {Action: NoneAction}, + {Action: DeleteVersionAction}, + {Action: NoneAction}, + {Action: DeleteVersionAction}, + {Action: NoneAction}, + } + evaluator = NewEvaluator(lc) + gotEvents = evaluator.eval(objs, now) + for i := range wantEvents { + if gotEvents[i].Action != wantEvents[i].Action { + t.Fatalf("test-%d: got %v, want %v", i+1, gotEvents[i], wantEvents[i]) + } + } + + lc = Lifecycle{ + Rules: []Rule{ + { + ID: "AllVersionsExpiration", + Status: "Enabled", + Filter: Filter{}, + Expiration: Expiration{ + Days: 1, + DeleteAll: Boolean{ + val: true, + set: true, + }, + set: true, + }, + }, + }, + } + + now = time.Date(2025, time.February, 12, 23, 0, 0, 0, time.UTC) + evaluator = NewEvaluator(lc) + gotEvents = evaluator.eval(objs, now) + wantEvents = []Event{ + {Action: DeleteAllVersionsAction}, + {Action: NoneAction}, + {Action: NoneAction}, + {Action: NoneAction}, + {Action: NoneAction}, + {Action: NoneAction}, + } + for i := range wantEvents { + if gotEvents[i].Action != wantEvents[i].Action { + t.Fatalf("test-%d: got %v, want %v", i+1, gotEvents[i], wantEvents[i]) + } + } +} + +func TestEmptyEvaluator(t *testing.T) { + var objs []ObjectOpts + curModTime := time.Date(2025, time.February, 10, 23, 0, 0, 0, time.UTC) + for i := range 5 { + obj := ObjectOpts{ + Name: "obj", + VersionID: uuid.New().String(), + ModTime: curModTime.Add(time.Duration(-i) * time.Second), + NumVersions: 5, + } + if i == 0 { + obj.IsLatest = true + } else { + obj.SuccessorModTime = curModTime.Add(time.Duration(-i+1) * time.Second) + } + objs = append(objs, obj) + } + + evaluator := NewEvaluator(Lifecycle{}) + events, err := evaluator.Eval(objs) + if err != nil { + t.Fatal(err) + } + for _, event := range events { + if event.Action != NoneAction { + t.Fatalf("got %v, want %v", event.Action, NoneAction) + } + } +} diff --git a/internal/bucket/lifecycle/lifecycle.go b/internal/bucket/lifecycle/lifecycle.go index e0dddd843..97c4200f2 100644 --- a/internal/bucket/lifecycle/lifecycle.go +++ b/internal/bucket/lifecycle/lifecycle.go @@ -28,6 +28,7 @@ import ( "github.com/google/uuid" "github.com/minio/minio/internal/bucket/object/lock" + "github.com/minio/minio/internal/bucket/replication" xhttp "github.com/minio/minio/internal/http" ) @@ -310,6 +311,10 @@ type ObjectOpts struct { TransitionStatus string RestoreOngoing bool RestoreExpires time.Time + // to determine if object is locked due to retention + UserDefined map[string]string + VersionPurgeStatus replication.VersionPurgeStatusType + ReplicationStatus replication.StatusType } // ExpiredObjectDeleteMarker returns true if an object version referred to by o @@ -331,12 +336,12 @@ type Event struct { // Eval returns the lifecycle event applicable now. func (lc Lifecycle) Eval(obj ObjectOpts) Event { - return lc.eval(obj, time.Now().UTC()) + return lc.eval(obj, time.Now().UTC(), 0) } // eval returns the lifecycle event applicable at the given now. If now is the // zero value of time.Time, it returns the upcoming lifecycle event. -func (lc Lifecycle) eval(obj ObjectOpts, now time.Time) Event { +func (lc Lifecycle) eval(obj ObjectOpts, now time.Time, remainingVersions int) Event { var events []Event if obj.ModTime.IsZero() { return Event{} @@ -404,17 +409,22 @@ func (lc Lifecycle) eval(obj ObjectOpts, now time.Time) Event { continue } - // Skip rules with newer noncurrent versions specified. These rules are - // not handled at an individual version level. eval applies only to a - // specific version. - if !obj.IsLatest && rule.NoncurrentVersionExpiration.NewerNoncurrentVersions > 0 { - continue - } - - if !obj.IsLatest && !rule.NoncurrentVersionExpiration.IsDaysNull() { - // Non current versions should be deleted if their age exceeds non current days configuration - // https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html#intro-lifecycle-rules-actions - if expectedExpiry := ExpectedExpiryTime(obj.SuccessorModTime, int(rule.NoncurrentVersionExpiration.NoncurrentDays)); now.IsZero() || now.After(expectedExpiry) { + // NoncurrentVersionExpiration + if !obj.IsLatest && rule.NoncurrentVersionExpiration.set { + var ( + retainedEnough bool + oldEnough bool + ) + if rule.NoncurrentVersionExpiration.NewerNoncurrentVersions == 0 || remainingVersions >= rule.NoncurrentVersionExpiration.NewerNoncurrentVersions { + retainedEnough = true + } + expectedExpiry := ExpectedExpiryTime(obj.SuccessorModTime, int(rule.NoncurrentVersionExpiration.NoncurrentDays)) + if now.IsZero() || now.After(expectedExpiry) { + oldEnough = true + } + // > For the deletion to occur, both the and the values must be exceeded. + // ref: https://docs.aws.amazon.com/AmazonS3/latest/dev/intro-lifecycle-rules.html#intro-lifecycle-rules-actions + if retainedEnough && oldEnough { events = append(events, Event{ Action: DeleteVersionAction, RuleID: rule.ID, @@ -529,7 +539,7 @@ func ExpectedExpiryTime(modTime time.Time, days int) time.Time { // SetPredictionHeaders sets time to expiry and transition headers on w for a // given obj. func (lc Lifecycle) SetPredictionHeaders(w http.ResponseWriter, obj ObjectOpts) { - event := lc.eval(obj, time.Time{}) + event := lc.eval(obj, time.Time{}, 0) switch event.Action { case DeleteAction, DeleteVersionAction, DeleteAllVersionsAction, DelMarkerDeleteAllVersionsAction: w.Header()[xhttp.AmzExpiration] = []string{ diff --git a/internal/bucket/lifecycle/lifecycle_test.go b/internal/bucket/lifecycle/lifecycle_test.go index ae0d767b6..5f0f590f8 100644 --- a/internal/bucket/lifecycle/lifecycle_test.go +++ b/internal/bucket/lifecycle/lifecycle_test.go @@ -960,7 +960,9 @@ func TestTransitionTier(t *testing.T) { // Go back seven days in the past now = now.Add(7 * 24 * time.Hour) - evt := lc.eval(obj1, now) + evaluator := NewEvaluator(lc) + evts := evaluator.eval([]ObjectOpts{obj1, obj2}, now) + evt := evts[0] if evt.Action != TransitionAction { t.Fatalf("Expected action: %s but got %s", TransitionAction, evt.Action) } @@ -968,7 +970,7 @@ func TestTransitionTier(t *testing.T) { t.Fatalf("Expected TIER-1 but got %s", evt.StorageClass) } - evt = lc.eval(obj2, now) + evt = evts[1] if evt.Action != TransitionVersionAction { t.Fatalf("Expected action: %s but got %s", TransitionVersionAction, evt.Action) } @@ -1036,14 +1038,16 @@ func TestTransitionTierWithPrefixAndTags(t *testing.T) { // Go back seven days in the past now = now.Add(7 * 24 * time.Hour) + evaluator := NewEvaluator(lc) + evts := evaluator.eval([]ObjectOpts{obj1, obj2, obj3}, now) // Eval object 1 - evt := lc.eval(obj1, now) + evt := evts[0] if evt.Action != NoneAction { t.Fatalf("Expected action: %s but got %s", NoneAction, evt.Action) } // Eval object 2 - evt = lc.eval(obj2, now) + evt = evts[1] if evt.Action != TransitionAction { t.Fatalf("Expected action: %s but got %s", TransitionAction, evt.Action) } @@ -1052,7 +1056,7 @@ func TestTransitionTierWithPrefixAndTags(t *testing.T) { } // Eval object 3 - evt = lc.eval(obj3, now) + evt = evts[2] if evt.Action != TransitionAction { t.Fatalf("Expected action: %s but got %s", TransitionAction, evt.Action) } @@ -1466,7 +1470,9 @@ func TestDeleteAllVersions(t *testing.T) { NumVersions: 4, } - event := lc.eval(opts, time.Time{}) + evaluator := NewEvaluator(lc) + events := evaluator.eval([]ObjectOpts{opts}, time.Time{}) + event := events[0] if event.Action != TransitionAction { t.Fatalf("Expected %v action but got %v", TransitionAction, event.Action) } @@ -1503,7 +1509,9 @@ func TestDeleteAllVersions(t *testing.T) { DeleteMarker: true, NumVersions: 4, } - event = lc.eval(opts, time.Time{}) + evaluator = NewEvaluator(lc) + events = evaluator.eval([]ObjectOpts{opts}, time.Time{}) + event = events[0] if event.Action != DelMarkerDeleteAllVersionsAction { t.Fatalf("Expected %v action but got %v", DelMarkerDeleteAllVersionsAction, event.Action) } diff --git a/internal/bucket/replication/datatypes.go b/internal/bucket/replication/datatypes.go index a67cabe13..980f9be55 100644 --- a/internal/bucket/replication/datatypes.go +++ b/internal/bucket/replication/datatypes.go @@ -51,3 +51,27 @@ func (s StatusType) String() string { func (s StatusType) Empty() bool { return string(s) == "" } + +// VersionPurgeStatusType represents status of a versioned delete or permanent delete w.r.t bucket replication +type VersionPurgeStatusType string + +const ( + // VersionPurgePending - versioned delete replication is pending. + VersionPurgePending VersionPurgeStatusType = "PENDING" + + // VersionPurgeComplete - versioned delete replication is now complete, erase version on disk. + VersionPurgeComplete VersionPurgeStatusType = "COMPLETE" + + // VersionPurgeFailed - versioned delete replication failed. + VersionPurgeFailed VersionPurgeStatusType = "FAILED" +) + +// Empty returns true if purge status was not set. +func (v VersionPurgeStatusType) Empty() bool { + return string(v) == "" +} + +// Pending returns true if the version is pending purge. +func (v VersionPurgeStatusType) Pending() bool { + return v == VersionPurgePending || v == VersionPurgeFailed +} diff --git a/internal/bucket/replication/datatypes_gen.go b/internal/bucket/replication/datatypes_gen.go index 3dc029a6a..058fe655b 100644 --- a/internal/bucket/replication/datatypes_gen.go +++ b/internal/bucket/replication/datatypes_gen.go @@ -109,3 +109,55 @@ func (z Type) Msgsize() (s int) { s = msgp.IntSize return } + +// DecodeMsg implements msgp.Decodable +func (z *VersionPurgeStatusType) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 string + zb0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = VersionPurgeStatusType(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z VersionPurgeStatusType) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteString(string(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z VersionPurgeStatusType) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendString(o, string(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *VersionPurgeStatusType) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 string + zb0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = VersionPurgeStatusType(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z VersionPurgeStatusType) Msgsize() (s int) { + s = msgp.StringPrefixSize + len(string(z)) + return +}