nats: Add support of NATS.io Streaming server (#3494)

This commit is contained in:
Anis Elleuch
2017-01-12 01:41:05 +01:00
committed by Harshavardhana
parent 08b6cfb082
commit f24753812a
66 changed files with 23979 additions and 84 deletions

View File

@@ -66,6 +66,11 @@ func migrateConfig() error {
if err := migrateV10ToV11(); err != nil {
return err
}
// Migrate version '11' to '12'.
if err := migrateV11ToV12(); err != nil {
return err
}
return nil
}
@@ -423,7 +428,7 @@ func migrateV7ToV8() error {
srvConfig.Logger.File = cv7.Logger.File
srvConfig.Logger.Syslog = cv7.Logger.Syslog
srvConfig.Notify.AMQP = make(map[string]amqpNotify)
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS = make(map[string]natsNotifyV1)
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvConfig.Notify.Redis = make(map[string]redisNotify)
srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
@@ -433,7 +438,7 @@ func migrateV7ToV8() error {
srvConfig.Notify.AMQP = cv7.Notify.AMQP
}
if len(cv7.Notify.NATS) == 0 {
srvConfig.Notify.NATS["1"] = natsNotify{}
srvConfig.Notify.NATS["1"] = natsNotifyV1{}
} else {
srvConfig.Notify.NATS = cv7.Notify.NATS
}
@@ -502,8 +507,8 @@ func migrateV8ToV9() error {
srvConfig.Notify.AMQP = cv8.Notify.AMQP
}
if len(cv8.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS["1"] = natsNotify{}
srvConfig.Notify.NATS = make(map[string]natsNotifyV1)
srvConfig.Notify.NATS["1"] = natsNotifyV1{}
} else {
srvConfig.Notify.NATS = cv8.Notify.NATS
}
@@ -587,8 +592,8 @@ func migrateV9ToV10() error {
srvConfig.Notify.AMQP = cv9.Notify.AMQP
}
if len(cv9.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS["1"] = natsNotify{}
srvConfig.Notify.NATS = make(map[string]natsNotifyV1)
srvConfig.Notify.NATS["1"] = natsNotifyV1{}
} else {
srvConfig.Notify.NATS = cv9.Notify.NATS
}
@@ -672,8 +677,8 @@ func migrateV10ToV11() error {
srvConfig.Notify.AMQP = cv10.Notify.AMQP
}
if len(cv10.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS["1"] = natsNotify{}
srvConfig.Notify.NATS = make(map[string]natsNotifyV1)
srvConfig.Notify.NATS["1"] = natsNotifyV1{}
} else {
srvConfig.Notify.NATS = cv10.Notify.NATS
}
@@ -725,3 +730,109 @@ func migrateV10ToV11() error {
)
return nil
}
// Version '11' to '12' migration. Add support for NATS streaming
// notifications.
func migrateV11ToV12() error {
cv11, err := loadConfigV11()
if err != nil {
if os.IsNotExist(err) {
return nil
}
return fmt.Errorf("Unable to load config version 11. %v", err)
}
if cv11.Version != "11" {
return nil
}
// Copy over fields from V11 into V12 config struct
srvConfig := &serverConfigV12{}
srvConfig.Version = "12"
srvConfig.Credential = cv11.Credential
srvConfig.Region = cv11.Region
if srvConfig.Region == "" {
// Region needs to be set for AWS Signature Version 4.
srvConfig.Region = "us-east-1"
}
srvConfig.Logger.Console = cv11.Logger.Console
srvConfig.Logger.File = cv11.Logger.File
// check and set notifiers config
if len(cv11.Notify.AMQP) == 0 {
srvConfig.Notify.AMQP = make(map[string]amqpNotify)
srvConfig.Notify.AMQP["1"] = amqpNotify{}
} else {
srvConfig.Notify.AMQP = cv11.Notify.AMQP
}
if len(cv11.Notify.ElasticSearch) == 0 {
srvConfig.Notify.ElasticSearch = make(map[string]elasticSearchNotify)
srvConfig.Notify.ElasticSearch["1"] = elasticSearchNotify{}
} else {
srvConfig.Notify.ElasticSearch = cv11.Notify.ElasticSearch
}
if len(cv11.Notify.Redis) == 0 {
srvConfig.Notify.Redis = make(map[string]redisNotify)
srvConfig.Notify.Redis["1"] = redisNotify{}
} else {
srvConfig.Notify.Redis = cv11.Notify.Redis
}
if len(cv11.Notify.PostgreSQL) == 0 {
srvConfig.Notify.PostgreSQL = make(map[string]postgreSQLNotify)
srvConfig.Notify.PostgreSQL["1"] = postgreSQLNotify{}
} else {
srvConfig.Notify.PostgreSQL = cv11.Notify.PostgreSQL
}
if len(cv11.Notify.Kafka) == 0 {
srvConfig.Notify.Kafka = make(map[string]kafkaNotify)
srvConfig.Notify.Kafka["1"] = kafkaNotify{}
} else {
srvConfig.Notify.Kafka = cv11.Notify.Kafka
}
// V12 will have an updated config of nats. So we create a new one or we
// update the old one if found.
if len(cv11.Notify.NATS) == 0 {
srvConfig.Notify.NATS = make(map[string]natsNotify)
srvConfig.Notify.NATS["1"] = natsNotify{}
} else {
srvConfig.Notify.NATS = make(map[string]natsNotify)
for k, v := range cv11.Notify.NATS {
n := natsNotify{}
n.Enable = v.Enable
n.Address = v.Address
n.Subject = v.Subject
n.Username = v.Username
n.Password = v.Password
n.Token = v.Token
n.Secure = v.Secure
n.PingInterval = v.PingInterval
srvConfig.Notify.NATS[k] = n
}
}
qc, err := quick.New(srvConfig)
if err != nil {
return fmt.Errorf("Unable to initialize the quick config. %v",
err)
}
configFile, err := getConfigFile()
if err != nil {
return fmt.Errorf("Unable to get config file. %v", err)
}
err = qc.Save(configFile)
if err != nil {
return fmt.Errorf(
"Failed to migrate config from "+
cv11.Version+" to "+srvConfig.Version+
" failed. %v", err,
)
}
console.Println(
"Migration from version " +
cv11.Version + " to " + srvConfig.Version +
" completed successfully.",
)
return nil
}

View File

@@ -100,10 +100,13 @@ func TestServerConfigMigrateInexistentConfig(t *testing.T) {
if err := migrateV10ToV11(); err != nil {
t.Fatal("migrate v10 to v11 should succeed when no config file is found")
}
if err := migrateV11ToV12(); err != nil {
t.Fatal("migrate v10 to v11 should succeed when no config file is found")
}
}
// Test if a config migration from v2 to v11 is successfully done
func TestServerConfigMigrateV2toV11(t *testing.T) {
// Test if a config migration from v2 to v12 is successfully done
func TestServerConfigMigrateV2toV12(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Init Test config failed")
@@ -206,4 +209,8 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
if err := migrateV10ToV11(); err == nil {
t.Fatal("migrateConfigV10ToV11() should fail with a corrupted json")
}
if err := migrateV11ToV12(); err == nil {
t.Fatal("migrateConfigV11ToV12() should fail with a corrupted json")
}
}

View File

@@ -301,7 +301,7 @@ type configV6 struct {
Logger loggerV6 `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV1 `json:"notify"`
}
// loadConfigV6 load config version '6'.
@@ -325,6 +325,16 @@ func loadConfigV6() (*configV6, error) {
return c, nil
}
// Notifier represents collection of supported notification queues.
type notifierV1 struct {
AMQP map[string]amqpNotify `json:"amqp"`
NATS map[string]natsNotifyV1 `json:"nats"`
ElasticSearch map[string]elasticSearchNotify `json:"elasticsearch"`
Redis map[string]redisNotify `json:"redis"`
PostgreSQL map[string]postgreSQLNotify `json:"postgresql"`
Kafka map[string]kafkaNotify `json:"kafka"`
}
// configV7 server configuration version '7'.
type serverConfigV7 struct {
Version string `json:"version"`
@@ -337,7 +347,7 @@ type serverConfigV7 struct {
Logger loggerV6 `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV1 `json:"notify"`
// Read Write mutex.
rwMutex *sync.RWMutex
@@ -377,7 +387,7 @@ type serverConfigV8 struct {
Logger loggerV6 `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV1 `json:"notify"`
// Read Write mutex.
rwMutex *sync.RWMutex
@@ -417,7 +427,7 @@ type serverConfigV9 struct {
Logger loggerV6 `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV1 `json:"notify"`
// Read Write mutex.
rwMutex *sync.RWMutex
@@ -458,7 +468,7 @@ type serverConfigV10 struct {
Logger logger `json:"logger"`
// Notification queue configuration.
Notify notifier `json:"notify"`
Notify notifierV1 `json:"notify"`
}
func loadConfigV10() (*serverConfigV10, error) {
@@ -480,3 +490,51 @@ func loadConfigV10() (*serverConfigV10, error) {
}
return srvCfg, nil
}
// natsNotifyV1 - structure was valid until config V 11
type natsNotifyV1 struct {
Enable bool `json:"enable"`
Address string `json:"address"`
Subject string `json:"subject"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token"`
Secure bool `json:"secure"`
PingInterval int64 `json:"pingInterval"`
}
// serverConfigV11 server configuration version '11' which is like
// version '10' except it adds support for Kafka notifications.
type serverConfigV11 struct {
Version string `json:"version"`
// S3 API configuration.
Credential credential `json:"credential"`
Region string `json:"region"`
// Additional error logging configuration.
Logger logger `json:"logger"`
// Notification queue configuration.
Notify notifierV1 `json:"notify"`
}
func loadConfigV11() (*serverConfigV11, error) {
configFile, err := getConfigFile()
if err != nil {
return nil, err
}
if _, err = os.Stat(configFile); err != nil {
return nil, err
}
srvCfg := &serverConfigV11{}
srvCfg.Version = "11"
qc, err := quick.New(srvCfg)
if err != nil {
return nil, err
}
if err := qc.Load(configFile); err != nil {
return nil, err
}
return srvCfg, nil
}

View File

@@ -26,9 +26,9 @@ import (
// Read Write mutex for safe access to ServerConfig.
var serverConfigMu sync.RWMutex
// serverConfigV11 server configuration version '11' which is like
// version '10' except it adds support for Kafka notifications.
type serverConfigV11 struct {
// serverConfigV12 server configuration version '12' which is like
// version '11' except it adds support for NATS streaming notifications.
type serverConfigV12 struct {
Version string `json:"version"`
// S3 API configuration.
@@ -47,7 +47,7 @@ type serverConfigV11 struct {
func initConfig() (bool, error) {
if !isConfigFileExists() {
// Initialize server config.
srvCfg := &serverConfigV11{}
srvCfg := &serverConfigV12{}
srvCfg.Version = globalMinioConfigVersion
srvCfg.Region = "us-east-1"
srvCfg.Credential = newCredential()
@@ -94,7 +94,7 @@ func initConfig() (bool, error) {
if _, err = os.Stat(configFile); err != nil {
return false, err
}
srvCfg := &serverConfigV11{}
srvCfg := &serverConfigV12{}
srvCfg.Version = globalMinioConfigVersion
qc, err := quick.New(srvCfg)
if err != nil {
@@ -116,10 +116,10 @@ func initConfig() (bool, error) {
}
// serverConfig server config.
var serverConfig *serverConfigV11
var serverConfig *serverConfigV12
// GetVersion get current config version.
func (s serverConfigV11) GetVersion() string {
func (s serverConfigV12) GetVersion() string {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -128,14 +128,14 @@ func (s serverConfigV11) GetVersion() string {
/// Logger related.
func (s *serverConfigV11) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
func (s *serverConfigV12) SetAMQPNotifyByID(accountID string, amqpn amqpNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.AMQP[accountID] = amqpn
}
func (s serverConfigV11) GetAMQP() map[string]amqpNotify {
func (s serverConfigV12) GetAMQP() map[string]amqpNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -143,7 +143,7 @@ func (s serverConfigV11) GetAMQP() map[string]amqpNotify {
}
// GetAMQPNotify get current AMQP logger.
func (s serverConfigV11) GetAMQPNotifyByID(accountID string) amqpNotify {
func (s serverConfigV12) GetAMQPNotifyByID(accountID string) amqpNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -151,35 +151,35 @@ func (s serverConfigV11) GetAMQPNotifyByID(accountID string) amqpNotify {
}
//
func (s *serverConfigV11) SetNATSNotifyByID(accountID string, natsn natsNotify) {
func (s *serverConfigV12) SetNATSNotifyByID(accountID string, natsn natsNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.NATS[accountID] = natsn
}
func (s serverConfigV11) GetNATS() map[string]natsNotify {
func (s serverConfigV12) GetNATS() map[string]natsNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.NATS
}
// GetNATSNotify get current NATS logger.
func (s serverConfigV11) GetNATSNotifyByID(accountID string) natsNotify {
func (s serverConfigV12) GetNATSNotifyByID(accountID string) natsNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.NATS[accountID]
}
func (s *serverConfigV11) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
func (s *serverConfigV12) SetElasticSearchNotifyByID(accountID string, esNotify elasticSearchNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.ElasticSearch[accountID] = esNotify
}
func (s serverConfigV11) GetElasticSearch() map[string]elasticSearchNotify {
func (s serverConfigV12) GetElasticSearch() map[string]elasticSearchNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -187,21 +187,21 @@ func (s serverConfigV11) GetElasticSearch() map[string]elasticSearchNotify {
}
// GetElasticSearchNotify get current ElasicSearch logger.
func (s serverConfigV11) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
func (s serverConfigV12) GetElasticSearchNotifyByID(accountID string) elasticSearchNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.ElasticSearch[accountID]
}
func (s *serverConfigV11) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
func (s *serverConfigV12) SetRedisNotifyByID(accountID string, rNotify redisNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.Redis[accountID] = rNotify
}
func (s serverConfigV11) GetRedis() map[string]redisNotify {
func (s serverConfigV12) GetRedis() map[string]redisNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -209,28 +209,28 @@ func (s serverConfigV11) GetRedis() map[string]redisNotify {
}
// GetRedisNotify get current Redis logger.
func (s serverConfigV11) GetRedisNotifyByID(accountID string) redisNotify {
func (s serverConfigV12) GetRedisNotifyByID(accountID string) redisNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Redis[accountID]
}
func (s *serverConfigV11) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) {
func (s *serverConfigV12) SetPostgreSQLNotifyByID(accountID string, pgn postgreSQLNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.PostgreSQL[accountID] = pgn
}
func (s serverConfigV11) GetPostgreSQL() map[string]postgreSQLNotify {
func (s serverConfigV12) GetPostgreSQL() map[string]postgreSQLNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.PostgreSQL
}
func (s serverConfigV11) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify {
func (s serverConfigV12) GetPostgreSQLNotifyByID(accountID string) postgreSQLNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -238,21 +238,21 @@ func (s serverConfigV11) GetPostgreSQLNotifyByID(accountID string) postgreSQLNot
}
// Kafka related functions
func (s *serverConfigV11) SetKafkaNotifyByID(accountID string, kn kafkaNotify) {
func (s *serverConfigV12) SetKafkaNotifyByID(accountID string, kn kafkaNotify) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
s.Notify.Kafka[accountID] = kn
}
func (s serverConfigV11) GetKafka() map[string]kafkaNotify {
func (s serverConfigV12) GetKafka() map[string]kafkaNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
return s.Notify.Kafka
}
func (s serverConfigV11) GetKafkaNotifyByID(accountID string) kafkaNotify {
func (s serverConfigV12) GetKafkaNotifyByID(accountID string) kafkaNotify {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -260,7 +260,7 @@ func (s serverConfigV11) GetKafkaNotifyByID(accountID string) kafkaNotify {
}
// SetFileLogger set new file logger.
func (s *serverConfigV11) SetFileLogger(flogger fileLogger) {
func (s *serverConfigV12) SetFileLogger(flogger fileLogger) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -268,7 +268,7 @@ func (s *serverConfigV11) SetFileLogger(flogger fileLogger) {
}
// GetFileLogger get current file logger.
func (s serverConfigV11) GetFileLogger() fileLogger {
func (s serverConfigV12) GetFileLogger() fileLogger {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -276,7 +276,7 @@ func (s serverConfigV11) GetFileLogger() fileLogger {
}
// SetConsoleLogger set new console logger.
func (s *serverConfigV11) SetConsoleLogger(clogger consoleLogger) {
func (s *serverConfigV12) SetConsoleLogger(clogger consoleLogger) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -284,7 +284,7 @@ func (s *serverConfigV11) SetConsoleLogger(clogger consoleLogger) {
}
// GetConsoleLogger get current console logger.
func (s serverConfigV11) GetConsoleLogger() consoleLogger {
func (s serverConfigV12) GetConsoleLogger() consoleLogger {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -292,7 +292,7 @@ func (s serverConfigV11) GetConsoleLogger() consoleLogger {
}
// SetRegion set new region.
func (s *serverConfigV11) SetRegion(region string) {
func (s *serverConfigV12) SetRegion(region string) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -300,7 +300,7 @@ func (s *serverConfigV11) SetRegion(region string) {
}
// GetRegion get current region.
func (s serverConfigV11) GetRegion() string {
func (s serverConfigV12) GetRegion() string {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -308,7 +308,7 @@ func (s serverConfigV11) GetRegion() string {
}
// SetCredentials set new credentials.
func (s *serverConfigV11) SetCredential(creds credential) {
func (s *serverConfigV12) SetCredential(creds credential) {
serverConfigMu.Lock()
defer serverConfigMu.Unlock()
@@ -316,7 +316,7 @@ func (s *serverConfigV11) SetCredential(creds credential) {
}
// GetCredentials get current credentials.
func (s serverConfigV11) GetCredential() credential {
func (s serverConfigV12) GetCredential() credential {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()
@@ -324,7 +324,7 @@ func (s serverConfigV11) GetCredential() credential {
}
// Save config.
func (s serverConfigV11) Save() error {
func (s serverConfigV12) Save() error {
serverConfigMu.RLock()
defer serverConfigMu.RUnlock()

View File

@@ -36,7 +36,7 @@ const (
// minio configuration related constants.
const (
globalMinioConfigVersion = "11"
globalMinioConfigVersion = "12"
globalMinioConfigDir = ".minio"
globalMinioCertsDir = "certs"
globalMinioCertsCADir = "CAs"

View File

@@ -93,12 +93,12 @@ func isNATSQueue(sqsArn arnSQS) bool {
return false
}
// Connect to nats server to validate.
natsC, err := dialNATS(natsL)
natsC, err := dialNATS(natsL, true)
if err != nil {
errorIf(err, "Unable to connect to nats service. %#v", natsL)
return false
}
defer natsC.Close()
closeNATS(natsC)
return true
}

View File

@@ -20,53 +20,117 @@ import (
"io/ioutil"
"github.com/Sirupsen/logrus"
"github.com/nats-io/go-nats-streaming"
"github.com/nats-io/nats"
)
// natsNotifyStreaming contains specific options related to connection
// to a NATS streaming server
type natsNotifyStreaming struct {
Enable bool `json:"enable"`
ClusterID string `json:"clusterID"`
ClientID string `json:"clientID"`
Async bool `json:"async"`
MaxPubAcksInflight int `json:"maxPubAcksInflight"`
}
// natsNotify - represents logrus compatible NATS hook.
// All fields represent NATS configuration details.
type natsNotify struct {
Enable bool `json:"enable"`
Address string `json:"address"`
Subject string `json:"subject"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token"`
Secure bool `json:"secure"`
PingInterval int64 `json:"pingInterval"`
Enable bool `json:"enable"`
Address string `json:"address"`
Subject string `json:"subject"`
Username string `json:"username"`
Password string `json:"password"`
Token string `json:"token"`
Secure bool `json:"secure"`
PingInterval int64 `json:"pingInterval"`
Streaming natsNotifyStreaming `json:"streaming"`
}
type natsConn struct {
params natsNotify
*nats.Conn
// natsIOConn abstracts connection to any type of NATS server
type natsIOConn struct {
params natsNotify
natsConn *nats.Conn
stanConn stan.Conn
}
// dialNATS - dials and returns an natsConn instance,
// dialNATS - dials and returns an natsIOConn instance,
// for sending notifications. Returns error if nats logger
// is not enabled.
func dialNATS(natsL natsNotify) (natsConn, error) {
func dialNATS(natsL natsNotify, testDial bool) (natsIOConn, error) {
if !natsL.Enable {
return natsConn{}, errNotifyNotEnabled
return natsIOConn{}, errNotifyNotEnabled
}
// Configure and connect to NATS server
natsC := nats.DefaultOptions
natsC.Url = "nats://" + natsL.Address
natsC.User = natsL.Username
natsC.Password = natsL.Password
natsC.Token = natsL.Token
natsC.Secure = natsL.Secure
conn, err := natsC.Connect()
if err != nil {
return natsConn{}, err
// Construct natsIOConn which holds all NATS connection information
conn := natsIOConn{params: natsL}
if natsL.Streaming.Enable {
// Construct scheme to differentiate between clear and TLS connections
scheme := "nats"
if natsL.Secure {
scheme = "tls"
}
// Construct address URL
addressURL := scheme + "://" + natsL.Username + ":" + natsL.Password + "@" + natsL.Address
// Fetch the user-supplied client ID and provide a random one if not provided
clientID := natsL.Streaming.ClientID
if clientID == "" {
clientID = mustGetUUID()
}
// Add test suffix to clientID to avoid clientID already registered error
if testDial {
clientID += "-test"
}
connOpts := []stan.Option{
stan.NatsURL(addressURL),
}
// Setup MaxPubAcksInflight parameter
if natsL.Streaming.MaxPubAcksInflight > 0 {
connOpts = append(connOpts,
stan.MaxPubAcksInflight(natsL.Streaming.MaxPubAcksInflight))
}
// Do the real connection to the NATS server
sc, err := stan.Connect(natsL.Streaming.ClusterID, clientID, connOpts...)
if err != nil {
return natsIOConn{}, err
}
// Save the created connection
conn.stanConn = sc
} else {
// Configure and connect to NATS server
natsC := nats.DefaultOptions
natsC.Url = "nats://" + natsL.Address
natsC.User = natsL.Username
natsC.Password = natsL.Password
natsC.Token = natsL.Token
natsC.Secure = natsL.Secure
// Do the real connection
nc, err := natsC.Connect()
if err != nil {
return natsIOConn{}, err
}
// Save the created connection
conn.natsConn = nc
}
return conn, nil
}
// closeNATS - close the underlying NATS connection
func closeNATS(conn natsIOConn) {
if conn.params.Streaming.Enable {
conn.stanConn.Close()
} else {
conn.natsConn.Close()
}
return natsConn{Conn: conn, params: natsL}, nil
}
func newNATSNotify(accountID string) (*logrus.Logger, error) {
natsL := serverConfig.GetNATSNotifyByID(accountID)
// Connect to nats server.
natsC, err := dialNATS(natsL)
natsC, err := dialNATS(natsL, false)
if err != nil {
return nil, err
}
@@ -87,21 +151,34 @@ func newNATSNotify(accountID string) (*logrus.Logger, error) {
}
// Fire is called when an event should be sent to the message broker
func (n natsConn) Fire(entry *logrus.Entry) error {
ch := n.Conn
func (n natsIOConn) Fire(entry *logrus.Entry) error {
body, err := entry.Reader()
if err != nil {
return err
}
err = ch.Publish(n.params.Subject, body.Bytes())
if err != nil {
return err
if n.params.Streaming.Enable {
// Streaming flag is enabled, publish the log synchronously or asynchronously
// depending on the user supplied parameter
if n.params.Streaming.Async {
_, err = n.stanConn.PublishAsync(n.params.Subject, body.Bytes(), nil)
} else {
err = n.stanConn.Publish(n.params.Subject, body.Bytes())
}
if err != nil {
return err
}
} else {
// Publish the log
err = n.natsConn.Publish(n.params.Subject, body.Bytes())
if err != nil {
return err
}
}
return nil
}
// Levels is available logging levels.
func (n natsConn) Levels() []logrus.Level {
func (n natsIOConn) Levels() []logrus.Level {
return []logrus.Level{
logrus.InfoLevel,
}