Mirror of https://github.com/minio/minio.git
fix: allow configuring cleanup of stale multipart uploads (#13354)
Allow dynamically changing the cleanup of stale multipart uploads: their expiry and how frequently the check runs. Improves #13270
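The heart of the change, visible across the hunks below: the cleanup goroutines no longer receive fixed intervals and expiry values as parameters. Instead they re-read the current values from the shared, lock-protected apiConfig (fed from the api.Config fields StaleUploadsExpiry, StaleUploadsCleanupInterval and DeleteCleanupInterval) on every timer tick, so a configuration change takes effect on the next cycle without restarting the server. A minimal, self-contained Go sketch of that pattern — the names interval, getCleanupInterval and cleanupStaleUploads here are illustrative stand-ins, not MinIO's actual plumbing:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// interval holds the currently configured cleanup interval in nanoseconds;
// it may be updated concurrently, e.g. when an admin changes the config.
var interval atomic.Int64

func getCleanupInterval() time.Duration {
	if v := interval.Load(); v > 0 {
		return time.Duration(v)
	}
	return 6 * time.Hour // fall back to a default when unset
}

// cleanupStaleUploads re-reads the interval on every tick, so updates take
// effect on the next cycle without restarting the goroutine.
func cleanupStaleUploads(ctx context.Context) {
	timer := time.NewTimer(getCleanupInterval())
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			// Re-arm with whatever the interval is *now*.
			timer.Reset(getCleanupInterval())
			fmt.Println("cleanup pass at", time.Now().Format(time.StampMilli))
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	interval.Store(int64(20 * time.Millisecond)) // pretend an admin set a short interval
	cleanupStaleUploads(ctx)
}

Using timer.Reset inside the tick handler, rather than a fixed time.Ticker, is what allows the interval to change between cycles.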
@@ -41,14 +41,11 @@ import (
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/minio/internal/sync/errgroup"
 	"github.com/minio/pkg/console"
-	"github.com/minio/pkg/env"
 )
 
 // setsDsyncLockers is encapsulated type for Close()
 type setsDsyncLockers [][]dsync.NetLocker
 
-const envMinioDeleteCleanupInterval = "MINIO_DELETE_CLEANUP_INTERVAL"
-
 // erasureSets implements ObjectLayer combining a static list of erasure coded
 // object sets. NOTE: There is no dynamic scaling allowed or intended in
 // current design.
@@ -443,19 +440,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 		}
 	}
 
-	// cleanup ".trash/" folder every 5m minutes with sufficient sleep cycles, between each
-	// deletes a dynamic sleeper is used with a factor of 10 ratio with max delay between
-	// deletes to be 2 seconds.
-	deletedObjectsCleanupInterval, err := time.ParseDuration(env.Get(envMinioDeleteCleanupInterval, "5m"))
-	if err != nil {
-		return nil, err
-	}
-
 	// start cleanup stale uploads go-routine.
-	go s.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
+	go s.cleanupStaleUploads(ctx)
 
 	// start cleanup of deleted objects.
-	go s.cleanupDeletedObjects(ctx, deletedObjectsCleanupInterval)
+	go s.cleanupDeletedObjects(ctx)
 
 	// Start the disk monitoring and connect routine.
 	go s.monitorAndConnectEndpoints(ctx, defaultMonitorConnectEndpointInterval)
@@ -463,8 +452,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 	return s, nil
 }
 
-func (s *erasureSets) cleanupDeletedObjects(ctx context.Context, cleanupInterval time.Duration) {
-	timer := time.NewTimer(cleanupInterval)
+// cleanup ".trash/" folder every 5m minutes with sufficient sleep cycles, between each
+// deletes a dynamic sleeper is used with a factor of 10 ratio with max delay between
+// deletes to be 2 seconds.
+func (s *erasureSets) cleanupDeletedObjects(ctx context.Context) {
+	timer := time.NewTimer(globalAPIConfig.getDeleteCleanupInterval())
 	defer timer.Stop()
 
 	for {
@@ -473,7 +465,7 @@ func (s *erasureSets) cleanupDeletedObjects(ctx context.Context, cleanupInterval
 			return
 		case <-timer.C:
 			// Reset for the next interval
-			timer.Reset(cleanupInterval)
+			timer.Reset(globalAPIConfig.getDeleteCleanupInterval())
 
 			for _, set := range s.sets {
 				set.cleanupDeletedObjects(ctx)
@@ -482,8 +474,8 @@ func (s *erasureSets) cleanupDeletedObjects(ctx context.Context, cleanupInterval
 	}
 }
 
-func (s *erasureSets) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
-	timer := time.NewTimer(cleanupInterval)
+func (s *erasureSets) cleanupStaleUploads(ctx context.Context) {
+	timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
 	defer timer.Stop()
 
 	for {
@@ -492,10 +484,10 @@ func (s *erasureSets) cleanupStaleUploads(ctx context.Context, cleanupInterval,
 			return
 		case <-timer.C:
 			// Reset for the next interval
-			timer.Reset(cleanupInterval)
+			timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
 
 			for _, set := range s.sets {
-				set.cleanupStaleUploads(ctx, expiry)
+				set.cleanupStaleUploads(ctx, globalAPIConfig.getStaleUploadsExpiry())
 			}
 		}
 	}
@@ -847,8 +847,8 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
 // Removes multipart uploads if any older than `expiry` duration
 // on all buckets for every `cleanupInterval`, this function is
 // blocking and should be run in a go-routine.
-func (fs *FSObjects) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
-	timer := time.NewTimer(cleanupInterval)
+func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) {
+	timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
 	defer timer.Stop()
 
 	for {
@@ -857,7 +857,9 @@ func (fs *FSObjects) cleanupStaleUploads(ctx context.Context, cleanupInterval, e
 			return
 		case <-timer.C:
 			// Reset for the next interval
-			timer.Reset(cleanupInterval)
+			timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
+
+			expiry := globalAPIConfig.getStaleUploadsExpiry()
 
 			now := time.Now()
 			entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket))
@@ -26,6 +26,8 @@ import (
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/minio/minio/internal/config/api"
 )
 
 // Tests cleanup multipart uploads for filesystem backend.
@@ -49,11 +51,23 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
 		t.Fatal("Unexpected err: ", err)
 	}
 
+	globalAPIConfig.init(api.Config{
+		ListQuorum:                  "optimal",
+		StaleUploadsExpiry:          time.Millisecond,
+		StaleUploadsCleanupInterval: time.Millisecond,
+	}, obj.SetDriveCounts())
+
+	defer func() {
+		globalAPIConfig = apiConfig{
+			listQuorum: 3,
+		}
+	}()
+
 	var cleanupWg sync.WaitGroup
 	cleanupWg.Add(1)
 	go func() {
 		defer cleanupWg.Done()
-		fs.cleanupStaleUploads(ctx, time.Millisecond, 0)
+		fs.cleanupStaleUploads(ctx)
 	}()
 
 	// Wait for 100ms such that - we have given enough time for
@@ -177,7 +177,7 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) {
 	// or cause changes on backend format.
 	fs.fsFormatRlk = rlk
 
-	go fs.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
+	go fs.cleanupStaleUploads(ctx)
 	go intDataUpdateTracker.start(ctx, fsPath)
 
 	// Return successfully initialized object layer.
@@ -91,14 +91,12 @@ const (
 	// date and server date during signature verification.
 	globalMaxSkewTime = 15 * time.Minute // 15 minutes skew allowed.
 
-	// GlobalStaleUploadsExpiry - Expiry duration after which the uploads in multipart, tmp directory are deemed stale.
+	// GlobalStaleUploadsExpiry - Expiry duration after which the uploads in multipart,
+	// tmp directory are deemed stale.
 	GlobalStaleUploadsExpiry = time.Hour * 24 // 24 hrs.
 
 	// GlobalStaleUploadsCleanupInterval - Cleanup interval when the stale uploads cleanup is initiated.
-	GlobalStaleUploadsCleanupInterval = time.Hour * 12 // 12 hrs.
-
-	// GlobalServiceExecutionInterval - Executes the Lifecycle events.
-	GlobalServiceExecutionInterval = time.Hour * 24 // 24 hrs.
+	GlobalStaleUploadsCleanupInterval = time.Hour * 6 // 6 hrs.
 
 	// Refresh interval to update in-memory iam config cache.
 	globalRefreshIAMInterval = 5 * time.Minute
@@ -42,6 +42,10 @@ type apiConfig struct {
 	replicationWorkers       int
 	replicationFailedWorkers int
 	transitionWorkers        int
+
+	staleUploadsExpiry          time.Duration
+	staleUploadsCleanupInterval time.Duration
+	deleteCleanupInterval       time.Duration
 }
 
 func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
@@ -107,6 +111,10 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) {
 		globalTransitionState.UpdateWorkers(cfg.TransitionWorkers)
 	}
 	t.transitionWorkers = cfg.TransitionWorkers
+
+	t.staleUploadsExpiry = cfg.StaleUploadsExpiry
+	t.staleUploadsCleanupInterval = cfg.StaleUploadsCleanupInterval
+	t.deleteCleanupInterval = cfg.DeleteCleanupInterval
 }
 
 func (t *apiConfig) getListQuorum() int {
@@ -125,6 +133,39 @@ func (t *apiConfig) getCorsAllowOrigins() []string {
 	return corsAllowOrigins
 }
 
+func (t *apiConfig) getStaleUploadsCleanupInterval() time.Duration {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	if t.staleUploadsCleanupInterval == 0 {
+		return 6 * time.Hour // default 6 hours
+	}
+
+	return t.staleUploadsCleanupInterval
+}
+
+func (t *apiConfig) getStaleUploadsExpiry() time.Duration {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	if t.staleUploadsExpiry == 0 {
+		return 24 * time.Hour // default 24 hours
+	}
+
+	return t.staleUploadsExpiry
+}
+
+func (t *apiConfig) getDeleteCleanupInterval() time.Duration {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	if t.deleteCleanupInterval == 0 {
+		return 5 * time.Minute // every 5 minutes
+	}
+
+	return t.deleteCleanupInterval
+}
+
 func (t *apiConfig) getClusterDeadline() time.Duration {
 	t.mu.RLock()
 	defer t.mu.RUnlock()
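All three getters added above share one shape: take a read lock, and treat the zero value as "unset" so callers always receive a sensible default even before init has run. A standalone sketch of that pattern with a runnable demonstration — the settings type and the values here are illustrative, not MinIO's actual apiConfig:

package main

import (
	"fmt"
	"sync"
	"time"
)

// settings mirrors the getter shape above: reads take an RLock and a zero
// value falls back to a built-in default.
type settings struct {
	mu                 sync.RWMutex
	staleUploadsExpiry time.Duration
}

func (s *settings) setStaleUploadsExpiry(d time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.staleUploadsExpiry = d
}

func (s *settings) getStaleUploadsExpiry() time.Duration {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.staleUploadsExpiry == 0 {
		return 24 * time.Hour // default, matching the diff
	}
	return s.staleUploadsExpiry
}

func main() {
	var s settings
	fmt.Println(s.getStaleUploadsExpiry()) // 24h0m0s (built-in default)

	s.setStaleUploadsExpiry(time.Hour) // e.g. applied from an updated api.Config
	fmt.Println(s.getStaleUploadsExpiry()) // 1h0m0s
}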
@@ -223,7 +223,10 @@ func initFSObjects(disk string, t *testing.T) (obj ObjectLayer) {
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	newTestConfig(globalMinioDefaultRegion, obj)
+
+	newAllSubsystems()
 	return obj
 }