mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Add bucket-notification-handler tests (#2750)
This commit is contained in:
parent
90417d2dd6
commit
559ad38b8c
@ -1,10 +1,14 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,7 +23,7 @@ func (f *flushWriter) Write(b []byte) (n int, err error) { return f.Writer.Write
|
|||||||
func (f *flushWriter) Header() http.Header { return http.Header{} }
|
func (f *flushWriter) Header() http.Header { return http.Header{} }
|
||||||
func (f *flushWriter) WriteHeader(code int) {}
|
func (f *flushWriter) WriteHeader(code int) {}
|
||||||
|
|
||||||
func newFlushWriter(writer io.Writer) *flushWriter {
|
func newFlushWriter(writer io.Writer) http.ResponseWriter {
|
||||||
return &flushWriter{writer}
|
return &flushWriter{writer}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,7 +39,7 @@ func TestWriteNotification(t *testing.T) {
|
|||||||
var buffer bytes.Buffer
|
var buffer bytes.Buffer
|
||||||
// Collection of test cases for each event writer.
|
// Collection of test cases for each event writer.
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
writer *flushWriter
|
writer http.ResponseWriter
|
||||||
event map[string][]NotificationEvent
|
event map[string][]NotificationEvent
|
||||||
err error
|
err error
|
||||||
}{
|
}{
|
||||||
@ -88,3 +92,198 @@ func TestWriteNotification(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSendBucketNotification(t *testing.T) {
|
||||||
|
// Initialize a new test config.
|
||||||
|
root, err := newTestConfig("us-east-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unable to initialize test config %s", err)
|
||||||
|
}
|
||||||
|
defer removeAll(root)
|
||||||
|
|
||||||
|
eventCh := make(chan []NotificationEvent)
|
||||||
|
|
||||||
|
// Create a Pipe with FlushWriter on the write-side and bufio.Scanner
|
||||||
|
// on the reader-side to receive notification over the listen channel in a
|
||||||
|
// synchronized manner.
|
||||||
|
pr, pw := io.Pipe()
|
||||||
|
fw := newFlushWriter(pw)
|
||||||
|
scanner := bufio.NewScanner(pr)
|
||||||
|
// Start a go-routine to wait for notification events.
|
||||||
|
go func(listenerCh <-chan []NotificationEvent) {
|
||||||
|
sendBucketNotification(fw, listenerCh)
|
||||||
|
}(eventCh)
|
||||||
|
|
||||||
|
// Construct notification events to be passed on the events channel.
|
||||||
|
var events []NotificationEvent
|
||||||
|
evTypes := []EventName{
|
||||||
|
ObjectCreatedPut,
|
||||||
|
ObjectCreatedPost,
|
||||||
|
ObjectCreatedCopy,
|
||||||
|
ObjectCreatedCompleteMultipartUpload,
|
||||||
|
}
|
||||||
|
for _, evType := range evTypes {
|
||||||
|
events = append(events, newNotificationEvent(eventData{
|
||||||
|
Type: evType,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
// Send notification events to the channel on which sendBucketNotification
|
||||||
|
// is waiting on.
|
||||||
|
eventCh <- events
|
||||||
|
|
||||||
|
// Read from the pipe connected to the ResponseWriter.
|
||||||
|
scanner.Scan()
|
||||||
|
notificationBytes := scanner.Bytes()
|
||||||
|
|
||||||
|
// Close the read-end and send an empty notification event on the channel
|
||||||
|
// to signal sendBucketNotification to terminate.
|
||||||
|
pr.Close()
|
||||||
|
eventCh <- []NotificationEvent{}
|
||||||
|
close(eventCh)
|
||||||
|
|
||||||
|
// Checking if the notification are the same as those sent over the channel.
|
||||||
|
var notifications map[string][]NotificationEvent
|
||||||
|
err = json.Unmarshal(notificationBytes, ¬ifications)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to Unmarshal notification")
|
||||||
|
}
|
||||||
|
records := notifications["Records"]
|
||||||
|
for i, rec := range records {
|
||||||
|
if rec.EventName == evTypes[i].String() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
t.Errorf("Failed to receive %d event %s", i, evTypes[i].String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func initMockEventNotifier(objAPI ObjectLayer) error {
|
||||||
|
if objAPI == nil {
|
||||||
|
return errInvalidArgument
|
||||||
|
}
|
||||||
|
globalEventNotifier = &eventNotifier{
|
||||||
|
rwMutex: &sync.RWMutex{},
|
||||||
|
queueTargets: nil,
|
||||||
|
notificationConfigs: make(map[string]*notificationConfig),
|
||||||
|
snsTargets: make(map[string][]chan []NotificationEvent),
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func testGetBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) {
|
||||||
|
// get random bucket name.
|
||||||
|
bucketName := getRandomBucketName()
|
||||||
|
|
||||||
|
// Create bucket to add notification config for.
|
||||||
|
err := obj.MakeBucket(bucketName)
|
||||||
|
if err != nil {
|
||||||
|
// failed to create newbucket, abort.
|
||||||
|
t.Fatalf("%s : %s", instanceType, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
noNotificationBucket := "nonotification"
|
||||||
|
err = obj.MakeBucket(noNotificationBucket)
|
||||||
|
if err != nil {
|
||||||
|
// failed to create newbucket, abort.
|
||||||
|
t.Fatalf("%s : %s", instanceType, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the API end points with XL/FS object layer.
|
||||||
|
apiRouter := initTestAPIEndPoints(obj, []string{
|
||||||
|
"GetBucketNotificationHandler",
|
||||||
|
"PutBucketNotificationHandler",
|
||||||
|
})
|
||||||
|
|
||||||
|
// initialize the server and obtain the credentials and root.
|
||||||
|
// credentials are necessary to sign the HTTP request.
|
||||||
|
rootPath, err := newTestConfig("us-east-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Init Test config failed")
|
||||||
|
}
|
||||||
|
// remove the root folder after the test ends.
|
||||||
|
defer removeAll(rootPath)
|
||||||
|
|
||||||
|
credentials := serverConfig.GetCredential()
|
||||||
|
|
||||||
|
//Initialize global event notifier with mock queue targets.
|
||||||
|
err = initMockEventNotifier(obj)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Test %s: Failed to initialize mock event notifier %v",
|
||||||
|
instanceType, err)
|
||||||
|
}
|
||||||
|
// Initialize httptest recorder.
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
|
// Initialize sample bucket notification config.
|
||||||
|
sampleNotificationConfig := []byte("<NotificationConfiguration><TopicConfiguration>" +
|
||||||
|
"<Event>s3:ObjectCreated:*</Event><Event>s3:ObjectRemoved:*</Event><Filter>" +
|
||||||
|
"<S3Key></S3Key></Filter><Id></Id><Topic>arn:minio:sns:us-east-1:1474332374:listen</Topic>" +
|
||||||
|
"</TopicConfiguration></NotificationConfiguration>")
|
||||||
|
|
||||||
|
// Prepare notification config for one of the test cases.
|
||||||
|
req, err := newTestSignedRequest("PUT", getPutBucketNotificationURL("", bucketName),
|
||||||
|
int64(len(sampleNotificationConfig)), bytes.NewReader(sampleNotificationConfig),
|
||||||
|
credentials.AccessKeyID, credentials.SecretAccessKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Test %d %s: Failed to create HTTP request for PutBucketNotification: <ERROR> %v",
|
||||||
|
1, instanceType, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
apiRouter.ServeHTTP(rec, req)
|
||||||
|
// Test 1: Check if we get back sample notification.
|
||||||
|
req, err = newTestSignedRequest("GET", getGetBucketNotificationURL("", bucketName),
|
||||||
|
int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Test %d: %s: Failed to create HTTP request for GetBucketNotification: <ERROR> %v",
|
||||||
|
1, instanceType, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
apiRouter.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Errorf("Test %d: %s: GetBucketNotification request failed with %d: <ERROR> %v",
|
||||||
|
2, instanceType, rec.Code, err)
|
||||||
|
}
|
||||||
|
rspBytes, err := ioutil.ReadAll(rec.Body)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Test %d: %s: Failed to read response body: <ERROR> %v", 1, instanceType, err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(rspBytes, sampleNotificationConfig) {
|
||||||
|
t.Errorf("Test %d: %s: Notification config doesn't match expected value: <ERROR> %v", 2, instanceType, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 2: Try getting bucket notification on a non-existent bucket.
|
||||||
|
invalidBucketName := "Invalid_BucketName"
|
||||||
|
req, err = newTestSignedRequest("GET", getGetBucketNotificationURL("", invalidBucketName),
|
||||||
|
int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Test %d: %s: Failed to create HTTP request for GetBucketNotification: <ERROR> %v",
|
||||||
|
2, instanceType, err)
|
||||||
|
}
|
||||||
|
apiRouter.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Errorf("Test %d: %s: GetBucketNotification request failed with %d: <ERROR> %v",
|
||||||
|
2, instanceType, rec.Code, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 3: Try getting bucket notification for a bucket with notification set.
|
||||||
|
emptyNotificationXML := []byte("<NotificationConfiguration></NotificationConfiguration>" +
|
||||||
|
"<NotificationConfiguration></NotificationConfiguration>")
|
||||||
|
req, err = newTestSignedRequest("GET", getGetBucketNotificationURL("", noNotificationBucket),
|
||||||
|
int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Test %d: %s: Failed to create HTTP request for GetBucketNotification: <ERROR> %v",
|
||||||
|
3, instanceType, err)
|
||||||
|
}
|
||||||
|
apiRouter.ServeHTTP(rec, req)
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Errorf("Test %d: %s: GetBucketNotification request failed with %d: <ERROR> %v",
|
||||||
|
3, instanceType, rec.Code, err)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(rec.Body.Bytes(), emptyNotificationXML) {
|
||||||
|
t.Errorf("Test %d: %s: GetBucketNotification request received notification "+
|
||||||
|
"config different from empty config: <ERROR> %v", 3, instanceType, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetBucketNotificationHandler(t *testing.T) {
|
||||||
|
ExecObjectLayerTest(t, testGetBucketNotificationHandler)
|
||||||
|
}
|
||||||
|
@ -942,6 +942,7 @@ func getMultiDeleteObjectURL(endPoint, bucketName string) string {
|
|||||||
queryValue := url.Values{}
|
queryValue := url.Values{}
|
||||||
queryValue.Set("delete", "")
|
queryValue.Set("delete", "")
|
||||||
return makeTestTargetURL(endPoint, bucketName, "", queryValue)
|
return makeTestTargetURL(endPoint, bucketName, "", queryValue)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// return URL for HEAD on the object.
|
// return URL for HEAD on the object.
|
||||||
@ -1095,6 +1096,18 @@ func getCompleteMultipartUploadURL(endPoint, bucketName, objectName, uploadID st
|
|||||||
return makeTestTargetURL(endPoint, bucketName, objectName, queryValue)
|
return makeTestTargetURL(endPoint, bucketName, objectName, queryValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// return URL for put bucket notification.
|
||||||
|
func getPutBucketNotificationURL(endPoint, bucketName string) string {
|
||||||
|
return getGetBucketNotificationURL(endPoint, bucketName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// return URL for get bucket notification.
|
||||||
|
func getGetBucketNotificationURL(endPoint, bucketName string) string {
|
||||||
|
queryValue := url.Values{}
|
||||||
|
queryValue.Set("notification", "")
|
||||||
|
return makeTestTargetURL(endPoint, bucketName, "", queryValue)
|
||||||
|
}
|
||||||
|
|
||||||
// returns temp root directory. `
|
// returns temp root directory. `
|
||||||
func getTestRoot() (string, error) {
|
func getTestRoot() (string, error) {
|
||||||
return ioutil.TempDir(os.TempDir(), "api-")
|
return ioutil.TempDir(os.TempDir(), "api-")
|
||||||
@ -1324,6 +1337,11 @@ func initTestAPIEndPoints(objLayer ObjectLayer, apiFunctions []string) http.Hand
|
|||||||
// Register ListMultipartUploads handler.
|
// Register ListMultipartUploads handler.
|
||||||
case "ListMultipartUploads":
|
case "ListMultipartUploads":
|
||||||
bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "")
|
bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "")
|
||||||
|
// Register GetBucketNotification Handler.
|
||||||
|
case "GetBucketNotification":
|
||||||
|
bucket.Methods("GET").HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "")
|
||||||
|
case "PutBucketNotification":
|
||||||
|
bucket.Methods("PUT").HandlerFunc(api.PutBucketNotificationHandler).Queries("notification", "")
|
||||||
// Register all api endpoints by default.
|
// Register all api endpoints by default.
|
||||||
default:
|
default:
|
||||||
registerAPIRouter(muxRouter, api)
|
registerAPIRouter(muxRouter, api)
|
||||||
|
Loading…
Reference in New Issue
Block a user