From 29e7058ebfd54d973a1c43dd29e296b677cb76b8 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 9 Mar 2021 16:38:18 -0800
Subject: [PATCH] background delete operations, deleting serially every 10mins

Additionally, introduce the MINIO_DELETE_CLEANUP_INTERVAL environment
variable to control this interval; choose a smaller value if higher
speed is necessary. Supports time.Duration format:

  export MINIO_DELETE_CLEANUP_INTERVAL=1m

would let MinIO run the delete cleanup every minute, once the previous
cycle completes.
---
 cmd/data-crawler.go      | 125 +++++++++++++++++++++++++++++++++++++++
 cmd/erasure-multipart.go |   3 +
 cmd/erasure-sets.go      |  41 +++++++++++--
 cmd/erasure.go           |  28 +++++++++
 cmd/fs-v1-helpers.go     |  50 ++++++++++++++++
 cmd/object-api-utils.go  |   2 +
 cmd/xl-storage-errors.go |   8 +++
 cmd/xl-storage.go        |  29 +++++----
 8 files changed, 268 insertions(+), 18 deletions(-)

diff --git a/cmd/data-crawler.go b/cmd/data-crawler.go
index 1b980d3f6..b3a93aadf 100644
--- a/cmd/data-crawler.go
+++ b/cmd/data-crawler.go
@@ -21,11 +21,13 @@ import (
 	"context"
 	"encoding/binary"
 	"errors"
+	"math"
 	"math/rand"
 	"os"
 	"path"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/minio/minio/cmd/config"
@@ -57,6 +59,129 @@ var (
 	dataCrawlerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
 )
 
+type dynamicSleeper struct {
+	mu sync.RWMutex
+
+	// Sleep factor
+	factor float64
+
+	// Maximum sleep cap,
+	// set to <= 0 to disable.
+	maxSleep time.Duration
+
+	// Don't sleep at all if the time taken is below this value.
+	// This avoids too many small, costly sleeps.
+	minSleep time.Duration
+
+	// cycle is closed to wake all waiters when settings are updated.
+	cycle chan struct{}
+}
+
+// newDynamicSleeper returns a dynamicSleeper that sleeps for the given
+// factor of elapsed time, capped at maxWait.
+func newDynamicSleeper(factor float64, maxWait time.Duration) *dynamicSleeper {
+	return &dynamicSleeper{
+		factor:   factor,
+		cycle:    make(chan struct{}),
+		maxSleep: maxWait,
+		minSleep: 100 * time.Microsecond,
+	}
+}
+
+// Timer returns a timer that has started.
+// When the returned function is called it will wait proportionally to
+// the time elapsed since Timer was called.
+func (d *dynamicSleeper) Timer(ctx context.Context) func() {
+	t := time.Now()
+	return func() {
+		doneAt := time.Now()
+		for {
+			// Grab current values
+			d.mu.RLock()
+			minWait, maxWait := d.minSleep, d.maxSleep
+			factor := d.factor
+			cycle := d.cycle
+			d.mu.RUnlock()
+			elapsed := doneAt.Sub(t)
+			// Don't sleep for really small amounts of time.
+			wantSleep := time.Duration(float64(elapsed) * factor)
+			if wantSleep <= minWait {
+				return
+			}
+			if maxWait > 0 && wantSleep > maxWait {
+				wantSleep = maxWait
+			}
+			timer := time.NewTimer(wantSleep)
+			select {
+			case <-ctx.Done():
+				if !timer.Stop() {
+					<-timer.C
+				}
+				return
+			case <-timer.C:
+				return
+			case <-cycle:
+				if !timer.Stop() {
+					// We expired.
+					<-timer.C
+					return
+				}
+			}
+		}
+	}
+}
+
+// Sleep sleeps the specified time multiplied by the sleep factor.
+// If the factor is updated the sleep will be done again with the new factor.
+func (d *dynamicSleeper) Sleep(ctx context.Context, base time.Duration) {
+	for {
+		// Grab current values
+		d.mu.RLock()
+		minWait, maxWait := d.minSleep, d.maxSleep
+		factor := d.factor
+		cycle := d.cycle
+		d.mu.RUnlock()
+		// Don't sleep for really small amounts of time.
+		wantSleep := time.Duration(float64(base) * factor)
+		if wantSleep <= minWait {
+			return
+		}
+		if maxWait > 0 && wantSleep > maxWait {
+			wantSleep = maxWait
+		}
+		timer := time.NewTimer(wantSleep)
+		select {
+		case <-ctx.Done():
+			if !timer.Stop() {
+				<-timer.C
+			}
+			return
+		case <-timer.C:
+			return
+		case <-cycle:
+			if !timer.Stop() {
+				// We expired.
+				<-timer.C
+				return
+			}
+		}
+	}
+}
+
+// Update the current settings and cycle all waiting.
+// Parameters are the same as in the constructor.
+func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if math.Abs(d.factor-factor) < 1e-10 && d.maxSleep == maxWait {
+		return nil
+	}
+	// Update values and cycle waiting.
+	close(d.cycle)
+	d.factor = factor
+	d.maxSleep = maxWait
+	d.cycle = make(chan struct{})
+	return nil
+}
+
 // initDataCrawler will start the crawler unless disabled.
 func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
 	if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go
index 74b23d4b8..abedbe4fa 100644
--- a/cmd/erasure-multipart.go
+++ b/cmd/erasure-multipart.go
@@ -142,6 +142,9 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
 		return
 	}
 	for _, tmpDir := range tmpDirs {
+		if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines
+			continue
+		}
 		fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
 		if err != nil {
 			continue
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index fa9d66ca7..7dce555eb 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -412,21 +412,31 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 
 		// Initialize erasure objects for a given set.
 		s.sets[i] = &erasureObjects{
-			getDisks:     s.GetDisks(i),
-			getLockers:   s.GetLockers(i),
-			getEndpoints: s.GetEndpoints(i),
-			nsMutex:      mutex,
-			bp:           bp,
-			mrfOpCh:      make(chan partialOperation, 10000),
+			getDisks:              s.GetDisks(i),
+			getLockers:            s.GetLockers(i),
+			getEndpoints:          s.GetEndpoints(i),
+			deletedCleanupSleeper: newDynamicSleeper(10, 10*time.Second),
+			nsMutex:               mutex,
+			bp:                    bp,
+			mrfOpCh:               make(chan partialOperation, 10000),
 		}
 
 		go s.sets[i].cleanupStaleUploads(ctx, GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
 	}
 
+	// Clean up the ".trash/" folder every 10 minutes with sufficient sleep cycles.
+	deletedObjectsCleanupInterval, err := time.ParseDuration(env.Get("MINIO_DELETE_CLEANUP_INTERVAL", "10m"))
+	if err != nil {
+		return nil, err
+	}
+
 	mctx, mctxCancel := context.WithCancel(ctx)
 	s.monitorContextCancel = mctxCancel
 
+	// Start the cleanup of deleted objects.
+	go s.cleanupDeletedObjects(ctx, deletedObjectsCleanupInterval)
+
 	// Start the disk monitoring and connect routine.
 	go s.monitorAndConnectEndpoints(mctx, defaultMonitorConnectEndpointInterval)
 	go s.maintainMRFList()
@@ -435,6 +445,25 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
 	return s, nil
 }
 
+func (s *erasureSets) cleanupDeletedObjects(ctx context.Context, cleanupInterval time.Duration) {
+	timer := time.NewTimer(cleanupInterval)
+	defer timer.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-timer.C:
+			// Reset for the next interval
+			timer.Reset(cleanupInterval)
+
+			for _, set := range s.sets {
+				set.cleanupDeletedObjects(ctx)
+			}
+		}
+	}
+}
+
 // NewNSLock - initialize a new namespace RWLocker instance.
 func (s *erasureSets) NewNSLock(bucket string, objects ...string) RWLocker {
 	if len(objects) == 1 {
diff --git a/cmd/erasure.go b/cmd/erasure.go
index 223bf9d4f..ce8fe492b 100644
--- a/cmd/erasure.go
+++ b/cmd/erasure.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"os"
 	"sort"
 	"sync"
 	"time"
@@ -65,6 +66,8 @@ type erasureObjects struct {
 	// Byte pools used for temporary i/o buffers.
 	bp *bpool.BytePoolCap
 
+	deletedCleanupSleeper *dynamicSleeper
+
 	mrfOpCh chan partialOperation
 }
 
@@ -238,6 +241,31 @@ func (er erasureObjects) StorageInfo(ctx context.Context, local bool) (StorageIn
 	return getStorageInfo(disks, endpoints)
 }
 
+// Clean up previously deleted objects from .minio.sys/tmp/.trash/
+func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
+	// Run multiple cleanups local to this server.
+	var wg sync.WaitGroup
+	for _, disk := range er.getLoadBalancedLocalDisks() {
+		if disk != nil {
+			wg.Add(1)
+			go func(disk StorageAPI) {
+				defer wg.Done()
+				diskPath := disk.Endpoint().Path
+				readDirFn(pathJoin(diskPath, minioMetaTmpDeletedBucket), func(ddir string, typ os.FileMode) error {
+					wait := er.deletedCleanupSleeper.Timer(ctx)
+					if intDataUpdateTracker != nil && intDataUpdateTracker.debug {
+						logger.Info("cleanupDeletedObjects: %s/%s", minioMetaTmpDeletedBucket, ddir)
+					}
+					removeAll(pathJoin(diskPath, minioMetaTmpDeletedBucket, ddir))
+					wait()
+					return nil
+				})
+			}(disk)
+		}
+	}
+	wg.Wait()
+}
+
 // CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed.
 // Updates are sent on a regular basis and the caller *must* consume them.
 func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error {
diff --git a/cmd/fs-v1-helpers.go b/cmd/fs-v1-helpers.go
index 2fb42c694..2d77f64cd 100644
--- a/cmd/fs-v1-helpers.go
+++ b/cmd/fs-v1-helpers.go
@@ -22,6 +22,7 @@ import (
 	"os"
 	pathutil "path"
 	"runtime"
+	"strings"
 
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/lock"
@@ -403,6 +404,55 @@ func fsRenameFile(ctx context.Context, sourcePath, destPath string) error {
 	return nil
 }
 
+func deleteFile(basePath, deletePath string, recursive bool) error {
+	if basePath == "" || deletePath == "" {
+		return nil
+	}
+	isObjectDir := HasSuffix(deletePath, SlashSeparator)
+	basePath = pathutil.Clean(basePath)
+	deletePath = pathutil.Clean(deletePath)
+	if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath {
+		return nil
+	}
+
+	var err error
+	if recursive {
+		err = os.RemoveAll(deletePath)
+	} else {
+		err = os.Remove(deletePath)
+	}
+	if err != nil {
+		switch {
+		case isSysErrNotEmpty(err):
+			// if the object is a directory but is not empty,
+			// return errFileNotFound to indicate an empty prefix.
+			if isObjectDir {
+				return errFileNotFound
+			}
+			// Ignore errors if the directory is not empty. The server relies on
+			// this functionality, and sometimes uses recursion that should not
+			// error on parent directories.
+			return nil
+		case osIsNotExist(err):
+			return errFileNotFound
+		case osIsPermission(err):
+			return errFileAccessDenied
+		case isSysErrIO(err):
+			return errFaultyDisk
+		default:
+			return err
+		}
+	}
+
+	deletePath = pathutil.Dir(deletePath)
+
+	// Delete the parent directory, but not recursively. Errors from
+	// parent directories shouldn't trickle down.
+	deleteFile(basePath, deletePath, false)
+
+	return nil
+}
+
 // fsDeleteFile is a wrapper for deleteFile(), after checking the path length.
 func fsDeleteFile(ctx context.Context, basePath, deletePath string) error {
 	if err := checkPathLength(basePath); err != nil {
diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go
index 86e689d6f..e9f34eb78 100644
--- a/cmd/object-api-utils.go
+++ b/cmd/object-api-utils.go
@@ -58,6 +58,8 @@ const (
 	minioMetaMultipartBucket = minioMetaBucket + SlashSeparator + mpartMetaPrefix
 	// MinIO Tmp meta prefix.
 	minioMetaTmpBucket = minioMetaBucket + "/tmp"
+	// MinIO tmp meta prefix for deleted objects.
+	minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash"
 	// DNS separator (period), used for bucket name validation.
 	dnsDelimiter = "."
 	// On compressed files bigger than this;
diff --git a/cmd/xl-storage-errors.go b/cmd/xl-storage-errors.go
index 9c2a612ad..e5fc7bb59 100644
--- a/cmd/xl-storage-errors.go
+++ b/cmd/xl-storage-errors.go
@@ -68,6 +68,14 @@ func isSysErrTooManySymlinks(err error) bool {
 	return errors.Is(err, syscall.ELOOP)
 }
 
+func osIsNotExist(err error) bool {
+	return errors.Is(err, os.ErrNotExist)
+}
+
+func osIsPermission(err error) bool {
+	return errors.Is(err, os.ErrPermission)
+}
+
 // Check if the given error corresponds to ENOTEMPTY for unix,
 // EEXIST for solaris variants,
 // and ERROR_DIR_NOT_EMPTY for windows (directory not empty).
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 8df3865da..1c65dc0d9 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -29,6 +29,7 @@ import (
 	"net/url"
 	"os"
 	"path"
+	pathutil "path"
 	slashpath "path"
 	"path/filepath"
 	"runtime"
@@ -1171,7 +1172,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
 	if !isXL2V1Format(buf) {
 		// Delete the meta file, if there are no more versions the
 		// top level parent is automatically removed.
-		return deleteFile(volumeDir, pathJoin(volumeDir, path), true)
+		return s.deleteFile(volumeDir, pathJoin(volumeDir, path), true)
 	}
 
 	var xlMeta xlMetaV2
@@ -1196,7 +1197,8 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
 			return err
 		}
 
-		if err = removeAll(filePath); err != nil {
+		tmpuuid := mustGetUUID()
+		if err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid)); err != nil {
 			return err
 		}
 	}
@@ -1212,7 +1214,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
 		return err
 	}
 
-	return deleteFile(volumeDir, filePath, false)
+	return s.deleteFile(volumeDir, filePath, false)
 }
 
 // WriteMetadata - writes FileInfo metadata for path at `xl.meta`
@@ -1930,7 +1932,7 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
 // move up the tree, deleting empty parent directories until it finds one
 // with files in it. Returns nil for a non-empty directory even when
 // recursive is set to false.
-func deleteFile(basePath, deletePath string, recursive bool) error {
+func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) error {
 	if basePath == "" || deletePath == "" {
 		return nil
 	}
@@ -1943,7 +1945,8 @@ func deleteFile(basePath, deletePath string, recursive bool) error {
 
 	var err error
 	if recursive {
-		err = os.RemoveAll(deletePath)
+		tmpuuid := mustGetUUID()
+		err = renameAll(deletePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
 	} else {
 		err = os.Remove(deletePath)
 	}
@@ -1974,7 +1977,7 @@ func deleteFile(basePath, deletePath string, recursive bool) error {
 
 	// Delete parent directory obviously not recursively. Errors for
 	// parent directories shouldn't trickle down.
-	deleteFile(basePath, deletePath, false)
+	s.deleteFile(basePath, deletePath, false)
 
 	return nil
 }
@@ -2012,7 +2015,7 @@ func (s *xlStorage) DeleteFile(ctx context.Context, volume string, path string)
 	}
 
 	// Delete file and delete parent directory as well if its empty.
-	return deleteFile(volumeDir, filePath, false)
+	return s.deleteFile(volumeDir, filePath, false)
 }
 
 func (s *xlStorage) DeleteFileBulk(volume string, paths []string) (errs []error, err error) {
@@ -2049,7 +2052,7 @@ func (s *xlStorage) DeleteFileBulk(volume string, paths []string) (errs []error,
 			continue
 		}
 		// Delete file and delete parent directory as well if its empty.
-		errs[idx] = deleteFile(volumeDir, filePath, false)
+		errs[idx] = s.deleteFile(volumeDir, filePath, false)
 	}
 	return
 }
@@ -2243,8 +2246,10 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
 
 	// Commit data
 	if srcDataPath != "" {
-		removeAll(oldDstDataPath)
-		removeAll(dstDataPath)
+		tmpuuid := mustGetUUID()
+		renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
+		tmpuuid = mustGetUUID()
+		renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
 		if err = renameAll(srcDataPath, dstDataPath); err != nil {
 			return osErrToFileErr(err)
 		}
@@ -2257,12 +2262,12 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
 
 	// Remove parent dir of the source file if empty
 	if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) {
-		deleteFile(srcVolumeDir, parentDir, false)
+		s.deleteFile(srcVolumeDir, parentDir, false)
 	}
 
 	if srcDataPath != "" {
 		if parentDir := slashpath.Dir(srcDataPath); isDirEmpty(parentDir) {
-			deleteFile(srcVolumeDir, parentDir, false)
+			s.deleteFile(srcVolumeDir, parentDir, false)
 		}
 	}
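
Note on the throttling used above: the following standalone sketch (not part of the patch; names are illustrative) shows the proportional back-off that dynamicSleeper.Timer implements, minus the live Update()/cycle machinery. After each unit of work the caller sleeps factor times the measured work duration, capped at maxSleep, which keeps the background pass at roughly a 1/(1+factor) duty cycle.

package main

import (
	"fmt"
	"time"
)

// workTimer returns a function that, when called, sleeps for
// factor * (time elapsed since workTimer was called), capped at maxSleep.
func workTimer(factor float64, maxSleep time.Duration) func() {
	start := time.Now()
	return func() {
		want := time.Duration(float64(time.Since(start)) * factor)
		if maxSleep > 0 && want > maxSleep {
			want = maxSleep
		}
		if want > 100*time.Microsecond { // minSleep guard, as in the patch
			time.Sleep(want)
		}
	}
}

func main() {
	// factor=10, maxSleep=10s matches newDynamicSleeper(10, 10*time.Second)
	// used for deletedCleanupSleeper in cmd/erasure-sets.go.
	for i := 0; i < 3; i++ {
		wait := workTimer(10, 10*time.Second)
		time.Sleep(20 * time.Millisecond) // stand-in for one removeAll() call
		wait()                            // sleeps ~200ms, i.e. ~10x the work
		fmt.Println("unit", i, "done")
	}
}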
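A runnable sketch of the MINIO_DELETE_CLEANUP_INTERVAL handling and of the timer loop in erasureSets.cleanupDeletedObjects (the printed cycle body is a placeholder): the timer is re-armed when it fires, but the next cycle cannot start until the current one returns, so cycles never overlap — a slow cycle simply delays the next one, which is what the commit message means by "once the previous cycle completes".

package main

import (
	"context"
	"fmt"
	"os"
	"time"
)

// cleanupLoop mirrors the select/Reset structure in cmd/erasure-sets.go.
func cleanupLoop(ctx context.Context, interval time.Duration, cycle func()) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			timer.Reset(interval) // reset for the next interval
			cycle()               // select is not re-entered until this returns
		}
	}
}

func main() {
	// Same parsing as cmd/erasure-sets.go; any time.Duration string is
	// accepted, e.g. export MINIO_DELETE_CLEANUP_INTERVAL=1m
	v := os.Getenv("MINIO_DELETE_CLEANUP_INTERVAL")
	if v == "" {
		v = "10m" // the patch's default
	}
	interval, err := time.ParseDuration(v)
	if err != nil {
		fmt.Fprintln(os.Stderr, "invalid MINIO_DELETE_CLEANUP_INTERVAL:", err)
		os.Exit(1)
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cleanupLoop(ctx, interval, func() {
		fmt.Println("cleanup cycle at", time.Now().Format(time.Stamp))
	})
}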
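Finally, a self-contained sketch of the rename-into-.trash scheme the xl-storage.go changes adopt: an inline removeAll() of a large object can block the request path for a long time, whereas a rename into <diskPath>/.minio.sys/tmp/.trash/<uuid> is a single cheap metadata operation, leaving the real removal to the background cleanupDeletedObjects pass. randomName stands in for MinIO's mustGetUUID helper, and the paths are made up for the demo.

package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
	"path/filepath"
)

// randomName stands in for MinIO's mustGetUUID().
func randomName() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// moveToTrash makes deletion O(1) for the caller: the path is renamed
// under trashDir and the real (possibly slow) removal happens later.
func moveToTrash(trashDir, deletePath string) error {
	if err := os.MkdirAll(trashDir, 0o777); err != nil {
		return err
	}
	return os.Rename(deletePath, filepath.Join(trashDir, randomName()))
}

func main() {
	disk, _ := os.MkdirTemp("", "disk")
	defer os.RemoveAll(disk)
	trash := filepath.Join(disk, ".minio.sys", "tmp", ".trash")

	// Create a fake object directory with one part in it.
	obj := filepath.Join(disk, "bucket", "object")
	os.MkdirAll(obj, 0o777)
	os.WriteFile(filepath.Join(obj, "part.1"), []byte("data"), 0o644)

	// Fast path: the request-time "delete" is just a rename.
	if err := moveToTrash(trash, obj); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Slow path: what the background cleanup pass does later.
	entries, _ := os.ReadDir(trash)
	for _, e := range entries {
		os.RemoveAll(filepath.Join(trash, e.Name()))
	}
	fmt.Println("object trashed, then purged; trash entries removed:", len(entries))
}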