From 064c51162d960e7eb2605f5c61146c0288016152 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 4 Aug 2016 22:01:58 -0700 Subject: [PATCH] api: Add new ListenBucketNotificationHandler. (#2336) This API is precursor before implementing `minio lambda` and `mc` continous replication. This new api is an extention to BucketNofication APIs. // Request ``` GET /bucket?notificationARN=arn:minio:lambda:us-east-1:10:minio HTTP/1.1 ... ... ``` // Response ``` {"Records": ...} ... ... ... {"Records": ...} ``` --- .gitignore | 2 +- api-router.go | 2 + appveyor.yml | 1 + bucket-handlers.go | 29 +- bucket-notification-datatypes.go | 46 ++- bucket-notification-handlers.go | 239 +++++++---- bucket-notification-handlers_test.go | 90 +++++ bucket-notification-utils.go | 156 +++++-- bucket-notification-utils_test.go | 161 ++++++-- bucket-policy.go | 9 - config-migrate.go | 82 +++- config-old.go | 79 ++++ config-v5.go => config-v6.go | 85 ++-- event-notifier.go | 381 ++++++++++++++++++ event-notifier_test.go | 67 +++ fs-v1.go | 9 +- globals.go | 2 +- handler-utils.go | 14 + logger.go | 40 +- main.go | 6 - notifiers.go | 136 +++++++ queues_test.go => notifiers_test.go | 2 +- logger-amqp.go => notify-amqp.go | 46 +-- ...lasticsearch.go => notify-elasticsearch.go | 58 ++- logger-redis.go => notify-redis.go | 48 +-- object-handlers.go | 112 ++--- pkg/sys/stats_linux.go | 2 +- pkg/sys/stats_test.go | 16 + queues.go | 204 ---------- routers.go | 4 + server_test.go | 83 ++++ test-utils_test.go | 14 + notifier.go => update-notifier.go | 0 web-handlers.go | 25 +- xl-v1-multipart.go | 2 +- 35 files changed, 1600 insertions(+), 652 deletions(-) create mode 100644 bucket-notification-handlers_test.go rename config-v5.go => config-v6.go (64%) create mode 100644 event-notifier.go create mode 100644 event-notifier_test.go create mode 100644 notifiers.go rename queues_test.go => notifiers_test.go (97%) rename logger-amqp.go => notify-amqp.go (79%) rename logger-elasticsearch.go => notify-elasticsearch.go (63%) rename logger-redis.go => notify-redis.go (73%) create mode 100644 pkg/sys/stats_test.go delete mode 100644 queues.go rename notifier.go => update-notifier.go (100%) diff --git a/.gitignore b/.gitignore index 3b25db0a3..98d92f1b6 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,4 @@ vendor/**/*.json release .DS_Store *.syso -coverage.out \ No newline at end of file +coverage.txt diff --git a/api-router.go b/api-router.go index e69095cb6..f868f9241 100644 --- a/api-router.go +++ b/api-router.go @@ -62,6 +62,8 @@ func registerAPIRouter(mux *router.Router, api objectAPIHandlers) { bucket.Methods("GET").HandlerFunc(api.GetBucketPolicyHandler).Queries("policy", "") // GetBucketNotification bucket.Methods("GET").HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "") + // ListenBucketNotification + bucket.Methods("GET").HandlerFunc(api.ListenBucketNotificationHandler).Queries("notificationARN", "{notificationARN:.*}") // ListMultipartUploads bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "") // ListObjectsV2 diff --git a/appveyor.yml b/appveyor.yml index eb59b0f11..79c50a825 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -24,6 +24,7 @@ install: build_script: - go test -race . - go test -race github.com/minio/minio/pkg... + - go test -coverprofile=coverage.txt -covermode=atomic - go run buildscripts/gen-ldflags.go > temp.txt - set /p BUILD_LDFLAGS== 0 { - _, err = io.CopyN(&buffer, r.Body, r.ContentLength) + bufferSize, err = io.CopyN(&buffer, r.Body, r.ContentLength) } else { - _, err = io.Copy(&buffer, r.Body) + bufferSize, err = io.Copy(&buffer, r.Body) } if err != nil { errorIf(err, "Unable to read incoming body.") @@ -196,16 +133,142 @@ func (api objectAPIHandlers) PutBucketNotificationHandler(w http.ResponseWriter, } // Proceed to save notification configuration. - size := int64(len(notificationConfigBytes)) - data := bytes.NewReader(notificationConfigBytes) notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) - _, err = api.ObjectAPI.PutObject(minioMetaBucket, notificationConfigPath, size, data, nil) + _, err = api.ObjectAPI.PutObject(minioMetaBucket, notificationConfigPath, bufferSize, bytes.NewReader(buffer.Bytes()), nil) if err != nil { - errorIf(err, "Unable to write bucket notification configuration.", notificationConfigPath) + errorIf(err, "Unable to write bucket notification configuration.") writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) return } + // Set bucket notification config. + eventN.SetBucketNotificationConfig(bucket, ¬ificationCfg) + // Success. writeSuccessResponse(w, nil) } + +// writeNotification marshals notification message before writing to client. +func writeNotification(w http.ResponseWriter, notification map[string][]NotificationEvent) error { + // Invalid response writer. + if w == nil { + return errInvalidArgument + } + // Invalid notification input. + if notification == nil { + return errInvalidArgument + } + // Marshal notification data into XML and write to client. + notificationBytes, err := json.Marshal(¬ification) + if err != nil { + return err + } + // Add additional CRLF characters for client to + // differentiate the individual events properly. + _, err = w.Write(append(notificationBytes, crlf...)) + // Make sure we have flushed, this would set Transfer-Encoding: chunked. + w.(http.Flusher).Flush() + if err != nil { + return err + } + return nil +} + +// CRLF character used for chunked transfer in accordance with HTTP standards. +var crlf = []byte("\r\n") + +// sendBucketNotification - writes notification back to client on the response writer +// for each notification input, otherwise writes whitespace characters periodically +// to keep the connection active. Each notification messages are terminated by CRLF +// character. Upon any error received on response writer the for loop exits. +// +// TODO - do not log for all errors. +func sendBucketNotification(w http.ResponseWriter, arnListenerCh <-chan []NotificationEvent) { + var dummyEvents = map[string][]NotificationEvent{"Records": nil} + // Continuously write to client either timely empty structures + // every 5 seconds, or return back the notifications. + for { + select { + case events := <-arnListenerCh: + if err := writeNotification(w, map[string][]NotificationEvent{"Records": events}); err != nil { + errorIf(err, "Unable to write notification to client.") + return + } + case <-time.After(5 * time.Second): + if err := writeNotification(w, dummyEvents); err != nil { + errorIf(err, "Unable to write notification to client.") + return + } + } + } +} + +// Returns true if the queueARN is for an Minio queue. +func isMinL(lambdaARN arnLambda) bool { + return strings.HasSuffix(lambdaARN.Type, lambdaTypeMinio) +} + +// isMinioARNConfigured - verifies if one lambda ARN is valid and is enabled. +func isMinioARNConfigured(lambdaARN string, lambdaConfigs []lambdaConfig) bool { + for _, lambdaConfig := range lambdaConfigs { + // Validate if lambda ARN is already enabled. + if lambdaARN == lambdaConfig.LambdaARN { + return true + } + } + return false +} + +// ListenBucketNotificationHandler - list bucket notifications. +func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) { + // Validate request authorization. + if s3Error := checkAuth(r); s3Error != ErrNone { + writeErrorResponse(w, r, s3Error, r.URL.Path) + return + } + vars := mux.Vars(r) + bucket := vars["bucket"] + + // Get notification ARN. + lambdaARN := r.URL.Query().Get("notificationARN") + if lambdaARN == "" { + writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) + return + } + + // Validate if bucket exists. + _, err := api.ObjectAPI.GetBucketInfo(bucket) + if err != nil { + errorIf(err, "Unable to bucket info.") + writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path) + return + } + + notificationCfg := eventN.GetBucketNotificationConfig(bucket) + if notificationCfg == nil { + writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) + return + } + + // Notifications set, but do not have MINIO queue enabled, return. + if !isMinioARNConfigured(lambdaARN, notificationCfg.LambdaConfigs) { + writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) + return + } + + // Add all common headers. + setCommonHeaders(w) + + // Create a new notification event channel. + nEventCh := make(chan []NotificationEvent) + // Close the listener channel. + defer close(nEventCh) + + // Set lambda target. + eventN.SetLambdaTarget(lambdaARN, nEventCh) + // Remove lambda listener after the writer has closed or the client disconnected. + defer eventN.RemoveLambdaTarget(lambdaARN, nEventCh) + + // Start sending bucket notifications. + sendBucketNotification(w, nEventCh) +} diff --git a/bucket-notification-handlers_test.go b/bucket-notification-handlers_test.go new file mode 100644 index 000000000..90fe92901 --- /dev/null +++ b/bucket-notification-handlers_test.go @@ -0,0 +1,90 @@ +package main + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "testing" +) + +// Implement a dummy flush writer. +type flushWriter struct { + io.Writer +} + +// Flush writer is a dummy writer compatible with http.Flusher and http.ResponseWriter. +func (f *flushWriter) Flush() {} +func (f *flushWriter) Write(b []byte) (n int, err error) { return f.Writer.Write(b) } +func (f *flushWriter) Header() http.Header { return http.Header{} } +func (f *flushWriter) WriteHeader(code int) {} + +func newFlushWriter(writer io.Writer) *flushWriter { + return &flushWriter{writer} +} + +// Tests write notification code. +func TestWriteNotification(t *testing.T) { + // Initialize a new test config. + root, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Unable to initialize test config %s", err) + } + defer removeAll(root) + + var buffer bytes.Buffer + // Collection of test cases for each event writer. + testCases := []struct { + writer *flushWriter + event map[string][]NotificationEvent + err error + }{ + // Invalid input argument with writer `nil` - Test - 1 + { + writer: nil, + event: nil, + err: errInvalidArgument, + }, + // Invalid input argument with event `nil` - Test - 2 + { + writer: newFlushWriter(ioutil.Discard), + event: nil, + err: errInvalidArgument, + }, + // Unmarshal and write, validate last 5 bytes. - Test - 3 + { + writer: newFlushWriter(&buffer), + event: map[string][]NotificationEvent{ + "Records": {newNotificationEvent(eventData{ + Type: ObjectCreatedPut, + Bucket: "testbucket", + ObjInfo: ObjectInfo{ + Name: "key", + }, + ReqParams: map[string]string{ + "ip": "10.1.10.1", + }}), + }, + }, + err: nil, + }, + } + // Validates all the testcases for writing notification. + for _, testCase := range testCases { + err := writeNotification(testCase.writer, testCase.event) + if err != testCase.err { + t.Errorf("Unable to write notification %s", err) + } + // Validates if the ending string has 'crlf' + if err == nil && !bytes.HasSuffix(buffer.Bytes(), crlf) { + buf := buffer.Bytes()[buffer.Len()-5 : 0] + t.Errorf("Invalid suffix found from the writer last 5 bytes %s, expected `\r\n`", string(buf)) + } + // Not printing 'buf' on purpose, validates look for string '10.1.10.1'. + if err == nil && !bytes.Contains(buffer.Bytes(), []byte("10.1.10.1")) { + // Enable when debugging) + // fmt.Println(string(buffer.Bytes())) + t.Errorf("Requested content couldn't be found, expected `10.1.10.1`") + } + } +} diff --git a/bucket-notification-utils.go b/bucket-notification-utils.go index 5ae1738d6..24356ad3e 100644 --- a/bucket-notification-utils.go +++ b/bucket-notification-utils.go @@ -100,36 +100,76 @@ func checkFilterRules(filterRules []filterRule) APIErrorCode { return ErrNone } -// checkQueueArn - check if the queue arn is valid. -func checkQueueArn(queueArn string) APIErrorCode { - if !strings.HasPrefix(queueArn, minioSqs) { +// checkQueueARN - check if the queue arn is valid. +func checkQueueARN(queueARN string) APIErrorCode { + if !strings.HasPrefix(queueARN, minioSqs) { return ErrARNNotification } - if !strings.HasPrefix(queueArn, minioSqs+serverConfig.GetRegion()+":") { + if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") { + return ErrRegionNotification + } + return ErrNone +} + +// checkLambdaARN - check if the lambda arn is valid. +func checkLambdaARN(lambdaARN string) APIErrorCode { + if !strings.HasPrefix(lambdaARN, minioLambda) { + return ErrARNNotification + } + if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") { return ErrRegionNotification } return ErrNone } // Validate if we recognize the queue type. -func isValidQueue(sqsArn arnMinioSqs) bool { - amqpQ := isAMQPQueue(sqsArn) // Is amqp queue?. - elasticQ := isElasticQueue(sqsArn) // Is elastic queue?. - redisQ := isRedisQueue(sqsArn) // Is redis queue?. +func isValidQueue(sqsARN arnSQS) bool { + amqpQ := isAMQPQueue(sqsARN) // Is amqp queue?. + elasticQ := isElasticQueue(sqsARN) // Is elastic queue?. + redisQ := isRedisQueue(sqsARN) // Is redis queue?. return amqpQ || elasticQ || redisQ } +// Validate if we recognize the lambda type. +func isValidLambda(lambdaARN arnLambda) bool { + return isMinL(lambdaARN) // Is minio lambda?. +} + +// Validates account id for input queue ARN. +func isValidQueueID(queueARN string) bool { + // Unmarshals QueueARN into structured object. + sqsARN := unmarshalSqsARN(queueARN) + // AMQP queue. + if isAMQPQueue(sqsARN) { + amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID) + return amqpN.Enable && amqpN.URL != "" + } else if isElasticQueue(sqsARN) { // Elastic queue. + elasticN := serverConfig.GetElasticSearchNotifyByID(sqsARN.AccountID) + return elasticN.Enable && elasticN.URL != "" + + } else if isRedisQueue(sqsARN) { // Redis queue. + redisN := serverConfig.GetRedisNotifyByID(sqsARN.AccountID) + return redisN.Enable && redisN.Addr != "" + } + return false +} + // Check - validates queue configuration and returns error if any. func checkQueueConfig(qConfig queueConfig) APIErrorCode { // Check queue arn is valid. - if s3Error := checkQueueArn(qConfig.QueueArn); s3Error != ErrNone { + if s3Error := checkQueueARN(qConfig.QueueARN); s3Error != ErrNone { return s3Error } - // Unmarshals QueueArn into structured object. - sqsArn := unmarshalSqsArn(qConfig.QueueArn) - // Validate if sqsArn requested any of the known supported queues. - if !isValidQueue(sqsArn) { + // Unmarshals QueueARN into structured object. + sqsARN := unmarshalSqsARN(qConfig.QueueARN) + // Validate if sqsARN requested any of the known supported queues. + if !isValidQueue(sqsARN) { + return ErrARNNotification + } + + // Validate if the account ID is correct. + if !isValidQueueID(qConfig.QueueARN) { return ErrARNNotification } @@ -147,6 +187,34 @@ func checkQueueConfig(qConfig queueConfig) APIErrorCode { return ErrNone } +// Check - validates queue configuration and returns error if any. +func checkLambdaConfig(lConfig lambdaConfig) APIErrorCode { + // Check queue arn is valid. + if s3Error := checkLambdaARN(lConfig.LambdaARN); s3Error != ErrNone { + return s3Error + } + + // Unmarshals QueueARN into structured object. + lambdaARN := unmarshalLambdaARN(lConfig.LambdaARN) + // Validate if lambdaARN requested any of the known supported queues. + if !isValidLambda(lambdaARN) { + return ErrARNNotification + } + + // Check if valid events are set in queue config. + if s3Error := checkEvents(lConfig.Events); s3Error != ErrNone { + return s3Error + } + + // Check if valid filters are set in queue config. + if s3Error := checkFilterRules(lConfig.Filter.Key.FilterRules); s3Error != ErrNone { + return s3Error + } + + // Success. + return ErrNone +} + // Validates all incoming queue configs, checkQueueConfig validates if the // input fields for each queues is not malformed and has valid configuration // information. If validation fails bucket notifications are not enabled. @@ -160,32 +228,70 @@ func validateQueueConfigs(queueConfigs []queueConfig) APIErrorCode { return ErrNone } +// Validates all incoming lambda configs, checkLambdaConfig validates if the +// input fields for each queues is not malformed and has valid configuration +// information. If validation fails bucket notifications are not enabled. +func validateLambdaConfigs(lambdaConfigs []lambdaConfig) APIErrorCode { + for _, lConfig := range lambdaConfigs { + if s3Error := checkLambdaConfig(lConfig); s3Error != ErrNone { + return s3Error + } + } + // Success. + return ErrNone +} + // Validates all the bucket notification configuration for their validity, // if one of the config is malformed or has invalid data it is rejected. // Configuration is never applied partially. func validateNotificationConfig(nConfig notificationConfig) APIErrorCode { - if s3Error := validateQueueConfigs(nConfig.QueueConfigurations); s3Error != ErrNone { + if s3Error := validateQueueConfigs(nConfig.QueueConfigs); s3Error != ErrNone { return s3Error } + if s3Error := validateLambdaConfigs(nConfig.LambdaConfigs); s3Error != ErrNone { + return s3Error + } + // Add validation for other configurations. return ErrNone } +// Unmarshals input value of AWS ARN format into minioLambda object. +// Returned value represents minio lambda type, currently supported are +// - minio +func unmarshalLambdaARN(lambdaARN string) arnLambda { + lambda := arnLambda{} + if !strings.HasPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") { + return lambda + } + lambdaType := strings.TrimPrefix(lambdaARN, minioLambda+serverConfig.GetRegion()+":") + switch { + case strings.HasSuffix(lambdaType, lambdaTypeMinio): + lambda.Type = lambdaTypeMinio + } // Add more lambda here. + lambda.AccountID = strings.TrimSuffix(lambdaType, ":"+lambda.Type) + return lambda +} + // Unmarshals input value of AWS ARN format into minioSqs object. // Returned value represents minio sqs types, currently supported are // - amqp // - elasticsearch // - redis -func unmarshalSqsArn(queueArn string) (mSqs arnMinioSqs) { - sqsType := strings.TrimPrefix(queueArn, minioSqs+serverConfig.GetRegion()+":") - mSqs = arnMinioSqs{} - switch sqsType { - case queueTypeAMQP: - mSqs.sqsType = queueTypeAMQP - case queueTypeElastic: - mSqs.sqsType = queueTypeElastic - case queueTypeRedis: - mSqs.sqsType = queueTypeRedis - } // Add more cases here. +func unmarshalSqsARN(queueARN string) (mSqs arnSQS) { + mSqs = arnSQS{} + if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") { + return mSqs + } + sqsType := strings.TrimPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") + switch { + case strings.HasSuffix(sqsType, queueTypeAMQP): + mSqs.Type = queueTypeAMQP + case strings.HasSuffix(sqsType, queueTypeElastic): + mSqs.Type = queueTypeElastic + case strings.HasSuffix(sqsType, queueTypeRedis): + mSqs.Type = queueTypeRedis + } // Add more queues here. + mSqs.AccountID = strings.TrimSuffix(sqsType, ":"+mSqs.Type) return mSqs } diff --git a/bucket-notification-utils_test.go b/bucket-notification-utils_test.go index 783f03a5f..d3e229a47 100644 --- a/bucket-notification-utils_test.go +++ b/bucket-notification-utils_test.go @@ -97,8 +97,8 @@ func TestValidEvents(t *testing.T) { } } -// Tests queue arn validation. -func TestQueueArn(t *testing.T) { +// Tests lambda arn validation. +func TestLambdaARN(t *testing.T) { rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("unable initialize config file, %s", err) @@ -106,38 +106,80 @@ func TestQueueArn(t *testing.T) { defer removeAll(rootPath) testCases := []struct { - queueArn string + lambdaARN string + errCode APIErrorCode + }{ + // Valid minio lambda with '1' account id. + { + lambdaARN: "arn:minio:lambda:us-east-1:1:minio", + errCode: ErrNone, + }, + // Valid minio lambda with '10' account id. + { + lambdaARN: "arn:minio:lambda:us-east-1:10:minio", + errCode: ErrNone, + }, + // Invalid empty queue arn. + { + lambdaARN: "", + errCode: ErrARNNotification, + }, + // Invalid region 'us-west-1' in queue arn. + { + lambdaARN: "arn:minio:lambda:us-west-1:1:redis", + errCode: ErrRegionNotification, + }, + } + + for i, testCase := range testCases { + errCode := checkLambdaARN(testCase.lambdaARN) + if testCase.errCode != errCode { + t.Errorf("Test %d: Expected \"%d\", got \"%d\"", i+1, testCase.errCode, errCode) + } + } +} + +// Tests queue arn validation. +func TestQueueARN(t *testing.T) { + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("unable initialize config file, %s", err) + } + defer removeAll(rootPath) + + testCases := []struct { + queueARN string errCode APIErrorCode }{ // Valid redis queue arn. { - queueArn: "arn:minio:sqs:us-east-1:1:redis", + queueARN: "arn:minio:sqs:us-east-1:1:redis", errCode: ErrNone, }, // Valid elasticsearch queue arn. { - queueArn: "arn:minio:sqs:us-east-1:1:elasticsearch", + queueARN: "arn:minio:sqs:us-east-1:1:elasticsearch", errCode: ErrNone, }, // Valid amqp queue arn. { - queueArn: "arn:minio:sqs:us-east-1:1:amqp", + queueARN: "arn:minio:sqs:us-east-1:1:amqp", errCode: ErrNone, }, // Invalid empty queue arn. { - queueArn: "", + queueARN: "", errCode: ErrARNNotification, }, // Invalid region 'us-west-1' in queue arn. { - queueArn: "arn:minio:sqs:us-west-1:1:redis", + queueARN: "arn:minio:sqs:us-west-1:1:redis", errCode: ErrRegionNotification, }, } for i, testCase := range testCases { - errCode := checkQueueArn(testCase.queueArn) + errCode := checkQueueARN(testCase.queueARN) if testCase.errCode != errCode { t.Errorf("Test %d: Expected \"%d\", got \"%d\"", i+1, testCase.errCode, errCode) } @@ -145,7 +187,7 @@ func TestQueueArn(t *testing.T) { } // Test unmarshal queue arn. -func TestUnmarshalSqsArn(t *testing.T) { +func TestUnmarshalLambdaARN(t *testing.T) { rootPath, err := newTestConfig("us-east-1") if err != nil { t.Fatalf("unable initialize config file, %s", err) @@ -153,50 +195,97 @@ func TestUnmarshalSqsArn(t *testing.T) { defer removeAll(rootPath) testCases := []struct { - queueArn string - sqsType string + lambdaARN string + Type string }{ - // Valid redis queue arn. + // Valid minio lambda arn. { - queueArn: "arn:minio:sqs:us-east-1:1:redis", - sqsType: "1:redis", - }, - // Valid elasticsearch queue arn. - { - queueArn: "arn:minio:sqs:us-east-1:1:elasticsearch", - sqsType: "1:elasticsearch", - }, - // Valid amqp queue arn. - { - queueArn: "arn:minio:sqs:us-east-1:1:amqp", - sqsType: "1:amqp", + lambdaARN: "arn:minio:lambda:us-east-1:1:lambda", + Type: "lambda", }, // Invalid empty queue arn. { - queueArn: "", - sqsType: "", + lambdaARN: "", + Type: "", }, // Invalid region 'us-west-1' in queue arn. { - queueArn: "arn:minio:sqs:us-west-1:1:redis", - sqsType: "", + lambdaARN: "arn:minio:lambda:us-west-1:1:lambda", + Type: "", }, // Partial queue arn. { - queueArn: "arn:minio:sqs:", - sqsType: "", + lambdaARN: "arn:minio:lambda:", + Type: "", }, // Invalid queue service value. { - queueArn: "arn:minio:sqs:us-east-1:1:*", - sqsType: "", + lambdaARN: "arn:minio:lambda:us-east-1:1:*", + Type: "", }, } for i, testCase := range testCases { - mSqs := unmarshalSqsArn(testCase.queueArn) - if testCase.sqsType != mSqs.sqsType { - t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.sqsType, mSqs.sqsType) + lambda := unmarshalLambdaARN(testCase.lambdaARN) + if testCase.Type != lambda.Type { + t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, lambda.Type) + } + } +} + +// Test unmarshal queue arn. +func TestUnmarshalSqsARN(t *testing.T) { + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("unable initialize config file, %s", err) + } + defer removeAll(rootPath) + + testCases := []struct { + queueARN string + Type string + }{ + // Valid redis queue arn. + { + queueARN: "arn:minio:sqs:us-east-1:1:redis", + Type: "redis", + }, + // Valid elasticsearch queue arn. + { + queueARN: "arn:minio:sqs:us-east-1:1:elasticsearch", + Type: "elasticsearch", + }, + // Valid amqp queue arn. + { + queueARN: "arn:minio:sqs:us-east-1:1:amqp", + Type: "amqp", + }, + // Invalid empty queue arn. + { + queueARN: "", + Type: "", + }, + // Invalid region 'us-west-1' in queue arn. + { + queueARN: "arn:minio:sqs:us-west-1:1:redis", + Type: "", + }, + // Partial queue arn. + { + queueARN: "arn:minio:sqs:", + Type: "", + }, + // Invalid queue service value. + { + queueARN: "arn:minio:sqs:us-east-1:1:*", + Type: "", + }, + } + + for i, testCase := range testCases { + mSqs := unmarshalSqsARN(testCase.queueARN) + if testCase.Type != mSqs.Type { + t.Errorf("Test %d: Expected \"%s\", got \"%s\"", i+1, testCase.Type, mSqs.Type) } } diff --git a/bucket-policy.go b/bucket-policy.go index 0c43cc478..afc55921a 100644 --- a/bucket-policy.go +++ b/bucket-policy.go @@ -30,15 +30,6 @@ func getOldBucketsConfigPath() (string, error) { return filepath.Join(configPath, "buckets"), nil } -// getBucketConfigPath - get bucket config path. -func getOldBucketConfigPath(bucket string) (string, error) { - bucketsConfigPath, err := getOldBucketsConfigPath() - if err != nil { - return "", err - } - return filepath.Join(bucketsConfigPath, bucket), nil -} - // readBucketPolicy - read bucket policy. func readBucketPolicy(api objectAPIHandlers, bucket string) ([]byte, error) { // Verify bucket is valid. diff --git a/config-migrate.go b/config-migrate.go index f5f571c72..6a25af5af 100644 --- a/config-migrate.go +++ b/config-migrate.go @@ -34,6 +34,8 @@ func migrateConfig() { migrateV3ToV4() // Migrate version '4' to '5'. migrateV4ToV5() + // Migrate version '5' to '6. + migrateV5ToV6() } // Version '1' is not supported anymore and deprecated, safe to delete. @@ -149,6 +151,72 @@ func migrateV3ToV4() { console.Println("Migration from version ‘" + cv3.Version + "’ to ‘" + srvConfig.Version + "’ completed successfully.") } +// Version '5' to '6' migrates config, removes previous fields related +// to backend types and server address. This change further simplifies +// the config for future additions. +func migrateV5ToV6() { + cv5, err := loadConfigV5() + if err != nil && os.IsNotExist(err) { + return + } + fatalIf(err, "Unable to load config version ‘5’.") + if cv5.Version != "5" { + return + } + + // Save only the new fields, ignore the rest. + srvConfig := &serverConfigV6{} + srvConfig.Version = globalMinioConfigVersion + srvConfig.Credential = cv5.Credential + srvConfig.Region = cv5.Region + if srvConfig.Region == "" { + // Region needs to be set for AWS Signature Version 4. + srvConfig.Region = "us-east-1" + } + srvConfig.Logger.Console = cv5.Logger.Console + srvConfig.Logger.File = cv5.Logger.File + srvConfig.Logger.Syslog = cv5.Logger.Syslog + srvConfig.Notify.AMQP = map[string]amqpNotify{ + "1": { + Enable: cv5.Logger.AMQP.Enable, + URL: cv5.Logger.AMQP.URL, + Exchange: cv5.Logger.AMQP.Exchange, + RoutingKey: cv5.Logger.AMQP.RoutingKey, + Mandatory: cv5.Logger.AMQP.Mandatory, + Immediate: cv5.Logger.AMQP.Immediate, + Durable: cv5.Logger.AMQP.Durable, + Internal: cv5.Logger.AMQP.Internal, + NoWait: cv5.Logger.AMQP.NoWait, + AutoDeleted: cv5.Logger.AMQP.AutoDeleted, + }, + } + srvConfig.Notify.ElasticSearch = map[string]elasticSearchNotify{ + "1": { + Enable: cv5.Logger.ElasticSearch.Enable, + URL: cv5.Logger.ElasticSearch.URL, + Index: cv5.Logger.ElasticSearch.Index, + }, + } + srvConfig.Notify.Redis = map[string]redisNotify{ + "1": { + Enable: cv5.Logger.Redis.Enable, + Addr: cv5.Logger.Redis.Addr, + Password: cv5.Logger.Redis.Password, + Key: cv5.Logger.Redis.Key, + }, + } + + qc, err := quick.New(srvConfig) + fatalIf(err, "Unable to initialize the quick config.") + configFile, err := getConfigFile() + fatalIf(err, "Unable to get config file.") + + err = qc.Save(configFile) + fatalIf(err, "Failed to migrate config from ‘"+cv5.Version+"’ to ‘"+srvConfig.Version+"’ failed.") + + console.Println("Migration from version ‘" + cv5.Version + "’ to ‘" + srvConfig.Version + "’ completed successfully.") +} + // Version '4' to '5' migrates config, removes previous fields related // to backend types and server address. This change further simplifies // the config for future additions. @@ -163,7 +231,7 @@ func migrateV4ToV5() { } // Save only the new fields, ignore the rest. - srvConfig := &serverConfigV5{} + srvConfig := &configV5{} srvConfig.Version = globalMinioConfigVersion srvConfig.Credential = cv4.Credential srvConfig.Region = cv4.Region @@ -174,15 +242,9 @@ func migrateV4ToV5() { srvConfig.Logger.Console = cv4.Logger.Console srvConfig.Logger.File = cv4.Logger.File srvConfig.Logger.Syslog = cv4.Logger.Syslog - srvConfig.Logger.AMQP = amqpLogger{ - Enable: false, - } - srvConfig.Logger.ElasticSearch = elasticSearchLogger{ - Enable: false, - } - srvConfig.Logger.Redis = redisLogger{ - Enable: false, - } + srvConfig.Logger.AMQP.Enable = false + srvConfig.Logger.ElasticSearch.Enable = false + srvConfig.Logger.Redis.Enable = false qc, err := quick.New(srvConfig) fatalIf(err, "Unable to initialize the quick config.") diff --git a/config-old.go b/config-old.go index d8bced22b..e19701cad 100644 --- a/config-old.go +++ b/config-old.go @@ -146,6 +146,7 @@ func loadConfigV3() (*configV3, error) { return c, nil } +// logger type representing version '4' logger config. type loggerV4 struct { Console struct { Enable bool `json:"enable"` @@ -195,3 +196,81 @@ func loadConfigV4() (*configV4, error) { } return c, nil } + +// logger type representing version '5' logger config. +type loggerV5 struct { + Console struct { + Enable bool `json:"enable"` + Level string `json:"level"` + } `json:"console"` + File struct { + Enable bool `json:"enable"` + Filename string `json:"fileName"` + Level string `json:"level"` + } `json:"file"` + Syslog struct { + Enable bool `json:"enable"` + Addr string `json:"address"` + Level string `json:"level"` + } `json:"syslog"` + AMQP struct { + Enable bool `json:"enable"` + Level string `json:"level"` + URL string `json:"url"` + Exchange string `json:"exchange"` + RoutingKey string `json:"routineKey"` + ExchangeType string `json:"exchangeType"` + Mandatory bool `json:"mandatory"` + Immediate bool `json:"immediate"` + Durable bool `json:"durable"` + Internal bool `json:"internal"` + NoWait bool `json:"noWait"` + AutoDeleted bool `json:"autoDeleted"` + } `json:"amqp"` + ElasticSearch struct { + Enable bool `json:"enable"` + Level string `json:"level"` + URL string `json:"url"` + Index string `json:"index"` + } `json:"elasticsearch"` + Redis struct { + Enable bool `json:"enable"` + Level string `json:"level"` + Addr string `json:"address"` + Password string `json:"password"` + Key string `json:"key"` + } `json:"redis"` +} + +// configV5 server configuration version '5'. +type configV5 struct { + Version string `json:"version"` + + // S3 API configuration. + Credential credential `json:"credential"` + Region string `json:"region"` + + // Additional error logging configuration. + Logger loggerV5 `json:"logger"` +} + +// loadConfigV5 load config version '5'. +func loadConfigV5() (*configV5, error) { + configFile, err := getConfigFile() + if err != nil { + return nil, err + } + if _, err = os.Stat(configFile); err != nil { + return nil, err + } + c := &configV5{} + c.Version = "5" + qc, err := quick.New(c) + if err != nil { + return nil, err + } + if err := qc.Load(configFile); err != nil { + return nil, err + } + return c, nil +} diff --git a/config-v5.go b/config-v6.go similarity index 64% rename from config-v5.go rename to config-v6.go index 730ed383c..b5a2fbdc4 100644 --- a/config-v5.go +++ b/config-v6.go @@ -23,8 +23,8 @@ import ( "github.com/minio/minio/pkg/quick" ) -// serverConfigV5 server configuration version '5'. -type serverConfigV5 struct { +// serverConfigV6 server configuration version '5'. +type serverConfigV6 struct { Version string `json:"version"` // S3 API configuration. @@ -34,6 +34,9 @@ type serverConfigV5 struct { // Additional error logging configuration. Logger logger `json:"logger"` + // Notification queue configuration. + Notify notifier `json:"notify"` + // Read Write mutex. rwMutex *sync.RWMutex } @@ -42,7 +45,7 @@ type serverConfigV5 struct { func initConfig() error { if !isConfigFileExists() { // Initialize server config. - srvCfg := &serverConfigV5{} + srvCfg := &serverConfigV6{} srvCfg.Version = globalMinioConfigVersion srvCfg.Region = "us-east-1" srvCfg.Credential = mustGenAccessKeys() @@ -73,7 +76,7 @@ func initConfig() error { if _, err = os.Stat(configFile); err != nil { return err } - srvCfg := &serverConfigV5{} + srvCfg := &serverConfigV6{} srvCfg.Version = globalMinioConfigVersion srvCfg.rwMutex = &sync.RWMutex{} qc, err := quick.New(srvCfg) @@ -92,10 +95,10 @@ func initConfig() error { } // serverConfig server config. -var serverConfig *serverConfigV5 +var serverConfig *serverConfigV6 // GetVersion get current config version. -func (s serverConfigV5) GetVersion() string { +func (s serverConfigV6) GetVersion() string { s.rwMutex.RLock() defer s.rwMutex.RUnlock() return s.Version @@ -103,117 +106,135 @@ func (s serverConfigV5) GetVersion() string { /// Logger related. -func (s *serverConfigV5) SetAMQPLogger(amqpl amqpLogger) { +func (s *serverConfigV6) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) { s.rwMutex.Lock() defer s.rwMutex.Unlock() - s.Logger.AMQP = amqpl + s.Notify.AMQP[accountID] = amqpn } -// GetAMQPLogger get current AMQP logger. -func (s serverConfigV5) GetAMQPLogger() amqpLogger { +func (s serverConfigV6) GetAMQP() map[string]amqpNotify { s.rwMutex.RLock() defer s.rwMutex.RUnlock() - return s.Logger.AMQP + return s.Notify.AMQP } -func (s *serverConfigV5) SetElasticSearchLogger(esLogger elasticSearchLogger) { +// GetAMQPNotify get current AMQP logger. +func (s serverConfigV6) GetAMQPNotifyByID(accountID string) amqpNotify { + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() + return s.Notify.AMQP[accountID] +} + +func (s *serverConfigV6) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) { s.rwMutex.Lock() defer s.rwMutex.Unlock() - s.Logger.ElasticSearch = esLogger + s.Notify.ElasticSearch[accountID] = esNotify } -// GetElasticSearchLogger get current ElasicSearch logger. -func (s serverConfigV5) GetElasticSearchLogger() elasticSearchLogger { +func (s serverConfigV6) GetElasticSearch() map[string]elasticSearchNotify { s.rwMutex.RLock() defer s.rwMutex.RUnlock() - return s.Logger.ElasticSearch + return s.Notify.ElasticSearch } -func (s *serverConfigV5) SetRedisLogger(rLogger redisLogger) { +// GetElasticSearchNotify get current ElasicSearch logger. +func (s serverConfigV6) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify { + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() + return s.Notify.ElasticSearch[accountID] +} + +func (s *serverConfigV6) SetRedisNotifyByID(accountID string, rNotify redisNotify) { s.rwMutex.Lock() defer s.rwMutex.Unlock() - s.Logger.Redis = rLogger + s.Notify.Redis[accountID] = rNotify } -// GetRedisLogger get current Redis logger. -func (s serverConfigV5) GetRedisLogger() redisLogger { +func (s serverConfigV6) GetRedis() map[string]redisNotify { s.rwMutex.RLock() defer s.rwMutex.RUnlock() - return s.Logger.Redis + return s.Notify.Redis +} + +// GetRedisNotify get current Redis logger. +func (s serverConfigV6) GetRedisNotifyByID(accountID string) redisNotify { + s.rwMutex.RLock() + defer s.rwMutex.RUnlock() + return s.Notify.Redis[accountID] } // SetFileLogger set new file logger. -func (s *serverConfigV5) SetFileLogger(flogger fileLogger) { +func (s *serverConfigV6) SetFileLogger(flogger fileLogger) { s.rwMutex.Lock() defer s.rwMutex.Unlock() s.Logger.File = flogger } // GetFileLogger get current file logger. -func (s serverConfigV5) GetFileLogger() fileLogger { +func (s serverConfigV6) GetFileLogger() fileLogger { s.rwMutex.RLock() defer s.rwMutex.RUnlock() return s.Logger.File } // SetConsoleLogger set new console logger. -func (s *serverConfigV5) SetConsoleLogger(clogger consoleLogger) { +func (s *serverConfigV6) SetConsoleLogger(clogger consoleLogger) { s.rwMutex.Lock() defer s.rwMutex.Unlock() s.Logger.Console = clogger } // GetConsoleLogger get current console logger. -func (s serverConfigV5) GetConsoleLogger() consoleLogger { +func (s serverConfigV6) GetConsoleLogger() consoleLogger { s.rwMutex.RLock() defer s.rwMutex.RUnlock() return s.Logger.Console } // SetSyslogLogger set new syslog logger. -func (s *serverConfigV5) SetSyslogLogger(slogger syslogLogger) { +func (s *serverConfigV6) SetSyslogLogger(slogger syslogLogger) { s.rwMutex.Lock() defer s.rwMutex.Unlock() s.Logger.Syslog = slogger } // GetSyslogLogger get current syslog logger. -func (s *serverConfigV5) GetSyslogLogger() syslogLogger { +func (s *serverConfigV6) GetSyslogLogger() syslogLogger { s.rwMutex.RLock() defer s.rwMutex.RUnlock() return s.Logger.Syslog } // SetRegion set new region. -func (s *serverConfigV5) SetRegion(region string) { +func (s *serverConfigV6) SetRegion(region string) { s.rwMutex.Lock() defer s.rwMutex.Unlock() s.Region = region } // GetRegion get current region. -func (s serverConfigV5) GetRegion() string { +func (s serverConfigV6) GetRegion() string { s.rwMutex.RLock() defer s.rwMutex.RUnlock() return s.Region } // SetCredentials set new credentials. -func (s *serverConfigV5) SetCredential(creds credential) { +func (s *serverConfigV6) SetCredential(creds credential) { s.rwMutex.Lock() defer s.rwMutex.Unlock() s.Credential = creds } // GetCredentials get current credentials. -func (s serverConfigV5) GetCredential() credential { +func (s serverConfigV6) GetCredential() credential { s.rwMutex.RLock() defer s.rwMutex.RUnlock() return s.Credential } // Save config. -func (s serverConfigV5) Save() error { +func (s serverConfigV6) Save() error { s.rwMutex.RLock() defer s.rwMutex.RUnlock() diff --git a/event-notifier.go b/event-notifier.go new file mode 100644 index 000000000..861c930bb --- /dev/null +++ b/event-notifier.go @@ -0,0 +1,381 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bytes" + "encoding/xml" + "errors" + "fmt" + "net/url" + "path" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +// Global event notification queue. This is the queue that would be used to send all notifications. +type eventNotifier struct { + rwMutex *sync.RWMutex + + // Collection of 'bucket' and notification config. + notificationConfigs map[string]*notificationConfig + lambdaTargets map[string][]chan []NotificationEvent + queueTargets map[string]*logrus.Logger +} + +// Represents data to be sent with notification event. +type eventData struct { + Type EventName + Bucket string + ObjInfo ObjectInfo + ReqParams map[string]string +} + +// New notification event constructs a new notification event message from +// input request metadata which completed successfully. +func newNotificationEvent(event eventData) NotificationEvent { + /// Construct a new object created event. + region := serverConfig.GetRegion() + tnow := time.Now().UTC() + sequencer := fmt.Sprintf("%X", tnow.UnixNano()) + // Following blocks fills in all the necessary details of s3 event message structure. + // http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html + nEvent := NotificationEvent{ + EventVersion: "2.0", + EventSource: "aws:s3", + AwsRegion: region, + EventTime: tnow.Format(iso8601Format), + EventName: event.Type.String(), + UserIdentity: defaultIdentity(), + RequestParameters: event.ReqParams, + ResponseElements: map[string]string{}, + S3: eventMeta{ + SchemaVersion: "1.0", + ConfigurationID: "Config", + Bucket: bucketMeta{ + Name: event.Bucket, + OwnerIdentity: defaultIdentity(), + ARN: "arn:aws:s3:::" + event.Bucket, + }, + }, + } + + escapedObj := url.QueryEscape(event.ObjInfo.Name) + // For delete object event type, we do not need to set ETag and Size. + if event.Type == ObjectRemovedDelete { + nEvent.S3.Object = objectMeta{ + Key: escapedObj, + Sequencer: sequencer, + } + return nEvent + } + // For all other events we should set ETag and Size. + nEvent.S3.Object = objectMeta{ + Key: escapedObj, + ETag: event.ObjInfo.MD5Sum, + Size: event.ObjInfo.Size, + Sequencer: sequencer, + } + // Success. + return nEvent +} + +// Fetch the saved queue target. +func (en eventNotifier) GetQueueTarget(queueARN string) *logrus.Logger { + return en.queueTargets[queueARN] +} + +func (en eventNotifier) GetLambdaTarget(lambdaARN string) []chan []NotificationEvent { + en.rwMutex.RLock() + defer en.rwMutex.RUnlock() + return en.lambdaTargets[lambdaARN] +} + +// Set a new lambda target for an input lambda ARN. +func (en *eventNotifier) SetLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) error { + en.rwMutex.Lock() + defer en.rwMutex.Unlock() + if listenerCh == nil { + return errors.New("invalid argument") + } + en.lambdaTargets[lambdaARN] = append(en.lambdaTargets[lambdaARN], listenerCh) + return nil +} + +// Remove lambda target for an input lambda ARN. +func (en *eventNotifier) RemoveLambdaTarget(lambdaARN string, listenerCh chan []NotificationEvent) { + en.rwMutex.Lock() + defer en.rwMutex.Unlock() + lambdaTarget, ok := en.lambdaTargets[lambdaARN] + if ok { + for i, savedListenerCh := range lambdaTarget { + if listenerCh == savedListenerCh { + lambdaTarget = append(lambdaTarget[:i], lambdaTarget[i+1:]...) + if len(lambdaTarget) == 0 { + delete(en.lambdaTargets, lambdaARN) + break + } + en.lambdaTargets[lambdaARN] = lambdaTarget + } + } + } +} + +// Returns true if bucket notification is set for the bucket, false otherwise. +func (en eventNotifier) IsBucketNotificationSet(bucket string) bool { + en.rwMutex.RLock() + defer en.rwMutex.RUnlock() + _, ok := en.notificationConfigs[bucket] + return ok +} + +// Fetch bucket notification config for an input bucket. +func (en eventNotifier) GetBucketNotificationConfig(bucket string) *notificationConfig { + en.rwMutex.RLock() + defer en.rwMutex.RUnlock() + return en.notificationConfigs[bucket] +} + +// Set a new notification config for a bucket, this operation will overwrite any previous +// notification configs for the bucket. +func (en *eventNotifier) SetBucketNotificationConfig(bucket string, notificationCfg *notificationConfig) error { + en.rwMutex.Lock() + defer en.rwMutex.Unlock() + if notificationCfg == nil { + return errors.New("invalid argument") + } + en.notificationConfigs[bucket] = notificationCfg + return nil +} + +// eventNotify notifies an event to relevant targets based on their +// bucket notification configs. +func eventNotify(event eventData) { + // Notifies a new event. + // List of events reported through this function are + // - s3:ObjectCreated:Put + // - s3:ObjectCreated:Post + // - s3:ObjectCreated:Copy + // - s3:ObjectCreated:CompleteMultipartUpload + // - s3:ObjectRemoved:Delete + + nConfig := eventN.GetBucketNotificationConfig(event.Bucket) + // No bucket notifications enabled, drop the event notification. + if len(nConfig.QueueConfigs) == 0 && len(nConfig.LambdaConfigs) == 0 { + return + } + + // Event type. + eventType := event.Type.String() + + // Object name. + objectName := event.ObjInfo.Name + + // Save the notification event to be sent. + notificationEvent := []NotificationEvent{newNotificationEvent(event)} + + // Validate if the event and object match the queue configs. + for _, qConfig := range nConfig.QueueConfigs { + eventMatch := eventMatch(eventType, qConfig.Events) + ruleMatch := filterRuleMatch(objectName, qConfig.Filter.Key.FilterRules) + if eventMatch && ruleMatch { + targetLog := eventN.GetQueueTarget(qConfig.QueueARN) + if targetLog != nil { + targetLog.WithFields(logrus.Fields{ + "Records": notificationEvent, + }).Info() + } + } + } + // Validate if the event and object match the lambda configs. + for _, lambdaConfig := range nConfig.LambdaConfigs { + ruleMatch := filterRuleMatch(objectName, lambdaConfig.Filter.Key.FilterRules) + eventMatch := eventMatch(eventType, lambdaConfig.Events) + if eventMatch && ruleMatch { + targetListeners := eventN.GetLambdaTarget(lambdaConfig.LambdaARN) + for _, listener := range targetListeners { + listener <- notificationEvent + } + } + } +} + +// loads notifcation config if any for a given bucket, returns back structured notification config. +func loadNotificationConfig(bucket string, objAPI ObjectLayer) (*notificationConfig, error) { + // Construct the notification config path. + notificationConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig) + objInfo, err := objAPI.GetObjectInfo(minioMetaBucket, notificationConfigPath) + if err != nil { + // 'notification.xml' not found return 'errNoSuchNotifications'. + // This is default when no bucket notifications are found on the bucket. + switch err.(type) { + case ObjectNotFound: + return nil, errNoSuchNotifications + } + // Returns error for other errors. + return nil, err + } + var buffer bytes.Buffer + err = objAPI.GetObject(minioMetaBucket, notificationConfigPath, 0, objInfo.Size, &buffer) + if err != nil { + // 'notification.xml' not found return 'errNoSuchNotifications'. + // This is default when no bucket notifications are found on the bucket. + switch err.(type) { + case ObjectNotFound: + return nil, errNoSuchNotifications + } + // Returns error for other errors. + return nil, err + } + + // Unmarshal notification bytes. + notificationConfigBytes := buffer.Bytes() + notificationCfg := ¬ificationConfig{} + if err = xml.Unmarshal(notificationConfigBytes, ¬ificationCfg); err != nil { + return nil, err + } // Successfully marshalled notification configuration. + + // Return success. + return notificationCfg, nil +} + +// loads all bucket notifications if present. +func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationConfig, error) { + // List buckets to proceed loading all notification configuration. + buckets, err := objAPI.ListBuckets() + if err != nil { + return nil, err + } + + configs := make(map[string]*notificationConfig) + + // Loads all bucket notifications. + for _, bucket := range buckets { + var nCfg *notificationConfig + nCfg, err = loadNotificationConfig(bucket.Name, objAPI) + if err != nil { + if err == errNoSuchNotifications { + continue + } + return nil, err + } + configs[bucket.Name] = nCfg + } + + // Success. + return configs, nil +} + +// Loads all queue targets, initializes each queueARNs depending on their config. +// Each instance of queueARN registers its own logrus to communicate with the +// queue service. QueueARN once initialized is not initialized again for the +// same queueARN, instead previous connection is used. +func loadAllQueueTargets() (map[string]*logrus.Logger, error) { + queueTargets := make(map[string]*logrus.Logger) + // Load all amqp targets, initialize their respective loggers. + for accountID, amqpN := range serverConfig.GetAMQP() { + if !amqpN.Enable { + continue + } + // Construct the queue ARN for AMQP. + queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeAMQP + // Queue target if already initialized we move to the next ARN. + _, ok := queueTargets[queueARN] + if ok { + continue + } + // Using accountID we can now initialize a new AMQP logrus instance. + amqpLog, err := newAMQPNotify(accountID) + if err != nil { + return nil, err + } + queueTargets[queueARN] = amqpLog + } + // Load redis targets, initialize their respective loggers. + for accountID, redisN := range serverConfig.GetRedis() { + if !redisN.Enable { + continue + } + // Construct the queue ARN for Redis. + queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeRedis + // Queue target if already initialized we move to the next ARN. + _, ok := queueTargets[queueARN] + if ok { + continue + } + // Using accountID we can now initialize a new Redis logrus instance. + redisLog, err := newRedisNotify(accountID) + if err != nil { + return nil, err + } + queueTargets[queueARN] = redisLog + } + // Load elastic targets, initialize their respective loggers. + for accountID, elasticN := range serverConfig.GetElasticSearch() { + if !elasticN.Enable { + continue + } + // Construct the queue ARN for Elastic. + queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeElastic + _, ok := queueTargets[queueARN] + if ok { + continue + } + // Using accountID we can now initialize a new ElasticSearch logrus instance. + elasticLog, err := newElasticNotify(accountID) + if err != nil { + return nil, err + } + queueTargets[queueARN] = elasticLog + } + // Successfully initialized queue targets. + return queueTargets, nil +} + +// Global instance of event notification queue. +var eventN *eventNotifier + +// Initialize event notifier. +func initEventNotifier(objAPI ObjectLayer) error { + if objAPI == nil { + return errInvalidArgument + } + + // Read all saved bucket notifications. + configs, err := loadAllBucketNotifications(objAPI) + if err != nil { + return err + } + + // Initializes all queue targets. + queueTargets, err := loadAllQueueTargets() + if err != nil { + return err + } + + // Inititalize event notifier queue. + eventN = &eventNotifier{ + rwMutex: &sync.RWMutex{}, + notificationConfigs: configs, + queueTargets: queueTargets, + lambdaTargets: make(map[string][]chan []NotificationEvent), + } + + return nil +} diff --git a/event-notifier_test.go b/event-notifier_test.go new file mode 100644 index 000000000..4fb08595b --- /dev/null +++ b/event-notifier_test.go @@ -0,0 +1,67 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import "testing" + +// Tests various forms of inititalization of event notifier. +func TestInitEventNotifier(t *testing.T) { + fs, disk, err := getSingleNodeObjectLayer() + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + xl, disks, err := getXLObjectLayer() + if err != nil { + t.Fatal("Unable to initialize XL backend.", err) + } + + disks = append(disks, disk) + for _, d := range disks { + defer removeAll(d) + } + + // Collection of test cases for inititalizing event notifier. + testCases := []struct { + objAPI ObjectLayer + configs map[string]*notificationConfig + err error + }{ + // Test 1 - invalid arguments. + { + objAPI: nil, + err: errInvalidArgument, + }, + // Test 2 - valid FS object layer but no bucket notifications. + { + objAPI: fs, + err: nil, + }, + // Test 3 - valid XL object layer but no bucket notifications. + { + objAPI: xl, + err: nil, + }, + } + + // Validate if event notifier is properly initialized. + for i, testCase := range testCases { + err = initEventNotifier(testCase.objAPI) + if err != testCase.err { + t.Errorf("Test %d: Expected %s, but got: %s", i+1, testCase.err, err) + } + } +} diff --git a/fs-v1.go b/fs-v1.go index 14470a2ef..5f126de2f 100644 --- a/fs-v1.go +++ b/fs-v1.go @@ -137,12 +137,15 @@ func newFSObjects(disk string) (ObjectLayer, error) { shutdownFS(storage) }) - // Return successfully initialized object layer. - return fsObjects{ + // Initialize fs objects. + fs := fsObjects{ storage: storage, physicalDisk: disk, listPool: newTreeWalkPool(globalLookupTimeout), - }, nil + } + + // Return successfully initialized object layer. + return fs, nil } // StorageInfo - returns underlying storage statistics. diff --git a/globals.go b/globals.go index 842b7b8ba..891e48f60 100644 --- a/globals.go +++ b/globals.go @@ -28,7 +28,7 @@ const ( // minio configuration related constants. const ( - globalMinioConfigVersion = "5" + globalMinioConfigVersion = "6" globalMinioConfigDir = ".minio" globalMinioCertsDir = "certs" globalMinioCertFile = "public.crt" diff --git a/handler-utils.go b/handler-utils.go index 6dfc505e5..deb88c402 100644 --- a/handler-utils.go +++ b/handler-utils.go @@ -22,6 +22,7 @@ import ( "mime/multipart" "net/http" "strings" + "time" ) // Validates location constraint in PutBucket request body. @@ -129,3 +130,16 @@ func extractPostPolicyFormValues(reader *multipart.Reader) (filePart io.Reader, } return filePart, fileName, formValues, nil } + +// Send whitespace character, once every 5secs, until CompleteMultipartUpload is done. +// CompleteMultipartUpload method of the object layer indicates that it's done via doneCh +func sendWhiteSpaceChars(w http.ResponseWriter, doneCh <-chan struct{}) { + for { + select { + case <-time.After(5 * time.Second): + w.Write([]byte(" ")) + case <-doneCh: + return + } + } +} diff --git a/logger.go b/logger.go index a9dea0f30..13d352df3 100644 --- a/logger.go +++ b/logger.go @@ -19,15 +19,10 @@ package main import ( "bufio" "bytes" - "errors" - "os" - "runtime" "runtime/debug" - "strconv" "strings" "github.com/Sirupsen/logrus" - "github.com/dustin/go-humanize" ) type fields map[string]interface{} @@ -40,42 +35,13 @@ var log = logrus.New() // Default console logger. // - console [default] // - file // - syslog -// - amqp -// - elasticsearch -// type logger struct { - Console consoleLogger `json:"console"` - File fileLogger `json:"file"` - Syslog syslogLogger `json:"syslog"` - AMQP amqpLogger `json:"amqp"` - ElasticSearch elasticSearchLogger `json:"elasticsearch"` - Redis redisLogger `json:"redis"` + Console consoleLogger `json:"console"` + File fileLogger `json:"file"` + Syslog syslogLogger `json:"syslog"` // Add new loggers here. } -var errLoggerNotEnabled = errors.New("requested logger type is not enabled") - -// sysInfo returns useful system statistics. -func sysInfo() map[string]string { - host, err := os.Hostname() - if err != nil { - host = "" - } - memstats := &runtime.MemStats{} - runtime.ReadMemStats(memstats) - return map[string]string{ - "host.name": host, - "host.os": runtime.GOOS, - "host.arch": runtime.GOARCH, - "host.lang": runtime.Version(), - "host.cpus": strconv.Itoa(runtime.NumCPU()), - "mem.used": humanize.Bytes(memstats.Alloc), - "mem.total": humanize.Bytes(memstats.Sys), - "mem.heap.used": humanize.Bytes(memstats.HeapAlloc), - "mem.heap.total": humanize.Bytes(memstats.HeapSys), - } -} - // stackInfo returns printable stack trace. func stackInfo() string { // Convert stack-trace bytes to io.Reader. diff --git a/main.go b/main.go index 2a58157ad..f5afa00f8 100644 --- a/main.go +++ b/main.go @@ -76,12 +76,6 @@ func enableLoggers() { // Enable all loggers here. enableConsoleLogger() enableFileLogger() - - // Adding new bucket notification related loggers. - enableAMQPLogger() - enableElasticLogger() - enableRedisLogger() - // Add your logger here. } diff --git a/notifiers.go b/notifiers.go new file mode 100644 index 000000000..4e0ca4ea8 --- /dev/null +++ b/notifiers.go @@ -0,0 +1,136 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "errors" + "strings" + + "github.com/minio/minio/pkg/wildcard" +) + +// SQS type. +const ( + // Minio sqs ARN prefix. + minioSqs = "arn:minio:sqs:" + + // Static string indicating queue type 'amqp'. + queueTypeAMQP = "amqp" + // Static string indicating queue type 'elasticsearch'. + queueTypeElastic = "elasticsearch" + // Static string indicating queue type 'redis'. + queueTypeRedis = "redis" +) + +// Lambda type. +const ( + // Minio lambda ARN prefix. + minioLambda = "arn:minio:lambda:" + + // Static string indicating lambda type 'lambda'. + lambdaTypeMinio = "lambda" +) + +var errNotifyNotEnabled = errors.New("requested notifier not enabled") + +// Notifier represents collection of supported notification queues. +type notifier struct { + AMQP map[string]amqpNotify `json:"amqp"` + ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"` + Redis map[string]redisNotify `json:"redis"` + // Add new notification queues. +} + +// Returns true if queueArn is for an AMQP queue. +func isAMQPQueue(sqsArn arnSQS) bool { + if sqsArn.Type != queueTypeAMQP { + return false + } + amqpL := serverConfig.GetAMQPNotifyByID(sqsArn.AccountID) + if !amqpL.Enable { + return false + } + // Connect to amqp server to validate. + amqpC, err := dialAMQP(amqpL) + if err != nil { + errorIf(err, "Unable to connect to amqp service. %#v", amqpL) + return false + } + defer amqpC.Close() + return true +} + +// Returns true if queueArn is for an Redis queue. +func isRedisQueue(sqsArn arnSQS) bool { + if sqsArn.Type != queueTypeRedis { + return false + } + rNotify := serverConfig.GetRedisNotifyByID(sqsArn.AccountID) + if !rNotify.Enable { + return false + } + // Connect to redis server to validate. + rPool, err := dialRedis(rNotify) + if err != nil { + errorIf(err, "Unable to connect to redis service. %#v", rNotify) + return false + } + defer rPool.Close() + return true +} + +// Returns true if queueArn is for an ElasticSearch queue. +func isElasticQueue(sqsArn arnSQS) bool { + if sqsArn.Type != queueTypeElastic { + return false + } + esNotify := serverConfig.GetElasticSearchNotifyByID(sqsArn.AccountID) + if !esNotify.Enable { + return false + } + elasticC, err := dialElastic(esNotify) + if err != nil { + errorIf(err, "Unable to connect to elasticsearch service %#v", esNotify) + return false + } + defer elasticC.Stop() + return true +} + +// Match function matches wild cards in 'pattern' for events. +func eventMatch(eventType string, events []string) (ok bool) { + for _, event := range events { + ok = wildcard.Match(event, eventType) + if ok { + break + } + } + return ok +} + +// Filter rule match, matches an object against the filter rules. +func filterRuleMatch(object string, frs []filterRule) bool { + var prefixMatch, suffixMatch = true, true + for _, fr := range frs { + if isValidFilterNamePrefix(fr.Name) { + prefixMatch = strings.HasPrefix(object, fr.Value) + } else if isValidFilterNameSuffix(fr.Name) { + suffixMatch = strings.HasSuffix(object, fr.Value) + } + } + return prefixMatch && suffixMatch +} diff --git a/queues_test.go b/notifiers_test.go similarity index 97% rename from queues_test.go rename to notifiers_test.go index ba0702434..4b73cf770 100644 --- a/queues_test.go +++ b/notifiers_test.go @@ -99,7 +99,7 @@ func TestEventMatch(t *testing.T) { } for i, testCase := range testCases { - ok := eventMatch(testCase.eventName, testCase.events) + ok := eventMatch(testCase.eventName.String(), testCase.events) if testCase.match != ok { t.Errorf("Test %d: Expected \"%t\", got \"%t\"", i+1, testCase.match, ok) } diff --git a/logger-amqp.go b/notify-amqp.go similarity index 79% rename from logger-amqp.go rename to notify-amqp.go index 25bacde7e..983b1586e 100644 --- a/logger-amqp.go +++ b/notify-amqp.go @@ -17,15 +17,16 @@ package main import ( + "io/ioutil" + "github.com/Sirupsen/logrus" "github.com/streadway/amqp" ) -// amqpLogger - represents logrus compatible AMQP hook. +// amqpNotify - represents logrus compatible AMQP hook. // All fields represent AMQP configuration details. -type amqpLogger struct { +type amqpNotify struct { Enable bool `json:"enable"` - Level string `json:"level"` URL string `json:"url"` Exchange string `json:"exchange"` RoutingKey string `json:"routineKey"` @@ -39,13 +40,16 @@ type amqpLogger struct { } type amqpConn struct { - params amqpLogger + params amqpNotify *amqp.Connection } -func dialAMQP(amqpL amqpLogger) (amqpConn, error) { +// dialAMQP - dials and returns an amqpConnection instance, +// for sending notifications. Returns error if amqp logger +// is not enabled. +func dialAMQP(amqpL amqpNotify) (amqpConn, error) { if !amqpL.Enable { - return amqpConn{}, errLoggerNotEnabled + return amqpConn{}, errNotifyNotEnabled } conn, err := amqp.Dial(amqpL.URL) if err != nil { @@ -54,32 +58,31 @@ func dialAMQP(amqpL amqpLogger) (amqpConn, error) { return amqpConn{Connection: conn, params: amqpL}, nil } -func enableAMQPLogger() error { - amqpL := serverConfig.GetAMQPLogger() +func newAMQPNotify(accountID string) (*logrus.Logger, error) { + amqpL := serverConfig.GetAMQPNotifyByID(accountID) // Connect to amqp server. amqpC, err := dialAMQP(amqpL) if err != nil { - return err + return nil, err } - lvl, err := logrus.ParseLevel(amqpL.Level) - fatalIf(err, "Unknown log level found in the config file.") + amqpLog := logrus.New() + + // Disable writing to console. + amqpLog.Out = ioutil.Discard // Add a amqp hook. - log.Hooks.Add(amqpC) + amqpLog.Hooks.Add(amqpC) // Set default JSON formatter. - log.Formatter = new(logrus.JSONFormatter) + amqpLog.Formatter = new(logrus.JSONFormatter) - // Set default log level to info. - log.Level = lvl - - // Successfully enabled. - return nil + // Successfully enabled all AMQPs. + return amqpLog, nil } -// Fire is called when an event should be sent to the message broker. +// Fire is called when an event should be sent to the message broker.k func (q amqpConn) Fire(entry *logrus.Entry) error { ch, err := q.Connection.Channel() if err != nil { @@ -137,11 +140,6 @@ func (q amqpConn) Fire(entry *logrus.Entry) error { // Levels is available logging levels. func (q amqpConn) Levels() []logrus.Level { return []logrus.Level{ - logrus.PanicLevel, - logrus.FatalLevel, - logrus.ErrorLevel, - logrus.WarnLevel, logrus.InfoLevel, - logrus.DebugLevel, } } diff --git a/logger-elasticsearch.go b/notify-elasticsearch.go similarity index 63% rename from logger-elasticsearch.go rename to notify-elasticsearch.go index 6059f865f..e156079d6 100644 --- a/logger-elasticsearch.go +++ b/notify-elasticsearch.go @@ -18,81 +18,80 @@ package main import ( "errors" + "io/ioutil" "github.com/Sirupsen/logrus" "gopkg.in/olivere/elastic.v3" ) // elasticQueue is a elasticsearch event notification queue. -type elasticSearchLogger struct { +type elasticSearchNotify struct { Enable bool `json:"enable"` - Level string `json:"level"` URL string `json:"url"` Index string `json:"index"` } type elasticClient struct { *elastic.Client - params elasticSearchLogger + params elasticSearchNotify } // Connects to elastic search instance at URL. -func dialElastic(esLogger elasticSearchLogger) (*elastic.Client, error) { - if !esLogger.Enable { - return nil, errLoggerNotEnabled +func dialElastic(esNotify elasticSearchNotify) (*elastic.Client, error) { + if !esNotify.Enable { + return nil, errNotifyNotEnabled } - client, err := elastic.NewClient(elastic.SetURL(esLogger.URL), elastic.SetSniff(false)) + client, err := elastic.NewClient(elastic.SetURL(esNotify.URL), elastic.SetSniff(false)) if err != nil { return nil, err } return client, nil } -// Enables elasticsearch logger. -func enableElasticLogger() error { - esLogger := serverConfig.GetElasticSearchLogger() +func newElasticNotify(accountID string) (*logrus.Logger, error) { + esNotify := serverConfig.GetElasticSearchNotifyByID(accountID) // Dial to elastic search. - client, err := dialElastic(esLogger) + client, err := dialElastic(esNotify) if err != nil { - return err + return nil, err } // Use the IndexExists service to check if a specified index exists. - exists, err := client.IndexExists(esLogger.Index).Do() + exists, err := client.IndexExists(esNotify.Index).Do() if err != nil { - return err + return nil, err } // Index does not exist, attempt to create it. if !exists { var createIndex *elastic.IndicesCreateResult - createIndex, err = client.CreateIndex(esLogger.Index).Do() + createIndex, err = client.CreateIndex(esNotify.Index).Do() if err != nil { - return err + return nil, err } if !createIndex.Acknowledged { - return errors.New("index not created") + return nil, errors.New("index not created") } } elasticCl := elasticClient{ Client: client, - params: esLogger, + params: esNotify, } - lvl, err := logrus.ParseLevel(esLogger.Level) - fatalIf(err, "Unknown log level found in the config file.") + elasticSearchLog := logrus.New() - // Add a elasticsearch hook. - log.Hooks.Add(elasticCl) + // Disable writing to console. + elasticSearchLog.Out = ioutil.Discard + + // Add a elasticSearch hook. + elasticSearchLog.Hooks.Add(elasticCl) // Set default JSON formatter. - log.Formatter = new(logrus.JSONFormatter) + elasticSearchLog.Formatter = new(logrus.JSONFormatter) - // Set default log level to info. - log.Level = lvl - - return nil + // Success, elastic search successfully initialized. + return elasticSearchLog, nil } // Fire is required to implement logrus hook @@ -108,11 +107,6 @@ func (q elasticClient) Fire(entry *logrus.Entry) error { // Required for logrus hook implementation func (q elasticClient) Levels() []logrus.Level { return []logrus.Level{ - logrus.PanicLevel, - logrus.FatalLevel, - logrus.ErrorLevel, - logrus.WarnLevel, logrus.InfoLevel, - logrus.DebugLevel, } } diff --git a/logger-redis.go b/notify-redis.go similarity index 73% rename from logger-redis.go rename to notify-redis.go index 504fabdf7..9d38d6beb 100644 --- a/logger-redis.go +++ b/notify-redis.go @@ -17,16 +17,16 @@ package main import ( + "io/ioutil" "time" "github.com/Sirupsen/logrus" "github.com/minio/redigo/redis" ) -// redisLogger to send logs to Redis server -type redisLogger struct { +// redisNotify to send logs to Redis server +type redisNotify struct { Enable bool `json:"enable"` - Level string `json:"level"` Addr string `json:"address"` Password string `json:"password"` Key string `json:"key"` @@ -34,17 +34,17 @@ type redisLogger struct { type redisConn struct { *redis.Pool - params redisLogger + params redisNotify } // Dial a new connection to redis instance at addr, optionally with a password if any. -func dialRedis(rLogger redisLogger) (*redis.Pool, error) { +func dialRedis(rNotify redisNotify) (*redis.Pool, error) { // Return error if redis not enabled. - if !rLogger.Enable { - return nil, errLoggerNotEnabled + if !rNotify.Enable { + return nil, errNotifyNotEnabled } - addr := rLogger.Addr - password := rLogger.Password + addr := rNotify.Addr + password := rNotify.Password rPool := &redis.Pool{ MaxIdle: 3, IdleTimeout: 240 * time.Second, @@ -81,35 +81,34 @@ func dialRedis(rLogger redisLogger) (*redis.Pool, error) { return rPool, nil } -func enableRedisLogger() error { - rLogger := serverConfig.GetRedisLogger() +func newRedisNotify(accountID string) (*logrus.Logger, error) { + rNotify := serverConfig.GetRedisNotifyByID(accountID) // Dial redis. - rPool, err := dialRedis(rLogger) + rPool, err := dialRedis(rNotify) if err != nil { - return err + return nil, err } rrConn := redisConn{ Pool: rPool, - params: rLogger, + params: rNotify, } - lvl, err := logrus.ParseLevel(rLogger.Level) - fatalIf(err, "Unknown log level found in the config file.") + redisLog := logrus.New() - // Add a elasticsearch hook. - log.Hooks.Add(rrConn) + redisLog.Out = ioutil.Discard // Set default JSON formatter. - log.Formatter = new(logrus.JSONFormatter) + redisLog.Formatter = new(logrus.JSONFormatter) - // Set default log level to info. - log.Level = lvl + redisLog.Hooks.Add(rrConn) - return nil + // Success, redis enabled. + return redisLog, nil } +// Fire is called when an event should be sent to the message broker. func (r redisConn) Fire(entry *logrus.Entry) error { rConn := r.Pool.Get() defer rConn.Close() @@ -129,11 +128,6 @@ func (r redisConn) Fire(entry *logrus.Entry) error { // Required for logrus hook implementation func (r redisConn) Levels() []logrus.Level { return []logrus.Level{ - logrus.PanicLevel, - logrus.FatalLevel, - logrus.ErrorLevel, - logrus.WarnLevel, logrus.InfoLevel, - logrus.DebugLevel, } } diff --git a/object-handlers.go b/object-handlers.go index 48e73aca5..e65882104 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -27,7 +27,6 @@ import ( "sort" "strconv" "strings" - "time" mux "github.com/gorilla/mux" ) @@ -356,20 +355,17 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // Explicitly close the reader, to avoid fd leaks. pipeReader.Close() - // Load notification config if any. - nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket) - // Notifications not set, return. - if err == errNoSuchNotifications { - return + if eventN.IsBucketNotificationSet(bucket) { + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedCopy, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } - // For all other errors, return. - if err != nil { - errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) - return - } - - // Notify object created event. - notifyObjectCreatedEvent(nConfig, ObjectCreatedCopy, bucket, objInfo) } // PutObjectHandler - PUT Object @@ -440,18 +436,6 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } writeSuccessResponse(w, nil) - // Load notification config if any. - nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket) - // Notifications not set, return. - if err == errNoSuchNotifications { - return - } - // For all other errors return. - if err != nil { - errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) - return - } - // Fetch object info for notifications. objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) if err != nil { @@ -459,8 +443,17 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } - // Notify object created event. - notifyObjectCreatedEvent(nConfig, ObjectCreatedPut, bucket, objInfo) + if eventN.IsBucketNotificationSet(bucket) { + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedPut, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) + } } /// Multipart objectAPIHandlers @@ -614,19 +607,6 @@ func (api objectAPIHandlers) AbortMultipartUploadHandler(w http.ResponseWriter, writeSuccessNoContent(w) } -// Send whitespace character, once every 5secs, until CompleteMultipartUpload is done. -// CompleteMultipartUpload method of the object layer indicates that it's done via doneCh -func sendWhiteSpaceChars(w http.ResponseWriter, doneCh <-chan struct{}) { - for { - select { - case <-time.After(5 * time.Second): - w.Write([]byte(" ")) - case <-doneCh: - return - } - } -} - // ListObjectPartsHandler - List object parts func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -777,18 +757,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite w.Write(encodedSuccessResponse) w.(http.Flusher).Flush() - // Load notification config if any. - nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket) - // Notifications not set, return. - if err == errNoSuchNotifications { - return - } - // For all other errors. - if err != nil { - errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) - return - } - // Fetch object info for notifications. objInfo, err := api.ObjectAPI.GetObjectInfo(bucket, object) if err != nil { @@ -796,8 +764,17 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite return } - // Notify object created event. - notifyObjectCreatedEvent(nConfig, ObjectCreatedCompleteMultipartUpload, bucket, objInfo) + if eventN.IsBucketNotificationSet(bucket) { + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedCompleteMultipartUpload, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) + } } /// Delete objectAPIHandlers @@ -834,18 +811,17 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } writeSuccessNoContent(w) - // Load notification config if any. - nConfig, err := loadNotificationConfig(api.ObjectAPI, bucket) - // Notifications not set, return. - if err == errNoSuchNotifications { - return + if eventN.IsBucketNotificationSet(bucket) { + // Notify object deleted event. + eventNotify(eventData{ + Type: ObjectRemovedDelete, + Bucket: bucket, + ObjInfo: ObjectInfo{ + Name: object, + }, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) } - // For all other errors, return. - if err != nil { - errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) - return - } - - // Notify object deleted event. - notifyObjectDeletedEvent(nConfig, bucket, object) } diff --git a/pkg/sys/stats_linux.go b/pkg/sys/stats_linux.go index 8099e72f2..dcf5a9fa7 100644 --- a/pkg/sys/stats_linux.go +++ b/pkg/sys/stats_linux.go @@ -6,7 +6,7 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + *shouldP * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software diff --git a/pkg/sys/stats_test.go b/pkg/sys/stats_test.go new file mode 100644 index 000000000..1a713aed8 --- /dev/null +++ b/pkg/sys/stats_test.go @@ -0,0 +1,16 @@ +// +build linux darwin windows + +package sys + +import "testing" + +// Test get stats result. +func TestGetStats(t *testing.T) { + stats, err := GetStats() + if err != nil { + t.Errorf("Tests: Expected `nil`, Got %s", err) + } + if stats.TotalRAM == 0 { + t.Errorf("Tests: Expected `n > 0`, Got %d", stats.TotalRAM) + } +} diff --git a/queues.go b/queues.go deleted file mode 100644 index 28ca2c980..000000000 --- a/queues.go +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Minio Cloud Storage, (C) 2016 Minio, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "fmt" - "net/url" - "strings" - "time" - - "github.com/Sirupsen/logrus" - "github.com/minio/minio/pkg/wildcard" -) - -const ( - minioSqs = "arn:minio:sqs:" - // Static string indicating queue type 'amqp'. - queueTypeAMQP = "1:amqp" - // Static string indicating queue type 'elasticsearch'. - queueTypeElastic = "1:elasticsearch" - // Static string indicating queue type 'redis'. - queueTypeRedis = "1:redis" -) - -// Returns true if queueArn is for an AMQP queue. -func isAMQPQueue(sqsArn arnMinioSqs) bool { - if sqsArn.sqsType != queueTypeAMQP { - return false - } - amqpL := serverConfig.GetAMQPLogger() - // Connect to amqp server to validate. - amqpC, err := dialAMQP(amqpL) - if err != nil { - errorIf(err, "Unable to connect to amqp service. %#v", amqpL) - return false - } - defer amqpC.Close() - return true -} - -// Returns true if queueArn is for an Redis queue. -func isRedisQueue(sqsArn arnMinioSqs) bool { - if sqsArn.sqsType != queueTypeRedis { - return false - } - rLogger := serverConfig.GetRedisLogger() - // Connect to redis server to validate. - rPool, err := dialRedis(rLogger) - if err != nil { - errorIf(err, "Unable to connect to redis service. %#v", rLogger) - return false - } - defer rPool.Close() - return true -} - -// Returns true if queueArn is for an ElasticSearch queue. -func isElasticQueue(sqsArn arnMinioSqs) bool { - if sqsArn.sqsType != queueTypeElastic { - return false - } - esLogger := serverConfig.GetElasticSearchLogger() - elasticC, err := dialElastic(esLogger) - if err != nil { - errorIf(err, "Unable to connect to elasticsearch service %#v", esLogger) - return false - } - defer elasticC.Stop() - return true -} - -// Match function matches wild cards in 'pattern' for events. -func eventMatch(eventType EventName, events []string) (ok bool) { - for _, event := range events { - ok = wildcard.Match(event, eventType.String()) - if ok { - break - } - } - return ok -} - -// Filter rule match, matches an object against the filter rules. -func filterRuleMatch(object string, frs []filterRule) bool { - var prefixMatch, suffixMatch = true, true - for _, fr := range frs { - if isValidFilterNamePrefix(fr.Name) { - prefixMatch = strings.HasPrefix(object, fr.Value) - } else if isValidFilterNameSuffix(fr.Name) { - suffixMatch = strings.HasSuffix(object, fr.Value) - } - } - return prefixMatch && suffixMatch -} - -// NotifyObjectCreatedEvent - notifies a new 's3:ObjectCreated' event. -// List of events reported through this function are -// - s3:ObjectCreated:Put -// - s3:ObjectCreated:Post -// - s3:ObjectCreated:Copy -// - s3:ObjectCreated:CompleteMultipartUpload -func notifyObjectCreatedEvent(nConfig notificationConfig, eventType EventName, bucket string, objInfo ObjectInfo) { - /// Construct a new object created event. - region := serverConfig.GetRegion() - tnow := time.Now().UTC() - sequencer := fmt.Sprintf("%X", tnow.UnixNano()) - // Following blocks fills in all the necessary details of s3 event message structure. - // http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html - events := []*NotificationEvent{ - { - EventVersion: "2.0", - EventSource: "aws:s3", - AwsRegion: region, - EventTime: tnow.Format(iso8601Format), - EventName: eventType.String(), - UserIdentity: defaultIdentity(), - RequestParameters: map[string]string{}, - ResponseElements: map[string]string{}, - S3: eventMeta{ - SchemaVersion: "1.0", - ConfigurationID: "Config", - Bucket: bucketMeta{ - Name: bucket, - OwnerIdentity: defaultIdentity(), - ARN: "arn:aws:s3:::" + bucket, - }, - Object: objectMeta{ - Key: url.QueryEscape(objInfo.Name), - ETag: objInfo.MD5Sum, - Size: objInfo.Size, - Sequencer: sequencer, - }, - }, - }, - } - // Notify to all the configured queues. - for _, qConfig := range nConfig.QueueConfigurations { - ruleMatch := filterRuleMatch(objInfo.Name, qConfig.Filter.Key.FilterRules) - if eventMatch(eventType, qConfig.Events) && ruleMatch { - log.WithFields(logrus.Fields{ - "Records": events, - }).Info() - } - } -} - -// NotifyObjectRemovedEvent - notifies a new 's3:ObjectRemoved' event. -// List of events reported through this function are -// - s3:ObjectRemoved:Delete -func notifyObjectDeletedEvent(nConfig notificationConfig, bucket string, object string) { - region := serverConfig.GetRegion() - tnow := time.Now().UTC() - sequencer := fmt.Sprintf("%X", tnow.UnixNano()) - // Following blocks fills in all the necessary details of s3 event message structure. - // http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html - events := []*NotificationEvent{ - { - EventVersion: "2.0", - EventSource: "aws:s3", - AwsRegion: region, - EventTime: tnow.Format(iso8601Format), - EventName: ObjectRemovedDelete.String(), - UserIdentity: defaultIdentity(), - RequestParameters: map[string]string{}, - ResponseElements: map[string]string{}, - S3: eventMeta{ - SchemaVersion: "1.0", - ConfigurationID: "Config", - Bucket: bucketMeta{ - Name: bucket, - OwnerIdentity: defaultIdentity(), - ARN: "arn:aws:s3:::" + bucket, - }, - Object: objectMeta{ - Key: url.QueryEscape(object), - Sequencer: sequencer, - }, - }, - }, - } - // Notify to all the configured queues. - for _, qConfig := range nConfig.QueueConfigurations { - ruleMatch := filterRuleMatch(object, qConfig.Filter.Key.FilterRules) - if eventMatch(ObjectRemovedDelete, qConfig.Events) && ruleMatch { - log.WithFields(logrus.Fields{ - "Records": events, - }).Info() - } - } -} diff --git a/routers.go b/routers.go index b1a6b219a..b2e55f455 100644 --- a/routers.go +++ b/routers.go @@ -66,6 +66,10 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { ObjectAPI: objAPI, } + // Initialize a new event notifier. + err = initEventNotifier(objAPI) + fatalIf(err, "Unable to initialize event notification queue") + // Initialize router. mux := router.NewRouter() diff --git a/server_test.go b/server_test.go index b36d06e07..e6f1da9c7 100644 --- a/server_test.go +++ b/server_test.go @@ -74,6 +74,89 @@ func (s *TestSuiteCommon) TestAuth(c *C) { c.Assert(len(accessID), Equals, minioAccessID) } +// TestBucketNotification - Inserts the bucket notification and +// verifies it by fetching the notification back. +func (s *TestSuiteCommon) TestBucketNotification(c *C) { + // Sample bucket notification + bucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-east-1:444455556666:lambda` + + // generate a random bucket Name. + bucketName := getRandomBucketName() + // HTTP request to create the bucket. + request, err := newTestSignedRequest("PUT", getMakeBucketURL(s.endPoint, bucketName), + 0, nil, s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client := http.Client{} + // execute the request. + response, err := client.Do(request) + c.Assert(err, IsNil) + // assert the http response status code. + c.Assert(response.StatusCode, Equals, http.StatusOK) + + request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName), + int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + // execute the HTTP request to create bucket. + response, err = client.Do(request) + + c.Assert(err, IsNil) + c.Assert(response.StatusCode, Equals, http.StatusOK) + + // Fetch the uploaded policy. + request, err = newTestSignedRequest("GET", getGetNotificationURL(s.endPoint, bucketName), 0, nil, + s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + response, err = client.Do(request) + c.Assert(err, IsNil) + c.Assert(response.StatusCode, Equals, http.StatusOK) + + bucketNotificationReadBuf, err := ioutil.ReadAll(response.Body) + c.Assert(err, IsNil) + // Verify if downloaded policy matches with previousy uploaded. + c.Assert(bytes.Equal([]byte(bucketNotificationBuf), bucketNotificationReadBuf), Equals, true) + + invalidBucketNotificationBuf := `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-east-1:444455556666:minio` + + request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName), + int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + // execute the HTTP request to create bucket. + response, err = client.Do(request) + c.Assert(err, IsNil) + + verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest) + + invalidBucketNotificationBuf = `s3:ObjectCreated:Putprefiximages/1arn:minio:lambda:us-west-1:444455556666:lambda` + request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName), + int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + // execute the HTTP request to create bucket. + response, err = client.Do(request) + c.Assert(err, IsNil) + + verifyError(c, response, "InvalidArgument", "A specified destination is in a different region than the bucket. You must use a destination that resides in the same region as the bucket.", http.StatusBadRequest) + + invalidBucketNotificationBuf = `s3:ObjectCreated:Invalidprefiximages/1arn:minio:lambda:us-east-1:444455556666:lambda` + request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName), + int64(len(invalidBucketNotificationBuf)), bytes.NewReader([]byte(invalidBucketNotificationBuf)), s.accessKey, s.secretKey) + c.Assert(err, IsNil) + + client = http.Client{} + // execute the HTTP request to create bucket. + response, err = client.Do(request) + c.Assert(err, IsNil) + verifyError(c, response, "InvalidArgument", "A specified event is not supported for notifications.", http.StatusBadRequest) +} + // TestBucketPolicy - Inserts the bucket policy and verifies it by fetching the policy back. // Deletes the policy and verifies the deletion by fetching it back. func (s *TestSuiteCommon) TestBucketPolicy(c *C) { diff --git a/test-utils_test.go b/test-utils_test.go index 6a4afb702..0012a3ace 100644 --- a/test-utils_test.go +++ b/test-utils_test.go @@ -559,6 +559,20 @@ func getHeadObjectURL(endPoint, bucketName, objectName string) string { return makeTestTargetURL(endPoint, bucketName, objectName, url.Values{}) } +// return URL for inserting bucket notification. +func getPutNotificationURL(endPoint, bucketName string) string { + queryValue := url.Values{} + queryValue.Set("notification", "") + return makeTestTargetURL(endPoint, bucketName, "", queryValue) +} + +// return URL for fetching bucket notification. +func getGetNotificationURL(endPoint, bucketName string) string { + queryValue := url.Values{} + queryValue.Set("notification", "") + return makeTestTargetURL(endPoint, bucketName, "", queryValue) +} + // return URL for inserting bucket policy. func getPutPolicyURL(endPoint, bucketName string) string { queryValue := url.Values{} diff --git a/notifier.go b/update-notifier.go similarity index 100% rename from notifier.go rename to update-notifier.go diff --git a/web-handlers.go b/web-handlers.go index 71b1b22d4..06297858d 100644 --- a/web-handlers.go +++ b/web-handlers.go @@ -386,18 +386,6 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } - // Load notification config if any. - nConfig, err := loadNotificationConfig(web.ObjectAPI, bucket) - // Notifications not set, return. - if err == errNoSuchNotifications { - return - } - // For all other errors, return. - if err != nil { - errorIf(err, "Unable to load notification config for bucket: \"%s\"", bucket) - return - } - // Fetch object info for notifications. objInfo, err := web.ObjectAPI.GetObjectInfo(bucket, object) if err != nil { @@ -405,8 +393,17 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } - // Notify object created event. - notifyObjectCreatedEvent(nConfig, ObjectCreatedPut, bucket, objInfo) + if eventN.IsBucketNotificationSet(bucket) { + // Notify object created event. + eventNotify(eventData{ + Type: ObjectCreatedPut, + Bucket: bucket, + ObjInfo: objInfo, + ReqParams: map[string]string{ + "sourceIPAddress": r.RemoteAddr, + }, + }) + } } // Download - file download handler. diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index baa844260..089e1aec0 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -320,7 +320,7 @@ func (xl xlObjects) NewMultipartUpload(bucket, object string, meta map[string]st // PutObjectPart - reads incoming stream and internally erasure codes // them. This call is similar to single put operation but it is part -// of the multipart transcation. +// of the multipart transaction. // // Implements S3 compatible Upload Part API. func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Hex string) (string, error) {