Add tags to auditLogLifecycle (#17081)

This commit is contained in:
Krishnan Parthasarathi
2023-04-26 17:49:00 -07:00
committed by GitHub
parent 31b5acc245
commit e7cac8acef
10 changed files with 122 additions and 116 deletions

View File

@@ -96,9 +96,8 @@ func (sys *LifecycleSys) trace(oi ObjectInfo) func(event string) {
}
type expiryTask struct {
objInfo ObjectInfo
versionExpiry bool
restoredObject bool
objInfo ObjectInfo
event lifecycle.Event
}
type expiryState struct {
@@ -121,11 +120,11 @@ func (es *expiryState) close() {
}
// enqueueByDays enqueues object versions expired by days for expiry.
func (es *expiryState) enqueueByDays(oi ObjectInfo, restoredObject bool, rmVersion bool) {
func (es *expiryState) enqueueByDays(oi ObjectInfo, event lifecycle.Event) {
select {
case <-GlobalContext.Done():
es.close()
case es.byDaysCh <- expiryTask{objInfo: oi, versionExpiry: rmVersion, restoredObject: restoredObject}:
case es.byDaysCh <- expiryTask{objInfo: oi, event: event}:
default:
}
}
@@ -171,9 +170,9 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
go func(t expiryTask) {
defer ewk.Give()
if t.objInfo.TransitionedObject.Status != "" {
applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.restoredObject)
applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.event)
} else {
applyExpiryOnNonTransitionedObjects(ctx, objectAPI, t.objInfo, t.versionExpiry)
applyExpiryOnNonTransitionedObjects(ctx, objectAPI, t.objInfo, t.event)
}
}(t)
}
@@ -199,8 +198,8 @@ type newerNoncurrentTask struct {
}
type transitionTask struct {
tier string
objInfo ObjectInfo
event lifecycle.Event
}
type transitionState struct {
@@ -218,10 +217,10 @@ type transitionState struct {
lastDayStats map[string]*lastDayTierStats
}
func (t *transitionState) queueTransitionTask(oi ObjectInfo, sc string) {
func (t *transitionState) queueTransitionTask(oi ObjectInfo, event lifecycle.Event) {
select {
case <-t.ctx.Done():
case t.transitionCh <- transitionTask{objInfo: oi, tier: sc}:
case t.transitionCh <- transitionTask{objInfo: oi, event: event}:
default:
}
}
@@ -274,9 +273,9 @@ func (t *transitionState) worker(objectAPI ObjectLayer) {
return
}
atomic.AddInt32(&t.activeTasks, 1)
if err := transitionObject(t.ctx, objectAPI, task.objInfo, task.tier); err != nil {
logger.LogIf(t.ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w",
task.objInfo.Bucket, task.objInfo.Name, task.objInfo.VersionID, err))
if err := transitionObject(t.ctx, objectAPI, task.objInfo, task.event); err != nil {
logger.LogIf(t.ctx, fmt.Errorf("Transition to %s failed for %s/%s version:%s with %w",
task.event.StorageClass, task.objInfo.Bucket, task.objInfo.Name, task.objInfo.VersionID, err))
} else {
ts := tierStats{
TotalSize: uint64(task.objInfo.Size),
@@ -285,7 +284,7 @@ func (t *transitionState) worker(objectAPI ObjectLayer) {
if task.objInfo.IsLatest {
ts.NumObjects = 1
}
t.addLastDayStats(task.tier, ts)
t.addLastDayStats(task.event.StorageClass, ts)
}
atomic.AddInt32(&t.activeTasks, -1)
}
@@ -358,89 +357,76 @@ func validateTransitionTier(lc *lifecycle.Lifecycle) error {
// This is to be called after a successful upload of an object (version).
func enqueueTransitionImmediate(obj ObjectInfo) {
if lc, err := globalLifecycleSys.Get(obj.Bucket); err == nil {
event := lc.Eval(obj.ToLifecycleOpts())
switch event.Action {
switch event := lc.Eval(obj.ToLifecycleOpts()); event.Action {
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
globalTransitionState.queueTransitionTask(obj, event.StorageClass)
globalTransitionState.queueTransitionTask(obj, event)
}
}
}
// expireAction represents different actions to be performed on expiry of a
// restored/transitioned object
type expireAction int
const (
// ignore the zero value
_ expireAction = iota
// expireObj indicates expiry of 'regular' transitioned objects.
expireObj
// expireRestoredObj indicates expiry of restored objects.
expireRestoredObj
)
// expireTransitionedObject handles expiry of transitioned/restored objects
// (versions) in one of the following situations:
//
// 1. when a restored (via PostRestoreObject API) object expires.
// 2. when a transitioned object expires (based on an ILM rule).
func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *ObjectInfo, lcOpts lifecycle.ObjectOpts, action expireAction) error {
func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *ObjectInfo, lcOpts lifecycle.ObjectOpts, lcEvent lifecycle.Event) error {
traceFn := globalLifecycleSys.trace(*oi)
var opts ObjectOptions
opts.Versioned = globalBucketVersioningSys.PrefixEnabled(oi.Bucket, oi.Name)
opts.VersionID = lcOpts.VersionID
opts.Expiration = ExpirationOptions{Expire: true}
switch action {
case expireObj:
// When an object is past expiry or when a transitioned object is being
// deleted, 'mark' the data in the remote tier for delete.
entry := jentry{
ObjName: oi.TransitionedObject.Name,
VersionID: oi.TransitionedObject.VersionID,
TierName: oi.TransitionedObject.Tier,
}
if err := globalTierJournal.AddEntry(entry); err != nil {
logger.LogIf(ctx, err)
return err
}
// Delete metadata on source, now that data in remote tier has been
// marked for deletion.
if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil {
logger.LogIf(ctx, err)
return err
}
// Send audit for the lifecycle delete operation
defer auditLogLifecycle(ctx, *oi, ILMExpiry, traceFn)
eventName := event.ObjectRemovedDelete
if lcOpts.DeleteMarker {
eventName = event.ObjectRemovedDeleteMarkerCreated
}
objInfo := ObjectInfo{
Name: oi.Name,
VersionID: lcOpts.VersionID,
DeleteMarker: lcOpts.DeleteMarker,
}
// Notify object deleted event.
sendEvent(eventArgs{
EventName: eventName,
BucketName: oi.Bucket,
Object: objInfo,
UserAgent: "Internal: [ILM-Expiry]",
Host: globalLocalNodeName,
})
case expireRestoredObj:
tags := auditLifecycleTags(lcEvent)
if lcEvent.Action.DeleteRestored() {
// delete locally restored copy of object or object version
// from the source, while leaving metadata behind. The data on
// transitioned tier lies untouched and still accessible
opts.Transition.ExpireRestored = true
_, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts)
if err == nil {
// TODO consider including expiry of restored object to events we
// notify.
auditLogLifecycle(ctx, *oi, ILMExpiry, tags, traceFn)
}
return err
default:
return fmt.Errorf("Unknown expire action %v", action)
}
// When an object is past expiry or when a transitioned object is being
// deleted, 'mark' the data in the remote tier for delete.
entry := jentry{
ObjName: oi.TransitionedObject.Name,
VersionID: oi.TransitionedObject.VersionID,
TierName: oi.TransitionedObject.Tier,
}
if err := globalTierJournal.AddEntry(entry); err != nil {
logger.LogIf(ctx, err)
return err
}
// Delete metadata on source, now that data in remote tier has been
// marked for deletion.
if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil {
logger.LogIf(ctx, err)
return err
}
// Send audit for the lifecycle delete operation
defer auditLogLifecycle(ctx, *oi, ILMExpiry, tags, traceFn)
eventName := event.ObjectRemovedDelete
if lcOpts.DeleteMarker {
eventName = event.ObjectRemovedDeleteMarkerCreated
}
objInfo := ObjectInfo{
Name: oi.Name,
VersionID: lcOpts.VersionID,
DeleteMarker: lcOpts.DeleteMarker,
}
// Notify object deleted event.
sendEvent(eventArgs{
EventName: eventName,
BucketName: oi.Bucket,
Object: objInfo,
UserAgent: "Internal: [ILM-Expiry]",
Host: globalLocalNodeName,
})
return nil
}
@@ -460,13 +446,14 @@ func genTransitionObjName(bucket string) (string, error) {
// storage specified by the transition ARN, the metadata is left behind on source cluster and original content
// is moved to the transition tier. Note that in the case of encrypted objects, entire encrypted stream is moved
// to the transition tier without decrypting or re-encrypting.
func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo, tier string) error {
func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo, lcEvent lifecycle.Event) error {
opts := ObjectOptions{
Transition: TransitionOptions{
Status: lifecycle.TransitionPending,
Tier: tier,
Tier: lcEvent.StorageClass,
ETag: oi.ETag,
},
LifecycleEvent: lcEvent,
VersionID: oi.VersionID,
Versioned: globalBucketVersioningSys.PrefixEnabled(oi.Bucket, oi.Name),
VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name),
@@ -878,3 +865,20 @@ func (oi ObjectInfo) ToLifecycleOpts() lifecycle.ObjectOpts {
TransitionStatus: oi.TransitionedObject.Status,
}
}
func auditLifecycleTags(event lifecycle.Event) map[string]interface{} {
const (
ilmAction = "ilm-action"
ilmDue = "ilm-due"
ilmRuleID = "ilm-rule-id"
ilmTier = "ilm-tier"
)
tags := make(map[string]interface{}, 4)
tags[ilmAction] = event.Action.String()
tags[ilmDue] = event.Due
tags[ilmRuleID] = event.RuleID
if event.StorageClass != "" {
tags[ilmTier] = event.StorageClass
}
return tags
}