From fabf60bc4c3df707385890e18aa6c423c4da34bc Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 4 Oct 2021 10:52:28 -0700 Subject: [PATCH] fix: allow configuring cleanup of stale multipart uploads (#13354) allow dynamically changing cleanup of stale multipart uploads, their expiry and how frequently its checked. Improves #13270 --- cmd/erasure-sets.go | 32 +++++------- cmd/fs-v1-multipart.go | 8 +-- cmd/fs-v1-multipart_test.go | 16 +++++- cmd/fs-v1.go | 2 +- cmd/globals.go | 8 ++- cmd/handler-api.go | 41 +++++++++++++++ cmd/test-utils_test.go | 3 ++ internal/config/api/api.go | 100 ++++++++++++++++++++++++++---------- internal/config/api/help.go | 18 +++++++ 9 files changed, 171 insertions(+), 57 deletions(-) diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 711b498be..734307076 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -41,14 +41,11 @@ import ( "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/sync/errgroup" "github.com/minio/pkg/console" - "github.com/minio/pkg/env" ) // setsDsyncLockers is encapsulated type for Close() type setsDsyncLockers [][]dsync.NetLocker -const envMinioDeleteCleanupInterval = "MINIO_DELETE_CLEANUP_INTERVAL" - // erasureSets implements ObjectLayer combining a static list of erasure coded // object sets. NOTE: There is no dynamic scaling allowed or intended in // current design. @@ -443,19 +440,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto } } - // cleanup ".trash/" folder every 5m minutes with sufficient sleep cycles, between each - // deletes a dynamic sleeper is used with a factor of 10 ratio with max delay between - // deletes to be 2 seconds. - deletedObjectsCleanupInterval, err := time.ParseDuration(env.Get(envMinioDeleteCleanupInterval, "5m")) - if err != nil { - return nil, err - } - // start cleanup stale uploads go-routine. - go s.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry) + go s.cleanupStaleUploads(ctx) // start cleanup of deleted objects. - go s.cleanupDeletedObjects(ctx, deletedObjectsCleanupInterval) + go s.cleanupDeletedObjects(ctx) // Start the disk monitoring and connect routine. go s.monitorAndConnectEndpoints(ctx, defaultMonitorConnectEndpointInterval) @@ -463,8 +452,11 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto return s, nil } -func (s *erasureSets) cleanupDeletedObjects(ctx context.Context, cleanupInterval time.Duration) { - timer := time.NewTimer(cleanupInterval) +// cleanup ".trash/" folder every 5m minutes with sufficient sleep cycles, between each +// deletes a dynamic sleeper is used with a factor of 10 ratio with max delay between +// deletes to be 2 seconds. +func (s *erasureSets) cleanupDeletedObjects(ctx context.Context) { + timer := time.NewTimer(globalAPIConfig.getDeleteCleanupInterval()) defer timer.Stop() for { @@ -473,7 +465,7 @@ func (s *erasureSets) cleanupDeletedObjects(ctx context.Context, cleanupInterval return case <-timer.C: // Reset for the next interval - timer.Reset(cleanupInterval) + timer.Reset(globalAPIConfig.getDeleteCleanupInterval()) for _, set := range s.sets { set.cleanupDeletedObjects(ctx) @@ -482,8 +474,8 @@ func (s *erasureSets) cleanupDeletedObjects(ctx context.Context, cleanupInterval } } -func (s *erasureSets) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) { - timer := time.NewTimer(cleanupInterval) +func (s *erasureSets) cleanupStaleUploads(ctx context.Context) { + timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval()) defer timer.Stop() for { @@ -492,10 +484,10 @@ func (s *erasureSets) cleanupStaleUploads(ctx context.Context, cleanupInterval, return case <-timer.C: // Reset for the next interval - timer.Reset(cleanupInterval) + timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval()) for _, set := range s.sets { - set.cleanupStaleUploads(ctx, expiry) + set.cleanupStaleUploads(ctx, globalAPIConfig.getStaleUploadsExpiry()) } } } diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index b3417014a..e0afa6464 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -847,8 +847,8 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u // Removes multipart uploads if any older than `expiry` duration // on all buckets for every `cleanupInterval`, this function is // blocking and should be run in a go-routine. -func (fs *FSObjects) cleanupStaleUploads(ctx context.Context, cleanupInterval, expiry time.Duration) { - timer := time.NewTimer(cleanupInterval) +func (fs *FSObjects) cleanupStaleUploads(ctx context.Context) { + timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval()) defer timer.Stop() for { @@ -857,7 +857,9 @@ func (fs *FSObjects) cleanupStaleUploads(ctx context.Context, cleanupInterval, e return case <-timer.C: // Reset for the next interval - timer.Reset(cleanupInterval) + timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval()) + + expiry := globalAPIConfig.getStaleUploadsExpiry() now := time.Now() entries, err := readDir(pathJoin(fs.fsPath, minioMetaMultipartBucket)) diff --git a/cmd/fs-v1-multipart_test.go b/cmd/fs-v1-multipart_test.go index 07c4201ae..72063efc9 100644 --- a/cmd/fs-v1-multipart_test.go +++ b/cmd/fs-v1-multipart_test.go @@ -26,6 +26,8 @@ import ( "sync" "testing" "time" + + "github.com/minio/minio/internal/config/api" ) // Tests cleanup multipart uploads for filesystem backend. @@ -49,11 +51,23 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) { t.Fatal("Unexpected err: ", err) } + globalAPIConfig.init(api.Config{ + ListQuorum: "optimal", + StaleUploadsExpiry: time.Millisecond, + StaleUploadsCleanupInterval: time.Millisecond, + }, obj.SetDriveCounts()) + + defer func() { + globalAPIConfig = apiConfig{ + listQuorum: 3, + } + }() + var cleanupWg sync.WaitGroup cleanupWg.Add(1) go func() { defer cleanupWg.Done() - fs.cleanupStaleUploads(ctx, time.Millisecond, 0) + fs.cleanupStaleUploads(ctx) }() // Wait for 100ms such that - we have given enough time for diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 5a8672e75..00b7e1dbe 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -177,7 +177,7 @@ func NewFSObjectLayer(fsPath string) (ObjectLayer, error) { // or cause changes on backend format. fs.fsFormatRlk = rlk - go fs.cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry) + go fs.cleanupStaleUploads(ctx) go intDataUpdateTracker.start(ctx, fsPath) // Return successfully initialized object layer. diff --git a/cmd/globals.go b/cmd/globals.go index 3daa04d49..49198e3cc 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -91,14 +91,12 @@ const ( // date and server date during signature verification. globalMaxSkewTime = 15 * time.Minute // 15 minutes skew allowed. - // GlobalStaleUploadsExpiry - Expiry duration after which the uploads in multipart, tmp directory are deemed stale. + // GlobalStaleUploadsExpiry - Expiry duration after which the uploads in multipart, + // tmp directory are deemed stale. GlobalStaleUploadsExpiry = time.Hour * 24 // 24 hrs. // GlobalStaleUploadsCleanupInterval - Cleanup interval when the stale uploads cleanup is initiated. - GlobalStaleUploadsCleanupInterval = time.Hour * 12 // 12 hrs. - - // GlobalServiceExecutionInterval - Executes the Lifecycle events. - GlobalServiceExecutionInterval = time.Hour * 24 // 24 hrs. + GlobalStaleUploadsCleanupInterval = time.Hour * 6 // 6 hrs. // Refresh interval to update in-memory iam config cache. globalRefreshIAMInterval = 5 * time.Minute diff --git a/cmd/handler-api.go b/cmd/handler-api.go index 07a460407..5904a6b8a 100644 --- a/cmd/handler-api.go +++ b/cmd/handler-api.go @@ -42,6 +42,10 @@ type apiConfig struct { replicationWorkers int replicationFailedWorkers int transitionWorkers int + + staleUploadsExpiry time.Duration + staleUploadsCleanupInterval time.Duration + deleteCleanupInterval time.Duration } func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { @@ -107,6 +111,10 @@ func (t *apiConfig) init(cfg api.Config, setDriveCounts []int) { globalTransitionState.UpdateWorkers(cfg.TransitionWorkers) } t.transitionWorkers = cfg.TransitionWorkers + + t.staleUploadsExpiry = cfg.StaleUploadsExpiry + t.staleUploadsCleanupInterval = cfg.StaleUploadsCleanupInterval + t.deleteCleanupInterval = cfg.DeleteCleanupInterval } func (t *apiConfig) getListQuorum() int { @@ -125,6 +133,39 @@ func (t *apiConfig) getCorsAllowOrigins() []string { return corsAllowOrigins } +func (t *apiConfig) getStaleUploadsCleanupInterval() time.Duration { + t.mu.RLock() + defer t.mu.RUnlock() + + if t.staleUploadsCleanupInterval == 0 { + return 6 * time.Hour // default 6 hours + } + + return t.staleUploadsCleanupInterval +} + +func (t *apiConfig) getStaleUploadsExpiry() time.Duration { + t.mu.RLock() + defer t.mu.RUnlock() + + if t.staleUploadsExpiry == 0 { + return 24 * time.Hour // default 24 hours + } + + return t.staleUploadsExpiry +} + +func (t *apiConfig) getDeleteCleanupInterval() time.Duration { + t.mu.RLock() + defer t.mu.RUnlock() + + if t.deleteCleanupInterval == 0 { + return 5 * time.Minute // every 5 minutes + } + + return t.deleteCleanupInterval +} + func (t *apiConfig) getClusterDeadline() time.Duration { t.mu.RLock() defer t.mu.RUnlock() diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 5920159b3..cacab8bb3 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -223,7 +223,10 @@ func initFSObjects(disk string, t *testing.T) (obj ObjectLayer) { if err != nil { t.Fatal(err) } + newTestConfig(globalMinioDefaultRegion, obj) + + newAllSubsystems() return obj } diff --git a/internal/config/api/api.go b/internal/config/api/api.go index 0715a5465..5494ac42f 100644 --- a/internal/config/api/api.go +++ b/internal/config/api/api.go @@ -31,15 +31,18 @@ import ( // API sub-system constants const ( - apiRequestsMax = "requests_max" - apiRequestsDeadline = "requests_deadline" - apiClusterDeadline = "cluster_deadline" - apiCorsAllowOrigin = "cors_allow_origin" - apiRemoteTransportDeadline = "remote_transport_deadline" - apiListQuorum = "list_quorum" - apiReplicationWorkers = "replication_workers" - apiReplicationFailedWorkers = "replication_failed_workers" - apiTransitionWorkers = "transition_workers" + apiRequestsMax = "requests_max" + apiRequestsDeadline = "requests_deadline" + apiClusterDeadline = "cluster_deadline" + apiCorsAllowOrigin = "cors_allow_origin" + apiRemoteTransportDeadline = "remote_transport_deadline" + apiListQuorum = "list_quorum" + apiReplicationWorkers = "replication_workers" + apiReplicationFailedWorkers = "replication_failed_workers" + apiTransitionWorkers = "transition_workers" + apiStaleUploadsCleanupInterval = "stale_uploads_cleanup_interval" + apiStaleUploadsExpiry = "stale_uploads_expiry" + apiDeleteCleanupInterval = "delete_cleanup_interval" EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX" EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE" @@ -51,6 +54,11 @@ const ( EnvAPIReplicationWorkers = "MINIO_API_REPLICATION_WORKERS" EnvAPIReplicationFailedWorkers = "MINIO_API_REPLICATION_FAILED_WORKERS" EnvAPITransitionWorkers = "MINIO_API_TRANSITION_WORKERS" + + EnvAPIStaleUploadsCleanupInterval = "MINIO_API_STALE_UPLOADS_CLEANUP_INTERVAL" + EnvAPIStaleUploadsExpiry = "MINIO_API_STALE_UPLOADS_EXPIRY" + EnvAPIDeleteCleanupInterval = "MINIO_API_DELETE_CLEANUP_INTERVAL" + EnvDeleteCleanupInterval = "MINIO_DELETE_CLEANUP_INTERVAL" ) // Deprecated key and ENVs @@ -98,20 +106,35 @@ var ( Key: apiTransitionWorkers, Value: "100", }, + config.KV{ + Key: apiStaleUploadsCleanupInterval, + Value: "6h", + }, + config.KV{ + Key: apiStaleUploadsExpiry, + Value: "24h", + }, + config.KV{ + Key: apiDeleteCleanupInterval, + Value: "5m", + }, } ) // Config storage class configuration type Config struct { - RequestsMax int `json:"requests_max"` - RequestsDeadline time.Duration `json:"requests_deadline"` - ClusterDeadline time.Duration `json:"cluster_deadline"` - CorsAllowOrigin []string `json:"cors_allow_origin"` - RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` - ListQuorum string `json:"list_quorum"` - ReplicationWorkers int `json:"replication_workers"` - ReplicationFailedWorkers int `json:"replication_failed_workers"` - TransitionWorkers int `json:"transition_workers"` + RequestsMax int `json:"requests_max"` + RequestsDeadline time.Duration `json:"requests_deadline"` + ClusterDeadline time.Duration `json:"cluster_deadline"` + CorsAllowOrigin []string `json:"cors_allow_origin"` + RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"` + ListQuorum string `json:"list_quorum"` + ReplicationWorkers int `json:"replication_workers"` + ReplicationFailedWorkers int `json:"replication_failed_workers"` + TransitionWorkers int `json:"transition_workers"` + StaleUploadsCleanupInterval time.Duration `json:"stale_uploads_cleanup_interval"` + StaleUploadsExpiry time.Duration `json:"stale_uploads_expiry"` + DeleteCleanupInterval time.Duration `json:"delete_cleanup_interval"` } // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON. @@ -211,15 +234,38 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) { return cfg, config.ErrInvalidTransitionWorkersValue(nil) } + v := env.Get(EnvAPIDeleteCleanupInterval, kvs.Get(apiDeleteCleanupInterval)) + if v == "" { + v = env.Get(EnvDeleteCleanupInterval, kvs.Get(apiDeleteCleanupInterval)) + } + + deleteCleanupInterval, err := time.ParseDuration(v) + if err != nil { + return cfg, err + } + + staleUploadsCleanupInterval, err := time.ParseDuration(env.Get(EnvAPIStaleUploadsCleanupInterval, kvs.Get(apiStaleUploadsCleanupInterval))) + if err != nil { + return cfg, err + } + + staleUploadsExpiry, err := time.ParseDuration(env.Get(EnvAPIStaleUploadsExpiry, kvs.Get(apiStaleUploadsExpiry))) + if err != nil { + return cfg, err + } + return Config{ - RequestsMax: requestsMax, - RequestsDeadline: requestsDeadline, - ClusterDeadline: clusterDeadline, - CorsAllowOrigin: corsAllowOrigin, - RemoteTransportDeadline: remoteTransportDeadline, - ListQuorum: listQuorum, - ReplicationWorkers: replicationWorkers, - ReplicationFailedWorkers: replicationFailedWorkers, - TransitionWorkers: transitionWorkers, + RequestsMax: requestsMax, + RequestsDeadline: requestsDeadline, + ClusterDeadline: clusterDeadline, + CorsAllowOrigin: corsAllowOrigin, + RemoteTransportDeadline: remoteTransportDeadline, + ListQuorum: listQuorum, + ReplicationWorkers: replicationWorkers, + ReplicationFailedWorkers: replicationFailedWorkers, + TransitionWorkers: transitionWorkers, + StaleUploadsCleanupInterval: staleUploadsCleanupInterval, + StaleUploadsExpiry: staleUploadsExpiry, + DeleteCleanupInterval: deleteCleanupInterval, }, nil } diff --git a/internal/config/api/help.go b/internal/config/api/help.go index 899b05e55..84937a63d 100644 --- a/internal/config/api/help.go +++ b/internal/config/api/help.go @@ -64,5 +64,23 @@ var ( Optional: true, Type: "number", }, + config.HelpKV{ + Key: apiStaleUploadsExpiry, + Description: `set to expire stale multipart uploads older than this value, defaults to 24 hours`, + Optional: true, + Type: "duration", + }, + config.HelpKV{ + Key: apiStaleUploadsCleanupInterval, + Description: `set to change intervals when stale multipart uploads are expired, defaults to every 6 hours`, + Optional: true, + Type: "duration", + }, + config.HelpKV{ + Key: apiDeleteCleanupInterval, + Description: `set to change intervals when deleted objects are permanently deleted from ".trash" folder, defaults to every 5 minutes`, + Optional: true, + Type: "duration", + }, } )