From 0d7408fc9969caf07de6a8c3a84f9fbb10a6739e Mon Sep 17 00:00:00 2001 From: Matt Lloyd Date: Tue, 22 Apr 2025 23:12:26 +0100 Subject: [PATCH] feat: support nats tls handshake first (#21008) --- internal/config/notify/legacy.go | 4 + internal/config/notify/parse.go | 38 ++++---- internal/event/target/nats.go | 96 ++++++++++--------- .../event/target/nats_tls_contrib_test.go | 24 +++++ .../contrib/nats_tls_handshake_first.conf | 8 ++ 5 files changed, 109 insertions(+), 61 deletions(-) create mode 100644 internal/event/target/testdata/contrib/nats_tls_handshake_first.conf diff --git a/internal/config/notify/legacy.go b/internal/config/notify/legacy.go index d76bd67fe..9e8545a62 100644 --- a/internal/config/notify/legacy.go +++ b/internal/config/notify/legacy.go @@ -482,6 +482,10 @@ func SetNotifyNATS(s config.Config, natsName string, cfg target.NATSArgs) error Key: target.NATSTLSSkipVerify, Value: config.FormatBool(cfg.Secure), }, + config.KV{ + Key: target.NATSTLSHandshakeFirst, + Value: config.FormatBool(cfg.TLSHandshakeFirst), + }, config.KV{ Key: target.NATSPingInterval, Value: strconv.FormatInt(cfg.PingInterval, 10), diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go index 46d478934..8765d23bc 100644 --- a/internal/config/notify/parse.go +++ b/internal/config/notify/parse.go @@ -959,6 +959,11 @@ func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[s tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k } + tlsHandshakeFirstEnv := target.EnvNatsTLSHandshakeFirst + if k != config.Default { + tlsHandshakeFirstEnv = tlsHandshakeFirstEnv + config.Default + k + } + subjectEnv := target.EnvNATSSubject if k != config.Default { subjectEnv = subjectEnv + config.Default + k @@ -1010,22 +1015,23 @@ func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[s } natsArgs := target.NATSArgs{ - Enable: true, - Address: *address, - Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)), - Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)), - UserCredentials: env.Get(userCredentialsEnv, kv.Get(target.NATSUserCredentials)), - Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)), - CertAuthority: env.Get(certAuthorityEnv, kv.Get(target.NATSCertAuthority)), - ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)), - ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)), - Token: env.Get(tokenEnv, kv.Get(target.NATSToken)), - TLS: env.Get(tlsEnv, kv.Get(target.NATSTLS)) == config.EnableOn, - TLSSkipVerify: env.Get(tlsSkipVerifyEnv, kv.Get(target.NATSTLSSkipVerify)) == config.EnableOn, - PingInterval: pingInterval, - QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)), - QueueLimit: queueLimit, - RootCAs: rootCAs, + Enable: true, + Address: *address, + Subject: env.Get(subjectEnv, kv.Get(target.NATSSubject)), + Username: env.Get(usernameEnv, kv.Get(target.NATSUsername)), + UserCredentials: env.Get(userCredentialsEnv, kv.Get(target.NATSUserCredentials)), + Password: env.Get(passwordEnv, kv.Get(target.NATSPassword)), + CertAuthority: env.Get(certAuthorityEnv, kv.Get(target.NATSCertAuthority)), + ClientCert: env.Get(clientCertEnv, kv.Get(target.NATSClientCert)), + ClientKey: env.Get(clientKeyEnv, kv.Get(target.NATSClientKey)), + Token: env.Get(tokenEnv, kv.Get(target.NATSToken)), + TLS: env.Get(tlsEnv, kv.Get(target.NATSTLS)) == config.EnableOn, + TLSSkipVerify: env.Get(tlsSkipVerifyEnv, kv.Get(target.NATSTLSSkipVerify)) == config.EnableOn, + TLSHandshakeFirst: env.Get(tlsHandshakeFirstEnv, kv.Get(target.NATSTLSHandshakeFirst)) == config.EnableOn, + PingInterval: pingInterval, + QueueDir: env.Get(queueDirEnv, kv.Get(target.NATSQueueDir)), + QueueLimit: queueLimit, + RootCAs: rootCAs, } natsArgs.JetStream.Enable = env.Get(jetStreamEnableEnv, kv.Get(target.NATSJetStream)) == config.EnableOn diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index b01aa141b..56f4f91e6 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -40,19 +40,20 @@ import ( // NATS related constants const ( - NATSAddress = "address" - NATSSubject = "subject" - NATSUsername = "username" - NATSPassword = "password" - NATSToken = "token" - NATSTLS = "tls" - NATSTLSSkipVerify = "tls_skip_verify" - NATSPingInterval = "ping_interval" - NATSQueueDir = "queue_dir" - NATSQueueLimit = "queue_limit" - NATSCertAuthority = "cert_authority" - NATSClientCert = "client_cert" - NATSClientKey = "client_key" + NATSAddress = "address" + NATSSubject = "subject" + NATSUsername = "username" + NATSPassword = "password" + NATSToken = "token" + NATSTLS = "tls" + NATSTLSSkipVerify = "tls_skip_verify" + NATSTLSHandshakeFirst = "tls_handshake_first" + NATSPingInterval = "ping_interval" + NATSQueueDir = "queue_dir" + NATSQueueLimit = "queue_limit" + NATSCertAuthority = "cert_authority" + NATSClientCert = "client_cert" + NATSClientKey = "client_key" // Streaming constants - deprecated NATSStreaming = "streaming" @@ -63,21 +64,22 @@ const ( // JetStream constants NATSJetStream = "jetstream" - EnvNATSEnable = "MINIO_NOTIFY_NATS_ENABLE" - EnvNATSAddress = "MINIO_NOTIFY_NATS_ADDRESS" - EnvNATSSubject = "MINIO_NOTIFY_NATS_SUBJECT" - EnvNATSUsername = "MINIO_NOTIFY_NATS_USERNAME" - NATSUserCredentials = "MINIO_NOTIFY_NATS_USER_CREDENTIALS" - EnvNATSPassword = "MINIO_NOTIFY_NATS_PASSWORD" - EnvNATSToken = "MINIO_NOTIFY_NATS_TOKEN" - EnvNATSTLS = "MINIO_NOTIFY_NATS_TLS" - EnvNATSTLSSkipVerify = "MINIO_NOTIFY_NATS_TLS_SKIP_VERIFY" - EnvNATSPingInterval = "MINIO_NOTIFY_NATS_PING_INTERVAL" - EnvNATSQueueDir = "MINIO_NOTIFY_NATS_QUEUE_DIR" - EnvNATSQueueLimit = "MINIO_NOTIFY_NATS_QUEUE_LIMIT" - EnvNATSCertAuthority = "MINIO_NOTIFY_NATS_CERT_AUTHORITY" - EnvNATSClientCert = "MINIO_NOTIFY_NATS_CLIENT_CERT" - EnvNATSClientKey = "MINIO_NOTIFY_NATS_CLIENT_KEY" + EnvNATSEnable = "MINIO_NOTIFY_NATS_ENABLE" + EnvNATSAddress = "MINIO_NOTIFY_NATS_ADDRESS" + EnvNATSSubject = "MINIO_NOTIFY_NATS_SUBJECT" + EnvNATSUsername = "MINIO_NOTIFY_NATS_USERNAME" + NATSUserCredentials = "MINIO_NOTIFY_NATS_USER_CREDENTIALS" + EnvNATSPassword = "MINIO_NOTIFY_NATS_PASSWORD" + EnvNATSToken = "MINIO_NOTIFY_NATS_TOKEN" + EnvNATSTLS = "MINIO_NOTIFY_NATS_TLS" + EnvNATSTLSSkipVerify = "MINIO_NOTIFY_NATS_TLS_SKIP_VERIFY" + EnvNatsTLSHandshakeFirst = "MINIO_NOTIFY_NATS_TLS_HANDSHAKE_FIRST" + EnvNATSPingInterval = "MINIO_NOTIFY_NATS_PING_INTERVAL" + EnvNATSQueueDir = "MINIO_NOTIFY_NATS_QUEUE_DIR" + EnvNATSQueueLimit = "MINIO_NOTIFY_NATS_QUEUE_LIMIT" + EnvNATSCertAuthority = "MINIO_NOTIFY_NATS_CERT_AUTHORITY" + EnvNATSClientCert = "MINIO_NOTIFY_NATS_CLIENT_CERT" + EnvNATSClientKey = "MINIO_NOTIFY_NATS_CLIENT_KEY" // Streaming constants - deprecated EnvNATSStreaming = "MINIO_NOTIFY_NATS_STREAMING" @@ -91,23 +93,24 @@ const ( // NATSArgs - NATS target arguments. type NATSArgs struct { - Enable bool `json:"enable"` - Address xnet.Host `json:"address"` - Subject string `json:"subject"` - Username string `json:"username"` - UserCredentials string `json:"userCredentials"` - Password string `json:"password"` - Token string `json:"token"` - TLS bool `json:"tls"` - TLSSkipVerify bool `json:"tlsSkipVerify"` - Secure bool `json:"secure"` - CertAuthority string `json:"certAuthority"` - ClientCert string `json:"clientCert"` - ClientKey string `json:"clientKey"` - PingInterval int64 `json:"pingInterval"` - QueueDir string `json:"queueDir"` - QueueLimit uint64 `json:"queueLimit"` - JetStream struct { + Enable bool `json:"enable"` + Address xnet.Host `json:"address"` + Subject string `json:"subject"` + Username string `json:"username"` + UserCredentials string `json:"userCredentials"` + Password string `json:"password"` + Token string `json:"token"` + TLS bool `json:"tls"` + TLSSkipVerify bool `json:"tlsSkipVerify"` + TLSHandshakeFirst bool `json:"tlsHandshakeFirst"` + Secure bool `json:"secure"` + CertAuthority string `json:"certAuthority"` + ClientCert string `json:"clientCert"` + ClientKey string `json:"clientKey"` + PingInterval int64 `json:"pingInterval"` + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` + JetStream struct { Enable bool `json:"enable"` } `json:"jetStream"` Streaming struct { @@ -180,6 +183,9 @@ func (n NATSArgs) connectNats() (*nats.Conn, error) { } else if n.TLS { connOpts = append(connOpts, nats.Secure(&tls.Config{RootCAs: n.RootCAs})) } + if n.TLSHandshakeFirst { + connOpts = append(connOpts, nats.TLSHandshakeFirst()) + } if n.CertAuthority != "" { connOpts = append(connOpts, nats.RootCAs(n.CertAuthority)) } diff --git a/internal/event/target/nats_tls_contrib_test.go b/internal/event/target/nats_tls_contrib_test.go index 5f3080715..30cf5b46b 100644 --- a/internal/event/target/nats_tls_contrib_test.go +++ b/internal/event/target/nats_tls_contrib_test.go @@ -48,6 +48,30 @@ func TestNatsConnTLSCustomCA(t *testing.T) { defer con.Close() } +func TestNatsConnTLSCustomCAHandshakeFirst(t *testing.T) { + s, opts := natsserver.RunServerWithConfig(filepath.Join("testdata", "contrib", "nats_tls_handshake_first.conf")) + defer s.Shutdown() + + clientConfig := &NATSArgs{ + Enable: true, + Address: xnet.Host{ + Name: "localhost", + Port: (xnet.Port(opts.Port)), + IsPortSet: true, + }, + Subject: "test", + Secure: true, + CertAuthority: path.Join("testdata", "contrib", "certs", "root_ca_cert.pem"), + TLSHandshakeFirst: true, + } + + con, err := clientConfig.connectNats() + if err != nil { + t.Errorf("Could not connect to nats: %v", err) + } + defer con.Close() +} + func TestNatsConnTLSClientAuthorization(t *testing.T) { s, opts := natsserver.RunServerWithConfig(filepath.Join("testdata", "contrib", "nats_tls_client_cert.conf")) defer s.Shutdown() diff --git a/internal/event/target/testdata/contrib/nats_tls_handshake_first.conf b/internal/event/target/testdata/contrib/nats_tls_handshake_first.conf new file mode 100644 index 000000000..069eac489 --- /dev/null +++ b/internal/event/target/testdata/contrib/nats_tls_handshake_first.conf @@ -0,0 +1,8 @@ +port: 14227 +net: localhost + +tls { + cert_file: "./testdata/contrib/certs/nats_server_cert.pem" + key_file: "./testdata/contrib/certs/nats_server_key.pem" + handshake_first: true +}