notifications: Fix bucket notifications for DeleteMultipleObjects. (#2609)

Now reports bucket notifications for DeleteMultipleObjects API.
Also deletes multiple objects in parallel.
This commit is contained in:
Harshavardhana 2016-09-02 01:59:08 -07:00 committed by GitHub
parent 600551feb9
commit 2dc7ecc59b
3 changed files with 122 additions and 17 deletions

View File

@ -23,6 +23,7 @@ import (
"net/url"
"path"
"strings"
"sync"
mux "github.com/gorilla/mux"
"github.com/minio/minio-go/pkg/set"
@ -240,24 +241,41 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
return
}
var deleteErrors []DeleteError
var deletedObjects []ObjectIdentifier
// Loop through all the objects and delete them sequentially.
for _, object := range deleteObjects.Objects {
err := api.ObjectAPI.DeleteObject(bucket, object.ObjectName)
if err == nil {
deletedObjects = append(deletedObjects, ObjectIdentifier{
ObjectName: object.ObjectName,
})
} else {
errorIf(err, "Unable to delete object.")
deleteErrors = append(deleteErrors, DeleteError{
Code: errorCodeResponse[toAPIErrorCode(err)].Code,
Message: errorCodeResponse[toAPIErrorCode(err)].Description,
Key: object.ObjectName,
})
}
var wg = &sync.WaitGroup{} // Allocate a new wait group.
var dErrs = make([]error, len(deleteObjects.Objects))
// Delete all requested objects in parallel.
for index, object := range deleteObjects.Objects {
wg.Add(1)
go func(i int, obj ObjectIdentifier) {
defer wg.Done()
dErr := api.ObjectAPI.DeleteObject(bucket, obj.ObjectName)
if dErr != nil {
dErrs[i] = dErr
}
}(index, object)
}
wg.Wait()
// Collect deleted objects and errors if any.
var deletedObjects []ObjectIdentifier
var deleteErrors []DeleteError
for index, err := range dErrs {
object := deleteObjects.Objects[index]
// Success deleted objects are collected separately.
if err == nil {
deletedObjects = append(deletedObjects, object)
continue
}
errorIf(err, "Unable to delete object. %s", object.ObjectName)
// Error during delete should be collected separately.
deleteErrors = append(deleteErrors, DeleteError{
Code: errorCodeResponse[toAPIErrorCode(err)].Code,
Message: errorCodeResponse[toAPIErrorCode(err)].Description,
Key: object.ObjectName,
})
}
// Generate response
response := generateMultiDeleteResponse(deleteObjects.Quiet, deletedObjects, deleteErrors)
encodedSuccessResponse := encodeResponse(response)
@ -265,6 +283,22 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
setCommonHeaders(w)
// Write success response.
writeSuccessResponse(w, encodedSuccessResponse)
if eventN.IsBucketNotificationSet(bucket) {
// Notify deleted event for objects.
for _, dobj := range deletedObjects {
eventNotify(eventData{
Type: ObjectRemovedDelete,
Bucket: bucket,
ObjInfo: ObjectInfo{
Name: dobj.ObjectName,
},
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
}
}
}
// PutBucketHandler - PUT Bucket

View File

@ -309,6 +309,70 @@ func (s *TestSuiteCommon) TestDeleteBucketNotEmpty(c *C) {
}
func (s *TestSuiteCommon) TestDeleteMultipleObjects(c *C) {
// generate a random bucket name.
bucketName := getRandomBucketName()
// HTTP request to create the bucket.
request, err := newTestSignedRequest("PUT", getMakeBucketURL(s.endPoint, bucketName),
0, nil, s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client := http.Client{}
// execute the request.
response, err := client.Do(request)
c.Assert(err, IsNil)
// assert the http response status code.
c.Assert(response.StatusCode, Equals, http.StatusOK)
objectName := "prefix/myobject"
delObjReq := DeleteObjectsRequest{
Quiet: false,
}
for i := 0; i < 10; i++ {
// Obtain http request to upload object.
// object Name contains a prefix.
objName := fmt.Sprintf("%d/%s", i, objectName)
request, err = newTestSignedRequest("PUT", getPutObjectURL(s.endPoint, bucketName, objName),
0, nil, s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client = http.Client{}
// execute the http request.
response, err = client.Do(request)
c.Assert(err, IsNil)
// assert the status of http response.
c.Assert(response.StatusCode, Equals, http.StatusOK)
// Append all objects.
delObjReq.Objects = append(delObjReq.Objects, ObjectIdentifier{
ObjectName: objName,
})
}
// Marshal delete request.
deleteReqBytes, err := xml.Marshal(delObjReq)
c.Assert(err, IsNil)
// object name was "prefix/myobject", an attempt to delelte "prefix"
// Should not delete "prefix/myobject"
request, err = newTestSignedRequest("POST", getMultiDeleteObjectURL(s.endPoint, bucketName),
int64(len(deleteReqBytes)), bytes.NewReader(deleteReqBytes), s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
var deleteResp = DeleteObjectsResponse{}
delRespBytes, err := ioutil.ReadAll(response.Body)
c.Assert(err, IsNil)
err = xml.Unmarshal(delRespBytes, &deleteResp)
c.Assert(err, IsNil)
for i := 0; i < 10; i++ {
c.Assert(deleteResp.DeletedObjects[i], DeepEquals, delObjReq.Objects[i])
}
c.Assert(len(deleteResp.Errors), Equals, 0)
}
// Tests delete object responses and success.
func (s *TestSuiteCommon) TestDeleteObject(c *C) {
// generate a random bucket name.

View File

@ -623,6 +623,13 @@ func getDeleteObjectURL(endPoint, bucketName, objectName string) string {
return makeTestTargetURL(endPoint, bucketName, objectName, url.Values{})
}
// return URL for deleting multiple objects from a bucket.
func getMultiDeleteObjectURL(endPoint, bucketName string) string {
queryValue := url.Values{}
queryValue.Set("delete", "")
return makeTestTargetURL(endPoint, bucketName, "", queryValue)
}
// return URL for HEAD on the object.
func getHeadObjectURL(endPoint, bucketName, objectName string) string {
return makeTestTargetURL(endPoint, bucketName, objectName, url.Values{})