fix: multi-delete API write quorum failures (#8926)

multi-delete API failed with write quorum errors
under following situations

- list of files requested for delete doesn't exist
  anymore can lead to quorum errors and failure
- due to usage of query param for paths, for really
  long paths MinIO server rejects these requests as
  malformed as unexpected.

This was reproduced with warp
This commit is contained in:
Harshavardhana 2020-02-02 07:41:29 +05:30 committed by GitHub
parent 7432b5c9b2
commit 7ce63b3078
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 13 deletions

View File

@ -245,7 +245,7 @@ func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte)
values := make(url.Values) values := make(url.Values)
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path) values.Set(storageRESTFilePath, path)
reader := bytes.NewBuffer(buffer) reader := bytes.NewReader(buffer)
respBody, err := client.call(storageRESTMethodAppendFile, values, reader, -1) respBody, err := client.call(storageRESTMethodAppendFile, values, reader, -1)
defer http.DrainBody(respBody) defer http.DrainBody(respBody)
return err return err
@ -405,10 +405,14 @@ func (client *storageRESTClient) DeleteFileBulk(volume string, paths []string) (
} }
values := make(url.Values) values := make(url.Values)
values.Set(storageRESTVolume, volume) values.Set(storageRESTVolume, volume)
var buffer bytes.Buffer
for _, path := range paths { for _, path := range paths {
values.Add(storageRESTFilePath, path) buffer.WriteString(path)
buffer.WriteString("\n")
} }
respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, nil, -1)
respBody, err := client.call(storageRESTMethodDeleteFileBulk, values, &buffer, -1)
defer http.DrainBody(respBody) defer http.DrainBody(respBody)
if err != nil { if err != nil {

View File

@ -17,7 +17,7 @@
package cmd package cmd
const ( const (
storageRESTVersion = "v13" // Introduced StorageErr error type. storageRESTVersion = "v14" // DeleteFileBulk API change
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage" storageRESTPrefix = minioReservedBucketPath + "/storage"
) )

View File

@ -17,6 +17,7 @@
package cmd package cmd
import ( import (
"bufio"
"context" "context"
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
@ -502,7 +503,17 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http
} }
vars := r.URL.Query() vars := r.URL.Query()
volume := vars.Get(storageRESTVolume) volume := vars.Get(storageRESTVolume)
filePaths := vars[storageRESTFilePath]
bio := bufio.NewScanner(r.Body)
var filePaths []string
for bio.Scan() {
filePaths = append(filePaths, bio.Text())
}
if err := bio.Err(); err != nil {
s.writeErrorResponse(w, err)
return
}
errs, err := s.storage.DeleteFileBulk(volume, filePaths) errs, err := s.storage.DeleteFileBulk(volume, filePaths)
if err != nil { if err != nil {
@ -664,7 +675,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) Queries(restQueries(storageRESTVolume)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)). subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler)).
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...) Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...)

View File

@ -738,8 +738,7 @@ func (xl xlObjects) deleteObject(ctx context.Context, bucket, object string, wri
// object. // object.
func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects []string, errs []error, writeQuorums []int, isDirs []bool) ([]error, error) { func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects []string, errs []error, writeQuorums []int, isDirs []bool) ([]error, error) {
var tmpObjs = make([]string, len(objects)) var tmpObjs = make([]string, len(objects))
var disks = xl.getDisks() disks := xl.getDisks()
if bucket == minioMetaTmpBucket { if bucket == minioMetaTmpBucket {
copy(tmpObjs, objects) copy(tmpObjs, objects)
} else { } else {
@ -753,10 +752,10 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects
// that a non found object in a given disk as a success since it already // that a non found object in a given disk as a success since it already
// confirms that the object doesn't have a part in that disk (already removed) // confirms that the object doesn't have a part in that disk (already removed)
if isDirs[i] { if isDirs[i] {
disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i], disks, err = rename(ctx, disks, bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i],
[]error{errFileNotFound, errFileAccessDenied}) []error{errFileNotFound, errFileAccessDenied})
} else { } else {
disks, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i], disks, err = rename(ctx, disks, bucket, object, minioMetaTmpBucket, tmpObjs[i], true, writeQuorums[i],
[]error{errFileNotFound}) []error{errFileNotFound})
} }
if err != nil { if err != nil {
@ -776,13 +775,13 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects
continue continue
} }
delObjErrs[index], opErrs[index] = cleanupObjectsBulk(disk, minioMetaTmpBucket, tmpObjs, errs) delObjErrs[index], opErrs[index] = cleanupObjectsBulk(disk, minioMetaTmpBucket, tmpObjs, errs)
if opErrs[index] == errVolumeNotFound { if opErrs[index] == errVolumeNotFound || opErrs[index] == errFileNotFound {
opErrs[index] = nil opErrs[index] = nil
} }
} }
// Return errors if any during deletion // Return errors if any during deletion
if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(xl.getDisks())/2+1); err != nil { if err := reduceWriteQuorumErrs(ctx, opErrs, objectOpIgnoredErrs, len(disks)/2+1); err != nil {
return nil, err return nil, err
} }
@ -791,7 +790,7 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects
if errs[objIndex] != nil { if errs[objIndex] != nil {
continue continue
} }
listErrs := make([]error, len(xl.getDisks())) listErrs := make([]error, len(disks))
for i := range delObjErrs { for i := range delObjErrs {
if delObjErrs[i] != nil { if delObjErrs[i] != nil {
listErrs[i] = delObjErrs[i][objIndex] listErrs[i] = delObjErrs[i][objIndex]