Add headers into AMQP notifications (#12911)

Signed-off-by: Ricardo Katz <rkatz@vmware.com>
This commit is contained in:
Ricardo Katz 2021-08-12 02:24:19 -03:00 committed by GitHub
parent 65b6f4aa31
commit a526ad2e80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 0 deletions

View File

@ -140,6 +140,8 @@ $ mc admin config set myminio/ notify_amqp:1 exchange="bucketevents" exchange_ty
MinIO supports all the exchanges available in [RabbitMQ](https://www.rabbitmq.com/). For this setup, we are using `fanout` exchange. MinIO supports all the exchanges available in [RabbitMQ](https://www.rabbitmq.com/). For this setup, we are using `fanout` exchange.
MinIO also sends with the notifications two headers: `minio-bucket` and `minio-event`. An exchange using the type "headers" can use this information to route the notifications to proper queues.
Note that, you can add as many AMQP server endpoint configurations as needed by providing an identifier (like "1" in the example above) for the AMQP instance and an object of per-server configuration parameters. Note that, you can add as many AMQP server endpoint configurations as needed by providing an identifier (like "1" in the example above) for the AMQP instance and an object of per-server configuration parameters.
### Step 2: Enable bucket notification using MinIO client ### Step 2: Enable bucket notification using MinIO client

View File

@ -222,6 +222,11 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms
return err return err
} }
headers := make(amqp.Table)
// Add more information here as required, but be aware to not overload headers
headers["minio-bucket"] = eventData.S3.Bucket.Name
headers["minio-event"] = eventData.EventName.String()
if err = ch.ExchangeDeclare(target.args.Exchange, target.args.ExchangeType, target.args.Durable, if err = ch.ExchangeDeclare(target.args.Exchange, target.args.ExchangeType, target.args.Durable,
target.args.AutoDeleted, target.args.Internal, target.args.NoWait, nil); err != nil { target.args.AutoDeleted, target.args.Internal, target.args.NoWait, nil); err != nil {
return err return err
@ -229,6 +234,7 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms
if err = ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory, if err = ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory,
target.args.Immediate, amqp.Publishing{ target.args.Immediate, amqp.Publishing{
Headers: headers,
ContentType: "application/json", ContentType: "application/json",
DeliveryMode: target.args.DeliveryMode, DeliveryMode: target.args.DeliveryMode,
Body: data, Body: data,