make notification as separate package (#5294)

* Remove old notification files

* Add net package

* Add event package

* Modify minio to take new notification system
This commit is contained in:
Bala FA
2018-03-16 01:33:41 +05:30
committed by kannappanr
parent abffa00b76
commit 0e4431725c
117 changed files with 7677 additions and 9296 deletions

83
pkg/event/arn.go Normal file
View File

@@ -0,0 +1,83 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/xml"
"strings"
)
// ARN - SQS resource name representation.
type ARN struct {
TargetID
region string
}
// String - returns string representation.
func (arn ARN) String() string {
if arn.TargetID.ID == "" && arn.TargetID.Name == "" && arn.region == "" {
return ""
}
return "arn:minio:sqs:" + arn.region + ":" + arn.TargetID.String()
}
// MarshalXML - encodes to XML data.
func (arn ARN) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
return e.EncodeElement(arn.String(), start)
}
// UnmarshalXML - decodes XML data.
func (arn *ARN) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
var s string
if err := d.DecodeElement(&s, &start); err != nil {
return err
}
parsedARN, err := parseARN(s)
if err != nil {
return err
}
*arn = *parsedARN
return nil
}
// parseARN - parses string to ARN.
func parseARN(s string) (*ARN, error) {
// ARN must be in the format of arn:minio:sqs:<REGION>:<ID>:<TYPE>
if !strings.HasPrefix(s, "arn:minio:sqs:") {
return nil, &ErrInvalidARN{s}
}
tokens := strings.Split(s, ":")
if len(tokens) != 6 {
return nil, &ErrInvalidARN{s}
}
if tokens[4] == "" || tokens[5] == "" {
return nil, &ErrInvalidARN{s}
}
return &ARN{
region: tokens[3],
TargetID: TargetID{
ID: tokens[4],
Name: tokens[5],
},
}, nil
}

129
pkg/event/arn_test.go Normal file
View File

@@ -0,0 +1,129 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/xml"
"reflect"
"testing"
)
func TestARNString(t *testing.T) {
testCases := []struct {
arn ARN
expectedResult string
}{
{ARN{}, ""},
{ARN{TargetID{"1", "webhook"}, ""}, "arn:minio:sqs::1:webhook"},
{ARN{TargetID{"1", "webhook"}, "us-east-1"}, "arn:minio:sqs:us-east-1:1:webhook"},
}
for i, testCase := range testCases {
result := testCase.arn.String()
if result != testCase.expectedResult {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestARNMarshalXML(t *testing.T) {
testCases := []struct {
arn ARN
expectedData []byte
expectErr bool
}{
{ARN{}, []byte("<ARN></ARN>"), false},
{ARN{TargetID{"1", "webhook"}, ""}, []byte("<ARN>arn:minio:sqs::1:webhook</ARN>"), false},
{ARN{TargetID{"1", "webhook"}, "us-east-1"}, []byte("<ARN>arn:minio:sqs:us-east-1:1:webhook</ARN>"), false},
}
for i, testCase := range testCases {
data, err := xml.Marshal(testCase.arn)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(data, testCase.expectedData) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, string(testCase.expectedData), string(data))
}
}
}
}
func TestARNUnmarshalXML(t *testing.T) {
testCases := []struct {
data []byte
expectedARN *ARN
expectErr bool
}{
{[]byte("<ARN></ARN>"), nil, true},
{[]byte("<ARN>arn:minio:sqs:::</ARN>"), nil, true},
{[]byte("<ARN>arn:minio:sqs::1:webhook</ARN>"), &ARN{TargetID{"1", "webhook"}, ""}, false},
{[]byte("<ARN>arn:minio:sqs:us-east-1:1:webhook</ARN>"), &ARN{TargetID{"1", "webhook"}, "us-east-1"}, false},
}
for i, testCase := range testCases {
arn := &ARN{}
err := xml.Unmarshal(testCase.data, &arn)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if *arn != *testCase.expectedARN {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedARN, arn)
}
}
}
}
func TestParseARN(t *testing.T) {
testCases := []struct {
s string
expectedARN *ARN
expectErr bool
}{
{"", nil, true},
{"arn:minio:sqs:::", nil, true},
{"arn:minio:sqs::1:webhook:remote", nil, true},
{"arn:aws:sqs::1:webhook", nil, true},
{"arn:minio:sns::1:webhook", nil, true},
{"arn:minio:sqs::1:webhook", &ARN{TargetID{"1", "webhook"}, ""}, false},
{"arn:minio:sqs:us-east-1:1:webhook", &ARN{TargetID{"1", "webhook"}, "us-east-1"}, false},
}
for i, testCase := range testCases {
arn, err := parseARN(testCase.s)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if *arn != *testCase.expectedARN {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedARN, arn)
}
}
}
}

292
pkg/event/config.go Normal file
View File

@@ -0,0 +1,292 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/xml"
"errors"
"io"
"reflect"
"strings"
"unicode/utf8"
"github.com/minio/minio-go/pkg/set"
)
// ValidateFilterRuleValue - checks if given value is filter rule value or not.
func ValidateFilterRuleValue(value string) error {
for _, segment := range strings.Split(value, "/") {
if segment == "." || segment == ".." {
return &ErrInvalidFilterValue{value}
}
}
if len(value) <= 1024 && utf8.ValidString(value) && !strings.Contains(value, `\`) {
return nil
}
return &ErrInvalidFilterValue{value}
}
// FilterRule - represents elements inside <FilterRule>...</FilterRule>
type FilterRule struct {
Name string `xml:"Name"`
Value string `xml:"Value"`
}
// UnmarshalXML - decodes XML data.
func (filter *FilterRule) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type filterRule FilterRule
rule := filterRule{}
if err := d.DecodeElement(&rule, &start); err != nil {
return err
}
if rule.Name != "prefix" && rule.Name != "suffix" {
return &ErrInvalidFilterName{rule.Name}
}
if err := ValidateFilterRuleValue(filter.Value); err != nil {
return err
}
*filter = FilterRule(rule)
return nil
}
// FilterRuleList - represents multiple <FilterRule>...</FilterRule>
type FilterRuleList struct {
Rules []FilterRule `xml:"FilterRule,omitempty"`
}
// UnmarshalXML - decodes XML data.
func (ruleList *FilterRuleList) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type filterRuleList FilterRuleList
rules := filterRuleList{}
if err := d.DecodeElement(&rules, &start); err != nil {
return err
}
// FilterRuleList must have only one prefix and/or suffix.
nameSet := set.NewStringSet()
for _, rule := range rules.Rules {
if nameSet.Contains(rule.Name) {
if rule.Name == "prefix" {
return &ErrFilterNamePrefix{}
}
return &ErrFilterNameSuffix{}
}
nameSet.Add(rule.Name)
}
*ruleList = FilterRuleList(rules)
return nil
}
// Pattern - returns pattern using prefix and suffix values.
func (ruleList FilterRuleList) Pattern() string {
var prefix string
var suffix string
for _, rule := range ruleList.Rules {
switch rule.Name {
case "prefix":
prefix = rule.Value
case "suffix":
suffix = rule.Value
}
}
return NewPattern(prefix, suffix)
}
// S3Key - represents elements inside <S3Key>...</S3Key>
type S3Key struct {
RuleList FilterRuleList `xml:"S3Key,omitempty" json:"S3Key,omitempty"`
}
// common - represents common elements inside <QueueConfiguration>, <CloudFunctionConfiguration>
// and <TopicConfiguration>
type common struct {
ID string `xml:"Id" json:"Id"`
Filter S3Key `xml:"Filter" json:"Filter"`
Events []Name `xml:"Event" json:"Event"`
}
// Queue - represents elements inside <QueueConfiguration>
type Queue struct {
common
ARN ARN `xml:"Queue"`
}
// UnmarshalXML - decodes XML data.
func (q *Queue) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type queue Queue
parsedQueue := queue{}
if err := d.DecodeElement(&parsedQueue, &start); err != nil {
return err
}
if len(parsedQueue.Events) == 0 {
return errors.New("missing event name(s)")
}
eventStringSet := set.NewStringSet()
for _, eventName := range parsedQueue.Events {
if eventStringSet.Contains(eventName.String()) {
return &ErrDuplicateEventName{eventName}
}
eventStringSet.Add(eventName.String())
}
*q = Queue(parsedQueue)
return nil
}
// Validate - checks whether queue has valid values or not.
func (q Queue) Validate(region string, targetList *TargetList) error {
if region != "" && q.ARN.region != region {
return &ErrUnknownRegion{q.ARN.region}
}
if !targetList.Exists(q.ARN.TargetID) {
return &ErrARNNotFound{q.ARN}
}
return nil
}
// SetRegion - sets region value to queue's ARN.
func (q *Queue) SetRegion(region string) {
q.ARN.region = region
}
// ToRulesMap - converts Queue to RulesMap
func (q Queue) ToRulesMap() RulesMap {
pattern := q.Filter.RuleList.Pattern()
return NewRulesMap(q.Events, pattern, q.ARN.TargetID)
}
// Unused. Available for completion.
type lambda struct {
common
ARN string `xml:"CloudFunction"`
}
// Unused. Available for completion.
type topic struct {
common
ARN string `xml:"Topic" json:"Topic"`
}
// Config - notification configuration described in
// http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
type Config struct {
XMLName xml.Name `xml:"NotificationConfiguration"`
QueueList []Queue `xml:"QueueConfiguration,omitempty"`
LambdaList []lambda `xml:"CloudFunctionConfiguration,omitempty"`
TopicList []topic `xml:"TopicConfiguration,omitempty"`
}
// UnmarshalXML - decodes XML data.
func (conf *Config) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
// Make subtype to avoid recursive UnmarshalXML().
type config Config
parsedConfig := config{}
if err := d.DecodeElement(&parsedConfig, &start); err != nil {
return err
}
if len(parsedConfig.QueueList) == 0 {
return errors.New("missing queue configuration(s)")
}
for i, q1 := range parsedConfig.QueueList[:len(parsedConfig.QueueList)-1] {
for _, q2 := range parsedConfig.QueueList[i+1:] {
if reflect.DeepEqual(q1, q2) {
return &ErrDuplicateQueueConfiguration{q1}
}
}
}
if len(parsedConfig.LambdaList) > 0 || len(parsedConfig.TopicList) > 0 {
return &ErrUnsupportedConfiguration{}
}
*conf = Config(parsedConfig)
return nil
}
// Validate - checks whether config has valid values or not.
func (conf Config) Validate(region string, targetList *TargetList) error {
for _, queue := range conf.QueueList {
if err := queue.Validate(region, targetList); err != nil {
return err
}
// TODO: Need to discuss/check why same ARN cannot be used in another queue configuration.
}
return nil
}
// SetRegion - sets region to all queue configuration.
func (conf *Config) SetRegion(region string) {
for i := range conf.QueueList {
conf.QueueList[i].SetRegion(region)
}
}
// ToRulesMap - converts all queue configuration to RulesMap.
func (conf *Config) ToRulesMap() RulesMap {
rulesMap := make(RulesMap)
for _, queue := range conf.QueueList {
rulesMap.Add(queue.ToRulesMap())
}
return rulesMap
}
// ParseConfig - parses data in reader to notification configuration.
func ParseConfig(reader io.Reader, region string, targetList *TargetList) (*Config, error) {
var config Config
if err := xml.NewDecoder(reader).Decode(&config); err != nil {
return nil, err
}
if len(config.QueueList) == 0 {
return nil, errors.New("missing queue configuration(s)")
}
if err := config.Validate(region, targetList); err != nil {
return nil, err
}
config.SetRegion(region)
return &config, nil
}

961
pkg/event/config_test.go Normal file
View File

@@ -0,0 +1,961 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/xml"
"reflect"
"strings"
"testing"
)
func TestValidateFilterRuleValue(t *testing.T) {
testCases := []struct {
value string
expectErr bool
}{
{"foo/.", true},
{"../foo", true},
{`foo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/bazfoo/bar/baz`, true},
{string([]byte{0xff, 0xfe, 0xfd}), true},
{`foo\bar`, true},
{"Hello/世界", false},
}
for i, testCase := range testCases {
err := ValidateFilterRuleValue(testCase.value)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func TestFilterRuleUnmarshalXML(t *testing.T) {
testCases := []struct {
data []byte
expectedResult *FilterRule
expectErr bool
}{
{[]byte(`<FilterRule></FilterRule>`), nil, true},
{[]byte(`<FilterRule><Name></Name></FilterRule>`), nil, true},
{[]byte(`<FilterRule><Value></Value></FilterRule>`), nil, true},
{[]byte(`<FilterRule><Name></Name><Value></Value></FilterRule>`), nil, true},
{[]byte(`<FilterRule><Name>Prefix</Name><Value>Hello/世界</Value></FilterRule>`), nil, true},
{[]byte(`<FilterRule><Name>ends</Name><Value>foo/bar</Value></FilterRule>`), nil, true},
{[]byte(`<FilterRule><Name>prefix</Name><Value>Hello/世界</Value></FilterRule>`), &FilterRule{"prefix", "Hello/世界"}, false},
{[]byte(`<FilterRule><Name>suffix</Name><Value>foo/bar</Value></FilterRule>`), &FilterRule{"suffix", "foo/bar"}, false},
}
for i, testCase := range testCases {
result := &FilterRule{}
err := xml.Unmarshal(testCase.data, result)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
}
func TestFilterRuleListUnmarshalXML(t *testing.T) {
testCases := []struct {
data []byte
expectedResult *FilterRuleList
expectErr bool
}{
{[]byte(`<S3Key><FilterRule><Name>suffix</Name><Value>Hello/世界</Value></FilterRule><FilterRule><Name>suffix</Name><Value>foo/bar</Value></FilterRule></S3Key>`), nil, true},
{[]byte(`<S3Key><FilterRule><Name>prefix</Name><Value>Hello/世界</Value></FilterRule><FilterRule><Name>prefix</Name><Value>foo/bar</Value></FilterRule></S3Key>`), nil, true},
{[]byte(`<S3Key><FilterRule><Name>prefix</Name><Value>Hello/世界</Value></FilterRule></S3Key>`), &FilterRuleList{[]FilterRule{{"prefix", "Hello/世界"}}}, false},
{[]byte(`<S3Key><FilterRule><Name>suffix</Name><Value>foo/bar</Value></FilterRule></S3Key>`), &FilterRuleList{[]FilterRule{{"suffix", "foo/bar"}}}, false},
{[]byte(`<S3Key><FilterRule><Name>prefix</Name><Value>Hello/世界</Value></FilterRule><FilterRule><Name>suffix</Name><Value>foo/bar</Value></FilterRule></S3Key>`), &FilterRuleList{[]FilterRule{{"prefix", "Hello/世界"}, {"suffix", "foo/bar"}}}, false},
}
for i, testCase := range testCases {
result := &FilterRuleList{}
err := xml.Unmarshal(testCase.data, result)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
}
func TestFilterRuleListPattern(t *testing.T) {
testCases := []struct {
filterRuleList FilterRuleList
expectedResult string
}{
{FilterRuleList{}, ""},
{FilterRuleList{[]FilterRule{{"prefix", "Hello/世界"}}}, "Hello/世界*"},
{FilterRuleList{[]FilterRule{{"suffix", "foo/bar"}}}, "*foo/bar"},
{FilterRuleList{[]FilterRule{{"prefix", "Hello/世界"}, {"suffix", "foo/bar"}}}, "Hello/世界*foo/bar"},
}
for i, testCase := range testCases {
result := testCase.filterRuleList.Pattern()
if result != testCase.expectedResult {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestQueueUnmarshalXML(t *testing.T) {
dataCase1 := []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>`)
dataCase2 := []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>`)
dataCase3 := []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>`)
testCases := []struct {
data []byte
expectErr bool
}{
{dataCase1, false},
{dataCase2, false},
{dataCase3, true},
}
for i, testCase := range testCases {
err := xml.Unmarshal(testCase.data, &Queue{})
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func TestQueueValidate(t *testing.T) {
var data []byte
data = []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>`)
queue1 := &Queue{}
if err := xml.Unmarshal(data, queue1); err != nil {
panic(err)
}
data = []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>`)
queue2 := &Queue{}
if err := xml.Unmarshal(data, queue2); err != nil {
panic(err)
}
data = []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:eu-west-2:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>`)
queue3 := &Queue{}
if err := xml.Unmarshal(data, queue3); err != nil {
panic(err)
}
targetList1 := NewTargetList()
targetList2 := NewTargetList()
if err := targetList2.Add(&ExampleTarget{TargetID{"1", "webhook"}, false, false}); err != nil {
panic(err)
}
testCases := []struct {
queue *Queue
region string
targetList *TargetList
expectErr bool
}{
{queue1, "eu-west-1", nil, true},
{queue2, "us-east-1", targetList1, true},
{queue3, "", targetList2, false},
{queue2, "us-east-1", targetList2, false},
}
for i, testCase := range testCases {
err := testCase.queue.Validate(testCase.region, testCase.targetList)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func TestQueueSetRegion(t *testing.T) {
var data []byte
data = []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>`)
queue1 := &Queue{}
if err := xml.Unmarshal(data, queue1); err != nil {
panic(err)
}
data = []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs::1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>`)
queue2 := &Queue{}
if err := xml.Unmarshal(data, queue2); err != nil {
panic(err)
}
testCases := []struct {
queue *Queue
region string
expectedResult ARN
}{
{queue1, "eu-west-1", ARN{TargetID{"1", "webhook"}, "eu-west-1"}},
{queue1, "", ARN{TargetID{"1", "webhook"}, ""}},
{queue2, "us-east-1", ARN{TargetID{"1", "webhook"}, "us-east-1"}},
{queue2, "", ARN{TargetID{"1", "webhook"}, ""}},
}
for i, testCase := range testCases {
testCase.queue.SetRegion(testCase.region)
result := testCase.queue.ARN
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestQueueToRulesMap(t *testing.T) {
var data []byte
data = []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>`)
queueCase1 := &Queue{}
if err := xml.Unmarshal(data, queueCase1); err != nil {
panic(err)
}
data = []byte(`
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>`)
queueCase2 := &Queue{}
if err := xml.Unmarshal(data, queueCase2); err != nil {
panic(err)
}
rulesMapCase1 := NewRulesMap([]Name{ObjectAccessedAll, ObjectCreatedAll, ObjectRemovedAll}, "*", TargetID{"1", "webhook"})
rulesMapCase2 := NewRulesMap([]Name{ObjectCreatedPut}, "images/*jpg", TargetID{"1", "webhook"})
testCases := []struct {
queue *Queue
expectedResult RulesMap
}{
{queueCase1, rulesMapCase1},
{queueCase2, rulesMapCase2},
}
for i, testCase := range testCases {
result := testCase.queue.ToRulesMap()
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestConfigUnmarshalXML(t *testing.T) {
dataCase1 := []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
dataCase2 := []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
dataCase3 := []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
<QueueConfiguration>
<Id>2</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
dataCase4 := []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
<CloudFunctionConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>suffix</Name>
<Value>.jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Cloudcode>arn:aws:lambda:us-west-2:444455556666:cloud-function-A</Cloudcode>
<Event>s3:ObjectCreated:Put</Event>
</CloudFunctionConfiguration>
<TopicConfiguration>
<Topic>arn:aws:sns:us-west-2:444455556666:sns-notification-one</Topic>
<Event>s3:ObjectCreated:*</Event>
</TopicConfiguration>
</NotificationConfiguration>
`)
testCases := []struct {
data []byte
expectErr bool
}{
{dataCase1, false},
{dataCase2, false},
{dataCase3, false},
{dataCase4, true},
}
for i, testCase := range testCases {
err := xml.Unmarshal(testCase.data, &Config{})
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func TestConfigValidate(t *testing.T) {
var data []byte
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config1 := &Config{}
if err := xml.Unmarshal(data, config1); err != nil {
panic(err)
}
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config2 := &Config{}
if err := xml.Unmarshal(data, config2); err != nil {
panic(err)
}
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
<QueueConfiguration>
<Id>2</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config3 := &Config{}
if err := xml.Unmarshal(data, config3); err != nil {
panic(err)
}
targetList1 := NewTargetList()
targetList2 := NewTargetList()
if err := targetList2.Add(&ExampleTarget{TargetID{"1", "webhook"}, false, false}); err != nil {
panic(err)
}
testCases := []struct {
config *Config
region string
targetList *TargetList
expectErr bool
}{
{config1, "eu-west-1", nil, true},
{config2, "us-east-1", targetList1, true},
{config3, "", targetList2, false},
{config2, "us-east-1", targetList2, false},
}
for i, testCase := range testCases {
err := testCase.config.Validate(testCase.region, testCase.targetList)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func TestConfigSetRegion(t *testing.T) {
var data []byte
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config1 := &Config{}
if err := xml.Unmarshal(data, config1); err != nil {
panic(err)
}
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs::1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config2 := &Config{}
if err := xml.Unmarshal(data, config2); err != nil {
panic(err)
}
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
<QueueConfiguration>
<Id>2</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:2:amqp</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config3 := &Config{}
if err := xml.Unmarshal(data, config3); err != nil {
panic(err)
}
testCases := []struct {
config *Config
region string
expectedResult []ARN
}{
{config1, "eu-west-1", []ARN{{TargetID{"1", "webhook"}, "eu-west-1"}}},
{config1, "", []ARN{{TargetID{"1", "webhook"}, ""}}},
{config2, "us-east-1", []ARN{{TargetID{"1", "webhook"}, "us-east-1"}}},
{config2, "", []ARN{{TargetID{"1", "webhook"}, ""}}},
{config3, "us-east-1", []ARN{{TargetID{"1", "webhook"}, "us-east-1"}, {TargetID{"2", "amqp"}, "us-east-1"}}},
{config3, "", []ARN{{TargetID{"1", "webhook"}, ""}, {TargetID{"2", "amqp"}, ""}}},
}
for i, testCase := range testCases {
testCase.config.SetRegion(testCase.region)
result := []ARN{}
for _, queue := range testCase.config.QueueList {
result = append(result, queue.ARN)
}
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestConfigToRulesMap(t *testing.T) {
var data []byte
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config1 := &Config{}
if err := xml.Unmarshal(data, config1); err != nil {
panic(err)
}
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs::1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config2 := &Config{}
if err := xml.Unmarshal(data, config2); err != nil {
panic(err)
}
data = []byte(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
<QueueConfiguration>
<Id>2</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:2:amqp</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
config3 := &Config{}
if err := xml.Unmarshal(data, config3); err != nil {
panic(err)
}
rulesMapCase1 := NewRulesMap([]Name{ObjectAccessedAll, ObjectCreatedAll, ObjectRemovedAll}, "*", TargetID{"1", "webhook"})
rulesMapCase2 := NewRulesMap([]Name{ObjectCreatedPut}, "images/*jpg", TargetID{"1", "webhook"})
rulesMapCase3 := NewRulesMap([]Name{ObjectAccessedAll, ObjectCreatedAll, ObjectRemovedAll}, "*", TargetID{"1", "webhook"})
rulesMapCase3.add([]Name{ObjectCreatedPut}, "images/*jpg", TargetID{"2", "amqp"})
testCases := []struct {
config *Config
expectedResult RulesMap
}{
{config1, rulesMapCase1},
{config2, rulesMapCase2},
{config3, rulesMapCase3},
}
for i, testCase := range testCases {
result := testCase.config.ToRulesMap()
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestParseConfig(t *testing.T) {
reader1 := strings.NewReader(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
reader2 := strings.NewReader(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
reader3 := strings.NewReader(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
<QueueConfiguration>
<Id>2</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>images/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
`)
reader4 := strings.NewReader(`
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter></Filter>
<Queue>arn:minio:sqs:us-east-1:1:webhook</Queue>
<Event>s3:ObjectAccessed:*</Event>
<Event>s3:ObjectCreated:*</Event>
<Event>s3:ObjectRemoved:*</Event>
</QueueConfiguration>
<CloudFunctionConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>suffix</Name>
<Value>.jpg</Value>
</FilterRule>
</S3Key>
</Filter>
<Cloudcode>arn:aws:lambda:us-west-2:444455556666:cloud-function-A</Cloudcode>
<Event>s3:ObjectCreated:Put</Event>
</CloudFunctionConfiguration>
<TopicConfiguration>
<Topic>arn:aws:sns:us-west-2:444455556666:sns-notification-one</Topic>
<Event>s3:ObjectCreated:*</Event>
</TopicConfiguration>
</NotificationConfiguration>
`)
targetList1 := NewTargetList()
targetList2 := NewTargetList()
if err := targetList2.Add(&ExampleTarget{TargetID{"1", "webhook"}, false, false}); err != nil {
panic(err)
}
testCases := []struct {
reader *strings.Reader
region string
targetList *TargetList
expectErr bool
}{
{reader1, "eu-west-1", nil, true},
{reader2, "us-east-1", targetList1, true},
{reader4, "us-east-1", targetList1, true},
{reader3, "", targetList2, false},
{reader2, "us-east-1", targetList2, false},
}
for i, testCase := range testCases {
if _, err := testCase.reader.Seek(0, 0); err != nil {
panic(err)
}
_, err := ParseConfig(testCase.reader, testCase.region, testCase.targetList)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}

152
pkg/event/errors.go Normal file
View File

@@ -0,0 +1,152 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/xml"
"fmt"
)
// IsEventError - checks whether given error is event error or not.
func IsEventError(err error) bool {
switch err.(type) {
case ErrInvalidFilterName, *ErrInvalidFilterName:
return true
case ErrFilterNamePrefix, *ErrFilterNamePrefix:
return true
case ErrFilterNameSuffix, *ErrFilterNameSuffix:
return true
case ErrInvalidFilterValue, *ErrInvalidFilterValue:
return true
case ErrDuplicateEventName, *ErrDuplicateEventName:
return true
case ErrUnsupportedConfiguration, *ErrUnsupportedConfiguration:
return true
case ErrDuplicateQueueConfiguration, *ErrDuplicateQueueConfiguration:
return true
case ErrUnknownRegion, *ErrUnknownRegion:
return true
case ErrARNNotFound, *ErrARNNotFound:
return true
case ErrInvalidARN, *ErrInvalidARN:
return true
case ErrInvalidEventName, *ErrInvalidEventName:
return true
}
return false
}
// ErrInvalidFilterName - invalid filter name error.
type ErrInvalidFilterName struct {
FilterName string
}
func (err ErrInvalidFilterName) Error() string {
return fmt.Sprintf("invalid filter name '%v'", err.FilterName)
}
// ErrFilterNamePrefix - more than one prefix usage error.
type ErrFilterNamePrefix struct{}
func (err ErrFilterNamePrefix) Error() string {
return fmt.Sprintf("more than one prefix in filter rule")
}
// ErrFilterNameSuffix - more than one suffix usage error.
type ErrFilterNameSuffix struct{}
func (err ErrFilterNameSuffix) Error() string {
return fmt.Sprintf("more than one suffix in filter rule")
}
// ErrInvalidFilterValue - invalid filter value error.
type ErrInvalidFilterValue struct {
FilterValue string
}
func (err ErrInvalidFilterValue) Error() string {
return fmt.Sprintf("invalid filter value '%v'", err.FilterValue)
}
// ErrDuplicateEventName - duplicate event name error.
type ErrDuplicateEventName struct {
EventName Name
}
func (err ErrDuplicateEventName) Error() string {
return fmt.Sprintf("duplicate event name '%v' found", err.EventName)
}
// ErrUnsupportedConfiguration - unsupported configuration error.
type ErrUnsupportedConfiguration struct{}
func (err ErrUnsupportedConfiguration) Error() string {
return "topic or cloud function configuration is not supported"
}
// ErrDuplicateQueueConfiguration - duplicate queue configuration error.
type ErrDuplicateQueueConfiguration struct {
Queue Queue
}
func (err ErrDuplicateQueueConfiguration) Error() string {
var message string
if data, xerr := xml.Marshal(err.Queue); xerr != nil {
message = fmt.Sprintf("%+v", err.Queue)
} else {
message = string(data)
}
return fmt.Sprintf("duplicate queue configuration %v", message)
}
// ErrUnknownRegion - unknown region error.
type ErrUnknownRegion struct {
Region string
}
func (err ErrUnknownRegion) Error() string {
return fmt.Sprintf("unknown region '%v'", err.Region)
}
// ErrARNNotFound - ARN not found error.
type ErrARNNotFound struct {
ARN ARN
}
func (err ErrARNNotFound) Error() string {
return fmt.Sprintf("ARN '%v' not found", err.ARN)
}
// ErrInvalidARN - invalid ARN error.
type ErrInvalidARN struct {
ARN string
}
func (err ErrInvalidARN) Error() string {
return fmt.Sprintf("invalid ARN '%v'", err.ARN)
}
// ErrInvalidEventName - invalid event name error.
type ErrInvalidEventName struct {
Name string
}
func (err ErrInvalidEventName) Error() string {
return fmt.Sprintf("invalid event name '%v'", err.Name)
}

88
pkg/event/event.go Normal file
View File

@@ -0,0 +1,88 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
const (
// NamespaceFormat - namespace log format used in some event targets.
NamespaceFormat = "namespace"
// AccessFormat - access log format used in some event targets.
AccessFormat = "access"
// AMZTimeFormat - event time format.
AMZTimeFormat = "2006-01-02T15:04:05Z"
)
// Identity represents access key who caused the event.
type Identity struct {
PrincipalID string `json:"principalId"`
}
// Bucket represents bucket metadata of the event.
type Bucket struct {
Name string `json:"name"`
OwnerIdentity Identity `json:"ownerIdentity"`
ARN string `json:"arn"`
}
// Object represents object metadata of the event.
type Object struct {
Key string `json:"key"`
Size int64 `json:"size,omitempty"`
ETag string `json:"eTag,omitempty"`
ContentType string `json:"contentType,omitempty"`
UserMetadata map[string]string `json:"userMetadata,omitempty"`
VersionID string `json:"versionId,omitempty"`
Sequencer string `json:"sequencer"`
}
// Metadata represents event metadata.
type Metadata struct {
SchemaVersion string `json:"s3SchemaVersion"`
ConfigurationID string `json:"configurationId"`
Bucket Bucket `json:"bucket"`
Object Object `json:"object"`
}
// Source represents client information who triggered the event.
type Source struct {
Host string `json:"host"`
Port string `json:"port"`
UserAgent string `json:"userAgent"`
}
// Event represents event notification information defined in
// http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html.
type Event struct {
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"`
AwsRegion string `json:"awsRegion"`
EventTime string `json:"eventTime"`
EventName Name `json:"eventName"`
UserIdentity Identity `json:"userIdentity"`
RequestParameters map[string]string `json:"requestParameters"`
ResponseElements map[string]string `json:"responseElements"`
S3 Metadata `json:"s3"`
Source Source `json:"source"`
}
// Log represents event information for some event targets.
type Log struct {
EventName Name
Key string
Records []Event
}

152
pkg/event/name.go Normal file
View File

@@ -0,0 +1,152 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/json"
"encoding/xml"
)
// Name - event type enum.
// Refer http://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#notification-how-to-event-types-and-destinations
type Name int
// Values of Name
const (
ObjectAccessedAll Name = 1 + iota
ObjectAccessedGet
ObjectAccessedHead
ObjectCreatedAll
ObjectCreatedCompleteMultipartUpload
ObjectCreatedCopy
ObjectCreatedPost
ObjectCreatedPut
ObjectRemovedAll
ObjectRemovedDelete
)
// Expand - returns expanded values of abbreviated event type.
func (name Name) Expand() []Name {
switch name {
case ObjectAccessedAll:
return []Name{ObjectAccessedGet, ObjectAccessedHead}
case ObjectCreatedAll:
return []Name{ObjectCreatedCompleteMultipartUpload, ObjectCreatedCopy, ObjectCreatedPost, ObjectCreatedPut}
case ObjectRemovedAll:
return []Name{ObjectRemovedDelete}
default:
return []Name{name}
}
}
// String - returns string representation of event type.
func (name Name) String() string {
switch name {
case ObjectAccessedAll:
return "s3:ObjectAccessed:*"
case ObjectAccessedGet:
return "s3:ObjectAccessed:Get"
case ObjectAccessedHead:
return "s3:ObjectAccessed:Head"
case ObjectCreatedAll:
return "s3:ObjectCreated:*"
case ObjectCreatedCompleteMultipartUpload:
return "s3:ObjectCreated:CompleteMultipartUpload"
case ObjectCreatedCopy:
return "s3:ObjectCreated:Copy"
case ObjectCreatedPost:
return "s3:ObjectCreated:Post"
case ObjectCreatedPut:
return "s3:ObjectCreated:Put"
case ObjectRemovedAll:
return "s3:ObjectRemoved:*"
case ObjectRemovedDelete:
return "s3:ObjectRemoved:Delete"
}
return ""
}
// MarshalXML - encodes to XML data.
func (name Name) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
return e.EncodeElement(name.String(), start)
}
// UnmarshalXML - decodes XML data.
func (name *Name) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
var s string
if err := d.DecodeElement(&s, &start); err != nil {
return err
}
eventName, err := ParseName(s)
if err != nil {
return err
}
*name = eventName
return nil
}
// MarshalJSON - encodes to JSON data.
func (name Name) MarshalJSON() ([]byte, error) {
return json.Marshal(name.String())
}
// UnmarshalJSON - decodes JSON data.
func (name *Name) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return err
}
eventName, err := ParseName(s)
if err != nil {
return err
}
*name = eventName
return nil
}
// ParseName - parses string to Name.
func ParseName(s string) (Name, error) {
switch s {
case "s3:ObjectAccessed:*":
return ObjectAccessedAll, nil
case "s3:ObjectAccessed:Get":
return ObjectAccessedGet, nil
case "s3:ObjectAccessed:Head":
return ObjectAccessedHead, nil
case "s3:ObjectCreated:*":
return ObjectCreatedAll, nil
case "s3:ObjectCreated:CompleteMultipartUpload":
return ObjectCreatedCompleteMultipartUpload, nil
case "s3:ObjectCreated:Copy":
return ObjectCreatedCopy, nil
case "s3:ObjectCreated:Post":
return ObjectCreatedPost, nil
case "s3:ObjectCreated:Put":
return ObjectCreatedPut, nil
case "s3:ObjectRemoved:*":
return ObjectRemovedAll, nil
case "s3:ObjectRemoved:Delete":
return ObjectRemovedDelete, nil
default:
return 0, &ErrInvalidEventName{s}
}
}

220
pkg/event/name_test.go Normal file
View File

@@ -0,0 +1,220 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/json"
"encoding/xml"
"reflect"
"testing"
)
func TestNameExpand(t *testing.T) {
testCases := []struct {
name Name
expectedResult []Name
}{
{ObjectAccessedAll, []Name{ObjectAccessedGet, ObjectAccessedHead}},
{ObjectCreatedAll, []Name{ObjectCreatedCompleteMultipartUpload, ObjectCreatedCopy, ObjectCreatedPost, ObjectCreatedPut}},
{ObjectRemovedAll, []Name{ObjectRemovedDelete}},
{ObjectAccessedHead, []Name{ObjectAccessedHead}},
}
for i, testCase := range testCases {
result := testCase.name.Expand()
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestNameString(t *testing.T) {
var blankName Name
testCases := []struct {
name Name
expectedResult string
}{
{ObjectAccessedAll, "s3:ObjectAccessed:*"},
{ObjectAccessedGet, "s3:ObjectAccessed:Get"},
{ObjectAccessedHead, "s3:ObjectAccessed:Head"},
{ObjectCreatedAll, "s3:ObjectCreated:*"},
{ObjectCreatedCompleteMultipartUpload, "s3:ObjectCreated:CompleteMultipartUpload"},
{ObjectCreatedCopy, "s3:ObjectCreated:Copy"},
{ObjectCreatedPost, "s3:ObjectCreated:Post"},
{ObjectCreatedPut, "s3:ObjectCreated:Put"},
{ObjectRemovedAll, "s3:ObjectRemoved:*"},
{ObjectRemovedDelete, "s3:ObjectRemoved:Delete"},
{blankName, ""},
}
for i, testCase := range testCases {
result := testCase.name.String()
if result != testCase.expectedResult {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestNameMarshalXML(t *testing.T) {
var blankName Name
testCases := []struct {
name Name
expectedData []byte
expectErr bool
}{
{ObjectAccessedAll, []byte("<Name>s3:ObjectAccessed:*</Name>"), false},
{ObjectRemovedDelete, []byte("<Name>s3:ObjectRemoved:Delete</Name>"), false},
{blankName, []byte("<Name></Name>"), false},
}
for i, testCase := range testCases {
data, err := xml.Marshal(testCase.name)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(data, testCase.expectedData) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, string(testCase.expectedData), string(data))
}
}
}
}
func TestNameUnmarshalXML(t *testing.T) {
var blankName Name
testCases := []struct {
data []byte
expectedName Name
expectErr bool
}{
{[]byte("<Name>s3:ObjectAccessed:*</Name>"), ObjectAccessedAll, false},
{[]byte("<Name>s3:ObjectRemoved:Delete</Name>"), ObjectRemovedDelete, false},
{[]byte("<Name></Name>"), blankName, true},
}
for i, testCase := range testCases {
var name Name
err := xml.Unmarshal(testCase.data, &name)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(name, testCase.expectedName) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedName, name)
}
}
}
}
func TestNameMarshalJSON(t *testing.T) {
var blankName Name
testCases := []struct {
name Name
expectedData []byte
expectErr bool
}{
{ObjectAccessedAll, []byte(`"s3:ObjectAccessed:*"`), false},
{ObjectRemovedDelete, []byte(`"s3:ObjectRemoved:Delete"`), false},
{blankName, []byte(`""`), false},
}
for i, testCase := range testCases {
data, err := json.Marshal(testCase.name)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(data, testCase.expectedData) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, string(testCase.expectedData), string(data))
}
}
}
}
func TestNameUnmarshalJSON(t *testing.T) {
var blankName Name
testCases := []struct {
data []byte
expectedName Name
expectErr bool
}{
{[]byte(`"s3:ObjectAccessed:*"`), ObjectAccessedAll, false},
{[]byte(`"s3:ObjectRemoved:Delete"`), ObjectRemovedDelete, false},
{[]byte(`""`), blankName, true},
}
for i, testCase := range testCases {
var name Name
err := json.Unmarshal(testCase.data, &name)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(name, testCase.expectedName) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedName, name)
}
}
}
}
func TestParseName(t *testing.T) {
var blankName Name
testCases := []struct {
s string
expectedName Name
expectErr bool
}{
{"s3:ObjectAccessed:*", ObjectAccessedAll, false},
{"s3:ObjectRemoved:Delete", ObjectRemovedDelete, false},
{"", blankName, true},
}
for i, testCase := range testCases {
name, err := ParseName(testCase.s)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(name, testCase.expectedName) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedName, name)
}
}
}
}

102
pkg/event/rules.go Normal file
View File

@@ -0,0 +1,102 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"strings"
"github.com/minio/minio/pkg/wildcard"
)
// NewPattern - create new pattern for prefix/suffix.
func NewPattern(prefix, suffix string) (pattern string) {
if prefix != "" {
if !strings.HasSuffix(prefix, "*") {
prefix += "*"
}
pattern = prefix
}
if suffix != "" {
if !strings.HasPrefix(suffix, "*") {
suffix = "*" + suffix
}
pattern += suffix
}
pattern = strings.Replace(pattern, "**", "*", -1)
return pattern
}
// Rules - event rules
type Rules map[string]TargetIDSet
// Add - adds pattern and target ID.
func (rules Rules) Add(pattern string, targetID TargetID) {
rules[pattern] = NewTargetIDSet(targetID).Union(rules[pattern])
}
// Match - returns TargetIDSet matching object name in rules.
func (rules Rules) Match(objectName string) TargetIDSet {
targetIDs := NewTargetIDSet()
for pattern, targetIDSet := range rules {
if wildcard.MatchSimple(pattern, objectName) {
targetIDs = targetIDs.Union(targetIDSet)
}
}
return targetIDs
}
// Clone - returns copy of this rules.
func (rules Rules) Clone() Rules {
rulesCopy := make(Rules)
for pattern, targetIDSet := range rules {
rulesCopy[pattern] = targetIDSet.Clone()
}
return rulesCopy
}
// Union - returns union with given rules as new rules.
func (rules Rules) Union(rules2 Rules) Rules {
nrules := rules.Clone()
for pattern, targetIDSet := range rules2 {
nrules[pattern] = nrules[pattern].Union(targetIDSet)
}
return nrules
}
// Difference - returns diffrence with given rules as new rules.
func (rules Rules) Difference(rules2 Rules) Rules {
nrules := make(Rules)
for pattern, targetIDSet := range rules {
if nv := targetIDSet.Difference(rules2[pattern]); len(nv) > 0 {
nrules[pattern] = nv
}
}
return nrules
}

275
pkg/event/rules_test.go Normal file
View File

@@ -0,0 +1,275 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"reflect"
"testing"
)
func TestNewPattern(t *testing.T) {
testCases := []struct {
prefix string
suffix string
expectedResult string
}{
{"", "", ""},
{"*", "", "*"},
{"", "*", "*"},
{"images/", "", "images/*"},
{"images/*", "", "images/*"},
{"", "jpg", "*jpg"},
{"", "*jpg", "*jpg"},
{"images/", "jpg", "images/*jpg"},
{"images/*", "jpg", "images/*jpg"},
{"images/", "*jpg", "images/*jpg"},
{"images/*", "*jpg", "images/*jpg"},
{"201*/images/", "jpg", "201*/images/*jpg"},
}
for i, testCase := range testCases {
result := NewPattern(testCase.prefix, testCase.suffix)
if result != testCase.expectedResult {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestRulesAdd(t *testing.T) {
rulesCase1 := make(Rules)
rulesCase2 := make(Rules)
rulesCase2.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rulesCase3 := make(Rules)
rulesCase3.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rulesCase4 := make(Rules)
rulesCase4.Add(NewPattern("", "*.jpg"), TargetID{"1", "webhook"})
rulesCase5 := make(Rules)
rulesCase6 := make(Rules)
rulesCase6.Add(NewPattern("", "*.jpg"), TargetID{"1", "webhook"})
rulesCase7 := make(Rules)
rulesCase7.Add(NewPattern("", "*.jpg"), TargetID{"1", "webhook"})
rulesCase8 := make(Rules)
rulesCase8.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
testCases := []struct {
rules Rules
pattern string
targetID TargetID
expectedResult int
}{
{rulesCase1, NewPattern("*", ""), TargetID{"1", "webhook"}, 1},
{rulesCase2, NewPattern("*", ""), TargetID{"2", "amqp"}, 2},
{rulesCase3, NewPattern("2010*", ""), TargetID{"1", "webhook"}, 1},
{rulesCase4, NewPattern("*", ""), TargetID{"1", "webhook"}, 2},
{rulesCase5, NewPattern("", "*.jpg"), TargetID{"1", "webhook"}, 1},
{rulesCase6, NewPattern("", "*"), TargetID{"2", "amqp"}, 2},
{rulesCase7, NewPattern("", "*.jpg"), TargetID{"1", "webhook"}, 1},
{rulesCase8, NewPattern("", "*.jpg"), TargetID{"1", "webhook"}, 2},
}
for i, testCase := range testCases {
testCase.rules.Add(testCase.pattern, testCase.targetID)
result := len(testCase.rules)
if result != testCase.expectedResult {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestRulesMatch(t *testing.T) {
rulesCase1 := make(Rules)
rulesCase2 := make(Rules)
rulesCase2.Add(NewPattern("*", "*"), TargetID{"1", "webhook"})
rulesCase3 := make(Rules)
rulesCase3.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rulesCase3.Add(NewPattern("", "*.png"), TargetID{"2", "amqp"})
rulesCase4 := make(Rules)
rulesCase4.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
testCases := []struct {
rules Rules
objectName string
expectedResult TargetIDSet
}{
{rulesCase1, "photos.jpg", NewTargetIDSet()},
{rulesCase2, "photos.jpg", NewTargetIDSet(TargetID{"1", "webhook"})},
{rulesCase3, "2010/photos.jpg", NewTargetIDSet(TargetID{"1", "webhook"})},
{rulesCase4, "2000/photos.jpg", NewTargetIDSet()},
}
for i, testCase := range testCases {
result := testCase.rules.Match(testCase.objectName)
if !reflect.DeepEqual(testCase.expectedResult, result) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestRulesClone(t *testing.T) {
rulesCase1 := make(Rules)
rulesCase2 := make(Rules)
rulesCase2.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rulesCase3 := make(Rules)
rulesCase3.Add(NewPattern("", "*.jpg"), TargetID{"1", "webhook"})
testCases := []struct {
rules Rules
prefix string
targetID TargetID
}{
{rulesCase1, "2010*", TargetID{"1", "webhook"}},
{rulesCase2, "2000*", TargetID{"2", "amqp"}},
{rulesCase3, "2010*", TargetID{"1", "webhook"}},
}
for i, testCase := range testCases {
result := testCase.rules.Clone()
if !reflect.DeepEqual(result, testCase.rules) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.rules, result)
}
result.Add(NewPattern(testCase.prefix, ""), testCase.targetID)
if reflect.DeepEqual(result, testCase.rules) {
t.Fatalf("test %v: result: expected: not equal, got: equal", i+1)
}
}
}
func TestRulesUnion(t *testing.T) {
rulesCase1 := make(Rules)
rules2Case1 := make(Rules)
expectedResultCase1 := make(Rules)
rulesCase2 := make(Rules)
rules2Case2 := make(Rules)
rules2Case2.Add(NewPattern("*", ""), TargetID{"1", "webhook"})
expectedResultCase2 := make(Rules)
expectedResultCase2.Add(NewPattern("*", ""), TargetID{"1", "webhook"})
rulesCase3 := make(Rules)
rulesCase3.Add(NewPattern("", "*"), TargetID{"1", "webhook"})
rules2Case3 := make(Rules)
expectedResultCase3 := make(Rules)
expectedResultCase3.Add(NewPattern("", "*"), TargetID{"1", "webhook"})
rulesCase4 := make(Rules)
rulesCase4.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rules2Case4 := make(Rules)
rules2Case4.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
expectedResultCase4 := make(Rules)
expectedResultCase4.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rulesCase5 := make(Rules)
rulesCase5.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rulesCase5.Add(NewPattern("", "*.png"), TargetID{"2", "amqp"})
rules2Case5 := make(Rules)
rules2Case5.Add(NewPattern("*", ""), TargetID{"1", "webhook"})
expectedResultCase5 := make(Rules)
expectedResultCase5.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
expectedResultCase5.Add(NewPattern("", "*.png"), TargetID{"2", "amqp"})
expectedResultCase5.Add(NewPattern("*", ""), TargetID{"1", "webhook"})
testCases := []struct {
rules Rules
rules2 Rules
expectedResult Rules
}{
{rulesCase1, rules2Case1, expectedResultCase1},
{rulesCase2, rules2Case2, expectedResultCase2},
{rulesCase3, rules2Case3, expectedResultCase3},
{rulesCase4, rules2Case4, expectedResultCase4},
{rulesCase5, rules2Case5, expectedResultCase5},
}
for i, testCase := range testCases {
result := testCase.rules.Union(testCase.rules2)
if !reflect.DeepEqual(testCase.expectedResult, result) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestRulesDifference(t *testing.T) {
rulesCase1 := make(Rules)
rules2Case1 := make(Rules)
expectedResultCase1 := make(Rules)
rulesCase2 := make(Rules)
rules2Case2 := make(Rules)
rules2Case2.Add(NewPattern("*", "*"), TargetID{"1", "webhook"})
expectedResultCase2 := make(Rules)
rulesCase3 := make(Rules)
rulesCase3.Add(NewPattern("*", "*"), TargetID{"1", "webhook"})
rules2Case3 := make(Rules)
expectedResultCase3 := make(Rules)
expectedResultCase3.Add(NewPattern("*", "*"), TargetID{"1", "webhook"})
rulesCase4 := make(Rules)
rulesCase4.Add(NewPattern("*", "*"), TargetID{"1", "webhook"})
rules2Case4 := make(Rules)
rules2Case4.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rules2Case4.Add(NewPattern("", "*.png"), TargetID{"2", "amqp"})
expectedResultCase4 := make(Rules)
expectedResultCase4.Add(NewPattern("*", "*"), TargetID{"1", "webhook"})
rulesCase5 := make(Rules)
rulesCase5.Add(NewPattern("*", ""), TargetID{"1", "webhook"})
rulesCase5.Add(NewPattern("", "*"), TargetID{"2", "amqp"})
rules2Case5 := make(Rules)
rules2Case5.Add(NewPattern("2010*", ""), TargetID{"1", "webhook"})
rules2Case5.Add(NewPattern("", "*"), TargetID{"2", "amqp"})
expectedResultCase5 := make(Rules)
expectedResultCase5.Add(NewPattern("*", ""), TargetID{"1", "webhook"})
testCases := []struct {
rules Rules
rules2 Rules
expectedResult Rules
}{
{rulesCase1, rules2Case1, expectedResultCase1},
{rulesCase2, rules2Case2, expectedResultCase2},
{rulesCase3, rules2Case3, expectedResultCase3},
{rulesCase4, rules2Case4, expectedResultCase4},
{rulesCase5, rules2Case5, expectedResultCase5},
}
for i, testCase := range testCases {
result := testCase.rules.Difference(testCase.rules2)
if !reflect.DeepEqual(testCase.expectedResult, result) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}

78
pkg/event/rulesmap.go Normal file
View File

@@ -0,0 +1,78 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
// RulesMap - map of rules for every event name.
type RulesMap map[Name]Rules
// add - adds event names, prefixes, suffixes and target ID to rules map.
func (rulesMap RulesMap) add(eventNames []Name, pattern string, targetID TargetID) {
rules := make(Rules)
rules.Add(pattern, targetID)
for _, eventName := range eventNames {
for _, name := range eventName.Expand() {
rulesMap[name] = rulesMap[name].Union(rules)
}
}
}
// Clone - returns copy of this rules map.
func (rulesMap RulesMap) Clone() RulesMap {
rulesMapCopy := make(RulesMap)
for eventName, rules := range rulesMap {
rulesMapCopy[eventName] = rules.Clone()
}
return rulesMapCopy
}
// Add - adds given rules map.
func (rulesMap RulesMap) Add(rulesMap2 RulesMap) {
for eventName, rules := range rulesMap2 {
rulesMap[eventName] = rules.Union(rulesMap[eventName])
}
}
// Remove - removes given rules map.
func (rulesMap RulesMap) Remove(rulesMap2 RulesMap) {
for eventName, rules := range rulesMap {
if nr := rules.Difference(rulesMap2[eventName]); len(nr) != 0 {
rulesMap[eventName] = nr
} else {
delete(rulesMap, eventName)
}
}
}
// Match - returns TargetIDSet matching object name and event name in rules map.
func (rulesMap RulesMap) Match(eventName Name, objectName string) TargetIDSet {
return rulesMap[eventName].Match(objectName)
}
// NewRulesMap - creates new rules map with given values.
func NewRulesMap(eventNames []Name, pattern string, targetID TargetID) RulesMap {
// If pattern is empty, add '*' wildcard to match all.
if pattern == "" {
pattern = "*"
}
rulesMap := make(RulesMap)
rulesMap.add(eventNames, pattern, targetID)
return rulesMap
}

182
pkg/event/rulesmap_test.go Normal file
View File

@@ -0,0 +1,182 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"reflect"
"testing"
)
func TestRulesMapClone(t *testing.T) {
rulesMapCase1 := make(RulesMap)
rulesMapToAddCase1 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
rulesMapCase2 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
rulesMapToAddCase2 := NewRulesMap([]Name{ObjectCreatedAll}, "2010*.jpg", TargetID{"1", "webhook"})
rulesMapCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "2010*.jpg", TargetID{"1", "webhook"})
rulesMapToAddCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
testCases := []struct {
rulesMap RulesMap
rulesMapToAdd RulesMap
}{
{rulesMapCase1, rulesMapToAddCase1},
{rulesMapCase2, rulesMapToAddCase2},
{rulesMapCase3, rulesMapToAddCase3},
}
for i, testCase := range testCases {
result := testCase.rulesMap.Clone()
if !reflect.DeepEqual(result, testCase.rulesMap) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.rulesMap, result)
}
result.Add(testCase.rulesMapToAdd)
if reflect.DeepEqual(result, testCase.rulesMap) {
t.Fatalf("test %v: result: expected: not equal, got: equal", i+1)
}
}
}
func TestRulesMapAdd(t *testing.T) {
rulesMapCase1 := make(RulesMap)
rulesMapToAddCase1 := make(RulesMap)
expectedResultCase1 := make(RulesMap)
rulesMapCase2 := make(RulesMap)
rulesMapToAddCase2 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
expectedResultCase2 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
rulesMapCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
rulesMapToAddCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "2010*.jpg", TargetID{"1", "webhook"})
expectedResultCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "2010*.jpg", TargetID{"1", "webhook"})
expectedResultCase3.add([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
testCases := []struct {
rulesMap RulesMap
rulesMapToAdd RulesMap
expectedResult RulesMap
}{
{rulesMapCase1, rulesMapToAddCase1, expectedResultCase1},
{rulesMapCase2, rulesMapToAddCase2, expectedResultCase2},
{rulesMapCase3, rulesMapToAddCase3, expectedResultCase3},
}
for i, testCase := range testCases {
testCase.rulesMap.Add(testCase.rulesMapToAdd)
if !reflect.DeepEqual(testCase.rulesMap, testCase.expectedResult) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, testCase.rulesMap)
}
}
}
func TestRulesMapRemove(t *testing.T) {
rulesMapCase1 := make(RulesMap)
rulesMapToAddCase1 := make(RulesMap)
expectedResultCase1 := make(RulesMap)
rulesMapCase2 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
rulesMapToAddCase2 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
expectedResultCase2 := make(RulesMap)
rulesMapCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "2010*.jpg", TargetID{"1", "webhook"})
rulesMapCase3.add([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
rulesMapToAddCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "2010*.jpg", TargetID{"1", "webhook"})
expectedResultCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
testCases := []struct {
rulesMap RulesMap
rulesMapToAdd RulesMap
expectedResult RulesMap
}{
{rulesMapCase1, rulesMapToAddCase1, expectedResultCase1},
{rulesMapCase2, rulesMapToAddCase2, expectedResultCase2},
{rulesMapCase3, rulesMapToAddCase3, expectedResultCase3},
}
for i, testCase := range testCases {
testCase.rulesMap.Remove(testCase.rulesMapToAdd)
if !reflect.DeepEqual(testCase.rulesMap, testCase.expectedResult) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, testCase.rulesMap)
}
}
}
func TestRulesMapMatch(t *testing.T) {
rulesMapCase1 := make(RulesMap)
rulesMapCase2 := NewRulesMap([]Name{ObjectCreatedAll}, "*", TargetID{"1", "webhook"})
rulesMapCase3 := NewRulesMap([]Name{ObjectCreatedAll}, "2010*.jpg", TargetID{"1", "webhook"})
rulesMapCase4 := NewRulesMap([]Name{ObjectCreatedAll}, "2010*.jpg", TargetID{"1", "webhook"})
rulesMapCase4.add([]Name{ObjectCreatedAll}, "*", TargetID{"2", "amqp"})
testCases := []struct {
rulesMap RulesMap
eventName Name
objectName string
expectedResult TargetIDSet
}{
{rulesMapCase1, ObjectCreatedPut, "2010/photo.jpg", NewTargetIDSet()},
{rulesMapCase2, ObjectCreatedPut, "2010/photo.jpg", NewTargetIDSet(TargetID{"1", "webhook"})},
{rulesMapCase3, ObjectCreatedPut, "2000/photo.png", NewTargetIDSet()},
{rulesMapCase4, ObjectCreatedPut, "2000/photo.png", NewTargetIDSet(TargetID{"2", "amqp"})},
}
for i, testCase := range testCases {
result := testCase.rulesMap.Match(testCase.eventName, testCase.objectName)
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestNewRulesMap(t *testing.T) {
rulesMapCase1 := make(RulesMap)
rulesMapCase1.add([]Name{ObjectAccessedGet, ObjectAccessedHead}, "*", TargetID{"1", "webhook"})
rulesMapCase2 := make(RulesMap)
rulesMapCase2.add([]Name{ObjectAccessedGet, ObjectAccessedHead, ObjectCreatedPut}, "*", TargetID{"1", "webhook"})
rulesMapCase3 := make(RulesMap)
rulesMapCase3.add([]Name{ObjectRemovedDelete}, "2010*.jpg", TargetID{"1", "webhook"})
testCases := []struct {
eventNames []Name
pattern string
targetID TargetID
expectedResult RulesMap
}{
{[]Name{ObjectAccessedAll}, "", TargetID{"1", "webhook"}, rulesMapCase1},
{[]Name{ObjectAccessedAll, ObjectCreatedPut}, "", TargetID{"1", "webhook"}, rulesMapCase2},
{[]Name{ObjectRemovedDelete}, "2010*.jpg", TargetID{"1", "webhook"}, rulesMapCase3},
}
for i, testCase := range testCases {
result := NewRulesMap(testCase.eventNames, testCase.pattern, testCase.targetID)
if !reflect.DeepEqual(result, testCase.expectedResult) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}

150
pkg/event/target/amqp.go Normal file
View File

@@ -0,0 +1,150 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"encoding/json"
"net"
"net/url"
"sync"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/streadway/amqp"
)
// AMQPArgs - AMQP target arguments.
type AMQPArgs struct {
Enable bool `json:"enable"`
URL xnet.URL `json:"url"`
Exchange string `json:"exchange"`
RoutingKey string `json:"routingKey"`
ExchangeType string `json:"exchangeType"`
DeliveryMode uint8 `json:"deliveryMode"`
Mandatory bool `json:"mandatory"`
Immediate bool `json:"immediate"`
Durable bool `json:"durable"`
Internal bool `json:"internal"`
NoWait bool `json:"noWait"`
AutoDeleted bool `json:"autoDeleted"`
}
// AMQPTarget - AMQP target
type AMQPTarget struct {
id event.TargetID
args AMQPArgs
conn *amqp.Connection
connMutex sync.Mutex
}
// ID - returns TargetID.
func (target *AMQPTarget) ID() event.TargetID {
return target.id
}
func (target *AMQPTarget) channel() (*amqp.Channel, error) {
isAMQPClosedErr := func(err error) bool {
if err == amqp.ErrClosed {
return true
}
if nerr, ok := err.(*net.OpError); ok {
return (nerr.Err.Error() == "use of closed network connection")
}
return false
}
target.connMutex.Lock()
defer target.connMutex.Unlock()
ch, err := target.conn.Channel()
if err == nil {
return ch, nil
}
if !isAMQPClosedErr(err) {
return nil, err
}
var conn *amqp.Connection
if conn, err = amqp.Dial(target.args.URL.String()); err != nil {
return nil, err
}
if ch, err = conn.Channel(); err != nil {
return nil, err
}
target.conn = conn
return ch, nil
}
// Send - sends event to AMQP.
func (target *AMQPTarget) Send(eventData event.Event) error {
ch, err := target.channel()
if err != nil {
return err
}
defer func() {
// FIXME: log returned error. ignore time being.
_ = ch.Close()
}()
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{eventData.EventName, key, []event.Event{eventData}})
if err != nil {
return err
}
if err = ch.ExchangeDeclare(target.args.Exchange, target.args.ExchangeType, target.args.Durable,
target.args.AutoDeleted, target.args.Internal, target.args.NoWait, nil); err != nil {
return err
}
return ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory,
target.args.Immediate, amqp.Publishing{
ContentType: "application/json",
DeliveryMode: target.args.DeliveryMode,
Body: data,
})
}
// Close - does nothing and available for interface compatibility.
func (target *AMQPTarget) Close() error {
return nil
}
// NewAMQPTarget - creates new AMQP target.
func NewAMQPTarget(id string, args AMQPArgs) (*AMQPTarget, error) {
conn, err := amqp.Dial(args.URL.String())
if err != nil {
return nil, err
}
return &AMQPTarget{
id: event.TargetID{id, "amqp"},
args: args,
conn: conn,
}, nil
}

View File

@@ -0,0 +1,132 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"context"
"fmt"
"net/url"
"time"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"gopkg.in/olivere/elastic.v5"
)
// ElasticsearchArgs - Elasticsearch target arguments.
type ElasticsearchArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
URL xnet.URL `json:"url"`
Index string `json:"index"`
}
// ElasticsearchTarget - Elasticsearch target.
type ElasticsearchTarget struct {
id event.TargetID
args ElasticsearchArgs
client *elastic.Client
}
// ID - returns target ID.
func (target *ElasticsearchTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to Elasticsearch.
func (target *ElasticsearchTarget) Send(eventData event.Event) (err error) {
var key string
remove := func() error {
_, err := target.client.Delete().Index(target.args.Index).Type("event").Id(key).Do(context.Background())
return err
}
update := func() error {
_, err := target.client.Index().Index(target.args.Index).Type("event").BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Id(key).Do(context.Background())
return err
}
add := func() error {
eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
if err != nil {
return err
}
eventTimeMS := fmt.Sprintf("%d", eventTime.UnixNano()/1000000)
_, err = target.client.Index().Index(target.args.Index).Type("event").Timestamp(eventTimeMS).BodyJson(map[string]interface{}{"Records": []event.Event{eventData}}).Do(context.Background())
return err
}
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key = eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete {
err = remove()
} else {
err = update()
}
return err
}
if target.args.Format == event.AccessFormat {
return add()
}
return nil
}
// Close - does nothing and available for interface compatibility.
func (target *ElasticsearchTarget) Close() error {
return nil
}
// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs) (*ElasticsearchTarget, error) {
client, err := elastic.NewClient(elastic.SetURL(args.URL.String()), elastic.SetSniff(false), elastic.SetMaxRetries(10))
if err != nil {
return nil, err
}
exists, err := client.IndexExists(args.Index).Do(context.Background())
if err != nil {
return nil, err
}
if !exists {
var createIndex *elastic.IndicesCreateResult
if createIndex, err = client.CreateIndex(args.Index).Do(context.Background()); err != nil {
return nil, err
}
if !createIndex.Acknowledged {
return nil, fmt.Errorf("index %v not created", args.Index)
}
}
return &ElasticsearchTarget{
id: event.TargetID{id, "elasticsearch"},
args: args,
client: client,
}, nil
}

View File

@@ -0,0 +1,141 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"sync/atomic"
"time"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/skyrings/skyring-common/tools/uuid"
)
// HTTPClientTarget - HTTP client target.
type HTTPClientTarget struct {
id event.TargetID
w http.ResponseWriter
eventCh chan []byte
DoneCh chan struct{}
stopCh chan struct{}
isStopped uint32
isRunning uint32
}
// ID - returns target ID.
func (target HTTPClientTarget) ID() event.TargetID {
return target.id
}
func (target *HTTPClientTarget) start() {
go func() {
defer func() {
atomic.AddUint32(&target.isRunning, 1)
// Close DoneCh to indicate we are done.
close(target.DoneCh)
}()
write := func(event []byte) error {
if _, err := target.w.Write(event); err != nil {
return err
}
target.w.(http.Flusher).Flush()
return nil
}
for {
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
select {
case <-target.stopCh:
// We are asked to stop.
return
case event, ok := <-target.eventCh:
if !ok {
// Got read error. Exit the goroutine.
return
}
if err := write(event); err != nil {
// Got write error to the client. Exit the goroutine.
return
}
case <-keepAliveTicker.C:
if err := write([]byte(" ")); err != nil {
// Got write error to the client. Exit the goroutine.
return
}
}
}
}()
}
// Send - sends event to HTTP client.
func (target *HTTPClientTarget) Send(eventData event.Event) error {
if atomic.LoadUint32(&target.isRunning) != 0 {
return errors.New("closed http connection")
}
data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
if err != nil {
return err
}
data = append(data, byte('\n'))
select {
case target.eventCh <- data:
return nil
case <-target.DoneCh:
return errors.New("error in sending event")
}
}
// Close - closes underneath goroutine.
func (target *HTTPClientTarget) Close() error {
atomic.AddUint32(&target.isStopped, 1)
if atomic.LoadUint32(&target.isStopped) == 1 {
close(target.stopCh)
}
return nil
}
func mustGetNewUUID() string {
uuid, err := uuid.New()
if err != nil {
panic(fmt.Sprintf("%s. Unable to generate random UUID", err))
}
return uuid.String()
}
// NewHTTPClientTarget - creates new HTTP client target.
func NewHTTPClientTarget(host xnet.Host, w http.ResponseWriter) *HTTPClientTarget {
c := &HTTPClientTarget{
id: event.TargetID{"httpclient" + "+" + mustGetNewUUID() + "+" + host.Name, host.Port.String()},
w: w,
eventCh: make(chan []byte),
DoneCh: make(chan struct{}),
stopCh: make(chan struct{}),
}
c.start()
return c
}

97
pkg/event/target/kafka.go Normal file
View File

@@ -0,0 +1,97 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"encoding/json"
"net/url"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
sarama "gopkg.in/Shopify/sarama.v1"
)
// KafkaArgs - Kafka target arguments.
type KafkaArgs struct {
Enable bool `json:"enable"`
Brokers []xnet.Host `json:"brokers"`
Topic string `json:"topic"`
}
// KafkaTarget - Kafka target.
type KafkaTarget struct {
id event.TargetID
args KafkaArgs
producer sarama.SyncProducer
}
// ID - returns target ID.
func (target *KafkaTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to Kafka.
func (target *KafkaTarget) Send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{eventData.EventName, key, []event.Event{eventData}})
if err != nil {
return err
}
msg := sarama.ProducerMessage{
Topic: target.args.Topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(data),
}
_, _, err = target.producer.SendMessage(&msg)
return err
}
// Close - closes underneath kafka connection.
func (target *KafkaTarget) Close() error {
return target.producer.Close()
}
// NewKafkaTarget - creates new Kafka target.
func NewKafkaTarget(id string, args KafkaArgs) (*KafkaTarget, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true
brokers := []string{}
for _, broker := range args.Brokers {
brokers = append(brokers, broker.String())
}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
return nil, err
}
return &KafkaTarget{
id: event.TargetID{id, "kafka"},
args: args,
producer: producer,
}, nil
}

117
pkg/event/target/mqtt.go Normal file
View File

@@ -0,0 +1,117 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"net/url"
"time"
"github.com/eclipse/paho.mqtt.golang"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
// MQTTArgs - MQTT target arguments.
type MQTTArgs struct {
Enable bool `json:"enable"`
Broker xnet.URL `json:"broker"`
Topic string `json:"topic"`
QoS byte `json:"qos"`
ClientID string `json:"clientId"`
User string `json:"username"`
Password string `json:"password"`
MaxReconnectInterval time.Duration `json:"reconnectInterval"`
KeepAlive time.Duration `json:"keepAliveInterval"`
RootCAs *x509.CertPool `json:"-"`
}
// MQTTTarget - MQTT target.
type MQTTTarget struct {
id event.TargetID
args MQTTArgs
client mqtt.Client
}
// ID - returns target ID.
func (target *MQTTTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to MQTT.
func (target *MQTTTarget) Send(eventData event.Event) error {
if !target.client.IsConnected() {
token := target.client.Connect()
if token.Wait() {
if err := token.Error(); err != nil {
return err
}
}
}
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{eventData.EventName, key, []event.Event{eventData}})
if err != nil {
return err
}
token := target.client.Publish(target.args.Topic, target.args.QoS, false, string(data))
if token.Wait() {
return token.Error()
}
return nil
}
// Close - does nothing and available for interface compatibility.
func (target *MQTTTarget) Close() error {
return nil
}
// NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs) (*MQTTTarget, error) {
options := &mqtt.ClientOptions{
ClientID: args.ClientID,
CleanSession: true,
Username: args.User,
Password: args.Password,
MaxReconnectInterval: args.MaxReconnectInterval,
KeepAlive: args.KeepAlive,
TLSConfig: tls.Config{RootCAs: args.RootCAs},
}
options.AddBroker(args.Broker.String())
client := mqtt.NewClient(options)
token := client.Connect()
if token.Wait() && token.Error() != nil {
return nil, token.Error()
}
return &MQTTTarget{
id: event.TargetID{id, "mqtt"},
args: args,
client: client,
}, nil
}

226
pkg/event/target/mysql.go Normal file
View File

@@ -0,0 +1,226 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// MySQL Notifier implementation. Two formats, "namespace" and
// "access" are supported.
//
// * Namespace format
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in MySQL. On each
// object removal, the corresponding row is deleted from the table.
//
// A table with a specific structure (column names, column types, and
// primary key/uniqueness constraint) is used. The user may set the
// table name in the configuration. A sample SQL command that creates
// a command with the required structure is:
//
// CREATE TABLE myminio (
// key_name VARCHAR(2048),
// value JSONB,
// PRIMARY KEY (key_name),
// );
//
// MySQL's "INSERT ... ON DUPLICATE ..." feature (UPSERT) is used
// here. The implementation has been tested with MySQL Ver 14.14
// Distrib 5.7.17.
//
// * Access format
//
// On each event, a row is appended to the configured table. There is
// no deletion or modification of existing rows.
//
// A different table schema is used for this format. A sample SQL
// commant that creates a table with the required structure is:
//
// CREATE TABLE myminio (
// event_time TIMESTAMP WITH TIME ZONE NOT NULL,
// event_data JSONB
// );
package target
import (
"database/sql"
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/go-sql-driver/mysql"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
const (
mysqlTableExists = `SELECT 1 FROM %s;`
mysqlCreateNamespaceTable = `CREATE TABLE %s (key_name VARCHAR(2048), value JSON, PRIMARY KEY (key_name));`
mysqlCreateAccessTable = `CREATE TABLE %s (event_time DATETIME NOT NULL, event_data JSON);`
mysqlUpdateRow = `INSERT INTO %s (key_name, value) VALUES (?, ?) ON DUPLICATE KEY UPDATE value=VALUES(value);`
mysqlDeleteRow = `DELETE FROM %s WHERE key_name = ?;`
mysqlInsertRow = `INSERT INTO %s (event_time, event_data) VALUES (?, ?);`
)
// MySQLArgs - MySQL target arguments.
type MySQLArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
DSN string `json:"dsnString"`
Table string `json:"table"`
Host xnet.URL `json:"host"`
Port string `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
}
// MySQLTarget - MySQL target.
type MySQLTarget struct {
id event.TargetID
args MySQLArgs
updateStmt *sql.Stmt
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
}
// ID - returns target ID.
func (target *MySQLTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to MySQL.
func (target *MySQLTarget) Send(eventData event.Event) error {
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete {
_, err = target.deleteStmt.Exec(key)
} else {
var data []byte
if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
return err
}
_, err = target.updateStmt.Exec(key, data)
}
return err
}
if target.args.Format == event.AccessFormat {
eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
if err != nil {
return err
}
data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
if err != nil {
return err
}
_, err = target.insertStmt.Exec(eventTime, data)
return err
}
return nil
}
// Close - closes underneath connections to MySQL database.
func (target *MySQLTarget) Close() error {
if target.updateStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.updateStmt.Close()
}
if target.deleteStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.deleteStmt.Close()
}
if target.insertStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.insertStmt.Close()
}
return target.db.Close()
}
// NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs) (*MySQLTarget, error) {
if args.DSN == "" {
config := mysql.Config{
User: args.User,
Passwd: args.Password,
Net: "tcp",
Addr: args.Host.String() + ":" + args.Port,
DBName: args.Database,
}
args.DSN = config.FormatDSN()
}
db, err := sql.Open("mysql", args.DSN)
if err != nil {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
if _, err = db.Exec(fmt.Sprintf(mysqlTableExists, args.Table)); err != nil {
createStmt := mysqlCreateNamespaceTable
if args.Format == event.AccessFormat {
createStmt = mysqlCreateAccessTable
}
if _, err = db.Exec(fmt.Sprintf(createStmt, args.Table)); err != nil {
return nil, err
}
}
var updateStmt, deleteStmt, insertStmt *sql.Stmt
switch args.Format {
case event.NamespaceFormat:
// insert or update statement
if updateStmt, err = db.Prepare(fmt.Sprintf(mysqlUpdateRow, args.Table)); err != nil {
return nil, err
}
// delete statement
if deleteStmt, err = db.Prepare(fmt.Sprintf(mysqlDeleteRow, args.Table)); err != nil {
return nil, err
}
case event.AccessFormat:
// insert statement
if insertStmt, err = db.Prepare(fmt.Sprintf(mysqlInsertRow, args.Table)); err != nil {
return nil, err
}
}
return &MySQLTarget{
id: event.TargetID{id, "mysql"},
args: args,
updateStmt: updateStmt,
deleteStmt: deleteStmt,
insertStmt: insertStmt,
db: db,
}, nil
}

143
pkg/event/target/nats.go Normal file
View File

@@ -0,0 +1,143 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"encoding/json"
"net/url"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
"github.com/nats-io/go-nats-streaming"
"github.com/nats-io/nats"
)
// NATSArgs - NATS target arguments.
type NATSArgs struct {
Enable bool `json:"enable"`
Address xnet.Host `json:"address"`
Subject string `json:"subject"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token"`
Secure bool `json:"secure"`
PingInterval int64 `json:"pingInterval"`
Streaming struct {
Enable bool `json:"enable"`
ClusterID string `json:"clusterID"`
ClientID string `json:"clientID"`
Async bool `json:"async"`
MaxPubAcksInflight int `json:"maxPubAcksInflight"`
} `json:"streaming"`
}
// NATSTarget - NATS target.
type NATSTarget struct {
id event.TargetID
args NATSArgs
natsConn *nats.Conn
stanConn stan.Conn
}
// ID - returns target ID.
func (target *NATSTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to NATS.
func (target *NATSTarget) Send(eventData event.Event) (err error) {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{eventData.EventName, key, []event.Event{eventData}})
if err != nil {
return err
}
if target.stanConn != nil {
if target.args.Streaming.Async {
_, err = target.stanConn.PublishAsync(target.args.Subject, data, nil)
} else {
err = target.stanConn.Publish(target.args.Subject, data)
}
} else {
err = target.natsConn.Publish(target.args.Subject, data)
}
return err
}
// Close - closes underneath connections to NATS server.
func (target *NATSTarget) Close() (err error) {
if target.stanConn != nil {
err = target.stanConn.Close()
}
if target.natsConn != nil {
target.natsConn.Close()
}
return err
}
// NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs) (*NATSTarget, error) {
var natsConn *nats.Conn
var stanConn stan.Conn
var err error
if args.Streaming.Enable {
scheme := "nats"
if args.Secure {
scheme = "tls"
}
addressURL := scheme + "://" + args.Username + ":" + args.Password + "@" + args.Address.String()
clientID := args.Streaming.ClientID
if clientID == "" {
clientID = mustGetNewUUID()
}
connOpts := []stan.Option{stan.NatsURL(addressURL)}
if args.Streaming.MaxPubAcksInflight > 0 {
connOpts = append(connOpts, stan.MaxPubAcksInflight(args.Streaming.MaxPubAcksInflight))
}
stanConn, err = stan.Connect(args.Streaming.ClusterID, clientID, connOpts...)
} else {
options := nats.DefaultOptions
options.Url = "nats://" + args.Address.String()
options.User = args.Username
options.Password = args.Password
options.Token = args.Token
options.Secure = args.Secure
natsConn, err = options.Connect()
}
if err != nil {
return nil, err
}
return &NATSTarget{
id: event.TargetID{id, "nats"},
args: args,
stanConn: stanConn,
natsConn: natsConn,
}, nil
}

View File

@@ -0,0 +1,233 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// PostgreSQL Notifier implementation. Two formats, "namespace" and
// "access" are supported.
//
// * Namespace format
//
// On each create or update object event in Minio Object storage
// server, a row is created or updated in the table in Postgres. On
// each object removal, the corresponding row is deleted from the
// table.
//
// A table with a specific structure (column names, column types, and
// primary key/uniqueness constraint) is used. The user may set the
// table name in the configuration. A sample SQL command that creates
// a table with the required structure is:
//
// CREATE TABLE myminio (
// key VARCHAR PRIMARY KEY,
// value JSONB
// );
//
// PostgreSQL's "INSERT ... ON CONFLICT ... DO UPDATE ..." feature
// (UPSERT) is used here, so the minimum version of PostgreSQL
// required is 9.5.
//
// * Access format
//
// On each event, a row is appended to the configured table. There is
// no deletion or modification of existing rows.
//
// A different table schema is used for this format. A sample SQL
// commant that creates a table with the required structure is:
//
// CREATE TABLE myminio (
// event_time TIMESTAMP WITH TIME ZONE NOT NULL,
// event_data JSONB
// );
package target
import (
"database/sql"
"encoding/json"
"fmt"
"net/url"
"strings"
"time"
_ "github.com/lib/pq" // Register postgres driver
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
const (
psqlTableExists = `SELECT 1 FROM %s;`
psqlCreateNamespaceTable = `CREATE TABLE %s (key VARCHAR PRIMARY KEY, value JSONB);`
psqlCreateAccessTable = `CREATE TABLE %s (event_time TIMESTAMP WITH TIME ZONE NOT NULL, event_data JSONB);`
psqlUpdateRow = `INSERT INTO %s (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value;`
psqlDeleteRow = `DELETE FROM %s WHERE key = $1;`
psqlInsertRow = `INSERT INTO %s (event_time, event_data) VALUES ($1, $2);`
)
// PostgreSQLArgs - PostgreSQL target arguments.
type PostgreSQLArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
ConnectionString string `json:"connectionString"`
Table string `json:"table"`
Host xnet.URL `json:"host"` // default: localhost
Port string `json:"port"` // default: 5432
User string `json:"user"` // default: user running minio
Password string `json:"password"` // default: no password
Database string `json:"database"` // default: same as user
}
// PostgreSQLTarget - PostgreSQL target.
type PostgreSQLTarget struct {
id event.TargetID
args PostgreSQLArgs
updateStmt *sql.Stmt
deleteStmt *sql.Stmt
insertStmt *sql.Stmt
db *sql.DB
}
// ID - returns target ID.
func (target *PostgreSQLTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to PostgreSQL.
func (target *PostgreSQLTarget) Send(eventData event.Event) error {
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete {
_, err = target.deleteStmt.Exec(key)
} else {
var data []byte
if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
return err
}
_, err = target.updateStmt.Exec(key, data)
}
return err
}
if target.args.Format == event.AccessFormat {
eventTime, err := time.Parse(event.AMZTimeFormat, eventData.EventTime)
if err != nil {
return err
}
data, err := json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}})
if err != nil {
return err
}
_, err = target.insertStmt.Exec(eventTime, data)
return err
}
return nil
}
// Close - closes underneath connections to PostgreSQL database.
func (target *PostgreSQLTarget) Close() error {
if target.updateStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.updateStmt.Close()
}
if target.deleteStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.deleteStmt.Close()
}
if target.insertStmt != nil {
// FIXME: log returned error. ignore time being.
_ = target.insertStmt.Close()
}
return target.db.Close()
}
// NewPostgreSQLTarget - creates new PostgreSQL target.
func NewPostgreSQLTarget(id string, args PostgreSQLArgs) (*PostgreSQLTarget, error) {
params := []string{args.ConnectionString}
if !args.Host.IsEmpty() {
params = append(params, "host="+args.Host.String())
}
if args.Port != "" {
params = append(params, "port="+args.Port)
}
if args.User != "" {
params = append(params, "user="+args.User)
}
if args.Password != "" {
params = append(params, "password="+args.Password)
}
if args.Database != "" {
params = append(params, "dbname="+args.Database)
}
connStr := strings.Join(params, " ")
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
if _, err = db.Exec(fmt.Sprintf(psqlTableExists, args.Table)); err != nil {
createStmt := psqlCreateNamespaceTable
if args.Format == event.AccessFormat {
createStmt = psqlCreateAccessTable
}
if _, err = db.Exec(fmt.Sprintf(createStmt, args.Table)); err != nil {
return nil, err
}
}
var updateStmt, deleteStmt, insertStmt *sql.Stmt
switch args.Format {
case event.NamespaceFormat:
// insert or update statement
if updateStmt, err = db.Prepare(fmt.Sprintf(psqlUpdateRow, args.Table)); err != nil {
return nil, err
}
// delete statement
if deleteStmt, err = db.Prepare(fmt.Sprintf(psqlDeleteRow, args.Table)); err != nil {
return nil, err
}
case event.AccessFormat:
// insert statement
if insertStmt, err = db.Prepare(fmt.Sprintf(psqlInsertRow, args.Table)); err != nil {
return nil, err
}
}
return &PostgreSQLTarget{
id: event.TargetID{id, "postgresql"},
args: args,
updateStmt: updateStmt,
deleteStmt: deleteStmt,
insertStmt: insertStmt,
db: db,
}, nil
}

156
pkg/event/target/redis.go Normal file
View File

@@ -0,0 +1,156 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"encoding/json"
"fmt"
"net/url"
"time"
"github.com/garyburd/redigo/redis"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
// RedisArgs - Redis target arguments.
type RedisArgs struct {
Enable bool `json:"enable"`
Format string `json:"format"`
Addr xnet.Host `json:"address"`
Password string `json:"password"`
Key string `json:"key"`
}
// RedisTarget - Redis target.
type RedisTarget struct {
id event.TargetID
args RedisArgs
pool *redis.Pool
}
// ID - returns target ID.
func (target *RedisTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to Redis.
func (target *RedisTarget) Send(eventData event.Event) error {
conn := target.pool.Get()
defer func() {
// FIXME: log returned error. ignore time being.
_ = conn.Close()
}()
if target.args.Format == event.NamespaceFormat {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
if eventData.EventName == event.ObjectRemovedDelete {
_, err = conn.Do("HDEL", target.args.Key, key)
} else {
var data []byte
if data, err = json.Marshal(struct{ Records []event.Event }{[]event.Event{eventData}}); err != nil {
return err
}
_, err = conn.Do("HSET", target.args.Key, key, data)
}
return err
}
if target.args.Format == event.AccessFormat {
data, err := json.Marshal([]interface{}{eventData.EventTime, []event.Event{eventData}})
if err != nil {
return err
}
_, err = conn.Do("RPUSH", target.args.Key, data)
return err
}
return nil
}
// Close - does nothing and available for interface compatibility.
func (target *RedisTarget) Close() error {
return nil
}
// NewRedisTarget - creates new Redis target.
func NewRedisTarget(id string, args RedisArgs) (*RedisTarget, error) {
pool := &redis.Pool{
MaxIdle: 3,
IdleTimeout: 2 * 60 * time.Second,
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", args.Addr.String())
if err != nil {
return nil, err
}
if args.Password == "" {
return conn, nil
}
if _, err = conn.Do("AUTH", args.Password); err != nil {
// FIXME: log returned error. ignore time being.
_ = conn.Close()
return nil, err
}
return conn, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
conn := pool.Get()
defer func() {
// FIXME: log returned error. ignore time being.
_ = conn.Close()
}()
if _, err := conn.Do("PING"); err != nil {
return nil, err
}
typeAvailable, err := redis.String(conn.Do("TYPE", args.Key))
if err != nil {
return nil, err
}
if typeAvailable != "none" {
expectedType := "hash"
if args.Format == event.AccessFormat {
expectedType = "list"
}
if typeAvailable != expectedType {
return nil, fmt.Errorf("expected type %v does not match with available type %v", expectedType, typeAvailable)
}
}
return &RedisTarget{
id: event.TargetID{id, "redis"},
args: args,
pool: pool,
}, nil
}

113
pkg/event/target/webhook.go Normal file
View File

@@ -0,0 +1,113 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package target
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"time"
"github.com/minio/minio/pkg/event"
xnet "github.com/minio/minio/pkg/net"
)
// WebhookArgs - Webhook target arguments.
type WebhookArgs struct {
Enable bool `json:"enable"`
Endpoint xnet.URL `json:"endpoint"`
RootCAs *x509.CertPool `json:"-"`
}
// WebhookTarget - Webhook target.
type WebhookTarget struct {
id event.TargetID
args WebhookArgs
httpClient *http.Client
}
// ID - returns target ID.
func (target WebhookTarget) ID() event.TargetID {
return target.id
}
// Send - sends event to Webhook.
func (target *WebhookTarget) Send(eventData event.Event) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{eventData.EventName, key, []event.Event{eventData}})
if err != nil {
return err
}
req, err := http.NewRequest("POST", target.args.Endpoint.String(), bytes.NewReader(data))
if err != nil {
return err
}
// req.Header.Set("User-Agent", globalServerUserAgent)
req.Header.Set("Content-Type", "application/json")
resp, err := target.httpClient.Do(req)
if err != nil {
return err
}
// FIXME: log returned error. ignore time being.
_ = resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK, http.StatusAccepted, http.StatusContinue:
return nil
default:
return fmt.Errorf("sending event failed with %v", resp.Status)
}
}
// Close - does nothing and available for interface compatibility.
func (target *WebhookTarget) Close() error {
return nil
}
// NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(id string, args WebhookArgs) *WebhookTarget {
return &WebhookTarget{
id: event.TargetID{id, "webhook"},
args: args,
httpClient: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{RootCAs: args.RootCAs},
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 5 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 3 * time.Second,
ResponseHeaderTimeout: 3 * time.Second,
ExpectContinueTimeout: 2 * time.Second,
},
},
}
}

73
pkg/event/targetid.go Normal file
View File

@@ -0,0 +1,73 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"encoding/json"
"fmt"
"strings"
)
// TargetID - holds identification and name strings of notification target.
type TargetID struct {
ID string
Name string
}
// String - returns string representation.
func (tid TargetID) String() string {
return tid.ID + ":" + tid.Name
}
// ToARN - converts to ARN.
func (tid TargetID) ToARN(region string) ARN {
return ARN{TargetID: tid, region: region}
}
// MarshalJSON - encodes to JSON data.
func (tid TargetID) MarshalJSON() ([]byte, error) {
return json.Marshal(tid.String())
}
// UnmarshalJSON - decodes JSON data.
func (tid *TargetID) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return err
}
targetID, err := parseTargetID(s)
if err != nil {
return err
}
*tid = *targetID
return nil
}
// parseTargetID - parses string to TargetID.
func parseTargetID(s string) (*TargetID, error) {
tokens := strings.Split(s, ":")
if len(tokens) != 2 {
return nil, fmt.Errorf("invalid TargetID format '%v'", s)
}
return &TargetID{
ID: tokens[0],
Name: tokens[1],
}, nil
}

117
pkg/event/targetid_test.go Normal file
View File

@@ -0,0 +1,117 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"reflect"
"testing"
)
func TestTargetDString(t *testing.T) {
testCases := []struct {
tid TargetID
expectedResult string
}{
{TargetID{}, ":"},
{TargetID{"1", "webhook"}, "1:webhook"},
{TargetID{"httpclient+2e33cdee-fbec-4bdd-917e-7d8e3c5a2531", "localhost:55638"}, "httpclient+2e33cdee-fbec-4bdd-917e-7d8e3c5a2531:localhost:55638"},
}
for i, testCase := range testCases {
result := testCase.tid.String()
if result != testCase.expectedResult {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestTargetDToARN(t *testing.T) {
tid := TargetID{"1", "webhook"}
testCases := []struct {
tid TargetID
region string
expectedARN ARN
}{
{tid, "", ARN{TargetID: tid, region: ""}},
{tid, "us-east-1", ARN{TargetID: tid, region: "us-east-1"}},
}
for i, testCase := range testCases {
arn := testCase.tid.ToARN(testCase.region)
if arn != testCase.expectedARN {
t.Fatalf("test %v: ARN: expected: %v, got: %v", i+1, testCase.expectedARN, arn)
}
}
}
func TestTargetDMarshalJSON(t *testing.T) {
testCases := []struct {
tid TargetID
expectedData []byte
expectErr bool
}{
{TargetID{}, []byte(`":"`), false},
{TargetID{"1", "webhook"}, []byte(`"1:webhook"`), false},
{TargetID{"httpclient+2e33cdee-fbec-4bdd-917e-7d8e3c5a2531", "localhost:55638"}, []byte(`"httpclient+2e33cdee-fbec-4bdd-917e-7d8e3c5a2531:localhost:55638"`), false},
}
for i, testCase := range testCases {
data, err := testCase.tid.MarshalJSON()
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if !reflect.DeepEqual(data, testCase.expectedData) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, string(testCase.expectedData), string(data))
}
}
}
}
func TestTargetDUnmarshalJSON(t *testing.T) {
testCases := []struct {
data []byte
expectedTargetID *TargetID
expectErr bool
}{
{[]byte(`""`), nil, true},
{[]byte(`"httpclient+2e33cdee-fbec-4bdd-917e-7d8e3c5a2531:localhost:55638"`), nil, true},
{[]byte(`":"`), &TargetID{}, false},
{[]byte(`"1:webhook"`), &TargetID{"1", "webhook"}, false},
}
for i, testCase := range testCases {
targetID := &TargetID{}
err := targetID.UnmarshalJSON(testCase.data)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
if *targetID != *testCase.expectedTargetID {
t.Fatalf("test %v: TargetID: expected: %v, got: %v", i+1, testCase.expectedTargetID, targetID)
}
}
}
}

82
pkg/event/targetidset.go Normal file
View File

@@ -0,0 +1,82 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import "fmt"
// TargetIDSet - Set representation of TargetIDs.
type TargetIDSet map[TargetID]struct{}
// ToSlice - returns TargetID slice from TargetIDSet.
func (set TargetIDSet) ToSlice() []TargetID {
keys := make([]TargetID, 0, len(set))
for k := range set {
keys = append(keys, k)
}
return keys
}
// String - returns string representation.
func (set TargetIDSet) String() string {
return fmt.Sprintf("%v", set.ToSlice())
}
// Clone - returns copy of this set.
func (set TargetIDSet) Clone() TargetIDSet {
setCopy := NewTargetIDSet()
for k, v := range set {
setCopy[k] = v
}
return setCopy
}
// add - adds TargetID to the set.
func (set TargetIDSet) add(targetID TargetID) {
set[targetID] = struct{}{}
}
// Union - returns union with given set as new set.
func (set TargetIDSet) Union(sset TargetIDSet) TargetIDSet {
nset := set.Clone()
for k := range sset {
nset.add(k)
}
return nset
}
// Difference - returns diffrence with given set as new set.
func (set TargetIDSet) Difference(sset TargetIDSet) TargetIDSet {
nset := NewTargetIDSet()
for k := range set {
if _, ok := sset[k]; !ok {
nset.add(k)
}
}
return nset
}
// NewTargetIDSet - creates new TargetID set with given TargetIDs.
func NewTargetIDSet(targetIDs ...TargetID) TargetIDSet {
set := make(TargetIDSet)
for _, targetID := range targetIDs {
set.add(targetID)
}
return set
}

View File

@@ -0,0 +1,159 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"reflect"
"testing"
)
func TestTargetIDSetToSlice(t *testing.T) {
testCases := []struct {
set TargetIDSet
expectedResult []TargetID
}{
{NewTargetIDSet(), []TargetID{}},
{NewTargetIDSet(TargetID{"1", "webhook"}), []TargetID{{"1", "webhook"}}},
{NewTargetIDSet(TargetID{"1", "webhook"}, TargetID{"2", "amqp"}), []TargetID{{"1", "webhook"}, {"2", "amqp"}}},
}
for i, testCase := range testCases {
result := testCase.set.ToSlice()
if len(result) != len(testCase.expectedResult) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
for _, targetID1 := range result {
var found bool
for _, targetID2 := range testCase.expectedResult {
if reflect.DeepEqual(targetID1, targetID2) {
found = true
break
}
}
if !found {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
}
func TestTargetIDSetString(t *testing.T) {
testCases := []struct {
set TargetIDSet
expectedResult string
}{
{NewTargetIDSet(), "[]"},
{NewTargetIDSet(TargetID{"1", "webhook"}), "[1:webhook]"},
}
for i, testCase := range testCases {
result := testCase.set.String()
if result != testCase.expectedResult {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestTargetIDSetClone(t *testing.T) {
testCases := []struct {
set TargetIDSet
targetIDToAdd TargetID
}{
{NewTargetIDSet(), TargetID{"1", "webhook"}},
{NewTargetIDSet(TargetID{"1", "webhook"}), TargetID{"2", "webhook"}},
{NewTargetIDSet(TargetID{"1", "webhook"}, TargetID{"2", "amqp"}), TargetID{"2", "webhook"}},
}
for i, testCase := range testCases {
result := testCase.set.Clone()
if !reflect.DeepEqual(result, testCase.set) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.set, result)
}
result.add(testCase.targetIDToAdd)
if reflect.DeepEqual(result, testCase.set) {
t.Fatalf("test %v: result: expected: not equal, got: equal", i+1)
}
}
}
func TestTargetIDSetUnion(t *testing.T) {
testCases := []struct {
set TargetIDSet
setToAdd TargetIDSet
expectedResult TargetIDSet
}{
{NewTargetIDSet(), NewTargetIDSet(), NewTargetIDSet()},
{NewTargetIDSet(), NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet(TargetID{"1", "webhook"})},
{NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet(), NewTargetIDSet(TargetID{"1", "webhook"})},
{NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet(TargetID{"2", "amqp"}), NewTargetIDSet(TargetID{"1", "webhook"}, TargetID{"2", "amqp"})},
{NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet(TargetID{"1", "webhook"})},
}
for i, testCase := range testCases {
result := testCase.set.Union(testCase.setToAdd)
if !reflect.DeepEqual(testCase.expectedResult, result) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestTargetIDSetDifference(t *testing.T) {
testCases := []struct {
set TargetIDSet
setToRemove TargetIDSet
expectedResult TargetIDSet
}{
{NewTargetIDSet(), NewTargetIDSet(), NewTargetIDSet()},
{NewTargetIDSet(), NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet()},
{NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet(), NewTargetIDSet(TargetID{"1", "webhook"})},
{NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet(TargetID{"2", "amqp"}), NewTargetIDSet(TargetID{"1", "webhook"})},
{NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet(TargetID{"1", "webhook"}), NewTargetIDSet()},
}
for i, testCase := range testCases {
result := testCase.set.Difference(testCase.setToRemove)
if !reflect.DeepEqual(testCase.expectedResult, result) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestNewTargetIDSet(t *testing.T) {
testCases := []struct {
targetIDs []TargetID
expectedResult TargetIDSet
}{
{[]TargetID{}, NewTargetIDSet()},
{[]TargetID{{"1", "webhook"}}, NewTargetIDSet(TargetID{"1", "webhook"})},
{[]TargetID{{"1", "webhook"}, {"2", "amqp"}}, NewTargetIDSet(TargetID{"1", "webhook"}, TargetID{"2", "amqp"})},
}
for i, testCase := range testCases {
result := NewTargetIDSet(testCase.targetIDs...)
if !reflect.DeepEqual(testCase.expectedResult, result) {
t.Fatalf("test %v: result: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}

127
pkg/event/targetlist.go Normal file
View File

@@ -0,0 +1,127 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"fmt"
"sync"
)
// Target - event target interface
type Target interface {
ID() TargetID
Send(Event) error
Close() error
}
// TargetList - holds list of targets indexed by target ID.
type TargetList struct {
sync.RWMutex
targets map[TargetID]Target
}
// Add - adds unique target to target list.
func (list *TargetList) Add(target Target) error {
list.Lock()
defer list.Unlock()
if _, ok := list.targets[target.ID()]; ok {
return fmt.Errorf("target %v already exists", target.ID())
}
list.targets[target.ID()] = target
return nil
}
// Exists - checks whether target by target ID exists or not.
func (list *TargetList) Exists(id TargetID) bool {
list.RLock()
defer list.RUnlock()
_, found := list.targets[id]
return found
}
// Remove - closes and removes targets by given target IDs.
func (list *TargetList) Remove(ids ...TargetID) map[TargetID]error {
list.Lock()
defer list.Unlock()
errors := make(map[TargetID]error)
var wg sync.WaitGroup
for _, id := range ids {
if target, ok := list.targets[id]; ok {
wg.Add(1)
go func(id TargetID, target Target) {
defer wg.Done()
if err := target.Close(); err != nil {
errors[id] = err
}
}(id, target)
}
}
wg.Wait()
for _, id := range ids {
delete(list.targets, id)
}
return errors
}
// List - returns available target IDs.
func (list *TargetList) List() []TargetID {
list.RLock()
defer list.RUnlock()
keys := []TargetID{}
for k := range list.targets {
keys = append(keys, k)
}
return keys
}
// Send - sends events to targets identified by target IDs.
func (list *TargetList) Send(event Event, targetIDs ...TargetID) map[TargetID]error {
list.Lock()
defer list.Unlock()
errors := make(map[TargetID]error)
var wg sync.WaitGroup
for _, id := range targetIDs {
if target, ok := list.targets[id]; ok {
wg.Add(1)
go func(id TargetID, target Target) {
defer wg.Done()
if err := target.Send(event); err != nil {
errors[id] = err
}
}(id, target)
}
}
wg.Wait()
return errors
}
// NewTargetList - creates TargetList.
func NewTargetList() *TargetList {
return &TargetList{targets: make(map[TargetID]Target)}
}

View File

@@ -0,0 +1,272 @@
/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package event
import (
"crypto/rand"
"errors"
"reflect"
"testing"
"time"
)
type ExampleTarget struct {
id TargetID
sendErr bool
closeErr bool
}
func (target ExampleTarget) ID() TargetID {
return target.id
}
func (target ExampleTarget) Send(eventData Event) error {
b := make([]byte, 1)
if _, err := rand.Read(b); err != nil {
panic(err)
}
time.Sleep(time.Duration(b[0]) * time.Millisecond)
if target.sendErr {
return errors.New("send error")
}
return nil
}
func (target ExampleTarget) Close() error {
if target.closeErr {
return errors.New("close error")
}
return nil
}
func TestTargetListAdd(t *testing.T) {
targetListCase1 := NewTargetList()
targetListCase2 := NewTargetList()
if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil {
panic(err)
}
targetListCase3 := NewTargetList()
if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, false}); err != nil {
panic(err)
}
testCases := []struct {
targetList *TargetList
target Target
expectedResult []TargetID
expectErr bool
}{
{targetListCase1, &ExampleTarget{TargetID{"1", "webhook"}, false, false}, []TargetID{{"1", "webhook"}}, false},
{targetListCase2, &ExampleTarget{TargetID{"1", "webhook"}, false, false}, []TargetID{{"2", "testcase"}, {"1", "webhook"}}, false},
{targetListCase3, &ExampleTarget{TargetID{"3", "testcase"}, false, false}, nil, true},
}
for i, testCase := range testCases {
err := testCase.targetList.Add(testCase.target)
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
if !testCase.expectErr {
result := testCase.targetList.List()
if len(result) != len(testCase.expectedResult) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
for _, targetID1 := range result {
var found bool
for _, targetID2 := range testCase.expectedResult {
if reflect.DeepEqual(targetID1, targetID2) {
found = true
break
}
}
if !found {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
}
}
func TestTargetListExists(t *testing.T) {
targetListCase1 := NewTargetList()
targetListCase2 := NewTargetList()
if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil {
panic(err)
}
targetListCase3 := NewTargetList()
if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, false}); err != nil {
panic(err)
}
testCases := []struct {
targetList *TargetList
targetID TargetID
expectedResult bool
}{
{targetListCase1, TargetID{"1", "webhook"}, false},
{targetListCase2, TargetID{"1", "webhook"}, false},
{targetListCase3, TargetID{"3", "testcase"}, true},
}
for i, testCase := range testCases {
result := testCase.targetList.Exists(testCase.targetID)
if result != testCase.expectedResult {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
func TestTargetListRemove(t *testing.T) {
targetListCase1 := NewTargetList()
targetListCase2 := NewTargetList()
if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil {
panic(err)
}
targetListCase3 := NewTargetList()
if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, true}); err != nil {
panic(err)
}
testCases := []struct {
targetList *TargetList
targetID TargetID
expectErr bool
}{
{targetListCase1, TargetID{"1", "webhook"}, false},
{targetListCase2, TargetID{"1", "webhook"}, false},
{targetListCase3, TargetID{"3", "testcase"}, true},
}
for i, testCase := range testCases {
errors := testCase.targetList.Remove(testCase.targetID)
err := errors[testCase.targetID]
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func TestTargetListList(t *testing.T) {
targetListCase1 := NewTargetList()
targetListCase2 := NewTargetList()
if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil {
panic(err)
}
targetListCase3 := NewTargetList()
if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, false}); err != nil {
panic(err)
}
if err := targetListCase3.Add(&ExampleTarget{TargetID{"1", "webhook"}, false, false}); err != nil {
panic(err)
}
testCases := []struct {
targetList *TargetList
expectedResult []TargetID
}{
{targetListCase1, []TargetID{}},
{targetListCase2, []TargetID{{"2", "testcase"}}},
{targetListCase3, []TargetID{{"3", "testcase"}, {"1", "webhook"}}},
}
for i, testCase := range testCases {
result := testCase.targetList.List()
if len(result) != len(testCase.expectedResult) {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
for _, targetID1 := range result {
var found bool
for _, targetID2 := range testCase.expectedResult {
if reflect.DeepEqual(targetID1, targetID2) {
found = true
break
}
}
if !found {
t.Fatalf("test %v: data: expected: %v, got: %v", i+1, testCase.expectedResult, result)
}
}
}
}
func TestTargetListSend(t *testing.T) {
targetListCase1 := NewTargetList()
targetListCase2 := NewTargetList()
if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil {
panic(err)
}
targetListCase3 := NewTargetList()
if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, false}); err != nil {
panic(err)
}
targetListCase4 := NewTargetList()
if err := targetListCase4.Add(&ExampleTarget{TargetID{"4", "testcase"}, true, false}); err != nil {
panic(err)
}
testCases := []struct {
targetList *TargetList
targetID TargetID
expectErr bool
}{
{targetListCase1, TargetID{"1", "webhook"}, false},
{targetListCase2, TargetID{"1", "non-existent"}, false},
{targetListCase3, TargetID{"3", "testcase"}, false},
{targetListCase4, TargetID{"4", "testcase"}, true},
}
for i, testCase := range testCases {
errors := testCase.targetList.Send(Event{}, testCase.targetID)
err := errors[testCase.targetID]
expectErr := (err != nil)
if expectErr != testCase.expectErr {
t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr)
}
}
}
func TestNewTargetList(t *testing.T) {
if result := NewTargetList(); result == nil {
t.Fatalf("test: result: expected: <non-nil>, got: <nil>")
}
}