mirror of
https://github.com/minio/minio.git
synced 2025-04-19 02:05:24 -04:00
tests: Fix a potential race in ListenBucketNotificationHandler. (#3040)
This commit is contained in:
parent
87af2dbc43
commit
83b364891d
@ -23,4 +23,4 @@ after_success:
|
|||||||
- bash <(curl -s https://codecov.io/bash)
|
- bash <(curl -s https://codecov.io/bash)
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- 1.7.1
|
- 1.7.3
|
||||||
|
@ -210,8 +210,6 @@ var crlf = []byte("\r\n")
|
|||||||
// for each notification input, otherwise writes whitespace characters periodically
|
// for each notification input, otherwise writes whitespace characters periodically
|
||||||
// to keep the connection active. Each notification messages are terminated by CRLF
|
// to keep the connection active. Each notification messages are terminated by CRLF
|
||||||
// character. Upon any error received on response writer the for loop exits.
|
// character. Upon any error received on response writer the for loop exits.
|
||||||
//
|
|
||||||
// TODO - do not log for all errors.
|
|
||||||
func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []NotificationEvent) {
|
func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []NotificationEvent) {
|
||||||
var dummyEvents = map[string][]NotificationEvent{"Records": nil}
|
var dummyEvents = map[string][]NotificationEvent{"Records": nil}
|
||||||
// Continuously write to client either timely empty structures
|
// Continuously write to client either timely empty structures
|
||||||
@ -223,8 +221,9 @@ func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []Notifi
|
|||||||
errorIf(err, "Unable to write notification to client.")
|
errorIf(err, "Unable to write notification to client.")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(globalSNSConnAlive): // Wait for global conn active seconds.
|
||||||
if err := writeNotification(w, dummyEvents); err != nil {
|
if err := writeNotification(w, dummyEvents); err != nil {
|
||||||
|
// FIXME - do not log for all errors.
|
||||||
errorIf(err, "Unable to write notification to client.")
|
errorIf(err, "Unable to write notification to client.")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -7,15 +7,10 @@ import (
|
|||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
|
||||||
"sync"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Implement a dummy flush writer.
|
// Implement a dummy flush writer.
|
||||||
@ -235,112 +230,24 @@ func testGetBucketNotificationHandler(obj ObjectLayer, instanceType, bucketName
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestListenBucketNotificationHandler(t *testing.T) {
|
func TestListenBucketNotificationNilHandler(t *testing.T) {
|
||||||
ExecObjectLayerAPITest(t, testListenBucketNotificationHandler, []string{
|
ExecObjectLayerAPITest(t, testListenBucketNotificationNilHandler, []string{
|
||||||
"ListenBucketNotification",
|
"ListenBucketNotification",
|
||||||
"PutObject",
|
"PutObject",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testListenBucketNotificationHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
|
func testListenBucketNotificationNilHandler(obj ObjectLayer, instanceType, bucketName string, apiRouter http.Handler,
|
||||||
credentials credential, t *testing.T) {
|
credentials credential, t *testing.T) {
|
||||||
mux, ok := apiRouter.(*mux.Router)
|
|
||||||
if !ok {
|
|
||||||
t.Fatal("Invalid mux router found")
|
|
||||||
}
|
|
||||||
registerS3PeerRPCRouter(mux)
|
|
||||||
|
|
||||||
testServer := httptest.NewServer(apiRouter)
|
|
||||||
defer testServer.Close()
|
|
||||||
|
|
||||||
// setup port and minio addr
|
|
||||||
_, portStr, err := net.SplitHostPort(testServer.Listener.Addr().String())
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Initialization error: %v", err)
|
|
||||||
}
|
|
||||||
globalMinioPort, err = strconv.Atoi(portStr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Initialization error: %v", err)
|
|
||||||
}
|
|
||||||
globalMinioAddr = testServer.Listener.Addr().String()
|
|
||||||
// initialize the peer client(s)
|
|
||||||
initGlobalS3Peers([]storageEndPoint{})
|
|
||||||
|
|
||||||
invalidBucket := "Invalid\\Bucket"
|
|
||||||
noNotificationBucket := "nonotificationbucket"
|
|
||||||
// get random bucket name.
|
// get random bucket name.
|
||||||
randBucket := getRandomBucketName()
|
randBucket := getRandomBucketName()
|
||||||
for _, bucket := range []string{randBucket, noNotificationBucket} {
|
|
||||||
err = obj.MakeBucket(bucket)
|
|
||||||
if err != nil {
|
|
||||||
// failed to create bucket, abort.
|
|
||||||
t.Fatalf("Failed to create bucket %s %s : %s", bucket,
|
|
||||||
instanceType, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var testRec *httptest.ResponseRecorder
|
|
||||||
var testReq *http.Request
|
|
||||||
var tErr error
|
|
||||||
|
|
||||||
signatureMismatchError := getAPIError(ErrContentSHA256Mismatch)
|
|
||||||
tooBigPrefix := string(bytes.Repeat([]byte("a"), 1025))
|
|
||||||
validEvents := []string{"s3:ObjectCreated:*", "s3:ObjectRemoved:*"}
|
|
||||||
invalidEvents := []string{"invalidEvent"}
|
|
||||||
testCases := []struct {
|
|
||||||
bucketName string
|
|
||||||
prefixes []string
|
|
||||||
suffixes []string
|
|
||||||
events []string
|
|
||||||
expectedHTTPCode int
|
|
||||||
expectedAPIError string
|
|
||||||
}{
|
|
||||||
{randBucket, []string{}, []string{}, invalidEvents, signatureMismatchError.HTTPStatusCode, "InvalidArgument"},
|
|
||||||
{randBucket, []string{tooBigPrefix}, []string{}, validEvents, http.StatusBadRequest, "InvalidArgument"},
|
|
||||||
{invalidBucket, []string{}, []string{}, validEvents, http.StatusBadRequest, "InvalidBucketName"},
|
|
||||||
{randBucket, []string{}, []string{}, validEvents, signatureMismatchError.HTTPStatusCode, signatureMismatchError.Code},
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, test := range testCases {
|
|
||||||
testRec = httptest.NewRecorder()
|
|
||||||
testReq, tErr = newTestSignedRequestV4("GET",
|
|
||||||
getListenBucketNotificationURL("", test.bucketName, test.prefixes, test.suffixes, test.events),
|
|
||||||
0, nil, credentials.AccessKeyID, credentials.SecretAccessKey)
|
|
||||||
if tErr != nil {
|
|
||||||
t.Fatalf("%s: Failed to create HTTP testRequest for ListenBucketNotification: <ERROR> %v", instanceType, tErr)
|
|
||||||
}
|
|
||||||
// Set X-Amz-Content-SHA256 in header different from what was used to calculate Signature.
|
|
||||||
if test.expectedAPIError == "XAmzContentSHA256Mismatch" {
|
|
||||||
// Triggering a authentication failure.
|
|
||||||
testReq.Header.Set("x-amz-content-sha256", "somethingElse")
|
|
||||||
}
|
|
||||||
apiRouter.ServeHTTP(testRec, testReq)
|
|
||||||
rspBytes, rErr := ioutil.ReadAll(testRec.Body)
|
|
||||||
if rErr != nil {
|
|
||||||
t.Errorf("Test %d: %s: Failed to read response body: <ERROR> %v", i+1, instanceType, rErr)
|
|
||||||
}
|
|
||||||
var errXML APIErrorResponse
|
|
||||||
xErr := xml.Unmarshal(rspBytes, &errXML)
|
|
||||||
if xErr != nil {
|
|
||||||
t.Errorf("Test %d: %s: Failed to unmarshal error XML: <ERROR> %v", i+1, instanceType, xErr)
|
|
||||||
}
|
|
||||||
if errXML.Code != test.expectedAPIError {
|
|
||||||
t.Errorf("Test %d: %s: Expected error code %s but received %s: <ERROR> %v", i+1,
|
|
||||||
instanceType, test.expectedAPIError, errXML.Code, err)
|
|
||||||
|
|
||||||
}
|
|
||||||
if testRec.Code != test.expectedHTTPCode {
|
|
||||||
t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: <ERROR> %v",
|
|
||||||
i+1, instanceType, test.expectedHTTPCode, testRec.Code, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Nil Object layer
|
// Nil Object layer
|
||||||
nilAPIRouter := initTestAPIEndPoints(nil, []string{
|
nilAPIRouter := initTestAPIEndPoints(nil, []string{
|
||||||
"ListenBucketNotification",
|
"ListenBucketNotification",
|
||||||
})
|
})
|
||||||
testRec = httptest.NewRecorder()
|
testRec := httptest.NewRecorder()
|
||||||
testReq, tErr = newTestSignedRequestV4("GET",
|
testReq, tErr := newTestSignedRequestV4("GET",
|
||||||
getListenBucketNotificationURL("", randBucket, []string{},
|
getListenBucketNotificationURL("", randBucket, []string{},
|
||||||
[]string{"*.jpg"}, []string{
|
[]string{"*.jpg"}, []string{
|
||||||
"s3:ObjectCreated:*",
|
"s3:ObjectCreated:*",
|
||||||
@ -351,58 +258,8 @@ func testListenBucketNotificationHandler(obj ObjectLayer, instanceType, bucketNa
|
|||||||
}
|
}
|
||||||
nilAPIRouter.ServeHTTP(testRec, testReq)
|
nilAPIRouter.ServeHTTP(testRec, testReq)
|
||||||
if testRec.Code != http.StatusServiceUnavailable {
|
if testRec.Code != http.StatusServiceUnavailable {
|
||||||
t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: <ERROR> %v",
|
t.Fatalf("Test 1: %s: expected HTTP code %d, but received %d: <ERROR> %v",
|
||||||
1, instanceType, http.StatusServiceUnavailable, testRec.Code, err)
|
instanceType, http.StatusServiceUnavailable, testRec.Code, tErr)
|
||||||
}
|
|
||||||
|
|
||||||
testRec = httptest.NewRecorder()
|
|
||||||
testReq, tErr = newTestSignedRequestV4("GET",
|
|
||||||
getListenBucketNotificationURL("", randBucket, []string{}, []string{}, validEvents),
|
|
||||||
0, nil, credentials.AccessKeyID, credentials.SecretAccessKey)
|
|
||||||
if tErr != nil {
|
|
||||||
t.Fatalf("%s: Failed to create HTTP testRequest for ListenBucketNotification: <ERROR> %v", instanceType, tErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
globalObjLayerMutex.Lock()
|
|
||||||
globalObjectAPI = obj
|
|
||||||
globalObjLayerMutex.Unlock()
|
|
||||||
|
|
||||||
go apiRouter.ServeHTTP(testRec, testReq)
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
rec := httptest.NewRecorder()
|
|
||||||
buf := bytes.NewReader([]byte("hello, world"))
|
|
||||||
req, rerr := newTestSignedRequestV4("PUT", getPutObjectURL("", randBucket, "jeezus"),
|
|
||||||
int64(buf.Len()), buf, credentials.AccessKeyID, credentials.SecretAccessKey)
|
|
||||||
if rerr != nil {
|
|
||||||
t.Fatalf("%s: Failed to create HTTP testRequest for ListenBucketNotification: <ERROR> %v", instanceType, rerr)
|
|
||||||
}
|
|
||||||
apiRouter.ServeHTTP(rec, req)
|
|
||||||
if rec.Code != http.StatusOK {
|
|
||||||
t.Fatalf("Unexpected http reply %d should be %d", rec.Code, http.StatusOK)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
bio := bufio.NewScanner(testRec.Body)
|
|
||||||
// Unmarshal each line, returns marshalled values.
|
|
||||||
for bio.Scan() {
|
|
||||||
var notificationInfo struct {
|
|
||||||
Records []NotificationEvent
|
|
||||||
}
|
|
||||||
if err = json.Unmarshal(bio.Bytes(), ¬ificationInfo); err != nil {
|
|
||||||
t.Fatalf("%s: Unable to marshal: <ERROR> %v", instanceType, err)
|
|
||||||
}
|
|
||||||
// Send notifications on channel only if there are events received.
|
|
||||||
if len(notificationInfo.Records) == 0 {
|
|
||||||
t.Fatalf("%s: Expected notification events, received none", instanceType)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Look for any underlying errors.
|
|
||||||
if err = bio.Err(); err != nil {
|
|
||||||
t.Fatalf("%s: Server connection closed prematurely %s", instanceType, err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,8 +225,7 @@ func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerC
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func eventNotifyForBucketNotifications(eventType, objectName, bucketName string,
|
func eventNotifyForBucketNotifications(eventType, objectName, bucketName string, nEvent []NotificationEvent) {
|
||||||
nEvent []NotificationEvent) {
|
|
||||||
nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
|
nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
|
||||||
if nConfig == nil {
|
if nConfig == nil {
|
||||||
return
|
return
|
||||||
@ -294,12 +293,10 @@ func eventNotify(event eventData) {
|
|||||||
notificationEvent := []NotificationEvent{newNotificationEvent(event)}
|
notificationEvent := []NotificationEvent{newNotificationEvent(event)}
|
||||||
|
|
||||||
// Notify external targets.
|
// Notify external targets.
|
||||||
eventNotifyForBucketNotifications(eventType, objectName, event.Bucket,
|
eventNotifyForBucketNotifications(eventType, objectName, event.Bucket, notificationEvent)
|
||||||
notificationEvent)
|
|
||||||
|
|
||||||
// Notify internal targets.
|
// Notify internal targets.
|
||||||
eventNotifyForBucketListeners(eventType, objectName, event.Bucket,
|
eventNotifyForBucketListeners(eventType, objectName, event.Bucket, notificationEvent)
|
||||||
notificationEvent)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// loads notification config if any for a given bucket, returns
|
// loads notification config if any for a given bucket, returns
|
||||||
|
@ -74,6 +74,10 @@ var (
|
|||||||
var (
|
var (
|
||||||
// The maximum allowed difference between the request generation time and the server processing time
|
// The maximum allowed difference between the request generation time and the server processing time
|
||||||
globalMaxSkewTime = 15 * time.Minute
|
globalMaxSkewTime = 15 * time.Minute
|
||||||
|
|
||||||
|
// Keeps the connection active by waiting for following amount of time.
|
||||||
|
// Primarily used in ListenBucketNotification.
|
||||||
|
globalSNSConnAlive = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// global colors.
|
// global colors.
|
||||||
|
@ -246,6 +246,99 @@ func (s *TestSuiteCommon) TestDeleteBucketNotEmpty(c *C) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *TestSuiteCommon) TestListenBucketNotificationHandler(c *C) {
|
||||||
|
// generate a random bucket name.
|
||||||
|
bucketName := getRandomBucketName()
|
||||||
|
// HTTP request to create the bucket.
|
||||||
|
req, err := newTestSignedRequestV4("PUT", getMakeBucketURL(s.endPoint, bucketName),
|
||||||
|
0, nil, s.accessKey, s.secretKey)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
client := http.Client{}
|
||||||
|
// execute the request.
|
||||||
|
response, err := client.Do(req)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
// assert the http response status code.
|
||||||
|
c.Assert(response.StatusCode, Equals, http.StatusOK)
|
||||||
|
|
||||||
|
invalidBucket := "Invalid\\Bucket"
|
||||||
|
tooByte := bytes.Repeat([]byte("a"), 1025)
|
||||||
|
tooBigPrefix := string(tooByte)
|
||||||
|
validEvents := []string{"s3:ObjectCreated:*", "s3:ObjectRemoved:*"}
|
||||||
|
invalidEvents := []string{"invalidEvent"}
|
||||||
|
|
||||||
|
req, err = newTestSignedRequestV4("GET",
|
||||||
|
getListenBucketNotificationURL(s.endPoint, invalidBucket, []string{}, []string{}, []string{}),
|
||||||
|
0, nil, s.accessKey, s.secretKey)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
client = http.Client{}
|
||||||
|
// execute the request.
|
||||||
|
response, err = client.Do(req)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
verifyError(c, response, "InvalidBucketName", "The specified bucket is not valid.", http.StatusBadRequest)
|
||||||
|
|
||||||
|
req, err = newTestSignedRequestV4("GET",
|
||||||
|
getListenBucketNotificationURL(s.endPoint, bucketName, []string{}, []string{}, invalidEvents),
|
||||||
|
0, nil, s.accessKey, s.secretKey)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
client = http.Client{}
|
||||||
|
// execute the request.
|
||||||
|
response, err = client.Do(req)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
verifyError(c, response, "InvalidArgument", "A specified event is not supported for notifications.", http.StatusBadRequest)
|
||||||
|
|
||||||
|
req, err = newTestSignedRequestV4("GET",
|
||||||
|
getListenBucketNotificationURL(s.endPoint, bucketName, []string{tooBigPrefix}, []string{}, validEvents),
|
||||||
|
0, nil, s.accessKey, s.secretKey)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
client = http.Client{}
|
||||||
|
// execute the request.
|
||||||
|
response, err = client.Do(req)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
verifyError(c, response, "InvalidArgument", "Size of filter rule value cannot exceed 1024 bytes in UTF-8 representation", http.StatusBadRequest)
|
||||||
|
|
||||||
|
req, err = newTestSignedRequestV4("GET",
|
||||||
|
getListenBucketNotificationURL(s.endPoint, bucketName, []string{}, []string{}, validEvents),
|
||||||
|
0, nil, s.accessKey, s.secretKey)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
req.Header.Set("x-amz-content-sha256", "somethingElse")
|
||||||
|
client = http.Client{}
|
||||||
|
// execute the request.
|
||||||
|
response, err = client.Do(req)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
verifyError(c, response, "XAmzContentSHA256Mismatch", "The provided 'x-amz-content-sha256' header does not match what was computed.", http.StatusBadRequest)
|
||||||
|
|
||||||
|
// Change global value from 5 second to 100millisecond.
|
||||||
|
globalSNSConnAlive = 100 * time.Millisecond
|
||||||
|
req, err = newTestSignedRequestV4("GET",
|
||||||
|
getListenBucketNotificationURL(s.endPoint, bucketName,
|
||||||
|
[]string{}, []string{}, validEvents), 0, nil, s.accessKey, s.secretKey)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
client = http.Client{}
|
||||||
|
// execute the request.
|
||||||
|
response, err = client.Do(req)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(response.StatusCode, Equals, http.StatusOK)
|
||||||
|
// FIXME: uncomment this in future when we have a code to read notifications from.
|
||||||
|
// go func() {
|
||||||
|
// buf := bytes.NewReader(tooByte)
|
||||||
|
// rreq, rerr := newTestSignedRequestV4("GET",
|
||||||
|
// getPutObjectURL(s.endPoint, bucketName, "myobject/1"),
|
||||||
|
// int64(buf.Len()), buf, s.accessKey, s.secretKey)
|
||||||
|
// c.Assert(rerr, IsNil)
|
||||||
|
// client = http.Client{}
|
||||||
|
// // execute the request.
|
||||||
|
// resp, rerr := client.Do(rreq)
|
||||||
|
// c.Assert(rerr, IsNil)
|
||||||
|
// c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||||
|
// }()
|
||||||
|
response.Body.Close() // FIXME. Find a way to read from the returned body.
|
||||||
|
}
|
||||||
|
|
||||||
// Test deletes multple objects and verifies server resonse.
|
// Test deletes multple objects and verifies server resonse.
|
||||||
func (s *TestSuiteCommon) TestDeleteMultipleObjects(c *C) {
|
func (s *TestSuiteCommon) TestDeleteMultipleObjects(c *C) {
|
||||||
// generate a random bucket name.
|
// generate a random bucket name.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user