lifecycle: refactor rules filtering and tagging support (#15914)

This commit is contained in:
Anis Elleuch
2022-10-21 18:46:53 +01:00
committed by GitHub
parent 5aba2aedb3
commit ac85c2af76
10 changed files with 229 additions and 168 deletions

View File

@@ -155,9 +155,14 @@ type newerNoncurrentTask struct {
versions []ObjectToDelete
}
type transitionTask struct {
tier string
objInfo ObjectInfo
}
type transitionState struct {
once sync.Once
transitionCh chan ObjectInfo
transitionCh chan transitionTask
ctx context.Context
objAPI ObjectLayer
@@ -171,13 +176,13 @@ type transitionState struct {
lastDayStats map[string]*lastDayTierStats
}
func (t *transitionState) queueTransitionTask(oi ObjectInfo) {
func (t *transitionState) queueTransitionTask(oi ObjectInfo, sc string) {
select {
case <-GlobalContext.Done():
t.once.Do(func() {
close(t.transitionCh)
})
case t.transitionCh <- oi:
case t.transitionCh <- transitionTask{objInfo: oi, tier: sc}:
default:
}
}
@@ -186,7 +191,7 @@ var globalTransitionState *transitionState
func newTransitionState(ctx context.Context, objAPI ObjectLayer) *transitionState {
return &transitionState{
transitionCh: make(chan ObjectInfo, 10000),
transitionCh: make(chan transitionTask, 10000),
ctx: ctx,
objAPI: objAPI,
killCh: make(chan struct{}),
@@ -213,27 +218,25 @@ func (t *transitionState) worker(ctx context.Context, objectAPI ObjectLayer) {
return
case <-ctx.Done():
return
case oi, ok := <-t.transitionCh:
case task, ok := <-t.transitionCh:
if !ok {
return
}
atomic.AddInt32(&t.activeTasks, 1)
var tier string
var err error
if tier, err = transitionObject(ctx, objectAPI, oi); err != nil {
logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w", oi.Bucket, oi.Name, oi.VersionID, err))
if err := transitionObject(ctx, objectAPI, task.objInfo, task.tier); err != nil {
logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w",
task.objInfo.Bucket, task.objInfo.Name, task.objInfo.VersionID, err))
} else {
ts := tierStats{
TotalSize: uint64(oi.Size),
TotalSize: uint64(task.objInfo.Size),
NumVersions: 1,
}
if oi.IsLatest {
if task.objInfo.IsLatest {
ts.NumObjects = 1
}
t.addLastDayStats(tier, ts)
t.addLastDayStats(task.tier, ts)
}
atomic.AddInt32(&t.activeTasks, -1)
}
}
}
@@ -304,9 +307,10 @@ func validateTransitionTier(lc *lifecycle.Lifecycle) error {
// This is to be called after a successful upload of an object (version).
func enqueueTransitionImmediate(obj ObjectInfo) {
if lc, err := globalLifecycleSys.Get(obj.Bucket); err == nil {
switch lc.ComputeAction(obj.ToLifecycleOpts()) {
event := lc.Eval(obj.ToLifecycleOpts(), time.Now())
switch event.Action {
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
globalTransitionState.queueTransitionTask(obj)
globalTransitionState.queueTransitionTask(obj, event.StorageClass)
}
}
}
@@ -403,12 +407,7 @@ func genTransitionObjName(bucket string) (string, error) {
// storage specified by the transition ARN, the metadata is left behind on source cluster and original content
// is moved to the transition tier. Note that in the case of encrypted objects, entire encrypted stream is moved
// to the transition tier without decrypting or re-encrypting.
func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo) (string, error) {
lc, err := globalLifecycleSys.Get(oi.Bucket)
if err != nil {
return "", err
}
tier := lc.TransitionTier(oi.ToLifecycleOpts())
func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo, tier string) error {
opts := ObjectOptions{
Transition: TransitionOptions{
Status: lifecycle.TransitionPending,
@@ -420,7 +419,7 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo)
VersionSuspended: globalBucketVersioningSys.PrefixSuspended(oi.Bucket, oi.Name),
MTime: oi.ModTime,
}
return tier, objectAPI.TransitionObject(ctx, oi.Bucket, oi.Name, opts)
return objectAPI.TransitionObject(ctx, oi.Bucket, oi.Name, opts)
}
type auditTierOp struct {