diff --git a/cmd/bucket-notification-datatypes.go b/cmd/bucket-notification-datatypes.go
index 79532dd22..db5cd1e76 100644
--- a/cmd/bucket-notification-datatypes.go
+++ b/cmd/bucket-notification-datatypes.go
@@ -32,30 +32,32 @@ type keyFilter struct {
FilterRules []filterRule `xml:"FilterRule,omitempty"`
}
-// Common elements of service notification.
-type serviceConfig struct {
- Events []string `xml:"Event"`
- Filter struct {
- Key keyFilter `xml:"S3Key,omitempty"`
- }
- ID string `xml:"Id"`
+type filterStruct struct {
+ Key keyFilter `xml:"S3Key,omitempty" json:"S3Key,omitempty"`
+}
+
+// ServiceConfig - Common elements of service notification.
+type ServiceConfig struct {
+ Events []string `xml:"Event" json:"Event"`
+ Filter filterStruct `xml:"Filter" json:"Filter"`
+ ID string `xml:"Id" json:"Id"`
}
// Queue SQS configuration.
type queueConfig struct {
- serviceConfig
+ ServiceConfig
QueueARN string `xml:"Queue"`
}
// Topic SNS configuration, this is a compliance field not used by minio yet.
type topicConfig struct {
- serviceConfig
- TopicARN string `xml:"Topic"`
+ ServiceConfig
+ TopicARN string `xml:"Topic" json:"Topic"`
}
// Lambda function configuration, this is a compliance field not used by minio yet.
type lambdaConfig struct {
- serviceConfig
+ ServiceConfig
LambdaARN string `xml:"CloudFunction"`
}
@@ -64,10 +66,16 @@ type lambdaConfig struct {
type notificationConfig struct {
XMLName xml.Name `xml:"NotificationConfiguration"`
QueueConfigs []queueConfig `xml:"QueueConfiguration"`
- TopicConfigs []topicConfig `xml:"TopicConfiguration"`
LambdaConfigs []lambdaConfig `xml:"CloudFunctionConfiguration"`
}
+// listenerConfig structure represents run-time notification
+// configuration for live listeners
+type listenerConfig struct {
+ TopicConfig topicConfig `json:"TopicConfiguration"`
+ TargetServer string `json:"TargetServer"`
+}
+
// Internal error used to signal notifications not set.
var errNoSuchNotifications = errors.New("The specified bucket does not have bucket notifications")
diff --git a/cmd/bucket-notification-handlers.go b/cmd/bucket-notification-handlers.go
index e34c5efad..af5a7f77a 100644
--- a/cmd/bucket-notification-handlers.go
+++ b/cmd/bucket-notification-handlers.go
@@ -32,6 +32,7 @@ import (
const (
bucketConfigPrefix = "buckets"
bucketNotificationConfig = "notification.xml"
+ bucketListenerConfig = "listener.json"
)
// GetBucketNotificationHandler - This implementation of the GET
@@ -117,11 +118,10 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
// Reads the incoming notification configuration.
var buffer bytes.Buffer
- var bufferSize int64
if r.ContentLength >= 0 {
- bufferSize, err = io.CopyN(&buffer, r.Body, r.ContentLength)
+ _, err = io.CopyN(&buffer, r.Body, r.ContentLength)
} else {
- bufferSize, err = io.Copy(&buffer, r.Body)
+ _, err = io.Copy(&buffer, r.Body)
}
if err != nil {
errorIf(err, "Unable to read incoming body.")
@@ -144,24 +144,39 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
return
}
- // Proceed to save notification configuration.
- notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
- sha256sum := ""
- var metadata map[string]string
- _, err = objectAPI.PutObject(minioMetaBucket, notificationConfigPath, bufferSize, bytes.NewReader(buffer.Bytes()), metadata, sha256sum)
+ // Put bucket notification config.
+ err = PutBucketNotificationConfig(bucket, ¬ificationCfg, objectAPI)
if err != nil {
- errorIf(err, "Unable to write bucket notification configuration.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
}
- // Set bucket notification config.
- globalEventNotifier.SetBucketNotificationConfig(bucket, ¬ificationCfg)
-
// Success.
writeSuccessResponse(w, nil)
}
+// PutBucketNotificationConfig - Put a new notification config for a
+// bucket (overwrites any previous config) persistently, updates
+// global in-memory state, and notify other nodes in the cluster (if
+// any)
+func PutBucketNotificationConfig(bucket string, ncfg *notificationConfig, objAPI ObjectLayer) error {
+ if ncfg == nil {
+ return errInvalidArgument
+ }
+
+ // persist config to disk
+ err := persistNotificationConfig(bucket, ncfg, objAPI)
+ if err != nil {
+ return fmt.Errorf("Unable to persist Bucket notification config to object layer - config=%v errMsg=%v", *ncfg, err)
+ }
+
+ // All servers (including local) are told to update in-memory
+ // config
+ S3PeersUpdateBucketNotification(bucket, ncfg)
+
+ return nil
+}
+
// writeNotification marshals notification message before writing to client.
func writeNotification(w http.ResponseWriter, notification map[string][]NotificationEvent) error {
// Invalid response writer.
@@ -172,7 +187,7 @@ func writeNotification(w http.ResponseWriter, notification map[string][]Notifica
if notification == nil {
return errInvalidArgument
}
- // Marshal notification data into XML and write to client.
+ // Marshal notification data into JSON and write to client.
notificationBytes, err := json.Marshal(¬ification)
if err != nil {
return err
@@ -251,13 +266,18 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
_, err := objAPI.GetBucketInfo(bucket)
if err != nil {
- errorIf(err, "Unable to bucket info.")
+ errorIf(err, "Unable to get bucket info.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
}
accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano())
- accountARN := "arn:minio:sns:" + serverConfig.GetRegion() + accountID + ":listen"
+ accountARN := fmt.Sprintf(
+ "arn:minio:sqs:%s:%s:listen-%s",
+ serverConfig.GetRegion(),
+ accountID,
+ globalMinioAddr,
+ )
var filterRules []filterRule
if prefix != "" {
filterRules = append(filterRules, filterRule{
@@ -272,13 +292,14 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
})
}
- // Make topic configuration corresponding to this ListenBucketNotification request.
+ // Make topic configuration corresponding to this
+ // ListenBucketNotification request.
topicCfg := &topicConfig{
TopicARN: accountARN,
- serviceConfig: serviceConfig{
+ ServiceConfig: ServiceConfig{
Events: events,
Filter: struct {
- Key keyFilter `xml:"S3Key,omitempty"`
+ Key keyFilter `xml:"S3Key,omitempty" json:"S3Key,omitempty"`
}{
Key: keyFilter{
FilterRules: filterRules,
@@ -288,29 +309,93 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
},
}
- // Add topic config to bucket notification config.
- if err = globalEventNotifier.AddTopicConfig(bucket, topicCfg); err != nil {
+ // Setup a listening channel that will receive notifications
+ // from the RPC handler.
+ nEventCh := make(chan []NotificationEvent)
+ defer close(nEventCh)
+ // Add channel for listener events
+ globalEventNotifier.AddListenerChan(accountARN, nEventCh)
+ // Remove listener channel after the writer has closed or the
+ // client disconnected.
+ defer globalEventNotifier.RemoveListenerChan(accountARN)
+
+ // Update topic config to bucket config and persist - as soon
+ // as this call compelets, events may start appearing in
+ // nEventCh
+ lc := listenerConfig{
+ TopicConfig: *topicCfg,
+ TargetServer: globalMinioAddr,
+ }
+ err = AddBucketListenerConfig(bucket, &lc, objAPI)
+ if err != nil {
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return
}
+ defer RemoveBucketListenerConfig(bucket, &lc, objAPI)
// Add all common headers.
setCommonHeaders(w)
- // Create a new notification event channel.
- nEventCh := make(chan []NotificationEvent)
- // Close the listener channel.
- defer close(nEventCh)
-
- // Set sns target.
- globalEventNotifier.SetSNSTarget(accountARN, nEventCh)
- // Remove sns listener after the writer has closed or the client disconnected.
- defer globalEventNotifier.RemoveSNSTarget(accountARN, nEventCh)
-
// Start sending bucket notifications.
sendBucketNotification(w, nEventCh)
}
+// AddBucketListenerConfig - Updates on disk state of listeners, and
+// updates all peers with the change in listener config.
+func AddBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectLayer) error {
+ if lcfg == nil {
+ return errInvalidArgument
+ }
+ listenerCfgs := globalEventNotifier.GetBucketListenerConfig(bucket)
+
+ // add new lid to listeners and persist to object layer.
+ listenerCfgs = append(listenerCfgs, *lcfg)
+
+ // update persistent config
+ err := persistListenerConfig(bucket, listenerCfgs, objAPI)
+ if err != nil {
+ errorIf(err, "Error persisting listener config when adding a listener.")
+ return err
+ }
+
+ // persistence success - now update in-memory globals on all
+ // peers (including local)
+ S3PeersUpdateBucketListener(bucket, listenerCfgs)
+ return nil
+}
+
+// RemoveBucketListenerConfig - removes a given bucket notification config
+func RemoveBucketListenerConfig(bucket string, lcfg *listenerConfig, objAPI ObjectLayer) {
+ listenerCfgs := globalEventNotifier.GetBucketListenerConfig(bucket)
+
+ // remove listener with matching ARN - if not found ignore and
+ // exit.
+ var updatedLcfgs []listenerConfig
+ found := false
+ for k, configuredLcfg := range listenerCfgs {
+ if configuredLcfg.TopicConfig.TopicARN == lcfg.TopicConfig.TopicARN {
+ updatedLcfgs = append(listenerCfgs[:k],
+ listenerCfgs[k+1:]...)
+ found = true
+ break
+ }
+ }
+ if !found {
+ return
+ }
+
+ // update persistent config
+ err := persistListenerConfig(bucket, updatedLcfgs, objAPI)
+ if err != nil {
+ errorIf(err, "Error persisting listener config when removing a listener.")
+ return
+ }
+
+ // persistence success - now update in-memory globals on all
+ // peers (including local)
+ S3PeersUpdateBucketListener(bucket, updatedLcfgs)
+}
+
// Removes notification.xml for a given bucket, only used during DeleteBucket.
func removeNotificationConfig(bucket string, objAPI ObjectLayer) error {
// Verify bucket is valid.
diff --git a/cmd/bucket-notification-handlers_test.go b/cmd/bucket-notification-handlers_test.go
index 497488b2c..39257304e 100644
--- a/cmd/bucket-notification-handlers_test.go
+++ b/cmd/bucket-notification-handlers_test.go
@@ -5,11 +5,16 @@ import (
"bytes"
"encoding/json"
"encoding/xml"
+ "fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
"net/http/httptest"
+ "strconv"
"testing"
+
+ "github.com/gorilla/mux"
)
// Implement a dummy flush writer.
@@ -156,325 +161,50 @@ func TestSendBucketNotification(t *testing.T) {
}
}
-func testGetBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) {
- // get random bucket name.
- randBucket := getRandomBucketName()
- noNotificationBucket := "nonotification"
- invalidBucket := "Invalid\\Bucket"
-
- // Create buckets for the following test cases.
- for _, bucket := range []string{randBucket, noNotificationBucket} {
- err := obj.MakeBucket(bucket)
- if err != nil {
- // failed to create newbucket, abort.
- t.Fatalf("Failed to create bucket %s %s : %s", bucket,
- instanceType, err)
- }
- }
-
- // Initialize sample bucket notification config.
- sampleNotificationBytes := []byte("" +
- "s3:ObjectCreated:*s3:ObjectRemoved:*" +
- "arn:minio:sns:us-east-1:1474332374:listen" +
- "")
-
- emptyNotificationBytes := []byte("")
-
- // Register the API end points with XL/FS object layer.
- apiRouter := initTestAPIEndPoints(obj, []string{
- "GetBucketNotification",
- "PutBucketNotification",
- })
-
- // initialize the server and obtain the credentials and root.
- // credentials are necessary to sign the HTTP request.
- rootPath, err := newTestConfig("us-east-1")
- if err != nil {
- t.Fatalf("Init Test config failed")
- }
- // remove the root folder after the test ends.
- defer removeAll(rootPath)
-
- credentials := serverConfig.GetCredential()
-
- //Initialize global event notifier with mock queue targets.
- err = initEventNotifier(obj)
- if err != nil {
- t.Fatalf("Test %s: Failed to initialize mock event notifier %v",
- instanceType, err)
- }
- // Initialize httptest recorder.
- rec := httptest.NewRecorder()
-
- // Prepare notification config for one of the test cases.
- req, err := newTestSignedRequestV4("PUT", getPutBucketNotificationURL("", randBucket),
- int64(len(sampleNotificationBytes)), bytes.NewReader(sampleNotificationBytes),
- credentials.AccessKeyID, credentials.SecretAccessKey)
- if err != nil {
- t.Fatalf("Test %d %s: Failed to create HTTP request for PutBucketNotification: %v",
- 1, instanceType, err)
- }
-
- apiRouter.ServeHTTP(rec, req)
-
- type testKind int
- const (
- CompareBytes testKind = iota
- CheckStatus
- InvalidAuth
- )
- testCases := []struct {
- bucketName string
- kind testKind
- expectedNotificationBytes []byte
- expectedHTTPCode int
- }{
- {randBucket, CompareBytes, sampleNotificationBytes, http.StatusOK},
- {randBucket, InvalidAuth, nil, http.StatusBadRequest},
- {noNotificationBucket, CompareBytes, emptyNotificationBytes, http.StatusOK},
- {invalidBucket, CheckStatus, nil, http.StatusBadRequest},
- }
- signatureMismatchCode := getAPIError(ErrContentSHA256Mismatch).Code
- for i, test := range testCases {
- testRec := httptest.NewRecorder()
- testReq, tErr := newTestSignedRequestV4("GET", getGetBucketNotificationURL("", test.bucketName),
- int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
- if tErr != nil {
- t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for GetBucketNotification: %v",
- i+1, instanceType, tErr)
- }
-
- // Set X-Amz-Content-SHA256 in header different from what was used to calculate Signature.
- if test.kind == InvalidAuth {
- // Triggering a authentication type check failure.
- testReq.Header.Set("x-amz-content-sha256", "somethingElse")
- }
-
- apiRouter.ServeHTTP(testRec, testReq)
-
- switch test.kind {
- case CompareBytes:
- rspBytes, rErr := ioutil.ReadAll(testRec.Body)
- if rErr != nil {
- t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr)
- }
- if !bytes.Equal(rspBytes, test.expectedNotificationBytes) {
- t.Errorf("Test %d: %s: Notification config doesn't match expected value %s: %v",
- i+1, instanceType, string(test.expectedNotificationBytes), err)
- }
- case InvalidAuth:
- rspBytes, rErr := ioutil.ReadAll(testRec.Body)
- if rErr != nil {
- t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr)
- }
- var errCode APIError
- xErr := xml.Unmarshal(rspBytes, &errCode)
- if xErr != nil {
- t.Errorf("Test %d: %s: Failed to unmarshal error XML: %v", i+1, instanceType, xErr)
-
- }
-
- if errCode.Code != signatureMismatchCode {
- t.Errorf("Test %d: %s: Expected error code %s but received %s: %v", i+1,
- instanceType, signatureMismatchCode, errCode.Code, err)
-
- }
- fallthrough
- case CheckStatus:
- if testRec.Code != test.expectedHTTPCode {
- t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v",
- i+1, instanceType, test.expectedHTTPCode, testRec.Code, err)
- }
- }
- }
-
- // Nil Object layer
- nilAPIRouter := initTestAPIEndPoints(nil, []string{
- "GetBucketNotification",
- "PutBucketNotification",
- })
- testRec := httptest.NewRecorder()
- testReq, tErr := newTestSignedRequestV4("GET", getGetBucketNotificationURL("", randBucket),
- int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
- if tErr != nil {
- t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for GetBucketNotification: %v",
- len(testCases)+1, instanceType, tErr)
- }
- nilAPIRouter.ServeHTTP(testRec, testReq)
- if testRec.Code != http.StatusServiceUnavailable {
- t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v",
- len(testCases)+1, instanceType, http.StatusServiceUnavailable, testRec.Code, err)
- }
-}
-
-func TestGetBucketNotificationHandler(t *testing.T) {
- ExecObjectLayerTest(t, testGetBucketNotificationHandler)
-}
-
-func testPutBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) {
- invalidBucket := "Invalid\\Bucket"
- // get random bucket name.
- randBucket := getRandomBucketName()
-
- err := obj.MakeBucket(randBucket)
- if err != nil {
- // failed to create randBucket, abort.
- t.Fatalf("Failed to create bucket %s %s : %s", randBucket,
- instanceType, err)
- }
-
- sampleNotificationBytes := []byte("" +
- "s3:ObjectCreated:*s3:ObjectRemoved:*" +
- "arn:minio:sns:us-east-1:1474332374:listen" +
- "")
-
- // Register the API end points with XL/FS object layer.
- apiRouter := initTestAPIEndPoints(obj, []string{
- "GetBucketNotification",
- "PutBucketNotification",
- })
-
- // initialize the server and obtain the credentials and root.
- // credentials are necessary to sign the HTTP request.
- rootPath, err := newTestConfig("us-east-1")
- if err != nil {
- t.Fatalf("Init Test config failed")
- }
- // remove the root folder after the test ends.
- defer removeAll(rootPath)
-
- credentials := serverConfig.GetCredential()
-
- //Initialize global event notifier with mock queue targets.
- err = initEventNotifier(obj)
- if err != nil {
- t.Fatalf("Test %s: Failed to initialize mock event notifier %v",
- instanceType, err)
- }
-
- signatureMismatchError := getAPIError(ErrContentSHA256Mismatch)
- missingContentLengthError := getAPIError(ErrMissingContentLength)
- type testKind int
- const (
- CompareBytes testKind = iota
- CheckStatus
- InvalidAuth
- MissingContentLength
- ChunkedEncoding
- )
- testCases := []struct {
- bucketName string
- kind testKind
- expectedNotificationBytes []byte
- expectedHTTPCode int
- expectedAPIError string
- }{
- {randBucket, CompareBytes, sampleNotificationBytes, http.StatusOK, ""},
- {randBucket, ChunkedEncoding, sampleNotificationBytes, http.StatusOK, ""},
- {randBucket, InvalidAuth, nil, signatureMismatchError.HTTPStatusCode, signatureMismatchError.Code},
- {randBucket, MissingContentLength, nil, missingContentLengthError.HTTPStatusCode, missingContentLengthError.Code},
- {invalidBucket, CheckStatus, nil, http.StatusBadRequest, ""},
- }
- for i, test := range testCases {
- testRec := httptest.NewRecorder()
- testReq, tErr := newTestSignedRequestV4("PUT", getPutBucketNotificationURL("", test.bucketName),
- int64(len(test.expectedNotificationBytes)), bytes.NewReader(test.expectedNotificationBytes),
- credentials.AccessKeyID, credentials.SecretAccessKey)
- if tErr != nil {
- t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for PutBucketNotification: %v",
- i+1, instanceType, tErr)
- }
-
- // Set X-Amz-Content-SHA256 in header different from what was used to calculate Signature.
- switch test.kind {
- case InvalidAuth:
- // Triggering a authentication type check failure.
- testReq.Header.Set("x-amz-content-sha256", "somethingElse")
- case MissingContentLength:
- testReq.ContentLength = -1
- case ChunkedEncoding:
- testReq.ContentLength = -1
- testReq.TransferEncoding = append(testReq.TransferEncoding, "chunked")
- }
-
- apiRouter.ServeHTTP(testRec, testReq)
-
- switch test.kind {
- case CompareBytes:
-
- testReq, tErr = newTestSignedRequestV4("GET", getGetBucketNotificationURL("", test.bucketName),
- int64(0), nil, credentials.AccessKeyID, credentials.SecretAccessKey)
- if tErr != nil {
- t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for GetBucketNotification: %v",
- i+1, instanceType, tErr)
- }
- apiRouter.ServeHTTP(testRec, testReq)
-
- rspBytes, rErr := ioutil.ReadAll(testRec.Body)
- if rErr != nil {
- t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr)
- }
- if !bytes.Equal(rspBytes, test.expectedNotificationBytes) {
- t.Errorf("Test %d: %s: Notification config doesn't match expected value %s: %v",
- i+1, instanceType, string(test.expectedNotificationBytes), err)
- }
- case MissingContentLength, InvalidAuth:
- rspBytes, rErr := ioutil.ReadAll(testRec.Body)
- if rErr != nil {
- t.Errorf("Test %d: %s: Failed to read response body: %v", i+1, instanceType, rErr)
- }
- var errCode APIError
- xErr := xml.Unmarshal(rspBytes, &errCode)
- if xErr != nil {
- t.Errorf("Test %d: %s: Failed to unmarshal error XML: %v", i+1, instanceType, xErr)
-
- }
-
- if errCode.Code != test.expectedAPIError {
- t.Errorf("Test %d: %s: Expected error code %s but received %s: %v", i+1,
- instanceType, test.expectedAPIError, errCode.Code, err)
-
- }
- fallthrough
- case CheckStatus:
- if testRec.Code != test.expectedHTTPCode {
- t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v",
- i+1, instanceType, test.expectedHTTPCode, testRec.Code, err)
- }
- }
- }
-
- // Nil Object layer
- nilAPIRouter := initTestAPIEndPoints(nil, []string{
- "GetBucketNotification",
- "PutBucketNotification",
- })
- testRec := httptest.NewRecorder()
- testReq, tErr := newTestSignedRequestV4("PUT", getPutBucketNotificationURL("", randBucket),
- int64(len(sampleNotificationBytes)), bytes.NewReader(sampleNotificationBytes),
- credentials.AccessKeyID, credentials.SecretAccessKey)
- if tErr != nil {
- t.Fatalf("Test %d: %s: Failed to create HTTP testRequest for PutBucketNotification: %v",
- len(testCases)+1, instanceType, tErr)
- }
- nilAPIRouter.ServeHTTP(testRec, testReq)
- if testRec.Code != http.StatusServiceUnavailable {
- t.Errorf("Test %d: %s: expected HTTP code %d, but received %d: %v",
- len(testCases)+1, instanceType, http.StatusServiceUnavailable, testRec.Code, err)
- }
-}
-
-func TestPutBucketNotificationHandler(t *testing.T) {
- ExecObjectLayerTest(t, testPutBucketNotificationHandler)
-}
-
func testListenBucketNotificationHandler(obj ObjectLayer, instanceType string, t TestErrHandler) {
+ // Register the API end points with XL/FS object layer.
+ apiRouter := initTestAPIEndPoints(obj, []string{
+ "ListenBucketNotification",
+ })
+ mux, ok := apiRouter.(*mux.Router)
+ if !ok {
+ t.Fatal("Unable to setup test")
+ }
+ registerS3PeerRPCRouter(mux)
+
+ testServer := httptest.NewServer(apiRouter)
+ defer testServer.Close()
+
+ // initialize the server and obtain the credentials and root.
+ // credentials are necessary to sign the HTTP request.
+ rootPath, err := newTestConfig("us-east-1")
+ if err != nil {
+ t.Fatalf("Init Test config failed")
+ }
+ // remove the root folder after the test ends.
+ defer removeAll(rootPath)
+
+ credentials := serverConfig.GetCredential()
+
+ // setup port and minio addr
+ _, portStr, err := net.SplitHostPort(testServer.Listener.Addr().String())
+ if err != nil {
+ t.Fatalf("Initialisation error: %v", err)
+ }
+ globalMinioPort, err = strconv.Atoi(portStr)
+ if err != nil {
+ t.Fatalf("Initialisation error: %v", err)
+ }
+ globalMinioAddr = fmt.Sprintf(":%d", globalMinioPort)
+ // initialize the peer client(s)
+ initGlobalS3Peers([]string{})
+
invalidBucket := "Invalid\\Bucket"
noNotificationBucket := "nonotificationbucket"
// get random bucket name.
randBucket := getRandomBucketName()
for _, bucket := range []string{randBucket, noNotificationBucket} {
- err := obj.MakeBucket(bucket)
+ err = obj.MakeBucket(bucket)
if err != nil {
// failed to create bucket, abort.
t.Fatalf("Failed to create bucket %s %s : %s", bucket,
@@ -482,43 +212,16 @@ func testListenBucketNotificationHandler(obj ObjectLayer, instanceType string, t
}
}
- sampleNotificationBytes := []byte("" +
- "s3:ObjectCreated:*s3:ObjectRemoved:*" +
- "arn:minio:sns:us-east-1:1474332374:listen" +
- "")
-
- // Register the API end points with XL/FS object layer.
- apiRouter := initTestAPIEndPoints(obj, []string{
- "PutBucketNotification",
- "ListenBucketNotification",
- "PutObject",
- })
-
- // initialize the server and obtain the credentials and root.
- // credentials are necessary to sign the HTTP request.
- rootPath, err := newTestConfig("us-east-1")
- if err != nil {
- t.Fatalf("Init Test config failed")
- }
- // remove the root folder after the test ends.
- defer removeAll(rootPath)
-
- credentials := serverConfig.GetCredential()
-
// Initialize global event notifier with mock queue targets.
err = initEventNotifier(obj)
if err != nil {
t.Fatalf("Test %s: Failed to initialize mock event notifier %v",
instanceType, err)
}
- testRec := httptest.NewRecorder()
- testReq, tErr := newTestSignedRequestV4("PUT", getPutBucketNotificationURL("", randBucket),
- int64(len(sampleNotificationBytes)), bytes.NewReader(sampleNotificationBytes),
- credentials.AccessKeyID, credentials.SecretAccessKey)
- if tErr != nil {
- t.Fatalf("%s: Failed to create HTTP testRequest for PutBucketNotification: %v", instanceType, tErr)
- }
- apiRouter.ServeHTTP(testRec, testReq)
+
+ var testRec *httptest.ResponseRecorder
+ var testReq *http.Request
+ var tErr error
signatureMismatchError := getAPIError(ErrContentSHA256Mismatch)
type testKind int
diff --git a/cmd/bucket-notification-utils.go b/cmd/bucket-notification-utils.go
index 46a26cbd8..fa2e6341c 100644
--- a/cmd/bucket-notification-utils.go
+++ b/cmd/bucket-notification-utils.go
@@ -265,25 +265,6 @@ func checkDuplicateQueueConfigs(configs []queueConfig) APIErrorCode {
return ErrNone
}
-// Check all the topic configs for any duplicates.
-func checkDuplicateTopicConfigs(configs []topicConfig) APIErrorCode {
- var topicConfigARNS []string
-
- // Navigate through each configs and count the entries.
- for _, config := range configs {
- topicConfigARNS = append(topicConfigARNS, config.TopicARN)
- }
-
- // Check if there are any duplicate counts.
- if err := checkDuplicates(topicConfigARNS); err != nil {
- errorIf(err, "Invalid topic configs found.")
- return ErrOverlappingConfigs
- }
-
- // Success.
- return ErrNone
-}
-
// Validates all the bucket notification configuration for their validity,
// if one of the config is malformed or has invalid data it is rejected.
// Configuration is never applied partially.
@@ -292,10 +273,6 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
if s3Error := validateQueueConfigs(nConfig.QueueConfigs); s3Error != ErrNone {
return s3Error
}
- // Validate all topic configs.
- if s3Error := validateTopicConfigs(nConfig.TopicConfigs); s3Error != ErrNone {
- return s3Error
- }
// Check for duplicate queue configs.
if len(nConfig.QueueConfigs) > 1 {
@@ -304,13 +281,6 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
}
}
- // Check for duplicate topic configs.
- if len(nConfig.TopicConfigs) > 1 {
- if s3Error := checkDuplicateTopicConfigs(nConfig.TopicConfigs); s3Error != ErrNone {
- return s3Error
- }
- }
-
// Add validation for other configurations.
return ErrNone
}
diff --git a/cmd/bucket-notification-utils_test.go b/cmd/bucket-notification-utils_test.go
index 0d313e086..c32ee8377 100644
--- a/cmd/bucket-notification-utils_test.go
+++ b/cmd/bucket-notification-utils_test.go
@@ -57,42 +57,6 @@ func TestCheckDuplicateConfigs(t *testing.T) {
t.Errorf("Test %d: Expected %d, got %d", i+1, testCase.expectedErrCode, errCode)
}
}
-
- // Test cases for SNS topic config.
- topicTestCases := []struct {
- tConfigs []topicConfig
- expectedErrCode APIErrorCode
- }{
- // Error out for duplicate configs.
- {
- tConfigs: []topicConfig{
- {
- TopicARN: "arn:minio:sns:us-east-1:1:listen",
- },
- {
- TopicARN: "arn:minio:sns:us-east-1:1:listen",
- },
- },
- expectedErrCode: ErrOverlappingConfigs,
- },
- // Valid config.
- {
- tConfigs: []topicConfig{
- {
- TopicARN: "arn:minio:sns:us-east-1:1:listen",
- },
- },
- expectedErrCode: ErrNone,
- },
- }
-
- // ... validate for duplicate topic configs.
- for i, testCase := range topicTestCases {
- errCode := checkDuplicateTopicConfigs(testCase.tConfigs)
- if errCode != testCase.expectedErrCode {
- t.Errorf("Test %d: Expected %d, got %d", i+1, testCase.expectedErrCode, errCode)
- }
- }
}
// Tests for validating filter rules.
diff --git a/cmd/control-router.go b/cmd/control-router.go
index 8514b1c7d..8c50b5a51 100644
--- a/cmd/control-router.go
+++ b/cmd/control-router.go
@@ -31,24 +31,6 @@ const (
controlPath = "/control"
)
-// Find local node through the command line arguments.
-func getLocalAddress(srvCmdConfig serverCmdConfig) string {
- if !srvCmdConfig.isDistXL {
- return fmt.Sprintf(":%d", globalMinioPort)
- }
- for _, export := range srvCmdConfig.disks {
- // Validates if remote disk is local.
- if isLocalStorage(export) {
- var host string
- if idx := strings.LastIndex(export, ":"); idx != -1 {
- host = export[:idx]
- }
- return fmt.Sprintf("%s:%d", host, globalMinioPort)
- }
- }
- return ""
-}
-
// Initializes remote control clients for making remote requests.
func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient {
if !srvCmdConfig.isDistXL {
diff --git a/cmd/control-router_test.go b/cmd/control-router_test.go
index 4e3d4c293..8520acdc4 100644
--- a/cmd/control-router_test.go
+++ b/cmd/control-router_test.go
@@ -16,70 +16,7 @@
package cmd
-import (
- "runtime"
- "testing"
-)
-
-// Tests fetch local address.
-func TestLocalAddress(t *testing.T) {
- if runtime.GOOS == "windows" {
- return
- }
- testCases := []struct {
- srvCmdConfig serverCmdConfig
- localAddr string
- }{
- // Test 1 - local address is found.
- {
- srvCmdConfig: serverCmdConfig{
- isDistXL: true,
- disks: []string{
- "localhost:/mnt/disk1",
- "1.1.1.2:/mnt/disk2",
- "1.1.2.1:/mnt/disk3",
- "1.1.2.2:/mnt/disk4",
- },
- },
- localAddr: "localhost:9000",
- },
- // Test 2 - local address is everything.
- {
- srvCmdConfig: serverCmdConfig{
- isDistXL: false,
- disks: []string{
- "/mnt/disk1",
- "/mnt/disk2",
- "/mnt/disk3",
- "/mnt/disk4",
- },
- },
- localAddr: ":9000",
- },
- // Test 3 - local address is not found.
- {
- srvCmdConfig: serverCmdConfig{
- isDistXL: true,
- disks: []string{
- "1.1.1.1:/mnt/disk1",
- "1.1.1.2:/mnt/disk2",
- "1.1.2.1:/mnt/disk3",
- "1.1.2.2:/mnt/disk4",
- },
- },
- localAddr: "",
- },
- }
-
- // Validates fetching local address.
- for i, testCase := range testCases {
- localAddr := getLocalAddress(testCase.srvCmdConfig)
- if localAddr != testCase.localAddr {
- t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr)
- }
- }
-
-}
+import "testing"
// Tests initialization of remote controller clients.
func TestInitRemoteControlClients(t *testing.T) {
diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go
index c8b07889c..30249dbf6 100644
--- a/cmd/event-notifier.go
+++ b/cmd/event-notifier.go
@@ -18,6 +18,7 @@ package cmd
import (
"bytes"
+ "encoding/json"
"encoding/xml"
"fmt"
"net"
@@ -29,14 +30,51 @@ import (
"github.com/Sirupsen/logrus"
)
-// Global event notification queue. This is the queue that would be used to send all notifications.
-type eventNotifier struct {
- rwMutex *sync.RWMutex
-
- // Collection of 'bucket' and notification config.
+type externalNotifier struct {
+ // Per-bucket notification config. This is updated via
+ // PutBucketNotification API.
notificationConfigs map[string]*notificationConfig
- snsTargets map[string][]chan []NotificationEvent
- queueTargets map[string]*logrus.Logger
+
+ // An external target keeps a connection to an external
+ // service to which events are to be sent. It is a mapping
+ // from an ARN to a log object
+ targets map[string]*logrus.Logger
+
+ rwMutex *sync.RWMutex
+}
+
+type internalNotifier struct {
+ // per-bucket listener configuration. This is updated
+ // when listeners connect or disconnect.
+ listenerConfigs map[string][]listenerConfig
+
+ // An internal target is a peer Minio server, that is
+ // connected to a listening client. Here, targets is a map of
+ // listener ARN to log object.
+ targets map[string]*listenerLogger
+
+ // Connected listeners is a map of listener ARNs to channels
+ // on which the ListenBucket API handler go routine is waiting
+ // for events to send to a client.
+ connectedListeners map[string]chan []NotificationEvent
+
+ rwMutex *sync.RWMutex
+}
+
+// Global event notification configuration. This structure has state
+// about configured external notifications, and run-time configuration
+// for listener notifications.
+type eventNotifier struct {
+
+ // `external` here refers to notification configuration to
+ // send events to supported external systems
+ external externalNotifier
+
+ // `internal` refers to notification configuration for live
+ // listening clients. Events for a client are send from all
+ // servers, internally to a particular server that is
+ // connected to the client.
+ internal internalNotifier
}
// Represents data to be sent with notification event.
@@ -54,7 +92,8 @@ func newNotificationEvent(event eventData) NotificationEvent {
region := serverConfig.GetRegion()
tnow := time.Now().UTC()
sequencer := fmt.Sprintf("%X", tnow.UnixNano())
- // Following blocks fills in all the necessary details of s3 event message structure.
+ // Following blocks fills in all the necessary details of s3
+ // event message structure.
// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
nEvent := NotificationEvent{
EventVersion: "2.0",
@@ -96,85 +135,147 @@ func newNotificationEvent(event eventData) NotificationEvent {
return nEvent
}
-// Fetch the saved queue target.
-func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger {
- return en.queueTargets[queueARN]
+// Fetch the external target. No locking needed here since this map is
+// never written after initial startup.
+func (en eventNotifier) GetExternalTarget(queueARN string) *logrus.Logger {
+ return en.external.targets[queueARN]
}
-func (en eventNotifier) GetSNSTarget(snsARN string) []chan []NotificationEvent {
- en.rwMutex.RLock()
- defer en.rwMutex.RUnlock()
- return en.snsTargets[snsARN]
+func (en eventNotifier) GetInternalTarget(arn string) *listenerLogger {
+ en.internal.rwMutex.RLock()
+ defer en.internal.rwMutex.RUnlock()
+ return en.internal.targets[arn]
}
// Set a new sns target for an input sns ARN.
-func (en *eventNotifier) SetSNSTarget(snsARN string, listenerCh chan []NotificationEvent) error {
- en.rwMutex.Lock()
- defer en.rwMutex.Unlock()
+func (en *eventNotifier) AddListenerChan(snsARN string, listenerCh chan []NotificationEvent) error {
if listenerCh == nil {
return errInvalidArgument
}
- en.snsTargets[snsARN] = append(en.snsTargets[snsARN], listenerCh)
+ en.internal.rwMutex.Lock()
+ defer en.internal.rwMutex.Unlock()
+ en.internal.connectedListeners[snsARN] = listenerCh
return nil
}
// Remove sns target for an input sns ARN.
-func (en *eventNotifier) RemoveSNSTarget(snsARN string, listenerCh chan []NotificationEvent) {
- en.rwMutex.Lock()
- defer en.rwMutex.Unlock()
- snsTarget, ok := en.snsTargets[snsARN]
+func (en *eventNotifier) RemoveListenerChan(snsARN string) {
+ en.internal.rwMutex.Lock()
+ defer en.internal.rwMutex.Unlock()
+ if en.internal.connectedListeners != nil {
+ delete(en.internal.connectedListeners, snsARN)
+ }
+}
+
+func (en *eventNotifier) SendListenerEvent(arn string, event []NotificationEvent) error {
+ en.internal.rwMutex.Lock()
+ defer en.internal.rwMutex.Unlock()
+
+ ch, ok := en.internal.connectedListeners[arn]
if ok {
- for i, savedListenerCh := range snsTarget {
- if listenerCh == savedListenerCh {
- snsTarget = append(snsTarget[:i], snsTarget[i+1:]...)
- if len(snsTarget) == 0 {
- delete(en.snsTargets, snsARN)
- break
- }
- en.snsTargets[snsARN] = snsTarget
+ ch <- event
+ }
+ // If the channel is not present we ignore the event.
+ return nil
+}
+
+// Fetch bucket notification config for an input bucket.
+func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig {
+ en.external.rwMutex.RLock()
+ defer en.external.rwMutex.RUnlock()
+ return en.external.notificationConfigs[bucket]
+}
+
+func (en *eventNotifier) SetBucketNotificationConfig(bucket string, ncfg *notificationConfig) {
+ en.external.rwMutex.Lock()
+ if ncfg == nil {
+ delete(en.external.notificationConfigs, bucket)
+ } else {
+ en.external.notificationConfigs[bucket] = ncfg
+ }
+ en.external.rwMutex.Unlock()
+}
+
+func (en *eventNotifier) GetBucketListenerConfig(bucket string) []listenerConfig {
+ en.internal.rwMutex.RLock()
+ defer en.internal.rwMutex.RUnlock()
+ return en.internal.listenerConfigs[bucket]
+}
+
+func (en *eventNotifier) SetBucketListenerConfig(bucket string, lcfg []listenerConfig) error {
+ en.internal.rwMutex.Lock()
+ defer en.internal.rwMutex.Unlock()
+ if lcfg == nil {
+ delete(en.internal.listenerConfigs, bucket)
+ } else {
+ en.internal.listenerConfigs[bucket] = lcfg
+ }
+ // close all existing loggers and initialize again.
+ for _, v := range en.internal.targets {
+ v.lconn.Close()
+ }
+ en.internal.targets = make(map[string]*listenerLogger)
+ for _, lc := range lcfg {
+ logger, err := newListenerLogger(lc.TopicConfig.TopicARN,
+ lc.TargetServer)
+ if err != nil {
+ return err
+ }
+ en.internal.targets[lc.TopicConfig.TopicARN] = logger
+ }
+ return nil
+}
+
+func eventNotifyForBucketNotifications(eventType, objectName, bucketName string,
+ nEvent []NotificationEvent) {
+ nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
+ if nConfig == nil {
+ return
+ }
+ // Validate if the event and object match the queue configs.
+ for _, qConfig := range nConfig.QueueConfigs {
+ eventMatch := eventMatch(eventType, qConfig.Events)
+ ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules)
+ if eventMatch && ruleMatch {
+ targetLog := globalEventNotifier.GetExternalTarget(qConfig.QueueARN)
+ if targetLog != nil {
+ targetLog.WithFields(logrus.Fields{
+ "Key": path.Join(bucketName, objectName),
+ "EventType": eventType,
+ "Records": nEvent,
+ }).Info()
}
}
}
}
-// Fetch bucket notification config for an input bucket.
-func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig {
- en.rwMutex.RLock()
- defer en.rwMutex.RUnlock()
- return en.notificationConfigs[bucket]
-}
-
-// Set a new notification config for a bucket, this operation will overwrite any previous
-// notification configs for the bucket.
-func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notificationCfg *notificationConfig) error {
- en.rwMutex.Lock()
- defer en.rwMutex.Unlock()
- if notificationCfg == nil {
- return errInvalidArgument
+func eventNotifyForBucketListeners(eventType, objectName, bucketName string,
+ nEvent []NotificationEvent) {
+ lCfgs := globalEventNotifier.GetBucketListenerConfig(bucketName)
+ if lCfgs == nil {
+ return
}
- en.notificationConfigs[bucket] = notificationCfg
- return nil
-}
-
-func (en *eventNotifier) AddTopicConfig(bucket string, topicCfg *topicConfig) error {
- en.rwMutex.Lock()
- defer en.rwMutex.Unlock()
- if topicCfg == nil {
- return errInvalidArgument
- }
- notificationCfg := en.notificationConfigs[bucket]
- if notificationCfg == nil {
- en.notificationConfigs[bucket] = ¬ificationConfig{
- TopicConfigs: []topicConfig{*topicCfg},
+ // Validate if the event and object match listener configs
+ for _, lcfg := range lCfgs {
+ ruleMatch := filterRuleMatch(objectName, lcfg.TopicConfig.Filter.Key.FilterRules)
+ eventMatch := eventMatch(eventType, lcfg.TopicConfig.Events)
+ if eventMatch && ruleMatch {
+ targetLog := globalEventNotifier.GetInternalTarget(
+ lcfg.TopicConfig.TopicARN)
+ if targetLog != nil && targetLog.log != nil {
+ targetLog.log.WithFields(logrus.Fields{
+ "Key": path.Join(bucketName, objectName),
+ "EventType": eventType,
+ "Records": nEvent,
+ }).Info()
+ }
}
- return nil
}
- notificationCfg.TopicConfigs = append(notificationCfg.TopicConfigs, *topicCfg)
- return nil
+
}
// eventNotify notifies an event to relevant targets based on their
-// bucket notification configs.
+// bucket configuration (notifications and listeners).
func eventNotify(event eventData) {
// Notifies a new event.
// List of events reported through this function are
@@ -184,15 +285,6 @@ func eventNotify(event eventData) {
// - s3:ObjectCreated:CompleteMultipartUpload
// - s3:ObjectRemoved:Delete
- nConfig := globalEventNotifier.GetBucketNotificationConfig(event.Bucket)
- // No bucket notifications enabled, drop the event notification.
- if nConfig == nil {
- return
- }
- if len(nConfig.QueueConfigs) == 0 && len(nConfig.TopicConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 {
- return
- }
-
// Event type.
eventType := event.Type.String()
@@ -202,43 +294,26 @@ func eventNotify(event eventData) {
// Save the notification event to be sent.
notificationEvent := []NotificationEvent{newNotificationEvent(event)}
- // Validate if the event and object match the queue configs.
- for _, qConfig := range nConfig.QueueConfigs {
- eventMatch := eventMatch(eventType, qConfig.Events)
- ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules)
- if eventMatch && ruleMatch {
- targetLog := globalEventNotifier.GetQueueTarget(qConfig.QueueARN)
- if targetLog != nil {
- targetLog.WithFields(logrus.Fields{
- "Key": path.Join(event.Bucket, objectName),
- "EventType": eventType,
- "Records": notificationEvent,
- }).Info()
- }
- }
- }
- // Validate if the event and object match the sns configs.
- for _, topicConfig := range nConfig.TopicConfigs {
- ruleMatch := filterRuleMatch(objectName, topicConfig.Filter.Key.FilterRules)
- eventMatch := eventMatch(eventType, topicConfig.Events)
- if eventMatch && ruleMatch {
- targetListeners := globalEventNotifier.GetSNSTarget(topicConfig.TopicARN)
- for _, listener := range targetListeners {
- listener <- notificationEvent
- }
- }
- }
+ // Notify external targets.
+ eventNotifyForBucketNotifications(eventType, objectName, event.Bucket,
+ notificationEvent)
+
+ // Notify internal targets.
+ eventNotifyForBucketListeners(eventType, objectName, event.Bucket,
+ notificationEvent)
}
-// loads notifcation config if any for a given bucket, returns back structured notification config.
+// loads notification config if any for a given bucket, returns
+// structured notification config.
func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) {
// Construct the notification config path.
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath)
err = errorCause(err)
if err != nil {
- // 'notification.xml' not found return 'errNoSuchNotifications'.
- // This is default when no bucket notifications are found on the bucket.
+ // 'notification.xml' not found return
+ // 'errNoSuchNotifications'. This is default when no
+ // bucket notifications are found on the bucket.
switch err.(type) {
case ObjectNotFound:
return nil, errNoSuchNotifications
@@ -251,8 +326,9 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer)
err = errorCause(err)
if err != nil {
- // 'notification.xml' not found return 'errNoSuchNotifications'.
- // This is default when no bucket notifications are found on the bucket.
+ // 'notification.xml' not found return
+ // 'errNoSuchNotifications'. This is default when no
+ // bucket notifications are found on the bucket.
switch err.(type) {
case ObjectNotFound:
return nil, errNoSuchNotifications
@@ -267,36 +343,144 @@ func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationCon
notificationCfg := ¬ificationConfig{}
if err = xml.Unmarshal(notificationConfigBytes, ¬ificationCfg); err != nil {
return nil, err
- } // Successfully marshalled notification configuration.
+ }
// Return success.
return notificationCfg, nil
}
-// loads all bucket notifications if present.
-func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, error) {
- // List buckets to proceed loading all notification configuration.
- buckets, err := objAPI.ListBuckets()
+// loads notification config if any for a given bucket, returns
+// structured notification config.
+func loadListenerConfig(bucket string, objAPI ObjectLayer) ([]listenerConfig, error) {
+ // Construct the notification config path.
+ listenerConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
+ objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, listenerConfigPath)
+ err = errorCause(err)
if err != nil {
+ // 'listener.json' not found return
+ // 'errNoSuchNotifications'. This is default when no
+ // bucket notifications are found on the bucket.
+ switch err.(type) {
+ case ObjectNotFound:
+ return nil, errNoSuchNotifications
+ }
+ errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket)
+ // Returns error for other errors.
+ return nil, err
+ }
+ var buffer bytes.Buffer
+ err = objAPI.GetObject(minioMetaBucket, listenerConfigPath, 0, objInfo.Size, &buffer)
+ err = errorCause(err)
+ if err != nil {
+ // 'notification.xml' not found return
+ // 'errNoSuchNotifications'. This is default when no
+ // bucket listners are found on the bucket.
+ switch err.(type) {
+ case ObjectNotFound:
+ return nil, errNoSuchNotifications
+ }
+ errorIf(err, "Unable to load bucket-listeners for bucket %s", bucket)
+ // Returns error for other errors.
return nil, err
}
- configs := make(map[string]*notificationConfig)
+ // Unmarshal notification bytes.
+ var lCfg []listenerConfig
+ lConfigBytes := buffer.Bytes()
+ if err = json.Unmarshal(lConfigBytes, &lCfg); err != nil {
+ errorIf(err, "Unable to unmarshal listener config from JSON.")
+ return nil, err
+ }
+
+ // Return success.
+ return lCfg, nil
+}
+
+func persistNotificationConfig(bucket string, ncfg *notificationConfig, obj ObjectLayer) error {
+ // marshal to xml
+ buf, err := xml.Marshal(ncfg)
+ if err != nil {
+ errorIf(err, "Unable to marshal notification configuration into XML")
+ return err
+ }
+
+ // verify bucket exists
+ // FIXME: There is a race between this check and PutObject
+ if err = isBucketExist(bucket, obj); err != nil {
+ return err
+ }
+
+ // build path
+ ncPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
+ // write object to path
+ _, err = obj.PutObject(minioMetaBucket, ncPath, int64(len(buf)),
+ bytes.NewReader(buf), nil, "")
+ if err != nil {
+ errorIf(err, "Unable to write bucket notification configuration.")
+ return err
+ }
+ return nil
+}
+
+// Persists validated listener config to object layer.
+func persistListenerConfig(bucket string, lcfg []listenerConfig, obj ObjectLayer) error {
+ buf, err := json.Marshal(lcfg)
+ if err != nil {
+ errorIf(err, "Unable to marshal listener config to JSON.")
+ return err
+ }
+
+ // verify bucket exists
+ // FIXME: There is a race between this check and PutObject
+ if err = isBucketExist(bucket, obj); err != nil {
+ return err
+ }
+
+ // build path
+ lcPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
+ // write object to path
+ _, err = obj.PutObject(minioMetaBucket, lcPath, int64(len(buf)),
+ bytes.NewReader(buf), nil, "")
+ if err != nil {
+ errorIf(err, "Unable to write bucket listener configuration to object layer.")
+ }
+ return err
+}
+
+// loads all bucket notifications if present.
+func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, map[string][]listenerConfig, error) {
+ // List buckets to proceed loading all notification configuration.
+ buckets, err := objAPI.ListBuckets()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ nConfigs := make(map[string]*notificationConfig)
+ lConfigs := make(map[string][]listenerConfig)
// Loads all bucket notifications.
for _, bucket := range buckets {
nCfg, nErr := loadNotificationConfig(bucket.Name, objAPI)
if nErr != nil {
- if nErr == errNoSuchNotifications {
- continue
+ if nErr != errNoSuchNotifications {
+ return nil, nil, nErr
}
- return nil, nErr
+ } else {
+ nConfigs[bucket.Name] = nCfg
+ }
+
+ lCfg, lErr := loadListenerConfig(bucket.Name, objAPI)
+ if lErr != nil {
+ if lErr != errNoSuchNotifications {
+ return nil, nil, lErr
+ }
+ } else {
+ lConfigs[bucket.Name] = lCfg
}
- configs[bucket.Name] = nCfg
}
// Success.
- return configs, nil
+ return nConfigs, lConfigs, nil
}
// Loads all queue targets, initializes each queueARNs depending on their config.
@@ -452,8 +636,9 @@ func initEventNotifier(objAPI ObjectLayer) error {
}
// Read all saved bucket notifications.
- configs, err := loadAllBucketNotifications(objAPI)
+ nConfigs, lConfigs, err := loadAllBucketNotifications(objAPI)
if err != nil {
+ errorIf(err, "Error loading bucket notifications - %v", err)
return err
}
@@ -463,12 +648,36 @@ func initEventNotifier(objAPI ObjectLayer) error {
return err
}
- // Inititalize event notifier queue.
+ // Initialize internal listener targets
+ listenTargets := make(map[string]*listenerLogger)
+ for _, listeners := range lConfigs {
+ for _, listener := range listeners {
+ ln, err := newListenerLogger(
+ listener.TopicConfig.TopicARN,
+ listener.TargetServer,
+ )
+ if err != nil {
+ errorIf(err, "Unable to initialize listener target logger.")
+ //TODO: improve error
+ return fmt.Errorf("Error initializing listner target logger - %v", err)
+ }
+ listenTargets[listener.TopicConfig.TopicARN] = ln
+ }
+ }
+
+ // Initialize event notifier queue.
globalEventNotifier = &eventNotifier{
- rwMutex: &sync.RWMutex{},
- notificationConfigs: configs,
- queueTargets: queueTargets,
- snsTargets: make(map[string][]chan []NotificationEvent),
+ external: externalNotifier{
+ notificationConfigs: nConfigs,
+ targets: queueTargets,
+ rwMutex: &sync.RWMutex{},
+ },
+ internal: internalNotifier{
+ rwMutex: &sync.RWMutex{},
+ targets: listenTargets,
+ listenerConfigs: lConfigs,
+ connectedListeners: make(map[string]chan []NotificationEvent),
+ },
}
return nil
diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go
index 31322db9c..7f17d7d6b 100644
--- a/cmd/event-notifier_test.go
+++ b/cmd/event-notifier_test.go
@@ -18,130 +18,13 @@ package cmd
import (
"fmt"
+ "net"
"reflect"
+ "strconv"
"testing"
"time"
)
-// Tests event notify.
-func TestEventNotify(t *testing.T) {
- ExecObjectLayerTest(t, testEventNotify)
-}
-
-func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) {
- bucketName := getRandomBucketName()
-
- // initialize the server and obtain the credentials and root.
- // credentials are necessary to sign the HTTP request.
- rootPath, err := newTestConfig("us-east-1")
- if err != nil {
- t.Fatalf("Init Test config failed")
- }
- // remove the root folder after the test ends.
- defer removeAll(rootPath)
-
- if err := initEventNotifier(obj); err != nil {
- t.Fatal("Unexpected error:", err)
- }
-
- // Notify object created event.
- eventNotify(eventData{
- Type: ObjectCreatedPost,
- Bucket: bucketName,
- ObjInfo: ObjectInfo{
- Bucket: bucketName,
- Name: "object1",
- },
- ReqParams: map[string]string{
- "sourceIPAddress": "localhost:1337",
- },
- })
-
- if err := globalEventNotifier.SetBucketNotificationConfig(bucketName, nil); err != errInvalidArgument {
- t.Errorf("Expected error %s, got %s", errInvalidArgument, err)
- }
-
- if err := globalEventNotifier.SetBucketNotificationConfig(bucketName, ¬ificationConfig{}); err != nil {
- t.Errorf("Expected error to be nil, got %s", err)
- }
-
- nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
- if nConfig == nil {
- t.Errorf("Notification expected to be set, but notification not set.")
- }
-
- if !reflect.DeepEqual(nConfig, ¬ificationConfig{}) {
- t.Errorf("Mismatching notification configs.")
- }
-
- // Notify object created event.
- eventNotify(eventData{
- Type: ObjectRemovedDelete,
- Bucket: bucketName,
- ObjInfo: ObjectInfo{
- Bucket: bucketName,
- Name: "object1",
- },
- ReqParams: map[string]string{
- "sourceIPAddress": "localhost:1337",
- },
- })
-}
-
-// Tests various forms of inititalization of event notifier.
-func TestInitEventNotifier(t *testing.T) {
- disks, err := getRandomDisks(1)
- if err != nil {
- t.Fatal("Unable to create directories for FS backend. ", err)
- }
- defer removeRoots(disks)
- fs, _, err := initObjectLayer(disks, nil)
- if err != nil {
- t.Fatal("Unable to initialize FS backend.", err)
- }
- nDisks := 16
- disks, err = getRandomDisks(nDisks)
- if err != nil {
- t.Fatal("Unable to create directories for XL backend. ", err)
- }
- defer removeRoots(disks)
- xl, _, err := initObjectLayer(disks, nil)
- if err != nil {
- t.Fatal("Unable to initialize XL backend.", err)
- }
-
- // Collection of test cases for inititalizing event notifier.
- testCases := []struct {
- objAPI ObjectLayer
- configs map[string]*notificationConfig
- err error
- }{
- // Test 1 - invalid arguments.
- {
- objAPI: nil,
- err: errInvalidArgument,
- },
- // Test 2 - valid FS object layer but no bucket notifications.
- {
- objAPI: fs,
- err: nil,
- },
- // Test 3 - valid XL object layer but no bucket notifications.
- {
- objAPI: xl,
- err: nil,
- },
- }
-
- // Validate if event notifier is properly initialized.
- for i, testCase := range testCases {
- err = initEventNotifier(testCase.objAPI)
- if err != testCase.err {
- t.Errorf("Test %d: Expected %s, but got: %s", i+1, testCase.err, err)
- }
- }
-}
-
// Test InitEventNotifier with faulty disks
func TestInitEventNotifierFaultyDisks(t *testing.T) {
// Prepare for tests
@@ -272,78 +155,231 @@ func TestInitEventNotifierWithRedis(t *testing.T) {
}
}
-// TestListenBucketNotification - test Listen Bucket Notification process
+type TestPeerRPCServerData struct {
+ serverType string
+ testServer TestServer
+}
+
+func (s *TestPeerRPCServerData) Setup(t *testing.T) {
+ s.testServer = StartTestPeersRPCServer(t, s.serverType)
+
+ // setup port and minio addr
+ _, portStr, err := net.SplitHostPort(s.testServer.Server.Listener.Addr().String())
+ if err != nil {
+ t.Fatalf("Initialisation error: %v", err)
+ }
+ globalMinioPort, err = strconv.Atoi(portStr)
+ if err != nil {
+ t.Fatalf("Initialisation error: %v", err)
+ }
+ globalMinioAddr = getLocalAddress(
+ s.testServer.SrvCmdCfg,
+ )
+
+ // initialize the peer client(s)
+ initGlobalS3Peers(s.testServer.Disks)
+}
+
+func (s *TestPeerRPCServerData) TearDown() {
+ s.testServer.Stop()
+ _ = removeAll(s.testServer.Root)
+ for _, d := range s.testServer.Disks {
+ _ = removeAll(d)
+ }
+}
+
+func TestSetNGetBucketNotification(t *testing.T) {
+ s := TestPeerRPCServerData{serverType: "XL"}
+
+ // setup and teardown
+ s.Setup(t)
+ defer s.TearDown()
+
+ bucketName := getRandomBucketName()
+
+ obj := s.testServer.Obj
+ if err := initEventNotifier(obj); err != nil {
+ t.Fatal("Unexpected error:", err)
+ }
+
+ globalEventNotifier.SetBucketNotificationConfig(bucketName, ¬ificationConfig{})
+ nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
+ if nConfig == nil {
+ t.Errorf("Notification expected to be set, but notification not set.")
+ }
+
+ if !reflect.DeepEqual(nConfig, ¬ificationConfig{}) {
+ t.Errorf("Mismatching notification configs.")
+ }
+}
+
+func TestInitEventNotifier(t *testing.T) {
+ s := TestPeerRPCServerData{serverType: "XL"}
+
+ // setup and teardown
+ s.Setup(t)
+ defer s.TearDown()
+
+ // test if empty object layer arg. returns expected error.
+ if err := initEventNotifier(nil); err == nil || err != errInvalidArgument {
+ t.Fatalf("initEventNotifier returned unexpected error value - %v", err)
+ }
+
+ obj := s.testServer.Obj
+ bucketName := getRandomBucketName()
+ // declare sample configs
+ filterRules := []filterRule{
+ {
+ Name: "prefix",
+ Value: "minio",
+ },
+ {
+ Name: "suffix",
+ Value: "*.jpg",
+ },
+ }
+ sampleSvcCfg := ServiceConfig{
+ []string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
+ filterStruct{
+ keyFilter{filterRules},
+ },
+ "1",
+ }
+ sampleNotifCfg := notificationConfig{
+ QueueConfigs: []queueConfig{
+ {
+ ServiceConfig: sampleSvcCfg,
+ QueueARN: "testqARN",
+ },
+ },
+ }
+ sampleListenCfg := []listenerConfig{
+ {
+ TopicConfig: topicConfig{ServiceConfig: sampleSvcCfg,
+ TopicARN: "testlARN"},
+ TargetServer: globalMinioAddr,
+ },
+ }
+
+ // write without an existing bucket and check
+ if err := persistNotificationConfig(bucketName, ¬ificationConfig{}, obj); err == nil {
+ t.Fatalf("Did not get an error though bucket does not exist!")
+ }
+ // no bucket write check for listener
+ if err := persistListenerConfig(bucketName, []listenerConfig{}, obj); err == nil {
+ t.Fatalf("Did not get an error though bucket does not exist!")
+ }
+
+ // create bucket
+ if err := obj.MakeBucket(bucketName); err != nil {
+ t.Fatal("Unexpected error:", err)
+ }
+
+ // bucket is created, now writing should not give errors.
+ if err := persistNotificationConfig(bucketName, &sampleNotifCfg, obj); err != nil {
+ t.Fatal("Unexpected error:", err)
+ }
+
+ if err := persistListenerConfig(bucketName, sampleListenCfg, obj); err != nil {
+ t.Fatal("Unexpected error:", err)
+ }
+
+ // test event notifier init
+ if err := initEventNotifier(obj); err != nil {
+ t.Fatal("Unexpected error:", err)
+ }
+
+ // fetch bucket configs and verify
+ ncfg := globalEventNotifier.GetBucketNotificationConfig(bucketName)
+ if ncfg == nil {
+ t.Error("Bucket notification was not present for ", bucketName)
+ }
+ if len(ncfg.QueueConfigs) != 1 || ncfg.QueueConfigs[0].QueueARN != "testqARN" {
+ t.Error("Unexpected bucket notification found - ", *ncfg)
+ }
+ if globalEventNotifier.GetExternalTarget("testqARN") != nil {
+ t.Error("A logger was not expected to be found as it was not enabled in the config.")
+ }
+
+ lcfg := globalEventNotifier.GetBucketListenerConfig(bucketName)
+ if lcfg == nil {
+ t.Error("Bucket listener was not present for ", bucketName)
+ }
+ if len(lcfg) != 1 || lcfg[0].TargetServer != globalMinioAddr || lcfg[0].TopicConfig.TopicARN != "testlARN" {
+ t.Error("Unexpected listener config found - ", lcfg[0])
+ }
+ if globalEventNotifier.GetInternalTarget("testlARN") == nil {
+ t.Error("A listen logger was not found.")
+ }
+}
+
func TestListenBucketNotification(t *testing.T) {
+ s := TestPeerRPCServerData{serverType: "XL"}
+
+ // setup and teardown
+ s.Setup(t)
+ defer s.TearDown()
+
+ // test initialisation
+ obj := s.testServer.Obj
bucketName := "bucket"
objectName := "object"
- // Prepare for tests
- // Create fs backend
- rootPath, err := newTestConfig("us-east-1")
- if err != nil {
- t.Fatalf("Init Test config failed")
- }
- // remove the root folder after the test ends.
- defer removeAll(rootPath)
-
- disk, err := getRandomDisks(1)
- defer removeAll(disk[0])
- if err != nil {
- t.Fatal("Unable to create directories for FS backend. ", err)
- }
- obj, _, err := initObjectLayer(disk, nil)
- if err != nil {
- t.Fatal("Unable to initialize FS backend.", err)
- }
-
// Create the bucket to listen on
if err := obj.MakeBucket(bucketName); err != nil {
t.Fatal("Unexpected error:", err)
}
- listenARN := "arn:minio:sns:us-east-1:1:listen"
- queueARN := "arn:minio:sqs:us-east-1:1:redis"
+ listenARN := "arn:minio:sns:us-east-1:1:listen-" + globalMinioAddr
+ lcfg := listenerConfig{
+ topicConfig{
+ ServiceConfig{
+ []string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
+ filterStruct{},
+ "0",
+ },
+ listenARN,
+ },
+ globalMinioAddr,
+ }
- fs := obj.(fsObjects)
- storage := fs.storage.(*posix)
-
- // Create and store notification.xml with listen and queue notification configured
- notificationXML := ""
- notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + listenARN + ""
- notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + queueARN + ""
- notificationXML += ""
- if err := storage.AppendFile(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, []byte(notificationXML)); err != nil {
- t.Fatal("Unexpected error:", err)
+ // write listener config to storage layer
+ lcfgs := []listenerConfig{lcfg}
+ if err := persistListenerConfig(bucketName, lcfgs, obj); err != nil {
+ t.Fatalf("Test Setup error: %v", err)
}
// Init event notifier
- if err := initEventNotifier(fs); err != nil {
+ if err := initEventNotifier(obj); err != nil {
t.Fatal("Unexpected error:", err)
}
// Check if the config is loaded
- notificationCfg := globalEventNotifier.GetBucketNotificationConfig(bucketName)
- if notificationCfg == nil {
- t.Fatal("Cannot load bucket notification config")
+ listenerCfg := globalEventNotifier.GetBucketListenerConfig(bucketName)
+ if listenerCfg == nil {
+ t.Fatal("Cannot load bucket listener config")
}
- if len(notificationCfg.TopicConfigs) != 1 || len(notificationCfg.QueueConfigs) != 1 {
- t.Fatal("Notification config is not correctly loaded. Exactly one topic and one queue config are expected")
+ if len(listenerCfg) != 1 {
+ t.Fatal("Listener config is not correctly loaded. Exactly one listener config is expected")
}
- // Check if topic ARN is enabled
- if notificationCfg.TopicConfigs[0].TopicARN != listenARN {
- t.Fatal("SNS listen is not configured.")
+ // Check if topic ARN is correct
+ if listenerCfg[0].TopicConfig.TopicARN != listenARN {
+ t.Fatal("Configured topic ARN is incorrect.")
}
// Create a new notification event channel.
nEventCh := make(chan []NotificationEvent)
// Close the listener channel.
defer close(nEventCh)
- // Set sns target.
- globalEventNotifier.SetSNSTarget(listenARN, nEventCh)
- // Remove sns listener after the writer has closed or the client disconnected.
- defer globalEventNotifier.RemoveSNSTarget(listenARN, nEventCh)
+ // Add events channel for listener.
+ if err := globalEventNotifier.AddListenerChan(listenARN, nEventCh); err != nil {
+ t.Fatalf("Test Setup error: %v", err)
+ }
+ // Remove listen channel after the writer has closed or the
+ // client disconnected.
+ defer globalEventNotifier.RemoveListenerChan(listenARN)
// Fire an event notification
go eventNotify(eventData{
@@ -370,73 +406,90 @@ func TestListenBucketNotification(t *testing.T) {
t.Fatalf("Received wrong object name in notification, expected %s, received %s", n[0].S3.Object.Key, objectName)
}
break
- case <-time.After(30 * time.Second):
+ case <-time.After(3 * time.Second):
break
}
+
}
-func testAddTopicConfig(obj ObjectLayer, instanceType string, t TestErrHandler) {
- root, cErr := newTestConfig("us-east-1")
- if cErr != nil {
- t.Fatalf("[%s] Failed to initialize test config: %v", instanceType, cErr)
- }
- defer removeAll(root)
+func TestAddRemoveBucketListenerConfig(t *testing.T) {
+ s := TestPeerRPCServerData{serverType: "XL"}
+ // setup and teardown
+ s.Setup(t)
+ defer s.TearDown()
+
+ // test code
+ obj := s.testServer.Obj
if err := initEventNotifier(obj); err != nil {
- t.Fatalf("[%s] : Failed to initialize event notifier: %v", instanceType, err)
+ t.Fatalf("Failed to initialize event notifier: %v", err)
}
// Make a bucket to store topicConfigs.
randBucket := getRandomBucketName()
if err := obj.MakeBucket(randBucket); err != nil {
- t.Fatalf("[%s] : Failed to make bucket %s", instanceType, randBucket)
+ t.Fatalf("Failed to make bucket %s", randBucket)
}
// Add a topicConfig to an empty notificationConfig.
accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano())
- accountARN := "arn:minio:sns:" + serverConfig.GetRegion() + accountID + ":listen"
- var filterRules []filterRule
- filterRules = append(filterRules, filterRule{
- Name: "prefix",
- Value: "minio",
- })
- filterRules = append(filterRules, filterRule{
- Name: "suffix",
- Value: "*.jpg",
- })
+ accountARN := fmt.Sprintf(
+ "arn:minio:sqs:%s:%s:listen-%s",
+ serverConfig.GetRegion(),
+ accountID,
+ globalMinioAddr,
+ )
- // Make topic configuration corresponding to this ListenBucketNotification request.
- sampleTopicCfg := &topicConfig{
- TopicARN: accountARN,
- serviceConfig: serviceConfig{
- Filter: struct {
- Key keyFilter `xml:"S3Key,omitempty"`
- }{
- Key: keyFilter{
- FilterRules: filterRules,
- },
- },
- ID: "sns-" + accountID,
+ // Make topic configuration
+ filterRules := []filterRule{
+ {
+ Name: "prefix",
+ Value: "minio",
+ },
+ {
+ Name: "suffix",
+ Value: "*.jpg",
},
}
+ sampleTopicCfg := topicConfig{
+ TopicARN: accountARN,
+ ServiceConfig: ServiceConfig{
+ []string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
+ filterStruct{
+ keyFilter{filterRules},
+ },
+ "sns-" + accountID,
+ },
+ }
+ sampleListenerCfg := &listenerConfig{
+ TopicConfig: sampleTopicCfg,
+ TargetServer: globalMinioAddr,
+ }
testCases := []struct {
- topicCfg *topicConfig
+ lCfg *listenerConfig
expectedErr error
}{
- {sampleTopicCfg, nil},
+ {sampleListenerCfg, nil},
{nil, errInvalidArgument},
- {sampleTopicCfg, nil},
}
for i, test := range testCases {
- err := globalEventNotifier.AddTopicConfig(randBucket, test.topicCfg)
+ err := AddBucketListenerConfig(randBucket, test.lCfg, obj)
if err != test.expectedErr {
- t.Errorf("Test %d: %s failed with error %v, expected to fail with %v",
- i+1, instanceType, err, test.expectedErr)
+ t.Errorf(
+ "Test %d: Failed with error %v, expected to fail with %v",
+ i+1, err, test.expectedErr,
+ )
}
}
-}
-func TestAddTopicConfig(t *testing.T) {
- ExecObjectLayerTest(t, testAddTopicConfig)
+ // test remove listener actually removes a listener
+ RemoveBucketListenerConfig(randBucket, sampleListenerCfg, obj)
+ // since it does not return errors we fetch the config and
+ // check
+ lcSlice := globalEventNotifier.GetBucketListenerConfig(randBucket)
+ if len(lcSlice) != 0 {
+ t.Errorf("Remove Listener Config Test: did not remove listener config - %v",
+ lcSlice)
+ }
}
diff --git a/cmd/globals.go b/cmd/globals.go
index 0ac2a299e..5668e4ba5 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -52,8 +52,13 @@ var (
globalMaxCacheSize = uint64(maxCacheSize)
// Cache expiry.
globalCacheExpiry = objcache.DefaultExpiry
+ // Minio local server address (in `host:port` format)
+ globalMinioAddr = ""
// Minio default port, can be changed through command line.
globalMinioPort = 9000
+ // Peer communication struct
+ globalS3Peers = s3Peers{}
+
// Add new variable global values here.
)
diff --git a/cmd/logger.go b/cmd/logger.go
index f1418b3d5..ddc91d08b 100644
--- a/cmd/logger.go
+++ b/cmd/logger.go
@@ -20,6 +20,7 @@ import (
"bufio"
"bytes"
"fmt"
+ "path"
"path/filepath"
"runtime"
"runtime/debug"
@@ -50,10 +51,10 @@ type logger struct {
func funcFromPC(pc uintptr, file string, line int, shortFile bool) string {
var fn, name string
if shortFile {
- fn = strings.Replace(file, filepath.ToSlash(GOPATH)+"/src/github.com/minio/minio/cmd/", "", -1)
+ fn = strings.Replace(file, path.Join(filepath.ToSlash(GOPATH)+"/src/github.com/minio/minio/cmd/")+"/", "", -1)
name = strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/minio/minio/cmd.", "", -1)
} else {
- fn = strings.Replace(file, filepath.ToSlash(GOPATH)+"/src/", "", -1)
+ fn = strings.Replace(file, path.Join(filepath.ToSlash(GOPATH)+"/src/")+"/", "", -1)
name = strings.Replace(runtime.FuncForPC(pc).Name(), "github.com/minio/minio/cmd.", "", -1)
}
return fmt.Sprintf("%s [%s:%d]", name, fn, line)
diff --git a/cmd/notify-listener.go b/cmd/notify-listener.go
new file mode 100644
index 000000000..e94d16aa1
--- /dev/null
+++ b/cmd/notify-listener.go
@@ -0,0 +1,82 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+ "fmt"
+ "io/ioutil"
+
+ "github.com/Sirupsen/logrus"
+)
+
+type listenerConn struct {
+ Client *AuthRPCClient
+ ListenerARN string
+}
+
+type listenerLogger struct {
+ log *logrus.Logger
+ lconn listenerConn
+}
+
+func newListenerLogger(listenerArn, targetAddr string) (*listenerLogger, error) {
+ client := globalS3Peers.GetPeerClient(targetAddr)
+ if client == nil {
+ return nil, fmt.Errorf("Peer %s was not initialized - bug!",
+ targetAddr)
+ }
+ lc := listenerConn{
+ Client: client,
+ ListenerARN: listenerArn,
+ }
+
+ lcLog := logrus.New()
+
+ lcLog.Out = ioutil.Discard
+
+ lcLog.Formatter = new(logrus.JSONFormatter)
+
+ lcLog.Hooks.Add(lc)
+
+ return &listenerLogger{lcLog, lc}, nil
+}
+
+func (lc listenerConn) Close() {
+ // ignore closing errors
+ _ = lc.Client.Close()
+}
+
+// send event to target server via rpc client calls.
+func (lc listenerConn) Fire(entry *logrus.Entry) error {
+ notificationEvent, ok := entry.Data["Records"].([]NotificationEvent)
+ if !ok {
+ // If the record is not of the expected type, silently
+ // discard.
+ return nil
+ }
+
+ evArgs := EventArgs{Event: notificationEvent, Arn: lc.ListenerARN}
+ reply := GenericReply{}
+ err := lc.Client.Call("S3.Event", &evArgs, &reply)
+ return err
+}
+
+func (lc listenerConn) Levels() []logrus.Level {
+ return []logrus.Level{
+ logrus.InfoLevel,
+ }
+}
diff --git a/cmd/routers.go b/cmd/routers.go
index 33c0be5ce..bf4ea1fad 100644
--- a/cmd/routers.go
+++ b/cmd/routers.go
@@ -80,6 +80,9 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
registerDistNSLockRouter(mux, srvCmdConfig)
}
+ // Register S3 peer communication router.
+ registerS3PeerRPCRouter(mux)
+
// Register controller rpc router.
registerControlRPCRouter(mux, srvCmdConfig)
diff --git a/cmd/s3-peer-client.go b/cmd/s3-peer-client.go
new file mode 100644
index 000000000..458ced6f1
--- /dev/null
+++ b/cmd/s3-peer-client.go
@@ -0,0 +1,176 @@
+/*
+ * Minio Cloud Storage, (C) 2014-2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+ "fmt"
+ "path"
+ "time"
+
+ "github.com/minio/minio-go/pkg/set"
+)
+
+type s3Peers struct {
+ // A map of peer server address (in `host:port` format) to RPC
+ // client connections
+ rpcClient map[string]*AuthRPCClient
+
+ // slice of all peer addresses (in `host:port` format)
+ peers []string
+}
+
+func initGlobalS3Peers(disks []string) {
+ // get list of de-duplicated peers
+ peers := getAllPeers(disks)
+ globalS3Peers = s3Peers{make(map[string]*AuthRPCClient), nil}
+ for _, peer := range peers {
+ globalS3Peers.InitS3PeerClient(peer)
+ }
+
+ // Additionally setup a local peer if one does not exist
+ if globalS3Peers.GetPeerClient(globalMinioAddr) == nil {
+ globalS3Peers.InitS3PeerClient(globalMinioAddr)
+ peers = append(peers, globalMinioAddr)
+ }
+
+ globalS3Peers.peers = peers
+}
+
+func (s3p *s3Peers) GetPeers() []string {
+ return s3p.peers
+}
+
+func (s3p *s3Peers) GetPeerClient(peer string) *AuthRPCClient {
+ return s3p.rpcClient[peer]
+}
+
+// Initializes a new RPC connection (or closes and re-opens if it
+// already exists) to a peer. Note that peer address is in `host:port`
+// format.
+func (s3p *s3Peers) InitS3PeerClient(peer string) {
+ if s3p.rpcClient[peer] != nil {
+ s3p.rpcClient[peer].Close()
+ delete(s3p.rpcClient, peer)
+ }
+ authCfg := &authConfig{
+ accessKey: serverConfig.GetCredential().AccessKeyID,
+ secretKey: serverConfig.GetCredential().SecretAccessKey,
+ address: peer,
+ path: path.Join(reservedBucket, s3Path),
+ loginMethod: "S3.LoginHandler",
+ }
+ s3p.rpcClient[peer] = newAuthClient(authCfg)
+}
+
+func (s3p *s3Peers) Close() error {
+ for _, v := range s3p.rpcClient {
+ if err := v.Close(); err != nil {
+ return err
+ }
+ }
+ s3p.rpcClient = nil
+ s3p.peers = nil
+ return nil
+}
+
+// returns the network addresses of all Minio servers in the cluster
+// in `host:port` format.
+func getAllPeers(disks []string) []string {
+ res := []string{}
+ // use set to de-duplicate
+ sset := set.NewStringSet()
+ for _, disk := range disks {
+ netAddr, _, err := splitNetPath(disk)
+ if err != nil || netAddr == "" {
+ errorIf(err, "Unexpected error - most likely a bug.")
+ continue
+ }
+ if !sset.Contains(netAddr) {
+ res = append(
+ res,
+ fmt.Sprintf("%s:%d", netAddr, globalMinioPort),
+ )
+ sset.Add(netAddr)
+ }
+ }
+ return res
+}
+
+// Make RPC calls with the given method and arguments to all the given
+// peers (in parallel), and collects the results. Since the methods
+// intended for use here, have only a success or failure response, we
+// do not return/inspect the `reply` parameter in the RPC call. The
+// function attempts to connect to a peer only once, and returns a map
+// of peer address to error response. If the error is nil, it means
+// the RPC succeeded.
+func (s3p *s3Peers) SendRPC(peers []string, method string, args interface {
+ SetToken(token string)
+ SetTimestamp(tstamp time.Time)
+}) map[string]error {
+ // result type
+ type callResult struct {
+ target string
+ err error
+ }
+ // channel to collect results from goroutines
+ resChan := make(chan callResult)
+ // closure to make a single request.
+ callTarget := func(target string) {
+ reply := &GenericReply{}
+ err := s3p.rpcClient[target].Call(method, args, reply)
+ resChan <- callResult{target, err}
+ }
+ // map of errors
+ errsMap := make(map[string]error)
+ // make network calls in parallel
+ for _, target := range peers {
+ go callTarget(target)
+ }
+ // wait on channel and collect all results
+ for range peers {
+ res := <-resChan
+ if res.err != nil {
+ errsMap[res.target] = res.err
+ }
+ }
+ // return errors map
+ return errsMap
+}
+
+// S3PeersUpdateBucketNotification - Sends Update Bucket notification
+// request to all peers. Currently we log an error and continue.
+func S3PeersUpdateBucketNotification(bucket string, ncfg *notificationConfig) {
+ setBNPArgs := &SetBNPArgs{Bucket: bucket, NCfg: ncfg}
+ peers := globalS3Peers.GetPeers()
+ errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketNotificationPeer",
+ setBNPArgs)
+ for peer, err := range errsMap {
+ errorIf(err, "Error sending peer update bucket notification to %s - %v", peer, err)
+ }
+}
+
+// S3PeersUpdateBucketListener - Sends Update Bucket listeners request
+// to all peers. Currently we log an error and continue.
+func S3PeersUpdateBucketListener(bucket string, lcfg []listenerConfig) {
+ setBLPArgs := &SetBLPArgs{Bucket: bucket, LCfg: lcfg}
+ peers := globalS3Peers.GetPeers()
+ errsMap := globalS3Peers.SendRPC(peers, "S3.SetBucketListenerPeer",
+ setBLPArgs)
+ for peer, err := range errsMap {
+ errorIf(err, "Error sending peer update bucket listener to %s - %v", peer, err)
+ }
+}
diff --git a/cmd/s3-peer-router.go b/cmd/s3-peer-router.go
new file mode 100644
index 000000000..7e72ce306
--- /dev/null
+++ b/cmd/s3-peer-router.go
@@ -0,0 +1,43 @@
+/*
+ * Minio Cloud Storage, (C) 2014-2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+ "net/rpc"
+
+ router "github.com/gorilla/mux"
+)
+
+const (
+ s3Path = "/s3/remote"
+)
+
+type s3PeerAPIHandlers struct {
+ ObjectAPI func() ObjectLayer
+}
+
+func registerS3PeerRPCRouter(mux *router.Router) {
+ s3PeerHandlers := &s3PeerAPIHandlers{
+ ObjectAPI: newObjectLayerFn,
+ }
+
+ s3PeerRPCServer := rpc.NewServer()
+ s3PeerRPCServer.RegisterName("S3", s3PeerHandlers)
+
+ s3PeerRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter()
+ s3PeerRouter.Path(s3Path).Handler(s3PeerRPCServer)
+}
diff --git a/cmd/s3-peer-rpc-handlers.go b/cmd/s3-peer-rpc-handlers.go
new file mode 100644
index 000000000..8e6c06366
--- /dev/null
+++ b/cmd/s3-peer-rpc-handlers.go
@@ -0,0 +1,123 @@
+/*
+ * Minio Cloud Storage, (C) 2014-2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import "time"
+
+func (s3 *s3PeerAPIHandlers) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error {
+ jwt, err := newJWT(defaultInterNodeJWTExpiry)
+ if err != nil {
+ return err
+ }
+ if err = jwt.Authenticate(args.Username, args.Password); err != nil {
+ return err
+ }
+ token, err := jwt.GenerateToken(args.Username)
+ if err != nil {
+ return err
+ }
+ reply.Token = token
+ reply.ServerVersion = Version
+ reply.Timestamp = time.Now().UTC()
+ return nil
+}
+
+// SetBNPArgs - Arguments collection to SetBucketNotificationPeer RPC
+// call
+type SetBNPArgs struct {
+ // For Auth
+ GenericArgs
+
+ Bucket string
+
+ // Notification config for the given bucket.
+ NCfg *notificationConfig
+}
+
+func (s3 *s3PeerAPIHandlers) SetBucketNotificationPeer(args *SetBNPArgs, reply *GenericReply) error {
+ // check auth
+ if !isRPCTokenValid(args.Token) {
+ return errInvalidToken
+ }
+
+ // check if object layer is available.
+ objAPI := s3.ObjectAPI()
+ if objAPI == nil {
+ return errServerNotInitialized
+ }
+
+ // Update in-memory notification config.
+ globalEventNotifier.SetBucketNotificationConfig(args.Bucket, args.NCfg)
+
+ return nil
+}
+
+// SetBLPArgs - Arguments collection to SetBucketListenerPeer RPC call
+type SetBLPArgs struct {
+ // For Auth
+ GenericArgs
+
+ Bucket string
+
+ // Listener config for a given bucket.
+ LCfg []listenerConfig
+}
+
+func (s3 *s3PeerAPIHandlers) SetBucketListenerPeer(args SetBLPArgs, reply *GenericReply) error {
+ // check auth
+ if !isRPCTokenValid(args.Token) {
+ return errInvalidToken
+ }
+
+ // check if object layer is available.
+ objAPI := s3.ObjectAPI()
+ if objAPI == nil {
+ return errServerNotInitialized
+ }
+
+ // Update in-memory notification config.
+ return globalEventNotifier.SetBucketListenerConfig(args.Bucket, args.LCfg)
+}
+
+// EventArgs - Arguments collection for Event RPC call
+type EventArgs struct {
+ // For Auth
+ GenericArgs
+
+ // event being sent
+ Event []NotificationEvent
+
+ // client that it is meant for
+ Arn string
+}
+
+// submit an event to the receiving server.
+func (s3 *s3PeerAPIHandlers) Event(args *EventArgs, reply *GenericReply) error {
+ // check auth
+ if !isRPCTokenValid(args.Token) {
+ return errInvalidToken
+ }
+
+ // check if object layer is available.
+ objAPI := s3.ObjectAPI()
+ if objAPI == nil {
+ return errServerNotInitialized
+ }
+
+ err := globalEventNotifier.SendListenerEvent(args.Arn, args.Event)
+ return err
+}
diff --git a/cmd/server-main.go b/cmd/server-main.go
index e7de50e3b..4a4656214 100644
--- a/cmd/server-main.go
+++ b/cmd/server-main.go
@@ -386,6 +386,12 @@ func serverMain(c *cli.Context) {
globalObjectAPI = newObject
globalObjLayerMutex.Unlock()
+ // Initialize local server address
+ globalMinioAddr = getLocalAddress(srvConfig)
+
+ // Initialize S3 Peers inter-node communication
+ initGlobalS3Peers(disks)
+
// Initialize a new event notifier.
err = initEventNotifier(newObjectLayerFn())
fatalIf(err, "Unable to initialize event notification.")
diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go
index da9250891..f4adb8ce2 100644
--- a/cmd/server-startup-msg.go
+++ b/cmd/server-startup-msg.go
@@ -83,10 +83,10 @@ func printEventNotifiers() {
return
}
arnMsg := colorBlue("SQS ARNs: ")
- if len(globalEventNotifier.queueTargets) == 0 {
+ if len(globalEventNotifier.external.targets) == 0 {
arnMsg += colorBold(fmt.Sprintf(getFormatStr(len(""), 1), ""))
}
- for queueArn := range globalEventNotifier.queueTargets {
+ for queueArn := range globalEventNotifier.external.targets {
arnMsg += colorBold(fmt.Sprintf(getFormatStr(len(queueArn), 1), queueArn))
}
console.Println(arnMsg)
diff --git a/cmd/server_test.go b/cmd/server_test.go
index 67c59317e..d4b149fe7 100644
--- a/cmd/server_test.go
+++ b/cmd/server_test.go
@@ -105,125 +105,6 @@ func (s *TestSuiteCommon) TestBucketSQSNotification(c *C) {
verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
}
-// TestBucketNotification - Inserts the bucket notification and verifies it by fetching the notification back.
-func (s *TestSuiteCommon) TestBucketSNSNotification(c *C) {
- // Sample bucket notification.
- bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen`
-
- // generate a random bucket Name.
- bucketName := getRandomBucketName()
- // HTTP request to create the bucket.
- request, err := newTestSignedRequestV4("PUT", getMakeBucketURL(s.endPoint, bucketName),
- 0, nil, s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client := http.Client{}
- // execute the request.
- response, err := client.Do(request)
- c.Assert(err, IsNil)
- // assert the http response status code.
- c.Assert(response.StatusCode, Equals, http.StatusOK)
-
- request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
-
- c.Assert(err, IsNil)
- c.Assert(response.StatusCode, Equals, http.StatusOK)
-
- // Fetch the uploaded policy.
- request, err = newTestSignedRequestV4("GET", getGetNotificationURL(s.endPoint, bucketName), 0, nil,
- s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- response, err = client.Do(request)
- c.Assert(err, IsNil)
- c.Assert(response.StatusCode, Equals, http.StatusOK)
-
- bucketNotificationReadBuf, err := ioutil.ReadAll(response.Body)
- c.Assert(err, IsNil)
- // Verify if downloaded policy matches with previousy uploaded.
- c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true)
-
- invalidBucketNotificationBuf := `s3:ObjectCreated:Putinvalidimages/1arn:minio:sns:us-east-1:444455556666:minio`
-
- request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
-
- verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
-
- invalidBucketNotificationBuf = `s3:ObjectCreated:Putinvalidimages/1arn:minio:sns:us-east-1:1:listen`
-
- request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
-
- verifyError(c, response, "InvalidArgument", "filter rule name must be either prefix or suffix", http.StatusBadRequest)
-
- invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefixhello\1arn:minio:sns:us-east-1:1:listen`
-
- request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
-
- verifyError(c, response, "InvalidArgument", "Size of filter rule value cannot exceed 1024 bytes in UTF-8 representation", http.StatusBadRequest)
-
- invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-west-1:444455556666:listen`
- request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
-
- verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest)
-
- invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:sns:us-east-1:444455556666:listen`
- request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
- verifyError(c, response, "InvalidArgument", "A specified event is not supported for notifications.", http.StatusBadRequest)
-
- bucketNotificationDuplicates := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listens3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen`
- request, err = newTestSignedRequestV4("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(bucketNotificationDuplicates)), bytes.NewReader([]byte(bucketNotificationDuplicates)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
- verifyError(c, response, "InvalidArgument", "Configurations overlap. Configurations on the same bucket cannot share a common event type.", http.StatusBadRequest)
-}
-
// TestBucketPolicy - Inserts the bucket policy and verifies it by fetching the policy back.
// Deletes the policy and verifies the deletion by fetching it back.
func (s *TestSuiteCommon) TestBucketPolicy(c *C) {
diff --git a/cmd/server_v2_test.go b/cmd/server_v2_test.go
index 5996c4624..2f7858aa0 100644
--- a/cmd/server_v2_test.go
+++ b/cmd/server_v2_test.go
@@ -102,125 +102,6 @@ func (s *TestSuiteCommonV2) TestBucketSQSNotification(c *C) {
verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
}
-// TestBucketNotification - Inserts the bucket notification and verifies it by fetching the notification back.
-func (s *TestSuiteCommonV2) TestBucketSNSNotification(c *C) {
- // Sample bucket notification.
- bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen`
-
- // generate a random bucket Name.
- bucketName := getRandomBucketName()
- // HTTP request to create the bucket.
- request, err := newTestSignedRequestV2("PUT", getMakeBucketURL(s.endPoint, bucketName),
- 0, nil, s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client := http.Client{}
- // execute the request.
- response, err := client.Do(request)
- c.Assert(err, IsNil)
- // assert the http response status code.
- c.Assert(response.StatusCode, Equals, http.StatusOK)
-
- request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
-
- c.Assert(err, IsNil)
- c.Assert(response.StatusCode, Equals, http.StatusOK)
-
- // Fetch the uploaded policy.
- request, err = newTestSignedRequestV2("GET", getGetNotificationURL(s.endPoint, bucketName), 0, nil,
- s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- response, err = client.Do(request)
- c.Assert(err, IsNil)
- c.Assert(response.StatusCode, Equals, http.StatusOK)
-
- bucketNotificationReadBuf, err := ioutil.ReadAll(response.Body)
- c.Assert(err, IsNil)
- // Verify if downloaded policy matches with previousy uploaded.
- c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true)
-
- invalidBucketNotificationBuf := `s3:ObjectCreated:Putinvalidimages/1arn:minio:sns:us-east-1:444455556666:minio`
-
- request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
-
- verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
-
- invalidBucketNotificationBuf = `s3:ObjectCreated:Putinvalidimages/1arn:minio:sns:us-east-1:1:listen`
-
- request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
-
- verifyError(c, response, "InvalidArgument", "filter rule name must be either prefix or suffix", http.StatusBadRequest)
-
- invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefixhello\1arn:minio:sns:us-east-1:1:listen`
-
- request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
-
- verifyError(c, response, "InvalidArgument", "Size of filter rule value cannot exceed 1024 bytes in UTF-8 representation", http.StatusBadRequest)
-
- invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-west-1:444455556666:listen`
- request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
-
- verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest)
-
- invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:sns:us-east-1:444455556666:listen`
- request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
- verifyError(c, response, "InvalidArgument", "A specified event is not supported for notifications.", http.StatusBadRequest)
-
- bucketNotificationDuplicates := `s3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listens3:ObjectCreated:Putprefiximages/1arn:minio:sns:us-east-1:444455556666:listen`
- request, err = newTestSignedRequestV2("PUT", getPutNotificationURL(s.endPoint, bucketName),
- int64(len(bucketNotificationDuplicates)), bytes.NewReader([]byte(bucketNotificationDuplicates)), s.accessKey, s.secretKey)
- c.Assert(err, IsNil)
-
- client = http.Client{}
- // execute the HTTP request.
- response, err = client.Do(request)
- c.Assert(err, IsNil)
- verifyError(c, response, "InvalidArgument", "Configurations overlap. Configurations on the same bucket cannot share a common event type.", http.StatusBadRequest)
-}
-
// TestBucketPolicy - Inserts the bucket policy and verifies it by fetching the policy back.
// Deletes the policy and verifies the deletion by fetching it back.
func (s *TestSuiteCommonV2) TestBucketPolicy(c *C) {
diff --git a/cmd/signature-v4.go b/cmd/signature-v4.go
index 6b29579a3..1c44c53ae 100644
--- a/cmd/signature-v4.go
+++ b/cmd/signature-v4.go
@@ -147,7 +147,7 @@ func getSignature(signingKey []byte, stringToSign string) string {
// doesPolicySignatureMatch - Verify query headers with post policy
// - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html
-// returns true if matches, false otherwise. if error is not nil then it is always false
+// returns ErrNone if the signature matches.
func doesPolicySignatureMatch(formValues map[string]string) APIErrorCode {
// Access credentials.
cred := serverConfig.GetCredential()
@@ -193,7 +193,7 @@ func doesPolicySignatureMatch(formValues map[string]string) APIErrorCode {
// doesPresignedSignatureMatch - Verify query headers with presigned signature
// - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
-// returns true if matches, false otherwise. if error is not nil then it is always false
+// returns ErrNone if the signature matches.
func doesPresignedSignatureMatch(hashedPayload string, r *http.Request, region string) APIErrorCode {
// Access credentials.
cred := serverConfig.GetCredential()
@@ -316,7 +316,7 @@ func doesPresignedSignatureMatch(hashedPayload string, r *http.Request, region s
// doesSignatureMatch - Verify authorization header with calculated header in accordance with
// - http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html
-// returns true if matches, false otherwise. if error is not nil then it is always false
+// returns ErrNone if signature matches.
func doesSignatureMatch(hashedPayload string, r *http.Request, region string) APIErrorCode {
// Access credentials.
cred := serverConfig.GetCredential()
diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go
index f659e254a..f6d729ce9 100644
--- a/cmd/test-utils_test.go
+++ b/cmd/test-utils_test.go
@@ -149,6 +149,7 @@ type TestServer struct {
SecretKey string
Server *httptest.Server
Obj ObjectLayer
+ SrvCmdCfg serverCmdConfig
}
// Starts the test server and returns the TestServer instance.
@@ -236,6 +237,64 @@ func StartTestStorageRPCServer(t TestErrHandler, instanceType string, diskN int)
return testRPCServer
}
+// Sets up a Peers RPC test server.
+func StartTestPeersRPCServer(t TestErrHandler, instanceType string) TestServer {
+ // create temporary backend for the test server.
+ nDisks := 16
+ disks, err := getRandomDisks(nDisks)
+ if err != nil {
+ t.Fatal("Failed to create disks for the backend")
+ }
+
+ root, err := newTestConfig("us-east-1")
+ if err != nil {
+ t.Fatalf("%s", err)
+ }
+
+ // create an instance of TestServer.
+ testRPCServer := TestServer{}
+ // Get credential.
+ credentials := serverConfig.GetCredential()
+
+ testRPCServer.Root = root
+ testRPCServer.Disks = disks
+ testRPCServer.AccessKey = credentials.AccessKeyID
+ testRPCServer.SecretKey = credentials.SecretAccessKey
+
+ // create temporary backend for the test server.
+ objLayer, storageDisks, err := initObjectLayer(disks, nil)
+ if err != nil {
+ t.Fatalf("Failed obtaining Temp Backend: %s", err)
+ }
+
+ globalObjLayerMutex.Lock()
+ globalObjectAPI = objLayer
+ testRPCServer.Obj = objLayer
+ globalObjLayerMutex.Unlock()
+
+ srvCfg := serverCmdConfig{
+ disks: disks,
+ storageDisks: storageDisks,
+ }
+
+ mux := router.NewRouter()
+ // need storage layer for bucket config storage.
+ registerStorageRPCRouters(mux, srvCfg)
+ // need API layer to send requests, etc.
+ registerAPIRouter(mux)
+ // module being tested is Peer RPCs router.
+ registerS3PeerRPCRouter(mux)
+
+ // Run TestServer.
+ testRPCServer.Server = httptest.NewServer(mux)
+
+ // initialize remainder of serverCmdConfig
+ srvCfg.isDistXL = false
+ testRPCServer.SrvCmdCfg = srvCfg
+
+ return testRPCServer
+}
+
// Initializes control RPC endpoints.
// The object Layer will be a temp back used for testing purpose.
func initTestControlRPCEndPoint(srvCmdConfig serverCmdConfig) http.Handler {
@@ -595,7 +654,6 @@ func newTestStreamingSignedBadChunkDateRequest(method, urlStr string, contentLen
}
currTime := time.Now().UTC()
- fmt.Println("now: ", currTime)
signature, err := signStreamingRequest(req, accessKey, secretKey, currTime)
if err != nil {
return nil, err
@@ -603,7 +661,6 @@ func newTestStreamingSignedBadChunkDateRequest(method, urlStr string, contentLen
// skew the time between the chunk signature calculation and seed signature.
currTime = currTime.Add(1 * time.Second)
- fmt.Println("later: ", currTime)
req, err = assembleStreamingChunks(req, body, chunkSize, secretKey, signature, currTime)
return req, nil
}
@@ -625,14 +682,15 @@ func newTestStreamingSignedRequest(method, urlStr string, contentLength, chunkSi
return req, nil
}
-// Replaces any occurring '/' in string, into its encoded representation.
+// Replaces any occurring '/' in string, into its encoded
+// representation.
func percentEncodeSlash(s string) string {
return strings.Replace(s, "/", "%2F", -1)
}
// queryEncode - encodes query values in their URL encoded form. In
-// addition to the percent encoding performed by getURLEncodedName() used
-// here, it also percent encodes '/' (forward slash)
+// addition to the percent encoding performed by getURLEncodedName()
+// used here, it also percent encodes '/' (forward slash)
func queryEncode(v url.Values) string {
if v == nil {
return ""
diff --git a/cmd/utils.go b/cmd/utils.go
index cae58fbaf..e5ca67cc2 100644
--- a/cmd/utils.go
+++ b/cmd/utils.go
@@ -80,19 +80,38 @@ func splitNetPath(networkPath string) (netAddr, netPath string, err error) {
}
}
networkParts := strings.SplitN(networkPath, ":", 2)
- if len(networkParts) == 1 {
+ switch {
+ case len(networkParts) == 1:
return "", networkPath, nil
- }
- if networkParts[1] == "" {
+ case networkParts[1] == "":
return "", "", &net.AddrError{Err: "Missing path in network path", Addr: networkPath}
- } else if networkParts[0] == "" {
+ case networkParts[0] == "":
return "", "", &net.AddrError{Err: "Missing address in network path", Addr: networkPath}
- } else if !filepath.IsAbs(networkParts[1]) {
+ case !filepath.IsAbs(networkParts[1]):
return "", "", &net.AddrError{Err: "Network path should be absolute", Addr: networkPath}
}
return networkParts[0], networkParts[1], nil
}
+// Find local node through the command line arguments. Returns in
+// `host:port` format.
+func getLocalAddress(srvCmdConfig serverCmdConfig) string {
+ if !srvCmdConfig.isDistXL {
+ return fmt.Sprintf(":%d", globalMinioPort)
+ }
+ for _, export := range srvCmdConfig.disks {
+ // Validates if remote disk is local.
+ if isLocalStorage(export) {
+ var host string
+ if idx := strings.LastIndex(export, ":"); idx != -1 {
+ host = export[:idx]
+ }
+ return fmt.Sprintf("%s:%d", host, globalMinioPort)
+ }
+ }
+ return ""
+}
+
// xmlDecoder provide decoded value in xml.
func xmlDecoder(body io.Reader, v interface{}, size int64) error {
var lbody io.Reader
diff --git a/cmd/utils_test.go b/cmd/utils_test.go
index 800bb73af..3a9ecd45c 100644
--- a/cmd/utils_test.go
+++ b/cmd/utils_test.go
@@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"reflect"
+ "runtime"
"testing"
)
@@ -174,3 +175,65 @@ func TestMaxPartID(t *testing.T) {
}
}
}
+
+// Tests fetch local address.
+func TestLocalAddress(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ return
+ }
+ // need to set this to avoid stale values from other tests.
+ globalMinioPort = 9000
+ testCases := []struct {
+ srvCmdConfig serverCmdConfig
+ localAddr string
+ }{
+ // Test 1 - local address is found.
+ {
+ srvCmdConfig: serverCmdConfig{
+ isDistXL: true,
+ disks: []string{
+ "localhost:/mnt/disk1",
+ "1.1.1.2:/mnt/disk2",
+ "1.1.2.1:/mnt/disk3",
+ "1.1.2.2:/mnt/disk4",
+ },
+ },
+ localAddr: fmt.Sprintf("localhost:%d", globalMinioPort),
+ },
+ // Test 2 - local address is everything.
+ {
+ srvCmdConfig: serverCmdConfig{
+ isDistXL: false,
+ disks: []string{
+ "/mnt/disk1",
+ "/mnt/disk2",
+ "/mnt/disk3",
+ "/mnt/disk4",
+ },
+ },
+ localAddr: fmt.Sprintf(":%d", globalMinioPort),
+ },
+ // Test 3 - local address is not found.
+ {
+ srvCmdConfig: serverCmdConfig{
+ isDistXL: true,
+ disks: []string{
+ "1.1.1.1:/mnt/disk1",
+ "1.1.1.2:/mnt/disk2",
+ "1.1.2.1:/mnt/disk3",
+ "1.1.2.2:/mnt/disk4",
+ },
+ },
+ localAddr: "",
+ },
+ }
+
+ // Validates fetching local address.
+ for i, testCase := range testCases {
+ localAddr := getLocalAddress(testCase.srvCmdConfig)
+ if localAddr != testCase.localAddr {
+ t.Fatalf("Test %d: Expected %s, got %s", i+1, testCase.localAddr, localAddr)
+ }
+ }
+
+}