mirror of https://github.com/minio/minio.git
Implement bulk delete (#7607)
Bulk delete at storage level in Multiple Delete Objects API In order to accelerate bulk delete in Multiple Delete objects API, a new bulk delete is introduced in storage layer, which will accept a list of objects to delete rather than only one. Consequently, a new API is also need to be added to Object API.
This commit is contained in:
parent
d9a7f80f68
commit
9c90a28546
|
@ -309,12 +309,19 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
|||
return
|
||||
}
|
||||
|
||||
deleteObject := objectAPI.DeleteObject
|
||||
deleteObjectsFn := objectAPI.DeleteObjects
|
||||
if api.CacheAPI() != nil {
|
||||
deleteObject = api.CacheAPI().DeleteObject
|
||||
deleteObjectsFn = api.CacheAPI().DeleteObjects
|
||||
}
|
||||
|
||||
type delObj struct {
|
||||
origIndex int
|
||||
name string
|
||||
}
|
||||
|
||||
var objectsToDelete []delObj
|
||||
var dErrs = make([]APIErrorCode, len(deleteObjects.Objects))
|
||||
|
||||
for index, object := range deleteObjects.Objects {
|
||||
if dErrs[index] = checkRequestAuthType(ctx, r, policy.DeleteObjectAction, bucket, object.ObjectName); dErrs[index] != ErrNone {
|
||||
if dErrs[index] == ErrSignatureDoesNotMatch || dErrs[index] == ErrInvalidAccessKeyID {
|
||||
|
@ -323,10 +330,26 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
|||
}
|
||||
continue
|
||||
}
|
||||
err := deleteObject(ctx, bucket, object.ObjectName)
|
||||
if err != nil {
|
||||
dErrs[index] = toAPIErrorCode(ctx, err)
|
||||
|
||||
objectsToDelete = append(objectsToDelete, delObj{index, object.ObjectName})
|
||||
}
|
||||
|
||||
toNames := func(input []delObj) (output []string) {
|
||||
output = make([]string, len(input))
|
||||
for i := range input {
|
||||
output[i] = input[i].name
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
errs, err := deleteObjectsFn(ctx, bucket, toNames(objectsToDelete))
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
||||
for i, obj := range objectsToDelete {
|
||||
dErrs[obj.origIndex] = toAPIErrorCode(ctx, errs[i])
|
||||
}
|
||||
|
||||
// Collect deleted objects and errors if any.
|
||||
|
|
|
@ -762,7 +762,7 @@ func testAPIDeleteMultipleObjectsHandler(obj ObjectLayer, instanceType, bucketNa
|
|||
apiRouter.ServeHTTP(rec, req)
|
||||
// Assert the response code with the expected status.
|
||||
if rec.Code != testCase.expectedRespStatus {
|
||||
t.Errorf("Case %d: MinIO %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, rec.Code)
|
||||
t.Errorf("Test %d: MinIO %s: Expected the response status to be `%d`, but instead found `%d`", i+1, instanceType, testCase.expectedRespStatus, rec.Code)
|
||||
}
|
||||
|
||||
// read the response body.
|
||||
|
|
|
@ -62,6 +62,7 @@ type cacheObjects struct {
|
|||
GetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
PutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
DeleteObjectFn func(ctx context.Context, bucket, object string) error
|
||||
DeleteObjectsFn func(ctx context.Context, bucket string, objects []string) ([]error, error)
|
||||
ListObjectsFn func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
|
||||
ListObjectsV2Fn func(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
|
||||
ListBucketsFn func(ctx context.Context) (buckets []BucketInfo, err error)
|
||||
|
@ -94,6 +95,7 @@ type CacheObjectLayer interface {
|
|||
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
DeleteObject(ctx context.Context, bucket, object string) error
|
||||
DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
|
||||
|
||||
// Multipart operations.
|
||||
NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error)
|
||||
|
@ -629,6 +631,14 @@ func (c cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (
|
|||
return
|
||||
}
|
||||
|
||||
func (c cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = c.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// Returns true if object should be excluded from cache
|
||||
func (c cacheObjects) isCacheExclude(bucket, object string) bool {
|
||||
for _, pattern := range c.exclude {
|
||||
|
@ -974,6 +984,14 @@ func newServerCacheObjects(config CacheConfig) (CacheObjectLayer, error) {
|
|||
DeleteObjectFn: func(ctx context.Context, bucket, object string) error {
|
||||
return newObjectLayerFn().DeleteObject(ctx, bucket, object)
|
||||
},
|
||||
DeleteObjectsFn: func(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = newObjectLayerFn().DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
},
|
||||
|
||||
ListObjectsFn: func(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
|
||||
return newObjectLayerFn().ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
|
||||
},
|
||||
|
|
10
cmd/fs-v1.go
10
cmd/fs-v1.go
|
@ -953,6 +953,16 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
|
|||
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
||||
}
|
||||
|
||||
// DeleteObjects - deletes an object from a bucket, this operation is destructive
|
||||
// and there are no rollbacks supported.
|
||||
func (fs *FSObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = fs.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// DeleteObject - deletes an object from a bucket, this operation is destructive
|
||||
// and there are no rollbacks supported.
|
||||
func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) error {
|
||||
|
|
|
@ -896,6 +896,14 @@ func (a *azureObjects) DeleteObject(ctx context.Context, bucket, object string)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *azureObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = a.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// ListMultipartUploads - It's decided not to support List Multipart Uploads, hence returning empty result.
|
||||
func (a *azureObjects) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result minio.ListMultipartsInfo, err error) {
|
||||
// It's decided not to support List Multipart Uploads, hence returning empty result.
|
||||
|
|
|
@ -595,6 +595,14 @@ func (l *b2Objects) DeleteObject(ctx context.Context, bucket string, object stri
|
|||
return b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
func (l *b2Objects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = l.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// ListMultipartUploads lists all multipart uploads.
|
||||
func (l *b2Objects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string,
|
||||
delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
|
||||
|
|
|
@ -955,6 +955,14 @@ func (l *gcsGateway) DeleteObject(ctx context.Context, bucket string, object str
|
|||
return nil
|
||||
}
|
||||
|
||||
func (l *gcsGateway) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = l.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// NewMultipartUpload - upload object in multiple parts
|
||||
func (l *gcsGateway) NewMultipartUpload(ctx context.Context, bucket string, key string, o minio.ObjectOptions) (uploadID string, err error) {
|
||||
// generate new uploadid
|
||||
|
|
|
@ -400,6 +400,14 @@ func (n *hdfsObjects) DeleteObject(ctx context.Context, bucket, object string) e
|
|||
return hdfsToObjectErr(ctx, n.deleteObject(minio.PathJoin(hdfsSeparator, bucket), minio.PathJoin(hdfsSeparator, bucket, object)), bucket, object)
|
||||
}
|
||||
|
||||
func (n *hdfsObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = n.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
func (n *hdfsObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
|
||||
objInfo, err := n.GetObjectInfo(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
|
|
|
@ -710,6 +710,14 @@ func (l *ossObjects) DeleteObject(ctx context.Context, bucket, object string) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (l *ossObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = l.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// fromOSSClientListMultipartsInfo converts oss ListMultipartUploadResult to ListMultipartsInfo
|
||||
func fromOSSClientListMultipartsInfo(lmur oss.ListMultipartUploadResult) minio.ListMultipartsInfo {
|
||||
uploads := make([]minio.MultipartInfo, len(lmur.Uploads))
|
||||
|
|
|
@ -500,6 +500,14 @@ func (l *s3Objects) DeleteObject(ctx context.Context, bucket string, object stri
|
|||
return nil
|
||||
}
|
||||
|
||||
func (l *s3Objects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
for idx, object := range objects {
|
||||
errs[idx] = l.DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// ListMultipartUploads lists all multipart uploads.
|
||||
func (l *s3Objects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, e error) {
|
||||
result, err := l.Client.ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
|
||||
|
|
|
@ -167,6 +167,14 @@ func (d *naughtyDisk) DeleteFile(volume string, path string) (err error) {
|
|||
return d.disk.DeleteFile(volume, path)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) DeleteFileBulk(volume string, paths []string) ([]error, error) {
|
||||
errs := make([]error, len(paths))
|
||||
for idx, path := range paths {
|
||||
errs[idx] = d.disk.DeleteFile(volume, path)
|
||||
}
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) WriteAll(volume string, path string, buf []byte) (err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return err
|
||||
|
|
|
@ -144,6 +144,78 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string)
|
|||
return err
|
||||
}
|
||||
|
||||
// Cleanup objects in bulk and recursively: each object will have a list of sub-files to delete in the backend
|
||||
func cleanupObjectsBulk(ctx context.Context, storage StorageAPI, volume string, objsPaths []string, errs []error) ([]error, error) {
|
||||
// The list of files in disk to delete
|
||||
var filesToDelete []string
|
||||
// Map files to delete to the passed objsPaths
|
||||
var filesToDeleteObjsIndexes []int
|
||||
|
||||
// Traverse and return the list of sub entries
|
||||
var traverse func(string) ([]string, error)
|
||||
traverse = func(entryPath string) ([]string, error) {
|
||||
var output = make([]string, 0)
|
||||
if !hasSuffix(entryPath, slashSeparator) {
|
||||
output = append(output, entryPath)
|
||||
return output, nil
|
||||
}
|
||||
entries, err := storage.ListDir(volume, entryPath, -1, "")
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
subEntries, err := traverse(pathJoin(entryPath, entry))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
output = append(output, subEntries...)
|
||||
}
|
||||
return output, nil
|
||||
}
|
||||
|
||||
// Find and collect the list of files to remove associated
|
||||
// to the passed objects paths
|
||||
for idx, objPath := range objsPaths {
|
||||
if errs[idx] != nil {
|
||||
continue
|
||||
}
|
||||
output, err := traverse(objPath)
|
||||
if err != nil {
|
||||
errs[idx] = err
|
||||
continue
|
||||
} else {
|
||||
errs[idx] = nil
|
||||
}
|
||||
filesToDelete = append(filesToDelete, output...)
|
||||
for i := 0; i < len(output); i++ {
|
||||
filesToDeleteObjsIndexes = append(filesToDeleteObjsIndexes, idx)
|
||||
}
|
||||
}
|
||||
|
||||
// Reverse the list so remove can succeed
|
||||
reverseStringSlice(filesToDelete)
|
||||
|
||||
dErrs, err := storage.DeleteFileBulk(volume, filesToDelete)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Map files deletion errors to the correspondent objects
|
||||
for i := range dErrs {
|
||||
if dErrs[i] != nil {
|
||||
if errs[filesToDeleteObjsIndexes[i]] != nil {
|
||||
errs[filesToDeleteObjsIndexes[i]] = dErrs[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// Removes notification.xml for a given bucket, only used during DeleteBucket.
|
||||
func removeNotificationConfig(ctx context.Context, objAPI ObjectLayer, bucket string) error {
|
||||
// Verify bucket is valid.
|
||||
|
|
|
@ -73,6 +73,7 @@ type ObjectLayer interface {
|
|||
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
DeleteObject(ctx context.Context, bucket, object string) error
|
||||
DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
|
||||
|
||||
// Multipart operations.
|
||||
ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error)
|
||||
|
|
|
@ -1388,6 +1388,14 @@ func (s *posix) DeleteFile(volume, path string) (err error) {
|
|||
return deleteFile(volumeDir, filePath)
|
||||
}
|
||||
|
||||
func (s *posix) DeleteFileBulk(volume string, paths []string) (errs []error, err error) {
|
||||
errs = make([]error, len(paths))
|
||||
for idx, path := range paths {
|
||||
errs[idx] = s.DeleteFile(volume, path)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// RenameFile - rename source path to destination path atomically.
|
||||
func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
|
||||
defer func() {
|
||||
|
|
|
@ -47,6 +47,7 @@ type StorageAPI interface {
|
|||
RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error
|
||||
StatFile(volume string, path string) (file FileInfo, err error)
|
||||
DeleteFile(volume string, path string) (err error)
|
||||
DeleteFileBulk(volume string, paths []string) (errs []error, err error)
|
||||
|
||||
// Write all data, syncs the data to disk.
|
||||
WriteAll(volume string, path string, buf []byte) (err error)
|
||||
|
|
|
@ -356,6 +356,34 @@ func (client *storageRESTClient) DeleteFile(volume, path string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// DeleteFileBulk - deletes files in bulk.
|
||||
func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) (errs []error, err error) {
|
||||
errs = make([]error, len(paths))
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
for _, path := range paths {
|
||||
values.Add(storageRESTFilePath, path)
|
||||
}
|
||||
respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, nil, -1)
|
||||
defer http.DrainBody(respBody)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bulkErrs := bulkErrorsResponse{}
|
||||
gob.NewDecoder(respBody).Decode(&bulkErrs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, dErr := range bulkErrs.Errs {
|
||||
errs[i] = toStorageErr(dErr)
|
||||
}
|
||||
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
// RenameFile - renames a file.
|
||||
func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err error) {
|
||||
values := make(url.Values)
|
||||
|
|
|
@ -35,6 +35,7 @@ const (
|
|||
storageRESTMethodReadFileStream = "readfilestream"
|
||||
storageRESTMethodListDir = "listdir"
|
||||
storageRESTMethodDeleteFile = "deletefile"
|
||||
storageRESTMethodDeleteFileBulk = "deletefilebulk"
|
||||
storageRESTMethodRenameFile = "renamefile"
|
||||
storageRESTMethodGetInstanceID = "getinstanceid"
|
||||
)
|
||||
|
|
|
@ -47,6 +47,22 @@ func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error)
|
|||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
type bulkErrorsResponse struct {
|
||||
Errs []error `json:"errors"`
|
||||
}
|
||||
|
||||
func (s *storageRESTServer) writeErrorsResponse(w http.ResponseWriter, errs []error) {
|
||||
resp := bulkErrorsResponse{Errs: make([]error, len(errs))}
|
||||
for idx, err := range errs {
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
resp.Errs[idx] = err
|
||||
}
|
||||
gob.NewEncoder(w).Encode(resp)
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
// DefaultSkewTime - skew time is 15 minutes between minio peers.
|
||||
const DefaultSkewTime = 15 * time.Minute
|
||||
|
||||
|
@ -391,6 +407,24 @@ func (s *storageRESTServer) DeleteFileHandler(w http.ResponseWriter, r *http.Req
|
|||
}
|
||||
}
|
||||
|
||||
// DeleteFileBulkHandler - delete a file.
|
||||
func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
return
|
||||
}
|
||||
vars := r.URL.Query()
|
||||
volume := vars.Get(storageRESTVolume)
|
||||
filePaths := vars[storageRESTFilePath]
|
||||
|
||||
errs, err := s.storage.DeleteFileBulk(volume, filePaths)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
s.writeErrorsResponse(w, errs)
|
||||
}
|
||||
|
||||
// RenameFileHandler - rename a file.
|
||||
func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
|
@ -447,6 +481,9 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
|
|||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...)
|
||||
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
|
||||
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)).
|
||||
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...)
|
||||
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodGetInstanceID).HandlerFunc(httpTraceAll(server.GetInstanceID))
|
||||
|
|
|
@ -1927,6 +1927,10 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [
|
|||
|
||||
globalIAMSys = NewIAMSys()
|
||||
globalIAMSys.Init(objLayer)
|
||||
|
||||
globalPolicySys = NewPolicySys()
|
||||
globalPolicySys.Init(objLayer)
|
||||
|
||||
// initialize the server and obtain the credentials and root.
|
||||
// credentials are necessary to sign the HTTP request.
|
||||
if err = newTestConfig(globalMinioDefaultRegion, objLayer); err != nil {
|
||||
|
|
|
@ -474,3 +474,10 @@ func restQueries(keys ...string) []string {
|
|||
}
|
||||
return accumulator
|
||||
}
|
||||
|
||||
// Reverse the input order of a slice of string
|
||||
func reverseStringSlice(input []string) {
|
||||
for left, right := 0, len(input)-1; left < right; left, right = left+1, right-1 {
|
||||
input[left], input[right] = input[right], input[left]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -466,9 +466,14 @@ func hashKey(algo string, key string, cardinality int) int {
|
|||
}
|
||||
}
|
||||
|
||||
// Returns always a same erasure coded set for a given input.
|
||||
func (s *xlSets) getHashedSetIndex(input string) int {
|
||||
return hashKey(s.distributionAlgo, input, len(s.sets))
|
||||
}
|
||||
|
||||
// Returns always a same erasure coded set for a given input.
|
||||
func (s *xlSets) getHashedSet(input string) (set *xlObjects) {
|
||||
return s.sets[hashKey(s.distributionAlgo, input, len(s.sets))]
|
||||
return s.sets[s.getHashedSetIndex(input)]
|
||||
}
|
||||
|
||||
// GetBucketInfo - returns bucket info from one of the erasure coded set.
|
||||
|
@ -618,6 +623,58 @@ func (s *xlSets) DeleteObject(ctx context.Context, bucket string, object string)
|
|||
return s.getHashedSet(object).DeleteObject(ctx, bucket, object)
|
||||
}
|
||||
|
||||
// DeleteObjects - bulk delete of objects
|
||||
// Bulk delete is only possible within one set. For that purpose
|
||||
// objects are group by set first, and then bulk delete is invoked
|
||||
// for each set, the error response of each delete will be returned
|
||||
func (s *xlSets) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
|
||||
type delObj struct {
|
||||
// Set index associated to this object
|
||||
setIndex int
|
||||
// Original index from the list of arguments
|
||||
// where this object is passed
|
||||
origIndex int
|
||||
// Object name
|
||||
name string
|
||||
}
|
||||
|
||||
// Transform []delObj to the list of object names
|
||||
toNames := func(delObjs []delObj) []string {
|
||||
names := make([]string, len(delObjs))
|
||||
for i, obj := range delObjs {
|
||||
names[i] = obj.name
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
// The result of delete operation on all passed objects
|
||||
var delErrs = make([]error, len(objects))
|
||||
|
||||
// A map between a set and its associated objects
|
||||
var objSetMap = make(map[int][]delObj)
|
||||
|
||||
// Group objects by set index
|
||||
for i, object := range objects {
|
||||
index := s.getHashedSetIndex(object)
|
||||
objSetMap[index] = append(objSetMap[index], delObj{setIndex: index, origIndex: i, name: object})
|
||||
}
|
||||
|
||||
// Invoke bulk delete on objects per set and save
|
||||
// the result of the delete operation
|
||||
for _, objsGroup := range objSetMap {
|
||||
errs, err := s.getHashedSet(objsGroup[0].name).DeleteObjects(ctx, bucket, toNames(objsGroup))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i, obj := range objsGroup {
|
||||
delErrs[obj.origIndex] = errs[i]
|
||||
}
|
||||
}
|
||||
|
||||
return delErrs, nil
|
||||
}
|
||||
|
||||
// CopyObject - copies objects from one hashedSet to another hashedSet, on server side.
|
||||
func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||
srcSet := s.getHashedSet(srcObject)
|
||||
|
|
|
@ -824,6 +824,199 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri
|
|||
return reduceWriteQuorumErrs(ctx, dErrs, objectOpIgnoredErrs, writeQuorum)
|
||||
}
|
||||
|
||||
// deleteObject - wrapper for delete object, deletes an object from
|
||||
// all the disks in parallel, including `xl.json` associated with the
|
||||
// object.
|
||||
func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects []string, errs []error, writeQuorums []int, isDirs []bool) ([]error, error) {
|
||||
var tmpObjs = make([]string, len(objects))
|
||||
var disks = xl.getDisks()
|
||||
|
||||
if bucket == minioMetaTmpBucket {
|
||||
copy(tmpObjs, objects)
|
||||
} else {
|
||||
for i, object := range objects {
|
||||
if errs[i] != nil {
|
||||
continue
|
||||
}
|
||||
var err error
|
||||
tmpObjs[i] = mustGetUUID()
|
||||
// Rename the current object while requiring write quorum, but also consider
|
||||
// that a non found object in a given disk as a success since it already
|
||||
// confirms that the object doesn't have a part in that disk (already removed)
|
||||
if isDirs[i] {
|
||||
disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i],
|
||||
[]error{errFileNotFound, errFileAccessDenied})
|
||||
} else {
|
||||
disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i],
|
||||
[]error{errFileNotFound})
|
||||
}
|
||||
if err != nil {
|
||||
errs[i] = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize sync waitgroup.
|
||||
var wg = &sync.WaitGroup{}
|
||||
// Initialize list of errors.
|
||||
var opErrs = make([]error, len(disks))
|
||||
var delObjErrs = make([][]error, len(disks))
|
||||
|
||||
for index, disk := range disks {
|
||||
if disk == nil {
|
||||
opErrs[index] = errDiskNotFound
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(index int, disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
delObjErrs[index], opErrs[index] = cleanupObjectsBulk(ctx, disk, minioMetaTmpBucket, tmpObjs, errs)
|
||||
if opErrs[index] == errVolumeNotFound {
|
||||
opErrs[index] = nil
|
||||
}
|
||||
}(index, disk)
|
||||
}
|
||||
|
||||
// Wait for all routines to finish.
|
||||
wg.Wait()
|
||||
|
||||
// Return errors if any during deletion
|
||||
if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(xl.getDisks())/2+1); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reduce errors for each object
|
||||
for objIndex := range objects {
|
||||
if errs[objIndex] != nil {
|
||||
continue
|
||||
}
|
||||
listErrs := make([]error, len(xl.getDisks()))
|
||||
for i := range delObjErrs {
|
||||
if delObjErrs[i] != nil {
|
||||
listErrs[i] = delObjErrs[i][objIndex]
|
||||
}
|
||||
}
|
||||
errs[objIndex] = reduceWriteQuorumErrs(ctx, listErrs, objectOpIgnoredErrs, writeQuorums[objIndex])
|
||||
}
|
||||
|
||||
return errs, nil
|
||||
}
|
||||
|
||||
func (xl xlObjects) deleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
errs := make([]error, len(objects))
|
||||
writeQuorums := make([]int, len(objects))
|
||||
isObjectDirs := make([]bool, len(objects))
|
||||
|
||||
for i, object := range objects {
|
||||
errs[i] = checkDelObjArgs(ctx, bucket, object)
|
||||
}
|
||||
|
||||
var objectLocks = make([]RWLocker, len(objects))
|
||||
|
||||
for i, object := range objects {
|
||||
if errs[i] != nil {
|
||||
continue
|
||||
}
|
||||
// Acquire a write lock before deleting the object.
|
||||
objectLocks[i] = xl.nsMutex.NewNSLock(bucket, object)
|
||||
if errs[i] = objectLocks[i].GetLock(globalOperationTimeout); errs[i] != nil {
|
||||
continue
|
||||
}
|
||||
defer objectLocks[i].Unlock()
|
||||
}
|
||||
|
||||
for i, object := range objects {
|
||||
isObjectDirs[i] = hasSuffix(object, slashSeparator)
|
||||
}
|
||||
|
||||
for i, object := range objects {
|
||||
if isObjectDirs[i] {
|
||||
_, err := xl.getObjectInfoDir(ctx, bucket, object)
|
||||
if err == errXLReadQuorum {
|
||||
if isObjectDirDangling(statAllDirs(ctx, xl.getDisks(), bucket, object)) {
|
||||
// If object is indeed dangling, purge it.
|
||||
errs[i] = nil
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
errs[i] = toObjectErr(err, bucket, object)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i, object := range objects {
|
||||
if errs[i] != nil {
|
||||
continue
|
||||
}
|
||||
if isObjectDirs[i] {
|
||||
writeQuorums[i] = len(xl.getDisks())/2 + 1
|
||||
} else {
|
||||
var err error
|
||||
// Read metadata associated with the object from all disks.
|
||||
partsMetadata, readXLErrs := readAllXLMetadata(ctx, xl.getDisks(), bucket, object)
|
||||
// get Quorum for this object
|
||||
_, writeQuorums[i], err = objectQuorumFromMeta(ctx, xl, partsMetadata, readXLErrs)
|
||||
if err != nil {
|
||||
errs[i] = toObjectErr(err, bucket, object)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return xl.doDeleteObjects(ctx, bucket, objects, errs, writeQuorums, isObjectDirs)
|
||||
}
|
||||
|
||||
// DeleteObjects deletes objects in bulk, this function will still automatically split objects list
|
||||
// into smaller bulks if some object names are found to be duplicated in the delete list, splitting
|
||||
// into smaller bulks will avoid holding twice the write lock of the duplicated object names.
|
||||
func (xl xlObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
|
||||
var (
|
||||
i, start, end int
|
||||
// Deletion result for all objects
|
||||
deleteErrs []error
|
||||
// Object names store will be used to check for object name duplication
|
||||
objectNamesStore = make(map[string]interface{})
|
||||
)
|
||||
|
||||
for {
|
||||
if i >= len(objects) {
|
||||
break
|
||||
}
|
||||
|
||||
object := objects[i]
|
||||
|
||||
_, duplicationFound := objectNamesStore[object]
|
||||
if duplicationFound {
|
||||
end = i - 1
|
||||
} else {
|
||||
objectNamesStore[object] = true
|
||||
end = i
|
||||
}
|
||||
|
||||
if duplicationFound || i == len(objects)-1 {
|
||||
errs, err := xl.deleteObjects(ctx, bucket, objects[start:end+1])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
deleteErrs = append(deleteErrs, errs...)
|
||||
objectNamesStore = make(map[string]interface{})
|
||||
}
|
||||
|
||||
if duplicationFound {
|
||||
// Avoid to increase the index if object
|
||||
// name is found to be duplicated.
|
||||
start = i
|
||||
end = i
|
||||
} else {
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
return deleteErrs, nil
|
||||
}
|
||||
|
||||
// DeleteObject - deletes an object, this call doesn't necessary reply
|
||||
// any error as it is not necessary for the handler to reply back a
|
||||
// response to the client request.
|
||||
|
|
|
@ -111,6 +111,80 @@ func TestXLDeleteObjectBasic(t *testing.T) {
|
|||
// Cleanup backend directories
|
||||
removeRoots(fsDirs)
|
||||
}
|
||||
|
||||
func TestXLDeleteObjectsXLSet(t *testing.T) {
|
||||
|
||||
var objs []*xlObjects
|
||||
for i := 0; i < 32; i++ {
|
||||
obj, fsDirs, err := prepareXL(16)
|
||||
if err != nil {
|
||||
t.Fatal("Unable to initialize 'XL' object layer.", err)
|
||||
}
|
||||
// Remove all dirs.
|
||||
for _, dir := range fsDirs {
|
||||
defer os.RemoveAll(dir)
|
||||
}
|
||||
objs = append(objs, obj.(*xlObjects))
|
||||
}
|
||||
|
||||
xlSets := &xlSets{sets: objs, distributionAlgo: "CRCMOD"}
|
||||
|
||||
type testCaseType struct {
|
||||
bucket string
|
||||
object string
|
||||
}
|
||||
|
||||
bucketName := "bucket"
|
||||
testCases := []testCaseType{
|
||||
{bucketName, "dir/obj1"},
|
||||
{bucketName, "dir/obj2"},
|
||||
{bucketName, "obj3"},
|
||||
{bucketName, "obj_4"},
|
||||
}
|
||||
|
||||
err := xlSets.MakeBucketWithLocation(context.Background(), bucketName, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
_, err = xlSets.PutObject(context.Background(), testCase.bucket, testCase.object,
|
||||
mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("XL Object upload failed: <ERROR> %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
toObjectNames := func(testCases []testCaseType) []string {
|
||||
names := make([]string, len(testCases))
|
||||
for i := range testCases {
|
||||
names[i] = testCases[i].object
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
objectNames := toObjectNames(testCases)
|
||||
delErrs, err := xlSets.DeleteObjects(context.Background(), bucketName, objectNames)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to call DeleteObjects with the error: `%v`", err)
|
||||
}
|
||||
|
||||
for i := range delErrs {
|
||||
if delErrs[i] != nil {
|
||||
t.Errorf("Failed to remove object `%v` with the error: `%v`", objectNames[i], delErrs[i])
|
||||
}
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
_, statErr := xlSets.GetObjectInfo(context.Background(), test.bucket, test.object, ObjectOptions{})
|
||||
switch statErr.(type) {
|
||||
case ObjectNotFound:
|
||||
default:
|
||||
t.Fatalf("Object %s is not removed", test.bucket+"/"+test.object)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestXLDeleteObjectDiskNotFound(t *testing.T) {
|
||||
// Reset global storage class flags
|
||||
resetGlobalStorageEnvs()
|
||||
|
|
Loading…
Reference in New Issue