NATS TLS specify CA and client TLS authentication (#8389)

- added ability to specify CA for self-signed certificates
- added option to authenticate using client certificates
- added unit tests for nats connections
This commit is contained in:
svistoi
2019-11-15 12:13:23 -05:00
committed by Harshavardhana
parent 13e2b97ad9
commit c9be601988
13 changed files with 411 additions and 58 deletions

View File

@@ -145,6 +145,9 @@ var (
target.NATSStreamingClusterID: "Unique ID for the NATS streaming cluster",
target.NATSQueueLimit: "Enable persistent event store queue limit, defaults to '10000'",
target.NATSQueueDir: "Local directory where events are stored eg: '/home/events'",
target.NATSCertAuthority: "Certificate chain of the target NATS server if self signed certs were used",
target.NATSClientCert: "TLS Cert used to authenticate against NATS configured to require client certificates",
target.NATSClientKey: "TLS Key used to authenticate against NATS configured to require client certificates",
}
HelpNSQ = config.HelpKV{

View File

@@ -204,17 +204,20 @@ func SetNotifyNATS(s config.Config, natsName string, cfg target.NATSArgs) error
}
s[config.NotifyNATSSubSys][natsName] = config.KVS{
config.State: config.StateOn,
config.Comment: "Settings for NATS notification, after migrating config",
target.NATSAddress: cfg.Address.String(),
target.NATSSubject: cfg.Subject,
target.NATSUsername: cfg.Username,
target.NATSPassword: cfg.Password,
target.NATSToken: cfg.Token,
target.NATSSecure: config.FormatBool(cfg.Secure),
target.NATSPingInterval: strconv.FormatInt(cfg.PingInterval, 10),
target.NATSQueueDir: cfg.QueueDir,
target.NATSQueueLimit: strconv.Itoa(int(cfg.QueueLimit)),
config.State: config.StateOn,
config.Comment: "Settings for NATS notification, after migrating config",
target.NATSAddress: cfg.Address.String(),
target.NATSSubject: cfg.Subject,
target.NATSUsername: cfg.Username,
target.NATSPassword: cfg.Password,
target.NATSToken: cfg.Token,
target.NATSCertAuthority: cfg.CertAuthority,
target.NATSClientCert: cfg.ClientCert,
target.NATSClientKey: cfg.ClientKey,
target.NATSSecure: config.FormatBool(cfg.Secure),
target.NATSPingInterval: strconv.FormatInt(cfg.PingInterval, 10),
target.NATSQueueDir: cfg.QueueDir,
target.NATSQueueLimit: strconv.Itoa(int(cfg.QueueLimit)),
target.NATSStreaming: func() string {
if cfg.Streaming.Enable {
return config.StateOn

View File

@@ -707,6 +707,9 @@ var (
target.NATSUsername: "",
target.NATSPassword: "",
target.NATSToken: "",
target.NATSCertAuthority: "",
target.NATSClientCert: "",
target.NATSClientKey: "",
target.NATSSecure: config.StateOff,
target.NATSPingInterval: "0",
target.NATSQueueLimit: "0",
@@ -795,17 +798,35 @@ func GetNotifyNATS(natsKVS map[string]config.KVS) (map[string]target.NATSArgs, e
queueDirEnv = queueDirEnv + config.Default + k
}
certAuthorityEnv := target.EnvNATSCertAuthority
if k != config.Default {
certAuthorityEnv = certAuthorityEnv + config.Default + k
}
clientCertEnv := target.EnvNATSClientCert
if k != config.Default {
clientCertEnv = clientCertEnv + config.Default + k
}
clientKeyEnv := target.EnvNATSClientKey
if k != config.Default {
clientKeyEnv = clientKeyEnv + config.Default + k
}
natsArgs := target.NATSArgs{
Enable: true,
Address: *address,
Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)),
Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)),
Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)),
Token: env.Get(tokenEnv, kv.Get(target.NATSToken)),
Secure: env.Get(secureEnv, kv.Get(target.NATSSecure)) == config.StateOn,
PingInterval: pingInterval,
QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)),
QueueLimit: queueLimit,
Enable: true,
Address: *address,
Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)),
Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)),
Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)),
CertAuthority: env.Get(certAuthorityEnv, kv.Get(target.NATSCertAuthority)),
ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)),
ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)),
Token: env.Get(tokenEnv, kv.Get(target.NATSToken)),
Secure: env.Get(secureEnv, kv.Get(target.NATSSecure)) == config.StateOn,
PingInterval: pingInterval,
QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)),
QueueLimit: queueLimit,
}
streamingEnableEnv := target.EnvNATSStreaming