feat: support nats tls handshake first (#21008)

This commit is contained in:
Matt Lloyd 2025-04-22 23:12:26 +01:00 committed by GitHub
parent 864f80e226
commit 0d7408fc99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 109 additions and 61 deletions

View File

@ -482,6 +482,10 @@ func SetNotifyNATS(s config.Config, natsName string, cfg target.NATSArgs) error
Key: target.NATSTLSSkipVerify, Key: target.NATSTLSSkipVerify,
Value: config.FormatBool(cfg.Secure), Value: config.FormatBool(cfg.Secure),
}, },
config.KV{
Key: target.NATSTLSHandshakeFirst,
Value: config.FormatBool(cfg.TLSHandshakeFirst),
},
config.KV{ config.KV{
Key: target.NATSPingInterval, Key: target.NATSPingInterval,
Value: strconv.FormatInt(cfg.PingInterval, 10), Value: strconv.FormatInt(cfg.PingInterval, 10),

View File

@ -959,6 +959,11 @@ func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[s
tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k
} }
tlsHandshakeFirstEnv := target.EnvNatsTLSHandshakeFirst
if k != config.Default {
tlsHandshakeFirstEnv = tlsHandshakeFirstEnv + config.Default + k
}
subjectEnv := target.EnvNATSSubject subjectEnv := target.EnvNATSSubject
if k != config.Default { if k != config.Default {
subjectEnv = subjectEnv + config.Default + k subjectEnv = subjectEnv + config.Default + k
@ -1010,22 +1015,23 @@ func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[s
} }
natsArgs := target.NATSArgs{ natsArgs := target.NATSArgs{
Enable: true, Enable: true,
Address: *address, Address: *address,
Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)), Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)),
Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)), Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)),
UserCredentials: env.Get(userCredentialsEnv, kv.Get(target.NATSUserCredentials)), UserCredentials: env.Get(userCredentialsEnv, kv.Get(target.NATSUserCredentials)),
Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)), Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)),
CertAuthority: env.Get(certAuthorityEnv, kv.Get(target.NATSCertAuthority)), CertAuthority: env.Get(certAuthorityEnv, kv.Get(target.NATSCertAuthority)),
ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)), ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)),
ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)), ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)),
Token: env.Get(tokenEnv, kv.Get(target.NATSToken)), Token: env.Get(tokenEnv, kv.Get(target.NATSToken)),
TLS: env.Get(tlsEnv, kv.Get(target.NATSTLS)) == config.EnableOn, TLS: env.Get(tlsEnv, kv.Get(target.NATSTLS)) == config.EnableOn,
TLSSkipVerify: env.Get(tlsSkipVerifyEnv, kv.Get(target.NATSTLSSkipVerify)) == config.EnableOn, TLSSkipVerify: env.Get(tlsSkipVerifyEnv, kv.Get(target.NATSTLSSkipVerify)) == config.EnableOn,
PingInterval: pingInterval, TLSHandshakeFirst: env.Get(tlsHandshakeFirstEnv, kv.Get(target.NATSTLSHandshakeFirst)) == config.EnableOn,
QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)), PingInterval: pingInterval,
QueueLimit: queueLimit, QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)),
RootCAs: rootCAs, QueueLimit: queueLimit,
RootCAs: rootCAs,
} }
natsArgs.JetStream.Enable = env.Get(jetStreamEnableEnv, kv.Get(target.NATSJetStream)) == config.EnableOn natsArgs.JetStream.Enable = env.Get(jetStreamEnableEnv, kv.Get(target.NATSJetStream)) == config.EnableOn

View File

@ -40,19 +40,20 @@ import (
// NATS related constants // NATS related constants
const ( const (
NATSAddress = "address" NATSAddress = "address"
NATSSubject = "subject" NATSSubject = "subject"
NATSUsername = "username" NATSUsername = "username"
NATSPassword = "password" NATSPassword = "password"
NATSToken = "token" NATSToken = "token"
NATSTLS = "tls" NATSTLS = "tls"
NATSTLSSkipVerify = "tls_skip_verify" NATSTLSSkipVerify = "tls_skip_verify"
NATSPingInterval = "ping_interval" NATSTLSHandshakeFirst = "tls_handshake_first"
NATSQueueDir = "queue_dir" NATSPingInterval = "ping_interval"
NATSQueueLimit = "queue_limit" NATSQueueDir = "queue_dir"
NATSCertAuthority = "cert_authority" NATSQueueLimit = "queue_limit"
NATSClientCert = "client_cert" NATSCertAuthority = "cert_authority"
NATSClientKey = "client_key" NATSClientCert = "client_cert"
NATSClientKey = "client_key"
// Streaming constants - deprecated // Streaming constants - deprecated
NATSStreaming = "streaming" NATSStreaming = "streaming"
@ -63,21 +64,22 @@ const (
// JetStream constants // JetStream constants
NATSJetStream = "jetstream" NATSJetStream = "jetstream"
EnvNATSEnable = "MINIO_NOTIFY_NATS_ENABLE" EnvNATSEnable = "MINIO_NOTIFY_NATS_ENABLE"
EnvNATSAddress = "MINIO_NOTIFY_NATS_ADDRESS" EnvNATSAddress = "MINIO_NOTIFY_NATS_ADDRESS"
EnvNATSSubject = "MINIO_NOTIFY_NATS_SUBJECT" EnvNATSSubject = "MINIO_NOTIFY_NATS_SUBJECT"
EnvNATSUsername = "MINIO_NOTIFY_NATS_USERNAME" EnvNATSUsername = "MINIO_NOTIFY_NATS_USERNAME"
NATSUserCredentials = "MINIO_NOTIFY_NATS_USER_CREDENTIALS" NATSUserCredentials = "MINIO_NOTIFY_NATS_USER_CREDENTIALS"
EnvNATSPassword = "MINIO_NOTIFY_NATS_PASSWORD" EnvNATSPassword = "MINIO_NOTIFY_NATS_PASSWORD"
EnvNATSToken = "MINIO_NOTIFY_NATS_TOKEN" EnvNATSToken = "MINIO_NOTIFY_NATS_TOKEN"
EnvNATSTLS = "MINIO_NOTIFY_NATS_TLS" EnvNATSTLS = "MINIO_NOTIFY_NATS_TLS"
EnvNATSTLSSkipVerify = "MINIO_NOTIFY_NATS_TLS_SKIP_VERIFY" EnvNATSTLSSkipVerify = "MINIO_NOTIFY_NATS_TLS_SKIP_VERIFY"
EnvNATSPingInterval = "MINIO_NOTIFY_NATS_PING_INTERVAL" EnvNatsTLSHandshakeFirst = "MINIO_NOTIFY_NATS_TLS_HANDSHAKE_FIRST"
EnvNATSQueueDir = "MINIO_NOTIFY_NATS_QUEUE_DIR" EnvNATSPingInterval = "MINIO_NOTIFY_NATS_PING_INTERVAL"
EnvNATSQueueLimit = "MINIO_NOTIFY_NATS_QUEUE_LIMIT" EnvNATSQueueDir = "MINIO_NOTIFY_NATS_QUEUE_DIR"
EnvNATSCertAuthority = "MINIO_NOTIFY_NATS_CERT_AUTHORITY" EnvNATSQueueLimit = "MINIO_NOTIFY_NATS_QUEUE_LIMIT"
EnvNATSClientCert = "MINIO_NOTIFY_NATS_CLIENT_CERT" EnvNATSCertAuthority = "MINIO_NOTIFY_NATS_CERT_AUTHORITY"
EnvNATSClientKey = "MINIO_NOTIFY_NATS_CLIENT_KEY" EnvNATSClientCert = "MINIO_NOTIFY_NATS_CLIENT_CERT"
EnvNATSClientKey = "MINIO_NOTIFY_NATS_CLIENT_KEY"
// Streaming constants - deprecated // Streaming constants - deprecated
EnvNATSStreaming = "MINIO_NOTIFY_NATS_STREAMING" EnvNATSStreaming = "MINIO_NOTIFY_NATS_STREAMING"
@ -91,23 +93,24 @@ const (
// NATSArgs - NATS target arguments. // NATSArgs - NATS target arguments.
type NATSArgs struct { type NATSArgs struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
Address xnet.Host `json:"address"` Address xnet.Host `json:"address"`
Subject string `json:"subject"` Subject string `json:"subject"`
Username string `json:"username"` Username string `json:"username"`
UserCredentials string `json:"userCredentials"` UserCredentials string `json:"userCredentials"`
Password string `json:"password"` Password string `json:"password"`
Token string `json:"token"` Token string `json:"token"`
TLS bool `json:"tls"` TLS bool `json:"tls"`
TLSSkipVerify bool `json:"tlsSkipVerify"` TLSSkipVerify bool `json:"tlsSkipVerify"`
Secure bool `json:"secure"` TLSHandshakeFirst bool `json:"tlsHandshakeFirst"`
CertAuthority string `json:"certAuthority"` Secure bool `json:"secure"`
ClientCert string `json:"clientCert"` CertAuthority string `json:"certAuthority"`
ClientKey string `json:"clientKey"` ClientCert string `json:"clientCert"`
PingInterval int64 `json:"pingInterval"` ClientKey string `json:"clientKey"`
QueueDir string `json:"queueDir"` PingInterval int64 `json:"pingInterval"`
QueueLimit uint64 `json:"queueLimit"` QueueDir string `json:"queueDir"`
JetStream struct { QueueLimit uint64 `json:"queueLimit"`
JetStream struct {
Enable bool `json:"enable"` Enable bool `json:"enable"`
} `json:"jetStream"` } `json:"jetStream"`
Streaming struct { Streaming struct {
@ -180,6 +183,9 @@ func (n NATSArgs) connectNats() (*nats.Conn, error) {
} else if n.TLS { } else if n.TLS {
connOpts = append(connOpts, nats.Secure(&tls.Config{RootCAs: n.RootCAs})) connOpts = append(connOpts, nats.Secure(&tls.Config{RootCAs: n.RootCAs}))
} }
if n.TLSHandshakeFirst {
connOpts = append(connOpts, nats.TLSHandshakeFirst())
}
if n.CertAuthority != "" { if n.CertAuthority != "" {
connOpts = append(connOpts, nats.RootCAs(n.CertAuthority)) connOpts = append(connOpts, nats.RootCAs(n.CertAuthority))
} }

View File

@ -48,6 +48,30 @@ func TestNatsConnTLSCustomCA(t *testing.T) {
defer con.Close() defer con.Close()
} }
func TestNatsConnTLSCustomCAHandshakeFirst(t *testing.T) {
s, opts := natsserver.RunServerWithConfig(filepath.Join("testdata", "contrib", "nats_tls_handshake_first.conf"))
defer s.Shutdown()
clientConfig := &NATSArgs{
Enable: true,
Address: xnet.Host{
Name: "localhost",
Port: (xnet.Port(opts.Port)),
IsPortSet: true,
},
Subject: "test",
Secure: true,
CertAuthority: path.Join("testdata", "contrib", "certs", "root_ca_cert.pem"),
TLSHandshakeFirst: true,
}
con, err := clientConfig.connectNats()
if err != nil {
t.Errorf("Could not connect to nats: %v", err)
}
defer con.Close()
}
func TestNatsConnTLSClientAuthorization(t *testing.T) { func TestNatsConnTLSClientAuthorization(t *testing.T) {
s, opts := natsserver.RunServerWithConfig(filepath.Join("testdata", "contrib", "nats_tls_client_cert.conf")) s, opts := natsserver.RunServerWithConfig(filepath.Join("testdata", "contrib", "nats_tls_client_cert.conf"))
defer s.Shutdown() defer s.Shutdown()

View File

@ -0,0 +1,8 @@
port: 14227
net: localhost
tls {
cert_file: "./testdata/contrib/certs/nats_server_cert.pem"
key_file: "./testdata/contrib/certs/nats_server_key.pem"
handshake_first: true
}