xl: remove cleanupDir instead use Delete() (#11880)

use a single call to remove directly at disk
instead of doing recursively at network layer.
This commit is contained in:
Harshavardhana 2021-03-24 09:08:05 -07:00 committed by GitHub
parent fad7b27f15
commit 75741dbf4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 103 deletions

View File

@ -144,12 +144,12 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs
if err == errVolumeNotEmpty {
// Attempt to delete bucket again.
if derr := storageDisks[index].DeleteVol(ctx, bucket, false); derr == errVolumeNotEmpty {
_ = cleanupDir(ctx, storageDisks[index], bucket, "")
_ = storageDisks[index].Delete(ctx, bucket, "", true)
_ = storageDisks[index].DeleteVol(ctx, bucket, false)
// Cleanup all the previously incomplete multiparts.
_ = cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket)
_ = storageDisks[index].Delete(ctx, minioMetaMultipartBucket, bucket, true)
}
}
}
@ -170,8 +170,7 @@ func (er erasureObjects) DeleteBucket(ctx context.Context, bucket string, forceD
if err := storageDisks[index].DeleteVol(ctx, bucket, forceDelete); err != nil {
return err
}
err := cleanupDir(ctx, storageDisks[index], minioMetaMultipartBucket, bucket)
if err != nil && err != errVolumeNotFound {
if err := storageDisks[index].Delete(ctx, minioMetaMultipartBucket, bucket, true); err != errFileNotFound {
return err
}
return nil

View File

@ -881,7 +881,7 @@ func (er erasureObjects) deleteObject(ctx context.Context, bucket, object string
if disks[index] == nil {
return errDiskNotFound
}
return cleanupDir(ctx, disks[index], minioMetaTmpBucket, tmpObj)
return disks[index].Delete(ctx, minioMetaTmpBucket, tmpObj, true)
}, index)
}

View File

@ -18,12 +18,10 @@ package cmd
import (
"context"
"errors"
"strings"
"sync"
humanize "github.com/dustin/go-humanize"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/sync/errgroup"
)
@ -88,72 +86,6 @@ func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
return newStorageRESTClient(endpoint, true), nil
}
// Cleanup a directory recursively.
func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string) error {
var delFunc func(string) error
// Function to delete entries recursively.
delFunc = func(entryPath string) error {
if !HasSuffix(entryPath, SlashSeparator) {
// Delete the file entry.
err := storage.Delete(ctx, volume, entryPath, false)
if !IsErrIgnored(err, []error{
errDiskNotFound,
errUnformattedDisk,
errFileNotFound,
}...) {
logger.LogIf(ctx, err)
}
return err
}
// If it's a directory, list and call delFunc() for each entry.
entries, err := storage.ListDir(ctx, volume, entryPath, -1)
// If entryPath prefix never existed, safe to ignore
if errors.Is(err, errFileNotFound) {
return nil
} else if err != nil { // For any other errors fail.
if !IsErrIgnored(err, []error{
errDiskNotFound,
errUnformattedDisk,
errFileNotFound,
}...) {
logger.LogIf(ctx, err)
}
return err
} // else on success..
// Entry path is empty, just delete it.
if len(entries) == 0 {
err = storage.Delete(ctx, volume, entryPath, false)
if !IsErrIgnored(err, []error{
errDiskNotFound,
errUnformattedDisk,
errFileNotFound,
}...) {
logger.LogIf(ctx, err)
}
return err
}
// Recurse and delete all other entries.
for _, entry := range entries {
if err = delFunc(pathJoin(entryPath, entry)); err != nil {
return err
}
}
return nil
}
err := delFunc(retainSlash(pathJoin(dirPath)))
if IsErrIgnored(err, []error{
errVolumeNotFound,
errVolumeAccessDenied,
}...) {
return nil
}
return err
}
func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, isLeaf IsLeafFunc, isLeafDir IsLeafDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
endWalkCh := make(chan struct{})
defer close(endWalkCh)

View File

@ -785,6 +785,7 @@ func (s *xlStorage) ListDir(ctx context.Context, volume, dirPath string, count i
// or multiple objects.
func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error {
errs := make([]error, len(versions))
for i, version := range versions {
if err := s.DeleteVersion(ctx, volume, version.Name, version, false); err != nil {
errs[i] = err
@ -831,7 +832,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
if !isXL2V1Format(buf) {
// Delete the meta file, if there are no more versions the
// top level parent is automatically removed.
return s.deleteFile(volumeDir, pathJoin(volumeDir, path), true)
return s.deleteFile(volumeDir, pathJoin(volumeDir, path), true, volume == minioMetaTmpBucket)
}
var xlMeta xlMetaV2
@ -877,7 +878,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
return err
}
return s.deleteFile(volumeDir, filePath, false)
return s.deleteFile(volumeDir, filePath, false, volume == minioMetaTmpBucket)
}
// WriteMetadata - writes FileInfo metadata for path at `xl.meta`
@ -1206,17 +1207,7 @@ func (s *xlStorage) ReadFile(ctx context.Context, volume string, path string, of
return int64(len(buffer)), nil
}
func (s *xlStorage) openFile(volume, path string, mode int) (f *os.File, err error) {
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
return nil, err
}
func (s *xlStorage) openFile(filePath string, mode int) (f *os.File, err error) {
// Create top level directories if they don't exist.
// with mode 0777 mkdir honors system umask.
if err = mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
@ -1428,10 +1419,27 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
return errInvalidArgument
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
}
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
return err
}
parentFilePath := pathutil.Dir(filePath)
defer func() {
if err != nil {
removeAll(parentFilePath)
}
}()
if fileSize >= 0 && fileSize <= smallFileThreshold {
// For streams smaller than 128KiB we simply write them as O_DSYNC (fdatasync)
// and not O_DIRECT to avoid the complexities of aligned I/O.
w, err := s.openFile(volume, path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
w, err := s.openFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
if err != nil {
return err
}
@ -1449,15 +1457,9 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
return nil
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
}
filePath := pathJoin(volumeDir, path)
// Create top level directories if they don't exist.
// with mode 0777 mkdir honors system umask.
if err = mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
if err = mkdirAll(parentFilePath, 0777); err != nil {
return osErrToFileErr(err)
}
@ -1489,7 +1491,17 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
}
func (s *xlStorage) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
w, err := s.openFile(volume, path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
}
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
return err
}
w, err := s.openFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
if err != nil {
return err
}
@ -1523,10 +1535,15 @@ func (s *xlStorage) AppendFile(ctx context.Context, volume string, path string,
return err
}
filePath := pathJoin(volumeDir, path)
if err = checkPathLength(filePath); err != nil {
return err
}
var w *os.File
// Create file if not found. Not doing O_DIRECT here to avoid the code that does buffer aligned writes.
// AppendFile() is only used by healing code to heal objects written in old format.
w, err = s.openFile(volume, path, os.O_CREATE|os.O_APPEND|os.O_WRONLY)
w, err = s.openFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY)
if err != nil {
return err
}
@ -1645,7 +1662,7 @@ func (s *xlStorage) CheckFile(ctx context.Context, volume string, path string) e
// move up the tree, deleting empty parent directories until it finds one
// with files in it. Returns nil for a non-empty directory even when
// recursive is set to false.
func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) error {
func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool, tmpbucket bool) error {
if basePath == "" || deletePath == "" {
return nil
}
@ -1658,8 +1675,11 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) erro
var err error
if recursive {
tmpuuid := mustGetUUID()
err = renameAll(deletePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
if tmpbucket {
err = removeAll(deletePath)
} else {
err = renameAll(deletePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))
}
} else {
err = Remove(deletePath)
}
@ -1690,7 +1710,7 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) erro
// Delete parent directory obviously not recursively. Errors for
// parent directories shouldn't trickle down.
s.deleteFile(basePath, deletePath, false)
s.deleteFile(basePath, deletePath, false, tmpbucket)
return nil
}
@ -1723,7 +1743,7 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recu
}
// Delete file and delete parent directory as well if it's empty.
return s.deleteFile(volumeDir, filePath, recursive)
return s.deleteFile(volumeDir, filePath, recursive, volume == minioMetaTmpBucket)
}
// RenameData - rename source path to destination path atomically, metadata and data directory.
@ -1965,7 +1985,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
// Remove parent dir of the source file if empty
parentDir := pathutil.Dir(srcFilePath)
s.deleteFile(srcVolumeDir, parentDir, false)
s.deleteFile(srcVolumeDir, parentDir, false, srcVolume == minioMetaTmpBucket)
return nil
}
@ -2044,7 +2064,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
// Remove parent dir of the source file if empty
parentDir := pathutil.Dir(srcFilePath)
s.deleteFile(srcVolumeDir, parentDir, false)
s.deleteFile(srcVolumeDir, parentDir, false, srcVolume == minioMetaTmpBucket)
return nil
}