make immediate purge non-blocking up to 100,000 entries per drive (#19231)

make immediate purge non-blocking upto 100000 entries per drive

Bonus: turn-off O_DIRECT verification when FSType is 'XFS'
This commit is contained in:
Harshavardhana 2024-03-09 18:53:48 -08:00 committed by GitHub
parent 8e2238ea09
commit 88a89213ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 69 additions and 30 deletions

View File

@ -1791,20 +1791,12 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string) error {
disks := er.getDisks()
g := errgroup.WithNErrs(len(disks))
dirPrefix := encodeDirObject(prefix)
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return nil
}
// Deletes
// - The prefix and its children
// - The prefix__XLDIR__
defer disks[index].Delete(ctx, bucket, dirPrefix, DeleteOptions{
Recursive: true,
Immediate: true,
})
return disks[index].Delete(ctx, bucket, prefix, DeleteOptions{
Recursive: true,
Immediate: true,
@ -1828,9 +1820,6 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
}
if opts.DeletePrefix {
if globalCacheConfig.Enabled() {
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
}
return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object)
}

View File

@ -120,6 +120,8 @@ type xlStorage struct {
nrRequests uint64
major, minor uint32
immediatePurge chan string
// mutex to prevent concurrent read operations overloading walks.
rotational bool
walkMu *sync.Mutex
@ -221,15 +223,26 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error {
// Initialize a new storage disk.
func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
s = &xlStorage{
drivePath: ep.Path,
endpoint: ep,
globalSync: globalFSOSync,
diskInfoCache: cachevalue.New[DiskInfo](),
poolIndex: -1,
setIndex: -1,
diskIndex: -1,
immediatePurgeQueue := 100000
if globalIsTesting || globalIsCICD {
immediatePurgeQueue = 1
}
s = &xlStorage{
drivePath: ep.Path,
endpoint: ep,
globalSync: globalFSOSync,
diskInfoCache: cachevalue.New[DiskInfo](),
poolIndex: -1,
setIndex: -1,
diskIndex: -1,
immediatePurge: make(chan string, immediatePurgeQueue),
}
defer func() {
if err == nil {
go s.cleanupTrashImmediateCallers(GlobalContext)
}
}()
s.drivePath, err = getValidPath(ep.Path)
if err != nil {
@ -311,7 +324,7 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
// oDirect off.
if globalIsErasureSD || !disk.ODirectPlatform {
s.oDirect = false
} else if err := s.checkODirectDiskSupport(); err == nil {
} else if err := s.checkODirectDiskSupport(info.FSType); err == nil {
s.oDirect = true
} else {
return s, err
@ -405,11 +418,18 @@ func (s *xlStorage) Healing() *healingTracker {
// checkODirectDiskSupport asks the disk to write some data
// with O_DIRECT support, return an error if any and return
// errUnsupportedDisk if there is no O_DIRECT support
func (s *xlStorage) checkODirectDiskSupport() error {
func (s *xlStorage) checkODirectDiskSupport(fsType string) error {
if !disk.ODirectPlatform {
return errUnsupportedDisk
}
// We know XFS already supports O_DIRECT no need to check.
if fsType == "XFS" {
return nil
}
// For all other FS pay the price of not using our recommended filesystem.
// Check if backend is writable and supports O_DIRECT
uuid := mustGetUUID()
filePath := pathJoin(s.drivePath, minioMetaTmpDeletedBucket, ".writable-check-"+uuid+".tmp")
@ -1165,6 +1185,17 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
return errs
}
func (s *xlStorage) cleanupTrashImmediateCallers(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case entry := <-s.immediatePurge:
removeAll(entry)
}
}
}
func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool) (err error) {
pathUUID := mustGetUUID()
targetPath := pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, pathUUID)
@ -1175,6 +1206,13 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool)
err = Rename(filePath, targetPath)
}
var targetPath2 string
if immediatePurge && HasSuffix(filePath, SlashSeparator) {
// With immediate purge also attempt deleting for `__XL_DIR__` folder/directory objects.
targetPath2 = pathutil.Join(s.drivePath, minioMetaTmpDeletedBucket, mustGetUUID())
renameAll(encodeDirObject(filePath), targetPath2, pathutil.Join(s.drivePath, minioMetaBucket))
}
// ENOSPC is a valid error from rename(); remove instead of rename in that case
if errors.Is(err, errDiskFull) || isSysErrNoSpace(err) {
if recursive {
@ -1191,7 +1229,21 @@ func (s *xlStorage) moveToTrash(filePath string, recursive, immediatePurge bool)
// immediately purge the target
if immediatePurge {
removeAll(targetPath)
for _, target := range []string{
targetPath,
targetPath2,
} {
if target == "" {
continue
}
select {
case s.immediatePurge <- target:
default:
// Too much back pressure, we will perform the delete
// blocking at this point we need to serialize operations.
removeAll(target)
}
}
}
return nil
@ -2261,10 +2313,10 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate
if basePath == "" || deletePath == "" {
return nil
}
isObjectDir := HasSuffix(deletePath, SlashSeparator)
basePath = pathutil.Clean(basePath)
deletePath = pathutil.Clean(deletePath)
if !strings.HasPrefix(deletePath, basePath) || deletePath == basePath {
bp := pathutil.Clean(basePath) // do not override basepath / or deletePath /
dp := pathutil.Clean(deletePath)
if !strings.HasPrefix(dp, bp) || dp == bp {
return nil
}
@ -2279,7 +2331,7 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate
case isSysErrNotEmpty(err):
// if object is a directory, but if its not empty
// return FileNotFound to indicate its an empty prefix.
if isObjectDir {
if HasSuffix(deletePath, SlashSeparator) {
return errFileNotFound
}
// if we have .DS_Store only on macOS
@ -2311,11 +2363,9 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, immediate
}
}
deletePath = pathutil.Dir(deletePath)
// Delete parent directory obviously not recursively. Errors for
// parent directories shouldn't trickle down.
s.deleteFile(basePath, deletePath, false, false)
s.deleteFile(basePath, pathutil.Dir(pathutil.Clean(deletePath)), false, false)
return nil
}