From 2dc7ecc59b13074ae4abc950ac157129277290c1 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 2 Sep 2016 01:59:08 -0700 Subject: [PATCH] notifications: Fix bucket notifications for DeleteMultipleObjects. (#2609) Now reports bucket notifications for DeleteMultipleObjects API. Also deletes multiple objects in parallel. --- cmd/bucket-handlers.go | 68 +++++++++++++++++++++++++++++++----------- cmd/server_test.go | 64 +++++++++++++++++++++++++++++++++++++++ cmd/test-utils_test.go | 7 +++++ 3 files changed, 122 insertions(+), 17 deletions(-) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 4115e62bf..8ccae0849 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -23,6 +23,7 @@ import ( "net/url" "path" "strings" + "sync" mux "github.com/gorilla/mux" "github.com/minio/minio-go/pkg/set" @@ -240,24 +241,41 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, return } - var deleteErrors []DeleteError - var deletedObjects []ObjectIdentifier - // Loop through all the objects and delete them sequentially. - for _, object := range deleteObjects.Objects { - err := api.ObjectAPI.DeleteObject(bucket, object.ObjectName) - if err == nil { - deletedObjects = append(deletedObjects, ObjectIdentifier{ - ObjectName: object.ObjectName, - }) - } else { - errorIf(err, "Unable to delete object.") - deleteErrors = append(deleteErrors, DeleteError{ - Code: errorCodeResponse[toAPIErrorCode(err)].Code, - Message: errorCodeResponse[toAPIErrorCode(err)].Description, - Key: object.ObjectName, - }) - } + var wg = &sync.WaitGroup{} // Allocate a new wait group. + var dErrs = make([]error, len(deleteObjects.Objects)) + + // Delete all requested objects in parallel. + for index, object := range deleteObjects.Objects { + wg.Add(1) + go func(i int, obj ObjectIdentifier) { + defer wg.Done() + dErr := api.ObjectAPI.DeleteObject(bucket, obj.ObjectName) + if dErr != nil { + dErrs[i] = dErr + } + }(index, object) } + wg.Wait() + + // Collect deleted objects and errors if any. + var deletedObjects []ObjectIdentifier + var deleteErrors []DeleteError + for index, err := range dErrs { + object := deleteObjects.Objects[index] + // Success deleted objects are collected separately. + if err == nil { + deletedObjects = append(deletedObjects, object) + continue + } + errorIf(err, "Unable to delete object. %s", object.ObjectName) + // Error during delete should be collected separately. + deleteErrors = append(deleteErrors, DeleteError{ + Code: errorCodeResponse[toAPIErrorCode(err)].Code, + Message: errorCodeResponse[toAPIErrorCode(err)].Description, + Key: object.ObjectName, + }) + } + // Generate response response := generateMultiDeleteResponse(deleteObjects.Quiet, deletedObjects, deleteErrors) encodedSuccessResponse := encodeResponse(response) @@ -265,6 +283,22 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, setCommonHeaders(w) // Write success response. writeSuccessResponse(w, encodedSuccessResponse) + + if eventN.IsBucketNotificationSet(bucket) { + // Notify deleted event for objects. + for _, dobj := range deletedObjects { + eventNotify(eventData{ + Type: ObjectRemovedDelete, + Bucket: bucket, + ObjInfo: ObjectInfo{ + Name: dobj.ObjectName, + }, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) + } + } } // PutBucketHandler - PUT Bucket diff --git a/cmd/server_test.go b/cmd/server_test.go index 78784ba54..7a9596ccb 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -309,6 +309,70 @@ func (s *TestSuiteCommon) TestDeleteBucketNotEmpty(c *C) { } +func (s *TestSuiteCommon) TestDeleteMultipleObjects(c *C) { + // generate a random bucket name. + bucketName := getRandomBucketName() + // HTTP request to create the bucket. + request, err := newTestSignedRequest("PUT", getMakeBucketURL(s.endPoint, bucketName), + 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client := http.Client{} + // execute the request. + response, err := client.Do(request) + c.Assert(err, IsNil) + // assert the http response status code. + c.Assert(response.StatusCode, Equals, http.StatusOK) + + objectName := "prefix/myobject" + delObjReq := DeleteObjectsRequest{ + Quiet: false, + } + for i := 0; i < 10; i++ { + // Obtain http request to upload object. + // object Name contains a prefix. + objName := fmt.Sprintf("%d/%s", i, objectName) + request, err = newTestSignedRequest("PUT", getPutObjectURL(s.endPoint, bucketName, objName), + 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + // execute the http request. + response, err = client.Do(request) + c.Assert(err, IsNil) + // assert the status of http response. + c.Assert(response.StatusCode, Equals, http.StatusOK) + // Append all objects. + delObjReq.Objects = append(delObjReq.Objects, ObjectIdentifier{ + ObjectName: objName, + }) + } + + // Marshal delete request. + deleteReqBytes, err := xml.Marshal(delObjReq) + c.Assert(err, IsNil) + + // object name was "prefix/myobject", an attempt to delelte "prefix" + // Should not delete "prefix/myobject" + request, err = newTestSignedRequest("POST", getMultiDeleteObjectURL(s.endPoint, bucketName), + int64(len(deleteReqBytes)), bytes.NewReader(deleteReqBytes), s.accessKey, s.secretKey) + c.Assert(err, IsNil) + client = http.Client{} + response, err = client.Do(request) + c.Assert(err, IsNil) + c.Assert(response.StatusCode, Equals, http.StatusOK) + + var deleteResp = DeleteObjectsResponse{} + delRespBytes, err := ioutil.ReadAll(response.Body) + c.Assert(err, IsNil) + err = xml.Unmarshal(delRespBytes, &deleteResp) + c.Assert(err, IsNil) + for i := 0; i < 10; i++ { + c.Assert(deleteResp.DeletedObjects[i], DeepEquals, delObjReq.Objects[i]) + } + c.Assert(len(deleteResp.Errors), Equals, 0) +} + // Tests delete object responses and success. func (s *TestSuiteCommon) TestDeleteObject(c *C) { // generate a random bucket name. diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index dcd6663a6..2f20dae84 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -623,6 +623,13 @@ func getDeleteObjectURL(endPoint, bucketName, objectName string) string { return makeTestTargetURL(endPoint, bucketName, objectName, url.Values{}) } +// return URL for deleting multiple objects from a bucket. +func getMultiDeleteObjectURL(endPoint, bucketName string) string { + queryValue := url.Values{} + queryValue.Set("delete", "") + return makeTestTargetURL(endPoint, bucketName, "", queryValue) +} + // return URL for HEAD on the object. func getHeadObjectURL(endPoint, bucketName, objectName string) string { return makeTestTargetURL(endPoint, bucketName, objectName, url.Values{})