mirror of
https://github.com/minio/minio.git
synced 2025-04-05 04:10:28 -04:00
fix: add deadline worker pattern for local disk removers (#17845)
This commit is contained in:
parent
b760137e1d
commit
4643efe6be
@ -36,6 +36,7 @@ import (
|
|||||||
"github.com/minio/minio/internal/crypto"
|
"github.com/minio/minio/internal/crypto"
|
||||||
"github.com/minio/minio/internal/hash"
|
"github.com/minio/minio/internal/hash"
|
||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/pkg/mimedb"
|
"github.com/minio/pkg/mimedb"
|
||||||
"github.com/minio/pkg/sync/errgroup"
|
"github.com/minio/pkg/sync/errgroup"
|
||||||
@ -205,6 +206,8 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
|
||||||
|
return w.Run(func() error {
|
||||||
wait := deletedCleanupSleeper.Timer(ctx)
|
wait := deletedCleanupSleeper.Timer(ctx)
|
||||||
if now.Sub(fi.ModTime) > expiry {
|
if now.Sub(fi.ModTime) > expiry {
|
||||||
removeAll(pathJoin(diskPath, minioMetaMultipartBucket, uploadIDPath))
|
removeAll(pathJoin(diskPath, minioMetaMultipartBucket, uploadIDPath))
|
||||||
@ -212,10 +215,13 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
|||||||
wait()
|
wait()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
})
|
||||||
vi, err := disk.StatVol(ctx, pathJoin(minioMetaMultipartBucket, shaDir))
|
vi, err := disk.StatVol(ctx, pathJoin(minioMetaMultipartBucket, shaDir))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
|
||||||
|
return w.Run(func() error {
|
||||||
wait := deletedCleanupSleeper.Timer(ctx)
|
wait := deletedCleanupSleeper.Timer(ctx)
|
||||||
if now.Sub(vi.Created) > expiry {
|
if now.Sub(vi.Created) > expiry {
|
||||||
// We are not deleting shaDir recursively here, if shaDir is empty
|
// We are not deleting shaDir recursively here, if shaDir is empty
|
||||||
@ -225,6 +231,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
|||||||
wait()
|
wait()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
})
|
||||||
|
|
||||||
readDirFn(pathJoin(diskPath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error {
|
readDirFn(pathJoin(diskPath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error {
|
||||||
if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines
|
if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines
|
||||||
@ -234,6 +241,8 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
|
||||||
|
return w.Run(func() error {
|
||||||
wait := deletedCleanupSleeper.Timer(ctx)
|
wait := deletedCleanupSleeper.Timer(ctx)
|
||||||
if now.Sub(vi.Created) > expiry {
|
if now.Sub(vi.Created) > expiry {
|
||||||
removeAll(pathJoin(diskPath, minioMetaTmpBucket, tmpDir))
|
removeAll(pathJoin(diskPath, minioMetaTmpBucket, tmpDir))
|
||||||
@ -241,6 +250,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
|||||||
wait()
|
wait()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListMultipartUploads - lists all the pending multipart
|
// ListMultipartUploads - lists all the pending multipart
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/minio/madmin-go/v3"
|
"github.com/minio/madmin-go/v3"
|
||||||
"github.com/minio/minio/internal/bpool"
|
"github.com/minio/minio/internal/bpool"
|
||||||
"github.com/minio/minio/internal/dsync"
|
"github.com/minio/minio/internal/dsync"
|
||||||
|
xioutil "github.com/minio/minio/internal/ioutil"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/pkg/sync/errgroup"
|
"github.com/minio/pkg/sync/errgroup"
|
||||||
)
|
)
|
||||||
@ -342,11 +343,14 @@ func (er erasureObjects) cleanupDeletedObjects(ctx context.Context) {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
diskPath := disk.Endpoint().Path
|
diskPath := disk.Endpoint().Path
|
||||||
readDirFn(pathJoin(diskPath, minioMetaTmpDeletedBucket), func(ddir string, typ os.FileMode) error {
|
readDirFn(pathJoin(diskPath, minioMetaTmpDeletedBucket), func(ddir string, typ os.FileMode) error {
|
||||||
|
w := xioutil.NewDeadlineWorker(diskMaxTimeout)
|
||||||
|
return w.Run(func() error {
|
||||||
wait := deletedCleanupSleeper.Timer(ctx)
|
wait := deletedCleanupSleeper.Timer(ctx)
|
||||||
removeAll(pathJoin(diskPath, minioMetaTmpDeletedBucket, ddir))
|
removeAll(pathJoin(diskPath, minioMetaTmpDeletedBucket, ddir))
|
||||||
wait()
|
wait()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
})
|
||||||
}(disk)
|
}(disk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user