mirror of https://github.com/minio/minio.git
fix: DeleteMultipleObjects to finish even if cancelled + concurrent sets (#14038)
* Process sets concurrently. * Disconnect context from request. * Insert context cancellation checks. * errFileNotFound and errFileVersionNotFound are ok, unless creating delete markers.
This commit is contained in:
parent
c27110e37d
commit
0e31cff762
|
@ -576,6 +576,9 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
|
|||
return
|
||||
}
|
||||
|
||||
// Disable timeouts and cancellation
|
||||
ctx = bgContext(ctx)
|
||||
|
||||
deleteList := toNames(objectsToDelete)
|
||||
dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{
|
||||
Versioned: versioned,
|
||||
|
|
|
@ -911,3 +911,32 @@ func contextCanceled(ctx context.Context) bool {
|
|||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// bgContext returns a context that can be used for async operations.
|
||||
// Cancellation/timeouts are removed, so parent cancellations/timeout will
|
||||
// not propagate from parent.
|
||||
// Context values are preserved.
|
||||
// This can be used for goroutines that live beyond the parent context.
|
||||
func bgContext(parent context.Context) context.Context {
|
||||
return bgCtx{parent: parent}
|
||||
}
|
||||
|
||||
type bgCtx struct {
|
||||
parent context.Context
|
||||
}
|
||||
|
||||
func (a bgCtx) Done() <-chan struct{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a bgCtx) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a bgCtx) Deadline() (deadline time.Time, ok bool) {
|
||||
return time.Time{}, false
|
||||
}
|
||||
|
||||
func (a bgCtx) Value(key interface{}) interface{} {
|
||||
return a.parent.Value(key)
|
||||
}
|
||||
|
|
|
@ -1197,6 +1197,12 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
|
|||
continue
|
||||
}
|
||||
for _, v := range dedupVersions[i].Versions {
|
||||
if err == errFileNotFound || err == errFileVersionNotFound {
|
||||
if !dobjects[v.Idx].DeleteMarker {
|
||||
// Not delete marker, if not found, ok.
|
||||
continue
|
||||
}
|
||||
}
|
||||
delObjErrs[index][v.Idx] = err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -974,17 +974,25 @@ func (s *erasureSets) DeleteObjects(ctx context.Context, bucket string, objects
|
|||
|
||||
// Invoke bulk delete on objects per set and save
|
||||
// the result of the delete operation
|
||||
for _, objsGroup := range objSetMap {
|
||||
set := s.getHashedSet(objsGroup[0].object.ObjectName)
|
||||
dobjects, errs := set.DeleteObjects(ctx, bucket, toNames(objsGroup), opts)
|
||||
for i, obj := range objsGroup {
|
||||
delErrs[obj.origIndex] = errs[i]
|
||||
delObjects[obj.origIndex] = dobjects[i]
|
||||
if errs[i] == nil {
|
||||
auditObjectErasureSet(ctx, obj.object.ObjectName, set)
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
wg.Add(len(objSetMap))
|
||||
for setIdx, objsGroup := range objSetMap {
|
||||
go func(set *erasureObjects, group []delObj) {
|
||||
defer wg.Done()
|
||||
dobjects, errs := set.DeleteObjects(ctx, bucket, toNames(group), opts)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
for i, obj := range group {
|
||||
delErrs[obj.origIndex] = errs[i]
|
||||
delObjects[obj.origIndex] = dobjects[i]
|
||||
if errs[i] == nil {
|
||||
auditObjectErasureSet(ctx, obj.object.ObjectName, set)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(s.sets[setIdx], objsGroup)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
return delObjects, delErrs
|
||||
}
|
||||
|
|
|
@ -159,10 +159,15 @@ func toObjectErr(err error, params ...string) error {
|
|||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
case io.ErrUnexpectedEOF.Error(), io.ErrShortWrite.Error():
|
||||
return IncompleteBody{}
|
||||
case context.Canceled.Error(), context.DeadlineExceeded.Error():
|
||||
return IncompleteBody{}
|
||||
case io.ErrUnexpectedEOF.Error(), io.ErrShortWrite.Error(), context.Canceled.Error(), context.DeadlineExceeded.Error():
|
||||
apiErr := IncompleteBody{}
|
||||
if len(params) >= 1 {
|
||||
apiErr.Bucket = params[0]
|
||||
}
|
||||
if len(params) >= 2 {
|
||||
apiErr.Object = decodeDirObject(params[1])
|
||||
}
|
||||
return apiErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -611,6 +611,9 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
|
|||
respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
if err != nil {
|
||||
if contextCanceled(ctx) {
|
||||
err = ctx.Err()
|
||||
}
|
||||
for i := range errs {
|
||||
errs[i] = err
|
||||
}
|
||||
|
|
|
@ -943,6 +943,10 @@ func (s *xlStorage) DeleteVersions(ctx context.Context, volume string, versions
|
|||
errs := make([]error, len(versions))
|
||||
|
||||
for i, fiv := range versions {
|
||||
if contextCanceled(ctx) {
|
||||
errs[i] = ctx.Err()
|
||||
continue
|
||||
}
|
||||
if err := s.deleteVersions(ctx, volume, fiv.Name, fiv.Versions...); err != nil {
|
||||
errs[i] = err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue