allow more workers for ILM expiration (#16924)

This commit is contained in:
Harshavardhana 2023-03-30 10:47:15 -07:00 committed by GitHub
parent c468b4e2a8
commit 7a6c4e438e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -24,6 +24,8 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"runtime"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -38,6 +40,8 @@ import (
xhttp "github.com/minio/minio/internal/http" xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/s3select" "github.com/minio/minio/internal/s3select"
"github.com/minio/minio/internal/workers"
"github.com/minio/pkg/env"
) )
const ( const (
@ -116,26 +120,49 @@ var globalExpiryState *expiryState
func newExpiryState() *expiryState { func newExpiryState() *expiryState {
return &expiryState{ return &expiryState{
byDaysCh: make(chan expiryTask, 10000), byDaysCh: make(chan expiryTask, 100000),
byNewerNoncurrentCh: make(chan newerNoncurrentTask, 10000), byNewerNoncurrentCh: make(chan newerNoncurrentTask, 100000),
} }
} }
func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) { func initBackgroundExpiry(ctx context.Context, objectAPI ObjectLayer) {
globalExpiryState = newExpiryState() globalExpiryState = newExpiryState()
workerSize, _ := strconv.Atoi(env.Get("_MINIO_ILM_EXPIRY_WORKERS", strconv.Itoa((runtime.GOMAXPROCS(0)+1)/2)))
ewk, err := workers.New(workerSize)
if err != nil {
logger.LogIf(ctx, err)
}
nwk, err := workers.New(workerSize)
if err != nil {
logger.LogIf(ctx, err)
}
go func() { go func() {
for t := range globalExpiryState.byDaysCh { for t := range globalExpiryState.byDaysCh {
ewk.Take()
go func(t expiryTask) {
defer ewk.Give()
if t.objInfo.TransitionedObject.Status != "" { if t.objInfo.TransitionedObject.Status != "" {
applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.restoredObject) applyExpiryOnTransitionedObject(ctx, objectAPI, t.objInfo, t.restoredObject)
} else { } else {
applyExpiryOnNonTransitionedObjects(ctx, objectAPI, t.objInfo, t.versionExpiry) applyExpiryOnNonTransitionedObjects(ctx, objectAPI, t.objInfo, t.versionExpiry)
} }
}(t)
} }
ewk.Wait()
}() }()
go func() { go func() {
for t := range globalExpiryState.byNewerNoncurrentCh { for t := range globalExpiryState.byNewerNoncurrentCh {
nwk.Take()
go func(t newerNoncurrentTask) {
defer nwk.Give()
deleteObjectVersions(ctx, objectAPI, t.bucket, t.versions) deleteObjectVersions(ctx, objectAPI, t.bucket, t.versions)
}(t)
} }
nwk.Wait()
}() }()
} }