mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
Add notifications by webhook.
Add a new config entry moving to version 13. ``` "webhook": { "1": { "enable": true, "address": "http://requestb.in/1i9al7m1" } } ```
This commit is contained in:
parent
f24753812a
commit
d6a327fbc5
@ -21,4 +21,4 @@ after_success:
|
|||||||
- bash <(curl -s https://codecov.io/bash)
|
- bash <(curl -s https://codecov.io/bash)
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- 1.7.3
|
- 1.7.4
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -36,7 +35,7 @@ func mustGetRequestID(t time.Time) string {
|
|||||||
func setCommonHeaders(w http.ResponseWriter) {
|
func setCommonHeaders(w http.ResponseWriter) {
|
||||||
// Set unique request ID for each reply.
|
// Set unique request ID for each reply.
|
||||||
w.Header().Set(responseRequestIDKey, mustGetRequestID(time.Now().UTC()))
|
w.Header().Set(responseRequestIDKey, mustGetRequestID(time.Now().UTC()))
|
||||||
w.Header().Set("Server", ("Minio/" + ReleaseTag + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")"))
|
w.Header().Set("Server", globalServerUserAgent)
|
||||||
w.Header().Set("Accept-Ranges", "bytes")
|
w.Header().Set("Accept-Ranges", "bytes")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,6 +131,7 @@ func isValidQueueID(queueARN string) bool {
|
|||||||
// Unmarshals QueueARN into structured object.
|
// Unmarshals QueueARN into structured object.
|
||||||
sqsARN := unmarshalSqsARN(queueARN)
|
sqsARN := unmarshalSqsARN(queueARN)
|
||||||
// Is Queue identifier valid?.
|
// Is Queue identifier valid?.
|
||||||
|
|
||||||
if isAMQPQueue(sqsARN) { // AMQP eueue.
|
if isAMQPQueue(sqsARN) { // AMQP eueue.
|
||||||
amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID)
|
amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID)
|
||||||
return amqpN.Enable && amqpN.URL != ""
|
return amqpN.Enable && amqpN.URL != ""
|
||||||
@ -151,6 +152,9 @@ func isValidQueueID(queueARN string) bool {
|
|||||||
kafkaN := serverConfig.GetKafkaNotifyByID(sqsARN.AccountID)
|
kafkaN := serverConfig.GetKafkaNotifyByID(sqsARN.AccountID)
|
||||||
return (kafkaN.Enable && len(kafkaN.Brokers) > 0 &&
|
return (kafkaN.Enable && len(kafkaN.Brokers) > 0 &&
|
||||||
kafkaN.Topic != "")
|
kafkaN.Topic != "")
|
||||||
|
} else if isWebhookQueue(sqsARN) {
|
||||||
|
webhookN := serverConfig.GetWebhookNotifyByID(sqsARN.AccountID)
|
||||||
|
return webhookN.Enable && webhookN.Endpoint != ""
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -241,6 +245,7 @@ func validateNotificationConfig(nConfig notificationConfig) APIErrorCode {
|
|||||||
// - redis
|
// - redis
|
||||||
// - postgresql
|
// - postgresql
|
||||||
// - kafka
|
// - kafka
|
||||||
|
// - webhook
|
||||||
func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
|
func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
|
||||||
mSqs = arnSQS{}
|
mSqs = arnSQS{}
|
||||||
if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") {
|
if !strings.HasPrefix(queueARN, minioSqs+serverConfig.GetRegion()+":") {
|
||||||
@ -260,6 +265,8 @@ func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
|
|||||||
mSqs.Type = queueTypePostgreSQL
|
mSqs.Type = queueTypePostgreSQL
|
||||||
case strings.HasSuffix(sqsType, queueTypeKafka):
|
case strings.HasSuffix(sqsType, queueTypeKafka):
|
||||||
mSqs.Type = queueTypeKafka
|
mSqs.Type = queueTypeKafka
|
||||||
|
case strings.HasSuffix(sqsType, queueTypeWebhook):
|
||||||
|
mSqs.Type = queueTypeWebhook
|
||||||
} // Add more queues here.
|
} // Add more queues here.
|
||||||
mSqs.AccountID = strings.TrimSuffix(sqsType, ":"+mSqs.Type)
|
mSqs.AccountID = strings.TrimSuffix(sqsType, ":"+mSqs.Type)
|
||||||
return mSqs
|
return mSqs
|
||||||
|
@ -228,6 +228,12 @@ func TestQueueARN(t *testing.T) {
|
|||||||
queueARN string
|
queueARN string
|
||||||
errCode APIErrorCode
|
errCode APIErrorCode
|
||||||
}{
|
}{
|
||||||
|
|
||||||
|
// Valid webhook queue arn.
|
||||||
|
{
|
||||||
|
queueARN: "arn:minio:sqs:us-east-1:1:webhook",
|
||||||
|
errCode: ErrNone,
|
||||||
|
},
|
||||||
// Valid redis queue arn.
|
// Valid redis queue arn.
|
||||||
{
|
{
|
||||||
queueARN: "arn:minio:sqs:us-east-1:1:redis",
|
queueARN: "arn:minio:sqs:us-east-1:1:redis",
|
||||||
@ -306,6 +312,11 @@ func TestUnmarshalSQSARN(t *testing.T) {
|
|||||||
queueARN string
|
queueARN string
|
||||||
Type string
|
Type string
|
||||||
}{
|
}{
|
||||||
|
// Valid webhook queue arn.
|
||||||
|
{
|
||||||
|
queueARN: "arn:minio:sqs:us-east-1:1:webhook",
|
||||||
|
Type: "webhook",
|
||||||
|
},
|
||||||
// Valid redis queue arn.
|
// Valid redis queue arn.
|
||||||
{
|
{
|
||||||
queueARN: "arn:minio:sqs:us-east-1:1:redis",
|
queueARN: "arn:minio:sqs:us-east-1:1:redis",
|
||||||
|
@ -70,6 +70,10 @@ func migrateConfig() error {
|
|||||||
if err := migrateV11ToV12(); err != nil {
|
if err := migrateV11ToV12(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Migration version '12' to '13'.
|
||||||
|
if err := migrateV12ToV13(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -836,3 +840,97 @@ func migrateV11ToV12() error {
|
|||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Version '12' to '13' migration. Add support for custom webhook endpoint.
|
||||||
|
func migrateV12ToV13() error {
|
||||||
|
cv12, err := loadConfigV12()
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("Unable to load config version ‘12’. %v", err)
|
||||||
|
}
|
||||||
|
if cv12.Version != "12" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy over fields from V12 into V13 config struct
|
||||||
|
srvConfig := &serverConfigV13{}
|
||||||
|
srvConfig.Version = "13"
|
||||||
|
srvConfig.Credential = cv12.Credential
|
||||||
|
srvConfig.Region = cv12.Region
|
||||||
|
if srvConfig.Region == "" {
|
||||||
|
// Region needs to be set for AWS Signature Version 4.
|
||||||
|
srvConfig.Region = "us-east-1"
|
||||||
|
}
|
||||||
|
srvConfig.Logger.Console = cv12.Logger.Console
|
||||||
|
srvConfig.Logger.File = cv12.Logger.File
|
||||||
|
|
||||||
|
// check and set notifiers config
|
||||||
|
if len(cv12.Notify.AMQP) == 0 {
|
||||||
|
srvConfig.Notify.AMQP = make(map[string]amqpNotify)
|
||||||
|
srvConfig.Notify.AMQP["1"] = amqpNotify{}
|
||||||
|
} else {
|
||||||
|
srvConfig.Notify.AMQP = cv12.Notify.AMQP
|
||||||
|
}
|
||||||
|
if len(cv12.Notify.ElasticSearch) == 0 {
|
||||||
|
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
|
||||||
|
srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{}
|
||||||
|
} else {
|
||||||
|
srvConfig.Notify.ElasticSearch = cv12.Notify.ElasticSearch
|
||||||
|
}
|
||||||
|
if len(cv12.Notify.Redis) == 0 {
|
||||||
|
srvConfig.Notify.Redis = make(map[string]redisNotify)
|
||||||
|
srvConfig.Notify.Redis["1"] = redisNotify{}
|
||||||
|
} else {
|
||||||
|
srvConfig.Notify.Redis = cv12.Notify.Redis
|
||||||
|
}
|
||||||
|
if len(cv12.Notify.PostgreSQL) == 0 {
|
||||||
|
srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
|
||||||
|
srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{}
|
||||||
|
} else {
|
||||||
|
srvConfig.Notify.PostgreSQL = cv12.Notify.PostgreSQL
|
||||||
|
}
|
||||||
|
if len(cv12.Notify.Kafka) == 0 {
|
||||||
|
srvConfig.Notify.Kafka = make(map[string]kafkaNotify)
|
||||||
|
srvConfig.Notify.Kafka["1"] = kafkaNotify{}
|
||||||
|
} else {
|
||||||
|
srvConfig.Notify.Kafka = cv12.Notify.Kafka
|
||||||
|
}
|
||||||
|
if len(cv12.Notify.NATS) == 0 {
|
||||||
|
srvConfig.Notify.NATS = make(map[string]natsNotify)
|
||||||
|
srvConfig.Notify.NATS["1"] = natsNotify{}
|
||||||
|
} else {
|
||||||
|
srvConfig.Notify.NATS = cv12.Notify.NATS
|
||||||
|
}
|
||||||
|
|
||||||
|
// V12 will not have a webhook config. So we initialize one here.
|
||||||
|
srvConfig.Notify.Webhook = make(map[string]webhookNotify)
|
||||||
|
srvConfig.Notify.Webhook["1"] = webhookNotify{}
|
||||||
|
|
||||||
|
qc, err := quick.New(srvConfig)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to initialize the quick config. %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
configFile, err := getConfigFile()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Unable to get config file. %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = qc.Save(configFile)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"Failed to migrate config from ‘"+
|
||||||
|
cv12.Version+"’ to ‘"+srvConfig.Version+
|
||||||
|
"’ failed. %v", err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
console.Println(
|
||||||
|
"Migration from version ‘" +
|
||||||
|
cv12.Version + "’ to ‘" + srvConfig.Version +
|
||||||
|
"’ completed successfully.",
|
||||||
|
)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -101,7 +101,10 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
|
|||||||
t.Fatal("migrate v10 to v11 should succeed when no config file is found")
|
t.Fatal("migrate v10 to v11 should succeed when no config file is found")
|
||||||
}
|
}
|
||||||
if err := migrateV11ToV12(); err != nil {
|
if err := migrateV11ToV12(); err != nil {
|
||||||
t.Fatal("migrate v10 to v11 should succeed when no config file is found")
|
t.Fatal("migrate v11 to v12 should succeed when no config file is found")
|
||||||
|
}
|
||||||
|
if err := migrateV12ToV13(); err != nil {
|
||||||
|
t.Fatal("migrate v12 to v13 should succeed when no config file is found")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -212,5 +215,7 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
|
|||||||
if err := migrateV11ToV12(); err == nil {
|
if err := migrateV11ToV12(); err == nil {
|
||||||
t.Fatal("migrateConfigV11ToV12() should fail with a corrupted json")
|
t.Fatal("migrateConfigV11ToV12() should fail with a corrupted json")
|
||||||
}
|
}
|
||||||
|
if err := migrateV12ToV13(); err == nil {
|
||||||
|
t.Fatal("migrateConfigV12ToV13() should fail with a corrupted json")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -325,7 +325,8 @@ func loadConfigV6() (*configV6, error) {
|
|||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notifier represents collection of supported notification queues.
|
// Notifier represents collection of supported notification queues in version
|
||||||
|
// 1 without NATS streaming.
|
||||||
type notifierV1 struct {
|
type notifierV1 struct {
|
||||||
AMQP map[string]amqpNotify `json:"amqp"`
|
AMQP map[string]amqpNotify `json:"amqp"`
|
||||||
NATS map[string]natsNotifyV1 `json:"nats"`
|
NATS map[string]natsNotifyV1 `json:"nats"`
|
||||||
@ -335,6 +336,17 @@ type notifierV1 struct {
|
|||||||
Kafka map[string]kafkaNotify `json:"kafka"`
|
Kafka map[string]kafkaNotify `json:"kafka"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notifier represents collection of supported notification queues in version 2
|
||||||
|
// with NATS streaming but without webhook.
|
||||||
|
type notifierV2 struct {
|
||||||
|
AMQP map[string]amqpNotify `json:"amqp"`
|
||||||
|
NATS map[string]natsNotify `json:"nats"`
|
||||||
|
ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
|
||||||
|
Redis map[string]redisNotify `json:"redis"`
|
||||||
|
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
|
||||||
|
Kafka map[string]kafkaNotify `json:"kafka"`
|
||||||
|
}
|
||||||
|
|
||||||
// configV7 server configuration version '7'.
|
// configV7 server configuration version '7'.
|
||||||
type serverConfigV7 struct {
|
type serverConfigV7 struct {
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
@ -538,3 +550,39 @@ func loadConfigV11() (*serverConfigV11, error) {
|
|||||||
}
|
}
|
||||||
return srvCfg, nil
|
return srvCfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serverConfigV12 server configuration version '12' which is like
|
||||||
|
// version '11' except it adds support for NATS streaming notifications.
|
||||||
|
type serverConfigV12 struct {
|
||||||
|
Version string `json:"version"`
|
||||||
|
|
||||||
|
// S3 API configuration.
|
||||||
|
Credential credential `json:"credential"`
|
||||||
|
Region string `json:"region"`
|
||||||
|
|
||||||
|
// Additional error logging configuration.
|
||||||
|
Logger logger `json:"logger"`
|
||||||
|
|
||||||
|
// Notification queue configuration.
|
||||||
|
Notify notifierV2 `json:"notify"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadConfigV12() (*serverConfigV12, error) {
|
||||||
|
configFile, err := getConfigFile()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if _, err = os.Stat(configFile); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
srvCfg := &serverConfigV12{}
|
||||||
|
srvCfg.Version = "12"
|
||||||
|
qc, err := quick.New(srvCfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := qc.Load(configFile); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return srvCfg, nil
|
||||||
|
}
|
||||||
|
@ -26,9 +26,9 @@ import (
|
|||||||
// Read Write mutex for safe access to ServerConfig.
|
// Read Write mutex for safe access to ServerConfig.
|
||||||
var serverConfigMu sync.RWMutex
|
var serverConfigMu sync.RWMutex
|
||||||
|
|
||||||
// serverConfigV12 server configuration version '12' which is like
|
// serverConfigV13 server configuration version '13' which is like
|
||||||
// version '11' except it adds support for NATS streaming notifications.
|
// version '12' except it adds support for webhook notification.
|
||||||
type serverConfigV12 struct {
|
type serverConfigV13 struct {
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
|
|
||||||
// S3 API configuration.
|
// S3 API configuration.
|
||||||
@ -47,7 +47,7 @@ type serverConfigV12 struct {
|
|||||||
func initConfig() (bool, error) {
|
func initConfig() (bool, error) {
|
||||||
if !isConfigFileExists() {
|
if !isConfigFileExists() {
|
||||||
// Initialize server config.
|
// Initialize server config.
|
||||||
srvCfg := &serverConfigV12{}
|
srvCfg := &serverConfigV13{}
|
||||||
srvCfg.Version = globalMinioConfigVersion
|
srvCfg.Version = globalMinioConfigVersion
|
||||||
srvCfg.Region = "us-east-1"
|
srvCfg.Region = "us-east-1"
|
||||||
srvCfg.Credential = newCredential()
|
srvCfg.Credential = newCredential()
|
||||||
@ -71,12 +71,15 @@ func initConfig() (bool, error) {
|
|||||||
srvCfg.Notify.PostgreSQL["1"] = postgreSQLNotify{}
|
srvCfg.Notify.PostgreSQL["1"] = postgreSQLNotify{}
|
||||||
srvCfg.Notify.Kafka = make(map[string]kafkaNotify)
|
srvCfg.Notify.Kafka = make(map[string]kafkaNotify)
|
||||||
srvCfg.Notify.Kafka["1"] = kafkaNotify{}
|
srvCfg.Notify.Kafka["1"] = kafkaNotify{}
|
||||||
|
srvCfg.Notify.Webhook = make(map[string]webhookNotify)
|
||||||
|
srvCfg.Notify.Webhook["1"] = webhookNotify{}
|
||||||
|
|
||||||
// Create config path.
|
// Create config path.
|
||||||
err := createConfigPath()
|
err := createConfigPath()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// hold the mutex lock before a new config is assigned.
|
// hold the mutex lock before a new config is assigned.
|
||||||
// Save the new config globally.
|
// Save the new config globally.
|
||||||
// unlock the mutex.
|
// unlock the mutex.
|
||||||
@ -94,7 +97,7 @@ func initConfig() (bool, error) {
|
|||||||
if _, err = os.Stat(configFile); err != nil {
|
if _, err = os.Stat(configFile); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
srvCfg := &serverConfigV12{}
|
srvCfg := &serverConfigV13{}
|
||||||
srvCfg.Version = globalMinioConfigVersion
|
srvCfg.Version = globalMinioConfigVersion
|
||||||
qc, err := quick.New(srvCfg)
|
qc, err := quick.New(srvCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -116,10 +119,10 @@ func initConfig() (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// serverConfig server config.
|
// serverConfig server config.
|
||||||
var serverConfig *serverConfigV12
|
var serverConfig *serverConfigV13
|
||||||
|
|
||||||
// GetVersion get current config version.
|
// GetVersion get current config version.
|
||||||
func (s serverConfigV12) GetVersion() string {
|
func (s serverConfigV13) GetVersion() string {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -128,14 +131,14 @@ func (s serverConfigV12) GetVersion() string {
|
|||||||
|
|
||||||
/// Logger related.
|
/// Logger related.
|
||||||
|
|
||||||
func (s *serverConfigV12) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
|
func (s *serverConfigV13) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
s.Notify.AMQP[accountID] = amqpn
|
s.Notify.AMQP[accountID] = amqpn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s serverConfigV12) GetAMQP() map[string]amqpNotify {
|
func (s serverConfigV13) GetAMQP() map[string]amqpNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -143,7 +146,7 @@ func (s serverConfigV12) GetAMQP() map[string]amqpNotify {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetAMQPNotify get current AMQP logger.
|
// GetAMQPNotify get current AMQP logger.
|
||||||
func (s serverConfigV12) GetAMQPNotifyByID(accountID string) amqpNotify {
|
func (s serverConfigV13) GetAMQPNotifyByID(accountID string) amqpNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -151,35 +154,35 @@ func (s serverConfigV12) GetAMQPNotifyByID(accountID string) amqpNotify {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
func (s *serverConfigV12) SetNATSNotifyByID(accountID string, natsn natsNotify) {
|
func (s *serverConfigV13) SetNATSNotifyByID(accountID string, natsn natsNotify) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
s.Notify.NATS[accountID] = natsn
|
s.Notify.NATS[accountID] = natsn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s serverConfigV12) GetNATS() map[string]natsNotify {
|
func (s serverConfigV13) GetNATS() map[string]natsNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
return s.Notify.NATS
|
return s.Notify.NATS
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNATSNotify get current NATS logger.
|
// GetNATSNotify get current NATS logger.
|
||||||
func (s serverConfigV12) GetNATSNotifyByID(accountID string) natsNotify {
|
func (s serverConfigV13) GetNATSNotifyByID(accountID string) natsNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
return s.Notify.NATS[accountID]
|
return s.Notify.NATS[accountID]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverConfigV12) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
|
func (s *serverConfigV13) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
s.Notify.ElasticSearch[accountID] = esNotify
|
s.Notify.ElasticSearch[accountID] = esNotify
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s serverConfigV12) GetElasticSearch() map[string]elasticSearchNotify {
|
func (s serverConfigV13) GetElasticSearch() map[string]elasticSearchNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -187,50 +190,72 @@ func (s serverConfigV12) GetElasticSearch() map[string]elasticSearchNotify {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetElasticSearchNotify get current ElasicSearch logger.
|
// GetElasticSearchNotify get current ElasicSearch logger.
|
||||||
func (s serverConfigV12) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
|
func (s serverConfigV13) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
return s.Notify.ElasticSearch[accountID]
|
return s.Notify.ElasticSearch[accountID]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverConfigV12) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
|
func (s *serverConfigV13) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
s.Notify.Redis[accountID] = rNotify
|
s.Notify.Redis[accountID] = rNotify
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s serverConfigV12) GetRedis() map[string]redisNotify {
|
func (s serverConfigV13) GetRedis() map[string]redisNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
return s.Notify.Redis
|
return s.Notify.Redis
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s serverConfigV13) GetWebhook() map[string]webhookNotify {
|
||||||
|
serverConfigMu.RLock()
|
||||||
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
|
return s.Notify.Webhook
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetWebhookNotifyByID get current Webhook logger.
|
||||||
|
func (s serverConfigV13) GetWebhookNotifyByID(accountID string) webhookNotify {
|
||||||
|
serverConfigMu.RLock()
|
||||||
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
|
return s.Notify.Webhook[accountID]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serverConfigV13) SetWebhookNotifyByID(accountID string, pgn webhookNotify) {
|
||||||
|
serverConfigMu.Lock()
|
||||||
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
|
s.Notify.Webhook[accountID] = pgn
|
||||||
|
}
|
||||||
|
|
||||||
// GetRedisNotify get current Redis logger.
|
// GetRedisNotify get current Redis logger.
|
||||||
func (s serverConfigV12) GetRedisNotifyByID(accountID string) redisNotify {
|
func (s serverConfigV13) GetRedisNotifyByID(accountID string) redisNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
return s.Notify.Redis[accountID]
|
return s.Notify.Redis[accountID]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverConfigV12) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) {
|
func (s *serverConfigV13) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
s.Notify.PostgreSQL[accountID] = pgn
|
s.Notify.PostgreSQL[accountID] = pgn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s serverConfigV12) GetPostgreSQL() map[string]postgreSQLNotify {
|
func (s serverConfigV13) GetPostgreSQL() map[string]postgreSQLNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
return s.Notify.PostgreSQL
|
return s.Notify.PostgreSQL
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s serverConfigV12) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify {
|
func (s serverConfigV13) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -238,21 +263,21 @@ func (s serverConfigV12) GetPostgreSQLNotifyByID(accountID string) postgreSQLNot
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Kafka related functions
|
// Kafka related functions
|
||||||
func (s *serverConfigV12) SetKafkaNotifyByID(accountID string, kn kafkaNotify) {
|
func (s *serverConfigV13) SetKafkaNotifyByID(accountID string, kn kafkaNotify) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
s.Notify.Kafka[accountID] = kn
|
s.Notify.Kafka[accountID] = kn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s serverConfigV12) GetKafka() map[string]kafkaNotify {
|
func (s serverConfigV13) GetKafka() map[string]kafkaNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
return s.Notify.Kafka
|
return s.Notify.Kafka
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s serverConfigV12) GetKafkaNotifyByID(accountID string) kafkaNotify {
|
func (s serverConfigV13) GetKafkaNotifyByID(accountID string) kafkaNotify {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -260,7 +285,7 @@ func (s serverConfigV12) GetKafkaNotifyByID(accountID string) kafkaNotify {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetFileLogger set new file logger.
|
// SetFileLogger set new file logger.
|
||||||
func (s *serverConfigV12) SetFileLogger(flogger fileLogger) {
|
func (s *serverConfigV13) SetFileLogger(flogger fileLogger) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
@ -268,7 +293,7 @@ func (s *serverConfigV12) SetFileLogger(flogger fileLogger) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetFileLogger get current file logger.
|
// GetFileLogger get current file logger.
|
||||||
func (s serverConfigV12) GetFileLogger() fileLogger {
|
func (s serverConfigV13) GetFileLogger() fileLogger {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -276,7 +301,7 @@ func (s serverConfigV12) GetFileLogger() fileLogger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetConsoleLogger set new console logger.
|
// SetConsoleLogger set new console logger.
|
||||||
func (s *serverConfigV12) SetConsoleLogger(clogger consoleLogger) {
|
func (s *serverConfigV13) SetConsoleLogger(clogger consoleLogger) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
@ -284,7 +309,7 @@ func (s *serverConfigV12) SetConsoleLogger(clogger consoleLogger) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetConsoleLogger get current console logger.
|
// GetConsoleLogger get current console logger.
|
||||||
func (s serverConfigV12) GetConsoleLogger() consoleLogger {
|
func (s serverConfigV13) GetConsoleLogger() consoleLogger {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -292,7 +317,7 @@ func (s serverConfigV12) GetConsoleLogger() consoleLogger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetRegion set new region.
|
// SetRegion set new region.
|
||||||
func (s *serverConfigV12) SetRegion(region string) {
|
func (s *serverConfigV13) SetRegion(region string) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
@ -300,7 +325,7 @@ func (s *serverConfigV12) SetRegion(region string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetRegion get current region.
|
// GetRegion get current region.
|
||||||
func (s serverConfigV12) GetRegion() string {
|
func (s serverConfigV13) GetRegion() string {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -308,7 +333,7 @@ func (s serverConfigV12) GetRegion() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetCredentials set new credentials.
|
// SetCredentials set new credentials.
|
||||||
func (s *serverConfigV12) SetCredential(creds credential) {
|
func (s *serverConfigV13) SetCredential(creds credential) {
|
||||||
serverConfigMu.Lock()
|
serverConfigMu.Lock()
|
||||||
defer serverConfigMu.Unlock()
|
defer serverConfigMu.Unlock()
|
||||||
|
|
||||||
@ -316,7 +341,7 @@ func (s *serverConfigV12) SetCredential(creds credential) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetCredentials get current credentials.
|
// GetCredentials get current credentials.
|
||||||
func (s serverConfigV12) GetCredential() credential {
|
func (s serverConfigV13) GetCredential() credential {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
||||||
@ -324,7 +349,7 @@ func (s serverConfigV12) GetCredential() credential {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Save config.
|
// Save config.
|
||||||
func (s serverConfigV12) Save() error {
|
func (s serverConfigV13) Save() error {
|
||||||
serverConfigMu.RLock()
|
serverConfigMu.RLock()
|
||||||
defer serverConfigMu.RUnlock()
|
defer serverConfigMu.RUnlock()
|
||||||
|
|
@ -67,6 +67,13 @@ func TestServerConfig(t *testing.T) {
|
|||||||
t.Errorf("Expecting Kafka config %#v found %#v", kafkaNotify{}, savedNotifyCfg4)
|
t.Errorf("Expecting Kafka config %#v found %#v", kafkaNotify{}, savedNotifyCfg4)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set new Webhook notification id.
|
||||||
|
serverConfig.SetWebhookNotifyByID("2", webhookNotify{})
|
||||||
|
savedNotifyCfg5 := serverConfig.GetWebhookNotifyByID("2")
|
||||||
|
if !reflect.DeepEqual(savedNotifyCfg5, webhookNotify{}) {
|
||||||
|
t.Errorf("Expecting Webhook config %#v found %#v", webhookNotify{}, savedNotifyCfg3)
|
||||||
|
}
|
||||||
|
|
||||||
// Set new console logger.
|
// Set new console logger.
|
||||||
serverConfig.SetConsoleLogger(consoleLogger{
|
serverConfig.SetConsoleLogger(consoleLogger{
|
||||||
Enable: true,
|
Enable: true,
|
@ -612,6 +612,28 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
|
|||||||
}
|
}
|
||||||
queueTargets[queueARN] = redisLog
|
queueTargets[queueARN] = redisLog
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load Webhook targets, initialize their respective loggers.
|
||||||
|
for accountID, webhookN := range serverConfig.GetWebhook() {
|
||||||
|
if !webhookN.Enable {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Construct the queue ARN for Webhook.
|
||||||
|
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeWebhook
|
||||||
|
_, ok := queueTargets[queueARN]
|
||||||
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Using accountID we can now initialize a new Webhook logrus instance.
|
||||||
|
webhookLog, err := newWebhookNotify(accountID)
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
queueTargets[queueARN] = webhookLog
|
||||||
|
}
|
||||||
|
|
||||||
// Load elastic targets, initialize their respective loggers.
|
// Load elastic targets, initialize their respective loggers.
|
||||||
for accountID, elasticN := range serverConfig.GetElasticSearch() {
|
for accountID, elasticN := range serverConfig.GetElasticSearch() {
|
||||||
if !elasticN.Enable {
|
if !elasticN.Enable {
|
||||||
@ -637,6 +659,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
|
|||||||
}
|
}
|
||||||
queueTargets[queueARN] = elasticLog
|
queueTargets[queueARN] = elasticLog
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load PostgreSQL targets, initialize their respective loggers.
|
// Load PostgreSQL targets, initialize their respective loggers.
|
||||||
for accountID, pgN := range serverConfig.GetPostgreSQL() {
|
for accountID, pgN := range serverConfig.GetPostgreSQL() {
|
||||||
if !pgN.Enable {
|
if !pgN.Enable {
|
||||||
|
@ -77,6 +77,99 @@ func TestInitEventNotifierFaultyDisks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InitEventNotifierWithPostgreSQL - tests InitEventNotifier when PostgreSQL is not prepared
|
||||||
|
func TestInitEventNotifierWithPostgreSQL(t *testing.T) {
|
||||||
|
// initialize the server and obtain the credentials and root.
|
||||||
|
// credentials are necessary to sign the HTTP request.
|
||||||
|
rootPath, err := newTestConfig("us-east-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Init Test config failed")
|
||||||
|
}
|
||||||
|
// remove the root directory after the test ends.
|
||||||
|
defer removeAll(rootPath)
|
||||||
|
|
||||||
|
disks, err := getRandomDisks(1)
|
||||||
|
defer removeAll(disks[0])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
||||||
|
}
|
||||||
|
endpoints, err := parseStorageEndpoints(disks)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
fs, _, err := initObjectLayer(endpoints)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to initialize FS backend.", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
serverConfig.SetPostgreSQLNotifyByID("1", postgreSQLNotify{Enable: true})
|
||||||
|
if err := initEventNotifier(fs); err == nil {
|
||||||
|
t.Fatal("PostgreSQL config didn't fail.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitEventNotifierWithNATS - tests InitEventNotifier when NATS is not prepared
|
||||||
|
func TestInitEventNotifierWithNATS(t *testing.T) {
|
||||||
|
// initialize the server and obtain the credentials and root.
|
||||||
|
// credentials are necessary to sign the HTTP request.
|
||||||
|
rootPath, err := newTestConfig("us-east-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Init Test config failed")
|
||||||
|
}
|
||||||
|
// remove the root directory after the test ends.
|
||||||
|
defer removeAll(rootPath)
|
||||||
|
|
||||||
|
disks, err := getRandomDisks(1)
|
||||||
|
defer removeAll(disks[0])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
||||||
|
}
|
||||||
|
endpoints, err := parseStorageEndpoints(disks)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
fs, _, err := initObjectLayer(endpoints)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to initialize FS backend.", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
serverConfig.SetNATSNotifyByID("1", natsNotify{Enable: true})
|
||||||
|
if err := initEventNotifier(fs); err == nil {
|
||||||
|
t.Fatal("NATS config didn't fail.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitEventNotifierWithWebHook - tests InitEventNotifier when WebHook is not prepared
|
||||||
|
func TestInitEventNotifierWithWebHook(t *testing.T) {
|
||||||
|
// initialize the server and obtain the credentials and root.
|
||||||
|
// credentials are necessary to sign the HTTP request.
|
||||||
|
rootPath, err := newTestConfig("us-east-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Init Test config failed")
|
||||||
|
}
|
||||||
|
// remove the root directory after the test ends.
|
||||||
|
defer removeAll(rootPath)
|
||||||
|
|
||||||
|
disks, err := getRandomDisks(1)
|
||||||
|
defer removeAll(disks[0])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to create directories for FS backend. ", err)
|
||||||
|
}
|
||||||
|
endpoints, err := parseStorageEndpoints(disks)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
fs, _, err := initObjectLayer(endpoints)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to initialize FS backend.", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
serverConfig.SetWebhookNotifyByID("1", webhookNotify{Enable: true})
|
||||||
|
if err := initEventNotifier(fs); err == nil {
|
||||||
|
t.Fatal("WebHook config didn't fail.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// InitEventNotifierWithAMQP - tests InitEventNotifier when AMQP is not prepared
|
// InitEventNotifierWithAMQP - tests InitEventNotifier when AMQP is not prepared
|
||||||
func TestInitEventNotifierWithAMQP(t *testing.T) {
|
func TestInitEventNotifierWithAMQP(t *testing.T) {
|
||||||
// initialize the server and obtain the credentials and root.
|
// initialize the server and obtain the credentials and root.
|
||||||
|
@ -19,6 +19,7 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -36,7 +37,7 @@ const (
|
|||||||
|
|
||||||
// minio configuration related constants.
|
// minio configuration related constants.
|
||||||
const (
|
const (
|
||||||
globalMinioConfigVersion = "12"
|
globalMinioConfigVersion = "13"
|
||||||
globalMinioConfigDir = ".minio"
|
globalMinioConfigDir = ".minio"
|
||||||
globalMinioCertsDir = "certs"
|
globalMinioCertsDir = "certs"
|
||||||
globalMinioCertsCADir = "CAs"
|
globalMinioCertsCADir = "CAs"
|
||||||
@ -96,6 +97,9 @@ var (
|
|||||||
// List of admin peers.
|
// List of admin peers.
|
||||||
globalAdminPeers = adminPeers{}
|
globalAdminPeers = adminPeers{}
|
||||||
|
|
||||||
|
// Minio server user agent string.
|
||||||
|
globalServerUserAgent = "Minio/" + ReleaseTag + " (" + runtime.GOOS + "; " + runtime.GOARCH + ")"
|
||||||
|
|
||||||
// Add new variable global values here.
|
// Add new variable global values here.
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -40,6 +40,8 @@ const (
|
|||||||
queueTypePostgreSQL = "postgresql"
|
queueTypePostgreSQL = "postgresql"
|
||||||
// Static string indicating queue type 'kafka'.
|
// Static string indicating queue type 'kafka'.
|
||||||
queueTypeKafka = "kafka"
|
queueTypeKafka = "kafka"
|
||||||
|
// Static string for Webhooks
|
||||||
|
queueTypeWebhook = "webhook"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Topic type.
|
// Topic type.
|
||||||
@ -61,6 +63,7 @@ type notifier struct {
|
|||||||
Redis map[string]redisNotify `json:"redis"`
|
Redis map[string]redisNotify `json:"redis"`
|
||||||
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
|
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
|
||||||
Kafka map[string]kafkaNotify `json:"kafka"`
|
Kafka map[string]kafkaNotify `json:"kafka"`
|
||||||
|
Webhook map[string]webhookNotify `json:"webhook"`
|
||||||
// Add new notification queues.
|
// Add new notification queues.
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,6 +105,18 @@ func isNATSQueue(sqsArn arnSQS) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns true if queueArn is for an Webhook queue
|
||||||
|
func isWebhookQueue(sqsArn arnSQS) bool {
|
||||||
|
if sqsArn.Type != queueTypeWebhook {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
rNotify := serverConfig.GetWebhookNotifyByID(sqsArn.AccountID)
|
||||||
|
if !rNotify.Enable {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// Returns true if queueArn is for an Redis queue.
|
// Returns true if queueArn is for an Redis queue.
|
||||||
func isRedisQueue(sqsArn arnSQS) bool {
|
func isRedisQueue(sqsArn arnSQS) bool {
|
||||||
if sqsArn.Type != queueTypeRedis {
|
if sqsArn.Type != queueTypeRedis {
|
||||||
|
@ -82,7 +82,7 @@ func newAMQPNotify(accountID string) (*logrus.Logger, error) {
|
|||||||
return amqpLog, nil
|
return amqpLog, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fire is called when an event should be sent to the message broker.k
|
// Fire is called when an event should be sent to the message broker.
|
||||||
func (q amqpConn) Fire(entry *logrus.Entry) error {
|
func (q amqpConn) Fire(entry *logrus.Entry) error {
|
||||||
ch, err := q.Connection.Channel()
|
ch, err := q.Connection.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
136
cmd/notify-webhook.go
Normal file
136
cmd/notify-webhook.go
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
/*
|
||||||
|
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type webhookNotify struct {
|
||||||
|
Enable bool `json:"enable"`
|
||||||
|
Endpoint string `json:"endpoint"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type httpConn struct {
|
||||||
|
*http.Client
|
||||||
|
Endpoint string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lookup host address by dialing.
|
||||||
|
func lookupHost(addr string) error {
|
||||||
|
dialer := &net.Dialer{
|
||||||
|
Timeout: 300 * time.Millisecond,
|
||||||
|
KeepAlive: 300 * time.Millisecond,
|
||||||
|
}
|
||||||
|
nconn, err := dialer.Dial("tcp", addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nconn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initializes new webhook logrus notifier.
|
||||||
|
func newWebhookNotify(accountID string) (*logrus.Logger, error) {
|
||||||
|
rNotify := serverConfig.GetWebhookNotifyByID(accountID)
|
||||||
|
|
||||||
|
if rNotify.Endpoint == "" {
|
||||||
|
return nil, errInvalidArgument
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(rNotify.Endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = lookupHost(u.Host); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
conn := httpConn{
|
||||||
|
// Configure aggressive timeouts for client posts.
|
||||||
|
Client: &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DialContext: (&net.Dialer{
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
KeepAlive: 5 * time.Second,
|
||||||
|
}).DialContext,
|
||||||
|
TLSHandshakeTimeout: 3 * time.Second,
|
||||||
|
ResponseHeaderTimeout: 3 * time.Second,
|
||||||
|
ExpectContinueTimeout: 2 * time.Second,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Endpoint: rNotify.Endpoint,
|
||||||
|
}
|
||||||
|
|
||||||
|
notifyLog := logrus.New()
|
||||||
|
notifyLog.Out = ioutil.Discard
|
||||||
|
|
||||||
|
// Set default JSON formatter.
|
||||||
|
notifyLog.Formatter = new(logrus.JSONFormatter)
|
||||||
|
|
||||||
|
notifyLog.Hooks.Add(conn)
|
||||||
|
|
||||||
|
// Success
|
||||||
|
return notifyLog, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fire is called when an event should be sent to the message broker.
|
||||||
|
func (n httpConn) Fire(entry *logrus.Entry) error {
|
||||||
|
body, err := entry.Reader()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", n.Endpoint, body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set content-type.
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
// Set proper server user-agent.
|
||||||
|
req.Header.Set("User-Agent", globalServerUserAgent)
|
||||||
|
|
||||||
|
// Initiate the http request.
|
||||||
|
resp, err := n.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK &&
|
||||||
|
resp.StatusCode != http.StatusAccepted &&
|
||||||
|
resp.StatusCode != http.StatusContinue {
|
||||||
|
return fmt.Errorf("Unable to send event %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Levels are Required for logrus hook implementation
|
||||||
|
func (httpConn) Levels() []logrus.Level {
|
||||||
|
return []logrus.Level{
|
||||||
|
logrus.InfoLevel,
|
||||||
|
}
|
||||||
|
}
|
79
cmd/notify-webhook_test.go
Normal file
79
cmd/notify-webhook_test.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
/*
|
||||||
|
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"path"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/Sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Custom post handler to handle POST requests.
|
||||||
|
type postHandler struct{}
|
||||||
|
|
||||||
|
func (p postHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != "POST" {
|
||||||
|
http.Error(w, fmt.Sprintf("Unexpected method %s", r.Method), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
io.Copy(w, r.Body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests web hook initialization.
|
||||||
|
func TestNewWebHookNotify(t *testing.T) {
|
||||||
|
root, err := newTestConfig("us-east-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer removeAll(root)
|
||||||
|
|
||||||
|
_, err = newWebhookNotify("1")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("Unexpected should fail")
|
||||||
|
}
|
||||||
|
|
||||||
|
serverConfig.SetWebhookNotifyByID("10", webhookNotify{Enable: true, Endpoint: "http://www."})
|
||||||
|
_, err = newWebhookNotify("10")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("Unexpected should fail with lookupHost")
|
||||||
|
}
|
||||||
|
|
||||||
|
serverConfig.SetWebhookNotifyByID("15", webhookNotify{Enable: true, Endpoint: "http://%"})
|
||||||
|
_, err = newWebhookNotify("15")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("Unexpected should fail with invalid URL escape")
|
||||||
|
}
|
||||||
|
|
||||||
|
server := httptest.NewServer(postHandler{})
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
serverConfig.SetWebhookNotifyByID("20", webhookNotify{Enable: true, Endpoint: server.URL})
|
||||||
|
webhook, err := newWebhookNotify("20")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unexpected shouldn't fail", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
webhook.WithFields(logrus.Fields{
|
||||||
|
"Key": path.Join("bucket", "object"),
|
||||||
|
"EventType": "s3:ObjectCreated:Put",
|
||||||
|
}).Info()
|
||||||
|
}
|
@ -100,7 +100,37 @@ func (s *TestSuiteCommon) TestAuth(c *C) {
|
|||||||
c.Assert(len(cred.SecretKey), Equals, secretKeyMaxLen)
|
c.Assert(len(cred.SecretKey), Equals, secretKeyMaxLen)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *TestSuiteCommon) TestBucketSQSNotification(c *C) {
|
func (s *TestSuiteCommon) TestBucketSQSNotificationWebHook(c *C) {
|
||||||
|
// Sample bucket notification.
|
||||||
|
bucketNotificationBuf := `<NotificationConfiguration><QueueConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><Queue>arn:minio:sqs:us-east-1:444455556666:webhook</Queue></QueueConfiguration></NotificationConfiguration>`
|
||||||
|
// generate a random bucket Name.
|
||||||
|
bucketName := getRandomBucketName()
|
||||||
|
// HTTP request to create the bucket.
|
||||||
|
request, err := newTestSignedRequest("PUT", getMakeBucketURL(s.endPoint, bucketName),
|
||||||
|
0, nil, s.accessKey, s.secretKey, s.signer)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
client := http.Client{Transport: s.transport}
|
||||||
|
// execute the request.
|
||||||
|
response, err := client.Do(request)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
// assert the http response status code.
|
||||||
|
c.Assert(response.StatusCode, Equals, http.StatusOK)
|
||||||
|
|
||||||
|
request, err = newTestSignedRequest("PUT", getPutNotificationURL(s.endPoint, bucketName),
|
||||||
|
int64(len(bucketNotificationBuf)), bytes.NewReader([]byte(bucketNotificationBuf)), s.accessKey, s.secretKey, s.signer)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
client = http.Client{Transport: s.transport}
|
||||||
|
// execute the HTTP request.
|
||||||
|
response, err = client.Do(request)
|
||||||
|
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
verifyError(c, response, "InvalidArgument", "A specified destination ARN does not exist or is not well-formed. Verify the destination ARN.", http.StatusBadRequest)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *TestSuiteCommon) TestBucketSQSNotificationAMQP(c *C) {
|
||||||
// Sample bucket notification.
|
// Sample bucket notification.
|
||||||
bucketNotificationBuf := `<NotificationConfiguration><QueueConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><Queue>arn:minio:sqs:us-east-1:444455556666:amqp</Queue></QueueConfiguration></NotificationConfiguration>`
|
bucketNotificationBuf := `<NotificationConfiguration><QueueConfiguration><Event>s3:ObjectCreated:Put</Event><Filter><S3Key><FilterRule><Name>prefix</Name><Value>images/</Value></FilterRule></S3Key></Filter><Id>1</Id><Queue>arn:minio:sqs:us-east-1:444455556666:amqp</Queue></QueueConfiguration></NotificationConfiguration>`
|
||||||
// generate a random bucket Name.
|
// generate a random bucket Name.
|
||||||
|
Loading…
Reference in New Issue
Block a user