config: setter/getter for Notifier and Logger into its own struct. (#3721)

This is an attempt cleanup code and keep the top level config
functions simpler and easy to understand where as move the
notifier related code and logger setter/getter methods as part
of their own struct.

Locks are now held properly not globally by configMutex, but
instead as private variables.

Final fix for #3700
This commit is contained in:
Harshavardhana 2017-02-09 15:20:54 -08:00 committed by GitHub
parent f38222c0cc
commit 1b4bb94ac4
20 changed files with 338 additions and 256 deletions

View File

@ -133,27 +133,27 @@ func isValidQueueID(queueARN string) bool {
// Is Queue identifier valid?.
if isAMQPQueue(sqsARN) { // AMQP eueue.
amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID)
amqpN := serverConfig.Notify.GetAMQPByID(sqsARN.AccountID)
return amqpN.Enable && amqpN.URL != ""
} else if isNATSQueue(sqsARN) {
natsN := serverConfig.GetNATSNotifyByID(sqsARN.AccountID)
natsN := serverConfig.Notify.GetNATSByID(sqsARN.AccountID)
return natsN.Enable && natsN.Address != ""
} else if isElasticQueue(sqsARN) { // Elastic queue.
elasticN := serverConfig.GetElasticSearchNotifyByID(sqsARN.AccountID)
elasticN := serverConfig.Notify.GetElasticSearchByID(sqsARN.AccountID)
return elasticN.Enable && elasticN.URL != ""
} else if isRedisQueue(sqsARN) { // Redis queue.
redisN := serverConfig.GetRedisNotifyByID(sqsARN.AccountID)
redisN := serverConfig.Notify.GetRedisByID(sqsARN.AccountID)
return redisN.Enable && redisN.Addr != ""
} else if isPostgreSQLQueue(sqsARN) {
pgN := serverConfig.GetPostgreSQLNotifyByID(sqsARN.AccountID)
pgN := serverConfig.Notify.GetPostgreSQLByID(sqsARN.AccountID)
// Postgres can work with only default conn. info.
return pgN.Enable
} else if isKafkaQueue(sqsARN) {
kafkaN := serverConfig.GetKafkaNotifyByID(sqsARN.AccountID)
kafkaN := serverConfig.Notify.GetKafkaByID(sqsARN.AccountID)
return (kafkaN.Enable && len(kafkaN.Brokers) > 0 &&
kafkaN.Topic != "")
} else if isWebhookQueue(sqsARN) {
webhookN := serverConfig.GetWebhookNotifyByID(sqsARN.AccountID)
webhookN := serverConfig.Notify.GetWebhookByID(sqsARN.AccountID)
return webhookN.Enable && webhookN.Endpoint != ""
}
return false

View File

@ -855,7 +855,10 @@ func migrateV12ToV13() error {
}
// Copy over fields from V12 into V13 config struct
srvConfig := &serverConfigV13{}
srvConfig := &serverConfigV13{
Logger: &logger{},
Notify: &notifier{},
}
srvConfig.Version = "13"
srvConfig.Credential = cv12.Credential
srvConfig.Region = cv12.Region

View File

@ -36,10 +36,10 @@ type serverConfigV13 struct {
Region string `json:"region"`
// Additional error logging configuration.
Logger logger `json:"logger"`
Logger *logger `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify *notifier `json:"notify"`
}
// newConfig - initialize a new server config, saves creds from env
@ -47,7 +47,10 @@ type serverConfigV13 struct {
// and those are saved.
func newConfig(envCreds credential) error {
// Initialize server config.
srvCfg := &serverConfigV13{}
srvCfg := &serverConfigV13{
Logger: &logger{},
Notify: &notifier{},
}
srvCfg.Version = globalMinioConfigVersion
srvCfg.Region = globalMinioDefaultRegion
@ -148,193 +151,6 @@ func (s serverConfigV13) GetVersion() string {
return s.Version
}
/// Logger related.
func (s *serverConfigV13) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.AMQP[accountID] = amqpn
}
func (s serverConfigV13) GetAMQP() map[string]amqpNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.AMQP
}
// GetAMQPNotify get current AMQP logger.
func (s serverConfigV13) GetAMQPNotifyByID(accountID string) amqpNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.AMQP[accountID]
}
//
func (s *serverConfigV13) SetNATSNotifyByID(accountID string, natsn natsNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.NATS[accountID] = natsn
}
func (s serverConfigV13) GetNATS() map[string]natsNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.NATS
}
// GetNATSNotify get current NATS logger.
func (s serverConfigV13) GetNATSNotifyByID(accountID string) natsNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.NATS[accountID]
}
func (s *serverConfigV13) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.ElasticSearch[accountID] = esNotify
}
func (s serverConfigV13) GetElasticSearch() map[string]elasticSearchNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.ElasticSearch
}
// GetElasticSearchNotify get current ElasicSearch logger.
func (s serverConfigV13) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.ElasticSearch[accountID]
}
func (s *serverConfigV13) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.Redis[accountID] = rNotify
}
func (s serverConfigV13) GetRedis() map[string]redisNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Redis
}
func (s serverConfigV13) GetWebhook() map[string]webhookNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Webhook
}
// GetWebhookNotifyByID get current Webhook logger.
func (s serverConfigV13) GetWebhookNotifyByID(accountID string) webhookNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Webhook[accountID]
}
func (s *serverConfigV13) SetWebhookNotifyByID(accountID string, pgn webhookNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.Webhook[accountID] = pgn
}
// GetRedisNotify get current Redis logger.
func (s serverConfigV13) GetRedisNotifyByID(accountID string) redisNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Redis[accountID]
}
func (s *serverConfigV13) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.PostgreSQL[accountID] = pgn
}
func (s serverConfigV13) GetPostgreSQL() map[string]postgreSQLNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.PostgreSQL
}
func (s serverConfigV13) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.PostgreSQL[accountID]
}
// Kafka related functions
func (s *serverConfigV13) SetKafkaNotifyByID(accountID string, kn kafkaNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.Kafka[accountID] = kn
}
func (s serverConfigV13) GetKafka() map[string]kafkaNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Kafka
}
func (s serverConfigV13) GetKafkaNotifyByID(accountID string) kafkaNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Kafka[accountID]
}
// SetFileLogger set new file logger.
func (s *serverConfigV13) SetFileLogger(flogger fileLogger) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Logger.File = flogger
}
// GetFileLogger get current file logger.
func (s serverConfigV13) GetFileLogger() fileLogger {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Logger.File
}
// SetConsoleLogger set new console logger.
func (s *serverConfigV13) SetConsoleLogger(clogger consoleLogger) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Logger.Console = clogger
}
// GetConsoleLogger get current console logger.
func (s serverConfigV13) GetConsoleLogger() consoleLogger {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Logger.Console
}
// SetRegion set new region.
func (s *serverConfigV13) SetRegion(region string) {
serverConfigMu.Lock()

View File

@ -40,54 +40,54 @@ func TestServerConfig(t *testing.T) {
}
// Set new amqp notification id.
serverConfig.SetAMQPNotifyByID("2", amqpNotify{})
savedNotifyCfg1 := serverConfig.GetAMQPNotifyByID("2")
serverConfig.Notify.SetAMQPByID("2", amqpNotify{})
savedNotifyCfg1 := serverConfig.Notify.GetAMQPByID("2")
if !reflect.DeepEqual(savedNotifyCfg1, amqpNotify{}) {
t.Errorf("Expecting AMQP config %#v found %#v", amqpNotify{}, savedNotifyCfg1)
}
// Set new elastic search notification id.
serverConfig.SetElasticSearchNotifyByID("2", elasticSearchNotify{})
savedNotifyCfg2 := serverConfig.GetElasticSearchNotifyByID("2")
serverConfig.Notify.SetElasticSearchByID("2", elasticSearchNotify{})
savedNotifyCfg2 := serverConfig.Notify.GetElasticSearchByID("2")
if !reflect.DeepEqual(savedNotifyCfg2, elasticSearchNotify{}) {
t.Errorf("Expecting Elasticsearch config %#v found %#v", elasticSearchNotify{}, savedNotifyCfg2)
}
// Set new redis notification id.
serverConfig.SetRedisNotifyByID("2", redisNotify{})
savedNotifyCfg3 := serverConfig.GetRedisNotifyByID("2")
serverConfig.Notify.SetRedisByID("2", redisNotify{})
savedNotifyCfg3 := serverConfig.Notify.GetRedisByID("2")
if !reflect.DeepEqual(savedNotifyCfg3, redisNotify{}) {
t.Errorf("Expecting Redis config %#v found %#v", redisNotify{}, savedNotifyCfg3)
}
// Set new kafka notification id.
serverConfig.SetKafkaNotifyByID("2", kafkaNotify{})
savedNotifyCfg4 := serverConfig.GetKafkaNotifyByID("2")
serverConfig.Notify.SetKafkaByID("2", kafkaNotify{})
savedNotifyCfg4 := serverConfig.Notify.GetKafkaByID("2")
if !reflect.DeepEqual(savedNotifyCfg4, kafkaNotify{}) {
t.Errorf("Expecting Kafka config %#v found %#v", kafkaNotify{}, savedNotifyCfg4)
}
// Set new Webhook notification id.
serverConfig.SetWebhookNotifyByID("2", webhookNotify{})
savedNotifyCfg5 := serverConfig.GetWebhookNotifyByID("2")
serverConfig.Notify.SetWebhookByID("2", webhookNotify{})
savedNotifyCfg5 := serverConfig.Notify.GetWebhookByID("2")
if !reflect.DeepEqual(savedNotifyCfg5, webhookNotify{}) {
t.Errorf("Expecting Webhook config %#v found %#v", webhookNotify{}, savedNotifyCfg3)
}
// Set new console logger.
serverConfig.SetConsoleLogger(consoleLogger{
serverConfig.Logger.SetConsole(consoleLogger{
Enable: true,
})
consoleCfg := serverConfig.GetConsoleLogger()
consoleCfg := serverConfig.Logger.GetConsole()
if !reflect.DeepEqual(consoleCfg, consoleLogger{Enable: true}) {
t.Errorf("Expecting console logger config %#v found %#v", consoleLogger{Enable: true}, consoleCfg)
}
// Set new file logger.
serverConfig.SetFileLogger(fileLogger{
serverConfig.Logger.SetFile(fileLogger{
Enable: true,
})
fileCfg := serverConfig.GetFileLogger()
fileCfg := serverConfig.Logger.GetFile()
if !reflect.DeepEqual(fileCfg, fileLogger{Enable: true}) {
t.Errorf("Expecting file logger config %#v found %#v", fileLogger{Enable: true}, consoleCfg)
}

View File

@ -531,7 +531,7 @@ func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationCon
func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
queueTargets := make(map[string]*logrus.Logger)
// Load all amqp targets, initialize their respective loggers.
for accountID, amqpN := range serverConfig.GetAMQP() {
for accountID, amqpN := range serverConfig.Notify.GetAMQP() {
if !amqpN.Enable {
continue
}
@ -558,7 +558,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
queueTargets[queueARN] = amqpLog
}
// Load all nats targets, initialize their respective loggers.
for accountID, natsN := range serverConfig.GetNATS() {
for accountID, natsN := range serverConfig.Notify.GetNATS() {
if !natsN.Enable {
continue
}
@ -586,7 +586,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
// Load redis targets, initialize their respective loggers.
for accountID, redisN := range serverConfig.GetRedis() {
for accountID, redisN := range serverConfig.Notify.GetRedis() {
if !redisN.Enable {
continue
}
@ -614,7 +614,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
// Load Webhook targets, initialize their respective loggers.
for accountID, webhookN := range serverConfig.GetWebhook() {
for accountID, webhookN := range serverConfig.Notify.GetWebhook() {
if !webhookN.Enable {
continue
}
@ -635,7 +635,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
// Load elastic targets, initialize their respective loggers.
for accountID, elasticN := range serverConfig.GetElasticSearch() {
for accountID, elasticN := range serverConfig.Notify.GetElasticSearch() {
if !elasticN.Enable {
continue
}
@ -661,7 +661,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
// Load PostgreSQL targets, initialize their respective loggers.
for accountID, pgN := range serverConfig.GetPostgreSQL() {
for accountID, pgN := range serverConfig.Notify.GetPostgreSQL() {
if !pgN.Enable {
continue
}
@ -686,7 +686,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
queueTargets[queueARN] = pgLog
}
// Load Kafka targets, initialize their respective loggers.
for accountID, kafkaN := range serverConfig.GetKafka() {
for accountID, kafkaN := range serverConfig.Notify.GetKafka() {
if !kafkaN.Enable {
continue
}

View File

@ -106,7 +106,7 @@ func TestInitEventNotifierWithPostgreSQL(t *testing.T) {
t.Fatal("Unable to initialize FS backend.", err)
}
serverConfig.SetPostgreSQLNotifyByID("1", postgreSQLNotify{Enable: true})
serverConfig.Notify.SetPostgreSQLByID("1", postgreSQLNotify{Enable: true})
if err := initEventNotifier(fs); err == nil {
t.Fatal("PostgreSQL config didn't fail.")
}
@ -137,7 +137,7 @@ func TestInitEventNotifierWithNATS(t *testing.T) {
t.Fatal("Unable to initialize FS backend.", err)
}
serverConfig.SetNATSNotifyByID("1", natsNotify{Enable: true})
serverConfig.Notify.SetNATSByID("1", natsNotify{Enable: true})
if err := initEventNotifier(fs); err == nil {
t.Fatal("NATS config didn't fail.")
}
@ -168,7 +168,7 @@ func TestInitEventNotifierWithWebHook(t *testing.T) {
t.Fatal("Unable to initialize FS backend.", err)
}
serverConfig.SetWebhookNotifyByID("1", webhookNotify{Enable: true})
serverConfig.Notify.SetWebhookByID("1", webhookNotify{Enable: true})
if err := initEventNotifier(fs); err == nil {
t.Fatal("WebHook config didn't fail.")
}
@ -199,7 +199,7 @@ func TestInitEventNotifierWithAMQP(t *testing.T) {
t.Fatal("Unable to initialize FS backend.", err)
}
serverConfig.SetAMQPNotifyByID("1", amqpNotify{Enable: true})
serverConfig.Notify.SetAMQPByID("1", amqpNotify{Enable: true})
if err := initEventNotifier(fs); err == nil {
t.Fatal("AMQP config didn't fail.")
}
@ -230,7 +230,7 @@ func TestInitEventNotifierWithElasticSearch(t *testing.T) {
t.Fatal("Unable to initialize FS backend.", err)
}
serverConfig.SetElasticSearchNotifyByID("1", elasticSearchNotify{Enable: true})
serverConfig.Notify.SetElasticSearchByID("1", elasticSearchNotify{Enable: true})
if err := initEventNotifier(fs); err == nil {
t.Fatal("ElasticSearch config didn't fail.")
}
@ -261,7 +261,7 @@ func TestInitEventNotifierWithRedis(t *testing.T) {
t.Fatal("Unable to initialize FS backend.", err)
}
serverConfig.SetRedisNotifyByID("1", redisNotify{Enable: true})
serverConfig.Notify.SetRedisByID("1", redisNotify{Enable: true})
if err := initEventNotifier(fs); err == nil {
t.Fatal("Redis config didn't fail.")
}

View File

@ -26,7 +26,7 @@ type consoleLogger struct {
// enable console logger.
func enableConsoleLogger() {
clogger := serverConfig.GetConsoleLogger()
clogger := serverConfig.Logger.GetConsole()
if !clogger.Enable {
return
}

View File

@ -35,7 +35,7 @@ type localFile struct {
}
func enableFileLogger() {
flogger := serverConfig.GetFileLogger()
flogger := serverConfig.Logger.GetFile()
if !flogger.Enable || flogger.Filename == "" {
return
}

View File

@ -39,11 +39,41 @@ var log = struct {
// - console [default]
// - file
type logger struct {
sync.RWMutex
Console consoleLogger `json:"console"`
File fileLogger `json:"file"`
// Add new loggers here.
}
/// Logger related.
// SetFile set new file logger.
func (l *logger) SetFile(flogger fileLogger) {
l.Lock()
defer l.Unlock()
l.File = flogger
}
// GetFileLogger get current file logger.
func (l *logger) GetFile() fileLogger {
l.RLock()
defer l.RUnlock()
return l.File
}
// SetConsole set new console logger.
func (l *logger) SetConsole(clogger consoleLogger) {
l.Lock()
defer l.Unlock()
l.Console = clogger
}
func (l *logger) GetConsole() consoleLogger {
l.RLock()
defer l.RUnlock()
return l.Console
}
// Get file, line, function name of the caller.
func callerSource() string {
pc, file, line, success := runtime.Caller(2)

228
cmd/notifier-config.go Normal file
View File

@ -0,0 +1,228 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "sync"
// Notifier represents collection of supported notification queues.
type notifier struct {
sync.RWMutex
AMQP amqpConfigs `json:"amqp"`
NATS natsConfigs `json:"nats"`
ElasticSearch elasticSearchConfigs `json:"elasticsearch"`
Redis redisConfigs `json:"redis"`
PostgreSQL postgreSQLConfigs `json:"postgresql"`
Kafka kafkaConfigs `json:"kafka"`
Webhook webhookConfigs `json:"webhook"`
// Add new notification queues.
}
type amqpConfigs map[string]amqpNotify
func (a amqpConfigs) Clone() amqpConfigs {
a2 := make(amqpConfigs, len(a))
for k, v := range a {
a2[k] = v
}
return a2
}
type natsConfigs map[string]natsNotify
func (a natsConfigs) Clone() natsConfigs {
a2 := make(natsConfigs, len(a))
for k, v := range a {
a2[k] = v
}
return a2
}
type elasticSearchConfigs map[string]elasticSearchNotify
func (a elasticSearchConfigs) Clone() elasticSearchConfigs {
a2 := make(elasticSearchConfigs, len(a))
for k, v := range a {
a2[k] = v
}
return a2
}
type redisConfigs map[string]redisNotify
func (a redisConfigs) Clone() redisConfigs {
a2 := make(redisConfigs, len(a))
for k, v := range a {
a2[k] = v
}
return a2
}
type postgreSQLConfigs map[string]postgreSQLNotify
func (a postgreSQLConfigs) Clone() postgreSQLConfigs {
a2 := make(postgreSQLConfigs, len(a))
for k, v := range a {
a2[k] = v
}
return a2
}
type kafkaConfigs map[string]kafkaNotify
func (a kafkaConfigs) Clone() kafkaConfigs {
a2 := make(kafkaConfigs, len(a))
for k, v := range a {
a2[k] = v
}
return a2
}
type webhookConfigs map[string]webhookNotify
func (a webhookConfigs) Clone() webhookConfigs {
a2 := make(webhookConfigs, len(a))
for k, v := range a {
a2[k] = v
}
return a2
}
func (n *notifier) SetAMQPByID(accountID string, amqpn amqpNotify) {
n.Lock()
defer n.Unlock()
n.AMQP[accountID] = amqpn
}
func (n *notifier) GetAMQP() map[string]amqpNotify {
n.RLock()
defer n.RUnlock()
return n.AMQP.Clone()
}
func (n *notifier) GetAMQPByID(accountID string) amqpNotify {
n.RLock()
defer n.RUnlock()
return n.AMQP[accountID]
}
func (n *notifier) SetNATSByID(accountID string, natsn natsNotify) {
n.Lock()
defer n.Unlock()
n.NATS[accountID] = natsn
}
func (n *notifier) GetNATS() map[string]natsNotify {
n.RLock()
defer n.RUnlock()
return n.NATS.Clone()
}
func (n *notifier) GetNATSByID(accountID string) natsNotify {
n.RLock()
defer n.RUnlock()
return n.NATS[accountID]
}
func (n *notifier) SetElasticSearchByID(accountID string, es elasticSearchNotify) {
n.Lock()
defer n.Unlock()
n.ElasticSearch[accountID] = es
}
func (n *notifier) GetElasticSearchByID(accountID string) elasticSearchNotify {
n.RLock()
defer n.RUnlock()
return n.ElasticSearch[accountID]
}
func (n *notifier) GetElasticSearch() map[string]elasticSearchNotify {
n.RLock()
defer n.RUnlock()
return n.ElasticSearch.Clone()
}
func (n *notifier) SetRedisByID(accountID string, r redisNotify) {
n.Lock()
defer n.Unlock()
n.Redis[accountID] = r
}
func (n *notifier) GetRedis() map[string]redisNotify {
n.RLock()
defer n.RUnlock()
return n.Redis.Clone()
}
func (n *notifier) GetRedisByID(accountID string) redisNotify {
n.RLock()
defer n.RUnlock()
return n.Redis[accountID]
}
func (n *notifier) GetWebhook() map[string]webhookNotify {
n.RLock()
defer n.RUnlock()
return n.Webhook.Clone()
}
func (n *notifier) GetWebhookByID(accountID string) webhookNotify {
n.RLock()
defer n.RUnlock()
return n.Webhook[accountID]
}
func (n *notifier) SetWebhookByID(accountID string, pgn webhookNotify) {
n.Lock()
defer n.Unlock()
n.Webhook[accountID] = pgn
}
func (n *notifier) SetPostgreSQLByID(accountID string, pgn postgreSQLNotify) {
n.Lock()
defer n.Unlock()
n.PostgreSQL[accountID] = pgn
}
func (n *notifier) GetPostgreSQL() map[string]postgreSQLNotify {
n.RLock()
defer n.RUnlock()
return n.PostgreSQL.Clone()
}
func (n *notifier) GetPostgreSQLByID(accountID string) postgreSQLNotify {
n.RLock()
defer n.RUnlock()
return n.PostgreSQL[accountID]
}
func (n *notifier) SetKafkaByID(accountID string, kn kafkaNotify) {
n.Lock()
defer n.Unlock()
n.Kafka[accountID] = kn
}
func (n *notifier) GetKafka() map[string]kafkaNotify {
n.RLock()
defer n.RUnlock()
return n.Kafka.Clone()
}
func (n *notifier) GetKafkaByID(accountID string) kafkaNotify {
n.RLock()
defer n.RUnlock()
return n.Kafka[accountID]
}

View File

@ -0,0 +1,17 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd

View File

@ -54,24 +54,12 @@ const (
var errNotifyNotEnabled = errors.New("requested notifier not enabled")
// Notifier represents collection of supported notification queues.
type notifier struct {
AMQP map[string]amqpNotify `json:"amqp"`
NATS map[string]natsNotify `json:"nats"`
ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
Redis map[string]redisNotify `json:"redis"`
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
Kafka map[string]kafkaNotify `json:"kafka"`
Webhook map[string]webhookNotify `json:"webhook"`
// Add new notification queues.
}
// Returns true if queueArn is for an AMQP queue.
func isAMQPQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeAMQP {
return false
}
amqpL := serverConfig.GetAMQPNotifyByID(sqsArn.AccountID)
amqpL := serverConfig.Notify.GetAMQPByID(sqsArn.AccountID)
if !amqpL.Enable {
return false
}
@ -90,7 +78,7 @@ func isNATSQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeNATS {
return false
}
natsL := serverConfig.GetNATSNotifyByID(sqsArn.AccountID)
natsL := serverConfig.Notify.GetNATSByID(sqsArn.AccountID)
if !natsL.Enable {
return false
}
@ -109,7 +97,7 @@ func isWebhookQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeWebhook {
return false
}
rNotify := serverConfig.GetWebhookNotifyByID(sqsArn.AccountID)
rNotify := serverConfig.Notify.GetWebhookByID(sqsArn.AccountID)
if !rNotify.Enable {
return false
}
@ -121,7 +109,7 @@ func isRedisQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeRedis {
return false
}
rNotify := serverConfig.GetRedisNotifyByID(sqsArn.AccountID)
rNotify := serverConfig.Notify.GetRedisByID(sqsArn.AccountID)
if !rNotify.Enable {
return false
}
@ -140,7 +128,7 @@ func isElasticQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeElastic {
return false
}
esNotify := serverConfig.GetElasticSearchNotifyByID(sqsArn.AccountID)
esNotify := serverConfig.Notify.GetElasticSearchByID(sqsArn.AccountID)
if !esNotify.Enable {
return false
}
@ -158,7 +146,7 @@ func isPostgreSQLQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypePostgreSQL {
return false
}
pgNotify := serverConfig.GetPostgreSQLNotifyByID(sqsArn.AccountID)
pgNotify := serverConfig.Notify.GetPostgreSQLByID(sqsArn.AccountID)
if !pgNotify.Enable {
return false
}
@ -176,7 +164,7 @@ func isKafkaQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeKafka {
return false
}
kafkaNotifyCfg := serverConfig.GetKafkaNotifyByID(sqsArn.AccountID)
kafkaNotifyCfg := serverConfig.Notify.GetKafkaByID(sqsArn.AccountID)
if !kafkaNotifyCfg.Enable {
return false
}

View File

@ -59,7 +59,7 @@ func dialAMQP(amqpL amqpNotify) (amqpConn, error) {
}
func newAMQPNotify(accountID string) (*logrus.Logger, error) {
amqpL := serverConfig.GetAMQPNotifyByID(accountID)
amqpL := serverConfig.Notify.GetAMQPByID(accountID)
// Connect to amqp server.
amqpC, err := dialAMQP(amqpL)

View File

@ -55,7 +55,7 @@ func dialElastic(esNotify elasticSearchNotify) (*elastic.Client, error) {
}
func newElasticNotify(accountID string) (*logrus.Logger, error) {
esNotify := serverConfig.GetElasticSearchNotifyByID(accountID)
esNotify := serverConfig.Notify.GetElasticSearchByID(accountID)
// Dial to elastic search.
client, err := dialElastic(esNotify)

View File

@ -75,7 +75,7 @@ func dialKafka(kn kafkaNotify) (kafkaConn, error) {
}
func newKafkaNotify(accountID string) (*logrus.Logger, error) {
kafkaNotifyCfg := serverConfig.GetKafkaNotifyByID(accountID)
kafkaNotifyCfg := serverConfig.Notify.GetKafkaByID(accountID)
// Try connecting to the configured Kafka broker(s).
kc, err := dialKafka(kafkaNotifyCfg)

View File

@ -127,7 +127,7 @@ func closeNATS(conn natsIOConn) {
}
func newNATSNotify(accountID string) (*logrus.Logger, error) {
natsL := serverConfig.GetNATSNotifyByID(accountID)
natsL := serverConfig.Notify.GetNATSByID(accountID)
// Connect to nats server.
natsC, err := dialNATS(natsL, false)

View File

@ -174,7 +174,7 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) {
}
func newPostgreSQLNotify(accountID string) (*logrus.Logger, error) {
pgNotify := serverConfig.GetPostgreSQLNotifyByID(accountID)
pgNotify := serverConfig.Notify.GetPostgreSQLByID(accountID)
// Dial postgres
pgC, err := dialPostgreSQL(pgNotify)

View File

@ -83,7 +83,7 @@ func dialRedis(rNotify redisNotify) (*redis.Pool, error) {
}
func newRedisNotify(accountID string) (*logrus.Logger, error) {
rNotify := serverConfig.GetRedisNotifyByID(accountID)
rNotify := serverConfig.Notify.GetRedisByID(accountID)
// Dial redis.
rPool, err := dialRedis(rNotify)

View File

@ -52,7 +52,7 @@ func lookupEndpoint(u *url.URL) error {
// Initializes new webhook logrus notifier.
func newWebhookNotify(accountID string) (*logrus.Logger, error) {
rNotify := serverConfig.GetWebhookNotifyByID(accountID)
rNotify := serverConfig.Notify.GetWebhookByID(accountID)
if rNotify.Endpoint == "" {
return nil, errInvalidArgument

View File

@ -51,13 +51,13 @@ func TestNewWebHookNotify(t *testing.T) {
t.Fatal("Unexpected should fail")
}
serverConfig.SetWebhookNotifyByID("10", webhookNotify{Enable: true, Endpoint: "http://www."})
serverConfig.Notify.SetWebhookByID("10", webhookNotify{Enable: true, Endpoint: "http://www."})
_, err = newWebhookNotify("10")
if err == nil {
t.Fatal("Unexpected should fail with lookupHost")
}
serverConfig.SetWebhookNotifyByID("15", webhookNotify{Enable: true, Endpoint: "http://%"})
serverConfig.Notify.SetWebhookByID("15", webhookNotify{Enable: true, Endpoint: "http://%"})
_, err = newWebhookNotify("15")
if err == nil {
t.Fatal("Unexpected should fail with invalid URL escape")
@ -66,7 +66,7 @@ func TestNewWebHookNotify(t *testing.T) {
server := httptest.NewServer(postHandler{})
defer server.Close()
serverConfig.SetWebhookNotifyByID("20", webhookNotify{Enable: true, Endpoint: server.URL})
serverConfig.Notify.SetWebhookByID("20", webhookNotify{Enable: true, Endpoint: server.URL})
webhook, err := newWebhookNotify("20")
if err != nil {
t.Fatal("Unexpected shouldn't fail", err)