From 0af62d35a0c89ea9d4cf1e4d3c0a79209afd88df Mon Sep 17 00:00:00 2001 From: Anis Elleuch Date: Wed, 11 Mar 2020 08:56:36 -0700 Subject: [PATCH] xl: Implement posix.DeletePrefixes to enhance delete perf (#9100) Bulk delete API was using cleanupObjectsBulk() which calls posix listing and delete API to remove objects internal files in the backend (xl.json and parts) one by one. Add DeletePrefixes in the storage API to remove the content of a directory in a single call. Also use a remove goroutine for each disk to accelerate removal. --- cmd/fs-v1-helpers.go | 2 +- cmd/naughty-disk_test.go | 7 ++++ cmd/object-api-common.go | 72 ------------------------------- cmd/posix-diskid-check.go | 7 ++++ cmd/posix.go | 86 +++++++++++++++++++++++++++++++------- cmd/storage-interface.go | 1 + cmd/storage-rest-client.go | 72 ++++++++++++++++++++++++++----- cmd/storage-rest-common.go | 3 +- cmd/storage-rest-server.go | 57 +++++++++++++++++++++++-- cmd/xl-v1-object.go | 29 +++++++++---- 10 files changed, 226 insertions(+), 110 deletions(-) diff --git a/cmd/fs-v1-helpers.go b/cmd/fs-v1-helpers.go index f8a187a87..a275c08b6 100644 --- a/cmd/fs-v1-helpers.go +++ b/cmd/fs-v1-helpers.go @@ -429,7 +429,7 @@ func fsDeleteFile(ctx context.Context, basePath, deletePath string) error { return err } - if err := deleteFile(basePath, deletePath); err != nil { + if err := deleteFile(basePath, deletePath, false); err != nil { if err != errFileNotFound { logger.LogIf(ctx, err) } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 3dda0b9a2..0c89b8d59 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -196,6 +196,13 @@ func (d *naughtyDisk) DeleteFileBulk(volume string, paths []string) ([]error, er return errs, nil } +func (d *naughtyDisk) DeletePrefixes(volume string, paths []string) ([]error, error) { + if err := d.calcError(); err != nil { + return nil, err + } + return d.disk.DeletePrefixes(volume, paths) +} + func (d *naughtyDisk) WriteAll(volume string, 
path string, reader io.Reader) (err error) { if err := d.calcError(); err != nil { return err diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 1830482d4..ba2c3b18f 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -147,78 +147,6 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string) return err } -// Cleanup objects in bulk and recursively: each object will have a list of sub-files to delete in the backend -func cleanupObjectsBulk(storage StorageAPI, volume string, objsPaths []string, errs []error) ([]error, error) { - // The list of files in disk to delete - var filesToDelete []string - // Map files to delete to the passed objsPaths - var filesToDeleteObjsIndexes []int - - // Traverse and return the list of sub entries - var traverse func(string) ([]string, error) - traverse = func(entryPath string) ([]string, error) { - var output = make([]string, 0) - if !HasSuffix(entryPath, SlashSeparator) { - output = append(output, entryPath) - return output, nil - } - entries, err := storage.ListDir(volume, entryPath, -1, "") - if err != nil { - if err == errFileNotFound { - return nil, nil - } - return nil, err - } - - for _, entry := range entries { - subEntries, err := traverse(pathJoin(entryPath, entry)) - if err != nil { - return nil, err - } - output = append(output, subEntries...) - } - return output, nil - } - - // Find and collect the list of files to remove associated - // to the passed objects paths - for idx, objPath := range objsPaths { - if errs[idx] != nil { - continue - } - output, err := traverse(retainSlash(pathJoin(objPath))) - if err != nil { - errs[idx] = err - continue - } else { - errs[idx] = nil - } - filesToDelete = append(filesToDelete, output...) 
- for i := 0; i < len(output); i++ { - filesToDeleteObjsIndexes = append(filesToDeleteObjsIndexes, idx) - } - } - - // Reverse the list so remove can succeed - reverseStringSlice(filesToDelete) - - dErrs, err := storage.DeleteFileBulk(volume, filesToDelete) - if err != nil { - return nil, err - } - - // Map files deletion errors to the correspondent objects - for i := range dErrs { - if dErrs[i] != nil { - if errs[filesToDeleteObjsIndexes[i]] != nil { - errs[filesToDeleteObjsIndexes[i]] = dErrs[i] - } - } - } - - return errs, nil -} - // Removes notification.xml for a given bucket, only used during DeleteBucket. func removeNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucket string) error { // Verify bucket is valid. diff --git a/cmd/posix-diskid-check.go b/cmd/posix-diskid-check.go index 240a1c156..2b265f220 100644 --- a/cmd/posix-diskid-check.go +++ b/cmd/posix-diskid-check.go @@ -179,6 +179,13 @@ func (p *posixDiskIDCheck) DeleteFileBulk(volume string, paths []string) (errs [ return p.storage.DeleteFileBulk(volume, paths) } +func (p *posixDiskIDCheck) DeletePrefixes(volume string, paths []string) (errs []error, err error) { + if p.isDiskStale() { + return nil, errDiskNotFound + } + return p.storage.DeletePrefixes(volume, paths) +} + func (p *posixDiskIDCheck) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error { if p.isDiskStale() { return errDiskNotFound diff --git a/cmd/posix.go b/cmd/posix.go index 4cbdc429f..049961cb3 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -1265,16 +1265,28 @@ func (s *posix) StatFile(volume, path string) (file FileInfo, err error) { }, nil } -// deleteFile deletes a file path if its empty. If it's successfully deleted, -// it will recursively move up the tree, deleting empty parent directories -// until it finds one with files in it. Returns nil for a non-empty directory. 
-func deleteFile(basePath, deletePath string) error { - if basePath == deletePath { +// deleteFile deletes a file or a directory if it's empty unless recursive +// is set to true. If the target is successfully deleted, it will recursively +// move up the tree, deleting empty parent directories until it finds one +// with files in it. Returns nil for a non-empty directory even when +// recursive is set to false. +func deleteFile(basePath, deletePath string, recursive bool) error { + if basePath == "" || deletePath == "" { + return nil + } + basePath = filepath.Clean(basePath) + deletePath = filepath.Clean(deletePath) + if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath { return nil } - // Attempt to remove path. - if err := os.Remove((deletePath)); err != nil { + var err error + if recursive { + err = os.RemoveAll(deletePath) + } else { + err = os.Remove(deletePath) + } + if err != nil { switch { case isSysErrNotEmpty(err): // Ignore errors if the directory is not empty. The server relies on @@ -1297,12 +1309,58 @@ func deleteFile(basePath, deletePath string) error { deletePath = strings.TrimSuffix(deletePath, SlashSeparator) deletePath = slashpath.Dir(deletePath) - // Delete parent directory. Errors for parent directories shouldn't trickle down. - deleteFile(basePath, deletePath) + // Delete parent directory obviously not recursively. Errors for + // parent directories shouldn't trickle down. + deleteFile(basePath, deletePath, false) return nil } +// DeletePrefixes forcibly deletes all the contents of a set of specified paths. +// Parent directories are automatically removed if they become empty. err can +// be nil while errs can contain some errors for corresponding objects. No error +// is set if a specified prefix path does not exist. 
+func (s *posix) DeletePrefixes(volume string, paths []string) (errs []error, err error) { + atomic.AddInt32(&s.activeIOCount, 1) + defer func() { + atomic.AddInt32(&s.activeIOCount, -1) + }() + + volumeDir, err := s.getVolDir(volume) + if err != nil { + return nil, err + } + + // Stat a volume entry. + _, err = os.Stat(volumeDir) + if err != nil { + if os.IsNotExist(err) { + return nil, errVolumeNotFound + } else if os.IsPermission(err) { + return nil, errVolumeAccessDenied + } else if isSysErrIO(err) { + return nil, errFaultyDisk + } + return nil, err + } + + errs = make([]error, len(paths)) + // Following code is needed so that we retain SlashSeparator + // suffix if any in path argument. + for idx, path := range paths { + filePath := pathJoin(volumeDir, path) + errs[idx] = checkPathLength(filePath) + if errs[idx] != nil { + continue + } + // Delete file or a directory recursively, delete parent + // directory as well if its empty. + errs[idx] = deleteFile(volumeDir, filePath, true) + } + + return +} + // DeleteFile - delete a file at path. func (s *posix) DeleteFile(volume, path string) (err error) { atomic.AddInt32(&s.activeIOCount, 1) @@ -1316,7 +1374,7 @@ func (s *posix) DeleteFile(volume, path string) (err error) { } // Stat a volume entry. - _, err = os.Stat((volumeDir)) + _, err = os.Stat(volumeDir) if err != nil { if os.IsNotExist(err) { return errVolumeNotFound @@ -1331,12 +1389,12 @@ func (s *posix) DeleteFile(volume, path string) (err error) { // Following code is needed so that we retain SlashSeparator suffix if any in // path argument. filePath := pathJoin(volumeDir, path) - if err = checkPathLength((filePath)); err != nil { + if err = checkPathLength(filePath); err != nil { return err } // Delete file and delete parent directory as well if its empty. 
- return deleteFile(volumeDir, filePath) + return deleteFile(volumeDir, filePath, false) } func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err error) { @@ -1373,7 +1431,7 @@ func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err continue } // Delete file and delete parent directory as well if its empty. - errs[idx] = deleteFile(volumeDir, filePath) + errs[idx] = deleteFile(volumeDir, filePath, false) } return } @@ -1460,7 +1518,7 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e // Remove parent dir of the source file if empty if parentDir := slashpath.Dir(srcFilePath); isDirEmpty(parentDir) { - deleteFile(srcVolumeDir, parentDir) + deleteFile(srcVolumeDir, parentDir, false) } return nil diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 394c3915c..bcd04e539 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -55,6 +55,7 @@ type StorageAPI interface { StatFile(volume string, path string) (file FileInfo, err error) DeleteFile(volume string, path string) (err error) DeleteFileBulk(volume string, paths []string) (errs []error, err error) + DeletePrefixes(volume string, paths []string) (errs []error, err error) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error // Write all data, syncs the data to disk. 
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 1e95bfe34..9f5fd138b 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -414,13 +414,54 @@ func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) ( respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, &buffer, -1) defer http.DrainBody(respBody) + if err != nil { + return nil, err + } + reader, err := clearLeadingSpaces(respBody) if err != nil { return nil, err } dErrResp := &DeleteFileBulkErrsResp{} - if err = gob.NewDecoder(respBody).Decode(dErrResp); err != nil { + if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil { + return nil, err + } + + for _, dErr := range dErrResp.Errs { + errs = append(errs, toStorageErr(dErr)) + } + + return errs, nil +} + +// DeletePrefixes - deletes prefixes in bulk. +func (client *storageRESTClient) DeletePrefixes(volume string, paths []string) (errs []error, err error) { + if len(paths) == 0 { + return errs, err + } + values := make(url.Values) + values.Set(storageRESTVolume, volume) + + var buffer bytes.Buffer + for _, path := range paths { + buffer.WriteString(path) + buffer.WriteString("\n") + } + + respBody, err := client.call(storageRESTMethodDeletePrefixes, values, &buffer, -1) + defer http.DrainBody(respBody) + if err != nil { + return nil, err + } + + reader, err := clearLeadingSpaces(respBody) + if err != nil { + return nil, err + } + + dErrResp := &DeletePrefixesErrsResp{} + if err = gob.NewDecoder(reader).Decode(dErrResp); err != nil { return nil, err } @@ -443,6 +484,22 @@ func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPa return err } +// clearLeadingSpaces removes all the first spaces returned from a reader. 
+func clearLeadingSpaces(r io.Reader) (io.Reader, error) { + reader := bufio.NewReader(r) + for { + b, err := reader.ReadByte() + if err != nil { + return nil, err + } + if b != ' ' { + reader.UnreadByte() + break + } + } + return reader, nil +} + func (client *storageRESTClient) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error { values := make(url.Values) values.Set(storageRESTVolume, volume) @@ -457,16 +514,9 @@ func (client *storageRESTClient) VerifyFile(volume, path string, size int64, alg if err != nil { return err } - reader := bufio.NewReader(respBody) - for { - b, err := reader.ReadByte() - if err != nil { - return err - } - if b != ' ' { - reader.UnreadByte() - break - } + reader, err := clearLeadingSpaces(respBody) + if err != nil { + return err } verifyResp := &VerifyFileResp{} if err = gob.NewDecoder(reader).Decode(verifyResp); err != nil { diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 597e09101..ace9dc456 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v14" // DeleteFileBulk API change + storageRESTVersion = "v15" // Adding DeletePrefixes API storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -42,6 +42,7 @@ const ( storageRESTMethodWalk = "/walk" storageRESTMethodDeleteFile = "/deletefile" storageRESTMethodDeleteFileBulk = "/deletefilebulk" + storageRESTMethodDeletePrefixes = "/deleteprefixes" storageRESTMethodRenameFile = "/renamefile" storageRESTMethodVerifyFile = "/verifyfile" ) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index c798d5b8e..790acf1cd 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -508,20 +508,69 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http return } + w.Header().Set(xhttp.ContentType, "text/event-stream") + 
encoder := gob.NewEncoder(w) + doneCh := sendWhiteSpaceToHTTPResponse(w) errs, err := s.storage.DeleteFileBulk(volume, filePaths) + <-doneCh if err != nil { s.writeErrorResponse(w, err) return } - derrsResp := &DeleteFileBulkErrsResp{Errs: make([]error, len(errs))} + dErrsResp := &DeleteFileBulkErrsResp{Errs: make([]error, len(errs))} for idx, err := range errs { if err != nil { - derrsResp.Errs[idx] = StorageErr(err.Error()) + dErrsResp.Errs[idx] = StorageErr(err.Error()) } } - gob.NewEncoder(w).Encode(derrsResp) + encoder.Encode(dErrsResp) + w.(http.Flusher).Flush() +} + +// DeletePrefixesErrsResp - collection of delete errors +// for bulk prefix deletes +type DeletePrefixesErrsResp struct { + Errs []error +} + +// DeletePrefixesHandler - delete a set of prefixes. +func (s *storageRESTServer) DeletePrefixesHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + return + } + vars := r.URL.Query() + volume := vars.Get(storageRESTVolume) + + bio := bufio.NewScanner(r.Body) + var prefixes []string + for bio.Scan() { + prefixes = append(prefixes, bio.Text()) + } + + if err := bio.Err(); err != nil { + s.writeErrorResponse(w, err) + return + } + + w.Header().Set(xhttp.ContentType, "text/event-stream") + encoder := gob.NewEncoder(w) + doneCh := sendWhiteSpaceToHTTPResponse(w) + errs, err := s.storage.DeletePrefixes(volume, prefixes) + <-doneCh + if err != nil { + s.writeErrorResponse(w, err) + return + } + + dErrsResp := &DeletePrefixesErrsResp{Errs: make([]error, len(errs))} + for idx, err := range errs { + if err != nil { + dErrsResp.Errs[idx] = StorageErr(err.Error()) + } + } + encoder.Encode(dErrsResp) w.(http.Flusher).Flush() } @@ -665,6 +714,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...) 
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)). Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTLeafFile)...) + subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeletePrefixes).HandlerFunc(httpTraceHdrs(server.DeletePrefixesHandler)). + Queries(restQueries(storageRESTVolume)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)). diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index aea9ac5ee..5a35da219 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -23,6 +23,7 @@ import ( "io" "net/http" "path" + "sync" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" @@ -781,19 +782,26 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects // Initialize list of errors. 
var opErrs = make([]error, len(disks)) var delObjErrs = make([][]error, len(disks)) + var wg = sync.WaitGroup{} // Remove objects in bulk for each disk - for index, disk := range disks { - if disk == nil { - opErrs[index] = errDiskNotFound + for i, d := range disks { + if d == nil { + opErrs[i] = errDiskNotFound continue } - delObjErrs[index], opErrs[index] = cleanupObjectsBulk(disk, minioMetaTmpBucket, tmpObjs, errs) - if opErrs[index] == errVolumeNotFound || opErrs[index] == errFileNotFound { - opErrs[index] = nil - } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + delObjErrs[index], opErrs[index] = disk.DeletePrefixes(minioMetaTmpBucket, tmpObjs) + if opErrs[index] == errVolumeNotFound || opErrs[index] == errFileNotFound { + opErrs[index] = nil + } + }(i, d) } + wg.Wait() + // Return errors if any during deletion if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(disks)/2+1); err != nil { return nil, err @@ -805,9 +813,14 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects continue } listErrs := make([]error, len(disks)) + // Iterate over disks to fetch the error + // of deleting the current object for i := range delObjErrs { + // delObjErrs[i] is not nil when disks[i] is also not nil if delObjErrs[i] != nil { - listErrs[i] = delObjErrs[i][objIndex] + if delObjErrs[i][objIndex] != errFileNotFound { + listErrs[i] = delObjErrs[i][objIndex] + } } } errs[objIndex] = reduceWriteQuorumErrs(ctx, listErrs, objectOpIgnoredErrs, writeQuorums[objIndex])