s3: Force a prefix removal using a special header (#12504)

An S3 client can send `x-minio-force-delete: true` to remove a prefix.
This commit is contained in:
Anis Elleuch 2021-06-16 02:43:14 +01:00 committed by GitHub
parent f30c996d48
commit 7722b91e1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 72 additions and 1 deletions

View File

@ -1014,10 +1014,34 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
return dobjects, errs return dobjects, errs
} }
func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string) error {
disks := er.getDisks()
g := errgroup.WithNErrs(len(disks))
for index := range disks {
index := index
g.Go(func() error {
if disks[index] == nil {
return nil
}
return disks[index].Delete(ctx, bucket, prefix, true)
}, index)
}
for _, err := range g.Wait() {
if err != nil {
return err
}
}
return nil
}
// DeleteObject - deletes an object, this call doesn't necessary reply // DeleteObject - deletes an object, this call doesn't necessary reply
// any error as it is not necessary for the handler to reply back a // any error as it is not necessary for the handler to reply back a
// response to the client request. // response to the client request.
func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if opts.DeletePrefix {
return ObjectInfo{}, toObjectErr(er.deletePrefix(ctx, bucket, object), bucket, object)
}
versionFound := true versionFound := true
objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response. objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
goi, gerr := er.GetObjectInfo(ctx, bucket, object, opts) goi, gerr := er.GetObjectInfo(ctx, bucket, object, opts)

View File

@ -793,11 +793,26 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
return z.serverPools[idx].PutObject(ctx, bucket, object, data, opts) return z.serverPools[idx].PutObject(ctx, bucket, object, data, opts)
} }
func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error {
for _, zone := range z.serverPools {
_, err := zone.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true})
if err != nil {
return err
}
}
return nil
}
func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if err = checkDelObjArgs(ctx, bucket, object); err != nil { if err = checkDelObjArgs(ctx, bucket, object); err != nil {
return objInfo, err return objInfo, err
} }
if opts.DeletePrefix {
err := z.deletePrefix(ctx, bucket, object)
return ObjectInfo{}, err
}
object = encodeDirObject(object) object = encodeDirObject(object)
if z.SinglePool() { if z.SinglePool() {
return z.serverPools[0].DeleteObject(ctx, bucket, object, opts) return z.serverPools[0].DeleteObject(ctx, bucket, object, opts)

View File

@ -919,10 +919,25 @@ func (s *erasureSets) GetObjectInfo(ctx context.Context, bucket, object string,
return set.GetObjectInfo(ctx, bucket, object, opts) return set.GetObjectInfo(ctx, bucket, object, opts)
} }
func (s *erasureSets) deletePrefix(ctx context.Context, bucket string, prefix string) error {
for _, s := range s.sets {
_, err := s.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true})
if err != nil {
return err
}
}
return nil
}
// DeleteObject - deletes an object from the hashedSet based on the object name. // DeleteObject - deletes an object from the hashedSet based on the object name.
func (s *erasureSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (s *erasureSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set) auditObjectErasureSet(ctx, object, set)
if opts.DeletePrefix {
err := s.deletePrefix(ctx, bucket, object)
return ObjectInfo{}, err
}
return set.DeleteObject(ctx, bucket, object, opts) return set.DeleteObject(ctx, bucket, object, opts)
} }

View File

@ -58,6 +58,8 @@ type ObjectOptions struct {
ProxyHeaderSet bool // only set for GET/HEAD in active-active replication scenario ProxyHeaderSet bool // only set for GET/HEAD in active-active replication scenario
ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object. ParentIsObject func(ctx context.Context, bucket, parent string) bool // Used to verify if parent is an object.
DeletePrefix bool // set true to enforce a prefix deletion, only application for DeleteObject API,
// Use the maximum parity (N/2), used when saving server configuration files // Use the maximum parity (N/2), used when saving server configuration files
MaxParity bool MaxParity bool
} }

View File

@ -121,11 +121,21 @@ func getOpts(ctx context.Context, r *http.Request, bucket, object string) (Objec
}, nil }, nil
} }
deletePrefix := false
if d := r.Header.Get(xhttp.MinIOForceDelete); d != "" {
if b, err := strconv.ParseBool(d); err == nil {
deletePrefix = b
} else {
return opts, err
}
}
// default case of passing encryption headers to backend // default case of passing encryption headers to backend
opts, err = getDefaultOpts(r.Header, false, nil) opts, err = getDefaultOpts(r.Header, false, nil)
if err != nil { if err != nil {
return opts, err return opts, err
} }
opts.DeletePrefix = deletePrefix
opts.PartNumber = partNumber opts.PartNumber = partNumber
opts.VersionID = vid opts.VersionID = vid
delMarker := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarker)) delMarker := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarker))

View File

@ -22,6 +22,7 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"encoding/xml" "encoding/xml"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -3270,6 +3271,10 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
apiErr := ErrNone apiErr := ErrNone
if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled { if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled {
if opts.DeletePrefix {
writeErrorResponse(ctx, w, toAPIError(ctx, errors.New("force-delete is forbidden in a locked-enabled bucket")), r.URL, guessIsBrowserReq(r))
return
}
if vID != "" { if vID != "" {
apiErr = enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{ apiErr = enforceRetentionBypassForDelete(ctx, r, bucket, ObjectToDelete{
ObjectName: object, ObjectName: object,

View File

@ -141,7 +141,7 @@ const (
// Server-Status // Server-Status
MinIOServerStatus = "x-minio-server-status" MinIOServerStatus = "x-minio-server-status"
// Delete special flag to force delete a bucket // Delete special flag to force delete a bucket or a prefix
MinIOForceDelete = "x-minio-force-delete" MinIOForceDelete = "x-minio-force-delete"
// Header indicates if the mtime should be preserved by client // Header indicates if the mtime should be preserved by client