purge objects immediately with x-minio-force-delete in DeleteObject and DeleteBucket API (#15148)

This commit is contained in:
Praveen raj Mani
2022-07-11 21:45:54 +05:30
committed by GitHub
parent 00e235a1ee
commit b49fc33cb3
20 changed files with 220 additions and 79 deletions

View File

@@ -818,7 +818,7 @@ func (s *xlStorage) DeleteVol(ctx context.Context, volume string, forceDelete bo
}
if forceDelete {
err = s.moveToTrash(volumeDir, true)
err = s.moveToTrash(volumeDir, true, true)
} else {
err = Remove(volumeDir)
}
@@ -903,7 +903,7 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
if !isXL2V1Format(buf) {
// Delete the meta file, if there are no more versions the
// top level parent is automatically removed.
return s.deleteFile(volumeDir, pathJoin(volumeDir, path), true)
return s.deleteFile(volumeDir, pathJoin(volumeDir, path), true, false)
}
var xlMeta xlMetaV2
@@ -939,7 +939,7 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
if err = checkPathLength(filePath); err != nil {
return err
}
if err = s.moveToTrash(filePath, true); err != nil {
if err = s.moveToTrash(filePath, true, false); err != nil {
if err != errFileNotFound {
return err
}
@@ -959,9 +959,9 @@ func (s *xlStorage) deleteVersions(ctx context.Context, volume, path string, fis
}
// Move xl.meta to trash
err = s.moveToTrash(pathJoin(volumeDir, path, xlStorageFormatFile), false)
err = s.moveToTrash(pathJoin(volumeDir, path, xlStorageFormatFile), false, false)
if err == nil || err == errFileNotFound {
s.deleteFile(volumeDir, pathJoin(volumeDir, path), false)
s.deleteFile(volumeDir, pathJoin(volumeDir, path), false, false)
}
return err
}
@@ -985,19 +985,36 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
return errs
}
func (s *xlStorage) moveToTrash(filePath string, recursive bool) error {
func (s *xlStorage) moveToTrash(filePath string, recursive, force bool) error {
pathUUID := mustGetUUID()
targetPath := pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, pathUUID)
var renameFn func(source, target string) error
if recursive {
return renameAll(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, pathUUID))
renameFn = renameAll
} else {
renameFn = Rename
}
return Rename(filePath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, pathUUID))
if err := renameFn(filePath, targetPath); err != nil {
return err
}
// immediately purge the target
if force {
removeAll(targetPath)
}
return nil
}
// DeleteVersion - deletes FileInfo metadata for path at `xl.meta`. forceDelMarker
// will force creating a new `xl.meta` to create a new delete marker
func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi FileInfo, forceDelMarker bool) error {
if HasSuffix(path, SlashSeparator) {
return s.Delete(ctx, volume, path, false)
return s.Delete(ctx, volume, path, DeleteOptions{
Recursive: false,
Force: false,
})
}
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
@@ -1035,7 +1052,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
if !isXL2V1Format(buf) {
// Delete the meta file, if there are no more versions the
// top level parent is automatically removed.
return s.deleteFile(volumeDir, pathJoin(volumeDir, path), true)
return s.deleteFile(volumeDir, pathJoin(volumeDir, path), true, false)
}
var xlMeta xlMetaV2
@@ -1065,7 +1082,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
if err = checkPathLength(filePath); err != nil {
return err
}
if err = s.moveToTrash(filePath, true); err != nil {
if err = s.moveToTrash(filePath, true, false); err != nil {
if err != errFileNotFound {
return err
}
@@ -1088,9 +1105,9 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
return err
}
err = s.moveToTrash(filePath, false)
err = s.moveToTrash(filePath, false, false)
if err == nil || err == errFileNotFound {
s.deleteFile(volumeDir, pathJoin(volumeDir, path), false)
s.deleteFile(volumeDir, pathJoin(volumeDir, path), false, false)
}
return err
}
@@ -2014,7 +2031,7 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
// move up the tree, deleting empty parent directories until it finds one
// with files in it. Returns nil for a non-empty directory even when
// recursive is set to false.
func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) error {
func (s *xlStorage) deleteFile(basePath, deletePath string, recursive, force bool) error {
if basePath == "" || deletePath == "" {
return nil
}
@@ -2027,7 +2044,7 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) erro
var err error
if recursive {
err = s.moveToTrash(deletePath, true)
err = s.moveToTrash(deletePath, true, force)
} else {
err = Remove(deletePath)
}
@@ -2060,13 +2077,13 @@ func (s *xlStorage) deleteFile(basePath, deletePath string, recursive bool) erro
// Delete parent directory obviously not recursively. Errors for
// parent directories shouldn't trickle down.
s.deleteFile(basePath, deletePath, false)
s.deleteFile(basePath, deletePath, false, false)
return nil
}
// DeleteFile - delete a file at path.
func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recursive bool) (err error) {
func (s *xlStorage) Delete(ctx context.Context, volume string, path string, deleteOpts DeleteOptions) (err error) {
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
@@ -2092,7 +2109,7 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recu
}
// Delete file and delete parent directory as well if it's empty.
return s.deleteFile(volumeDir, filePath, recursive)
return s.deleteFile(volumeDir, filePath, deleteOpts.Recursive, deleteOpts.Force)
}
func skipAccessChecks(volume string) (ok bool) {
@@ -2270,7 +2287,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
// legacy data dir means its old content, honor system umask.
if err = mkdirAll(legacyDataPath, 0o777); err != nil {
// any failed mkdir-calls delete them.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
return osErrToFileErr(err)
}
@@ -2282,7 +2299,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
return osErrToFileErr(err)
}
@@ -2327,7 +2344,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
if err = xlMeta.AddVersion(fi); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
}
return err
}
@@ -2338,7 +2355,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
logger.LogIf(ctx, err)
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
}
return errFileCorrupt
}
@@ -2347,7 +2364,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
}
return osErrToFileErr(err)
}
@@ -2355,19 +2372,19 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
// renameAll only for objects that have xl.meta not saved inline.
if len(fi.Data) == 0 && fi.Size > 0 {
s.moveToTrash(dstDataPath, true)
s.moveToTrash(dstDataPath, true, false)
if healing {
// If we are healing we should purge any legacyDataPath content,
// that was previously preserved during PutObject() call
// on a versioned bucket.
s.moveToTrash(legacyDataPath, true)
s.moveToTrash(legacyDataPath, true, false)
}
if err = renameAll(srcDataPath, dstDataPath); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
}
s.deleteFile(dstVolumeDir, dstDataPath, false)
s.deleteFile(dstVolumeDir, dstDataPath, false, false)
return osErrToFileErr(err)
}
}
@@ -2376,9 +2393,9 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
if err = renameAll(srcFilePath, dstFilePath); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
}
s.deleteFile(dstVolumeDir, dstFilePath, false)
s.deleteFile(dstVolumeDir, dstFilePath, false, false)
return osErrToFileErr(err)
}
@@ -2386,16 +2403,16 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
// movement, this is to ensure that previous data references can co-exist for
// any recoverability.
if oldDstDataPath != "" {
s.moveToTrash(oldDstDataPath, true)
s.moveToTrash(oldDstDataPath, true, false)
}
} else {
// Write meta-file directly, no data
if err = s.WriteAll(ctx, dstVolume, pathJoin(dstPath, xlStorageFormatFile), dstBuf); err != nil {
if legacyPreserved {
// Any failed rename calls un-roll previous transaction.
s.deleteFile(dstVolumeDir, legacyDataPath, true)
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
}
s.deleteFile(dstVolumeDir, dstFilePath, false)
s.deleteFile(dstVolumeDir, dstFilePath, false, false)
return err
}
}
@@ -2484,7 +2501,7 @@ func (s *xlStorage) RenameFile(ctx context.Context, srcVolume, srcPath, dstVolum
// Remove parent dir of the source file if empty
parentDir := pathutil.Dir(srcFilePath)
s.deleteFile(srcVolumeDir, parentDir, false)
s.deleteFile(srcVolumeDir, parentDir, false, false)
return nil
}