diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index a22d0416e..089bdbf55 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -1791,20 +1791,12 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string) error { disks := er.getDisks() g := errgroup.WithNErrs(len(disks)) - dirPrefix := encodeDirObject(prefix) for index := range disks { index := index g.Go(func() error { if disks[index] == nil { return nil } - // Deletes - // - The prefix and its children - // - The prefix__XLDIR__ - defer disks[index].Delete(ctx, bucket, dirPrefix, DeleteOptions{ - Recursive: true, - Immediate: true, - }) return disks[index].Delete(ctx, bucket, prefix, DeleteOptions{ Recursive: true, Immediate: true, @@ -1828,9 +1820,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string } if opts.DeletePrefix { - if globalCacheConfig.Enabled() { - return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object) - } return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object) } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 2d9d63695..f5b0b86e1 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -120,6 +120,8 @@ type xlStorage struct { nrRequests uint64 major, minor uint32 + immediatePurge chan string + // mutex to prevent concurrent read operations overloading walks. rotational bool walkMu *sync.Mutex @@ -221,15 +223,26 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error { // Initialize a new storage disk. func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) { - s = &xlStorage{ - drivePath: ep.Path, - endpoint: ep, - globalSync: globalFSOSync, - diskInfoCache: cachevalue.New[DiskInfo](), - poolIndex: -1, - setIndex: -1, - diskIndex: -1, + immediatePurgeQueue := 100000 + if globalIsTesting || globalIsCICD { + immediatePurgeQueue = 1 } + s = &xlStorage{ + drivePath: ep.Path, + endpoint: ep, + globalSync: globalFSOSync, + diskInfoCache: cachevalue.New[DiskInfo](), + poolIndex: -1, + setIndex: -1, + diskIndex: -1, + immediatePurge: make(chan string, immediatePurgeQueue), + } + + defer func() { + if err == nil { + go s.cleanupTrashImmediateCallers(GlobalContext) + } + }() s.drivePath, err = getValidPath(ep.Path) if err != nil { @@ -311,7 +324,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) { // oDirect off. if globalIsErasureSD || !disk.ODirectPlatform { s.oDirect = false - } else if err := s.checkODirectDiskSupport(); err == nil { + } else if err := s.checkODirectDiskSupport(info.FSType); err == nil { s.oDirect = true } else { return s, err @@ -405,11 +418,18 @@ func (s *xlStorage) Healing() *healingTracker { // checkODirectDiskSupport asks the disk to write some data // with O_DIRECT support, return an error if any and return // errUnsupportedDisk if there is no O_DIRECT support -func (s *xlStorage) checkODirectDiskSupport() error { +func (s *xlStorage) checkODirectDiskSupport(fsType string) error { if !disk.ODirectPlatform { return errUnsupportedDisk } + // We know XFS already supports O_DIRECT no need to check. + if fsType == "XFS" { + return nil + } + + // For all other FS pay the price of not using our recommended filesystem. + // Check if backend is writable and supports O_DIRECT uuid := mustGetUUID() filePath := pathJoin(s.drivePath, minioMetaTmpDeletedBucket, ".writable-check-"+uuid+".tmp") @@ -1165,6 +1185,17 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions return errs } +func (s *xlStorage) cleanupTrashImmediateCallers(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case entry := <-s.immediatePurge: + removeAll(entry) + } + } +} + func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) { pathUUID := mustGetUUID() targetPath := pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, pathUUID) @@ -1175,6 +1206,13 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) err = Rename(filePath, targetPath) } + var targetPath2 string + if immediatePurge && HasSuffix(filePath, SlashSeparator) { + // With immediate purge also attempt deleting for `__XL_DIR__` folder/directory objects. + targetPath2 = pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, mustGetUUID()) + renameAll(encodeDirObject(filePath), targetPath2, pathutil.Join(s.drivePath, minioMetaBucket)) + } + // ENOSPC is a valid error from rename(); remove instead of rename in that case if errors.Is(err, errDiskFull) || isSysErrNoSpace(err) { if recursive { @@ -1191,7 +1229,21 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) // immediately purge the target if immediatePurge { - removeAll(targetPath) + for _, target := range []string{ + targetPath, + targetPath2, + } { + if target == "" { + continue + } + select { + case s.immediatePurge <- target: + default: + // Too much back pressure, we will perform the delete + // blocking at this point we need to serialize operations. + removeAll(target) + } + } } return nil @@ -2261,10 +2313,10 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate if basePath == "" || deletePath == "" { return nil } - isObjectDir := HasSuffix(deletePath, SlashSeparator) - basePath = pathutil.Clean(basePath) - deletePath = pathutil.Clean(deletePath) - if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath { + + bp := pathutil.Clean(basePath) // do not override basepath / or deletePath / + dp := pathutil.Clean(deletePath) + if !strings.HasPrefix(dp, bp) || dp == bp { return nil } @@ -2279,7 +2331,7 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate case isSysErrNotEmpty(err): // if object is a directory, but if its not empty // return FileNotFound to indicate its an empty prefix. - if isObjectDir { + if HasSuffix(deletePath, SlashSeparator) { return errFileNotFound } // if we have .DS_Store only on macOS @@ -2311,11 +2363,9 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate } } - deletePath = pathutil.Dir(deletePath) - // Delete parent directory obviously not recursively. Errors for // parent directories shouldn't trickle down. - s.deleteFile(basePath, deletePath, false, false) + s.deleteFile(basePath, pathutil.Dir(pathutil.Clean(deletePath)), false, false) return nil }