mirror of
https://github.com/minio/minio.git
synced 2024-12-25 06:35:56 -05:00
1d8a8c63db
Verify() was being called by the caller after the data had been successfully read, i.e. after io.EOF. This separation opens a race under concurrent access to such an object. Verification is not necessary outside of the Read() call; we can simply do the checksum verification right inside Read() at io.EOF. This approach simplifies the usage.
588 lines
17 KiB
Go
588 lines
17 KiB
Go
/*
 * Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"os"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// Test InitEventNotifier with faulty disks
|
|
func TestInitEventNotifierFaultyDisks(t *testing.T) {
|
|
// Prepare for tests
|
|
rootPath, err := newTestConfig(globalMinioDefaultRegion)
|
|
if err != nil {
|
|
t.Fatalf("Init Test config failed")
|
|
}
|
|
// remove the root directory after the test ends.
|
|
defer os.RemoveAll(rootPath)
|
|
|
|
disks, err := getRandomDisks(4)
|
|
if err != nil {
|
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
|
}
|
|
defer removeRoots(disks)
|
|
obj, _, err := initObjectLayer(mustGetNewEndpointList(disks...))
|
|
if err != nil {
|
|
t.Fatal("Unable to initialize FS backend.", err)
|
|
}
|
|
|
|
bucketName := "bucket"
|
|
if err := obj.MakeBucketWithLocation(bucketName, ""); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
xl := obj.(*xlObjects)
|
|
|
|
listenARN := "arn:minio:sns:us-east-1:1:listen"
|
|
queueARN := "arn:minio:sqs:us-east-1:1:redis"
|
|
|
|
// Write a notification.xml in the disk
|
|
notificationXML := "<NotificationConfiguration>"
|
|
notificationXML += "<TopicConfiguration><Event>s3:ObjectRemoved:*</Event><Event>s3:ObjectRemoved:*</Event><Topic>" + listenARN + "</Topic></TopicConfiguration>"
|
|
notificationXML += "<QueueConfiguration><Event>s3:ObjectRemoved:*</Event><Event>s3:ObjectRemoved:*</Event><Queue>" + queueARN + "</Queue></QueueConfiguration>"
|
|
notificationXML += "</NotificationConfiguration>"
|
|
size := int64(len([]byte(notificationXML)))
|
|
reader := bytes.NewReader([]byte(notificationXML))
|
|
if _, err := xl.PutObject(minioMetaBucket, bucketConfigPrefix+"/"+bucketName+"/"+bucketNotificationConfig, mustGetHashReader(t, reader, size, "", ""), nil); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
for i, d := range xl.storageDisks {
|
|
xl.storageDisks[i] = newNaughtyDisk(d.(*retryStorage), nil, errFaultyDisk)
|
|
}
|
|
// Test initEventNotifier() with faulty disks
|
|
for i := 1; i <= 3; i++ {
|
|
if err := initEventNotifier(xl); errorCause(err) != errFaultyDisk {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// InitEventNotifierWithPostgreSQL - tests InitEventNotifier when PostgreSQL is not prepared
|
|
func TestInitEventNotifierWithPostgreSQL(t *testing.T) {
|
|
// initialize the server and obtain the credentials and root.
|
|
// credentials are necessary to sign the HTTP request.
|
|
rootPath, err := newTestConfig(globalMinioDefaultRegion)
|
|
if err != nil {
|
|
t.Fatalf("Init Test config failed")
|
|
}
|
|
// remove the root directory after the test ends.
|
|
defer os.RemoveAll(rootPath)
|
|
|
|
disks, err := getRandomDisks(1)
|
|
defer os.RemoveAll(disks[0])
|
|
if err != nil {
|
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
|
}
|
|
fs, _, err := initObjectLayer(mustGetNewEndpointList(disks...))
|
|
if err != nil {
|
|
t.Fatal("Unable to initialize FS backend.", err)
|
|
}
|
|
|
|
serverConfig.Notify.SetPostgreSQLByID("1", postgreSQLNotify{Enable: true})
|
|
if err := initEventNotifier(fs); err == nil {
|
|
t.Fatal("PostgreSQL config didn't fail.")
|
|
}
|
|
}
|
|
|
|
// InitEventNotifierWithNATS - tests InitEventNotifier when NATS is not prepared
|
|
func TestInitEventNotifierWithNATS(t *testing.T) {
|
|
// initialize the server and obtain the credentials and root.
|
|
// credentials are necessary to sign the HTTP request.
|
|
rootPath, err := newTestConfig(globalMinioDefaultRegion)
|
|
if err != nil {
|
|
t.Fatalf("Init Test config failed")
|
|
}
|
|
// remove the root directory after the test ends.
|
|
defer os.RemoveAll(rootPath)
|
|
|
|
disks, err := getRandomDisks(1)
|
|
defer os.RemoveAll(disks[0])
|
|
if err != nil {
|
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
|
}
|
|
fs, _, err := initObjectLayer(mustGetNewEndpointList(disks...))
|
|
if err != nil {
|
|
t.Fatal("Unable to initialize FS backend.", err)
|
|
}
|
|
|
|
serverConfig.Notify.SetNATSByID("1", natsNotify{Enable: true})
|
|
if err := initEventNotifier(fs); err == nil {
|
|
t.Fatal("NATS config didn't fail.")
|
|
}
|
|
}
|
|
|
|
// InitEventNotifierWithWebHook - tests InitEventNotifier when WebHook is not prepared
|
|
func TestInitEventNotifierWithWebHook(t *testing.T) {
|
|
// initialize the server and obtain the credentials and root.
|
|
// credentials are necessary to sign the HTTP request.
|
|
rootPath, err := newTestConfig(globalMinioDefaultRegion)
|
|
if err != nil {
|
|
t.Fatalf("Init Test config failed")
|
|
}
|
|
// remove the root directory after the test ends.
|
|
defer os.RemoveAll(rootPath)
|
|
|
|
disks, err := getRandomDisks(1)
|
|
defer os.RemoveAll(disks[0])
|
|
if err != nil {
|
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
|
}
|
|
fs, _, err := initObjectLayer(mustGetNewEndpointList(disks...))
|
|
if err != nil {
|
|
t.Fatal("Unable to initialize FS backend.", err)
|
|
}
|
|
|
|
serverConfig.Notify.SetWebhookByID("1", webhookNotify{Enable: true})
|
|
if err := initEventNotifier(fs); err == nil {
|
|
t.Fatal("WebHook config didn't fail.")
|
|
}
|
|
}
|
|
|
|
// InitEventNotifierWithAMQP - tests InitEventNotifier when AMQP is not prepared
|
|
func TestInitEventNotifierWithAMQP(t *testing.T) {
|
|
// initialize the server and obtain the credentials and root.
|
|
// credentials are necessary to sign the HTTP request.
|
|
rootPath, err := newTestConfig(globalMinioDefaultRegion)
|
|
if err != nil {
|
|
t.Fatalf("Init Test config failed")
|
|
}
|
|
// remove the root directory after the test ends.
|
|
defer os.RemoveAll(rootPath)
|
|
|
|
disks, err := getRandomDisks(1)
|
|
defer os.RemoveAll(disks[0])
|
|
if err != nil {
|
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
|
}
|
|
fs, _, err := initObjectLayer(mustGetNewEndpointList(disks...))
|
|
if err != nil {
|
|
t.Fatal("Unable to initialize FS backend.", err)
|
|
}
|
|
|
|
serverConfig.Notify.SetAMQPByID("1", amqpNotify{Enable: true})
|
|
if err := initEventNotifier(fs); err == nil {
|
|
t.Fatal("AMQP config didn't fail.")
|
|
}
|
|
}
|
|
|
|
// InitEventNotifierWithElasticSearch - test InitEventNotifier when ElasticSearch is not ready
|
|
func TestInitEventNotifierWithElasticSearch(t *testing.T) {
|
|
// initialize the server and obtain the credentials and root.
|
|
// credentials are necessary to sign the HTTP request.
|
|
rootPath, err := newTestConfig(globalMinioDefaultRegion)
|
|
if err != nil {
|
|
t.Fatalf("Init Test config failed")
|
|
}
|
|
// remove the root directory after the test ends.
|
|
defer os.RemoveAll(rootPath)
|
|
|
|
disks, err := getRandomDisks(1)
|
|
defer os.RemoveAll(disks[0])
|
|
if err != nil {
|
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
|
}
|
|
fs, _, err := initObjectLayer(mustGetNewEndpointList(disks...))
|
|
if err != nil {
|
|
t.Fatal("Unable to initialize FS backend.", err)
|
|
}
|
|
|
|
serverConfig.Notify.SetElasticSearchByID("1", elasticSearchNotify{Enable: true})
|
|
if err := initEventNotifier(fs); err == nil {
|
|
t.Fatal("ElasticSearch config didn't fail.")
|
|
}
|
|
}
|
|
|
|
// InitEventNotifierWithRedis - test InitEventNotifier when Redis is not ready
|
|
func TestInitEventNotifierWithRedis(t *testing.T) {
|
|
// initialize the server and obtain the credentials and root.
|
|
// credentials are necessary to sign the HTTP request.
|
|
rootPath, err := newTestConfig(globalMinioDefaultRegion)
|
|
if err != nil {
|
|
t.Fatalf("Init Test config failed")
|
|
}
|
|
// remove the root directory after the test ends.
|
|
defer os.RemoveAll(rootPath)
|
|
|
|
disks, err := getRandomDisks(1)
|
|
defer os.RemoveAll(disks[0])
|
|
if err != nil {
|
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
|
}
|
|
fs, _, err := initObjectLayer(mustGetNewEndpointList(disks...))
|
|
if err != nil {
|
|
t.Fatal("Unable to initialize FS backend.", err)
|
|
}
|
|
|
|
serverConfig.Notify.SetRedisByID("1", redisNotify{Enable: true})
|
|
if err := initEventNotifier(fs); err == nil {
|
|
t.Fatal("Redis config didn't fail.")
|
|
}
|
|
}
|
|
|
|
type TestPeerRPCServerData struct {
|
|
serverType string
|
|
testServer TestServer
|
|
}
|
|
|
|
func (s *TestPeerRPCServerData) Setup(t *testing.T) {
|
|
s.testServer = StartTestPeersRPCServer(t, s.serverType)
|
|
|
|
// setup port and minio addr
|
|
host, port := mustSplitHostPort(s.testServer.Server.Listener.Addr().String())
|
|
globalMinioHost = host
|
|
globalMinioPort = port
|
|
globalMinioAddr = getEndpointsLocalAddr(s.testServer.endpoints)
|
|
|
|
// initialize the peer client(s)
|
|
initGlobalS3Peers(s.testServer.Disks)
|
|
}
|
|
|
|
func (s *TestPeerRPCServerData) TearDown() {
|
|
s.testServer.Stop()
|
|
_ = os.RemoveAll(s.testServer.Root)
|
|
for _, d := range s.testServer.Disks {
|
|
_ = os.RemoveAll(d.Path)
|
|
}
|
|
}
|
|
|
|
func TestSetNGetBucketNotification(t *testing.T) {
|
|
s := TestPeerRPCServerData{serverType: "XL"}
|
|
|
|
// setup and teardown
|
|
s.Setup(t)
|
|
defer s.TearDown()
|
|
|
|
bucketName := getRandomBucketName()
|
|
|
|
obj := s.testServer.Obj
|
|
if err := initEventNotifier(obj); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
globalEventNotifier.SetBucketNotificationConfig(bucketName, ¬ificationConfig{})
|
|
nConfig := globalEventNotifier.GetBucketNotificationConfig(bucketName)
|
|
if nConfig == nil {
|
|
t.Errorf("Notification expected to be set, but notification not set.")
|
|
}
|
|
|
|
if !reflect.DeepEqual(nConfig, ¬ificationConfig{}) {
|
|
t.Errorf("Mismatching notification configs.")
|
|
}
|
|
}
|
|
|
|
func TestInitEventNotifier(t *testing.T) {
|
|
currentIsDistXL := globalIsDistXL
|
|
defer func() {
|
|
globalIsDistXL = currentIsDistXL
|
|
}()
|
|
|
|
s := TestPeerRPCServerData{serverType: "XL"}
|
|
|
|
// setup and teardown
|
|
s.Setup(t)
|
|
defer s.TearDown()
|
|
|
|
// test if empty object layer arg. returns expected error.
|
|
if err := initEventNotifier(nil); err == nil || err != errInvalidArgument {
|
|
t.Fatalf("initEventNotifier returned unexpected error value - %v", err)
|
|
}
|
|
|
|
obj := s.testServer.Obj
|
|
bucketName := getRandomBucketName()
|
|
// declare sample configs
|
|
filterRules := []filterRule{
|
|
{
|
|
Name: "prefix",
|
|
Value: "minio",
|
|
},
|
|
{
|
|
Name: "suffix",
|
|
Value: "*.jpg",
|
|
},
|
|
}
|
|
sampleSvcCfg := ServiceConfig{
|
|
[]string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
|
|
filterStruct{
|
|
keyFilter{filterRules},
|
|
},
|
|
"1",
|
|
}
|
|
sampleNotifCfg := notificationConfig{
|
|
QueueConfigs: []queueConfig{
|
|
{
|
|
ServiceConfig: sampleSvcCfg,
|
|
QueueARN: "testqARN",
|
|
},
|
|
},
|
|
}
|
|
sampleListenCfg := []listenerConfig{
|
|
{
|
|
TopicConfig: topicConfig{ServiceConfig: sampleSvcCfg,
|
|
TopicARN: "testlARN"},
|
|
TargetServer: globalMinioAddr,
|
|
},
|
|
}
|
|
|
|
// create bucket
|
|
if err := obj.MakeBucketWithLocation(bucketName, ""); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
// bucket is created, now writing should not give errors.
|
|
if err := persistNotificationConfig(bucketName, &sampleNotifCfg, obj); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
if err := persistListenerConfig(bucketName, sampleListenCfg, obj); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
// needed to load listener config from disk for testing (in
|
|
// single peer mode, the listener config is ignored, but here
|
|
// we want to test the loading from disk too.)
|
|
globalIsDistXL = true
|
|
|
|
// test event notifier init
|
|
if err := initEventNotifier(obj); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
// fetch bucket configs and verify
|
|
ncfg := globalEventNotifier.GetBucketNotificationConfig(bucketName)
|
|
if ncfg == nil {
|
|
t.Error("Bucket notification was not present for ", bucketName)
|
|
}
|
|
if len(ncfg.QueueConfigs) != 1 || ncfg.QueueConfigs[0].QueueARN != "testqARN" {
|
|
t.Error("Unexpected bucket notification found - ", *ncfg)
|
|
}
|
|
if globalEventNotifier.GetExternalTarget("testqARN") != nil {
|
|
t.Error("A logger was not expected to be found as it was not enabled in the config.")
|
|
}
|
|
|
|
lcfg := globalEventNotifier.GetBucketListenerConfig(bucketName)
|
|
if lcfg == nil {
|
|
t.Error("Bucket listener was not present for ", bucketName)
|
|
}
|
|
if len(lcfg) != 1 || lcfg[0].TargetServer != globalMinioAddr || lcfg[0].TopicConfig.TopicARN != "testlARN" {
|
|
t.Error("Unexpected listener config found - ", lcfg[0])
|
|
}
|
|
if globalEventNotifier.GetInternalTarget("testlARN") == nil {
|
|
t.Error("A listen logger was not found.")
|
|
}
|
|
}
|
|
|
|
func TestListenBucketNotification(t *testing.T) {
|
|
currentIsDistXL := globalIsDistXL
|
|
defer func() {
|
|
globalIsDistXL = currentIsDistXL
|
|
}()
|
|
|
|
s := TestPeerRPCServerData{serverType: "XL"}
|
|
// setup and teardown
|
|
s.Setup(t)
|
|
defer s.TearDown()
|
|
|
|
// test initialisation
|
|
obj := s.testServer.Obj
|
|
|
|
bucketName := "bucket"
|
|
objectName := "object"
|
|
|
|
// Create the bucket to listen on
|
|
if err := obj.MakeBucketWithLocation(bucketName, ""); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
listenARN := fmt.Sprintf("%s:%s:1:%s-%s",
|
|
minioTopic,
|
|
serverConfig.GetRegion(),
|
|
snsTypeMinio,
|
|
s.testServer.Server.Listener.Addr(),
|
|
)
|
|
lcfg := listenerConfig{
|
|
TopicConfig: topicConfig{
|
|
ServiceConfig{
|
|
[]string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
|
|
filterStruct{},
|
|
"0",
|
|
},
|
|
listenARN,
|
|
},
|
|
TargetServer: globalMinioAddr,
|
|
}
|
|
|
|
// write listener config to storage layer
|
|
lcfgs := []listenerConfig{lcfg}
|
|
if err := persistListenerConfig(bucketName, lcfgs, obj); err != nil {
|
|
t.Fatalf("Test Setup error: %v", err)
|
|
}
|
|
|
|
// needed to load listener config from disk for testing (in
|
|
// single peer mode, the listener config is ingored, but here
|
|
// we want to test the loading from disk too.)
|
|
globalIsDistXL = true
|
|
|
|
// Init event notifier
|
|
if err := initEventNotifier(obj); err != nil {
|
|
t.Fatal("Unexpected error:", err)
|
|
}
|
|
|
|
// Check if the config is loaded
|
|
listenerCfg := globalEventNotifier.GetBucketListenerConfig(bucketName)
|
|
if listenerCfg == nil {
|
|
t.Fatal("Cannot load bucket listener config")
|
|
}
|
|
if len(listenerCfg) != 1 {
|
|
t.Fatal("Listener config is not correctly loaded. Exactly one listener config is expected")
|
|
}
|
|
|
|
// Check if topic ARN is correct
|
|
if listenerCfg[0].TopicConfig.TopicARN != listenARN {
|
|
t.Fatal("Configured topic ARN is incorrect.")
|
|
}
|
|
|
|
// Create a new notification event channel.
|
|
nEventCh := make(chan []NotificationEvent)
|
|
// Close the listener channel.
|
|
defer close(nEventCh)
|
|
// Add events channel for listener.
|
|
if err := globalEventNotifier.AddListenerChan(listenARN, nEventCh); err != nil {
|
|
t.Fatalf("Test Setup error: %v", err)
|
|
}
|
|
// Remove listen channel after the writer has closed or the
|
|
// client disconnected.
|
|
defer globalEventNotifier.RemoveListenerChan(listenARN)
|
|
|
|
// Fire an event notification
|
|
go eventNotify(eventData{
|
|
Type: ObjectRemovedDelete,
|
|
Bucket: bucketName,
|
|
ObjInfo: ObjectInfo{
|
|
Bucket: bucketName,
|
|
Name: objectName,
|
|
},
|
|
ReqParams: map[string]string{
|
|
"sourceIPAddress": "localhost:1337",
|
|
},
|
|
})
|
|
|
|
// Wait for the event notification here, if nothing is received within 30 seconds,
|
|
// test error will be fired
|
|
select {
|
|
case n := <-nEventCh:
|
|
// Check that received event
|
|
if len(n) == 0 {
|
|
t.Fatal("Unexpected error occurred")
|
|
}
|
|
if n[0].S3.Object.Key != objectName {
|
|
t.Fatalf("Received wrong object name in notification, expected %s, received %s", n[0].S3.Object.Key, objectName)
|
|
}
|
|
break
|
|
case <-time.After(3 * time.Second):
|
|
break
|
|
}
|
|
|
|
}
|
|
|
|
func TestAddRemoveBucketListenerConfig(t *testing.T) {
|
|
s := TestPeerRPCServerData{serverType: "XL"}
|
|
|
|
// setup and teardown
|
|
s.Setup(t)
|
|
defer s.TearDown()
|
|
|
|
// test code
|
|
obj := s.testServer.Obj
|
|
if err := initEventNotifier(obj); err != nil {
|
|
t.Fatalf("Failed to initialize event notifier: %v", err)
|
|
}
|
|
|
|
// Make a bucket to store topicConfigs.
|
|
randBucket := getRandomBucketName()
|
|
if err := obj.MakeBucketWithLocation(randBucket, ""); err != nil {
|
|
t.Fatalf("Failed to make bucket %s", randBucket)
|
|
}
|
|
|
|
// Add a topicConfig to an empty notificationConfig.
|
|
accountID := fmt.Sprintf("%d", UTCNow().UnixNano())
|
|
accountARN := fmt.Sprintf(
|
|
"arn:minio:sqs:%s:%s:listen-%s",
|
|
serverConfig.GetRegion(),
|
|
accountID,
|
|
globalMinioAddr,
|
|
)
|
|
|
|
// Make topic configuration
|
|
filterRules := []filterRule{
|
|
{
|
|
Name: "prefix",
|
|
Value: "minio",
|
|
},
|
|
{
|
|
Name: "suffix",
|
|
Value: "*.jpg",
|
|
},
|
|
}
|
|
sampleTopicCfg := topicConfig{
|
|
TopicARN: accountARN,
|
|
ServiceConfig: ServiceConfig{
|
|
[]string{"s3:ObjectRemoved:*", "s3:ObjectCreated:*"},
|
|
filterStruct{
|
|
keyFilter{filterRules},
|
|
},
|
|
"sns-" + accountID,
|
|
},
|
|
}
|
|
sampleListenerCfg := &listenerConfig{
|
|
TopicConfig: sampleTopicCfg,
|
|
TargetServer: globalMinioAddr,
|
|
}
|
|
testCases := []struct {
|
|
lCfg *listenerConfig
|
|
expectedErr error
|
|
}{
|
|
{sampleListenerCfg, nil},
|
|
{nil, errInvalidArgument},
|
|
}
|
|
|
|
for i, test := range testCases {
|
|
err := AddBucketListenerConfig(randBucket, test.lCfg, obj)
|
|
if err != test.expectedErr {
|
|
t.Errorf(
|
|
"Test %d: Failed with error %v, expected to fail with %v",
|
|
i+1, err, test.expectedErr,
|
|
)
|
|
}
|
|
}
|
|
|
|
// test remove listener actually removes a listener
|
|
RemoveBucketListenerConfig(randBucket, sampleListenerCfg, obj)
|
|
// since it does not return errors we fetch the config and
|
|
// check
|
|
lcSlice := globalEventNotifier.GetBucketListenerConfig(randBucket)
|
|
if len(lcSlice) != 0 {
|
|
t.Errorf("Remove Listener Config Test: did not remove listener config - %v",
|
|
lcSlice)
|
|
}
|
|
}
|