change dependency from amqp -> amqp091 (RabbitMQ) official (#16142)

This commit is contained in:
Harshavardhana
2022-11-28 16:05:06 -08:00
committed by GitHub
parent cc1d8f0057
commit be92cf5959
4 changed files with 106 additions and 259 deletions

View File

@@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@@ -31,7 +31,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
xnet "github.com/minio/pkg/net"
"github.com/streadway/amqp"
"github.com/rabbitmq/amqp091-go"
)
// AMQPArgs - AMQP target arguments.
@@ -97,7 +97,7 @@ func (a *AMQPArgs) Validate() error {
if !a.Enable {
return nil
}
if _, err := amqp.ParseURI(a.URL.String()); err != nil {
if _, err := amqp091.ParseURI(a.URL.String()); err != nil {
return err
}
if a.QueueDir != "" {
@@ -115,7 +115,7 @@ type AMQPTarget struct {
id event.TargetID
args AMQPArgs
conn *amqp.Connection
conn *amqp091.Connection
connMutex sync.Mutex
store Store
loggerOnce logger.LogOnce
@@ -153,13 +153,13 @@ func (target *AMQPTarget) isActive() (bool, error) {
return true, nil
}
func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, error) {
func (target *AMQPTarget) channel() (*amqp091.Channel, chan amqp091.Confirmation, error) {
var err error
var conn *amqp.Connection
var ch *amqp.Channel
var conn *amqp091.Connection
var ch *amqp091.Channel
isAMQPClosedErr := func(err error) bool {
if err == amqp.ErrClosed {
if err == amqp091.ErrClosed {
return true
}
@@ -177,7 +177,7 @@ func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, erro
ch, err = target.conn.Channel()
if err == nil {
if target.args.PublisherConfirms {
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
confirms := ch.NotifyPublish(make(chan amqp091.Confirmation, 1))
if err := ch.Confirm(false); err != nil {
ch.Close()
return nil, nil, err
@@ -195,7 +195,7 @@ func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, erro
target.conn.Close()
}
conn, err = amqp.Dial(target.args.URL.String())
conn, err = amqp091.Dial(target.args.URL.String())
if err != nil {
if IsConnRefusedErr(err) {
return nil, nil, errNotConnected
@@ -211,7 +211,7 @@ func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, erro
target.conn = conn
if target.args.PublisherConfirms {
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
confirms := ch.NotifyPublish(make(chan amqp091.Confirmation, 1))
if err := ch.Confirm(false); err != nil {
ch.Close()
return nil, nil, err
@@ -222,8 +222,8 @@ func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, erro
return ch, nil, nil
}
// send - sends an event to the AMQP.
func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms chan amqp.Confirmation) error {
// send - sends an event to the AMQP091.
func (target *AMQPTarget) send(eventData event.Event, ch *amqp091.Channel, confirms chan amqp091.Confirmation) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
@@ -235,7 +235,7 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms
return err
}
headers := make(amqp.Table)
headers := make(amqp091.Table)
// Add more information here as required, but be aware to not overload headers
headers["minio-bucket"] = eventData.S3.Bucket.Name
headers["minio-event"] = eventData.EventName.String()
@@ -246,7 +246,7 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms
}
if err = ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory,
target.args.Immediate, amqp.Publishing{
target.args.Immediate, amqp091.Publishing{
Headers: headers,
ContentType: "application/json",
DeliveryMode: target.args.DeliveryMode,
@@ -284,7 +284,7 @@ func (target *AMQPTarget) Save(eventData event.Event) error {
return target.send(eventData, ch, confirms)
}
// Send - sends event to AMQP.
// Send - sends event to AMQP091.
func (target *AMQPTarget) Send(eventKey string) error {
if err := target.init(); err != nil {
return err
@@ -328,7 +328,7 @@ func (target *AMQPTarget) init() error {
}
func (target *AMQPTarget) initAMQP() error {
conn, err := amqp.Dial(target.args.URL.String())
conn, err := amqp091.Dial(target.args.URL.String())
if err != nil {
if IsConnRefusedErr(err) || IsConnResetErr(err) {
target.loggerOnce(context.Background(), err, target.ID().String())