mirror of
https://github.com/minio/minio.git
synced 2025-07-14 03:11:52 -04:00
background delete operations and delete serially every 10mins
addtionally introduce MINIO_DELETE_CLEANUP_INTERVAL environment value to control this interval, choose a lesser value if higher speed is necessary. Supports time.Duration format expory MINIO_DELETE_CLEANUP_INTERVAL=1m Would let MinIO run delete cleanup interval every minute, once the previous cycle completes.
This commit is contained in:
parent
f864931ab4
commit
29e7058ebf
@ -21,11 +21,13 @@ import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/config"
|
||||
@ -57,6 +59,129 @@ var (
|
||||
dataCrawlerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
|
||||
)
|
||||
|
||||
type dynamicSleeper struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
// Sleep factor
|
||||
factor float64
|
||||
|
||||
// maximum sleep cap,
|
||||
// set to <= 0 to disable.
|
||||
maxSleep time.Duration
|
||||
|
||||
// Don't sleep at all, if time taken is below this value.
|
||||
// This is to avoid too small costly sleeps.
|
||||
minSleep time.Duration
|
||||
|
||||
// cycle will be closed
|
||||
cycle chan struct{}
|
||||
}
|
||||
|
||||
// newDynamicSleeper
|
||||
func newDynamicSleeper(factor float64, maxWait time.Duration) *dynamicSleeper {
|
||||
return &dynamicSleeper{
|
||||
factor: factor,
|
||||
cycle: make(chan struct{}),
|
||||
maxSleep: maxWait,
|
||||
minSleep: 100 * time.Microsecond,
|
||||
}
|
||||
}
|
||||
|
||||
// Timer returns a timer that has started.
|
||||
// When the returned function is called it will wait.
|
||||
func (d *dynamicSleeper) Timer(ctx context.Context) func() {
|
||||
t := time.Now()
|
||||
return func() {
|
||||
doneAt := time.Now()
|
||||
for {
|
||||
// Grab current values
|
||||
d.mu.RLock()
|
||||
minWait, maxWait := d.minSleep, d.maxSleep
|
||||
factor := d.factor
|
||||
cycle := d.cycle
|
||||
d.mu.RUnlock()
|
||||
elapsed := doneAt.Sub(t)
|
||||
// Don't sleep for really small amount of time
|
||||
wantSleep := time.Duration(float64(elapsed) * factor)
|
||||
if wantSleep <= minWait {
|
||||
return
|
||||
}
|
||||
if maxWait > 0 && wantSleep > maxWait {
|
||||
wantSleep = maxWait
|
||||
}
|
||||
timer := time.NewTimer(wantSleep)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
return
|
||||
case <-timer.C:
|
||||
return
|
||||
case <-cycle:
|
||||
if !timer.Stop() {
|
||||
// We expired.
|
||||
<-timer.C
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sleep sleeps the specified time multiplied by the sleep factor.
|
||||
// If the factor is updated the sleep will be done again with the new factor.
|
||||
func (d *dynamicSleeper) Sleep(ctx context.Context, base time.Duration) {
|
||||
for {
|
||||
// Grab current values
|
||||
d.mu.RLock()
|
||||
minWait, maxWait := d.minSleep, d.maxSleep
|
||||
factor := d.factor
|
||||
cycle := d.cycle
|
||||
d.mu.RUnlock()
|
||||
// Don't sleep for really small amount of time
|
||||
wantSleep := time.Duration(float64(base) * factor)
|
||||
if wantSleep <= minWait {
|
||||
return
|
||||
}
|
||||
if maxWait > 0 && wantSleep > maxWait {
|
||||
wantSleep = maxWait
|
||||
}
|
||||
timer := time.NewTimer(wantSleep)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
return
|
||||
case <-timer.C:
|
||||
return
|
||||
case <-cycle:
|
||||
if !timer.Stop() {
|
||||
// We expired.
|
||||
<-timer.C
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update the current settings and cycle all waiting.
|
||||
// Parameters are the same as in the contructor.
|
||||
func (d *dynamicSleeper) Update(factor float64, maxWait time.Duration) error {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if math.Abs(d.factor-factor) < 1e-10 && d.maxSleep == maxWait {
|
||||
return nil
|
||||
}
|
||||
// Update values and cycle waiting.
|
||||
close(d.cycle)
|
||||
d.factor = factor
|
||||
d.maxSleep = maxWait
|
||||
d.cycle = make(chan struct{})
|
||||
return nil
|
||||
}
|
||||
|
||||
// initDataCrawler will start the crawler unless disabled.
|
||||
func initDataCrawler(ctx context.Context, objAPI ObjectLayer) {
|
||||
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
|
||||
|
@ -142,6 +142,9 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
||||
return
|
||||
}
|
||||
for _, tmpDir := range tmpDirs {
|
||||
if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines
|
||||
continue
|
||||
}
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
|
||||
if err != nil {
|
||||
continue
|
||||
|
@ -412,21 +412,31 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
|
||||
|
||||
// Initialize erasure objects for a given set.
|
||||
s.sets[i] = &erasureObjects{
|
||||
getDisks: s.GetDisks(i),
|
||||
getLockers: s.GetLockers(i),
|
||||
getEndpoints: s.GetEndpoints(i),
|
||||
nsMutex: mutex,
|
||||
bp: bp,
|
||||
mrfOpCh: make(chan partialOperation, 10000),
|
||||
getDisks: s.GetDisks(i),
|
||||
getLockers: s.GetLockers(i),
|
||||
getEndpoints: s.GetEndpoints(i),
|
||||
deletedCleanupSleeper: newDynamicSleeper(10, 10*time.Second),
|
||||
nsMutex: mutex,
|
||||
bp: bp,
|
||||
mrfOpCh: make(chan partialOperation, 10000),
|
||||
}
|
||||
|
||||
go s.sets[i].cleanupStaleUploads(ctx,
|
||||
GlobalStaleUploadsCleanupInterval, GlobalStaleUploadsExpiry)
|
||||
}
|
||||
|
||||
// cleanup ".trash/" folder every 10 minutes with sufficient sleep cycles.
|
||||
deletedObjectsCleanupInterval, err := time.ParseDuration(env.Get("MINIO_DELETE_CLEANUP_INTERVAL", "10m"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mctx, mctxCancel := context.WithCancel(ctx)
|
||||
s.monitorContextCancel = mctxCancel
|
||||
|
||||
// start cleanup of deleted objects.
|
||||
go s.cleanupDeletedObjects(ctx, deletedObjectsCleanupInterval)
|
||||
|
||||
// Start the disk monitoring and connect routine.
|
||||
go s.monitorAndConnectEndpoints(mctx, defaultMonitorConnectEndpointInterval)
|
||||
go s.maintainMRFList()
|
||||
@ -435,6 +445,25 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *erasureSets) cleanupDeletedObjects(ctx context.Context, cleanupInterval time.Duration) {
|
||||
timer := time.NewTimer(cleanupInterval)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-timer.C:
|
||||
// Reset for the next interval
|
||||
timer.Reset(cleanupInterval)
|
||||
|
||||
for _, set := range s.sets {
|
||||
set.cleanupDeletedObjects(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewNSLock - initialize a new namespace RWLocker instance.
|
||||
func (s *erasureSets) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||
if len(objects) == 1 {
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
@ -65,6 +66,8 @@ type erasureObjects struct {
|
||||
// Byte pools used for temporary i/o buffers.
|
||||
bp *bpool.BytePoolCap
|
||||
|
||||
deletedCleanupSleeper *dynamicSleeper
|
||||
|
||||
mrfOpCh chan partialOperation
|
||||
}
|
||||
|
||||
@ -238,6 +241,31 @@ func (er erasureObjects) StorageInfo(ctx context.Context, local bool) (StorageIn
|
||||
return getStorageInfo(disks, endpoints)
|
||||
}
|
||||
|
||||
// Clean-up previously deleted objects. from .minio.sys/tmp/.trash/
|
||||
func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
|
||||
// run multiple cleanup's local to this server.
|
||||
var wg sync.WaitGroup
|
||||
for _, disk := range er.getLoadBalancedLocalDisks() {
|
||||
if disk != nil {
|
||||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
diskPath := disk.Endpoint().Path
|
||||
readDirFn(pathJoin(diskPath, minioMetaTmpDeletedBucket), func(ddir string, typ os.FileMode) error {
|
||||
wait := er.deletedCleanupSleeper.Timer(ctx)
|
||||
if intDataUpdateTracker != nil && intDataUpdateTracker.debug {
|
||||
logger.Info("cleanupDeletedObjects: %s/%s", minioMetaTmpDeletedBucket, ddir)
|
||||
}
|
||||
removeAll(pathJoin(diskPath, minioMetaTmpDeletedBucket, ddir))
|
||||
wait()
|
||||
return nil
|
||||
})
|
||||
}(disk)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// CrawlAndGetDataUsage will start crawling buckets and send updated totals as they are traversed.
|
||||
// Updates are sent on a regular basis and the caller *must* consume them.
|
||||
func (er erasureObjects) crawlAndGetDataUsage(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, updates chan<- dataUsageCache) error {
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"os"
|
||||
pathutil "path"
|
||||
"runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/lock"
|
||||
@ -403,6 +404,55 @@ func fsRenameFile(ctx context.Context, sourcePath, destPath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteFile(basePath, deletePath string, recursive bool) error {
|
||||
if basePath == "" || deletePath == "" {
|
||||
return nil
|
||||
}
|
||||
isObjectDir := HasSuffix(deletePath, SlashSeparator)
|
||||
basePath = pathutil.Clean(basePath)
|
||||
deletePath = pathutil.Clean(deletePath)
|
||||
if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
if recursive {
|
||||
os.RemoveAll(deletePath)
|
||||
} else {
|
||||
err = os.Remove(deletePath)
|
||||
}
|
||||
if err != nil {
|
||||
switch {
|
||||
case isSysErrNotEmpty(err):
|
||||
// if object is a directory, but if its not empty
|
||||
// return FileNotFound to indicate its an empty prefix.
|
||||
if isObjectDir {
|
||||
return errFileNotFound
|
||||
}
|
||||
// Ignore errors if the directory is not empty. The server relies on
|
||||
// this functionality, and sometimes uses recursion that should not
|
||||
// error on parent directories.
|
||||
return nil
|
||||
case osIsNotExist(err):
|
||||
return errFileNotFound
|
||||
case osIsPermission(err):
|
||||
return errFileAccessDenied
|
||||
case isSysErrIO(err):
|
||||
return errFaultyDisk
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
deletePath = pathutil.Dir(deletePath)
|
||||
|
||||
// Delete parent directory obviously not recursively. Errors for
|
||||
// parent directories shouldn't trickle down.
|
||||
deleteFile(basePath, deletePath, false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// fsDeleteFile is a wrapper for deleteFile(), after checking the path length.
|
||||
func fsDeleteFile(ctx context.Context, basePath, deletePath string) error {
|
||||
if err := checkPathLength(basePath); err != nil {
|
||||
|
@ -58,6 +58,8 @@ const (
|
||||
minioMetaMultipartBucket = minioMetaBucket + SlashSeparator + mpartMetaPrefix
|
||||
// MinIO Tmp meta prefix.
|
||||
minioMetaTmpBucket = minioMetaBucket + "/tmp"
|
||||
// MinIO tmp meta prefix for deleted objects.
|
||||
minioMetaTmpDeletedBucket = minioMetaTmpBucket + "/.trash"
|
||||
// DNS separator (period), used for bucket name validation.
|
||||
dnsDelimiter = "."
|
||||
// On compressed files bigger than this;
|
||||
|
@ -68,6 +68,14 @@ func isSysErrTooManySymlinks(err error) bool {
|
||||
return errors.Is(err, syscall.ELOOP)
|
||||
}
|
||||
|
||||
func osIsNotExist(err error) bool {
|
||||
return errors.Is(err, os.ErrNotExist)
|
||||
}
|
||||
|
||||
func osIsPermission(err error) bool {
|
||||
return errors.Is(err, os.ErrPermission)
|
||||
}
|
||||
|
||||
// Check if the given error corresponds to ENOTEMPTY for unix,
|
||||
// EEXIST for solaris variants,
|
||||
// and ERROR_DIR_NOT_EMPTY for windows (directory not empty).
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
pathutil "path"
|
||||
slashpath "path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
@ -1171,7 +1172,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
|
||||
if !isXL2V1Format(buf) {
|
||||
// Delete the meta file, if there are no more versions the
|
||||
// top level parent is automatically removed.
|
||||
return deleteFile(volumeDir, pathJoin(volumeDir, path), true)
|
||||
return s.deleteFile(volumeDir, pathJoin(volumeDir, path), true)
|
||||
}
|
||||
|
||||
var xlMeta xlMetaV2
|
||||
@ -1196,7 +1197,8 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
|
||||
return err
|
||||
}
|
||||
|
||||
if err = removeAll(filePath); err != nil {
|
||||
tmpuuid := mustGetUUID()
|
||||
if err = renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -1212,7 +1214,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
|
||||
return err
|
||||
}
|
||||
|
||||
return deleteFile(volumeDir, filePath, false)
|
||||
return s.deleteFile(volumeDir, filePath, false)
|
||||
}
|
||||
|
||||
// WriteMetadata - writes FileInfo metadata for path at `xl.meta`
|
||||
@ -1930,7 +1932,7 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
|
||||
// move up the tree, deleting empty parent directories until it finds one
|
||||
// with files in it. Returns nil for a non-empty directory even when
|
||||
// recursive is set to false.
|
||||
func deleteFile(basePath, deletePath string, recursive bool) error {
|
||||
func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) error {
|
||||
if basePath == "" || deletePath == "" {
|
||||
return nil
|
||||
}
|
||||
@ -1943,7 +1945,8 @@ func deleteFile(basePath, deletePath string, recursive bool) error {
|
||||
|
||||
var err error
|
||||
if recursive {
|
||||
err = os.RemoveAll(deletePath)
|
||||
tmpuuid := mustGetUUID()
|
||||
err = renameAll(deletePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
} else {
|
||||
err = os.Remove(deletePath)
|
||||
}
|
||||
@ -1974,7 +1977,7 @@ func deleteFile(basePath, deletePath string, recursive bool) error {
|
||||
|
||||
// Delete parent directory obviously not recursively. Errors for
|
||||
// parent directories shouldn't trickle down.
|
||||
deleteFile(basePath, deletePath, false)
|
||||
s.deleteFile(basePath, deletePath, false)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -2012,7 +2015,7 @@ func (s *xlStorage) DeleteFile(ctx context.Context, volume string, path string)
|
||||
}
|
||||
|
||||
// Delete file and delete parent directory as well if its empty.
|
||||
return deleteFile(volumeDir, filePath, false)
|
||||
return s.deleteFile(volumeDir, filePath, false)
|
||||
}
|
||||
|
||||
func (s *xlStorage) DeleteFileBulk(volume string, paths []string) (errs []error, err error) {
|
||||
@ -2049,7 +2052,7 @@ func (s *xlStorage) DeleteFileBulk(volume string, paths []string) (errs []error,
|
||||
continue
|
||||
}
|
||||
// Delete file and delete parent directory as well if its empty.
|
||||
errs[idx] = deleteFile(volumeDir, filePath, false)
|
||||
errs[idx] = s.deleteFile(volumeDir, filePath, false)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -2243,8 +2246,10 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||
|
||||
// Commit data
|
||||
if srcDataPath != "" {
|
||||
removeAll(oldDstDataPath)
|
||||
removeAll(dstDataPath)
|
||||
tmpuuid := mustGetUUID()
|
||||
renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
tmpuuid = mustGetUUID()
|
||||
renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
if err = renameAll(srcDataPath, dstDataPath); err != nil {
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
@ -2257,12 +2262,12 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||
|
||||
// Remove parent dir of the source file if empty
|
||||
if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) {
|
||||
deleteFile(srcVolumeDir, parentDir, false)
|
||||
s.deleteFile(srcVolumeDir, parentDir, false)
|
||||
}
|
||||
|
||||
if srcDataPath != "" {
|
||||
if parentDir := slashpath.Dir(srcDataPath); isDirEmpty(parentDir) {
|
||||
deleteFile(srcVolumeDir, parentDir, false)
|
||||
s.deleteFile(srcVolumeDir, parentDir, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user