diff --git a/cmd/bucket-notification-utils.go b/cmd/bucket-notification-utils.go index 2c0e735f5..8845c5cdb 100644 --- a/cmd/bucket-notification-utils.go +++ b/cmd/bucket-notification-utils.go @@ -133,27 +133,27 @@ func isValidQueueID(queueARN string) bool { // Is Queue identifier valid?. if isAMQPQueue(sqsARN) { // AMQP eueue. - amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID) + amqpN := serverConfig.Notify.GetAMQPByID(sqsARN.AccountID) return amqpN.Enable && amqpN.URL != "" } else if isNATSQueue(sqsARN) { - natsN := serverConfig.GetNATSNotifyByID(sqsARN.AccountID) + natsN := serverConfig.Notify.GetNATSByID(sqsARN.AccountID) return natsN.Enable && natsN.Address != "" } else if isElasticQueue(sqsARN) { // Elastic queue. - elasticN := serverConfig.GetElasticSearchNotifyByID(sqsARN.AccountID) + elasticN := serverConfig.Notify.GetElasticSearchByID(sqsARN.AccountID) return elasticN.Enable && elasticN.URL != "" } else if isRedisQueue(sqsARN) { // Redis queue. - redisN := serverConfig.GetRedisNotifyByID(sqsARN.AccountID) + redisN := serverConfig.Notify.GetRedisByID(sqsARN.AccountID) return redisN.Enable && redisN.Addr != "" } else if isPostgreSQLQueue(sqsARN) { - pgN := serverConfig.GetPostgreSQLNotifyByID(sqsARN.AccountID) + pgN := serverConfig.Notify.GetPostgreSQLByID(sqsARN.AccountID) // Postgres can work with only default conn. info. return pgN.Enable } else if isKafkaQueue(sqsARN) { - kafkaN := serverConfig.GetKafkaNotifyByID(sqsARN.AccountID) + kafkaN := serverConfig.Notify.GetKafkaByID(sqsARN.AccountID) return (kafkaN.Enable && len(kafkaN.Brokers) > 0 && kafkaN.Topic != "") } else if isWebhookQueue(sqsARN) { - webhookN := serverConfig.GetWebhookNotifyByID(sqsARN.AccountID) + webhookN := serverConfig.Notify.GetWebhookByID(sqsARN.AccountID) return webhookN.Enable && webhookN.Endpoint != "" } return false diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go index f153f17ff..d6a07b045 100644 --- a/cmd/config-migrate.go +++ b/cmd/config-migrate.go @@ -855,7 +855,10 @@ func migrateV12ToV13() error { } // Copy over fields from V12 into V13 config struct - srvConfig := &serverConfigV13{} + srvConfig := &serverConfigV13{ + Logger: &logger{}, + Notify: ¬ifier{}, + } srvConfig.Version = "13" srvConfig.Credential = cv12.Credential srvConfig.Region = cv12.Region diff --git a/cmd/config-v13.go b/cmd/config-v13.go index 8e66c5f70..f1ee32b90 100644 --- a/cmd/config-v13.go +++ b/cmd/config-v13.go @@ -36,10 +36,10 @@ type serverConfigV13 struct { Region string `json:"region"` // Additional error logging configuration. - Logger logger `json:"logger"` + Logger *logger `json:"logger"` // Notification queue configuration. - Notify notifier `json:"notify"` + Notify *notifier `json:"notify"` } // newConfig - initialize a new server config, saves creds from env @@ -47,7 +47,10 @@ type serverConfigV13 struct { // and those are saved. func newConfig(envCreds credential) error { // Initialize server config. - srvCfg := &serverConfigV13{} + srvCfg := &serverConfigV13{ + Logger: &logger{}, + Notify: ¬ifier{}, + } srvCfg.Version = globalMinioConfigVersion srvCfg.Region = globalMinioDefaultRegion @@ -148,193 +151,6 @@ func (s serverConfigV13) GetVersion() string { return s.Version } -/// Logger related. - -func (s *serverConfigV13) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Notify.AMQP[accountID] = amqpn -} - -func (s serverConfigV13) GetAMQP() map[string]amqpNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.AMQP -} - -// GetAMQPNotify get current AMQP logger. -func (s serverConfigV13) GetAMQPNotifyByID(accountID string) amqpNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.AMQP[accountID] -} - -// -func (s *serverConfigV13) SetNATSNotifyByID(accountID string, natsn natsNotify) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Notify.NATS[accountID] = natsn -} - -func (s serverConfigV13) GetNATS() map[string]natsNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - return s.Notify.NATS -} - -// GetNATSNotify get current NATS logger. -func (s serverConfigV13) GetNATSNotifyByID(accountID string) natsNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.NATS[accountID] -} - -func (s *serverConfigV13) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Notify.ElasticSearch[accountID] = esNotify -} - -func (s serverConfigV13) GetElasticSearch() map[string]elasticSearchNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.ElasticSearch -} - -// GetElasticSearchNotify get current ElasicSearch logger. -func (s serverConfigV13) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.ElasticSearch[accountID] -} - -func (s *serverConfigV13) SetRedisNotifyByID(accountID string, rNotify redisNotify) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Notify.Redis[accountID] = rNotify -} - -func (s serverConfigV13) GetRedis() map[string]redisNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.Redis -} - -func (s serverConfigV13) GetWebhook() map[string]webhookNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.Webhook -} - -// GetWebhookNotifyByID get current Webhook logger. -func (s serverConfigV13) GetWebhookNotifyByID(accountID string) webhookNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.Webhook[accountID] -} - -func (s *serverConfigV13) SetWebhookNotifyByID(accountID string, pgn webhookNotify) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Notify.Webhook[accountID] = pgn -} - -// GetRedisNotify get current Redis logger. -func (s serverConfigV13) GetRedisNotifyByID(accountID string) redisNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.Redis[accountID] -} - -func (s *serverConfigV13) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Notify.PostgreSQL[accountID] = pgn -} - -func (s serverConfigV13) GetPostgreSQL() map[string]postgreSQLNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.PostgreSQL -} - -func (s serverConfigV13) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.PostgreSQL[accountID] -} - -// Kafka related functions -func (s *serverConfigV13) SetKafkaNotifyByID(accountID string, kn kafkaNotify) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Notify.Kafka[accountID] = kn -} - -func (s serverConfigV13) GetKafka() map[string]kafkaNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.Kafka -} - -func (s serverConfigV13) GetKafkaNotifyByID(accountID string) kafkaNotify { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Notify.Kafka[accountID] -} - -// SetFileLogger set new file logger. -func (s *serverConfigV13) SetFileLogger(flogger fileLogger) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Logger.File = flogger -} - -// GetFileLogger get current file logger. -func (s serverConfigV13) GetFileLogger() fileLogger { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Logger.File -} - -// SetConsoleLogger set new console logger. -func (s *serverConfigV13) SetConsoleLogger(clogger consoleLogger) { - serverConfigMu.Lock() - defer serverConfigMu.Unlock() - - s.Logger.Console = clogger -} - -// GetConsoleLogger get current console logger. -func (s serverConfigV13) GetConsoleLogger() consoleLogger { - serverConfigMu.RLock() - defer serverConfigMu.RUnlock() - - return s.Logger.Console -} - // SetRegion set new region. func (s *serverConfigV13) SetRegion(region string) { serverConfigMu.Lock() diff --git a/cmd/config-v13_test.go b/cmd/config-v13_test.go index b620b00ff..bd505c743 100644 --- a/cmd/config-v13_test.go +++ b/cmd/config-v13_test.go @@ -40,54 +40,54 @@ func TestServerConfig(t *testing.T) { } // Set new amqp notification id. - serverConfig.SetAMQPNotifyByID("2", amqpNotify{}) - savedNotifyCfg1 := serverConfig.GetAMQPNotifyByID("2") + serverConfig.Notify.SetAMQPByID("2", amqpNotify{}) + savedNotifyCfg1 := serverConfig.Notify.GetAMQPByID("2") if !reflect.DeepEqual(savedNotifyCfg1, amqpNotify{}) { t.Errorf("Expecting AMQP config %#v found %#v", amqpNotify{}, savedNotifyCfg1) } // Set new elastic search notification id. - serverConfig.SetElasticSearchNotifyByID("2", elasticSearchNotify{}) - savedNotifyCfg2 := serverConfig.GetElasticSearchNotifyByID("2") + serverConfig.Notify.SetElasticSearchByID("2", elasticSearchNotify{}) + savedNotifyCfg2 := serverConfig.Notify.GetElasticSearchByID("2") if !reflect.DeepEqual(savedNotifyCfg2, elasticSearchNotify{}) { t.Errorf("Expecting Elasticsearch config %#v found %#v", elasticSearchNotify{}, savedNotifyCfg2) } // Set new redis notification id. - serverConfig.SetRedisNotifyByID("2", redisNotify{}) - savedNotifyCfg3 := serverConfig.GetRedisNotifyByID("2") + serverConfig.Notify.SetRedisByID("2", redisNotify{}) + savedNotifyCfg3 := serverConfig.Notify.GetRedisByID("2") if !reflect.DeepEqual(savedNotifyCfg3, redisNotify{}) { t.Errorf("Expecting Redis config %#v found %#v", redisNotify{}, savedNotifyCfg3) } // Set new kafka notification id. - serverConfig.SetKafkaNotifyByID("2", kafkaNotify{}) - savedNotifyCfg4 := serverConfig.GetKafkaNotifyByID("2") + serverConfig.Notify.SetKafkaByID("2", kafkaNotify{}) + savedNotifyCfg4 := serverConfig.Notify.GetKafkaByID("2") if !reflect.DeepEqual(savedNotifyCfg4, kafkaNotify{}) { t.Errorf("Expecting Kafka config %#v found %#v", kafkaNotify{}, savedNotifyCfg4) } // Set new Webhook notification id. - serverConfig.SetWebhookNotifyByID("2", webhookNotify{}) - savedNotifyCfg5 := serverConfig.GetWebhookNotifyByID("2") + serverConfig.Notify.SetWebhookByID("2", webhookNotify{}) + savedNotifyCfg5 := serverConfig.Notify.GetWebhookByID("2") if !reflect.DeepEqual(savedNotifyCfg5, webhookNotify{}) { t.Errorf("Expecting Webhook config %#v found %#v", webhookNotify{}, savedNotifyCfg3) } // Set new console logger. - serverConfig.SetConsoleLogger(consoleLogger{ + serverConfig.Logger.SetConsole(consoleLogger{ Enable: true, }) - consoleCfg := serverConfig.GetConsoleLogger() + consoleCfg := serverConfig.Logger.GetConsole() if !reflect.DeepEqual(consoleCfg, consoleLogger{Enable: true}) { t.Errorf("Expecting console logger config %#v found %#v", consoleLogger{Enable: true}, consoleCfg) } // Set new file logger. - serverConfig.SetFileLogger(fileLogger{ + serverConfig.Logger.SetFile(fileLogger{ Enable: true, }) - fileCfg := serverConfig.GetFileLogger() + fileCfg := serverConfig.Logger.GetFile() if !reflect.DeepEqual(fileCfg, fileLogger{Enable: true}) { t.Errorf("Expecting file logger config %#v found %#v", fileLogger{Enable: true}, consoleCfg) } diff --git a/cmd/event-notifier.go b/cmd/event-notifier.go index fa3eb5b7b..8ebc5462d 100644 --- a/cmd/event-notifier.go +++ b/cmd/event-notifier.go @@ -531,7 +531,7 @@ func loadAllBucketNotifications(objAPI ObjectLayer) (map[string]*notificationCon func loadAllQueueTargets() (map[string]*logrus.Logger, error) { queueTargets := make(map[string]*logrus.Logger) // Load all amqp targets, initialize their respective loggers. - for accountID, amqpN := range serverConfig.GetAMQP() { + for accountID, amqpN := range serverConfig.Notify.GetAMQP() { if !amqpN.Enable { continue } @@ -558,7 +558,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { queueTargets[queueARN] = amqpLog } // Load all nats targets, initialize their respective loggers. - for accountID, natsN := range serverConfig.GetNATS() { + for accountID, natsN := range serverConfig.Notify.GetNATS() { if !natsN.Enable { continue } @@ -586,7 +586,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { } // Load redis targets, initialize their respective loggers. - for accountID, redisN := range serverConfig.GetRedis() { + for accountID, redisN := range serverConfig.Notify.GetRedis() { if !redisN.Enable { continue } @@ -614,7 +614,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { } // Load Webhook targets, initialize their respective loggers. - for accountID, webhookN := range serverConfig.GetWebhook() { + for accountID, webhookN := range serverConfig.Notify.GetWebhook() { if !webhookN.Enable { continue } @@ -635,7 +635,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { } // Load elastic targets, initialize their respective loggers. - for accountID, elasticN := range serverConfig.GetElasticSearch() { + for accountID, elasticN := range serverConfig.Notify.GetElasticSearch() { if !elasticN.Enable { continue } @@ -661,7 +661,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { } // Load PostgreSQL targets, initialize their respective loggers. - for accountID, pgN := range serverConfig.GetPostgreSQL() { + for accountID, pgN := range serverConfig.Notify.GetPostgreSQL() { if !pgN.Enable { continue } @@ -686,7 +686,7 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) { queueTargets[queueARN] = pgLog } // Load Kafka targets, initialize their respective loggers. - for accountID, kafkaN := range serverConfig.GetKafka() { + for accountID, kafkaN := range serverConfig.Notify.GetKafka() { if !kafkaN.Enable { continue } diff --git a/cmd/event-notifier_test.go b/cmd/event-notifier_test.go index 0b52f65da..415350c24 100644 --- a/cmd/event-notifier_test.go +++ b/cmd/event-notifier_test.go @@ -106,7 +106,7 @@ func TestInitEventNotifierWithPostgreSQL(t *testing.T) { t.Fatal("Unable to initialize FS backend.", err) } - serverConfig.SetPostgreSQLNotifyByID("1", postgreSQLNotify{Enable: true}) + serverConfig.Notify.SetPostgreSQLByID("1", postgreSQLNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("PostgreSQL config didn't fail.") } @@ -137,7 +137,7 @@ func TestInitEventNotifierWithNATS(t *testing.T) { t.Fatal("Unable to initialize FS backend.", err) } - serverConfig.SetNATSNotifyByID("1", natsNotify{Enable: true}) + serverConfig.Notify.SetNATSByID("1", natsNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("NATS config didn't fail.") } @@ -168,7 +168,7 @@ func TestInitEventNotifierWithWebHook(t *testing.T) { t.Fatal("Unable to initialize FS backend.", err) } - serverConfig.SetWebhookNotifyByID("1", webhookNotify{Enable: true}) + serverConfig.Notify.SetWebhookByID("1", webhookNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("WebHook config didn't fail.") } @@ -199,7 +199,7 @@ func TestInitEventNotifierWithAMQP(t *testing.T) { t.Fatal("Unable to initialize FS backend.", err) } - serverConfig.SetAMQPNotifyByID("1", amqpNotify{Enable: true}) + serverConfig.Notify.SetAMQPByID("1", amqpNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("AMQP config didn't fail.") } @@ -230,7 +230,7 @@ func TestInitEventNotifierWithElasticSearch(t *testing.T) { t.Fatal("Unable to initialize FS backend.", err) } - serverConfig.SetElasticSearchNotifyByID("1", elasticSearchNotify{Enable: true}) + serverConfig.Notify.SetElasticSearchByID("1", elasticSearchNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("ElasticSearch config didn't fail.") } @@ -261,7 +261,7 @@ func TestInitEventNotifierWithRedis(t *testing.T) { t.Fatal("Unable to initialize FS backend.", err) } - serverConfig.SetRedisNotifyByID("1", redisNotify{Enable: true}) + serverConfig.Notify.SetRedisByID("1", redisNotify{Enable: true}) if err := initEventNotifier(fs); err == nil { t.Fatal("Redis config didn't fail.") } diff --git a/cmd/logger-console-hook.go b/cmd/logger-console-hook.go index 653b6ff04..fdc5264e8 100644 --- a/cmd/logger-console-hook.go +++ b/cmd/logger-console-hook.go @@ -26,7 +26,7 @@ type consoleLogger struct { // enable console logger. func enableConsoleLogger() { - clogger := serverConfig.GetConsoleLogger() + clogger := serverConfig.Logger.GetConsole() if !clogger.Enable { return } diff --git a/cmd/logger-file-hook.go b/cmd/logger-file-hook.go index a0aad40a5..c7b5f6652 100644 --- a/cmd/logger-file-hook.go +++ b/cmd/logger-file-hook.go @@ -35,7 +35,7 @@ type localFile struct { } func enableFileLogger() { - flogger := serverConfig.GetFileLogger() + flogger := serverConfig.Logger.GetFile() if !flogger.Enable || flogger.Filename == "" { return } diff --git a/cmd/logger.go b/cmd/logger.go index 733e5606e..4aba14543 100644 --- a/cmd/logger.go +++ b/cmd/logger.go @@ -39,11 +39,41 @@ var log = struct { // - console [default] // - file type logger struct { + sync.RWMutex Console consoleLogger `json:"console"` File fileLogger `json:"file"` // Add new loggers here. } +/// Logger related. + +// SetFile set new file logger. +func (l *logger) SetFile(flogger fileLogger) { + l.Lock() + defer l.Unlock() + l.File = flogger +} + +// GetFileLogger get current file logger. +func (l *logger) GetFile() fileLogger { + l.RLock() + defer l.RUnlock() + return l.File +} + +// SetConsole set new console logger. +func (l *logger) SetConsole(clogger consoleLogger) { + l.Lock() + defer l.Unlock() + l.Console = clogger +} + +func (l *logger) GetConsole() consoleLogger { + l.RLock() + defer l.RUnlock() + return l.Console +} + // Get file, line, function name of the caller. func callerSource() string { pc, file, line, success := runtime.Caller(2) diff --git a/cmd/notifier-config.go b/cmd/notifier-config.go new file mode 100644 index 000000000..2c9728158 --- /dev/null +++ b/cmd/notifier-config.go @@ -0,0 +1,228 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import "sync" + +// Notifier represents collection of supported notification queues. +type notifier struct { + sync.RWMutex + AMQP amqpConfigs `json:"amqp"` + NATS natsConfigs `json:"nats"` + ElasticSearch elasticSearchConfigs `json:"elasticsearch"` + Redis redisConfigs `json:"redis"` + PostgreSQL postgreSQLConfigs `json:"postgresql"` + Kafka kafkaConfigs `json:"kafka"` + Webhook webhookConfigs `json:"webhook"` + // Add new notification queues. +} + +type amqpConfigs map[string]amqpNotify + +func (a amqpConfigs) Clone() amqpConfigs { + a2 := make(amqpConfigs, len(a)) + for k, v := range a { + a2[k] = v + } + return a2 +} + +type natsConfigs map[string]natsNotify + +func (a natsConfigs) Clone() natsConfigs { + a2 := make(natsConfigs, len(a)) + for k, v := range a { + a2[k] = v + } + return a2 +} + +type elasticSearchConfigs map[string]elasticSearchNotify + +func (a elasticSearchConfigs) Clone() elasticSearchConfigs { + a2 := make(elasticSearchConfigs, len(a)) + for k, v := range a { + a2[k] = v + } + return a2 +} + +type redisConfigs map[string]redisNotify + +func (a redisConfigs) Clone() redisConfigs { + a2 := make(redisConfigs, len(a)) + for k, v := range a { + a2[k] = v + } + return a2 +} + +type postgreSQLConfigs map[string]postgreSQLNotify + +func (a postgreSQLConfigs) Clone() postgreSQLConfigs { + a2 := make(postgreSQLConfigs, len(a)) + for k, v := range a { + a2[k] = v + } + return a2 +} + +type kafkaConfigs map[string]kafkaNotify + +func (a kafkaConfigs) Clone() kafkaConfigs { + a2 := make(kafkaConfigs, len(a)) + for k, v := range a { + a2[k] = v + } + return a2 +} + +type webhookConfigs map[string]webhookNotify + +func (a webhookConfigs) Clone() webhookConfigs { + a2 := make(webhookConfigs, len(a)) + for k, v := range a { + a2[k] = v + } + return a2 +} + +func (n *notifier) SetAMQPByID(accountID string, amqpn amqpNotify) { + n.Lock() + defer n.Unlock() + n.AMQP[accountID] = amqpn +} + +func (n *notifier) GetAMQP() map[string]amqpNotify { + n.RLock() + defer n.RUnlock() + return n.AMQP.Clone() +} + +func (n *notifier) GetAMQPByID(accountID string) amqpNotify { + n.RLock() + defer n.RUnlock() + return n.AMQP[accountID] +} + +func (n *notifier) SetNATSByID(accountID string, natsn natsNotify) { + n.Lock() + defer n.Unlock() + n.NATS[accountID] = natsn +} + +func (n *notifier) GetNATS() map[string]natsNotify { + n.RLock() + defer n.RUnlock() + return n.NATS.Clone() +} + +func (n *notifier) GetNATSByID(accountID string) natsNotify { + n.RLock() + defer n.RUnlock() + return n.NATS[accountID] +} + +func (n *notifier) SetElasticSearchByID(accountID string, es elasticSearchNotify) { + n.Lock() + defer n.Unlock() + n.ElasticSearch[accountID] = es +} + +func (n *notifier) GetElasticSearchByID(accountID string) elasticSearchNotify { + n.RLock() + defer n.RUnlock() + return n.ElasticSearch[accountID] +} + +func (n *notifier) GetElasticSearch() map[string]elasticSearchNotify { + n.RLock() + defer n.RUnlock() + return n.ElasticSearch.Clone() +} + +func (n *notifier) SetRedisByID(accountID string, r redisNotify) { + n.Lock() + defer n.Unlock() + n.Redis[accountID] = r +} + +func (n *notifier) GetRedis() map[string]redisNotify { + n.RLock() + defer n.RUnlock() + return n.Redis.Clone() +} + +func (n *notifier) GetRedisByID(accountID string) redisNotify { + n.RLock() + defer n.RUnlock() + return n.Redis[accountID] +} + +func (n *notifier) GetWebhook() map[string]webhookNotify { + n.RLock() + defer n.RUnlock() + return n.Webhook.Clone() +} + +func (n *notifier) GetWebhookByID(accountID string) webhookNotify { + n.RLock() + defer n.RUnlock() + return n.Webhook[accountID] +} + +func (n *notifier) SetWebhookByID(accountID string, pgn webhookNotify) { + n.Lock() + defer n.Unlock() + n.Webhook[accountID] = pgn +} + +func (n *notifier) SetPostgreSQLByID(accountID string, pgn postgreSQLNotify) { + n.Lock() + defer n.Unlock() + n.PostgreSQL[accountID] = pgn +} + +func (n *notifier) GetPostgreSQL() map[string]postgreSQLNotify { + n.RLock() + defer n.RUnlock() + return n.PostgreSQL.Clone() +} + +func (n *notifier) GetPostgreSQLByID(accountID string) postgreSQLNotify { + n.RLock() + defer n.RUnlock() + return n.PostgreSQL[accountID] +} + +func (n *notifier) SetKafkaByID(accountID string, kn kafkaNotify) { + n.Lock() + defer n.Unlock() + n.Kafka[accountID] = kn +} + +func (n *notifier) GetKafka() map[string]kafkaNotify { + n.RLock() + defer n.RUnlock() + return n.Kafka.Clone() +} + +func (n *notifier) GetKafkaByID(accountID string) kafkaNotify { + n.RLock() + defer n.RUnlock() + return n.Kafka[accountID] +} diff --git a/cmd/notifier-config_test.go b/cmd/notifier-config_test.go new file mode 100644 index 000000000..a416825a9 --- /dev/null +++ b/cmd/notifier-config_test.go @@ -0,0 +1,17 @@ +/* + * Minio Cloud Storage, (C) 2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd diff --git a/cmd/notifiers.go b/cmd/notifiers.go index f1f1c4cbc..1d3ed1bfd 100644 --- a/cmd/notifiers.go +++ b/cmd/notifiers.go @@ -54,24 +54,12 @@ const ( var errNotifyNotEnabled = errors.New("requested notifier not enabled") -// Notifier represents collection of supported notification queues. -type notifier struct { - AMQP map[string]amqpNotify `json:"amqp"` - NATS map[string]natsNotify `json:"nats"` - ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"` - Redis map[string]redisNotify `json:"redis"` - PostgreSQL map[string]postgreSQLNotify `json:"postgresql"` - Kafka map[string]kafkaNotify `json:"kafka"` - Webhook map[string]webhookNotify `json:"webhook"` - // Add new notification queues. -} - // Returns true if queueArn is for an AMQP queue. func isAMQPQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeAMQP { return false } - amqpL := serverConfig.GetAMQPNotifyByID(sqsArn.AccountID) + amqpL := serverConfig.Notify.GetAMQPByID(sqsArn.AccountID) if !amqpL.Enable { return false } @@ -90,7 +78,7 @@ func isNATSQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeNATS { return false } - natsL := serverConfig.GetNATSNotifyByID(sqsArn.AccountID) + natsL := serverConfig.Notify.GetNATSByID(sqsArn.AccountID) if !natsL.Enable { return false } @@ -109,7 +97,7 @@ func isWebhookQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeWebhook { return false } - rNotify := serverConfig.GetWebhookNotifyByID(sqsArn.AccountID) + rNotify := serverConfig.Notify.GetWebhookByID(sqsArn.AccountID) if !rNotify.Enable { return false } @@ -121,7 +109,7 @@ func isRedisQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeRedis { return false } - rNotify := serverConfig.GetRedisNotifyByID(sqsArn.AccountID) + rNotify := serverConfig.Notify.GetRedisByID(sqsArn.AccountID) if !rNotify.Enable { return false } @@ -140,7 +128,7 @@ func isElasticQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeElastic { return false } - esNotify := serverConfig.GetElasticSearchNotifyByID(sqsArn.AccountID) + esNotify := serverConfig.Notify.GetElasticSearchByID(sqsArn.AccountID) if !esNotify.Enable { return false } @@ -158,7 +146,7 @@ func isPostgreSQLQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypePostgreSQL { return false } - pgNotify := serverConfig.GetPostgreSQLNotifyByID(sqsArn.AccountID) + pgNotify := serverConfig.Notify.GetPostgreSQLByID(sqsArn.AccountID) if !pgNotify.Enable { return false } @@ -176,7 +164,7 @@ func isKafkaQueue(sqsArn arnSQS) bool { if sqsArn.Type != queueTypeKafka { return false } - kafkaNotifyCfg := serverConfig.GetKafkaNotifyByID(sqsArn.AccountID) + kafkaNotifyCfg := serverConfig.Notify.GetKafkaByID(sqsArn.AccountID) if !kafkaNotifyCfg.Enable { return false } diff --git a/cmd/notify-amqp.go b/cmd/notify-amqp.go index a5b87f72d..9168cc959 100644 --- a/cmd/notify-amqp.go +++ b/cmd/notify-amqp.go @@ -59,7 +59,7 @@ func dialAMQP(amqpL amqpNotify) (amqpConn, error) { } func newAMQPNotify(accountID string) (*logrus.Logger, error) { - amqpL := serverConfig.GetAMQPNotifyByID(accountID) + amqpL := serverConfig.Notify.GetAMQPByID(accountID) // Connect to amqp server. amqpC, err := dialAMQP(amqpL) diff --git a/cmd/notify-elasticsearch.go b/cmd/notify-elasticsearch.go index 98249d16d..508b7ed9e 100644 --- a/cmd/notify-elasticsearch.go +++ b/cmd/notify-elasticsearch.go @@ -55,7 +55,7 @@ func dialElastic(esNotify elasticSearchNotify) (*elastic.Client, error) { } func newElasticNotify(accountID string) (*logrus.Logger, error) { - esNotify := serverConfig.GetElasticSearchNotifyByID(accountID) + esNotify := serverConfig.Notify.GetElasticSearchByID(accountID) // Dial to elastic search. client, err := dialElastic(esNotify) diff --git a/cmd/notify-kafka.go b/cmd/notify-kafka.go index d6e5d1f66..7ddf454b8 100644 --- a/cmd/notify-kafka.go +++ b/cmd/notify-kafka.go @@ -75,7 +75,7 @@ func dialKafka(kn kafkaNotify) (kafkaConn, error) { } func newKafkaNotify(accountID string) (*logrus.Logger, error) { - kafkaNotifyCfg := serverConfig.GetKafkaNotifyByID(accountID) + kafkaNotifyCfg := serverConfig.Notify.GetKafkaByID(accountID) // Try connecting to the configured Kafka broker(s). kc, err := dialKafka(kafkaNotifyCfg) diff --git a/cmd/notify-nats.go b/cmd/notify-nats.go index bec7cdcdf..b18751ded 100644 --- a/cmd/notify-nats.go +++ b/cmd/notify-nats.go @@ -127,7 +127,7 @@ func closeNATS(conn natsIOConn) { } func newNATSNotify(accountID string) (*logrus.Logger, error) { - natsL := serverConfig.GetNATSNotifyByID(accountID) + natsL := serverConfig.Notify.GetNATSByID(accountID) // Connect to nats server. natsC, err := dialNATS(natsL, false) diff --git a/cmd/notify-postgresql.go b/cmd/notify-postgresql.go index 5e1634655..116092520 100644 --- a/cmd/notify-postgresql.go +++ b/cmd/notify-postgresql.go @@ -174,7 +174,7 @@ func dialPostgreSQL(pgN postgreSQLNotify) (pgConn, error) { } func newPostgreSQLNotify(accountID string) (*logrus.Logger, error) { - pgNotify := serverConfig.GetPostgreSQLNotifyByID(accountID) + pgNotify := serverConfig.Notify.GetPostgreSQLByID(accountID) // Dial postgres pgC, err := dialPostgreSQL(pgNotify) diff --git a/cmd/notify-redis.go b/cmd/notify-redis.go index f44333cca..8e382edb5 100644 --- a/cmd/notify-redis.go +++ b/cmd/notify-redis.go @@ -83,7 +83,7 @@ func dialRedis(rNotify redisNotify) (*redis.Pool, error) { } func newRedisNotify(accountID string) (*logrus.Logger, error) { - rNotify := serverConfig.GetRedisNotifyByID(accountID) + rNotify := serverConfig.Notify.GetRedisByID(accountID) // Dial redis. rPool, err := dialRedis(rNotify) diff --git a/cmd/notify-webhook.go b/cmd/notify-webhook.go index 875167f79..7bdb8f086 100644 --- a/cmd/notify-webhook.go +++ b/cmd/notify-webhook.go @@ -52,7 +52,7 @@ func lookupEndpoint(u *url.URL) error { // Initializes new webhook logrus notifier. func newWebhookNotify(accountID string) (*logrus.Logger, error) { - rNotify := serverConfig.GetWebhookNotifyByID(accountID) + rNotify := serverConfig.Notify.GetWebhookByID(accountID) if rNotify.Endpoint == "" { return nil, errInvalidArgument diff --git a/cmd/notify-webhook_test.go b/cmd/notify-webhook_test.go index 4740901f0..ff974f751 100644 --- a/cmd/notify-webhook_test.go +++ b/cmd/notify-webhook_test.go @@ -51,13 +51,13 @@ func TestNewWebHookNotify(t *testing.T) { t.Fatal("Unexpected should fail") } - serverConfig.SetWebhookNotifyByID("10", webhookNotify{Enable: true, Endpoint: "http://www."}) + serverConfig.Notify.SetWebhookByID("10", webhookNotify{Enable: true, Endpoint: "http://www."}) _, err = newWebhookNotify("10") if err == nil { t.Fatal("Unexpected should fail with lookupHost") } - serverConfig.SetWebhookNotifyByID("15", webhookNotify{Enable: true, Endpoint: "http://%"}) + serverConfig.Notify.SetWebhookByID("15", webhookNotify{Enable: true, Endpoint: "http://%"}) _, err = newWebhookNotify("15") if err == nil { t.Fatal("Unexpected should fail with invalid URL escape") @@ -66,7 +66,7 @@ func TestNewWebHookNotify(t *testing.T) { server := httptest.NewServer(postHandler{}) defer server.Close() - serverConfig.SetWebhookNotifyByID("20", webhookNotify{Enable: true, Endpoint: server.URL}) + serverConfig.Notify.SetWebhookByID("20", webhookNotify{Enable: true, Endpoint: server.URL}) webhook, err := newWebhookNotify("20") if err != nil { t.Fatal("Unexpected shouldn't fail", err)