convert multipart-cleanup from a blocking unlink() to a rename to trash (#19495)

unlinking() at two different locations on a disk when there
are lots to purge, this can lead to huge IOwaits, instead
rely on rename() to .trash to avoid running multiple unlinks()
in parallel.
This commit is contained in:
Harshavardhana
2024-04-15 03:02:39 -07:00
committed by GitHub
parent 1c70e9ed1b
commit cb06aee5ac
3 changed files with 45 additions and 33 deletions

View File

@@ -194,11 +194,10 @@ func (er erasureObjects) deleteAll(ctx context.Context, bucket, prefix string) {
// Remove the old multipart uploads on the given disk.
func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) {
now := time.Now()
diskPath := disk.Endpoint().Path
drivePath := disk.Endpoint().Path
readDirFn(pathJoin(diskPath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
readDirFn(pathJoin(diskPath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
readDirFn(pathJoin(drivePath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
readDirFn(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
uploadIDPath := pathJoin(shaDir, uploadIDDir)
fi, err := disk.ReadVersion(ctx, "", minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{})
if err != nil {
@@ -206,9 +205,12 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
}
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error {
wait := deletedCleanupSleeper.Timer(ctx)
if now.Sub(fi.ModTime) > expiry {
removeAll(pathJoin(diskPath, minioMetaMultipartBucket, uploadIDPath))
wait := deleteMultipartCleanupSleeper.Timer(ctx)
if time.Since(fi.ModTime) > expiry {
pathUUID := mustGetUUID()
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
renameAll(pathJoin(drivePath, minioMetaMultipartBucket, uploadIDPath), targetPath, pathJoin(drivePath, minioMetaBucket))
}
wait()
return nil
@@ -220,19 +222,23 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
}
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error {
wait := deletedCleanupSleeper.Timer(ctx)
if now.Sub(vi.Created) > expiry {
wait := deleteMultipartCleanupSleeper.Timer(ctx)
if time.Since(vi.Created) > expiry {
pathUUID := mustGetUUID()
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
// We are not deleting shaDir recursively here, if shaDir is empty
// and its older then we can happily delete it.
Remove(pathJoin(diskPath, minioMetaMultipartBucket, shaDir))
Rename(pathJoin(drivePath, minioMetaMultipartBucket, shaDir), targetPath)
}
wait()
return nil
})
})
readDirFn(pathJoin(diskPath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error {
if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines
readDirFn(pathJoin(drivePath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error {
if strings.HasPrefix(tmpDir, ".trash") {
// do not remove .trash/ here, it has its own routines
return nil
}
vi, err := disk.StatVol(ctx, pathJoin(minioMetaTmpBucket, tmpDir))
@@ -241,9 +247,12 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
}
w := xioutil.NewDeadlineWorker(globalDriveConfig.GetMaxTimeout())
return w.Run(func() error {
wait := deletedCleanupSleeper.Timer(ctx)
if now.Sub(vi.Created) > expiry {
removeAll(pathJoin(diskPath, minioMetaTmpBucket, tmpDir))
wait := deleteMultipartCleanupSleeper.Timer(ctx)
if time.Since(vi.Created) > expiry {
pathUUID := mustGetUUID()
targetPath := pathJoin(drivePath, minioMetaTmpDeletedBucket, pathUUID)
renameAll(pathJoin(drivePath, minioMetaTmpBucket, tmpDir), targetPath, pathJoin(drivePath, minioMetaBucket))
}
wait()
return nil