mirror of
https://github.com/minio/minio.git
synced 2025-11-25 12:06:10 -05:00
add NATS JetStream support (#15201)
This commit is contained in:
@@ -49,12 +49,15 @@ const (
|
||||
NATSClientCert = "client_cert"
|
||||
NATSClientKey = "client_key"
|
||||
|
||||
// Streaming constants
|
||||
// Streaming constants - deprecated
|
||||
NATSStreaming = "streaming"
|
||||
NATSStreamingClusterID = "streaming_cluster_id"
|
||||
NATSStreamingAsync = "streaming_async"
|
||||
NATSStreamingMaxPubAcksInFlight = "streaming_max_pub_acks_in_flight"
|
||||
|
||||
// JetStream constants
|
||||
NATSJetStream = "jetstream"
|
||||
|
||||
EnvNATSEnable = "MINIO_NOTIFY_NATS_ENABLE"
|
||||
EnvNATSAddress = "MINIO_NOTIFY_NATS_ADDRESS"
|
||||
EnvNATSSubject = "MINIO_NOTIFY_NATS_SUBJECT"
|
||||
@@ -70,11 +73,14 @@ const (
|
||||
EnvNATSClientCert = "MINIO_NOTIFY_NATS_CLIENT_CERT"
|
||||
EnvNATSClientKey = "MINIO_NOTIFY_NATS_CLIENT_KEY"
|
||||
|
||||
// Streaming constants
|
||||
// Streaming constants - deprecated
|
||||
EnvNATSStreaming = "MINIO_NOTIFY_NATS_STREAMING"
|
||||
EnvNATSStreamingClusterID = "MINIO_NOTIFY_NATS_STREAMING_CLUSTER_ID"
|
||||
EnvNATSStreamingAsync = "MINIO_NOTIFY_NATS_STREAMING_ASYNC"
|
||||
EnvNATSStreamingMaxPubAcksInFlight = "MINIO_NOTIFY_NATS_STREAMING_MAX_PUB_ACKS_IN_FLIGHT"
|
||||
|
||||
// Jetstream constants
|
||||
EnvNATSJetStream = "MINIO_NOTIFY_NATS_JETSTREAM"
|
||||
)
|
||||
|
||||
// NATSArgs - NATS target arguments.
|
||||
@@ -94,7 +100,10 @@ type NATSArgs struct {
|
||||
PingInterval int64 `json:"pingInterval"`
|
||||
QueueDir string `json:"queueDir"`
|
||||
QueueLimit uint64 `json:"queueLimit"`
|
||||
Streaming struct {
|
||||
JetStream struct {
|
||||
Enable bool `json:"enable"`
|
||||
} `json:"jetStream"`
|
||||
Streaming struct {
|
||||
Enable bool `json:"enable"`
|
||||
ClusterID string `json:"clusterID"`
|
||||
Async bool `json:"async"`
|
||||
@@ -132,6 +141,12 @@ func (n NATSArgs) Validate() error {
|
||||
}
|
||||
}
|
||||
|
||||
if n.JetStream.Enable {
|
||||
if n.Subject == "" {
|
||||
return errors.New("empty subject")
|
||||
}
|
||||
}
|
||||
|
||||
if n.QueueDir != "" {
|
||||
if !filepath.IsAbs(n.QueueDir) {
|
||||
return errors.New("queueDir path should be absolute")
|
||||
@@ -200,6 +215,7 @@ type NATSTarget struct {
|
||||
args NATSArgs
|
||||
natsConn *nats.Conn
|
||||
stanConn stan.Conn
|
||||
jstream nats.JetStream
|
||||
store Store
|
||||
loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})
|
||||
}
|
||||
@@ -238,6 +254,14 @@ func (target *NATSTarget) IsActive() (bool, error) {
|
||||
return false, connErr
|
||||
}
|
||||
|
||||
if target.natsConn != nil && target.args.JetStream.Enable {
|
||||
target.jstream, connErr = target.natsConn.JetStream()
|
||||
if connErr.Error() == nats.ErrNoServers.Error() {
|
||||
return false, errNotConnected
|
||||
}
|
||||
return false, connErr
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -273,7 +297,11 @@ func (target *NATSTarget) send(eventData event.Event) error {
|
||||
err = target.stanConn.Publish(target.args.Subject, data)
|
||||
}
|
||||
} else {
|
||||
err = target.natsConn.Publish(target.args.Subject, data)
|
||||
if target.jstream != nil {
|
||||
_, err = target.jstream.Publish(target.args.Subject, data)
|
||||
} else {
|
||||
err = target.natsConn.Publish(target.args.Subject, data)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -309,20 +337,21 @@ func (target *NATSTarget) Close() (err error) {
|
||||
if target.stanConn.NatsConn() != nil {
|
||||
target.stanConn.NatsConn().Close()
|
||||
}
|
||||
err = target.stanConn.Close()
|
||||
return target.stanConn.Close()
|
||||
}
|
||||
|
||||
if target.natsConn != nil {
|
||||
target.natsConn.Close()
|
||||
}
|
||||
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewNATSTarget - creates new NATS target.
|
||||
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error) {
|
||||
var natsConn *nats.Conn
|
||||
var stanConn stan.Conn
|
||||
var jstream nats.JetStream
|
||||
|
||||
var err error
|
||||
|
||||
@@ -345,6 +374,8 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce
|
||||
}
|
||||
|
||||
if args.Streaming.Enable {
|
||||
target.loggerOnce(context.Background(), errors.New("NATS Streaming is deprecated please migrate to JetStream"), target.ID())
|
||||
|
||||
stanConn, err = args.connectStan()
|
||||
target.stanConn = stanConn
|
||||
} else {
|
||||
@@ -359,6 +390,17 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce
|
||||
}
|
||||
}
|
||||
|
||||
if target.natsConn != nil && args.JetStream.Enable {
|
||||
jstream, err = target.natsConn.JetStream()
|
||||
if err != nil {
|
||||
if store == nil || err.Error() != nats.ErrNoServers.Error() {
|
||||
target.loggerOnce(context.Background(), err, target.ID())
|
||||
return target, err
|
||||
}
|
||||
}
|
||||
target.jstream = jstream
|
||||
}
|
||||
|
||||
if target.store != nil && !test {
|
||||
// Replays the events from the store.
|
||||
eventKeyCh := replayEvents(target.store, doneCh, target.loggerOnce, target.ID())
|
||||
|
||||
Reference in New Issue
Block a user