Add NATS notifier (#2795)

This commit is contained in:
Anis Elleuch
2016-09-30 07:42:10 +01:00
committed by Harshavardhana
parent 64083b9227
commit 1e6afac3bd
23 changed files with 4512 additions and 32 deletions

View File

@@ -150,9 +150,10 @@ func isMinioSNSConfigured(topicARN string, topicConfigs []topicConfig) bool {
// Validate if we recognize the queue type.
func isValidQueue(sqsARN arnSQS) bool {
amqpQ := isAMQPQueue(sqsARN) // Is amqp queue?.
natsQ := isNATSQueue(sqsARN) // Is nats queue?.
elasticQ := isElasticQueue(sqsARN) // Is elastic queue?.
redisQ := isRedisQueue(sqsARN) // Is redis queue?.
return amqpQ || elasticQ || redisQ
return amqpQ || natsQ || elasticQ || redisQ
}
// Validate if we recognize the topic type.
@@ -168,6 +169,9 @@ func isValidQueueID(queueARN string) bool {
if isAMQPQueue(sqsARN) { // AMQP eueue.
amqpN := serverConfig.GetAMQPNotifyByID(sqsARN.AccountID)
return amqpN.Enable && amqpN.URL != ""
} else if isNATSQueue(sqsARN) {
natsN := serverConfig.GetNATSNotifyByID(sqsARN.AccountID)
return natsN.Enable && natsN.Address != ""
} else if isElasticQueue(sqsARN) { // Elastic queue.
elasticN := serverConfig.GetElasticSearchNotifyByID(sqsARN.AccountID)
return elasticN.Enable && elasticN.URL != ""
@@ -347,6 +351,7 @@ func unmarshalTopicARN(topicARN string) arnTopic {
// Unmarshals input value of AWS ARN format into minioSqs object.
// Returned value represents minio sqs types, currently supported are
// - amqp
// - nats
// - elasticsearch
// - redis
func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
@@ -358,6 +363,8 @@ func unmarshalSqsARN(queueARN string) (mSqs arnSQS) {
switch {
case strings.HasSuffix(sqsType, queueTypeAMQP):
mSqs.Type = queueTypeAMQP
case strings.HasSuffix(sqsType, queueTypeNATS):
mSqs.Type = queueTypeNATS
case strings.HasSuffix(sqsType, queueTypeElastic):
mSqs.Type = queueTypeElastic
case strings.HasSuffix(sqsType, queueTypeRedis):

View File

@@ -50,6 +50,10 @@ func migrateConfig() error {
if err := migrateV6ToV7(); err != nil {
return err
}
// Migrate version '7' to '8'.
if err := migrateV7ToV8(); err != nil {
return err
}
return nil
}
@@ -378,3 +382,73 @@ func migrateV6ToV7() error {
console.Println("Migration from version " + cv6.Version + " to " + srvConfig.Version + " completed successfully.")
return nil
}
// Version '7' to '8' migrates config, removes previous fields related
// to backend types and server address. This change further simplifies
// the config for future additions.
func migrateV7ToV8() error {
cv7, err := loadConfigV7()
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("Unable to load config version 7. %v", err)
}
if cv7.Version != "7" {
return nil
}
// Save only the new fields, ignore the rest.
srvConfig := &serverConfigV8{}
srvConfig.Version = globalMinioConfigVersion
srvConfig.Credential = cv7.Credential
srvConfig.Region = cv7.Region
if srvConfig.Region == "" {
// Region needs to be set for AWS Signature Version 4.
srvConfig.Region = "us-east-1"
}
srvConfig.Logger.Console = cv7.Logger.Console
srvConfig.Logger.File = cv7.Logger.File
srvConfig.Logger.Syslog = cv7.Logger.Syslog
srvConfig.Notify.AMQP = make(map[string]amqpNotify)
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvConfig.Notify.Redis = make(map[string]redisNotify)
if len(cv7.Notify.AMQP) == 0 {
srvConfig.Notify.AMQP["1"] = amqpNotify{}
} else {
srvConfig.Notify.AMQP = cv7.Notify.AMQP
}
if len(cv7.Notify.NATS) == 0 {
srvConfig.Notify.NATS["1"] = natsNotify{}
} else {
srvConfig.Notify.NATS = cv7.Notify.NATS
}
if len(cv7.Notify.ElasticSearch) == 0 {
srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{}
} else {
srvConfig.Notify.ElasticSearch = cv7.Notify.ElasticSearch
}
if len(cv7.Notify.Redis) == 0 {
srvConfig.Notify.Redis["1"] = redisNotify{}
} else {
srvConfig.Notify.Redis = cv7.Notify.Redis
}
qc, err := quick.New(srvConfig)
if err != nil {
return fmt.Errorf("Unable to initialize the quick config. %v", err)
}
configFile, err := getConfigFile()
if err != nil {
return fmt.Errorf("Unable to get config file. %v", err)
}
err = qc.Save(configFile)
if err != nil {
return fmt.Errorf("Failed to migrate config from "+cv7.Version+" to "+srvConfig.Version+" failed. %v", err)
}
console.Println("Migration from version " + cv7.Version + " to " + srvConfig.Version + " completed successfully.")
return nil
}

View File

@@ -23,7 +23,7 @@ import (
"testing"
)
const lastConfigVersion = 7
const lastConfigVersion = 8
// TestServerConfigMigrateV1 - tests if a config v1 is purged
func TestServerConfigMigrateV1(t *testing.T) {
@@ -90,10 +90,14 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
if err := migrateV6ToV7(); err != nil {
t.Fatal("migrate v6 to v7 should succeed when no config file is found")
}
if err := migrateV7ToV8(); err != nil {
t.Fatal("migrate v7 to v8 should succeed when no config file is found")
}
}
// TestServerConfigMigrateV2toV7 - tests if a config from v2 to v7 is successfully done
func TestServerConfigMigrateV2toV7(t *testing.T) {
// TestServerConfigMigrateV2toV8 - tests if a config from v2 to v8 is successfully done
func TestServerConfigMigrateV2toV8(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Init Test config failed")
@@ -184,5 +188,7 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
if err := migrateV6ToV7(); err == nil {
t.Fatal("migrateConfigV6ToV7() should fail with a corrupted json")
}
if err := migrateV7ToV8(); err == nil {
t.Fatal("migrateConfigV7ToV8() should fail with a corrupted json")
}
}

View File

@@ -3,6 +3,7 @@ package cmd
import (
"os"
"path/filepath"
"sync"
"github.com/minio/minio/pkg/quick"
)
@@ -310,3 +311,42 @@ func loadConfigV6() (*configV6, error) {
}
return c, nil
}
// configV7 server configuration version '7'.
type serverConfigV7 struct {
Version string `json:"version"`
// S3 API configuration.
Credential credential `json:"credential"`
Region string `json:"region"`
// Additional error logging configuration.
Logger logger `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
// Read Write mutex.
rwMutex *sync.RWMutex
}
// loadConfigV7 load config version '7'.
func loadConfigV7() (*serverConfigV7, error) {
configFile, err := getConfigFile()
if err != nil {
return nil, err
}
if _, err = os.Stat(configFile); err != nil {
return nil, err
}
c := &serverConfigV7{}
c.Version = "7"
qc, err := quick.New(c)
if err != nil {
return nil, err
}
if err := qc.Load(configFile); err != nil {
return nil, err
}
return c, nil
}

View File

@@ -23,8 +23,8 @@ import (
"github.com/minio/minio/pkg/quick"
)
// serverConfigV7 server configuration version '7'.
type serverConfigV7 struct {
// serverConfigV8 server configuration version '8'.
type serverConfigV8 struct {
Version string `json:"version"`
// S3 API configuration.
@@ -45,7 +45,7 @@ type serverConfigV7 struct {
func initConfig() error {
if !isConfigFileExists() {
// Initialize server config.
srvCfg := &serverConfigV7{}
srvCfg := &serverConfigV8{}
srvCfg.Version = globalMinioConfigVersion
srvCfg.Region = "us-east-1"
srvCfg.Credential = mustGenAccessKeys()
@@ -63,6 +63,9 @@ func initConfig() error {
srvCfg.Notify.ElasticSearch["1"] = elasticSearchNotify{}
srvCfg.Notify.Redis = make(map[string]redisNotify)
srvCfg.Notify.Redis["1"] = redisNotify{}
srvCfg.Notify.NATS = make(map[string]natsNotify)
srvCfg.Notify.NATS["1"] = natsNotify{}
srvCfg.rwMutex = &sync.RWMutex{}
// Create config path.
@@ -84,7 +87,7 @@ func initConfig() error {
if _, err = os.Stat(configFile); err != nil {
return err
}
srvCfg := &serverConfigV7{}
srvCfg := &serverConfigV8{}
srvCfg.Version = globalMinioConfigVersion
srvCfg.rwMutex = &sync.RWMutex{}
qc, err := quick.New(srvCfg)
@@ -103,10 +106,10 @@ func initConfig() error {
}
// serverConfig server config.
var serverConfig *serverConfigV7
var serverConfig *serverConfigV8
// GetVersion get current config version.
func (s serverConfigV7) GetVersion() string {
func (s serverConfigV8) GetVersion() string {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Version
@@ -114,135 +117,155 @@ func (s serverConfigV7) GetVersion() string {
/// Logger related.
func (s *serverConfigV7) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
func (s *serverConfigV8) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.AMQP[accountID] = amqpn
}
func (s serverConfigV7) GetAMQP() map[string]amqpNotify {
func (s serverConfigV8) GetAMQP() map[string]amqpNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.AMQP
}
// GetAMQPNotify get current AMQP logger.
func (s serverConfigV7) GetAMQPNotifyByID(accountID string) amqpNotify {
func (s serverConfigV8) GetAMQPNotifyByID(accountID string) amqpNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.AMQP[accountID]
}
func (s *serverConfigV7) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
//
func (s *serverConfigV8) SetNATSNotifyByID(accountID string, natsn natsNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.NATS[accountID] = natsn
}
func (s serverConfigV8) GetNATS() map[string]natsNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.NATS
}
// GetNATSNotify get current NATS logger.
func (s serverConfigV8) GetNATSNotifyByID(accountID string) natsNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.NATS[accountID]
}
func (s *serverConfigV8) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.ElasticSearch[accountID] = esNotify
}
func (s serverConfigV7) GetElasticSearch() map[string]elasticSearchNotify {
func (s serverConfigV8) GetElasticSearch() map[string]elasticSearchNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.ElasticSearch
}
// GetElasticSearchNotify get current ElasicSearch logger.
func (s serverConfigV7) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
func (s serverConfigV8) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.ElasticSearch[accountID]
}
func (s *serverConfigV7) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
func (s *serverConfigV8) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Notify.Redis[accountID] = rNotify
}
func (s serverConfigV7) GetRedis() map[string]redisNotify {
func (s serverConfigV8) GetRedis() map[string]redisNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.Redis
}
// GetRedisNotify get current Redis logger.
func (s serverConfigV7) GetRedisNotifyByID(accountID string) redisNotify {
func (s serverConfigV8) GetRedisNotifyByID(accountID string) redisNotify {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Notify.Redis[accountID]
}
// SetFileLogger set new file logger.
func (s *serverConfigV7) SetFileLogger(flogger fileLogger) {
func (s *serverConfigV8) SetFileLogger(flogger fileLogger) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Logger.File = flogger
}
// GetFileLogger get current file logger.
func (s serverConfigV7) GetFileLogger() fileLogger {
func (s serverConfigV8) GetFileLogger() fileLogger {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Logger.File
}
// SetConsoleLogger set new console logger.
func (s *serverConfigV7) SetConsoleLogger(clogger consoleLogger) {
func (s *serverConfigV8) SetConsoleLogger(clogger consoleLogger) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Logger.Console = clogger
}
// GetConsoleLogger get current console logger.
func (s serverConfigV7) GetConsoleLogger() consoleLogger {
func (s serverConfigV8) GetConsoleLogger() consoleLogger {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Logger.Console
}
// SetSyslogLogger set new syslog logger.
func (s *serverConfigV7) SetSyslogLogger(slogger syslogLogger) {
func (s *serverConfigV8) SetSyslogLogger(slogger syslogLogger) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Logger.Syslog = slogger
}
// GetSyslogLogger get current syslog logger.
func (s *serverConfigV7) GetSyslogLogger() syslogLogger {
func (s *serverConfigV8) GetSyslogLogger() syslogLogger {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Logger.Syslog
}
// SetRegion set new region.
func (s *serverConfigV7) SetRegion(region string) {
func (s *serverConfigV8) SetRegion(region string) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Region = region
}
// GetRegion get current region.
func (s serverConfigV7) GetRegion() string {
func (s serverConfigV8) GetRegion() string {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Region
}
// SetCredentials set new credentials.
func (s *serverConfigV7) SetCredential(creds credential) {
func (s *serverConfigV8) SetCredential(creds credential) {
s.rwMutex.Lock()
defer s.rwMutex.Unlock()
s.Credential = creds
}
// GetCredentials get current credentials.
func (s serverConfigV7) GetCredential() credential {
func (s serverConfigV8) GetCredential() credential {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()
return s.Credential
}
// Save config.
func (s serverConfigV7) Save() error {
func (s serverConfigV8) Save() error {
s.rwMutex.RLock()
defer s.rwMutex.RUnlock()

View File

@@ -332,6 +332,34 @@ func loadAllQueueTargets() (map[string]*logrus.Logger, error) {
}
queueTargets[queueARN] = amqpLog
}
// Load all nats targets, initialize their respective loggers.
for accountID, natsN := range serverConfig.GetNATS() {
if !natsN.Enable {
continue
}
// Construct the queue ARN for NATS.
queueARN := minioSqs + serverConfig.GetRegion() + ":" + accountID + ":" + queueTypeNATS
// Queue target if already initialized we move to the next ARN.
_, ok := queueTargets[queueARN]
if ok {
continue
}
// Using accountID we can now initialize a new NATS logrus instance.
natsLog, err := newNATSNotify(accountID)
if err != nil {
// Encapsulate network error to be more informative.
if _, ok := err.(net.Error); ok {
return nil, &net.OpError{
Op: "Connecting to " + queueARN,
Net: "tcp",
Err: err,
}
}
return nil, err
}
queueTargets[queueARN] = natsLog
}
// Load redis targets, initialize their respective loggers.
for accountID, redisN := range serverConfig.GetRedis() {
if !redisN.Enable {

View File

@@ -30,7 +30,7 @@ const (
// minio configuration related constants.
const (
globalMinioConfigVersion = "7"
globalMinioConfigVersion = "8"
globalMinioConfigDir = ".minio"
globalMinioCertsDir = "certs"
globalMinioCertFile = "public.crt"

View File

@@ -30,6 +30,8 @@ const (
// Static string indicating queue type 'amqp'.
queueTypeAMQP = "amqp"
// Static string indicating queue type 'nats'.
queueTypeNATS = "nats"
// Static string indicating queue type 'elasticsearch'.
queueTypeElastic = "elasticsearch"
// Static string indicating queue type 'redis'.
@@ -50,6 +52,7 @@ var errNotifyNotEnabled = errors.New("requested notifier not enabled")
// Notifier represents collection of supported notification queues.
type notifier struct {
AMQP map[string]amqpNotify `json:"amqp"`
NATS map[string]natsNotify `json:"nats"`
ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
Redis map[string]redisNotify `json:"redis"`
// Add new notification queues.
@@ -74,6 +77,25 @@ func isAMQPQueue(sqsArn arnSQS) bool {
return true
}
// Returns true if natsArn is for an NATS queue.
func isNATSQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeNATS {
return false
}
natsL := serverConfig.GetNATSNotifyByID(sqsArn.AccountID)
if !natsL.Enable {
return false
}
// Connect to nats server to validate.
natsC, err := dialNATS(natsL)
if err != nil {
errorIf(err, "Unable to connect to nats service. %#v", natsL)
return false
}
defer natsC.Close()
return true
}
// Returns true if queueArn is for an Redis queue.
func isRedisQueue(sqsArn arnSQS) bool {
if sqsArn.Type != queueTypeRedis {

108
cmd/notify-nats.go Normal file
View File

@@ -0,0 +1,108 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"io/ioutil"
"github.com/Sirupsen/logrus"
"github.com/nats-io/nats"
)
// natsNotify - represents logrus compatible NATS hook.
// All fields represent NATS configuration details.
type natsNotify struct {
Enable bool `json:"enable"`
Address string `json:"address"`
Subject string `json:"subject"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token"`
Secure bool `json:"secure"`
PingInterval int64 `json:"pingInterval"`
}
type natsConn struct {
params natsNotify
*nats.Conn
}
// dialNATS - dials and returns an natsConn instance,
// for sending notifications. Returns error if nats logger
// is not enabled.
func dialNATS(natsL natsNotify) (natsConn, error) {
if !natsL.Enable {
return natsConn{}, errNotifyNotEnabled
}
// Configure and connect to NATS server
natsC := nats.DefaultOptions
natsC.Url = "nats://" + natsL.Address
natsC.User = natsL.Username
natsC.Password = natsL.Password
natsC.Token = natsL.Token
natsC.Secure = natsL.Secure
conn, err := natsC.Connect()
if err != nil {
return natsConn{}, err
}
return natsConn{Conn: conn, params: natsL}, nil
}
func newNATSNotify(accountID string) (*logrus.Logger, error) {
natsL := serverConfig.GetNATSNotifyByID(accountID)
// Connect to nats server.
natsC, err := dialNATS(natsL)
if err != nil {
return nil, err
}
natsLog := logrus.New()
// Disable writing to console.
natsLog.Out = ioutil.Discard
// Add a nats hook.
natsLog.Hooks.Add(natsC)
// Set default JSON formatter.
natsLog.Formatter = new(logrus.JSONFormatter)
// Successfully enabled all NATSs.
return natsLog, nil
}
// Fire is called when an event should be sent to the message broker
func (n natsConn) Fire(entry *logrus.Entry) error {
ch := n.Conn
body, err := entry.Reader()
if err != nil {
return err
}
err = ch.Publish(n.params.Subject, body.Bytes())
if err != nil {
return err
}
return nil
}
// Levels is available logging levels.
func (n natsConn) Levels() []logrus.Level {
return []logrus.Level{
logrus.InfoLevel,
}
}