From dd839bf2957eccca7beaa152195a7046910bd8fb Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 6 Jul 2022 13:29:08 -0700 Subject: [PATCH] add NATS JetStream support (#15201) --- go.mod | 2 +- go.sum | 4 +-- internal/config/notify/help.go | 54 ++++++++++++++++++--------------- internal/config/notify/parse.go | 10 ++++++ internal/event/target/nats.go | 54 +++++++++++++++++++++++++++++---- 5 files changed, 91 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index 2301bdcf8..66349efe9 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( github.com/minio/zipindex v0.2.1 github.com/mitchellh/go-homedir v1.1.0 github.com/nats-io/nats-server/v2 v2.7.4 - github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d + github.com/nats-io/nats.go v1.16.0 github.com/nats-io/stan.go v0.10.2 github.com/ncw/directio v1.0.5 github.com/nsqio/go-nsq v1.0.8 diff --git a/go.sum b/go.sum index c53f4acb1..9532a6df5 100644 --- a/go.sum +++ b/go.sum @@ -689,8 +689,8 @@ github.com/nats-io/nats-streaming-server v0.24.1 h1:autzhooN72ELtqP3alC2OPzmrbiA github.com/nats-io/nats-streaming-server v0.24.1/go.mod h1:N2Q05hKD+aW2Ur1VYP85yUR2zUWHbqJG88CxAFLRrd4= github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d h1:zJf4l8Kp67RIZhoVeniSLZs69SHNgjLHz0aNsqPPlx8= -github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g= +github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/internal/config/notify/help.go b/internal/config/notify/help.go index 634a7cdc0..53114ab3a 100644 --- a/internal/config/notify/help.go +++ b/internal/config/notify/help.go @@ -479,30 +479,6 @@ var ( Optional: true, Type: "duration", }, - config.HelpKV{ - Key: target.NATSStreaming, - Description: "set to 'on', to use streaming NATS server", - Optional: true, - Type: "on|off", - }, - config.HelpKV{ - Key: target.NATSStreamingAsync, - Description: "set to 'on', to enable asynchronous publish", - Optional: true, - Type: "on|off", - }, - config.HelpKV{ - Key: target.NATSStreamingMaxPubAcksInFlight, - Description: "number of messages to publish without waiting for ACKs", - Optional: true, - Type: "number", - }, - config.HelpKV{ - Key: target.NATSStreamingClusterID, - Description: "unique ID for NATS streaming cluster", - Optional: true, - Type: "string", - }, config.HelpKV{ Key: target.NATSCertAuthority, Description: "path to certificate chain of the target NATS server", @@ -524,6 +500,12 @@ var ( Type: "string", Sensitive: true, }, + config.HelpKV{ + Key: target.NATSJetStream, + Description: "enable JetStream support", + Optional: true, + Type: "on|off", + }, config.HelpKV{ Key: target.NATSQueueDir, Description: queueDirComment, @@ -536,6 +518,30 @@ var ( Optional: true, Type: "number", }, + config.HelpKV{ + Key: target.NATSStreaming, + Description: "[DEPRECATED] set to 'on', to use streaming NATS server", + Optional: true, + Type: "on|off", + }, + config.HelpKV{ + Key: target.NATSStreamingAsync, + Description: "[DEPRECATED] set to 'on', to enable asynchronous publish", + Optional: true, + Type: "on|off", + }, + config.HelpKV{ + Key: target.NATSStreamingMaxPubAcksInFlight, + Description: "[DEPRECATED] number of messages to publish without waiting for ACKs", + Optional: true, + Type: "number", + }, + config.HelpKV{ + Key: target.NATSStreamingClusterID, + Description: "[DEPRECATED] unique ID for NATS streaming cluster", + Optional: true, + Type: "string", + }, config.HelpKV{ Key: config.Comment, Description: config.DefaultComment, diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go index 17c531fca..60b09e121 100644 --- a/internal/config/notify/parse.go +++ b/internal/config/notify/parse.go @@ -949,6 +949,10 @@ var ( Key: target.NATSPingInterval, Value: "0", }, + config.KV{ + Key: target.NATSJetStream, + Value: config.EnableOff, + }, config.KV{ Key: target.NATSStreaming, Value: config.EnableOff, @@ -1073,6 +1077,11 @@ func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[s clientKeyEnv = clientKeyEnv + config.Default + k } + jetStreamEnableEnv := target.EnvNATSJetStream + if k != config.Default { + jetStreamEnableEnv = jetStreamEnableEnv + config.Default + k + } + natsArgs := target.NATSArgs{ Enable: true, Address: *address, @@ -1090,6 +1099,7 @@ func GetNotifyNATS(natsKVS map[string]config.KVS, rootCAs *x509.CertPool) (map[s QueueLimit: queueLimit, RootCAs: rootCAs, } + natsArgs.JetStream.Enable = env.Get(jetStreamEnableEnv, kv.Get(target.NATSJetStream)) == config.EnableOn streamingEnableEnv := target.EnvNATSStreaming if k != config.Default { diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index f89adc5f7..684ad110d 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -49,12 +49,15 @@ const ( NATSClientCert = "client_cert" NATSClientKey = "client_key" - // Streaming constants + // Streaming constants - deprecated NATSStreaming = "streaming" NATSStreamingClusterID = "streaming_cluster_id" NATSStreamingAsync = "streaming_async" NATSStreamingMaxPubAcksInFlight = "streaming_max_pub_acks_in_flight" + // JetStream constants + NATSJetStream = "jetstream" + EnvNATSEnable = "MINIO_NOTIFY_NATS_ENABLE" EnvNATSAddress = "MINIO_NOTIFY_NATS_ADDRESS" EnvNATSSubject = "MINIO_NOTIFY_NATS_SUBJECT" @@ -70,11 +73,14 @@ const ( EnvNATSClientCert = "MINIO_NOTIFY_NATS_CLIENT_CERT" EnvNATSClientKey = "MINIO_NOTIFY_NATS_CLIENT_KEY" - // Streaming constants + // Streaming constants - deprecated EnvNATSStreaming = "MINIO_NOTIFY_NATS_STREAMING" EnvNATSStreamingClusterID = "MINIO_NOTIFY_NATS_STREAMING_CLUSTER_ID" EnvNATSStreamingAsync = "MINIO_NOTIFY_NATS_STREAMING_ASYNC" EnvNATSStreamingMaxPubAcksInFlight = "MINIO_NOTIFY_NATS_STREAMING_MAX_PUB_ACKS_IN_FLIGHT" + + // Jetstream constants + EnvNATSJetStream = "MINIO_NOTIFY_NATS_JETSTREAM" ) // NATSArgs - NATS target arguments. @@ -94,7 +100,10 @@ type NATSArgs struct { PingInterval int64 `json:"pingInterval"` QueueDir string `json:"queueDir"` QueueLimit uint64 `json:"queueLimit"` - Streaming struct { + JetStream struct { + Enable bool `json:"enable"` + } `json:"jetStream"` + Streaming struct { Enable bool `json:"enable"` ClusterID string `json:"clusterID"` Async bool `json:"async"` @@ -132,6 +141,12 @@ func (n NATSArgs) Validate() error { } } + if n.JetStream.Enable { + if n.Subject == "" { + return errors.New("empty subject") + } + } + if n.QueueDir != "" { if !filepath.IsAbs(n.QueueDir) { return errors.New("queueDir path should be absolute") @@ -200,6 +215,7 @@ type NATSTarget struct { args NATSArgs natsConn *nats.Conn stanConn stan.Conn + jstream nats.JetStream store Store loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}) } @@ -238,6 +254,14 @@ func (target *NATSTarget) IsActive() (bool, error) { return false, connErr } + if target.natsConn != nil && target.args.JetStream.Enable { + target.jstream, connErr = target.natsConn.JetStream() + if connErr.Error() == nats.ErrNoServers.Error() { + return false, errNotConnected + } + return false, connErr + } + return true, nil } @@ -273,7 +297,11 @@ func (target *NATSTarget) send(eventData event.Event) error { err = target.stanConn.Publish(target.args.Subject, data) } } else { - err = target.natsConn.Publish(target.args.Subject, data) + if target.jstream != nil { + _, err = target.jstream.Publish(target.args.Subject, data) + } else { + err = target.natsConn.Publish(target.args.Subject, data) + } } return err } @@ -309,20 +337,21 @@ func (target *NATSTarget) Close() (err error) { if target.stanConn.NatsConn() != nil { target.stanConn.NatsConn().Close() } - err = target.stanConn.Close() + return target.stanConn.Close() } if target.natsConn != nil { target.natsConn.Close() } - return err + return nil } // NewNATSTarget - creates new NATS target. func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error) { var natsConn *nats.Conn var stanConn stan.Conn + var jstream nats.JetStream var err error @@ -345,6 +374,8 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce } if args.Streaming.Enable { + target.loggerOnce(context.Background(), errors.New("NATS Streaming is deprecated please migrate to JetStream"), target.ID()) + stanConn, err = args.connectStan() target.stanConn = stanConn } else { @@ -359,6 +390,17 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce } } + if target.natsConn != nil && args.JetStream.Enable { + jstream, err = target.natsConn.JetStream() + if err != nil { + if store == nil || err.Error() != nats.ErrNoServers.Error() { + target.loggerOnce(context.Background(), err, target.ID()) + return target, err + } + } + target.jstream = jstream + } + if target.store != nil && !test { // Replays the events from the store. eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())