diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index d6a1814a0..c1ea94a6a 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -19,6 +19,7 @@ package cmd import ( "reflect" "testing" + "time" ) // Tests event notify. @@ -38,7 +39,9 @@ func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) { // remove the root folder after the test ends. defer removeAll(rootPath) - initEventNotifier(obj) + if err := initEventNotifier(obj); err != nil { + t.Fatal("Unexpected error:", err) + } // Notify object created event. eventNotify(eventData{ @@ -72,7 +75,7 @@ func testEventNotify(obj ObjectLayer, instanceType string, t TestErrHandler) { // Notify object created event. eventNotify(eventData{ - Type: ObjectCreatedPost, + Type: ObjectRemovedDelete, Bucket: bucketName, ObjInfo: ObjectInfo{ Bucket: bucketName, @@ -140,3 +143,236 @@ func TestInitEventNotifier(t *testing.T) { } } } + +// Test InitEventNotifier with faulty disks +func TestInitEventNotifierFaultyDisks(t *testing.T) { + // Prepare for tests + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root folder after the test ends. + defer removeAll(rootPath) + + disk, err := getRandomDisks(1) + defer removeAll(disk[0]) + if err != nil { + t.Fatal("Unable to create directories for FS backend. ", err) + } + obj, err := getSingleNodeObjectLayer(disk[0]) + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + + bucketName := "bucket" + if err := obj.MakeBucket(bucketName); err != nil { + t.Fatal("Unexpected error:", err) + } + + fs := obj.(fsObjects) + fsstorage := fs.storage.(*posix) + + listenARN := "arn:minio:sns:us-east-1:1:listen" + queueARN := "arn:minio:sqs:us-east-1:1:redis" + + // Write a notification.xml in the disk + notificationXML := "" + notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + listenARN + "" + notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + queueARN + "" + notificationXML += "" + if err := fsstorage.AppendFile(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, []byte(notificationXML)); err != nil { + t.Fatal("Unexpected error:", err) + } + + // Test initEventNotifier() with faulty disks + for i := 1; i <= 5; i++ { + fs.storage = newNaughtyDisk(fsstorage, map[int]error{i: errFaultyDisk}, nil) + if err := initEventNotifier(fs); errorCause(err) != errFaultyDisk { + t.Fatal("Unexpected error:", err) + } + } +} + +// InitEventNotifierWithAMQP - tests InitEventNotifier when AMQP is not prepared +func TestInitEventNotifierWithAMQP(t *testing.T) { + // initialize the server and obtain the credentials and root. + // credentials are necessary to sign the HTTP request. + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root folder after the test ends. + defer removeAll(rootPath) + + disk, err := getRandomDisks(1) + defer removeAll(disk[0]) + if err != nil { + t.Fatal("Unable to create directories for FS backend. ", err) + } + fs, err := getSingleNodeObjectLayer(disk[0]) + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + + serverConfig.SetAMQPNotifyByID("1", amqpNotify{Enable: true}) + if err := initEventNotifier(fs); err == nil { + t.Fatal("AMQP config didn't fail.") + } +} + +// InitEventNotifierWithElasticSearch - test InitEventNotifier when ElasticSearch is not ready +func TestInitEventNotifierWithElasticSearch(t *testing.T) { + // initialize the server and obtain the credentials and root. + // credentials are necessary to sign the HTTP request. + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root folder after the test ends. + defer removeAll(rootPath) + + disk, err := getRandomDisks(1) + defer removeAll(disk[0]) + if err != nil { + t.Fatal("Unable to create directories for FS backend. ", err) + } + fs, err := getSingleNodeObjectLayer(disk[0]) + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + + serverConfig.SetElasticSearchNotifyByID("1", elasticSearchNotify{Enable: true}) + if err := initEventNotifier(fs); err == nil { + t.Fatal("ElasticSearch config didn't fail.") + } +} + +// InitEventNotifierWithRedis - test InitEventNotifier when Redis is not ready +func TestInitEventNotifierWithRedis(t *testing.T) { + // initialize the server and obtain the credentials and root. + // credentials are necessary to sign the HTTP request. + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root folder after the test ends. + defer removeAll(rootPath) + + disk, err := getRandomDisks(1) + defer removeAll(disk[0]) + if err != nil { + t.Fatal("Unable to create directories for FS backend. ", err) + } + fs, err := getSingleNodeObjectLayer(disk[0]) + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + + serverConfig.SetRedisNotifyByID("1", redisNotify{Enable: true}) + if err := initEventNotifier(fs); err == nil { + t.Fatal("Redis config didn't fail.") + } +} + +// TestListenBucketNotification - test Listen Bucket Notification process +func TestListenBucketNotification(t *testing.T) { + + bucketName := "bucket" + objectName := "object" + + // Prepare for tests + // Create fs backend + rootPath, err := newTestConfig("us-east-1") + if err != nil { + t.Fatalf("Init Test config failed") + } + // remove the root folder after the test ends. + defer removeAll(rootPath) + + disk, err := getRandomDisks(1) + defer removeAll(disk[0]) + if err != nil { + t.Fatal("Unable to create directories for FS backend. ", err) + } + obj, err := getSingleNodeObjectLayer(disk[0]) + if err != nil { + t.Fatal("Unable to initialize FS backend.", err) + } + + // Create the bucket to listen on + if err := obj.MakeBucket(bucketName); err != nil { + t.Fatal("Unexpected error:", err) + } + + listenARN := "arn:minio:sns:us-east-1:1:listen" + queueARN := "arn:minio:sqs:us-east-1:1:redis" + + fs := obj.(fsObjects) + storage := fs.storage.(*posix) + + // Create and store notification.xml with listen and queue notification configured + notificationXML := "" + notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + listenARN + "" + notificationXML += "s3:ObjectRemoved:*s3:ObjectRemoved:*" + queueARN + "" + notificationXML += "" + if err := storage.AppendFile(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, []byte(notificationXML)); err != nil { + t.Fatal("Unexpected error:", err) + } + + // Init event notifier + if err := initEventNotifier(fs); err != nil { + t.Fatal("Unexpected error:", err) + } + + // Check if the config is loaded + notificationCfg := globalEventNotifier.GetBucketNotificationConfig(bucketName) + if notificationCfg == nil { + t.Fatal("Cannot load bucket notification config") + } + if len(notificationCfg.TopicConfigs) != 1 || len(notificationCfg.QueueConfigs) != 1 { + t.Fatal("Notification config is not correctly loaded. Exactly one topic and one queue config are expected") + } + + // Check if listen notification config is enabled + if !isMinioSNSConfigured(listenARN, notificationCfg.TopicConfigs) { + t.Fatal("SNS listen is not configured.") + } + + // Create a new notification event channel. + nEventCh := make(chan []NotificationEvent) + // Close the listener channel. + defer close(nEventCh) + // Set sns target. + globalEventNotifier.SetSNSTarget(listenARN, nEventCh) + // Remove sns listener after the writer has closed or the client disconnected. + defer globalEventNotifier.RemoveSNSTarget(listenARN, nEventCh) + + // Fire an event notification + go eventNotify(eventData{ + Type: ObjectRemovedDelete, + Bucket: bucketName, + ObjInfo: ObjectInfo{ + Bucket: bucketName, + Name: objectName, + }, + ReqParams: map[string]string{ + "sourceIPAddress": "localhost:1337", + }, + }) + + // Wait for the event notification here, if nothing is received within 30 seconds, + // test error will be fired + select { + case n := <-nEventCh: + // Check that received event + if len(n) == 0 { + t.Fatal("Unexpected error occured") + } + if n[0].S3.Object.Key != objectName { + t.Fatalf("Received wrong object name in notification, expected %s, received %s", n[0].S3.Object.Key, objectName) + } + break + case <-time.After(30 * time.Second): + break + } +}