Improve expiration of tiered objects (#18926)

- Use a shared worker pool for all ILM expiry tasks
- Free version cleanup executes in a separate goroutine
- Add a free version only if removing the remote object fails
- Add ILM expiry metrics to the node namespace
- Move tier journal tasks to expiryState
- Remove unused on-disk journal for tiered objects pending deletion
- Distribute expiry tasks across workers such that the expiry of versions of
  the same object serialized
- Ability to resize worker pool without server restart
- Make scaling down of expiryState workers' concurrency safe; Thanks
  @klauspost
- Add error logs when expiryState and transition state are not
  initialized (yet)
* metrics: Add missed tier journal entry tasks
* Initialize the ILM worker pool after the object layer
This commit is contained in:
Krishnan Parthasarathi 2024-03-01 21:11:03 -08:00 committed by GitHub
parent 325fd80687
commit a7577da768
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 611 additions and 1131 deletions

View File

@ -24,7 +24,6 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -39,12 +38,9 @@ import (
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/event" "github.com/minio/minio/internal/event"
xhttp "github.com/minio/minio/internal/http" xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/s3select" "github.com/minio/minio/internal/s3select"
"github.com/minio/pkg/v2/env"
xnet "github.com/minio/pkg/v2/net" xnet "github.com/minio/pkg/v2/net"
"github.com/minio/pkg/v2/workers"
"github.com/zeebo/xxh3" "github.com/zeebo/xxh3"
) )
@ -105,95 +101,280 @@ type expiryTask struct {
src lcEventSrc src lcEventSrc
} }
// expiryStats records metrics related to ILM expiry activities
type expiryStats struct {
missedExpiryTasks atomic.Int64
missedFreeVersTasks atomic.Int64
missedTierJournalTasks atomic.Int64
workers atomic.Int32
}
// MissedTasks returns the number of ILM expiry tasks that were missed since
// there were no available workers.
func (e *expiryStats) MissedTasks() int64 {
return e.missedExpiryTasks.Load()
}
// MissedFreeVersTasks returns the number of free version collection tasks that
// were missed since there were no available workers.
func (e *expiryStats) MissedFreeVersTasks() int64 {
return e.missedFreeVersTasks.Load()
}
// MissedTierJournalTasks returns the number of tasks to remove tiered objects
// that were missed since there were no available workers.
func (e *expiryStats) MissedTierJournalTasks() int64 {
return e.missedTierJournalTasks.Load()
}
// NumWorkers returns the number of active workers executing one of ILM expiry
// tasks or free version collection tasks.
func (e *expiryStats) NumWorkers() int32 {
return e.workers.Load()
}
type expiryOp interface {
OpHash() uint64
}
type freeVersionTask struct {
ObjectInfo
}
func (f freeVersionTask) OpHash() uint64 {
return xxh3.HashString(f.TransitionedObject.Tier + f.TransitionedObject.Name)
}
func (n newerNoncurrentTask) OpHash() uint64 {
return xxh3.HashString(n.bucket + n.versions[0].ObjectV.ObjectName)
}
func (j jentry) OpHash() uint64 {
return xxh3.HashString(j.TierName + j.ObjName)
}
func (e expiryTask) OpHash() uint64 {
return xxh3.HashString(e.objInfo.Bucket + e.objInfo.Name)
}
// expiryState manages all ILM related expiration activities.
type expiryState struct { type expiryState struct {
once sync.Once mu sync.RWMutex
byDaysCh chan expiryTask workers atomic.Pointer[[]chan expiryOp]
byNewerNoncurrentCh chan newerNoncurrentTask
ctx context.Context
objAPI ObjectLayer
stats expiryStats
} }
// PendingTasks returns the number of pending ILM expiry tasks. // PendingTasks returns the number of pending ILM expiry tasks.
func (es *expiryState) PendingTasks() int { func (es *expiryState) PendingTasks() int {
return len(es.byDaysCh) + len(es.byNewerNoncurrentCh) w := es.workers.Load()
if w == nil || len(*w) == 0 {
return 0
}
var tasks int
for _, wrkr := range *w {
tasks += len(wrkr)
}
return tasks
} }
// close closes work channels exactly once. // enqueueTierJournalEntry enqueues a tier journal entry referring to a remote
func (es *expiryState) close() { // object corresponding to a 'replaced' object versions. This applies only to
es.once.Do(func() { // non-versioned or version suspended buckets.
xioutil.SafeClose(es.byDaysCh) func (es *expiryState) enqueueTierJournalEntry(je jentry) {
xioutil.SafeClose(es.byNewerNoncurrentCh) wrkr := es.getWorkerCh(je.OpHash())
}) if wrkr == nil {
es.stats.missedTierJournalTasks.Add(1)
return
}
select {
case <-GlobalContext.Done():
case wrkr <- je:
default:
es.stats.missedTierJournalTasks.Add(1)
}
}
// enqueueFreeVersion enqueues a free version to be deleted
func (es *expiryState) enqueueFreeVersion(oi ObjectInfo) {
task := freeVersionTask{ObjectInfo: oi}
wrkr := es.getWorkerCh(task.OpHash())
if wrkr == nil {
es.stats.missedFreeVersTasks.Add(1)
return
}
select {
case <-GlobalContext.Done():
case wrkr <- task:
default:
es.stats.missedFreeVersTasks.Add(1)
}
} }
// enqueueByDays enqueues object versions expired by days for expiry. // enqueueByDays enqueues object versions expired by days for expiry.
func (es *expiryState) enqueueByDays(oi ObjectInfo, event lifecycle.Event, src lcEventSrc) { func (es *expiryState) enqueueByDays(oi ObjectInfo, event lifecycle.Event, src lcEventSrc) {
task := expiryTask{objInfo: oi, event: event, src: src}
wrkr := es.getWorkerCh(task.OpHash())
if wrkr == nil {
es.stats.missedExpiryTasks.Add(1)
return
}
select { select {
case <-GlobalContext.Done(): case <-GlobalContext.Done():
es.close() case wrkr <- task:
case es.byDaysCh <- expiryTask{objInfo: oi, event: event, src: src}:
default: default:
es.stats.missedExpiryTasks.Add(1)
} }
} }
// enqueueByNewerNoncurrent enqueues object versions expired by // enqueueByNewerNoncurrent enqueues object versions expired by
// NewerNoncurrentVersions limit for expiry. // NewerNoncurrentVersions limit for expiry.
func (es *expiryState) enqueueByNewerNoncurrent(bucket string, versions []ObjectToDelete, lcEvent lifecycle.Event) { func (es *expiryState) enqueueByNewerNoncurrent(bucket string, versions []ObjectToDelete, lcEvent lifecycle.Event) {
task := newerNoncurrentTask{bucket: bucket, versions: versions, event: lcEvent}
wrkr := es.getWorkerCh(task.OpHash())
if wrkr == nil {
es.stats.missedExpiryTasks.Add(1)
return
}
select { select {
case <-GlobalContext.Done(): case <-GlobalContext.Done():
es.close() case wrkr <- task:
case es.byNewerNoncurrentCh <- newerNoncurrentTask{bucket: bucket, versions: versions, event: lcEvent}:
default: default:
es.stats.missedExpiryTasks.Add(1)
} }
} }
var globalExpiryState = newExpiryState() // globalExpiryState is the per-node instance which manages all ILM expiry tasks.
var globalExpiryState *expiryState
func newExpiryState() *expiryState { // newExpiryState creates an expiryState with buffered channels allocated for
return &expiryState{ // each ILM expiry task type.
byDaysCh: make(chan expiryTask, 100000), func newExpiryState(ctx context.Context, objAPI ObjectLayer, n int) *expiryState {
byNewerNoncurrentCh: make(chan newerNoncurrentTask, 100000), es := &expiryState{
ctx: ctx,
objAPI: objAPI,
}
workers := make([]chan expiryOp, 0, n)
es.workers.Store(&workers)
es.ResizeWorkers(n)
return es
}
func (es *expiryState) getWorkerCh(h uint64) chan<- expiryOp {
w := es.workers.Load()
if w == nil || len(*w) == 0 {
return nil
}
workers := *w
return workers[h%uint64(len(workers))]
}
func (es *expiryState) ResizeWorkers(n int) {
// Lock to avoid multiple resizes to happen at the same time.
es.mu.Lock()
defer es.mu.Unlock()
var workers []chan expiryOp
if v := es.workers.Load(); v != nil {
// Copy to new array.
workers = append(workers, *v...)
}
if n == len(workers) || n < 1 {
return
}
for len(workers) < n {
input := make(chan expiryOp, 10000)
workers = append(workers, input)
go es.Worker(input)
es.stats.workers.Add(1)
}
for len(workers) > n {
worker := workers[len(workers)-1]
workers = workers[:len(workers)-1]
worker <- expiryOp(nil)
es.stats.workers.Add(-1)
}
// Atomically replace workers.
es.workers.Store(&workers)
}
// Worker handles 4 types of expiration tasks.
// 1. Expiry of objects, includes regular and transitioned objects
// 2. Expiry of noncurrent versions due to NewerNoncurrentVersions
// 3. Expiry of free-versions, for remote objects of transitioned object which have been expired since.
// 4. Expiry of remote objects corresponding to objects in a
// non-versioned/version suspended buckets
func (es *expiryState) Worker(input <-chan expiryOp) {
for {
select {
case <-es.ctx.Done():
return
case v, ok := <-input:
if !ok {
return
}
if v == nil {
// ResizeWorkers signaling worker to quit
return
}
switch v := v.(type) {
case expiryTask:
if v.objInfo.TransitionedObject.Status != "" {
applyExpiryOnTransitionedObject(es.ctx, es.objAPI, v.objInfo, v.event, v.src)
} else {
applyExpiryOnNonTransitionedObjects(es.ctx, es.objAPI, v.objInfo, v.event, v.src)
}
case newerNoncurrentTask:
deleteObjectVersions(es.ctx, es.objAPI, v.bucket, v.versions, v.event)
case jentry:
logger.LogIf(es.ctx, deleteObjectFromRemoteTier(es.ctx, v.ObjName, v.VersionID, v.TierName))
case freeVersionTask:
oi := v.ObjectInfo
traceFn := globalLifecycleSys.trace(oi)
if !oi.TransitionedObject.FreeVersion {
// nothing to be done
return
}
ignoreNotFoundErr := func(err error) error {
switch {
case isErrVersionNotFound(err), isErrObjectNotFound(err):
return nil
}
return err
}
// Remove the remote object
err := deleteObjectFromRemoteTier(es.ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
if ignoreNotFoundErr(err) != nil {
logger.LogIf(es.ctx, err)
return
}
// Remove this free version
_, err = es.objAPI.DeleteObject(es.ctx, oi.Bucket, oi.Name, ObjectOptions{
VersionID: oi.VersionID,
InclFreeVersions: true,
})
if err == nil {
auditLogLifecycle(es.ctx, oi, ILMFreeVersionDelete, nil, traceFn)
}
if ignoreNotFoundErr(err) != nil {
logger.LogIf(es.ctx, err)
}
default:
logger.LogIf(es.ctx, fmt.Errorf("Invalid work type - %v", v))
}
}
} }
} }
func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
workerSize, _ := strconv.Atoi(env.Get("_MINIO_ILM_EXPIRY_WORKERS", strconv.Itoa((runtime.GOMAXPROCS(0)+1)/2))) globalExpiryState = newExpiryState(ctx, objectAPI, globalAPIConfig.getExpiryWorkers())
if workerSize == 0 {
workerSize = 4
}
ewk, err := workers.New(workerSize)
if err != nil {
logger.LogIf(ctx, err)
}
nwk, err := workers.New(workerSize)
if err != nil {
logger.LogIf(ctx, err)
}
go func() {
for t := range globalExpiryState.byDaysCh {
ewk.Take()
go func(t expiryTask) {
defer ewk.Give()
if t.objInfo.TransitionedObject.Status != "" {
applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.event, t.src)
} else {
applyExpiryOnNonTransitionedObjects(ctx, objectAPI, t.objInfo, t.event, t.src)
}
}(t)
}
ewk.Wait()
}()
go func() {
for t := range globalExpiryState.byNewerNoncurrentCh {
nwk.Take()
go func(t newerNoncurrentTask) {
defer nwk.Give()
deleteObjectVersions(ctx, objectAPI, t.bucket, t.versions, t.event)
}(t)
}
nwk.Wait()
}()
} }
// newerNoncurrentTask encapsulates arguments required by worker to expire objects // newerNoncurrentTask encapsulates arguments required by worker to expire objects
@ -417,18 +598,18 @@ func expireTransitionedObject(ctx context.Context, objectAPI ObjectLayer, oi *Ob
} }
return err return err
} }
// When an object is past expiry or when a transitioned object is being
// deleted, 'mark' the data in the remote tier for delete. // Delete remote object from warm-tier
entry := jentry{ err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
ObjName: oi.TransitionedObject.Name, if err == nil {
VersionID: oi.TransitionedObject.VersionID, // Skip adding free version since we successfully deleted the
TierName: oi.TransitionedObject.Tier, // remote object
opts.SkipFreeVersion = true
} else {
logger.LogIf(ctx, err)
} }
if err := globalTierJournal.AddEntry(entry); err != nil {
return err // Now, delete object from hot-tier namespace
}
// Delete metadata on source, now that data in remote tier has been
// marked for deletion.
if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil { if _, err := objectAPI.DeleteObject(ctx, oi.Bucket, oi.Name, opts); err != nil {
return err return err
} }

View File

@ -45,6 +45,7 @@ import (
"github.com/minio/minio/internal/config/identity/openid" "github.com/minio/minio/internal/config/identity/openid"
idplugin "github.com/minio/minio/internal/config/identity/plugin" idplugin "github.com/minio/minio/internal/config/identity/plugin"
xtls "github.com/minio/minio/internal/config/identity/tls" xtls "github.com/minio/minio/internal/config/identity/tls"
"github.com/minio/minio/internal/config/ilm"
"github.com/minio/minio/internal/config/lambda" "github.com/minio/minio/internal/config/lambda"
"github.com/minio/minio/internal/config/notify" "github.com/minio/minio/internal/config/notify"
"github.com/minio/minio/internal/config/policy/opa" "github.com/minio/minio/internal/config/policy/opa"
@ -78,6 +79,7 @@ func initHelp() {
config.SubnetSubSys: subnet.DefaultKVS, config.SubnetSubSys: subnet.DefaultKVS,
config.CallhomeSubSys: callhome.DefaultKVS, config.CallhomeSubSys: callhome.DefaultKVS,
config.DriveSubSys: drive.DefaultKVS, config.DriveSubSys: drive.DefaultKVS,
config.ILMSubSys: ilm.DefaultKVS,
config.CacheSubSys: cache.DefaultKVS, config.CacheSubSys: cache.DefaultKVS,
config.BatchSubSys: batch.DefaultKVS, config.BatchSubSys: batch.DefaultKVS,
config.BrowserSubSys: browser.DefaultKVS, config.BrowserSubSys: browser.DefaultKVS,
@ -716,6 +718,22 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
return fmt.Errorf("Unable to apply browser config: %w", err) return fmt.Errorf("Unable to apply browser config: %w", err)
} }
globalBrowserConfig.Update(browserCfg) globalBrowserConfig.Update(browserCfg)
case config.ILMSubSys:
ilmCfg, err := ilm.LookupConfig(s[config.ILMSubSys][config.Default])
if err != nil {
return fmt.Errorf("Unable to apply ilm config: %w", err)
}
if globalTransitionState != nil {
globalTransitionState.UpdateWorkers(ilmCfg.TransitionWorkers)
} else {
logger.LogIf(ctx, fmt.Errorf("ILM transition subsystem not initialized"))
}
if globalExpiryState != nil {
globalExpiryState.ResizeWorkers(ilmCfg.ExpirationWorkers)
} else {
logger.LogIf(ctx, fmt.Errorf("ILM expiration subsystem not initialized"))
}
} }
globalServerConfigMu.Lock() globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock() defer globalServerConfigMu.Unlock()

View File

@ -982,42 +982,6 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
return lcEvt.Action, size return lcEvt.Action, size
} }
// applyTierObjSweep removes remote object pending deletion and the free-version
// tracking this information.
func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi ObjectInfo) {
traceFn := globalLifecycleSys.trace(oi)
if !oi.TransitionedObject.FreeVersion {
// nothing to be done
return
}
ignoreNotFoundErr := func(err error) error {
switch {
case isErrVersionNotFound(err), isErrObjectNotFound(err):
return nil
}
return err
}
// Remove the remote object
err := deleteObjectFromRemoteTier(ctx, oi.TransitionedObject.Name, oi.TransitionedObject.VersionID, oi.TransitionedObject.Tier)
if ignoreNotFoundErr(err) != nil {
logger.LogIf(ctx, err)
return
}
// Remove this free version
_, err = o.DeleteObject(ctx, oi.Bucket, oi.Name, ObjectOptions{
VersionID: oi.VersionID,
InclFreeVersions: true,
})
if err == nil {
auditLogLifecycle(ctx, oi, ILMFreeVersionDelete, nil, traceFn)
}
if ignoreNotFoundErr(err) != nil {
logger.LogIf(ctx, err)
}
}
// applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured. // applyNewerNoncurrentVersionLimit removes noncurrent versions older than the most recent NewerNoncurrentVersions configured.
// Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return. // Note: This function doesn't update sizeSummary since it always removes versions that it doesn't return.
func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) { func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ ObjectLayer, fivs []FileInfo, expState *expiryState) ([]ObjectInfo, error) {

View File

@ -39,14 +39,20 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
globalBucketMetadataSys = NewBucketMetadataSys() globalBucketMetadataSys = NewBucketMetadataSys()
globalBucketObjectLockSys = &BucketObjectLockSys{} globalBucketObjectLockSys = &BucketObjectLockSys{}
globalBucketVersioningSys = &BucketVersioningSys{} globalBucketVersioningSys = &BucketVersioningSys{}
expiryState := newExpiryState() es := newExpiryState(context.Background(), objAPI, 0)
workers := []chan expiryOp{make(chan expiryOp)}
es.workers.Store(&workers)
globalExpiryState = es
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
expired := make([]ObjectToDelete, 0, 5) expired := make([]ObjectToDelete, 0, 5)
go func() { go func() {
defer wg.Done() defer wg.Done()
for t := range expiryState.byNewerNoncurrentCh { workers := globalExpiryState.workers.Load()
expired = append(expired, t.versions...) for t := range (*workers)[0] {
if t, ok := t.(newerNoncurrentTask); ok {
expired = append(expired, t.versions...)
}
} }
}() }()
lc := lifecycle.Lifecycle{ lc := lifecycle.Lifecycle{
@ -116,7 +122,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
for i, fi := range fivs[:2] { for i, fi := range fivs[:2] {
wants[i] = fi.ToObjectInfo(bucket, obj, versioned) wants[i] = fi.ToObjectInfo(bucket, obj, versioned)
} }
gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, expiryState) gots, err := item.applyNewerNoncurrentVersionLimit(context.TODO(), objAPI, fivs, es)
if err != nil { if err != nil {
t.Fatalf("Failed with err: %v", err) t.Fatalf("Failed with err: %v", err)
} }
@ -125,7 +131,7 @@ func TestApplyNewerNoncurrentVersionsLimit(t *testing.T) {
} }
// Close expiry state's channel to inspect object versions enqueued for expiration // Close expiry state's channel to inspect object versions enqueued for expiration
close(expiryState.byNewerNoncurrentCh) close(workers[0])
wg.Wait() wg.Wait()
for _, obj := range expired { for _, obj := range expired {
switch obj.ObjectV.VersionID { switch obj.ObjectV.VersionID {

View File

@ -525,6 +525,7 @@ func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []
const ( const (
tierFVID = "tier-free-versionID" tierFVID = "tier-free-versionID"
tierFVMarker = "tier-free-marker" tierFVMarker = "tier-free-marker"
tierSkipFVID = "tier-skip-fvid"
) )
// SetTierFreeVersionID sets free-version's versionID. This method is used by // SetTierFreeVersionID sets free-version's versionID. This method is used by
@ -551,6 +552,23 @@ func (fi *FileInfo) SetTierFreeVersion() {
fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker] = "" fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker] = ""
} }
// SetSkipTierFreeVersion indicates to skip adding a tier free version id.
// Note: Used only when expiring tiered objects and the remote content has
// already been scheduled for deletion
func (fi *FileInfo) SetSkipTierFreeVersion() {
if fi.Metadata == nil {
fi.Metadata = make(map[string]string)
}
fi.Metadata[ReservedMetadataPrefixLower+tierSkipFVID] = ""
}
// SkipTierFreeVersion returns true if set, false otherwise.
// See SetSkipTierVersion for its purpose.
func (fi *FileInfo) SkipTierFreeVersion() bool {
_, ok := fi.Metadata[ReservedMetadataPrefixLower+tierSkipFVID]
return ok
}
// TierFreeVersion returns true if version is a free-version. // TierFreeVersion returns true if version is a free-version.
func (fi *FileInfo) TierFreeVersion() bool { func (fi *FileInfo) TierFreeVersion() bool {
_, ok := fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker] _, ok := fi.Metadata[ReservedMetadataPrefixLower+tierFVMarker]

View File

@ -314,3 +314,11 @@ func TestTransitionInfoEquals(t *testing.T) {
t.Fatalf("Expected to be inequal: fi %v ofi %v", fi, ofi) t.Fatalf("Expected to be inequal: fi %v ofi %v", fi, ofi)
} }
} }
func TestSkipTierFreeVersion(t *testing.T) {
fi := newFileInfo("object", 8, 8)
fi.SetSkipTierFreeVersion()
if ok := fi.SkipTierFreeVersion(); !ok {
t.Fatal("Expected SkipTierFreeVersion to be set on FileInfo but wasn't")
}
}

View File

@ -1975,6 +1975,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
ExpireRestored: opts.Transition.ExpireRestored, ExpireRestored: opts.Transition.ExpireRestored,
} }
fi.SetTierFreeVersionID(fvID) fi.SetTierFreeVersionID(fvID)
if opts.SkipFreeVersion {
fi.SetSkipTierFreeVersion()
}
if opts.VersionID != "" { if opts.VersionID != "" {
fi.VersionID = opts.VersionID fi.VersionID = opts.VersionID
} else if opts.Versioned { } else if opts.Versioned {
@ -2004,6 +2007,9 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
ExpireRestored: opts.Transition.ExpireRestored, ExpireRestored: opts.Transition.ExpireRestored,
} }
dfi.SetTierFreeVersionID(fvID) dfi.SetTierFreeVersionID(fvID)
if opts.SkipFreeVersion {
dfi.SetSkipTierFreeVersion()
}
if err = er.deleteObjectVersion(ctx, bucket, object, dfi, opts.DeleteMarker); err != nil { if err = er.deleteObjectVersion(ctx, bucket, object, dfi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object) return objInfo, toObjectErr(err, bucket, object)
} }

View File

@ -192,10 +192,6 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
go globalMRFState.healRoutine(z) go globalMRFState.healRoutine(z)
}) })
bootstrapTrace("initBackgroundExpiry", func() {
initBackgroundExpiry(GlobalContext, z)
})
// initialize the object layer. // initialize the object layer.
defer setObjectLayer(z) defer setObjectLayer(z)

View File

@ -411,8 +411,6 @@ var (
globalTierConfigMgr *TierConfigMgr globalTierConfigMgr *TierConfigMgr
globalTierJournal *TierJournal
globalConsoleSrv *consoleapi.Server globalConsoleSrv *consoleapi.Server
// handles service freeze or un-freeze S3 API calls. // handles service freeze or un-freeze S3 API calls.

View File

@ -47,6 +47,7 @@ type apiConfig struct {
replicationPriority string replicationPriority string
replicationMaxWorkers int replicationMaxWorkers int
transitionWorkers int transitionWorkers int
expiryWorkers int
staleUploadsExpiry time.Duration staleUploadsExpiry time.Duration
staleUploadsCleanupInterval time.Duration staleUploadsCleanupInterval time.Duration
@ -170,7 +171,9 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
} }
t.replicationPriority = cfg.ReplicationPriority t.replicationPriority = cfg.ReplicationPriority
t.replicationMaxWorkers = cfg.ReplicationMaxWorkers t.replicationMaxWorkers = cfg.ReplicationMaxWorkers
if globalTransitionState != nil && cfg.TransitionWorkers != t.transitionWorkers {
// N B api.transition_workers will be deprecated
if globalTransitionState != nil {
globalTransitionState.UpdateWorkers(cfg.TransitionWorkers) globalTransitionState.UpdateWorkers(cfg.TransitionWorkers)
} }
t.transitionWorkers = cfg.TransitionWorkers t.transitionWorkers = cfg.TransitionWorkers
@ -365,6 +368,13 @@ func (t *apiConfig) getReplicationOpts() replicationPoolOpts {
} }
} }
func (t *apiConfig) getExpiryWorkers() int {
t.mu.RLock()
defer t.mu.RUnlock()
return t.expiryWorkers
}
func (t *apiConfig) getTransitionWorkers() int { func (t *apiConfig) getTransitionWorkers() int {
t.mu.RLock() t.mu.RLock()
defer t.mu.RUnlock() defer t.mu.RUnlock()

View File

@ -273,10 +273,14 @@ const (
vmemory = "virtual_memory_bytes" vmemory = "virtual_memory_bytes"
cpu = "cpu_total_seconds" cpu = "cpu_total_seconds"
expiryPendingTasks MetricName = "expiry_pending_tasks" expiryPendingTasks MetricName = "expiry_pending_tasks"
transitionPendingTasks MetricName = "transition_pending_tasks" expiryMissedTasks MetricName = "expiry_missed_tasks"
transitionActiveTasks MetricName = "transition_active_tasks" expiryMissedFreeVersions MetricName = "expiry_missed_freeversions"
transitionMissedTasks MetricName = "transition_missed_immediate_tasks" expiryMissedTierJournalTasks MetricName = "expiry_missed_tierjournal_tasks"
expiryNumWorkers MetricName = "expiry_num_workers"
transitionPendingTasks MetricName = "transition_pending_tasks"
transitionActiveTasks MetricName = "transition_active_tasks"
transitionMissedTasks MetricName = "transition_missed_immediate_tasks"
transitionedBytes MetricName = "transitioned_bytes" transitionedBytes MetricName = "transitioned_bytes"
transitionedObjects MetricName = "transitioned_objects" transitionedObjects MetricName = "transitioned_objects"
@ -2000,6 +2004,42 @@ func getILMNodeMetrics() *MetricsGroup {
expPendingTasks := Metric{ expPendingTasks := Metric{
Description: getExpiryPendingTasksMD(), Description: getExpiryPendingTasksMD(),
} }
expMissedTasks := Metric{
Description: MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: ilmSubsystem,
Name: expiryMissedTasks,
Help: "Number of object version expiry missed due to busy system",
Type: counterMetric,
},
}
expMissedFreeVersions := Metric{
Description: MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: ilmSubsystem,
Name: expiryMissedFreeVersions,
Help: "Number of free versions expiry missed due to busy system",
Type: counterMetric,
},
}
expMissedTierJournalTasks := Metric{
Description: MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: ilmSubsystem,
Name: expiryMissedTierJournalTasks,
Help: "Number of tier journal entries cleanup missed due to busy system",
Type: counterMetric,
},
}
expNumWorkers := Metric{
Description: MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: ilmSubsystem,
Name: expiryNumWorkers,
Help: "Number of workers expiring object versions currently",
Type: gaugeMetric,
},
}
trPendingTasks := Metric{ trPendingTasks := Metric{
Description: getTransitionPendingTasksMD(), Description: getTransitionPendingTasksMD(),
} }
@ -2011,6 +2051,10 @@ func getILMNodeMetrics() *MetricsGroup {
} }
if globalExpiryState != nil { if globalExpiryState != nil {
expPendingTasks.Value = float64(globalExpiryState.PendingTasks()) expPendingTasks.Value = float64(globalExpiryState.PendingTasks())
expMissedTasks.Value = float64(globalExpiryState.stats.MissedTasks())
expMissedFreeVersions.Value = float64(globalExpiryState.stats.MissedFreeVersTasks())
expMissedTierJournalTasks.Value = float64(globalExpiryState.stats.MissedTierJournalTasks())
expNumWorkers.Value = float64(globalExpiryState.stats.NumWorkers())
} }
if globalTransitionState != nil { if globalTransitionState != nil {
trPendingTasks.Value = float64(globalTransitionState.PendingTasks()) trPendingTasks.Value = float64(globalTransitionState.PendingTasks())
@ -2019,6 +2063,10 @@ func getILMNodeMetrics() *MetricsGroup {
} }
return []Metric{ return []Metric{
expPendingTasks, expPendingTasks,
expMissedTasks,
expMissedFreeVersions,
expMissedTierJournalTasks,
expNumWorkers,
trPendingTasks, trPendingTasks,
trActiveTasks, trActiveTasks,
trMissedTasks, trMissedTasks,

View File

@ -117,7 +117,13 @@ type ObjectOptions struct {
// Object must have been read at this point. // Object must have been read at this point.
IndexCB func() []byte IndexCB func() []byte
// InclFreeVersions indicates that free versions need to be included
// when looking up a version by fi.VersionID
InclFreeVersions bool InclFreeVersions bool
// SkipFreeVersion skips adding a free version when a tiered version is
// being 'replaced'
// Note: Used only when a tiered object is being expired.
SkipFreeVersion bool
MetadataChg bool // is true if it is a metadata update operation. MetadataChg bool // is true if it is a metadata update operation.
EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject. EvalRetentionBypassFn EvalRetentionBypassFn // only set for enforcing retention bypass on DeleteObject.

View File

@ -427,7 +427,6 @@ func initAllSubsystems(ctx context.Context) {
// Create new ILM tier configuration subsystem // Create new ILM tier configuration subsystem
globalTierConfigMgr = NewTierConfigMgr() globalTierConfigMgr = NewTierConfigMgr()
globalTierJournal = NewTierJournal()
globalTransitionState = newTransitionState(GlobalContext) globalTransitionState = newTransitionState(GlobalContext)
globalSiteResyncMetrics = newSiteResyncMetrics(GlobalContext) globalSiteResyncMetrics = newSiteResyncMetrics(GlobalContext)
@ -911,6 +910,11 @@ func serverMain(ctx *cli.Context) {
initBackgroundReplication(GlobalContext, newObject) initBackgroundReplication(GlobalContext, newObject)
}) })
// Initialize background ILM worker poool
bootstrapTrace("initBackgroundExpiry", func() {
initBackgroundExpiry(GlobalContext, newObject)
})
bootstrapTrace("globalTransitionState.Init", func() { bootstrapTrace("globalTransitionState.Init", func() {
globalTransitionState.Init(newObject) globalTransitionState.Init(newObject)
}) })
@ -930,8 +934,6 @@ func serverMain(ctx *cli.Context) {
bootstrapTrace("globalTierConfigMgr.Init", func() { bootstrapTrace("globalTierConfigMgr.Init", func() {
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil { if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
logger.LogIf(GlobalContext, err) logger.LogIf(GlobalContext, err)
} else {
logger.FatalIf(globalTierJournal.Init(GlobalContext), "Unable to initialize remote tier pending deletes journal")
} }
}) })
}() }()

View File

@ -1,295 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
"github.com/minio/minio/internal/logger"
)
//go:generate msgp -file $GOFILE -unexported
//msgp:ignore TierJournal tierDiskJournal walkfn
type tierDiskJournal struct {
sync.RWMutex
diskPath string
file *os.File // active journal file
}
// TierJournal holds an in-memory and an on-disk delete journal of tiered content.
type TierJournal struct {
*tierDiskJournal // for processing legacy journal entries
*tierMemJournal // for processing new journal entries
}
type jentry struct {
ObjName string `msg:"obj"`
VersionID string `msg:"vid"`
TierName string `msg:"tier"`
}
const (
tierJournalVersion = 1
tierJournalHdrLen = 2 // 2 bytes
)
var errUnsupportedJournalVersion = errors.New("unsupported pending deletes journal version")
func newTierDiskJournal() *tierDiskJournal {
return &tierDiskJournal{}
}
// NewTierJournal initializes tier deletion journal
func NewTierJournal() *TierJournal {
j := &TierJournal{
tierMemJournal: newTierMemJournal(1000),
tierDiskJournal: newTierDiskJournal(),
}
return j
}
// Init initializes an in-memory journal built using a
// buffered channel for new journal entries. It also initializes the on-disk
// journal only to process existing journal entries made from previous versions.
func (t *TierJournal) Init(ctx context.Context) error {
for _, diskPath := range globalEndpoints.LocalDisksPaths() {
t.diskPath = diskPath
go t.deletePending(ctx) // for existing journal entries from previous MinIO versions
go t.processEntries(ctx) // for newer journal entries circa free-versions
return nil
}
return errors.New("no local drive found")
}
// rotate rotates the journal. If a read-only journal already exists it does
// nothing. Otherwise renames the active journal to a read-only journal and
// opens a new active journal.
func (jd *tierDiskJournal) rotate() error {
// Do nothing if a read-only journal file already exists.
if _, err := os.Stat(jd.ReadOnlyPath()); err == nil {
return nil
}
// Close the active journal if present and delete it.
return jd.Close()
}
type walkFn func(ctx context.Context, objName, rvID, tierName string) error
func (jd *tierDiskJournal) ReadOnlyPath() string {
return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", "deletion-journal.ro.bin")
}
func (jd *tierDiskJournal) JournalPath() string {
return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", "deletion-journal.bin")
}
func (jd *tierDiskJournal) WalkEntries(ctx context.Context, fn walkFn) {
if err := jd.rotate(); err != nil {
logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to rotate pending deletes journal %s", err))
return
}
ro, err := jd.OpenRO()
switch {
case errors.Is(err, os.ErrNotExist):
return // No read-only journal to process; nothing to do.
case err != nil:
logger.LogIf(ctx, fmt.Errorf("tier-journal: failed open read-only journal for processing %s", err))
return
}
defer ro.Close()
mr := msgpNewReader(ro)
defer readMsgpReaderPoolPut(mr)
done := false
for {
var entry jentry
err := entry.DecodeMsg(mr)
if errors.Is(err, io.EOF) {
done = true
break
}
if err != nil {
logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to decode journal entry %s", err))
break
}
err = fn(ctx, entry.ObjName, entry.VersionID, entry.TierName)
if err != nil && !isErrObjectNotFound(err) {
logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err))
// We add the entry into the active journal to try again
// later.
jd.addEntry(entry)
}
}
if done {
os.Remove(jd.ReadOnlyPath())
}
}
func deleteObjectFromRemoteTier(ctx context.Context, objName, rvID, tierName string) error {
w, err := globalTierConfigMgr.getDriver(tierName)
if err != nil {
return err
}
err = w.Remove(ctx, objName, remoteVersionID(rvID))
if err != nil {
return err
}
return nil
}
func (jd *tierDiskJournal) deletePending(ctx context.Context) {
ticker := time.NewTicker(30 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
jd.WalkEntries(ctx, deleteObjectFromRemoteTier)
case <-ctx.Done():
jd.Close()
return
}
}
}
func (jd *tierDiskJournal) addEntry(je jentry) error {
// Open journal if it hasn't been
err := jd.Open()
if err != nil {
return err
}
b, err := je.MarshalMsg(nil)
if err != nil {
return err
}
jd.Lock()
defer jd.Unlock()
_, err = jd.file.Write(b)
if err != nil {
// Do not leak fd here, close the file properly.
Fdatasync(jd.file)
_ = jd.file.Close()
jd.file = nil // reset to allow subsequent reopen when file/disk is available.
}
return err
}
// Close closes the active journal and renames it to read-only for pending
// deletes processing. Note: calling Close on a closed journal is a no-op.
func (jd *tierDiskJournal) Close() error {
jd.Lock()
defer jd.Unlock()
if jd.file == nil { // already closed
return nil
}
var (
f *os.File
fi os.FileInfo
err error
)
// Setting j.file to nil
f, jd.file = jd.file, f
if fi, err = f.Stat(); err != nil {
return err
}
f.Close() // close before rename()
// Skip renaming active journal if empty.
if fi.Size() == tierJournalHdrLen {
return os.Remove(jd.JournalPath())
}
jPath := jd.JournalPath()
jroPath := jd.ReadOnlyPath()
// Rotate active journal to perform pending deletes.
return os.Rename(jPath, jroPath)
}
// Open opens a new active journal. Note: calling Open on an opened journal is a
// no-op.
func (jd *tierDiskJournal) Open() error {
jd.Lock()
defer jd.Unlock()
if jd.file != nil { // already open
return nil
}
var err error
jd.file, err = OpenFile(jd.JournalPath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY|writeMode, 0o666)
if err != nil {
return err
}
// write journal version header if active journal is empty
fi, err := jd.file.Stat()
if err != nil {
return err
}
if fi.Size() == 0 {
var data [tierJournalHdrLen]byte
binary.LittleEndian.PutUint16(data[:], tierJournalVersion)
_, err = jd.file.Write(data[:])
if err != nil {
return err
}
}
return nil
}
func (jd *tierDiskJournal) OpenRO() (io.ReadCloser, error) {
file, err := Open(jd.ReadOnlyPath())
if err != nil {
return nil, err
}
// read journal version header
var data [tierJournalHdrLen]byte
if _, err := io.ReadFull(file, data[:]); err != nil {
return nil, err
}
switch binary.LittleEndian.Uint16(data[:]) {
case tierJournalVersion:
return file, nil
default:
return nil, errUnsupportedJournalVersion
}
}
// jentryV1 represents the entry in the journal before RemoteVersionID was
// added. It remains here for use in tests for the struct element addition.
type jentryV1 struct {
ObjName string `msg:"obj"`
TierName string `msg:"tier"`
}

View File

@ -1,288 +0,0 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *jentry) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "obj":
z.ObjName, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
case "vid":
z.VersionID, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
case "tier":
z.TierName, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z jentry) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 3
// write "obj"
err = en.Append(0x83, 0xa3, 0x6f, 0x62, 0x6a)
if err != nil {
return
}
err = en.WriteString(z.ObjName)
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
// write "vid"
err = en.Append(0xa3, 0x76, 0x69, 0x64)
if err != nil {
return
}
err = en.WriteString(z.VersionID)
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
// write "tier"
err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72)
if err != nil {
return
}
err = en.WriteString(z.TierName)
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z jentry) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 3
// string "obj"
o = append(o, 0x83, 0xa3, 0x6f, 0x62, 0x6a)
o = msgp.AppendString(o, z.ObjName)
// string "vid"
o = append(o, 0xa3, 0x76, 0x69, 0x64)
o = msgp.AppendString(o, z.VersionID)
// string "tier"
o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72)
o = msgp.AppendString(o, z.TierName)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *jentry) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "obj":
z.ObjName, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
case "vid":
z.VersionID, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "VersionID")
return
}
case "tier":
z.TierName, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z jentry) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 4 + msgp.StringPrefixSize + len(z.VersionID) + 5 + msgp.StringPrefixSize + len(z.TierName)
return
}
// DecodeMsg implements msgp.Decodable
func (z *jentryV1) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "obj":
z.ObjName, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
case "tier":
z.TierName, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z jentryV1) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "obj"
err = en.Append(0x82, 0xa3, 0x6f, 0x62, 0x6a)
if err != nil {
return
}
err = en.WriteString(z.ObjName)
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
// write "tier"
err = en.Append(0xa4, 0x74, 0x69, 0x65, 0x72)
if err != nil {
return
}
err = en.WriteString(z.TierName)
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z jentryV1) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "obj"
o = append(o, 0x82, 0xa3, 0x6f, 0x62, 0x6a)
o = msgp.AppendString(o, z.ObjName)
// string "tier"
o = append(o, 0xa4, 0x74, 0x69, 0x65, 0x72)
o = msgp.AppendString(o, z.TierName)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *jentryV1) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "obj":
z.ObjName, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "ObjName")
return
}
case "tier":
z.TierName, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "TierName")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z jentryV1) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.ObjName) + 5 + msgp.StringPrefixSize + len(z.TierName)
return
}

View File

@ -1,236 +0,0 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshaljentry(t *testing.T) {
v := jentry{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgjentry(b *testing.B) {
v := jentry{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgjentry(b *testing.B) {
v := jentry{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshaljentry(b *testing.B) {
v := jentry{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodejentry(t *testing.T) {
v := jentry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodejentry Msgsize() is inaccurate")
}
vn := jentry{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodejentry(b *testing.B) {
v := jentry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodejentry(b *testing.B) {
v := jentry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshaljentryV1(t *testing.T) {
v := jentryV1{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgjentryV1(b *testing.B) {
v := jentryV1{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgjentryV1(b *testing.B) {
v := jentryV1{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshaljentryV1(b *testing.B) {
v := jentryV1{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodejentryV1(t *testing.T) {
v := jentryV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodejentryV1 Msgsize() is inaccurate")
}
vn := jentryV1{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodejentryV1(b *testing.B) {
v := jentryV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodejentryV1(b *testing.B) {
v := jentryV1{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -1,121 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
// TestJEntryReadOldToNew1 - tests that adding the RemoteVersionID parameter to the
// jentry struct does not cause unexpected errors when reading the serialized
// old version into new version.
func TestJEntryReadOldToNew1(t *testing.T) {
readOldToNewCases := []struct {
je jentryV1
exp jentry
}{
{jentryV1{"obj1", "tier1"}, jentry{"obj1", "", "tier1"}},
{jentryV1{"obj1", ""}, jentry{"obj1", "", ""}},
{jentryV1{"", "tier1"}, jentry{"", "", "tier1"}},
{jentryV1{"", ""}, jentry{"", "", ""}},
}
var b bytes.Buffer
for _, item := range readOldToNewCases {
bs, err := item.je.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
b.Write(bs)
}
mr := msgp.NewReader(&b)
for i, item := range readOldToNewCases {
var je jentry
err := je.DecodeMsg(mr)
if err != nil {
t.Fatal(err)
}
if je != item.exp {
t.Errorf("Case %d: Expected: %v Got: %v", i, item.exp, je)
}
}
}
// TestJEntryWriteNewToOldMix1 - tests that adding the RemoteVersionID parameter
// to the jentry struct does not cause unexpected errors when writing. This
// simulates the case when the active journal has entries in the older version
// struct and due to errors new entries are added in the new version of the
// struct.
func TestJEntryWriteNewToOldMix1(t *testing.T) {
oldStructVals := []jentryV1{
{"obj1", "tier1"},
{"obj2", "tier2"},
{"obj3", "tier3"},
}
newStructVals := []jentry{
{"obj4", "", "tier1"},
{"obj5", "ver2", "tier2"},
{"obj6", "", "tier3"},
}
// Write old struct version values followed by new version values.
var b bytes.Buffer
for _, item := range oldStructVals {
bs, err := item.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
b.Write(bs)
}
for _, item := range newStructVals {
bs, err := item.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
b.Write(bs)
}
// Read into new struct version and check.
mr := msgp.NewReader(&b)
for i := 0; i < len(oldStructVals)+len(newStructVals); i++ {
var je jentry
err := je.DecodeMsg(mr)
if err != nil {
t.Fatal(err)
}
var expectedJe jentry
if i < len(oldStructVals) {
// For old struct values, the RemoteVersionID will be
// empty
expectedJe = jentry{
ObjName: oldStructVals[i].ObjName,
VersionID: "",
TierName: oldStructVals[i].TierName,
}
} else {
expectedJe = newStructVals[i-len(oldStructVals)]
}
if expectedJe != je {
t.Errorf("Case %d: Expected: %v, Got: %v", i, expectedJe, je)
}
}
}

View File

@ -1,56 +0,0 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"fmt"
"github.com/minio/minio/internal/logger"
)
type tierMemJournal struct {
entries chan jentry
}
func newTierMemJournal(nevents int) *tierMemJournal {
return &tierMemJournal{
entries: make(chan jentry, nevents),
}
}
func (j *tierMemJournal) processEntries(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case entry := <-j.entries:
logger.LogIf(ctx, deleteObjectFromRemoteTier(ctx, entry.ObjName, entry.VersionID, entry.TierName))
}
}
}
func (j *tierMemJournal) AddEntry(je jentry) error {
select {
case j.entries <- je:
default:
return fmt.Errorf("failed to remove tiered content at %s with version %s from tier %s, will be retried later.",
je.ObjName, je.VersionID, je.TierName)
}
return nil
}

View File

@ -18,6 +18,8 @@
package cmd package cmd
import ( import (
"context"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
) )
@ -128,9 +130,26 @@ func (os *objSweeper) shouldRemoveRemoteObject() (jentry, bool) {
} }
// Sweep removes the transitioned object if it's no longer referred to. // Sweep removes the transitioned object if it's no longer referred to.
func (os *objSweeper) Sweep() error { func (os *objSweeper) Sweep() {
if je, ok := os.shouldRemoveRemoteObject(); ok { if je, ok := os.shouldRemoveRemoteObject(); ok {
return globalTierJournal.AddEntry(je) globalExpiryState.enqueueTierJournalEntry(je)
}
}
type jentry struct {
ObjName string
VersionID string
TierName string
}
func deleteObjectFromRemoteTier(ctx context.Context, objName, rvID, tierName string) error {
w, err := globalTierConfigMgr.getDriver(tierName)
if err != nil {
return err
}
err = w.Remove(ctx, objName, remoteVersionID(rvID))
if err != nil {
return err
} }
return nil return nil
} }

View File

@ -30,6 +30,9 @@ const freeVersion = "free-version"
// InitFreeVersion creates a free-version to track the tiered-content of j. If j has // InitFreeVersion creates a free-version to track the tiered-content of j. If j has
// no tiered content, it returns false. // no tiered content, it returns false.
func (j xlMetaV2Object) InitFreeVersion(fi FileInfo) (xlMetaV2Version, bool) { func (j xlMetaV2Object) InitFreeVersion(fi FileInfo) (xlMetaV2Version, bool) {
if fi.SkipTierFreeVersion() {
return xlMetaV2Version{}, false
}
if status, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus]; ok && bytes.Equal(status, []byte(lifecycle.TransitionComplete)) { if status, ok := j.MetaSys[ReservedMetadataPrefixLower+TransitionStatus]; ok && bytes.Equal(status, []byte(lifecycle.TransitionComplete)) {
vID, err := uuid.Parse(fi.TierFreeVersionID()) vID, err := uuid.Parse(fi.TierFreeVersionID())
if err != nil { if err != nil {

View File

@ -22,6 +22,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/uuid"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
) )
@ -84,9 +85,7 @@ func TestFreeVersion(t *testing.T) {
Hash: nil, Hash: nil,
}}, }},
}, },
MarkDeleted: false, MarkDeleted: false,
// DeleteMarkerReplicationStatus: "",
// VersionPurgeStatus: "",
NumVersions: 1, NumVersions: 1,
SuccessorModTime: time.Time{}, SuccessorModTime: time.Time{},
} }
@ -228,3 +227,55 @@ func TestFreeVersion(t *testing.T) {
t.Fatalf("Expected zero free version but got %d", len(freeVersions)) t.Fatalf("Expected zero free version but got %d", len(freeVersions))
} }
} }
func TestSkipFreeVersion(t *testing.T) {
fi := FileInfo{
Volume: "volume",
Name: "object-name",
VersionID: "00000000-0000-0000-0000-000000000001",
IsLatest: true,
Deleted: false,
TransitionStatus: "",
DataDir: "bffea160-ca7f-465f-98bc-9b4f1c3ba1ef",
XLV1: false,
ModTime: time.Now(),
Size: 0,
Mode: 0,
Metadata: nil,
Parts: nil,
Erasure: ErasureInfo{
Algorithm: ReedSolomon.String(),
DataBlocks: 4,
ParityBlocks: 2,
BlockSize: 10000,
Index: 1,
Distribution: []int{1, 2, 3, 4, 5, 6, 7, 8},
Checksums: []ChecksumInfo{{
PartNumber: 1,
Algorithm: HighwayHash256S,
Hash: nil,
}},
},
MarkDeleted: false,
// DeleteMarkerReplicationStatus: "",
// VersionPurgeStatus: "",
NumVersions: 1,
SuccessorModTime: time.Time{},
}
fi.SetTierFreeVersionID(uuid.New().String())
// Test if free version is created when SkipTier wasn't set on fi
j := xlMetaV2Object{}
j.MetaSys = make(map[string][]byte)
j.MetaSys[metaTierName] = []byte("WARM-1")
j.MetaSys[metaTierStatus] = []byte(lifecycle.TransitionComplete)
j.MetaSys[metaTierObjName] = []byte("obj-1")
if _, ok := j.InitFreeVersion(fi); !ok {
t.Fatal("Expected a free version to be created")
}
// Test if we skip creating a free version if SkipTier was set on fi
fi.SetSkipTierFreeVersion()
if _, ok := j.InitFreeVersion(fi); ok {
t.Fatal("Expected no free version to be created")
}
}

View File

@ -632,7 +632,7 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
for _, freeVersion := range fivs.FreeVersions { for _, freeVersion := range fivs.FreeVersions {
oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned) oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned)
done = globalScannerMetrics.time(scannerMetricTierObjSweep) done = globalScannerMetrics.time(scannerMetricTierObjSweep)
item.applyTierObjSweep(ctx, objAPI, oi) globalExpiryState.enqueueFreeVersion(oi)
done() done()
} }

View File

@ -41,6 +41,7 @@ const (
apiReplicationMaxWorkers = "replication_max_workers" apiReplicationMaxWorkers = "replication_max_workers"
apiTransitionWorkers = "transition_workers" apiTransitionWorkers = "transition_workers"
apiExpiryWorkers = "expiry_workers"
apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval" apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval"
apiStaleUploadsExpiry = "stale_uploads_expiry" apiStaleUploadsExpiry = "stale_uploads_expiry"
apiDeleteCleanupInterval = "delete_cleanup_interval" apiDeleteCleanupInterval = "delete_cleanup_interval"
@ -56,6 +57,7 @@ const (
EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN" EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN"
EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE" EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS" EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS"
EnvAPIExpiryWorkers = "MINIO_API_EXPIRY_WORKERS"
EnvAPIListQuorum = "MINIO_API_LIST_QUORUM" EnvAPIListQuorum = "MINIO_API_LIST_QUORUM"
EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS" // default config.EnableOn
EnvAPIReplicationPriority = "MINIO_API_REPLICATION_PRIORITY" EnvAPIReplicationPriority = "MINIO_API_REPLICATION_PRIORITY"
@ -117,6 +119,10 @@ var (
Key: apiTransitionWorkers, Key: apiTransitionWorkers,
Value: "100", Value: "100",
}, },
config.KV{
Key: apiExpiryWorkers,
Value: "100",
},
config.KV{ config.KV{
Key: apiStaleUploadsCleanupInterval, Key: apiStaleUploadsCleanupInterval,
Value: "6h", Value: "6h",
@ -164,6 +170,7 @@ type Config struct {
ReplicationPriority string `json:"replication_priority"` ReplicationPriority string `json:"replication_priority"`
ReplicationMaxWorkers int `json:"replication_max_workers"` ReplicationMaxWorkers int `json:"replication_max_workers"`
TransitionWorkers int `json:"transition_workers"` TransitionWorkers int `json:"transition_workers"`
ExpiryWorkers int `json:"expiry_workers"`
StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"` StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"`
StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"` StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"`
DeleteCleanupInterval time.Duration `json:"delete_cleanup_interval"` DeleteCleanupInterval time.Duration `json:"delete_cleanup_interval"`
@ -281,6 +288,15 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
} }
cfg.TransitionWorkers = transitionWorkers cfg.TransitionWorkers = transitionWorkers
expiryWorkers, err := strconv.Atoi(env.Get(EnvAPIExpiryWorkers, kvs.GetWithDefault(apiExpiryWorkers, DefaultKVS)))
if err != nil {
return cfg, err
}
if expiryWorkers <= 0 || expiryWorkers > 500 {
return cfg, config.ErrInvalidExpiryWorkersValue(nil).Msg("Number of expiry workers should be between 1 and 500")
}
cfg.ExpiryWorkers = expiryWorkers
v := env.Get(EnvAPIDeleteCleanupInterval, kvs.Get(apiDeleteCleanupInterval)) v := env.Get(EnvAPIDeleteCleanupInterval, kvs.Get(apiDeleteCleanupInterval))
if v == "" { if v == "" {
v = env.Get(EnvDeleteCleanupInterval, kvs.GetWithDefault(apiDeleteCleanupInterval, DefaultKVS)) v = env.Get(EnvDeleteCleanupInterval, kvs.GetWithDefault(apiDeleteCleanupInterval, DefaultKVS))

View File

@ -19,12 +19,12 @@ package api
import "github.com/minio/minio/internal/config" import "github.com/minio/minio/internal/config"
// Help template for storageclass feature.
var ( var (
defaultHelpPostfix = func(key string) string { defaultHelpPostfix = func(key string) string {
return config.DefaultHelpPostfix(DefaultKVS, key) return config.DefaultHelpPostfix(DefaultKVS, key)
} }
// Help holds configuration keys and their default values for api subsystem.
Help = config.HelpKVS{ Help = config.HelpKVS{
config.HelpKV{ config.HelpKV{
Key: apiRequestsMax, Key: apiRequestsMax,
@ -80,6 +80,12 @@ var (
Optional: true, Optional: true,
Type: "number", Type: "number",
}, },
config.HelpKV{
Key: apiExpiryWorkers,
Description: `set the number of expiry workers` + defaultHelpPostfix(apiExpiryWorkers),
Optional: true,
Type: "number",
},
config.HelpKV{ config.HelpKV{
Key: apiStaleUploadsExpiry, Key: apiStaleUploadsExpiry,
Description: `set to expire stale multipart uploads older than this values` + defaultHelpPostfix(apiStaleUploadsExpiry), Description: `set to expire stale multipart uploads older than this values` + defaultHelpPostfix(apiStaleUploadsExpiry),

View File

@ -120,6 +120,7 @@ const (
DriveSubSys = madmin.DriveSubSys DriveSubSys = madmin.DriveSubSys
BatchSubSys = madmin.BatchSubSys BatchSubSys = madmin.BatchSubSys
BrowserSubSys = madmin.BrowserSubSys BrowserSubSys = madmin.BrowserSubSys
ILMSubSys = madmin.ILMSubsys
// Add new constants here (similar to above) if you add new fields to config. // Add new constants here (similar to above) if you add new fields to config.
) )
@ -188,6 +189,7 @@ var SubSystemsDynamic = set.CreateStringSet(
AuditKafkaSubSys, AuditKafkaSubSys,
StorageClassSubSys, StorageClassSubSys,
CacheSubSys, CacheSubSys,
ILMSubSys,
BatchSubSys, BatchSubSys,
BrowserSubSys, BrowserSubSys,
) )
@ -211,6 +213,7 @@ var SubSystemsSingleTargets = set.CreateStringSet(
SubnetSubSys, SubnetSubSys,
CallhomeSubSys, CallhomeSubSys,
DriveSubSys, DriveSubSys,
ILMSubSys,
BatchSubSys, BatchSubSys,
BrowserSubSys, BrowserSubSys,
) )

View File

@ -224,6 +224,11 @@ Examples:
"", "",
"MINIO_API_TRANSITION_WORKERS: should be >= GOMAXPROCS/2", "MINIO_API_TRANSITION_WORKERS: should be >= GOMAXPROCS/2",
) )
ErrInvalidExpiryWorkersValue = newErrFn(
"Invalid value for expiry workers",
"",
"MINIO_API_EXPIRY_WORKERS: should be between 1 and 500",
)
ErrInvalidBatchKeyRotationWorkersWait = newErrFn( ErrInvalidBatchKeyRotationWorkersWait = newErrFn(
"Invalid value for batch key rotation workers wait", "Invalid value for batch key rotation workers wait",
"Please input a non-negative duration", "Please input a non-negative duration",

View File

@ -0,0 +1,52 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ilm
import "github.com/minio/minio/internal/config"
const (
transitionWorkers = "transition_workers"
expirationWorkers = "expiration_workers"
// EnvILMTransitionWorkers env variable to configure number of transition workers
EnvILMTransitionWorkers = "MINIO_ILM_TRANSITION_WORKERS"
// EnvILMExpirationWorkers env variable to configure number of expiration workers
EnvILMExpirationWorkers = "MINIO_ILM_EXPIRATION_WORKERS"
)
var (
defaultHelpPostfix = func(key string) string {
return config.DefaultHelpPostfix(DefaultKVS, key)
}
// HelpILM holds configuration keys and their default values for the ILM
// subsystem
HelpILM = config.HelpKVS{
config.HelpKV{
Key: transitionWorkers,
Type: "number",
Description: `set the number of transition workers` + defaultHelpPostfix(transitionWorkers),
Optional: true,
},
config.HelpKV{
Key: expirationWorkers,
Type: "number",
Description: `set the number of expiration workers` + defaultHelpPostfix(expirationWorkers),
Optional: true,
},
}
)

View File

@ -0,0 +1,60 @@
// Copyright (c) 2015-2024 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ilm
import (
"strconv"
"github.com/minio/minio/internal/config"
"github.com/minio/pkg/v2/env"
)
// DefaultKVS default configuration values for ILM subsystem
var DefaultKVS = config.KVS{
config.KV{
Key: transitionWorkers,
Value: "100",
},
config.KV{
Key: expirationWorkers,
Value: "100",
},
}
// Config represents the different configuration values for ILM subsystem
type Config struct {
TransitionWorkers int
ExpirationWorkers int
}
// LookupConfig - lookup ilm config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) {
tw, err := strconv.Atoi(env.Get(EnvILMTransitionWorkers, kvs.GetWithDefault(transitionWorkers, DefaultKVS)))
if err != nil {
return cfg, err
}
ew, err := strconv.Atoi(env.Get(EnvILMExpirationWorkers, kvs.GetWithDefault(expirationWorkers, DefaultKVS)))
if err != nil {
return cfg, err
}
cfg.TransitionWorkers = tw
cfg.ExpirationWorkers = ew
return cfg, nil
}