mirror of https://github.com/minio/minio.git
Improve delete performance by reducing the number of calls (#9092)
- Remove the requirement to honor storage class for deletes - Improve `posix.DeleteFileBulk` code to Stat the volumeDir only once per call, rather than for all object paths.
This commit is contained in:
parent
23a0415eb7
commit
88ae0f1196
33
cmd/posix.go
33
cmd/posix.go
|
@ -1340,9 +1340,40 @@ func (s *posix) DeleteFile(volume, path string) (err error) {
|
|||
}
|
||||
|
||||
func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err error) {
|
||||
atomic.AddInt32(&s.activeIOCount, 1)
|
||||
defer func() {
|
||||
atomic.AddInt32(&s.activeIOCount, -1)
|
||||
}()
|
||||
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Stat a volume entry.
|
||||
_, err = os.Stat(volumeDir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, errVolumeNotFound
|
||||
} else if os.IsPermission(err) {
|
||||
return nil, errVolumeAccessDenied
|
||||
} else if isSysErrIO(err) {
|
||||
return nil, errFaultyDisk
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
errs = make([]error, len(paths))
|
||||
// Following code is needed so that we retain SlashSeparator
|
||||
// suffix if any in path argument.
|
||||
for idx, path := range paths {
|
||||
errs[idx] = s.DeleteFile(volume, path)
|
||||
filePath := pathJoin(volumeDir, path)
|
||||
errs[idx] = checkPathLength(filePath)
|
||||
if errs[idx] != nil {
|
||||
continue
|
||||
}
|
||||
// Delete file and delete parent directory as well if its empty.
|
||||
errs[idx] = deleteFile(volumeDir, filePath)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -747,32 +747,37 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri
|
|||
// object.
|
||||
func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects []string, errs []error, writeQuorums []int, isDirs []bool) ([]error, error) {
|
||||
var tmpObjs = make([]string, len(objects))
|
||||
disks := xl.getDisks()
|
||||
if bucket == minioMetaTmpBucket {
|
||||
copy(tmpObjs, objects)
|
||||
} else {
|
||||
for i, object := range objects {
|
||||
if errs[i] != nil {
|
||||
for idx := range objects {
|
||||
if errs[idx] != nil {
|
||||
continue
|
||||
}
|
||||
tmpObjs[idx] = mustGetUUID()
|
||||
var err error
|
||||
tmpObjs[i] = mustGetUUID()
|
||||
// Rename the current object while requiring write quorum, but also consider
|
||||
// that a non found object in a given disk as a success since it already
|
||||
// confirms that the object doesn't have a part in that disk (already removed)
|
||||
if isDirs[i] {
|
||||
disks, err = rename(ctx, disks, bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i],
|
||||
// Rename the current object while requiring
|
||||
// write quorum, but also consider that a non
|
||||
// found object in a given disk as a success
|
||||
// since it already confirms that the object
|
||||
// doesn't have a part in that disk (already removed)
|
||||
if isDirs[idx] {
|
||||
_, err = rename(ctx, xl.getDisks(), bucket, objects[idx],
|
||||
minioMetaTmpBucket, tmpObjs[idx], true, writeQuorums[idx],
|
||||
[]error{errFileNotFound, errFileAccessDenied})
|
||||
} else {
|
||||
disks, err = rename(ctx, disks, bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i],
|
||||
_, err = rename(ctx, xl.getDisks(), bucket, objects[idx],
|
||||
minioMetaTmpBucket, tmpObjs[idx], true, writeQuorums[idx],
|
||||
[]error{errFileNotFound})
|
||||
}
|
||||
if err != nil {
|
||||
errs[i] = err
|
||||
errs[idx] = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
disks := xl.getDisks()
|
||||
|
||||
// Initialize list of errors.
|
||||
var opErrs = make([]error, len(disks))
|
||||
var delObjErrs = make([][]error, len(disks))
|
||||
|
@ -840,23 +845,17 @@ func (xl xlObjects) deleteObjects(ctx context.Context, bucket string, objects []
|
|||
}
|
||||
}
|
||||
|
||||
for i, object := range objects {
|
||||
for i := range objects {
|
||||
if errs[i] != nil {
|
||||
continue
|
||||
}
|
||||
if isObjectDirs[i] {
|
||||
writeQuorums[i] = len(xl.getDisks())/2 + 1
|
||||
} else {
|
||||
var err error
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, readXLErrs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
|
||||
// get Quorum for this object
|
||||
_, writeQuorums[i], err = objectQuorumFromMeta(ctx, xl, partsMetadata, readXLErrs)
|
||||
if err != nil {
|
||||
errs[i] = toObjectErr(err, bucket, object)
|
||||
continue
|
||||
}
|
||||
}
|
||||
// Assume (N/2 + 1) quorums for all objects
|
||||
// this is a theoretical assumption such that
|
||||
// for delete's we do not need to honor storage
|
||||
// class for objects which have reduced quorum
|
||||
// storage class only needs to be honored for
|
||||
// Read() requests alone which we already do.
|
||||
writeQuorums[i] = len(xl.getDisks())/2 + 1
|
||||
}
|
||||
|
||||
return xl.doDeleteObjects(ctx, bucket, objects, errs, writeQuorums, isObjectDirs)
|
||||
|
|
Loading…
Reference in New Issue