api: Add new ListenBucketNotificationHandler. (#2336)

This API is a precursor to implementing `minio lambda` and `mc` continuous replication.

This new API is an extension to the BucketNotification APIs.

// Request
```
GET /bucket?notificationARN=arn:minio:lambda:us-east-1:10:minio HTTP/1.1
...
...
```

// Response
```

{"Records": ...}
...
...
...
{"Records": ...}
```
This commit is contained in:
Harshavardhana 2016-08-04 22:01:58 -07:00 committed by Anand Babu (AB) Periasamy
parent 90c20a8c11
commit 064c51162d
35 changed files with 1600 additions and 652 deletions

2
.gitignore vendored
View File

@ -15,4 +15,4 @@ vendor/**/*.json
release release
.DS_Store .DS_Store
*.syso *.syso
coverage.out coverage.txt

View File

@ -62,6 +62,8 @@ func registerAPIRouter(mux *router.Router, api objectAPIHandlers) {
bucket.Methods("GET").HandlerFunc(api.GetBucketPolicyHandler).Queries("policy", "") bucket.Methods("GET").HandlerFunc(api.GetBucketPolicyHandler).Queries("policy", "")
// GetBucketNotification // GetBucketNotification
bucket.Methods("GET").HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "") bucket.Methods("GET").HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "")
// ListenBucketNotification
bucket.Methods("GET").HandlerFunc(api.ListenBucketNotificationHandler).Queries("notificationARN", "{notificationARN:.*}")
// ListMultipartUploads // ListMultipartUploads
bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "") bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "")
// ListObjectsV2 // ListObjectsV2

View File

@ -24,6 +24,7 @@ install:
build_script: build_script:
- go test -race . - go test -race .
- go test -race github.com/minio/minio/pkg... - go test -race github.com/minio/minio/pkg...
- go test -coverprofile=coverage.txt -covermode=atomic
- go run buildscripts/gen-ldflags.go > temp.txt - go run buildscripts/gen-ldflags.go > temp.txt
- set /p BUILD_LDFLAGS=<temp.txt - set /p BUILD_LDFLAGS=<temp.txt
- go build -ldflags="%BUILD_LDFLAGS%" -o %GOPATH%\bin\minio.exe - go build -ldflags="%BUILD_LDFLAGS%" -o %GOPATH%\bin\minio.exe

View File

@ -368,20 +368,10 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
Key: object, Key: object,
ETag: md5Sum, ETag: md5Sum,
}) })
setCommonHeaders(w)
writeSuccessResponse(w, encodedSuccessResponse)
// Load notification config if any. setCommonHeaders(w)
nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket)
// Notifications not set, return. writeSuccessResponse(w, encodedSuccessResponse)
if err == errNoSuchNotifications {
return
}
// For all other errors, return.
if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return
}
// Fetch object info for notifications. // Fetch object info for notifications.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
@ -390,8 +380,17 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return return
} }
if eventN.IsBucketNotificationSet(bucket) {
// Notify object created event. // Notify object created event.
notifyObjectCreatedEvent(nConfig, ObjectCreatedPost, bucket, objInfo) eventNotify(eventData{
Type: ObjectCreatedPost,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
}
} }
// HeadBucketHandler - HEAD Bucket // HeadBucketHandler - HEAD Bucket

View File

@ -39,7 +39,7 @@ type queueConfig struct {
Key keyFilter `xml:"S3Key,omitempty"` Key keyFilter `xml:"S3Key,omitempty"`
} }
ID string `xml:"Id"` ID string `xml:"Id"`
QueueArn string `xml:"Queue"` QueueARN string `xml:"Queue"`
} }
// Topic SNS configuration, this is a compliance field not used by minio yet. // Topic SNS configuration, this is a compliance field not used by minio yet.
@ -49,26 +49,26 @@ type topicConfig struct {
Key keyFilter `xml:"S3Key"` Key keyFilter `xml:"S3Key"`
} }
ID string `xml:"Id"` ID string `xml:"Id"`
TopicArn string `xml:"Topic"` TopicARN string `xml:"Topic"`
} }
// Lambda function configuration, this is a compliance field not used by minio yet. // Lambda function configuration, this is a compliance field not used by minio yet.
type lambdaFuncConfig struct { type lambdaConfig struct {
Events []string `xml:"Event"` Events []string `xml:"Event"`
Filter struct { Filter struct {
Key keyFilter `xml:"S3Key"` Key keyFilter `xml:"S3Key,omitempty"`
} }
ID string `xml:"Id"` ID string `xml:"Id"`
LambdaFunctionArn string `xml:"CloudFunction"` LambdaARN string `xml:"CloudFunction"`
} }
// Notification configuration structure represents the XML format of // Notification configuration structure represents the XML format of
// notification configuration of buckets. // notification configuration of buckets.
type notificationConfig struct { type notificationConfig struct {
XMLName xml.Name `xml:"NotificationConfiguration"` XMLName xml.Name `xml:"NotificationConfiguration"`
QueueConfigurations []queueConfig `xml:"QueueConfiguration"` QueueConfigs []queueConfig `xml:"QueueConfiguration"`
TopicConfigurations []topicConfig `xml:"TopicConfiguration"` TopicConfigs []topicConfig `xml:"TopicConfiguration"`
LambdaConfigurations []lambdaFuncConfig `xml:"CloudFunctionConfiguration"` LambdaConfigs []lambdaConfig `xml:"CloudFunctionConfiguration"`
} }
// Internal error used to signal notifications not set. // Internal error used to signal notifications not set.
@ -81,9 +81,9 @@ type EventName int
const ( const (
// ObjectCreatedPut is s3:ObjectCreated:Put // ObjectCreatedPut is s3:ObjectCreated:Put
ObjectCreatedPut EventName = iota ObjectCreatedPut EventName = iota
// ObjectCreatedPost is s3:ObjectCreated:POst // ObjectCreatedPost is s3:ObjectCreated:Post
ObjectCreatedPost ObjectCreatedPost
// ObjectCreatedCopy is s3:ObjectCreated:Post // ObjectCreatedCopy is s3:ObjectCreated:Copy
ObjectCreatedCopy ObjectCreatedCopy
// ObjectCreatedCompleteMultipartUpload is s3:ObjectCreated:CompleteMultipartUpload // ObjectCreatedCompleteMultipartUpload is s3:ObjectCreated:CompleteMultipartUpload
ObjectCreatedCompleteMultipartUpload ObjectCreatedCompleteMultipartUpload
@ -155,12 +155,24 @@ type NotificationEvent struct {
S3 eventMeta `json:"s3"` S3 eventMeta `json:"s3"`
} }
// Represents the minio sqs type and inputs. // Represents the minio lambda type and account id's.
type arnMinioSqs struct { type arnLambda struct {
sqsType string Type string
AccountID string
} }
// Stringer for constructing AWS ARN compatible string. // Stringer for constructing AWS ARN compatible string.
func (m arnMinioSqs) String() string { func (m arnLambda) String() string {
return minioSqs + serverConfig.GetRegion() + ":" + m.sqsType return minioLambda + serverConfig.GetRegion() + ":" + m.AccountID + ":" + m.Type
}
// Represents the minio sqs type and account id's.
type arnSQS struct {
Type string
AccountID string
}
// Stringer for constructing AWS ARN compatible string.
func (m arnSQS) String() string {
return minioSqs + serverConfig.GetRegion() + ":" + m.AccountID + ":" + m.Type
} }

View File

@ -18,10 +18,13 @@ package main
import ( import (
"bytes" "bytes"
"encoding/json"
"encoding/xml" "encoding/xml"
"io" "io"
"net/http" "net/http"
"path" "path"
"strings"
"time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@ -31,37 +34,6 @@ const (
bucketNotificationConfig = "notification.xml" bucketNotificationConfig = "notification.xml"
) )
// loads notifcation config if any for a given bucket, returns back structured notification config.
func loadNotificationConfig(objAPI ObjectLayer, bucket string) (nConfig notificationConfig, err error) {
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
var objInfo ObjectInfo
objInfo, err = objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath)
if err != nil {
switch err.(type) {
case ObjectNotFound:
return notificationConfig{}, errNoSuchNotifications
}
return notificationConfig{}, err
}
var buffer bytes.Buffer
err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer)
if err != nil {
switch err.(type) {
case ObjectNotFound:
return notificationConfig{}, errNoSuchNotifications
}
return notificationConfig{}, err
}
// Unmarshal notification bytes.
notificationConfigBytes := buffer.Bytes()
if err = xml.Unmarshal(notificationConfigBytes, &nConfig); err != nil {
return notificationConfig{}, err
} // Successfully marshalled notification configuration.
return nConfig, nil
}
// GetBucketNotificationHandler - This implementation of the GET // GetBucketNotificationHandler - This implementation of the GET
// operation uses the notification subresource to return the // operation uses the notification subresource to return the
// notification configuration of a bucket. If notifications are // notification configuration of a bucket. If notifications are
@ -75,63 +47,27 @@ func (api objectAPIHandlers) GetBucketNotificationHandler(w http.ResponseWriter,
} }
vars := mux.Vars(r) vars := mux.Vars(r)
bucket := vars["bucket"] bucket := vars["bucket"]
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) // Attempt to successfully load notification config.
objInfo, err := api.ObjectAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath) nConfig, err := loadNotificationConfig(bucket, api.ObjectAPI)
if err != nil { if err != nil && err != errNoSuchNotifications {
switch err.(type) { errorIf(err, "Unable to read notification configuration.")
case ObjectNotFound:
writeSuccessResponse(w, nil)
return
}
errorIf(err, "Unable to read notification configuration.", notificationConfigPath)
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return return
} }
// For no notifications we write a dummy XML.
// Indicates if any data was written to the http.ResponseWriter if err == errNoSuchNotifications {
dataWritten := false // Complies with the s3 behavior in this regard.
nConfig = &notificationConfig{}
// io.Writer type which keeps track if any data was written.
writer := funcToWriter(func(p []byte) (int, error) {
if !dataWritten {
// Set headers on the first write.
// Set standard object headers.
setObjectHeaders(w, objInfo, nil)
// Set any additional requested response headers.
setGetRespHeaders(w, r.URL.Query())
dataWritten = true
} }
return w.Write(p) notificationBytes, err := xml.Marshal(nConfig)
})
// Reads the object at startOffset and writes to func writer..
err = api.ObjectAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, writer)
if err != nil { if err != nil {
if !dataWritten { // For any marshalling failure.
switch err.(type) { errorIf(err, "Unable to marshal notification configuration into XML.", err)
case ObjectNotFound: writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
writeSuccessResponse(w, nil)
return return
} }
// Error response only if no data has been written to client yet. i.e if // Success.
// partial data has already been written before an error writeSuccessResponse(w, notificationBytes)
// occurred then no point in setting StatusCode and
// sending error XML.
apiErr := toAPIErrorCode(err)
writeErrorResponse(w, r, apiErr, r.URL.Path)
}
errorIf(err, "Unable to write to client.")
return
}
if !dataWritten {
// If ObjectAPI.GetObject did not return error and no data has
// been written it would mean that it is a 0-byte object.
// call wrter.Write(nil) to set appropriate headers.
writer.Write(nil)
}
} }
// PutBucketNotificationHandler - Minio notification feature enables // PutBucketNotificationHandler - Minio notification feature enables
@ -169,10 +105,11 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
// Reads the incoming notification configuration. // Reads the incoming notification configuration.
var buffer bytes.Buffer var buffer bytes.Buffer
var bufferSize int64
if r.ContentLength >= 0 { if r.ContentLength >= 0 {
_, err = io.CopyN(&buffer, r.Body, r.ContentLength) bufferSize, err = io.CopyN(&buffer, r.Body, r.ContentLength)
} else { } else {
_, err = io.Copy(&buffer, r.Body) bufferSize, err = io.Copy(&buffer, r.Body)
} }
if err != nil { if err != nil {
errorIf(err, "Unable to read incoming body.") errorIf(err, "Unable to read incoming body.")
@ -196,16 +133,142 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter,
} }
// Proceed to save notification configuration. // Proceed to save notification configuration.
size := int64(len(notificationConfigBytes))
data := bytes.NewReader(notificationConfigBytes)
notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
_, err = api.ObjectAPI.PutObject(minioMetaBucket, notificationConfigPath, size, data, nil) _, err = api.ObjectAPI.PutObject(minioMetaBucket, notificationConfigPath, bufferSize, bytes.NewReader(buffer.Bytes()), nil)
if err != nil { if err != nil {
errorIf(err, "Unable to write bucket notification configuration.", notificationConfigPath) errorIf(err, "Unable to write bucket notification configuration.")
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return return
} }
// Set bucket notification config.
eventN.SetBucketNotificationConfig(bucket, &notificationCfg)
// Success. // Success.
writeSuccessResponse(w, nil) writeSuccessResponse(w, nil)
} }
// writeNotification marshals the notification message as JSON (not XML)
// before writing it to the client, terminated by a CRLF pair so the client
// can delimit individual event documents on the stream.
func writeNotification(w http.ResponseWriter, notification map[string][]NotificationEvent) error {
	// Invalid response writer.
	if w == nil {
		return errInvalidArgument
	}
	// Invalid notification input.
	if notification == nil {
		return errInvalidArgument
	}
	// Marshal notification data into JSON and write to client.
	notificationBytes, err := json.Marshal(&notification)
	if err != nil {
		return err
	}
	// Add additional CRLF characters for client to
	// differentiate the individual events properly.
	_, err = w.Write(append(notificationBytes, crlf...))
	// Make sure we have flushed, this would set Transfer-Encoding: chunked.
	// NOTE(review): the unchecked type assertion panics if w does not
	// implement http.Flusher — confirm every caller passes a flusher.
	w.(http.Flusher).Flush()
	if err != nil {
		return err
	}
	return nil
}

// CRLF character used for chunked transfer in accordance with HTTP standards.
var crlf = []byte("\r\n")
// sendBucketNotification - writes notification back to client on the response
// writer for each notification input, otherwise periodically writes an empty
// Records document to keep the connection active. Each notification message is
// terminated by a CRLF pair. Upon any error received on the response writer
// the for loop exits.
//
// TODO - do not log for all errors.
func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []NotificationEvent) {
	// Empty Records payload used as a keep-alive heartbeat.
	var dummyEvents = map[string][]NotificationEvent{"Records": nil}
	// Continuously write to client either timely empty structures
	// every 5 seconds, or return back the notifications.
	for {
		select {
		case events := <-arnListenerCh:
			// Real events received from the listener channel.
			if err := writeNotification(w, map[string][]NotificationEvent{"Records": events}); err != nil {
				errorIf(err, "Unable to write notification to client.")
				return
			}
		case <-time.After(5 * time.Second):
			// Heartbeat: lets the client detect a dead connection.
			if err := writeNotification(w, dummyEvents); err != nil {
				errorIf(err, "Unable to write notification to client.")
				return
			}
		}
	}
}
// Returns true if the lambda ARN's type carries the minio lambda suffix.
// NOTE(review): despite the name, this inspects a lambda ARN (not a queue
// ARN) and matches by suffix only — confirm the abbreviation is intended.
func isMinL(lambdaARN arnLambda) bool {
	return strings.HasSuffix(lambdaARN.Type, lambdaTypeMinio)
}
// isMinioARNConfigured - reports whether the given lambda ARN exactly matches
// one of the lambda ARNs present in the supplied lambda configurations,
// i.e. the ARN has been configured and enabled for this bucket.
func isMinioARNConfigured(lambdaARN string, lambdaConfigs []lambdaConfig) bool {
	for i := range lambdaConfigs {
		// An exact ARN match means the listener is configured.
		if lambdaConfigs[i].LambdaARN == lambdaARN {
			return true
		}
	}
	// No configuration carries this ARN.
	return false
}
// ListenBucketNotificationHandler - list bucket notifications.
//
// Streams bucket notification events to the client over a long-lived HTTP
// response, one JSON document per CRLF-terminated line, until the client
// disconnects or writing fails.
func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
	// Validate request authorization.
	if s3Error := checkAuth(r); s3Error != ErrNone {
		writeErrorResponse(w, r, s3Error, r.URL.Path)
		return
	}
	vars := mux.Vars(r)
	bucket := vars["bucket"]

	// Get notification ARN.
	lambdaARN := r.URL.Query().Get("notificationARN")
	if lambdaARN == "" {
		writeErrorResponse(w, r, ErrARNNotification, r.URL.Path)
		return
	}

	// Validate if bucket exists.
	_, err := api.ObjectAPI.GetBucketInfo(bucket)
	if err != nil {
		// Fix: original log message read "Unable to bucket info."
		errorIf(err, "Unable to get bucket info.")
		writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
		return
	}

	// No notification configuration set on this bucket, reject the listen.
	notificationCfg := eventN.GetBucketNotificationConfig(bucket)
	if notificationCfg == nil {
		writeErrorResponse(w, r, ErrARNNotification, r.URL.Path)
		return
	}

	// Notifications set, but do not have MINIO queue enabled, return.
	if !isMinioARNConfigured(lambdaARN, notificationCfg.LambdaConfigs) {
		writeErrorResponse(w, r, ErrARNNotification, r.URL.Path)
		return
	}

	// Add all common headers.
	setCommonHeaders(w)

	// Create a new notification event channel.
	nEventCh := make(chan []NotificationEvent)
	// Close the listener channel.
	defer close(nEventCh)

	// Set lambda target.
	eventN.SetLambdaTarget(lambdaARN, nEventCh)
	// Remove lambda listener after the writer has closed or the client
	// disconnected. Defers run LIFO, so the target is removed before the
	// channel is closed, avoiding a send on a closed channel.
	defer eventN.RemoveLambdaTarget(lambdaARN, nEventCh)

	// Start sending bucket notifications.
	sendBucketNotification(w, nEventCh)
}

View File

@ -0,0 +1,90 @@
package main
import (
"bytes"
"io"
"io/ioutil"
"net/http"
"testing"
)
// Implement a dummy flush writer.
type flushWriter struct {
io.Writer
}
// Flush writer is a dummy writer compatible with http.Flusher and http.ResponseWriter.
func (f *flushWriter) Flush() {}
func (f *flushWriter) Write(b []byte) (n int, err error) { return f.Writer.Write(b) }
func (f *flushWriter) Header() http.Header { return http.Header{} }
func (f *flushWriter) WriteHeader(code int) {}
func newFlushWriter(writer io.Writer) *flushWriter {
return &flushWriter{writer}
}
// Tests write notification code: argument validation and the CRLF-terminated
// JSON payload written for a real event.
func TestWriteNotification(t *testing.T) {
	// Initialize a new test config.
	root, err := newTestConfig("us-east-1")
	if err != nil {
		t.Fatalf("Unable to initialize test config %s", err)
	}
	defer removeAll(root)

	var buffer bytes.Buffer
	// Collection of test cases for each event writer.
	testCases := []struct {
		writer *flushWriter
		event  map[string][]NotificationEvent
		err    error
	}{
		// Invalid input argument with writer `nil` - Test - 1
		// NOTE(review): a nil *flushWriter stored in the http.ResponseWriter
		// interface is non-nil to a `w == nil` comparison; this case passes
		// only because the nil event is rejected first — confirm intent.
		{
			writer: nil,
			event:  nil,
			err:    errInvalidArgument,
		},
		// Invalid input argument with event `nil` - Test - 2
		{
			writer: newFlushWriter(ioutil.Discard),
			event:  nil,
			err:    errInvalidArgument,
		},
		// Unmarshal and write, validate last 5 bytes. - Test - 3
		{
			writer: newFlushWriter(&buffer),
			event: map[string][]NotificationEvent{
				"Records": {newNotificationEvent(eventData{
					Type:   ObjectCreatedPut,
					Bucket: "testbucket",
					ObjInfo: ObjectInfo{
						Name: "key",
					},
					ReqParams: map[string]string{
						"ip": "10.1.10.1",
					}}),
				},
			},
			err: nil,
		},
	}

	// Validates all the testcases for writing notification.
	for _, testCase := range testCases {
		err := writeNotification(testCase.writer, testCase.event)
		if err != testCase.err {
			t.Errorf("Unable to write notification %s", err)
		}
		// Validates if the ending string has 'crlf'.
		if err == nil && !bytes.HasSuffix(buffer.Bytes(), crlf) {
			// Fix: original sliced buffer.Bytes()[buffer.Len()-5 : 0],
			// whose inverted bounds panic at runtime; report the trailing
			// bytes instead.
			buf := buffer.Bytes()
			if len(buf) > 5 {
				buf = buf[len(buf)-5:]
			}
			t.Errorf("Invalid suffix found from the writer last 5 bytes %s, expected `\r\n`", string(buf))
		}
		// Not printing 'buf' on purpose, validates look for string '10.1.10.1'.
		if err == nil && !bytes.Contains(buffer.Bytes(), []byte("10.1.10.1")) {
			// Enable when debugging:
			// fmt.Println(string(buffer.Bytes()))
			t.Errorf("Requested content couldn't be found, expected `10.1.10.1`")
		}
	}
}

View File

@ -100,36 +100,76 @@ func checkFilterRules(filterRules []filterRule) APIErrorCode {
return ErrNone return ErrNone
} }
// checkQueueArn - check if the queue arn is valid. // checkQueueARN - check if the queue arn is valid.
func checkQueueArn(queueArn string) APIErrorCode { func checkQueueARN(queueARN string) APIErrorCode {
if !strings.HasPrefix(queueArn, minioSqs) { if !strings.HasPrefix(queueARN, minioSqs) {
return ErrARNNotification return ErrARNNotification
} }
if !strings.HasPrefix(queueArn, minioSqs+serverConfig.GetRegion()+":") { if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") {
return ErrRegionNotification
}
return ErrNone
}
// checkLambdaARN - check if the lambda arn is valid.
func checkLambdaARN(lambdaARN string) APIErrorCode {
if !strings.HasPrefix(lambdaARN, minioLambda) {
return ErrARNNotification
}
if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") {
return ErrRegionNotification return ErrRegionNotification
} }
return ErrNone return ErrNone
} }
// Validate if we recognize the queue type. // Validate if we recognize the queue type.
func isValidQueue(sqsArn arnMinioSqs) bool { func isValidQueue(sqsARN arnSQS) bool {
amqpQ := isAMQPQueue(sqsArn) // Is amqp queue?. amqpQ := isAMQPQueue(sqsARN) // Is amqp queue?.
elasticQ := isElasticQueue(sqsArn) // Is elastic queue?. elasticQ := isElasticQueue(sqsARN) // Is elastic queue?.
redisQ := isRedisQueue(sqsArn) // Is redis queue?. redisQ := isRedisQueue(sqsARN) // Is redis queue?.
return amqpQ || elasticQ || redisQ return amqpQ || elasticQ || redisQ
} }
// Validate if we recognize the lambda type.
func isValidLambda(lambdaARN arnLambda) bool {
return isMinL(lambdaARN) // Is minio lambda?.
}
// Validates account id for input queue ARN.
func isValidQueueID(queueARN string) bool {
// Unmarshals QueueARN into structured object.
sqsARN := unmarshalSqsARN(queueARN)
// AMQP queue.
if isAMQPQueue(sqsARN) {
amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID)
return amqpN.Enable && amqpN.URL != ""
} else if isElasticQueue(sqsARN) { // Elastic queue.
elasticN := serverConfig.GetElasticSearchNotifyByID(sqsARN.AccountID)
return elasticN.Enable && elasticN.URL != ""
} else if isRedisQueue(sqsARN) { // Redis queue.
redisN := serverConfig.GetRedisNotifyByID(sqsARN.AccountID)
return redisN.Enable && redisN.Addr != ""
}
return false
}
// Check - validates queue configuration and returns error if any. // Check - validates queue configuration and returns error if any.
func checkQueueConfig(qConfig queueConfig) APIErrorCode { func checkQueueConfig(qConfig queueConfig) APIErrorCode {
// Check queue arn is valid. // Check queue arn is valid.
if s3Error := checkQueueArn(qConfig.QueueArn); s3Error != ErrNone { if s3Error := checkQueueARN(qConfig.QueueARN); s3Error != ErrNone {
return s3Error return s3Error
} }
// Unmarshals QueueArn into structured object. // Unmarshals QueueARN into structured object.
sqsArn := unmarshalSqsArn(qConfig.QueueArn) sqsARN := unmarshalSqsARN(qConfig.QueueARN)
// Validate if sqsArn requested any of the known supported queues. // Validate if sqsARN requested any of the known supported queues.
if !isValidQueue(sqsArn) { if !isValidQueue(sqsARN) {
return ErrARNNotification
}
// Validate if the account ID is correct.
if !isValidQueueID(qConfig.QueueARN) {
return ErrARNNotification return ErrARNNotification
} }
@ -147,6 +187,34 @@ func checkQueueConfig(qConfig queueConfig) APIErrorCode {
return ErrNone return ErrNone
} }
// Check - validates queue configuration and returns error if any.
func checkLambdaConfig(lConfig lambdaConfig) APIErrorCode {
// Check queue arn is valid.
if s3Error := checkLambdaARN(lConfig.LambdaARN); s3Error != ErrNone {
return s3Error
}
// Unmarshals QueueARN into structured object.
lambdaARN := unmarshalLambdaARN(lConfig.LambdaARN)
// Validate if lambdaARN requested any of the known supported queues.
if !isValidLambda(lambdaARN) {
return ErrARNNotification
}
// Check if valid events are set in queue config.
if s3Error := checkEvents(lConfig.Events); s3Error != ErrNone {
return s3Error
}
// Check if valid filters are set in queue config.
if s3Error := checkFilterRules(lConfig.Filter.Key.FilterRules); s3Error != ErrNone {
return s3Error
}
// Success.
return ErrNone
}
// Validates all incoming queue configs, checkQueueConfig validates if the // Validates all incoming queue configs, checkQueueConfig validates if the
// input fields for each queues is not malformed and has valid configuration // input fields for each queues is not malformed and has valid configuration
// information. If validation fails bucket notifications are not enabled. // information. If validation fails bucket notifications are not enabled.
@ -160,32 +228,70 @@ func validateQueueConfigs(queueConfigs []queueConfig) APIErrorCode {
return ErrNone return ErrNone
} }
// Validates all incoming lambda configs, checkLambdaConfig validates if the
// input fields for each queues is not malformed and has valid configuration
// information. If validation fails bucket notifications are not enabled.
func validateLambdaConfigs(lambdaConfigs []lambdaConfig) APIErrorCode {
for _, lConfig := range lambdaConfigs {
if s3Error := checkLambdaConfig(lConfig); s3Error != ErrNone {
return s3Error
}
}
// Success.
return ErrNone
}
// Validates all the bucket notification configuration for their validity, // Validates all the bucket notification configuration for their validity,
// if one of the config is malformed or has invalid data it is rejected. // if one of the config is malformed or has invalid data it is rejected.
// Configuration is never applied partially. // Configuration is never applied partially.
func validateNotificationConfig(nConfig notificationConfig) APIErrorCode { func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
if s3Error := validateQueueConfigs(nConfig.QueueConfigurations); s3Error != ErrNone { if s3Error := validateQueueConfigs(nConfig.QueueConfigs); s3Error != ErrNone {
return s3Error return s3Error
} }
if s3Error := validateLambdaConfigs(nConfig.LambdaConfigs); s3Error != ErrNone {
return s3Error
}
// Add validation for other configurations. // Add validation for other configurations.
return ErrNone return ErrNone
} }
// Unmarshals input value of AWS ARN format into minioLambda object.
// Returned value represents minio lambda type, currently supported are
// - minio
func unmarshalLambdaARN(lambdaARN string) arnLambda {
lambda := arnLambda{}
if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") {
return lambda
}
lambdaType := strings.TrimPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":")
switch {
case strings.HasSuffix(lambdaType, lambdaTypeMinio):
lambda.Type = lambdaTypeMinio
} // Add more lambda here.
lambda.AccountID = strings.TrimSuffix(lambdaType, ":"+lambda.Type)
return lambda
}
// Unmarshals input value of AWS ARN format into minioSqs object. // Unmarshals input value of AWS ARN format into minioSqs object.
// Returned value represents minio sqs types, currently supported are // Returned value represents minio sqs types, currently supported are
// - amqp // - amqp
// - elasticsearch // - elasticsearch
// - redis // - redis
func unmarshalSqsArn(queueArn string) (mSqs arnMinioSqs) { func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
sqsType := strings.TrimPrefix(queueArn, minioSqs+serverConfig.GetRegion()+":") mSqs = arnSQS{}
mSqs = arnMinioSqs{} if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") {
switch sqsType { return mSqs
case queueTypeAMQP: }
mSqs.sqsType = queueTypeAMQP sqsType := strings.TrimPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":")
case queueTypeElastic: switch {
mSqs.sqsType = queueTypeElastic case strings.HasSuffix(sqsType, queueTypeAMQP):
case queueTypeRedis: mSqs.Type = queueTypeAMQP
mSqs.sqsType = queueTypeRedis case strings.HasSuffix(sqsType, queueTypeElastic):
} // Add more cases here. mSqs.Type = queueTypeElastic
case strings.HasSuffix(sqsType, queueTypeRedis):
mSqs.Type = queueTypeRedis
} // Add more queues here.
mSqs.AccountID = strings.TrimSuffix(sqsType, ":"+mSqs.Type)
return mSqs return mSqs
} }

View File

@ -97,8 +97,8 @@ func TestValidEvents(t *testing.T) {
} }
} }
// Tests queue arn validation. // Tests lambda arn validation.
func TestQueueArn(t *testing.T) { func TestLambdaARN(t *testing.T) {
rootPath, err := newTestConfig("us-east-1") rootPath, err := newTestConfig("us-east-1")
if err != nil { if err != nil {
t.Fatalf("unable initialize config file, %s", err) t.Fatalf("unable initialize config file, %s", err)
@ -106,38 +106,80 @@ func TestQueueArn(t *testing.T) {
defer removeAll(rootPath) defer removeAll(rootPath)
testCases := []struct { testCases := []struct {
queueArn string lambdaARN string
errCode APIErrorCode errCode APIErrorCode
}{ }{
// Valid redis queue arn. // Valid minio lambda with '1' account id.
{ {
queueArn: "arn:minio:sqs:us-east-1:1:redis", lambdaARN: "arn:minio:lambda:us-east-1:1:minio",
errCode: ErrNone, errCode: ErrNone,
}, },
// Valid elasticsearch queue arn. // Valid minio lambda with '10' account id.
{ {
queueArn: "arn:minio:sqs:us-east-1:1:elasticsearch", lambdaARN: "arn:minio:lambda:us-east-1:10:minio",
errCode: ErrNone,
},
// Valid amqp queue arn.
{
queueArn: "arn:minio:sqs:us-east-1:1:amqp",
errCode: ErrNone, errCode: ErrNone,
}, },
// Invalid empty queue arn. // Invalid empty queue arn.
{ {
queueArn: "", lambdaARN: "",
errCode: ErrARNNotification, errCode: ErrARNNotification,
}, },
// Invalid region 'us-west-1' in queue arn. // Invalid region 'us-west-1' in queue arn.
{ {
queueArn: "arn:minio:sqs:us-west-1:1:redis", lambdaARN: "arn:minio:lambda:us-west-1:1:redis",
errCode: ErrRegionNotification, errCode: ErrRegionNotification,
}, },
} }
for i, testCase := range testCases { for i, testCase := range testCases {
errCode := checkQueueArn(testCase.queueArn) errCode := checkLambdaARN(testCase.lambdaARN)
if testCase.errCode != errCode {
t.Errorf("Test %d: Expected \"%d\", got \"%d\"", i+1, testCase.errCode, errCode)
}
}
}
// Tests queue arn validation.
func TestQueueARN(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("unable initialize config file, %s", err)
}
defer removeAll(rootPath)
testCases := []struct {
queueARN string
errCode APIErrorCode
}{
// Valid redis queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:redis",
errCode: ErrNone,
},
// Valid elasticsearch queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:elasticsearch",
errCode: ErrNone,
},
// Valid amqp queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:amqp",
errCode: ErrNone,
},
// Invalid empty queue arn.
{
queueARN: "",
errCode: ErrARNNotification,
},
// Invalid region 'us-west-1' in queue arn.
{
queueARN: "arn:minio:sqs:us-west-1:1:redis",
errCode: ErrRegionNotification,
},
}
for i, testCase := range testCases {
errCode := checkQueueARN(testCase.queueARN)
if testCase.errCode != errCode { if testCase.errCode != errCode {
t.Errorf("Test %d: Expected \"%d\", got \"%d\"", i+1, testCase.errCode, errCode) t.Errorf("Test %d: Expected \"%d\", got \"%d\"", i+1, testCase.errCode, errCode)
} }
@ -145,7 +187,7 @@ func TestQueueArn(t *testing.T) {
} }
// Test unmarshal queue arn. // Test unmarshal queue arn.
func TestUnmarshalSqsArn(t *testing.T) { func TestUnmarshalLambdaARN(t *testing.T) {
rootPath, err := newTestConfig("us-east-1") rootPath, err := newTestConfig("us-east-1")
if err != nil { if err != nil {
t.Fatalf("unable initialize config file, %s", err) t.Fatalf("unable initialize config file, %s", err)
@ -153,50 +195,97 @@ func TestUnmarshalSqsArn(t *testing.T) {
defer removeAll(rootPath) defer removeAll(rootPath)
testCases := []struct { testCases := []struct {
queueArn string lambdaARN string
sqsType string Type string
}{ }{
// Valid redis queue arn. // Valid minio lambda arn.
{ {
queueArn: "arn:minio:sqs:us-east-1:1:redis", lambdaARN: "arn:minio:lambda:us-east-1:1:lambda",
sqsType: "1:redis", Type: "lambda",
},
// Valid elasticsearch queue arn.
{
queueArn: "arn:minio:sqs:us-east-1:1:elasticsearch",
sqsType: "1:elasticsearch",
},
// Valid amqp queue arn.
{
queueArn: "arn:minio:sqs:us-east-1:1:amqp",
sqsType: "1:amqp",
}, },
// Invalid empty queue arn. // Invalid empty queue arn.
{ {
queueArn: "", lambdaARN: "",
sqsType: "", Type: "",
}, },
// Invalid region 'us-west-1' in queue arn. // Invalid region 'us-west-1' in queue arn.
{ {
queueArn: "arn:minio:sqs:us-west-1:1:redis", lambdaARN: "arn:minio:lambda:us-west-1:1:lambda",
sqsType: "", Type: "",
}, },
// Partial queue arn. // Partial queue arn.
{ {
queueArn: "arn:minio:sqs:", lambdaARN: "arn:minio:lambda:",
sqsType: "", Type: "",
}, },
// Invalid queue service value. // Invalid queue service value.
{ {
queueArn: "arn:minio:sqs:us-east-1:1:*", lambdaARN: "arn:minio:lambda:us-east-1:1:*",
sqsType: "", Type: "",
}, },
} }
for i, testCase := range testCases { for i, testCase := range testCases {
mSqs := unmarshalSqsArn(testCase.queueArn) lambda := unmarshalLambdaARN(testCase.lambdaARN)
if testCase.sqsType != mSqs.sqsType { if testCase.Type != lambda.Type {
t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.sqsType, mSqs.sqsType) t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, lambda.Type)
}
}
}
// Test unmarshal queue arn.
func TestUnmarshalSqsARN(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("unable initialize config file, %s", err)
}
defer removeAll(rootPath)
testCases := []struct {
queueARN string
Type string
}{
// Valid redis queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:redis",
Type: "redis",
},
// Valid elasticsearch queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:elasticsearch",
Type: "elasticsearch",
},
// Valid amqp queue arn.
{
queueARN: "arn:minio:sqs:us-east-1:1:amqp",
Type: "amqp",
},
// Invalid empty queue arn.
{
queueARN: "",
Type: "",
},
// Invalid region 'us-west-1' in queue arn.
{
queueARN: "arn:minio:sqs:us-west-1:1:redis",
Type: "",
},
// Partial queue arn.
{
queueARN: "arn:minio:sqs:",
Type: "",
},
// Invalid queue service value.
{
queueARN: "arn:minio:sqs:us-east-1:1:*",
Type: "",
},
}
for i, testCase := range testCases {
mSqs := unmarshalSqsARN(testCase.queueARN)
if testCase.Type != mSqs.Type {
t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, mSqs.Type)
} }
} }

View File

@ -30,15 +30,6 @@ func getOldBucketsConfigPath() (string, error) {
return filepath.Join(configPath, "buckets"), nil return filepath.Join(configPath, "buckets"), nil
} }
// getBucketConfigPath - get bucket config path.
func getOldBucketConfigPath(bucket string) (string, error) {
bucketsConfigPath, err := getOldBucketsConfigPath()
if err != nil {
return "", err
}
return filepath.Join(bucketsConfigPath, bucket), nil
}
// readBucketPolicy - read bucket policy. // readBucketPolicy - read bucket policy.
func readBucketPolicy(api objectAPIHandlers, bucket string) ([]byte, error) { func readBucketPolicy(api objectAPIHandlers, bucket string) ([]byte, error) {
// Verify bucket is valid. // Verify bucket is valid.

View File

@ -34,6 +34,8 @@ func migrateConfig() {
migrateV3ToV4() migrateV3ToV4()
// Migrate version '4' to '5'. // Migrate version '4' to '5'.
migrateV4ToV5() migrateV4ToV5()
// Migrate version '5' to '6.
migrateV5ToV6()
} }
// Version '1' is not supported anymore and deprecated, safe to delete. // Version '1' is not supported anymore and deprecated, safe to delete.
@ -149,6 +151,72 @@ func migrateV3ToV4() {
console.Println("Migration from version " + cv3.Version + " to " + srvConfig.Version + " completed successfully.") console.Println("Migration from version " + cv3.Version + " to " + srvConfig.Version + " completed successfully.")
} }
// Version '5' to '6' migrates config, removes previous fields related
// to backend types and server address. This change further simplifies
// the config for future additions.
//
// The AMQP/ElasticSearch/Redis settings embedded in the version '5'
// logger are moved under the new 'notify' section, keyed by account
// ID "1". No-op when there is no version '5' config on disk.
func migrateV5ToV6() {
	cv5, err := loadConfigV5()
	if err != nil && os.IsNotExist(err) {
		// No version '5' config file found, nothing to migrate.
		return
	}
	fatalIf(err, "Unable to load config version 5.")
	if cv5.Version != "5" {
		// On-disk config is some other version, leave it alone.
		return
	}

	// Save only the new fields, ignore the rest.
	srvConfig := &serverConfigV6{}
	srvConfig.Version = globalMinioConfigVersion
	srvConfig.Credential = cv5.Credential
	srvConfig.Region = cv5.Region
	if srvConfig.Region == "" {
		// Region needs to be set for AWS Signature Version 4.
		srvConfig.Region = "us-east-1"
	}
	srvConfig.Logger.Console = cv5.Logger.Console
	srvConfig.Logger.File = cv5.Logger.File
	srvConfig.Logger.Syslog = cv5.Logger.Syslog
	// Move the single v5 AMQP logger entry under notify, account ID "1".
	srvConfig.Notify.AMQP = map[string]amqpNotify{
		"1": {
			Enable:      cv5.Logger.AMQP.Enable,
			URL:         cv5.Logger.AMQP.URL,
			Exchange:    cv5.Logger.AMQP.Exchange,
			RoutingKey:  cv5.Logger.AMQP.RoutingKey,
			Mandatory:   cv5.Logger.AMQP.Mandatory,
			Immediate:   cv5.Logger.AMQP.Immediate,
			Durable:     cv5.Logger.AMQP.Durable,
			Internal:    cv5.Logger.AMQP.Internal,
			NoWait:      cv5.Logger.AMQP.NoWait,
			AutoDeleted: cv5.Logger.AMQP.AutoDeleted,
		},
	}
	// Move the single v5 ElasticSearch logger entry under notify.
	srvConfig.Notify.ElasticSearch = map[string]elasticSearchNotify{
		"1": {
			Enable: cv5.Logger.ElasticSearch.Enable,
			URL:    cv5.Logger.ElasticSearch.URL,
			Index:  cv5.Logger.ElasticSearch.Index,
		},
	}
	// Move the single v5 Redis logger entry under notify.
	srvConfig.Notify.Redis = map[string]redisNotify{
		"1": {
			Enable:   cv5.Logger.Redis.Enable,
			Addr:     cv5.Logger.Redis.Addr,
			Password: cv5.Logger.Redis.Password,
			Key:      cv5.Logger.Redis.Key,
		},
	}
	qc, err := quick.New(srvConfig)
	fatalIf(err, "Unable to initialize the quick config.")
	configFile, err := getConfigFile()
	fatalIf(err, "Unable to get config file.")

	err = qc.Save(configFile)
	// Fixed redundant wording: previous message read
	// "Failed to migrate config from X to Y failed.".
	fatalIf(err, "Failed to migrate config from "+cv5.Version+" to "+srvConfig.Version+".")

	console.Println("Migration from version " + cv5.Version + " to " + srvConfig.Version + " completed successfully.")
}
// Version '4' to '5' migrates config, removes previous fields related // Version '4' to '5' migrates config, removes previous fields related
// to backend types and server address. This change further simplifies // to backend types and server address. This change further simplifies
// the config for future additions. // the config for future additions.
@ -163,7 +231,7 @@ func migrateV4ToV5() {
} }
// Save only the new fields, ignore the rest. // Save only the new fields, ignore the rest.
srvConfig := &serverConfigV5{} srvConfig := &configV5{}
srvConfig.Version = globalMinioConfigVersion srvConfig.Version = globalMinioConfigVersion
srvConfig.Credential = cv4.Credential srvConfig.Credential = cv4.Credential
srvConfig.Region = cv4.Region srvConfig.Region = cv4.Region
@ -174,15 +242,9 @@ func migrateV4ToV5() {
srvConfig.Logger.Console = cv4.Logger.Console srvConfig.Logger.Console = cv4.Logger.Console
srvConfig.Logger.File = cv4.Logger.File srvConfig.Logger.File = cv4.Logger.File
srvConfig.Logger.Syslog = cv4.Logger.Syslog srvConfig.Logger.Syslog = cv4.Logger.Syslog
srvConfig.Logger.AMQP = amqpLogger{ srvConfig.Logger.AMQP.Enable = false
Enable: false, srvConfig.Logger.ElasticSearch.Enable = false
} srvConfig.Logger.Redis.Enable = false
srvConfig.Logger.ElasticSearch = elasticSearchLogger{
Enable: false,
}
srvConfig.Logger.Redis = redisLogger{
Enable: false,
}
qc, err := quick.New(srvConfig) qc, err := quick.New(srvConfig)
fatalIf(err, "Unable to initialize the quick config.") fatalIf(err, "Unable to initialize the quick config.")

View File

@ -146,6 +146,7 @@ func loadConfigV3() (*configV3, error) {
return c, nil return c, nil
} }
// logger type representing version '4' logger config.
type loggerV4 struct { type loggerV4 struct {
Console struct { Console struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
@ -195,3 +196,81 @@ func loadConfigV4() (*configV4, error) {
} }
return c, nil return c, nil
} }
// logger type representing version '5' logger config.
// Retained so that on-disk version '5' config files can still be
// decoded during migration to version '6'.
type loggerV5 struct {
	// Console logging configuration.
	Console struct {
		Enable bool   `json:"enable"`
		Level  string `json:"level"`
	} `json:"console"`
	// File logging configuration.
	File struct {
		Enable   bool   `json:"enable"`
		Filename string `json:"fileName"`
		Level    string `json:"level"`
	} `json:"file"`
	// Syslog logging configuration.
	Syslog struct {
		Enable bool   `json:"enable"`
		Addr   string `json:"address"`
		Level  string `json:"level"`
	} `json:"syslog"`
	// AMQP notification settings (embedded in the logger in version '5').
	AMQP struct {
		Enable   bool   `json:"enable"`
		Level    string `json:"level"`
		URL      string `json:"url"`
		Exchange string `json:"exchange"`
		// NOTE: tag "routineKey" (sic) is the key that version '5'
		// config files were written with; keep as-is so old files decode.
		RoutingKey   string `json:"routineKey"`
		ExchangeType string `json:"exchangeType"`
		Mandatory    bool   `json:"mandatory"`
		Immediate    bool   `json:"immediate"`
		Durable      bool   `json:"durable"`
		Internal     bool   `json:"internal"`
		NoWait       bool   `json:"noWait"`
		AutoDeleted  bool   `json:"autoDeleted"`
	} `json:"amqp"`
	// ElasticSearch notification settings (embedded in the logger in version '5').
	ElasticSearch struct {
		Enable bool   `json:"enable"`
		Level  string `json:"level"`
		URL    string `json:"url"`
		Index  string `json:"index"`
	} `json:"elasticsearch"`
	// Redis notification settings (embedded in the logger in version '5').
	Redis struct {
		Enable   bool   `json:"enable"`
		Level    string `json:"level"`
		Addr     string `json:"address"`
		Password string `json:"password"`
		Key      string `json:"key"`
	} `json:"redis"`
}
// configV5 server configuration version '5'.
// Retained only to load legacy on-disk configs for migration.
type configV5 struct {
	Version string `json:"version"`
	// S3 API configuration.
	Credential credential `json:"credential"`
	Region     string     `json:"region"`
	// Additional error logging configuration.
	Logger loggerV5 `json:"logger"`
}
// loadConfigV5 load config version '5'.
// Returns an os.IsNotExist-satisfying error when no config file exists.
func loadConfigV5() (*configV5, error) {
	configFile, err := getConfigFile()
	if err != nil {
		return nil, err
	}
	// Bail out early when there is no config file on disk.
	if _, err = os.Stat(configFile); err != nil {
		return nil, err
	}
	// Pre-set the version so quick validates against version '5'.
	cfg := &configV5{Version: "5"}
	qc, err := quick.New(cfg)
	if err != nil {
		return nil, err
	}
	if err = qc.Load(configFile); err != nil {
		return nil, err
	}
	return cfg, nil
}

View File

@ -23,8 +23,8 @@ import (
"github.com/minio/minio/pkg/quick" "github.com/minio/minio/pkg/quick"
) )
// serverConfigV5 server configuration version '5'. // serverConfigV6 server configuration version '5'.
type serverConfigV5 struct { type serverConfigV6 struct {
Version string `json:"version"` Version string `json:"version"`
// S3 API configuration. // S3 API configuration.
@ -34,6 +34,9 @@ type serverConfigV5 struct {
// Additional error logging configuration. // Additional error logging configuration.
Logger logger `json:"logger"` Logger logger `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
// Read Write mutex. // Read Write mutex.
rwMutex *sync.RWMutex rwMutex *sync.RWMutex
} }
@ -42,7 +45,7 @@ type serverConfigV5 struct {
func initConfig() error { func initConfig() error {
if !isConfigFileExists() { if !isConfigFileExists() {
// Initialize server config. // Initialize server config.
srvCfg := &serverConfigV5{} srvCfg := &serverConfigV6{}
srvCfg.Version = globalMinioConfigVersion srvCfg.Version = globalMinioConfigVersion
srvCfg.Region = "us-east-1" srvCfg.Region = "us-east-1"
srvCfg.Credential = mustGenAccessKeys() srvCfg.Credential = mustGenAccessKeys()
@ -73,7 +76,7 @@ func initConfig() error {
if _, err = os.Stat(configFile); err != nil { if _, err = os.Stat(configFile); err != nil {
return err return err
} }
srvCfg := &serverConfigV5{} srvCfg := &serverConfigV6{}
srvCfg.Version = globalMinioConfigVersion srvCfg.Version = globalMinioConfigVersion
srvCfg.rwMutex = &sync.RWMutex{} srvCfg.rwMutex = &sync.RWMutex{}
qc, err := quick.New(srvCfg) qc, err := quick.New(srvCfg)
@ -92,10 +95,10 @@ func initConfig() error {
} }
// serverConfig server config. // serverConfig server config.
var serverConfig *serverConfigV5 var serverConfig *serverConfigV6
// GetVersion get current config version. // GetVersion get current config version.
func (s serverConfigV5) GetVersion() string { func (s serverConfigV6) GetVersion() string {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Version return s.Version
@ -103,117 +106,135 @@ func (s serverConfigV5) GetVersion() string {
/// Logger related. /// Logger related.
func (s *serverConfigV5) SetAMQPLogger(amqpl amqpLogger) { func (s *serverConfigV6) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
s.Logger.AMQP = amqpl s.Notify.AMQP[accountID] = amqpn
} }
// GetAMQPLogger get current AMQP logger. func (s serverConfigV6) GetAMQP() map[string]amqpNotify {
func (s serverConfigV5) GetAMQPLogger() amqpLogger {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Logger.AMQP return s.Notify.AMQP
} }
func (s *serverConfigV5) SetElasticSearchLogger(esLogger elasticSearchLogger) { // GetAMQPNotify get current AMQP logger.
func (s serverConfigV6) GetAMQPNotifyByID(accountID string) amqpNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.AMQP[accountID]
}
func (s *serverConfigV6) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
s.Logger.ElasticSearch = esLogger s.Notify.ElasticSearch[accountID] = esNotify
} }
// GetElasticSearchLogger get current ElasicSearch logger. func (s serverConfigV6) GetElasticSearch() map[string]elasticSearchNotify {
func (s serverConfigV5) GetElasticSearchLogger() elasticSearchLogger {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Logger.ElasticSearch return s.Notify.ElasticSearch
} }
func (s *serverConfigV5) SetRedisLogger(rLogger redisLogger) { // GetElasticSearchNotify get current ElasicSearch logger.
func (s serverConfigV6) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.ElasticSearch[accountID]
}
func (s *serverConfigV6) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
s.Logger.Redis = rLogger s.Notify.Redis[accountID] = rNotify
} }
// GetRedisLogger get current Redis logger. func (s serverConfigV6) GetRedis() map[string]redisNotify {
func (s serverConfigV5) GetRedisLogger() redisLogger {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Logger.Redis return s.Notify.Redis
}
// GetRedisNotify get current Redis logger.
func (s serverConfigV6) GetRedisNotifyByID(accountID string) redisNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.Redis[accountID]
} }
// SetFileLogger set new file logger. // SetFileLogger set new file logger.
func (s *serverConfigV5) SetFileLogger(flogger fileLogger) { func (s *serverConfigV6) SetFileLogger(flogger fileLogger) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
s.Logger.File = flogger s.Logger.File = flogger
} }
// GetFileLogger get current file logger. // GetFileLogger get current file logger.
func (s serverConfigV5) GetFileLogger() fileLogger { func (s serverConfigV6) GetFileLogger() fileLogger {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Logger.File return s.Logger.File
} }
// SetConsoleLogger set new console logger. // SetConsoleLogger set new console logger.
func (s *serverConfigV5) SetConsoleLogger(clogger consoleLogger) { func (s *serverConfigV6) SetConsoleLogger(clogger consoleLogger) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
s.Logger.Console = clogger s.Logger.Console = clogger
} }
// GetConsoleLogger get current console logger. // GetConsoleLogger get current console logger.
func (s serverConfigV5) GetConsoleLogger() consoleLogger { func (s serverConfigV6) GetConsoleLogger() consoleLogger {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Logger.Console return s.Logger.Console
} }
// SetSyslogLogger set new syslog logger. // SetSyslogLogger set new syslog logger.
func (s *serverConfigV5) SetSyslogLogger(slogger syslogLogger) { func (s *serverConfigV6) SetSyslogLogger(slogger syslogLogger) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
s.Logger.Syslog = slogger s.Logger.Syslog = slogger
} }
// GetSyslogLogger get current syslog logger. // GetSyslogLogger get current syslog logger.
func (s *serverConfigV5) GetSyslogLogger() syslogLogger { func (s *serverConfigV6) GetSyslogLogger() syslogLogger {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Logger.Syslog return s.Logger.Syslog
} }
// SetRegion set new region. // SetRegion set new region.
func (s *serverConfigV5) SetRegion(region string) { func (s *serverConfigV6) SetRegion(region string) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
s.Region = region s.Region = region
} }
// GetRegion get current region. // GetRegion get current region.
func (s serverConfigV5) GetRegion() string { func (s serverConfigV6) GetRegion() string {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Region return s.Region
} }
// SetCredentials set new credentials. // SetCredentials set new credentials.
func (s *serverConfigV5) SetCredential(creds credential) { func (s *serverConfigV6) SetCredential(creds credential) {
s.rwMutex.Lock() s.rwMutex.Lock()
defer s.rwMutex.Unlock() defer s.rwMutex.Unlock()
s.Credential = creds s.Credential = creds
} }
// GetCredentials get current credentials. // GetCredentials get current credentials.
func (s serverConfigV5) GetCredential() credential { func (s serverConfigV6) GetCredential() credential {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()
return s.Credential return s.Credential
} }
// Save config. // Save config.
func (s serverConfigV5) Save() error { func (s serverConfigV6) Save() error {
s.rwMutex.RLock() s.rwMutex.RLock()
defer s.rwMutex.RUnlock() defer s.rwMutex.RUnlock()

381
event-notifier.go Normal file
View File

@ -0,0 +1,381 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"bytes"
"encoding/xml"
"errors"
"fmt"
"net/url"
"path"
"sync"
"time"
"github.com/Sirupsen/logrus"
)
// Global event notification queue. This is the queue that would be used to send all notifications.
type eventNotifier struct {
	// Guards the maps below against concurrent access.
	rwMutex *sync.RWMutex

	// Collection of 'bucket' and notification config.
	notificationConfigs map[string]*notificationConfig
	// Registered listener channels per lambda ARN; events are fanned
	// out to every channel in the slice.
	lambdaTargets map[string][]chan []NotificationEvent
	// Initialized queue loggers per queue ARN.
	queueTargets map[string]*logrus.Logger
}
// Represents data to be sent with notification event.
type eventData struct {
	Type      EventName         // Event type, e.g. s3:ObjectCreated:Put.
	Bucket    string            // Bucket on which the event occurred.
	ObjInfo   ObjectInfo        // Metadata of the object involved.
	ReqParams map[string]string // Parameters of the originating API request.
}
// New notification event constructs a new notification event message from
// input request metadata which completed successfully.
func newNotificationEvent(event eventData) NotificationEvent {
	// Region configured for this server; part of the event envelope.
	region := serverConfig.GetRegion()

	// Event timestamp and a hex sequencer derived from it.
	eventTime := time.Now().UTC()
	sequencer := fmt.Sprintf("%X", eventTime.UnixNano())

	// Fill in the s3 event message structure, see
	// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
	nEvent := NotificationEvent{
		EventVersion:      "2.0",
		EventSource:       "aws:s3",
		AwsRegion:         region,
		EventTime:         eventTime.Format(iso8601Format),
		EventName:         event.Type.String(),
		UserIdentity:      defaultIdentity(),
		RequestParameters: event.ReqParams,
		ResponseElements:  map[string]string{},
		S3: eventMeta{
			SchemaVersion:   "1.0",
			ConfigurationID: "Config",
			Bucket: bucketMeta{
				Name:          event.Bucket,
				OwnerIdentity: defaultIdentity(),
				ARN:           "arn:aws:s3:::" + event.Bucket,
			},
		},
	}

	// Object keys are URL encoded in the event payload.
	escapedObj := url.QueryEscape(event.ObjInfo.Name)
	if event.Type == ObjectRemovedDelete {
		// Delete events carry no ETag and no Size.
		nEvent.S3.Object = objectMeta{
			Key:       escapedObj,
			Sequencer: sequencer,
		}
	} else {
		// All other events carry the object's ETag and Size.
		nEvent.S3.Object = objectMeta{
			Key:       escapedObj,
			ETag:      event.ObjInfo.MD5Sum,
			Size:      event.ObjInfo.Size,
			Sequencer: sequencer,
		}
	}
	return nEvent
}
// Fetch the saved queue target. Returns nil when no target was
// initialized for queueARN.
func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger {
	// Take the read lock, consistent with every other accessor on
	// eventNotifier; previously this map was read unsynchronized.
	en.rwMutex.RLock()
	defer en.rwMutex.RUnlock()
	return en.queueTargets[queueARN]
}
// GetLambdaTarget fetches the listener channels registered for the given
// lambda ARN. Returns nil when no listeners are registered.
func (en eventNotifier) GetLambdaTarget(lambdaARN string) []chan []NotificationEvent {
	en.rwMutex.RLock()
	defer en.rwMutex.RUnlock()
	return en.lambdaTargets[lambdaARN]
}
// Set a new lambda target for an input lambda ARN.
// A nil listener channel is rejected with an error.
func (en *eventNotifier) SetLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) error {
	// Validate the argument before taking the write lock.
	if listenerCh == nil {
		return errors.New("invalid argument")
	}
	en.rwMutex.Lock()
	defer en.rwMutex.Unlock()
	en.lambdaTargets[lambdaARN] = append(en.lambdaTargets[lambdaARN], listenerCh)
	return nil
}
// Remove lambda target for an input lambda ARN. Removes the first
// registration of listenerCh; when it was the last listener for the ARN
// the map entry is deleted entirely. Unknown ARNs/channels are a no-op.
func (en *eventNotifier) RemoveLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) {
	en.rwMutex.Lock()
	defer en.rwMutex.Unlock()
	lambdaTarget, ok := en.lambdaTargets[lambdaARN]
	if !ok {
		return
	}
	for i, savedListenerCh := range lambdaTarget {
		if listenerCh == savedListenerCh {
			// Remove the matched channel and stop iterating: the
			// previous code kept ranging over the slice after the
			// in-place append shifted its elements, comparing against
			// stale entries.
			lambdaTarget = append(lambdaTarget[:i], lambdaTarget[i+1:]...)
			if len(lambdaTarget) == 0 {
				delete(en.lambdaTargets, lambdaARN)
			} else {
				en.lambdaTargets[lambdaARN] = lambdaTarget
			}
			break
		}
	}
}
// Returns true if bucket notification is set for the bucket, false otherwise.
func (en eventNotifier) IsBucketNotificationSet(bucket string) bool {
	en.rwMutex.RLock()
	defer en.rwMutex.RUnlock()
	// Presence in the config map is what "set" means here.
	_, found := en.notificationConfigs[bucket]
	return found
}
// Fetch bucket notification config for an input bucket.
// Returns nil when the bucket has no notification config set
// (map access yields the zero value for a missing key).
func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig {
	en.rwMutex.RLock()
	defer en.rwMutex.RUnlock()
	return en.notificationConfigs[bucket]
}
// Set a new notification config for a bucket, this operation will overwrite any previous
// notification configs for the bucket. A nil config is rejected with an error.
func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notificationCfg *notificationConfig) error {
	// Validate before taking the write lock.
	if notificationCfg == nil {
		return errors.New("invalid argument")
	}
	en.rwMutex.Lock()
	defer en.rwMutex.Unlock()
	en.notificationConfigs[bucket] = notificationCfg
	return nil
}
// eventNotify notifies an event to relevant targets based on their
// bucket notification configs.
func eventNotify(event eventData) {
	// Notifies a new event.
	// List of events reported through this function are
	//  - s3:ObjectCreated:Put
	//  - s3:ObjectCreated:Post
	//  - s3:ObjectCreated:Copy
	//  - s3:ObjectCreated:CompleteMultipartUpload
	//  - s3:ObjectRemoved:Delete
	nConfig := eventN.GetBucketNotificationConfig(event.Bucket)
	// No notification config for this bucket (nil) or no queue/lambda
	// configs enabled — drop the event. The nil check prevents a panic
	// for buckets that never had a notification config set.
	if nConfig == nil || (len(nConfig.QueueConfigs) == 0 && len(nConfig.LambdaConfigs) == 0) {
		return
	}
	// Event type.
	eventType := event.Type.String()
	// Object name.
	objectName := event.ObjInfo.Name
	// Save the notification event to be sent.
	notificationEvent := []NotificationEvent{newNotificationEvent(event)}
	// Validate if the event and object match the queue configs.
	for _, qConfig := range nConfig.QueueConfigs {
		eventMatch := eventMatch(eventType, qConfig.Events)
		ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules)
		if eventMatch && ruleMatch {
			targetLog := eventN.GetQueueTarget(qConfig.QueueARN)
			if targetLog != nil {
				// Logging the record delivers it via the queue's
				// logrus hook.
				targetLog.WithFields(logrus.Fields{
					"Records": notificationEvent,
				}).Info()
			}
		}
	}
	// Validate if the event and object match the lambda configs.
	for _, lambdaConfig := range nConfig.LambdaConfigs {
		ruleMatch := filterRuleMatch(objectName, lambdaConfig.Filter.Key.FilterRules)
		eventMatch := eventMatch(eventType, lambdaConfig.Events)
		if eventMatch && ruleMatch {
			// Fan the event out to every registered listener channel.
			targetListeners := eventN.GetLambdaTarget(lambdaConfig.LambdaARN)
			for _, listener := range targetListeners {
				listener <- notificationEvent
			}
		}
	}
}
// loadNotificationConfig loads the notification config, if any, for a given
// bucket and returns it in structured form. Returns errNoSuchNotifications
// when the bucket has no 'notification.xml' saved.
func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) {
	// Construct the notification config path.
	notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
	objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath)
	if err != nil {
		// 'notification.xml' not found return 'errNoSuchNotifications'.
		// This is default when no bucket notifications are found on the bucket.
		if _, ok := err.(ObjectNotFound); ok {
			return nil, errNoSuchNotifications
		}
		// Returns error for other errors.
		return nil, err
	}
	var buffer bytes.Buffer
	err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer)
	if err != nil {
		// Treat 'notification.xml' disappearing between the stat and the
		// read the same way as not found.
		if _, ok := err.(ObjectNotFound); ok {
			return nil, errNoSuchNotifications
		}
		// Returns error for other errors.
		return nil, err
	}
	// Unmarshal notification bytes. Pass the pointer directly instead of
	// the previous &notificationCfg, which handed xml.Unmarshal an
	// unnecessary **notificationConfig.
	notificationCfg := &notificationConfig{}
	if err = xml.Unmarshal(buffer.Bytes(), notificationCfg); err != nil {
		return nil, err
	}
	// Successfully unmarshalled notification configuration.
	return notificationCfg, nil
}
// loads all bucket notifications if present.
// Buckets without a saved notification config are simply skipped.
func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, error) {
	// Bucket listing drives which configs we attempt to load.
	buckets, err := objAPI.ListBuckets()
	if err != nil {
		return nil, err
	}

	configs := make(map[string]*notificationConfig)
	for _, bucket := range buckets {
		nCfg, nErr := loadNotificationConfig(bucket.Name, objAPI)
		if nErr == errNoSuchNotifications {
			// No notification config for this bucket, move on.
			continue
		}
		if nErr != nil {
			return nil, nErr
		}
		configs[bucket.Name] = nCfg
	}

	// Success.
	return configs, nil
}
// Loads all queue targets, initializes each queueARNs depending on their config.
// Each instance of queueARN registers its own logrus to communicate with the
// queue service. QueueARN once initialized is not initialized again for the
// same queueARN, instead previous connection is used.
// Returns a map of queue ARN -> logrus logger wired to that queue service.
func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
	queueTargets := make(map[string]*logrus.Logger)
	// Load all amqp targets, initialize their respective loggers.
	for accountID, amqpN := range serverConfig.GetAMQP() {
		// Skip account IDs that are configured but not enabled.
		if !amqpN.Enable {
			continue
		}
		// Construct the queue ARN for AMQP.
		queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeAMQP
		// Queue target if already initialized we move to the next ARN.
		_, ok := queueTargets[queueARN]
		if ok {
			continue
		}
		// Using accountID we can now initialize a new AMQP logrus instance.
		amqpLog, err := newAMQPNotify(accountID)
		if err != nil {
			return nil, err
		}
		queueTargets[queueARN] = amqpLog
	}
	// Load redis targets, initialize their respective loggers.
	for accountID, redisN := range serverConfig.GetRedis() {
		// Skip account IDs that are configured but not enabled.
		if !redisN.Enable {
			continue
		}
		// Construct the queue ARN for Redis.
		queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeRedis
		// Queue target if already initialized we move to the next ARN.
		_, ok := queueTargets[queueARN]
		if ok {
			continue
		}
		// Using accountID we can now initialize a new Redis logrus instance.
		redisLog, err := newRedisNotify(accountID)
		if err != nil {
			return nil, err
		}
		queueTargets[queueARN] = redisLog
	}
	// Load elastic targets, initialize their respective loggers.
	for accountID, elasticN := range serverConfig.GetElasticSearch() {
		// Skip account IDs that are configured but not enabled.
		if !elasticN.Enable {
			continue
		}
		// Construct the queue ARN for Elastic.
		queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeElastic
		// Queue target if already initialized we move to the next ARN.
		_, ok := queueTargets[queueARN]
		if ok {
			continue
		}
		// Using accountID we can now initialize a new ElasticSearch logrus instance.
		elasticLog, err := newElasticNotify(accountID)
		if err != nil {
			return nil, err
		}
		queueTargets[queueARN] = elasticLog
	}
	// Successfully initialized queue targets.
	return queueTargets, nil
}
// Global instance of event notification queue.
var eventN *eventNotifier

// Initialize event notifier. Must be called once the object layer is
// ready: loads all saved bucket notification configs and initializes
// all configured queue targets.
func initEventNotifier(objAPI ObjectLayer) error {
	if objAPI == nil {
		return errInvalidArgument
	}
	// Read all saved bucket notifications.
	configs, err := loadAllBucketNotifications(objAPI)
	if err != nil {
		return err
	}
	// Initializes all queue targets.
	queueTargets, err := loadAllQueueTargets()
	if err != nil {
		return err
	}
	// Initialize the global event notifier queue.
	eventN = &eventNotifier{
		rwMutex:             &sync.RWMutex{},
		notificationConfigs: configs,
		queueTargets:        queueTargets,
		lambdaTargets:       make(map[string][]chan []NotificationEvent),
	}
	return nil
}

67
event-notifier_test.go Normal file
View File

@ -0,0 +1,67 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "testing"
// Tests various forms of initialization of event notifier.
func TestInitEventNotifier(t *testing.T) {
	fs, disk, err := getSingleNodeObjectLayer()
	if err != nil {
		t.Fatal("Unable to initialize FS backend.", err)
	}
	xl, disks, err := getXLObjectLayer()
	if err != nil {
		t.Fatal("Unable to initialize XL backend.", err)
	}
	// Clean up all backing disks once the test is done.
	disks = append(disks, disk)
	for _, d := range disks {
		defer removeAll(d)
	}
	// Collection of test cases for initializing event notifier.
	testCases := []struct {
		objAPI  ObjectLayer
		configs map[string]*notificationConfig
		err     error
	}{
		// Test 1 - invalid arguments.
		{
			objAPI: nil,
			err:    errInvalidArgument,
		},
		// Test 2 - valid FS object layer but no bucket notifications.
		{
			objAPI: fs,
			err:    nil,
		},
		// Test 3 - valid XL object layer but no bucket notifications.
		{
			objAPI: xl,
			err:    nil,
		},
	}
	// Validate if event notifier is properly initialized.
	for i, testCase := range testCases {
		err = initEventNotifier(testCase.objAPI)
		if err != testCase.err {
			t.Errorf("Test %d: Expected %s, but got: %s", i+1, testCase.err, err)
		}
	}
}

View File

@ -137,12 +137,15 @@ func newFSObjects(disk string) (ObjectLayer, error) {
shutdownFS(storage) shutdownFS(storage)
}) })
// Return successfully initialized object layer. // Initialize fs objects.
return fsObjects{ fs := fsObjects{
storage: storage, storage: storage,
physicalDisk: disk, physicalDisk: disk,
listPool: newTreeWalkPool(globalLookupTimeout), listPool: newTreeWalkPool(globalLookupTimeout),
}, nil }
// Return successfully initialized object layer.
return fs, nil
} }
// StorageInfo - returns underlying storage statistics. // StorageInfo - returns underlying storage statistics.

View File

@ -28,7 +28,7 @@ const (
// minio configuration related constants. // minio configuration related constants.
const ( const (
globalMinioConfigVersion = "5" globalMinioConfigVersion = "6"
globalMinioConfigDir = ".minio" globalMinioConfigDir = ".minio"
globalMinioCertsDir = "certs" globalMinioCertsDir = "certs"
globalMinioCertFile = "public.crt" globalMinioCertFile = "public.crt"

View File

@ -22,6 +22,7 @@ import (
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"strings" "strings"
"time"
) )
// Validates location constraint in PutBucket request body. // Validates location constraint in PutBucket request body.
@ -129,3 +130,16 @@ func extractPostPolicyFormValues(reader *multipart.Reader) (filePart io.Reader,
} }
return filePart, fileName, formValues, nil return filePart, fileName, formValues, nil
} }
// Send whitespace character, once every 5secs, until CompleteMultipartUpload is done.
// CompleteMultipartUpload method of the object layer indicates that it's done via doneCh
func sendWhiteSpaceChars(w http.ResponseWriter, doneCh <-chan struct{}) {
	// Reuse one ticker instead of allocating a fresh timer via
	// time.After on every loop iteration.
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Keep the client connection alive while the operation is
			// still in progress.
			w.Write([]byte(" "))
		case <-doneCh:
			return
		}
	}
}

View File

@ -19,15 +19,10 @@ package main
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"errors"
"os"
"runtime"
"runtime/debug" "runtime/debug"
"strconv"
"strings" "strings"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/dustin/go-humanize"
) )
type fields map[string]interface{} type fields map[string]interface{}
@ -40,42 +35,13 @@ var log = logrus.New() // Default console logger.
// - console [default] // - console [default]
// - file // - file
// - syslog // - syslog
// - amqp
// - elasticsearch
//
type logger struct { type logger struct {
Console consoleLogger `json:"console"` Console consoleLogger `json:"console"`
File fileLogger `json:"file"` File fileLogger `json:"file"`
Syslog syslogLogger `json:"syslog"` Syslog syslogLogger `json:"syslog"`
AMQP amqpLogger `json:"amqp"`
ElasticSearch elasticSearchLogger `json:"elasticsearch"`
Redis redisLogger `json:"redis"`
// Add new loggers here. // Add new loggers here.
} }
var errLoggerNotEnabled = errors.New("requested logger type is not enabled")
// sysInfo returns useful system statistics.
func sysInfo() map[string]string {
host, err := os.Hostname()
if err != nil {
host = ""
}
memstats := &runtime.MemStats{}
runtime.ReadMemStats(memstats)
return map[string]string{
"host.name": host,
"host.os": runtime.GOOS,
"host.arch": runtime.GOARCH,
"host.lang": runtime.Version(),
"host.cpus": strconv.Itoa(runtime.NumCPU()),
"mem.used": humanize.Bytes(memstats.Alloc),
"mem.total": humanize.Bytes(memstats.Sys),
"mem.heap.used": humanize.Bytes(memstats.HeapAlloc),
"mem.heap.total": humanize.Bytes(memstats.HeapSys),
}
}
// stackInfo returns printable stack trace. // stackInfo returns printable stack trace.
func stackInfo() string { func stackInfo() string {
// Convert stack-trace bytes to io.Reader. // Convert stack-trace bytes to io.Reader.

View File

@ -76,12 +76,6 @@ func enableLoggers() {
// Enable all loggers here. // Enable all loggers here.
enableConsoleLogger() enableConsoleLogger()
enableFileLogger() enableFileLogger()
// Adding new bucket notification related loggers.
enableAMQPLogger()
enableElasticLogger()
enableRedisLogger()
// Add your logger here. // Add your logger here.
} }

136
notifiers.go Normal file
View File

@ -0,0 +1,136 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"errors"
"strings"
"github.com/minio/minio/pkg/wildcard"
)
// SQS type.
const (
	// Minio sqs ARN prefix.
	minioSqs = "arn:minio:sqs:"

	// Static string indicating queue type 'amqp'.
	queueTypeAMQP = "amqp"

	// Static string indicating queue type 'elasticsearch'.
	queueTypeElastic = "elasticsearch"

	// Static string indicating queue type 'redis'.
	queueTypeRedis = "redis"
)

// Lambda type.
const (
	// Minio lambda ARN prefix.
	minioLambda = "arn:minio:lambda:"

	// Static string indicating lambda type 'lambda'.
	lambdaTypeMinio = "lambda"
)

// errNotifyNotEnabled is returned by the dial helpers when the requested
// notification target exists in configuration but is not enabled.
var errNotifyNotEnabled = errors.New("requested notifier not enabled")

// notifier represents collection of supported notification queues,
// each keyed by its configured account ID.
type notifier struct {
	AMQP          map[string]amqpNotify          `json:"amqp"`
	ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
	Redis         map[string]redisNotify         `json:"redis"`
	// Add new notification queues.
}
// isAMQPQueue reports whether sqsArn identifies an AMQP queue that is
// enabled in the server configuration and currently accepting connections.
func isAMQPQueue(sqsArn arnSQS) bool {
	if sqsArn.Type != queueTypeAMQP {
		return false
	}
	// Look up the AMQP notifier configured for this account ID.
	cfg := serverConfig.GetAMQPNotifyByID(sqsArn.AccountID)
	if !cfg.Enable {
		return false
	}
	// Probe the AMQP server; an unreachable endpoint disqualifies the ARN.
	conn, err := dialAMQP(cfg)
	if err != nil {
		errorIf(err, "Unable to connect to amqp service. %#v", cfg)
		return false
	}
	conn.Close()
	return true
}
// isRedisQueue reports whether sqsArn identifies a Redis queue that is
// enabled in the server configuration and currently reachable.
func isRedisQueue(sqsArn arnSQS) bool {
	if sqsArn.Type != queueTypeRedis {
		return false
	}
	// Look up the Redis notifier configured for this account ID.
	cfg := serverConfig.GetRedisNotifyByID(sqsArn.AccountID)
	if !cfg.Enable {
		return false
	}
	// Probe the Redis server; failure to connect disqualifies the ARN.
	pool, err := dialRedis(cfg)
	if err != nil {
		errorIf(err, "Unable to connect to redis service. %#v", cfg)
		return false
	}
	pool.Close()
	return true
}
// isElasticQueue reports whether sqsArn identifies an ElasticSearch queue
// that is enabled in the server configuration and currently reachable.
func isElasticQueue(sqsArn arnSQS) bool {
	if sqsArn.Type != queueTypeElastic {
		return false
	}
	// Look up the ElasticSearch notifier configured for this account ID.
	cfg := serverConfig.GetElasticSearchNotifyByID(sqsArn.AccountID)
	if !cfg.Enable {
		return false
	}
	// Probe the ElasticSearch endpoint, stopping the client before returning.
	client, err := dialElastic(cfg)
	if err != nil {
		errorIf(err, "Unable to connect to elasticsearch service %#v", cfg)
		return false
	}
	client.Stop()
	return true
}
// eventMatch reports whether eventType matches any of the wildcard
// patterns listed in events.
func eventMatch(eventType string, events []string) bool {
	for _, pattern := range events {
		if wildcard.Match(pattern, eventType) {
			return true
		}
	}
	return false
}
// filterRuleMatch matches object against the given filter rules, returning
// true only when both the prefix and suffix rules (where present) are
// satisfied. When several rules of the same kind appear, the last one wins.
func filterRuleMatch(object string, frs []filterRule) bool {
	prefixOK, suffixOK := true, true
	for _, rule := range frs {
		switch {
		case isValidFilterNamePrefix(rule.Name):
			prefixOK = strings.HasPrefix(object, rule.Value)
		case isValidFilterNameSuffix(rule.Name):
			suffixOK = strings.HasSuffix(object, rule.Value)
		}
	}
	return prefixOK && suffixOK
}

View File

@ -99,7 +99,7 @@ func TestEventMatch(t *testing.T) {
} }
for i, testCase := range testCases { for i, testCase := range testCases {
ok := eventMatch(testCase.eventName, testCase.events) ok := eventMatch(testCase.eventName.String(), testCase.events)
if testCase.match != ok { if testCase.match != ok {
t.Errorf("Test %d: Expected \"%t\", got \"%t\"", i+1, testCase.match, ok) t.Errorf("Test %d: Expected \"%t\", got \"%t\"", i+1, testCase.match, ok)
} }

View File

@ -17,15 +17,16 @@
package main package main
import ( import (
"io/ioutil"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
// amqpLogger - represents logrus compatible AMQP hook. // amqpNotify - represents logrus compatible AMQP hook.
// All fields represent AMQP configuration details. // All fields represent AMQP configuration details.
type amqpLogger struct { type amqpNotify struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
Level string `json:"level"`
URL string `json:"url"` URL string `json:"url"`
Exchange string `json:"exchange"` Exchange string `json:"exchange"`
RoutingKey string `json:"routineKey"` RoutingKey string `json:"routineKey"`
@ -39,13 +40,16 @@ type amqpLogger struct {
} }
type amqpConn struct { type amqpConn struct {
params amqpLogger params amqpNotify
*amqp.Connection *amqp.Connection
} }
func dialAMQP(amqpL amqpLogger) (amqpConn, error) { // dialAMQP - dials and returns an amqpConnection instance,
// for sending notifications. Returns error if amqp logger
// is not enabled.
func dialAMQP(amqpL amqpNotify) (amqpConn, error) {
if !amqpL.Enable { if !amqpL.Enable {
return amqpConn{}, errLoggerNotEnabled return amqpConn{}, errNotifyNotEnabled
} }
conn, err := amqp.Dial(amqpL.URL) conn, err := amqp.Dial(amqpL.URL)
if err != nil { if err != nil {
@ -54,32 +58,31 @@ func dialAMQP(amqpL amqpLogger) (amqpConn, error) {
return amqpConn{Connection: conn, params: amqpL}, nil return amqpConn{Connection: conn, params: amqpL}, nil
} }
func enableAMQPLogger() error { func newAMQPNotify(accountID string) (*logrus.Logger, error) {
amqpL := serverConfig.GetAMQPLogger() amqpL := serverConfig.GetAMQPNotifyByID(accountID)
// Connect to amqp server. // Connect to amqp server.
amqpC, err := dialAMQP(amqpL) amqpC, err := dialAMQP(amqpL)
if err != nil { if err != nil {
return err return nil, err
} }
lvl, err := logrus.ParseLevel(amqpL.Level) amqpLog := logrus.New()
fatalIf(err, "Unknown log level found in the config file.")
// Disable writing to console.
amqpLog.Out = ioutil.Discard
// Add a amqp hook. // Add a amqp hook.
log.Hooks.Add(amqpC) amqpLog.Hooks.Add(amqpC)
// Set default JSON formatter. // Set default JSON formatter.
log.Formatter = new(logrus.JSONFormatter) amqpLog.Formatter = new(logrus.JSONFormatter)
// Set default log level to info. // Successfully enabled all AMQPs.
log.Level = lvl return amqpLog, nil
// Successfully enabled.
return nil
} }
// Fire is called when an event should be sent to the message broker. // Fire is called when an event should be sent to the message broker.
func (q amqpConn) Fire(entry *logrus.Entry) error { func (q amqpConn) Fire(entry *logrus.Entry) error {
ch, err := q.Connection.Channel() ch, err := q.Connection.Channel()
if err != nil { if err != nil {
@ -137,11 +140,6 @@ func (q amqpConn) Fire(entry *logrus.Entry) error {
// Levels is available logging levels. // Levels is available logging levels.
func (q amqpConn) Levels() []logrus.Level { func (q amqpConn) Levels() []logrus.Level {
return []logrus.Level{ return []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel, logrus.InfoLevel,
logrus.DebugLevel,
} }
} }

View File

@ -18,81 +18,80 @@ package main
import ( import (
"errors" "errors"
"io/ioutil"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"gopkg.in/olivere/elastic.v3" "gopkg.in/olivere/elastic.v3"
) )
// elasticQueue is a elasticsearch event notification queue. // elasticQueue is a elasticsearch event notification queue.
type elasticSearchLogger struct { type elasticSearchNotify struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
Level string `json:"level"`
URL string `json:"url"` URL string `json:"url"`
Index string `json:"index"` Index string `json:"index"`
} }
type elasticClient struct { type elasticClient struct {
*elastic.Client *elastic.Client
params elasticSearchLogger params elasticSearchNotify
} }
// Connects to elastic search instance at URL. // Connects to elastic search instance at URL.
func dialElastic(esLogger elasticSearchLogger) (*elastic.Client, error) { func dialElastic(esNotify elasticSearchNotify) (*elastic.Client, error) {
if !esLogger.Enable { if !esNotify.Enable {
return nil, errLoggerNotEnabled return nil, errNotifyNotEnabled
} }
client, err := elastic.NewClient(elastic.SetURL(esLogger.URL), elastic.SetSniff(false)) client, err := elastic.NewClient(elastic.SetURL(esNotify.URL), elastic.SetSniff(false))
if err != nil { if err != nil {
return nil, err return nil, err
} }
return client, nil return client, nil
} }
// Enables elasticsearch logger. func newElasticNotify(accountID string) (*logrus.Logger, error) {
func enableElasticLogger() error { esNotify := serverConfig.GetElasticSearchNotifyByID(accountID)
esLogger := serverConfig.GetElasticSearchLogger()
// Dial to elastic search. // Dial to elastic search.
client, err := dialElastic(esLogger) client, err := dialElastic(esNotify)
if err != nil { if err != nil {
return err return nil, err
} }
// Use the IndexExists service to check if a specified index exists. // Use the IndexExists service to check if a specified index exists.
exists, err := client.IndexExists(esLogger.Index).Do() exists, err := client.IndexExists(esNotify.Index).Do()
if err != nil { if err != nil {
return err return nil, err
} }
// Index does not exist, attempt to create it. // Index does not exist, attempt to create it.
if !exists { if !exists {
var createIndex *elastic.IndicesCreateResult var createIndex *elastic.IndicesCreateResult
createIndex, err = client.CreateIndex(esLogger.Index).Do() createIndex, err = client.CreateIndex(esNotify.Index).Do()
if err != nil { if err != nil {
return err return nil, err
} }
if !createIndex.Acknowledged { if !createIndex.Acknowledged {
return errors.New("index not created") return nil, errors.New("index not created")
} }
} }
elasticCl := elasticClient{ elasticCl := elasticClient{
Client: client, Client: client,
params: esLogger, params: esNotify,
} }
lvl, err := logrus.ParseLevel(esLogger.Level) elasticSearchLog := logrus.New()
fatalIf(err, "Unknown log level found in the config file.")
// Add a elasticsearch hook. // Disable writing to console.
log.Hooks.Add(elasticCl) elasticSearchLog.Out = ioutil.Discard
// Add a elasticSearch hook.
elasticSearchLog.Hooks.Add(elasticCl)
// Set default JSON formatter. // Set default JSON formatter.
log.Formatter = new(logrus.JSONFormatter) elasticSearchLog.Formatter = new(logrus.JSONFormatter)
// Set default log level to info. // Success, elastic search successfully initialized.
log.Level = lvl return elasticSearchLog, nil
return nil
} }
// Fire is required to implement logrus hook // Fire is required to implement logrus hook
@ -108,11 +107,6 @@ func (q elasticClient) Fire(entry *logrus.Entry) error {
// Required for logrus hook implementation // Required for logrus hook implementation
func (q elasticClient) Levels() []logrus.Level { func (q elasticClient) Levels() []logrus.Level {
return []logrus.Level{ return []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel, logrus.InfoLevel,
logrus.DebugLevel,
} }
} }

View File

@ -17,16 +17,16 @@
package main package main
import ( import (
"io/ioutil"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/minio/redigo/redis" "github.com/minio/redigo/redis"
) )
// redisLogger to send logs to Redis server // redisNotify to send logs to Redis server
type redisLogger struct { type redisNotify struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
Level string `json:"level"`
Addr string `json:"address"` Addr string `json:"address"`
Password string `json:"password"` Password string `json:"password"`
Key string `json:"key"` Key string `json:"key"`
@ -34,17 +34,17 @@ type redisLogger struct {
type redisConn struct { type redisConn struct {
*redis.Pool *redis.Pool
params redisLogger params redisNotify
} }
// Dial a new connection to redis instance at addr, optionally with a password if any. // Dial a new connection to redis instance at addr, optionally with a password if any.
func dialRedis(rLogger redisLogger) (*redis.Pool, error) { func dialRedis(rNotify redisNotify) (*redis.Pool, error) {
// Return error if redis not enabled. // Return error if redis not enabled.
if !rLogger.Enable { if !rNotify.Enable {
return nil, errLoggerNotEnabled return nil, errNotifyNotEnabled
} }
addr := rLogger.Addr addr := rNotify.Addr
password := rLogger.Password password := rNotify.Password
rPool := &redis.Pool{ rPool := &redis.Pool{
MaxIdle: 3, MaxIdle: 3,
IdleTimeout: 240 * time.Second, IdleTimeout: 240 * time.Second,
@ -81,35 +81,34 @@ func dialRedis(rLogger redisLogger) (*redis.Pool, error) {
return rPool, nil return rPool, nil
} }
func enableRedisLogger() error { func newRedisNotify(accountID string) (*logrus.Logger, error) {
rLogger := serverConfig.GetRedisLogger() rNotify := serverConfig.GetRedisNotifyByID(accountID)
// Dial redis. // Dial redis.
rPool, err := dialRedis(rLogger) rPool, err := dialRedis(rNotify)
if err != nil { if err != nil {
return err return nil, err
} }
rrConn := redisConn{ rrConn := redisConn{
Pool: rPool, Pool: rPool,
params: rLogger, params: rNotify,
} }
lvl, err := logrus.ParseLevel(rLogger.Level) redisLog := logrus.New()
fatalIf(err, "Unknown log level found in the config file.")
// Add a elasticsearch hook. redisLog.Out = ioutil.Discard
log.Hooks.Add(rrConn)
// Set default JSON formatter. // Set default JSON formatter.
log.Formatter = new(logrus.JSONFormatter) redisLog.Formatter = new(logrus.JSONFormatter)
// Set default log level to info. redisLog.Hooks.Add(rrConn)
log.Level = lvl
return nil // Success, redis enabled.
return redisLog, nil
} }
// Fire is called when an event should be sent to the message broker.
func (r redisConn) Fire(entry *logrus.Entry) error { func (r redisConn) Fire(entry *logrus.Entry) error {
rConn := r.Pool.Get() rConn := r.Pool.Get()
defer rConn.Close() defer rConn.Close()
@ -129,11 +128,6 @@ func (r redisConn) Fire(entry *logrus.Entry) error {
// Required for logrus hook implementation // Required for logrus hook implementation
func (r redisConn) Levels() []logrus.Level { func (r redisConn) Levels() []logrus.Level {
return []logrus.Level{ return []logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
logrus.WarnLevel,
logrus.InfoLevel, logrus.InfoLevel,
logrus.DebugLevel,
} }
} }

View File

@ -27,7 +27,6 @@ import (
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"time"
mux "github.com/gorilla/mux" mux "github.com/gorilla/mux"
) )
@ -356,20 +355,17 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
// Explicitly close the reader, to avoid fd leaks. // Explicitly close the reader, to avoid fd leaks.
pipeReader.Close() pipeReader.Close()
// Load notification config if any. if eventN.IsBucketNotificationSet(bucket) {
nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors, return.
if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return
}
// Notify object created event. // Notify object created event.
notifyObjectCreatedEvent(nConfig, ObjectCreatedCopy, bucket, objInfo) eventNotify(eventData{
Type: ObjectCreatedCopy,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
}
} }
// PutObjectHandler - PUT Object // PutObjectHandler - PUT Object
@ -440,18 +436,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
} }
writeSuccessResponse(w, nil) writeSuccessResponse(w, nil)
// Load notification config if any.
nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors return.
if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return
}
// Fetch object info for notifications. // Fetch object info for notifications.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
if err != nil { if err != nil {
@ -459,8 +443,17 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return return
} }
if eventN.IsBucketNotificationSet(bucket) {
// Notify object created event. // Notify object created event.
notifyObjectCreatedEvent(nConfig, ObjectCreatedPut, bucket, objInfo) eventNotify(eventData{
Type: ObjectCreatedPut,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
}
} }
/// Multipart objectAPIHandlers /// Multipart objectAPIHandlers
@ -614,19 +607,6 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter,
writeSuccessNoContent(w) writeSuccessNoContent(w)
} }
// sendWhiteSpaceChars writes a whitespace character to w once every 5secs,
// until CompleteMultipartUpload is done, keeping the client connection alive.
// CompleteMultipartUpload method of the object layer indicates that it's done via doneCh.
func sendWhiteSpaceChars(w http.ResponseWriter, doneCh <-chan struct{}) {
	for {
		select {
		case <-time.After(5 * time.Second):
			// Best-effort keep-alive byte; the write error is intentionally ignored.
			w.Write([]byte(" "))
		case <-doneCh:
			return
		}
	}
}
// ListObjectPartsHandler - List object parts // ListObjectPartsHandler - List object parts
func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
@ -777,18 +757,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
w.Write(encodedSuccessResponse) w.Write(encodedSuccessResponse)
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
// Load notification config if any.
nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors.
if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return
}
// Fetch object info for notifications. // Fetch object info for notifications.
objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object)
if err != nil { if err != nil {
@ -796,8 +764,17 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
return return
} }
if eventN.IsBucketNotificationSet(bucket) {
// Notify object created event. // Notify object created event.
notifyObjectCreatedEvent(nConfig, ObjectCreatedCompleteMultipartUpload, bucket, objInfo) eventNotify(eventData{
Type: ObjectCreatedCompleteMultipartUpload,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
}
} }
/// Delete objectAPIHandlers /// Delete objectAPIHandlers
@ -834,18 +811,17 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
} }
writeSuccessNoContent(w) writeSuccessNoContent(w)
// Load notification config if any. if eventN.IsBucketNotificationSet(bucket) {
nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors, return.
if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return
}
// Notify object deleted event. // Notify object deleted event.
notifyObjectDeletedEvent(nConfig, bucket, object) eventNotify(eventData{
Type: ObjectRemovedDelete,
Bucket: bucket,
ObjInfo: ObjectInfo{
Name: object,
},
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
}
} }

View File

@ -6,7 +6,7 @@
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software

16
pkg/sys/stats_test.go Normal file
View File

@ -0,0 +1,16 @@
// +build linux darwin windows
package sys
import "testing"
// Test get stats result. Verifies that GetStats succeeds and reports a
// non-zero total RAM on the platforms selected by the build tag.
func TestGetStats(t *testing.T) {
	stats, err := GetStats()
	if err != nil {
		t.Errorf("Tests: Expected `nil`, Got %s", err)
	}
	// Total RAM of zero would indicate the platform probe returned garbage.
	if stats.TotalRAM == 0 {
		t.Errorf("Tests: Expected `n > 0`, Got %d", stats.TotalRAM)
	}
}

204
queues.go
View File

@ -1,204 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"fmt"
"net/url"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/minio/minio/pkg/wildcard"
)
const (
	// Minio sqs ARN prefix for queue targets.
	minioSqs = "arn:minio:sqs:"

	// Static string indicating queue type 'amqp'.
	queueTypeAMQP = "1:amqp"

	// Static string indicating queue type 'elasticsearch'.
	queueTypeElastic = "1:elasticsearch"

	// Static string indicating queue type 'redis'.
	queueTypeRedis = "1:redis"
)
// Returns true if queueArn is for an AMQP queue.
// Checks the ARN type and dials the configured AMQP server; an
// unreachable server disqualifies the ARN.
func isAMQPQueue(sqsArn arnMinioSqs) bool {
	if sqsArn.sqsType != queueTypeAMQP {
		return false
	}
	amqpL := serverConfig.GetAMQPLogger()
	// Connect to amqp server to validate.
	amqpC, err := dialAMQP(amqpL)
	if err != nil {
		errorIf(err, "Unable to connect to amqp service. %#v", amqpL)
		return false
	}
	defer amqpC.Close()
	return true
}
// Returns true if queueArn is for an Redis queue.
// Checks the ARN type and dials the configured Redis server; an
// unreachable server disqualifies the ARN.
func isRedisQueue(sqsArn arnMinioSqs) bool {
	if sqsArn.sqsType != queueTypeRedis {
		return false
	}
	rLogger := serverConfig.GetRedisLogger()
	// Connect to redis server to validate.
	rPool, err := dialRedis(rLogger)
	if err != nil {
		errorIf(err, "Unable to connect to redis service. %#v", rLogger)
		return false
	}
	defer rPool.Close()
	return true
}
// Returns true if queueArn is for an ElasticSearch queue.
// Checks the ARN type and dials the configured ElasticSearch endpoint;
// an unreachable endpoint disqualifies the ARN.
func isElasticQueue(sqsArn arnMinioSqs) bool {
	if sqsArn.sqsType != queueTypeElastic {
		return false
	}
	esLogger := serverConfig.GetElasticSearchLogger()
	// Dial to validate reachability; stop the client before returning.
	elasticC, err := dialElastic(esLogger)
	if err != nil {
		errorIf(err, "Unable to connect to elasticsearch service %#v", esLogger)
		return false
	}
	defer elasticC.Stop()
	return true
}
// Match function matches wild cards in 'pattern' for events.
// Returns true as soon as any configured event pattern matches eventType.
func eventMatch(eventType EventName, events []string) (ok bool) {
	for _, event := range events {
		ok = wildcard.Match(event, eventType.String())
		if ok {
			break
		}
	}
	return ok
}
// Filter rule match, matches an object against the filter rules.
// Both the prefix and suffix rules (where present) must be satisfied;
// when several rules of the same kind appear, the last one wins.
func filterRuleMatch(object string, frs []filterRule) bool {
	var prefixMatch, suffixMatch = true, true
	for _, fr := range frs {
		if isValidFilterNamePrefix(fr.Name) {
			prefixMatch = strings.HasPrefix(object, fr.Value)
		} else if isValidFilterNameSuffix(fr.Name) {
			suffixMatch = strings.HasSuffix(object, fr.Value)
		}
	}
	return prefixMatch && suffixMatch
}
// NotifyObjectCreatedEvent - notifies a new 's3:ObjectCreated' event.
// List of events reported through this function are
//   - s3:ObjectCreated:Put
//   - s3:ObjectCreated:Post
//   - s3:ObjectCreated:Copy
//   - s3:ObjectCreated:CompleteMultipartUpload
//
// The event is logged (via configured log hooks) for every queue
// configuration whose event and filter rules match.
func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, bucket string, objInfo ObjectInfo) {
	/// Construct a new object created event.
	region := serverConfig.GetRegion()
	tnow := time.Now().UTC()
	// Sequencer is the event time in nanoseconds, hex-encoded, giving a
	// monotonically increasing ordering key for events.
	sequencer := fmt.Sprintf("%X", tnow.UnixNano())
	// Following blocks fills in all the necessary details of s3 event message structure.
	// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
	events := []*NotificationEvent{
		{
			EventVersion:      "2.0",
			EventSource:       "aws:s3",
			AwsRegion:         region,
			EventTime:         tnow.Format(iso8601Format),
			EventName:         eventType.String(),
			UserIdentity:      defaultIdentity(),
			RequestParameters: map[string]string{},
			ResponseElements:  map[string]string{},
			S3: eventMeta{
				SchemaVersion:   "1.0",
				ConfigurationID: "Config",
				Bucket: bucketMeta{
					Name:          bucket,
					OwnerIdentity: defaultIdentity(),
					ARN:           "arn:aws:s3:::" + bucket,
				},
				Object: objectMeta{
					// Key is URL-escaped per the S3 event message convention.
					Key:       url.QueryEscape(objInfo.Name),
					ETag:      objInfo.MD5Sum,
					Size:      objInfo.Size,
					Sequencer: sequencer,
				},
			},
		},
	}
	// Notify to all the configured queues.
	for _, qConfig := range nConfig.QueueConfigurations {
		ruleMatch := filterRuleMatch(objInfo.Name, qConfig.Filter.Key.FilterRules)
		if eventMatch(eventType, qConfig.Events) && ruleMatch {
			log.WithFields(logrus.Fields{
				"Records": events,
			}).Info()
		}
	}
}
// NotifyObjectRemovedEvent - notifies a new 's3:ObjectRemoved' event.
// List of events reported through this function are
//   - s3:ObjectRemoved:Delete
//
// The event is logged (via configured log hooks) for every queue
// configuration whose event and filter rules match.
func notifyObjectDeletedEvent(nConfig notificationConfig, bucket string, object string) {
	region := serverConfig.GetRegion()
	tnow := time.Now().UTC()
	// Sequencer is the event time in nanoseconds, hex-encoded, giving a
	// monotonically increasing ordering key for events.
	sequencer := fmt.Sprintf("%X", tnow.UnixNano())
	// Following blocks fills in all the necessary details of s3 event message structure.
	// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
	events := []*NotificationEvent{
		{
			EventVersion:      "2.0",
			EventSource:       "aws:s3",
			AwsRegion:         region,
			EventTime:         tnow.Format(iso8601Format),
			EventName:         ObjectRemovedDelete.String(),
			UserIdentity:      defaultIdentity(),
			RequestParameters: map[string]string{},
			ResponseElements:  map[string]string{},
			S3: eventMeta{
				SchemaVersion:   "1.0",
				ConfigurationID: "Config",
				Bucket: bucketMeta{
					Name:          bucket,
					OwnerIdentity: defaultIdentity(),
					ARN:           "arn:aws:s3:::" + bucket,
				},
				Object: objectMeta{
					// Key is URL-escaped per the S3 event message convention.
					Key:       url.QueryEscape(object),
					Sequencer: sequencer,
				},
			},
		},
	}
	// Notify to all the configured queues.
	for _, qConfig := range nConfig.QueueConfigurations {
		ruleMatch := filterRuleMatch(object, qConfig.Filter.Key.FilterRules)
		if eventMatch(ObjectRemovedDelete, qConfig.Events) && ruleMatch {
			log.WithFields(logrus.Fields{
				"Records": events,
			}).Info()
		}
	}
}

View File

@ -66,6 +66,10 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
ObjectAPI: objAPI, ObjectAPI: objAPI,
} }
// Initialize a new event notifier.
err = initEventNotifier(objAPI)
fatalIf(err, "Unable to initialize event notification queue")
// Initialize router. // Initialize router.
mux := router.NewRouter() mux := router.NewRouter()

View File

@ -74,6 +74,89 @@ func (s *TestSuiteCommon) TestAuth(c *C) {
c.Assert(len(accessID), Equals, minioAccessID) c.Assert(len(accessID), Equals, minioAccessID)
} }
// TestBucketNotification - Inserts the bucket notification and
// verifies it by fetching the notification back.
func (s *TestSuiteCommon) TestBucketNotification(c *C) {
// Sample bucket notification
bucketNotificationBuf := `<NotificationConfiguration><CloudFunctionConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><CloudFunction>arn:minio:lambda:us-east-1:444455556666:lambda</CloudFunction></CloudFunctionConfiguration></NotificationConfiguration>`
// generate a random bucket Name.
bucketName := getRandomBucketName()
// HTTP request to create the bucket.
request, err := newTestSignedRequest("PUT", getMakeBucketURL(s.endPoint, bucketName),
0, nil, s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client := http.Client{}
// execute the request.
response, err := client.Do(request)
c.Assert(err, IsNil)
// assert the http response status code.
c.Assert(response.StatusCode, Equals, http.StatusOK)
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client = http.Client{}
// execute the HTTP request to create bucket.
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
// Fetch the uploaded policy.
request, err = newTestSignedRequest("GET", getGetNotificationURL(s.endPoint, bucketName), 0, nil,
s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client = http.Client{}
response, err = client.Do(request)
c.Assert(err, IsNil)
c.Assert(response.StatusCode, Equals, http.StatusOK)
bucketNotificationReadBuf, err := ioutil.ReadAll(response.Body)
c.Assert(err, IsNil)
// Verify if downloaded policy matches with previously uploaded.
c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true)
invalidBucketNotificationBuf := `<NotificationConfiguration><CloudFunctionConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><CloudFunction>arn:minio:lambda:us-east-1:444455556666:minio</CloudFunction></CloudFunctionConfiguration></NotificationConfiguration>`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client = http.Client{}
// execute the HTTP request to create bucket.
response, err = client.Do(request)
c.Assert(err, IsNil)
verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
invalidBucketNotificationBuf = `<NotificationConfiguration><CloudFunctionConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><CloudFunction>arn:minio:lambda:us-west-1:444455556666:lambda</CloudFunction></CloudFunctionConfiguration></NotificationConfiguration>`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client = http.Client{}
// execute the HTTP request to create bucket.
response, err = client.Do(request)
c.Assert(err, IsNil)
verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest)
invalidBucketNotificationBuf = `<NotificationConfiguration><CloudFunctionConfiguration><Event>s3:ObjectCreated:Invalid</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><CloudFunction>arn:minio:lambda:us-east-1:444455556666:lambda</CloudFunction></CloudFunctionConfiguration></NotificationConfiguration>`
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey)
c.Assert(err, IsNil)
client = http.Client{}
// execute the HTTP request to create bucket.
response, err = client.Do(request)
c.Assert(err, IsNil)
verifyError(c, response, "InvalidArgument", "A specified event is not supported for notifications.", http.StatusBadRequest)
}
// TestBucketPolicy - Inserts the bucket policy and verifies it by fetching the policy back. // TestBucketPolicy - Inserts the bucket policy and verifies it by fetching the policy back.
// Deletes the policy and verifies the deletion by fetching it back. // Deletes the policy and verifies the deletion by fetching it back.
func (s *TestSuiteCommon) TestBucketPolicy(c *C) { func (s *TestSuiteCommon) TestBucketPolicy(c *C) {

View File

@ -559,6 +559,20 @@ func getHeadObjectURL(endPoint, bucketName, objectName string) string {
return makeTestTargetURL(endPoint, bucketName, objectName, url.Values{}) return makeTestTargetURL(endPoint, bucketName, objectName, url.Values{})
} }
// getPutNotificationURL returns the endpoint URL for setting the bucket
// notification configuration, i.e. a PUT on "/<bucket>?notification".
func getPutNotificationURL(endPoint, bucketName string) string {
	params := url.Values{}
	params.Set("notification", "")
	return makeTestTargetURL(endPoint, bucketName, "", params)
}
// getGetNotificationURL returns the endpoint URL for fetching the bucket
// notification configuration, i.e. a GET on "/<bucket>?notification".
func getGetNotificationURL(endPoint, bucketName string) string {
	params := url.Values{}
	params.Set("notification", "")
	return makeTestTargetURL(endPoint, bucketName, "", params)
}
// return URL for inserting bucket policy. // return URL for inserting bucket policy.
func getPutPolicyURL(endPoint, bucketName string) string { func getPutPolicyURL(endPoint, bucketName string) string {
queryValue := url.Values{} queryValue := url.Values{}

View File

@ -386,18 +386,6 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
return return
} }
// Load notification config if any.
nConfig, err := loadNotificationConfig(web.ObjectAPI, bucket)
// Notifications not set, return.
if err == errNoSuchNotifications {
return
}
// For all other errors, return.
if err != nil {
errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket)
return
}
// Fetch object info for notifications. // Fetch object info for notifications.
objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object) objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object)
if err != nil { if err != nil {
@ -405,8 +393,17 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
return return
} }
if eventN.IsBucketNotificationSet(bucket) {
// Notify object created event. // Notify object created event.
notifyObjectCreatedEvent(nConfig, ObjectCreatedPut, bucket, objInfo) eventNotify(eventData{
Type: ObjectCreatedPut,
Bucket: bucket,
ObjInfo: objInfo,
ReqParams: map[string]string{
"sourceIPAddress": r.RemoteAddr,
},
})
}
} }
// Download - file download handler. // Download - file download handler.

View File

@ -320,7 +320,7 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]st
// PutObjectPart - reads incoming stream and internally erasure codes // PutObjectPart - reads incoming stream and internally erasure codes
// them. This call is similar to single put operation but it is part // them. This call is similar to single put operation but it is part
// of the multipart transcation. // of the multipart transaction.
// //
// Implements S3 compatible Upload Part API. // Implements S3 compatible Upload Part API.
func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) { func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {