Add MaxNoncurrentVersions to NoncurrentExpiration action (#13580)

This unit allows users to limit the maximum number of noncurrent 
versions of an object.

To enable this rule you need the following *ilm.json*
```
cat >> ilm.json <<EOF
{
    "Rules": [
        {
            "ID": "test-max-noncurrent",
            "Status": "Enabled",
            "Filter": {
                "Prefix": "user-uploads/"
            },
            "NoncurrentVersionExpiration": {
                "MaxNoncurrentVersions": 5
            }
        }
    ]
}
EOF
mc ilm import myminio/mybucket < ilm.json
```
This commit is contained in:
Krishnan Parthasarathi
2021-11-19 17:54:10 -08:00
committed by GitHub
parent 1e2fac054c
commit 3da9ee15d3
16 changed files with 707 additions and 308 deletions

View File

@@ -81,40 +81,58 @@ type expiryTask struct {
}
type expiryState struct {
once sync.Once
expiryCh chan expiryTask
once sync.Once
byDaysCh chan expiryTask
byMaxNoncurrentCh chan maxNoncurrentTask
}
// PendingTasks returns the number of pending ILM expiry tasks.
func (es *expiryState) PendingTasks() int {
return len(es.expiryCh)
return len(es.byDaysCh) + len(es.byMaxNoncurrentCh)
}
func (es *expiryState) queueExpiryTask(oi ObjectInfo, restoredObject bool, rmVersion bool) {
// close closes work channels exactly once.
func (es *expiryState) close() {
es.once.Do(func() {
close(es.byDaysCh)
close(es.byMaxNoncurrentCh)
})
}
// enqueueByDays enqueues object versions expired by days for expiry.
func (es *expiryState) enqueueByDays(oi ObjectInfo, restoredObject bool, rmVersion bool) {
select {
case <-GlobalContext.Done():
es.once.Do(func() {
close(es.expiryCh)
})
case es.expiryCh <- expiryTask{objInfo: oi, versionExpiry: rmVersion, restoredObject: restoredObject}:
es.close()
case es.byDaysCh <- expiryTask{objInfo: oi, versionExpiry: rmVersion, restoredObject: restoredObject}:
default:
}
}
var (
globalExpiryState *expiryState
)
// enqueueByMaxNoncurrent enqueues object versions expired by
// MaxNoncurrentVersions limit for expiry.
func (es *expiryState) enqueueByMaxNoncurrent(bucket string, versions []ObjectToDelete) {
select {
case <-GlobalContext.Done():
es.close()
case es.byMaxNoncurrentCh <- maxNoncurrentTask{bucket: bucket, versions: versions}:
default:
}
}
var globalExpiryState *expiryState
func newExpiryState() *expiryState {
return &expiryState{
expiryCh: make(chan expiryTask, 10000),
byDaysCh: make(chan expiryTask, 10000),
byMaxNoncurrentCh: make(chan maxNoncurrentTask, 10000),
}
}
func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
globalExpiryState = newExpiryState()
go func() {
for t := range globalExpiryState.expiryCh {
for t := range globalExpiryState.byDaysCh {
if t.objInfo.TransitionedObject.Status != "" {
applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.restoredObject)
} else {
@@ -122,6 +140,18 @@ func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
}
}
}()
go func() {
for t := range globalExpiryState.byMaxNoncurrentCh {
deleteObjectVersions(ctx, objectAPI, t.bucket, t.versions)
}
}()
}
// maxNoncurrentTask encapsulates arguments required by worker to expire objects
// by MaxNoncurrentVersions
type maxNoncurrentTask struct {
bucket string
versions []ObjectToDelete
}
type transitionState struct {