mirror of https://github.com/minio/minio.git
xl: Implement posix.DeletePrefixes to enhance delete perf (#9100)
Bulk delete API was using cleanupObjectsBulk() which calls posix listing and delete API to remove objects internal files in the backend (xl.json and parts) one by one. Add DeletePrefixes in the storage API to remove the content of a directory in a single call. Also use a remove goroutine for each disk to accelerate removal.
This commit is contained in:
parent
7c32f3f554
commit
0af62d35a0
|
@ -429,7 +429,7 @@ func fsDeleteFile(ctx context.Context, basePath, deletePath string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := deleteFile(basePath, deletePath); err != nil {
|
||||
if err := deleteFile(basePath, deletePath, false); err != nil {
|
||||
if err != errFileNotFound {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
|
|
|
@ -196,6 +196,13 @@ func (d *naughtyDisk) DeleteFileBulk(volume string, paths []string) ([]error, er
|
|||
return errs, nil
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) DeletePrefixes(volume string, paths []string) ([]error, error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return d.disk.DeletePrefixes(volume, paths)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) WriteAll(volume string, path string, reader io.Reader) (err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return err
|
||||
|
|
|
@ -147,78 +147,6 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string)
|
|||
return err
|
||||
}
|
||||
|
||||
// Cleanup objects in bulk and recursively: each object will have a list of sub-files to delete in the backend
|
||||
func cleanupObjectsBulk(storage StorageAPI, volume string, objsPaths []string, errs []error) ([]error, error) {
|
||||
// The list of files in disk to delete
|
||||
var filesToDelete []string
|
||||
// Map files to delete to the passed objsPaths
|
||||
var filesToDeleteObjsIndexes []int
|
||||
|
||||
// Traverse and return the list of sub entries
|
||||
var traverse func(string) ([]string, error)
|
||||
traverse = func(entryPath string) ([]string, error) {
|
||||
var output = make([]string, 0)
|
||||
if !HasSuffix(entryPath, SlashSeparator) {
|
||||
output = append(output, entryPath)
|
||||
return output, nil
|
||||
}
|
||||
entries, err := storage.ListDir(volume, entryPath, -1, "")
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
subEntries, err := traverse(pathJoin(entryPath, entry))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
output = append(output, subEntries...)
|
||||
}
|
||||
return output, nil
|
||||
}
|
||||
|
||||
// Find and collect the list of files to remove associated
|
||||
// to the passed objects paths
|
||||
for idx, objPath := range objsPaths {
|
||||
if errs[idx] != nil {
|
||||
continue
|
||||
}
|
||||
output, err := traverse(retainSlash(pathJoin(objPath)))
|
||||
if err != nil {
|
||||
errs[idx] = err
|
||||
continue
|
||||
} else {
|
||||
errs[idx] = nil
|
||||
}
|
||||
filesToDelete = append(filesToDelete, output...)
|
||||
for i := 0; i < len(output); i++ {
|
||||
filesToDeleteObjsIndexes = append(filesToDeleteObjsIndexes, idx)
|
||||
}
|
||||
}
|
||||
|
||||
// Reverse the list so remove can succeed
|
||||
reverseStringSlice(filesToDelete)
|
||||
|
||||
dErrs, err := storage.DeleteFileBulk(volume, filesToDelete)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Map files deletion errors to the correspondent objects
|
||||
for i := range dErrs {
|
||||
if dErrs[i] != nil {
|
||||
if errs[filesToDeleteObjsIndexes[i]] != nil {
|
||||
errs[filesToDeleteObjsIndexes[i]] = dErrs[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// Removes notification.xml for a given bucket, only used during DeleteBucket.
|
||||
func removeNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucket string) error {
|
||||
// Verify bucket is valid.
|
||||
|
|
|
@ -179,6 +179,13 @@ func (p *posixDiskIDCheck) DeleteFileBulk(volume string, paths []string) (errs [
|
|||
return p.storage.DeleteFileBulk(volume, paths)
|
||||
}
|
||||
|
||||
func (p *posixDiskIDCheck) DeletePrefixes(volume string, paths []string) (errs []error, err error) {
|
||||
if p.isDiskStale() {
|
||||
return nil, errDiskNotFound
|
||||
}
|
||||
return p.storage.DeletePrefixes(volume, paths)
|
||||
}
|
||||
|
||||
func (p *posixDiskIDCheck) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
|
||||
if p.isDiskStale() {
|
||||
return errDiskNotFound
|
||||
|
|
86
cmd/posix.go
86
cmd/posix.go
|
@ -1265,16 +1265,28 @@ func (s *posix) StatFile(volume, path string) (file FileInfo, err error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
// deleteFile deletes a file path if its empty. If it's successfully deleted,
|
||||
// it will recursively move up the tree, deleting empty parent directories
|
||||
// until it finds one with files in it. Returns nil for a non-empty directory.
|
||||
func deleteFile(basePath, deletePath string) error {
|
||||
if basePath == deletePath {
|
||||
// deleteFile deletes a file or a directory if its empty unless recursive
|
||||
// is set to true. If the target is successfully deleted, it will recursively
|
||||
// move up the tree, deleting empty parent directories until it finds one
|
||||
// with files in it. Returns nil for a non-empty directory even when
|
||||
// recursive is set to false.
|
||||
func deleteFile(basePath, deletePath string, recursive bool) error {
|
||||
if basePath == "" || deletePath == "" {
|
||||
return nil
|
||||
}
|
||||
basePath = filepath.Clean(basePath)
|
||||
deletePath = filepath.Clean(deletePath)
|
||||
if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Attempt to remove path.
|
||||
if err := os.Remove((deletePath)); err != nil {
|
||||
var err error
|
||||
if recursive {
|
||||
err = os.RemoveAll(deletePath)
|
||||
} else {
|
||||
err = os.Remove(deletePath)
|
||||
}
|
||||
if err != nil {
|
||||
switch {
|
||||
case isSysErrNotEmpty(err):
|
||||
// Ignore errors if the directory is not empty. The server relies on
|
||||
|
@ -1297,12 +1309,58 @@ func deleteFile(basePath, deletePath string) error {
|
|||
deletePath = strings.TrimSuffix(deletePath, SlashSeparator)
|
||||
deletePath = slashpath.Dir(deletePath)
|
||||
|
||||
// Delete parent directory. Errors for parent directories shouldn't trickle down.
|
||||
deleteFile(basePath, deletePath)
|
||||
// Delete parent directory obviously not recursively. Errors for
|
||||
// parent directories shouldn't trickle down.
|
||||
deleteFile(basePath, deletePath, false)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeletePrefixes forcibly deletes all the contents of a set of specified paths.
|
||||
// Parent directories are automatically removed if they become empty. err can
|
||||
// bil nil while errs can contain some errors for corresponding objects. No error
|
||||
// is set if a specified prefix path does not exist.
|
||||
func (s *posix) DeletePrefixes(volume string, paths []string) (errs []error, err error) {
|
||||
atomic.AddInt32(&s.activeIOCount, 1)
|
||||
defer func() {
|
||||
atomic.AddInt32(&s.activeIOCount, -1)
|
||||
}()
|
||||
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Stat a volume entry.
|
||||
_, err = os.Stat(volumeDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, errVolumeNotFound
|
||||
} else if os.IsPermission(err) {
|
||||
return nil, errVolumeAccessDenied
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
errs = make([]error, len(paths))
|
||||
// Following code is needed so that we retain SlashSeparator
|
||||
// suffix if any in path argument.
|
||||
for idx, path := range paths {
|
||||
filePath := pathJoin(volumeDir, path)
|
||||
errs[idx] = checkPathLength(filePath)
|
||||
if errs[idx] != nil {
|
||||
continue
|
||||
}
|
||||
// Delete file or a directory recursively, delete parent
|
||||
// directory as well if its empty.
|
||||
errs[idx] = deleteFile(volumeDir, filePath, true)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// DeleteFile - delete a file at path.
|
||||
func (s *posix) DeleteFile(volume, path string) (err error) {
|
||||
atomic.AddInt32(&s.activeIOCount, 1)
|
||||
|
@ -1316,7 +1374,7 @@ func (s *posix) DeleteFile(volume, path string) (err error) {
|
|||
}
|
||||
|
||||
// Stat a volume entry.
|
||||
_, err = os.Stat((volumeDir))
|
||||
_, err = os.Stat(volumeDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
|
@ -1331,12 +1389,12 @@ func (s *posix) DeleteFile(volume, path string) (err error) {
|
|||
// Following code is needed so that we retain SlashSeparator suffix if any in
|
||||
// path argument.
|
||||
filePath := pathJoin(volumeDir, path)
|
||||
if err = checkPathLength((filePath)); err != nil {
|
||||
if err = checkPathLength(filePath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete file and delete parent directory as well if its empty.
|
||||
return deleteFile(volumeDir, filePath)
|
||||
return deleteFile(volumeDir, filePath, false)
|
||||
}
|
||||
|
||||
func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err error) {
|
||||
|
@ -1373,7 +1431,7 @@ func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err
|
|||
continue
|
||||
}
|
||||
// Delete file and delete parent directory as well if its empty.
|
||||
errs[idx] = deleteFile(volumeDir, filePath)
|
||||
errs[idx] = deleteFile(volumeDir, filePath, false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -1460,7 +1518,7 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
|
|||
|
||||
// Remove parent dir of the source file if empty
|
||||
if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) {
|
||||
deleteFile(srcVolumeDir, parentDir)
|
||||
deleteFile(srcVolumeDir, parentDir, false)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -55,6 +55,7 @@ type StorageAPI interface {
|
|||
StatFile(volume string, path string) (file FileInfo, err error)
|
||||
DeleteFile(volume string, path string) (err error)
|
||||
DeleteFileBulk(volume string, paths []string) (errs []error, err error)
|
||||
DeletePrefixes(volume string, paths []string) (errs []error, err error)
|
||||
VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error
|
||||
|
||||
// Write all data, syncs the data to disk.
|
||||
|
|
|
@ -414,13 +414,54 @@ func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) (
|
|||
|
||||
respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, &buffer, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader, err := clearLeadingSpaces(respBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dErrResp := &DeleteFileBulkErrsResp{}
|
||||
if err = gob.NewDecoder(respBody).Decode(dErrResp); err != nil {
|
||||
if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, dErr := range dErrResp.Errs {
|
||||
errs = append(errs, toStorageErr(dErr))
|
||||
}
|
||||
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// DeletePrefixes - deletes prefixes in bulk.
|
||||
func (client *storageRESTClient) DeletePrefixes(volume string, paths []string) (errs []error, err error) {
|
||||
if len(paths) == 0 {
|
||||
return errs, err
|
||||
}
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
|
||||
var buffer bytes.Buffer
|
||||
for _, path := range paths {
|
||||
buffer.WriteString(path)
|
||||
buffer.WriteString("\n")
|
||||
}
|
||||
|
||||
respBody, err := client.call(storageRESTMethodDeletePrefixes, values, &buffer, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reader, err := clearLeadingSpaces(respBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dErrResp := &DeletePrefixesErrsResp{}
|
||||
if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -443,6 +484,22 @@ func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPa
|
|||
return err
|
||||
}
|
||||
|
||||
// clearLeadingSpaces removes all the first spaces returned from a reader.
|
||||
func clearLeadingSpaces(r io.Reader) (io.Reader, error) {
|
||||
reader := bufio.NewReader(r)
|
||||
for {
|
||||
b, err := reader.ReadByte()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if b != ' ' {
|
||||
reader.UnreadByte()
|
||||
break
|
||||
}
|
||||
}
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
|
@ -457,17 +514,10 @@ func (client *storageRESTClient) VerifyFile(volume, path string, size int64, alg
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reader := bufio.NewReader(respBody)
|
||||
for {
|
||||
b, err := reader.ReadByte()
|
||||
reader, err := clearLeadingSpaces(respBody)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b != ' ' {
|
||||
reader.UnreadByte()
|
||||
break
|
||||
}
|
||||
}
|
||||
verifyResp := &VerifyFileResp{}
|
||||
if err = gob.NewDecoder(reader).Decode(verifyResp); err != nil {
|
||||
return err
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v14" // DeleteFileBulk API change
|
||||
storageRESTVersion = "v15" // Adding DeletePrefixes API
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
|
@ -42,6 +42,7 @@ const (
|
|||
storageRESTMethodWalk = "/walk"
|
||||
storageRESTMethodDeleteFile = "/deletefile"
|
||||
storageRESTMethodDeleteFileBulk = "/deletefilebulk"
|
||||
storageRESTMethodDeletePrefixes = "/deleteprefixes"
|
||||
storageRESTMethodRenameFile = "/renamefile"
|
||||
storageRESTMethodVerifyFile = "/verifyfile"
|
||||
)
|
||||
|
|
|
@ -508,20 +508,69 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http
|
|||
return
|
||||
}
|
||||
|
||||
w.Header().Set(xhttp.ContentType, "text/event-stream")
|
||||
encoder := gob.NewEncoder(w)
|
||||
doneCh := sendWhiteSpaceToHTTPResponse(w)
|
||||
errs, err := s.storage.DeleteFileBulk(volume, filePaths)
|
||||
<-doneCh
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
derrsResp := &DeleteFileBulkErrsResp{Errs: make([]error, len(errs))}
|
||||
dErrsResp := &DeleteFileBulkErrsResp{Errs: make([]error, len(errs))}
|
||||
for idx, err := range errs {
|
||||
if err != nil {
|
||||
derrsResp.Errs[idx] = StorageErr(err.Error())
|
||||
dErrsResp.Errs[idx] = StorageErr(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
gob.NewEncoder(w).Encode(derrsResp)
|
||||
encoder.Encode(dErrsResp)
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
// DeletePrefixesErrsResp - collection of delete errors
|
||||
// for bulk prefixes deletes
|
||||
type DeletePrefixesErrsResp struct {
|
||||
Errs []error
|
||||
}
|
||||
|
||||
// DeletePrefixesHandler - delete a set of a prefixes.
|
||||
func (s *storageRESTServer) DeletePrefixesHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
return
|
||||
}
|
||||
vars := r.URL.Query()
|
||||
volume := vars.Get(storageRESTVolume)
|
||||
|
||||
bio := bufio.NewScanner(r.Body)
|
||||
var prefixes []string
|
||||
for bio.Scan() {
|
||||
prefixes = append(prefixes, bio.Text())
|
||||
}
|
||||
|
||||
if err := bio.Err(); err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set(xhttp.ContentType, "text/event-stream")
|
||||
encoder := gob.NewEncoder(w)
|
||||
doneCh := sendWhiteSpaceToHTTPResponse(w)
|
||||
errs, err := s.storage.DeletePrefixes(volume, prefixes)
|
||||
<-doneCh
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
dErrsResp := &DeletePrefixesErrsResp{Errs: make([]error, len(errs))}
|
||||
for idx, err := range errs {
|
||||
if err != nil {
|
||||
dErrsResp.Errs[idx] = StorageErr(err.Error())
|
||||
}
|
||||
}
|
||||
encoder.Encode(dErrsResp)
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
|
@ -665,6 +714,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
|
|||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTLeafFile)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeletePrefixes).HandlerFunc(httpTraceHdrs(server.DeletePrefixesHandler)).
|
||||
Queries(restQueries(storageRESTVolume)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)).
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"sync"
|
||||
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
|
@ -781,19 +782,26 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects
|
|||
// Initialize list of errors.
|
||||
var opErrs = make([]error, len(disks))
|
||||
var delObjErrs = make([][]error, len(disks))
|
||||
var wg = sync.WaitGroup{}
|
||||
|
||||
// Remove objects in bulk for each disk
|
||||
for index, disk := range disks {
|
||||
if disk == nil {
|
||||
opErrs[index] = errDiskNotFound
|
||||
for i, d := range disks {
|
||||
if d == nil {
|
||||
opErrs[i] = errDiskNotFound
|
||||
continue
|
||||
}
|
||||
delObjErrs[index], opErrs[index] = cleanupObjectsBulk(disk, minioMetaTmpBucket, tmpObjs, errs)
|
||||
wg.Add(1)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
delObjErrs[index], opErrs[index] = disk.DeletePrefixes(minioMetaTmpBucket, tmpObjs)
|
||||
if opErrs[index] == errVolumeNotFound || opErrs[index] == errFileNotFound {
|
||||
opErrs[index] = nil
|
||||
}
|
||||
}(i, d)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Return errors if any during deletion
|
||||
if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(disks)/2+1); err != nil {
|
||||
return nil, err
|
||||
|
@ -805,11 +813,16 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects
|
|||
continue
|
||||
}
|
||||
listErrs := make([]error, len(disks))
|
||||
// Iterate over disks to fetch the error
|
||||
// of deleting of the current object
|
||||
for i := range delObjErrs {
|
||||
// delObjErrs[i] is not nil when disks[i] is also not nil
|
||||
if delObjErrs[i] != nil {
|
||||
if delObjErrs[i][objIndex] != errFileNotFound {
|
||||
listErrs[i] = delObjErrs[i][objIndex]
|
||||
}
|
||||
}
|
||||
}
|
||||
errs[objIndex] = reduceWriteQuorumErrs(ctx, listErrs, objectOpIgnoredErrs, writeQuorums[objIndex])
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue