Praveen raj Mani 261111e728
Kafka notify: support batched commits for queue store (#20377)
The items will be saved per target batch and will
be committed to the queue store when the batch is full

Also, periodically commit the batched items to the queue store
based on configured commit_timeout; default is 30s;

Bonus: compress queue store multi writes
2024-09-06 16:06:30 -07:00

375 lines
10 KiB
Go

// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package target
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"sync"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/once"
"github.com/minio/minio/internal/store"
xnet "github.com/minio/pkg/v3/net"
"github.com/rabbitmq/amqp091-go"
)
// AMQPArgs - AMQP target arguments.
type AMQPArgs struct {
Enable bool `json:"enable"`
URL xnet.URL `json:"url"`
Exchange string `json:"exchange"`
RoutingKey string `json:"routingKey"`
ExchangeType string `json:"exchangeType"`
DeliveryMode uint8 `json:"deliveryMode"`
Mandatory bool `json:"mandatory"`
Immediate bool `json:"immediate"`
Durable bool `json:"durable"`
Internal bool `json:"internal"`
NoWait bool `json:"noWait"`
AutoDeleted bool `json:"autoDeleted"`
PublisherConfirms bool `json:"publisherConfirms"`
QueueDir string `json:"queueDir"`
QueueLimit uint64 `json:"queueLimit"`
}
//lint:file-ignore ST1003 We cannot change these exported names.
// AMQP input constants.
const (
AmqpQueueDir = "queue_dir"
AmqpQueueLimit = "queue_limit"
AmqpURL = "url"
AmqpExchange = "exchange"
AmqpRoutingKey = "routing_key"
AmqpExchangeType = "exchange_type"
AmqpDeliveryMode = "delivery_mode"
AmqpMandatory = "mandatory"
AmqpImmediate = "immediate"
AmqpDurable = "durable"
AmqpInternal = "internal"
AmqpNoWait = "no_wait"
AmqpAutoDeleted = "auto_deleted"
AmqpArguments = "arguments"
AmqpPublisherConfirms = "publisher_confirms"
EnvAMQPEnable = "MINIO_NOTIFY_AMQP_ENABLE"
EnvAMQPURL = "MINIO_NOTIFY_AMQP_URL"
EnvAMQPExchange = "MINIO_NOTIFY_AMQP_EXCHANGE"
EnvAMQPRoutingKey = "MINIO_NOTIFY_AMQP_ROUTING_KEY"
EnvAMQPExchangeType = "MINIO_NOTIFY_AMQP_EXCHANGE_TYPE"
EnvAMQPDeliveryMode = "MINIO_NOTIFY_AMQP_DELIVERY_MODE"
EnvAMQPMandatory = "MINIO_NOTIFY_AMQP_MANDATORY"
EnvAMQPImmediate = "MINIO_NOTIFY_AMQP_IMMEDIATE"
EnvAMQPDurable = "MINIO_NOTIFY_AMQP_DURABLE"
EnvAMQPInternal = "MINIO_NOTIFY_AMQP_INTERNAL"
EnvAMQPNoWait = "MINIO_NOTIFY_AMQP_NO_WAIT"
EnvAMQPAutoDeleted = "MINIO_NOTIFY_AMQP_AUTO_DELETED"
EnvAMQPArguments = "MINIO_NOTIFY_AMQP_ARGUMENTS"
EnvAMQPPublisherConfirms = "MINIO_NOTIFY_AMQP_PUBLISHING_CONFIRMS"
EnvAMQPQueueDir = "MINIO_NOTIFY_AMQP_QUEUE_DIR"
EnvAMQPQueueLimit = "MINIO_NOTIFY_AMQP_QUEUE_LIMIT"
)
// Validate AMQP arguments
func (a *AMQPArgs) Validate() error {
if !a.Enable {
return nil
}
if _, err := amqp091.ParseURI(a.URL.String()); err != nil {
return err
}
if a.QueueDir != "" {
if !filepath.IsAbs(a.QueueDir) {
return errors.New("queueDir path should be absolute")
}
}
return nil
}
// AMQPTarget - AMQP target
type AMQPTarget struct {
initOnce once.Init
id event.TargetID
args AMQPArgs
conn *amqp091.Connection
connMutex sync.Mutex
store store.Store[event.Event]
loggerOnce logger.LogOnce
quitCh chan struct{}
}
// ID - returns TargetID.
func (target *AMQPTarget) ID() event.TargetID {
return target.id
}
// Name - returns the Name of the target.
func (target *AMQPTarget) Name() string {
return target.ID().String()
}
// Store returns any underlying store if set.
func (target *AMQPTarget) Store() event.TargetStore {
return target.store
}
// IsActive - Return true if target is up and active
func (target *AMQPTarget) IsActive() (bool, error) {
if err := target.init(); err != nil {
return false, err
}
return target.isActive()
}
func (target *AMQPTarget) isActive() (bool, error) {
ch, _, err := target.channel()
if err != nil {
return false, err
}
defer func() {
ch.Close()
}()
return true, nil
}
func (target *AMQPTarget) channel() (*amqp091.Channel, chan amqp091.Confirmation, error) {
var err error
var conn *amqp091.Connection
var ch *amqp091.Channel
isAMQPClosedErr := func(err error) bool {
if err == amqp091.ErrClosed {
return true
}
if nerr, ok := err.(*net.OpError); ok {
return (nerr.Err.Error() == "use of closed network connection")
}
return false
}
target.connMutex.Lock()
defer target.connMutex.Unlock()
if target.conn != nil {
ch, err = target.conn.Channel()
if err == nil {
if target.args.PublisherConfirms {
confirms := ch.NotifyPublish(make(chan amqp091.Confirmation, 1))
if err := ch.Confirm(false); err != nil {
ch.Close()
return nil, nil, err
}
return ch, confirms, nil
}
return ch, nil, nil
}
if !isAMQPClosedErr(err) {
return nil, nil, err
}
// close when we know this is a network error.
target.conn.Close()
}
conn, err = amqp091.Dial(target.args.URL.String())
if err != nil {
if xnet.IsConnRefusedErr(err) {
return nil, nil, store.ErrNotConnected
}
return nil, nil, err
}
ch, err = conn.Channel()
if err != nil {
return nil, nil, err
}
target.conn = conn
if target.args.PublisherConfirms {
confirms := ch.NotifyPublish(make(chan amqp091.Confirmation, 1))
if err := ch.Confirm(false); err != nil {
ch.Close()
return nil, nil, err
}
return ch, confirms, nil
}
return ch, nil, nil
}
// send - sends an event to the AMQP091.
func (target *AMQPTarget) send(eventData event.Event, ch *amqp091.Channel, confirms chan amqp091.Confirmation) error {
objectName, err := url.QueryUnescape(eventData.S3.Object.Key)
if err != nil {
return err
}
key := eventData.S3.Bucket.Name + "/" + objectName
data, err := json.Marshal(event.Log{EventName: eventData.EventName, Key: key, Records: []event.Event{eventData}})
if err != nil {
return err
}
headers := make(amqp091.Table)
// Add more information here as required, but be aware to not overload headers
headers["minio-bucket"] = eventData.S3.Bucket.Name
headers["minio-event"] = eventData.EventName.String()
if err = ch.ExchangeDeclare(target.args.Exchange, target.args.ExchangeType, target.args.Durable,
target.args.AutoDeleted, target.args.Internal, target.args.NoWait, nil); err != nil {
return err
}
if err = ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory,
target.args.Immediate, amqp091.Publishing{
Headers: headers,
ContentType: "application/json",
DeliveryMode: target.args.DeliveryMode,
Body: data,
}); err != nil {
return err
}
// check for publisher confirms only if its enabled
if target.args.PublisherConfirms {
confirmed := <-confirms
if !confirmed.Ack {
return fmt.Errorf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
}
return nil
}
// Save - saves the events to the store which will be replayed when the amqp connection is active.
func (target *AMQPTarget) Save(eventData event.Event) error {
if target.store != nil {
_, err := target.store.Put(eventData)
return err
}
if err := target.init(); err != nil {
return err
}
ch, confirms, err := target.channel()
if err != nil {
return err
}
defer ch.Close()
return target.send(eventData, ch, confirms)
}
// SendFromStore - reads an event from store and sends it to AMQP091.
func (target *AMQPTarget) SendFromStore(key store.Key) error {
if err := target.init(); err != nil {
return err
}
ch, confirms, err := target.channel()
if err != nil {
return err
}
defer ch.Close()
eventData, eErr := target.store.Get(key)
if eErr != nil {
// The last event key in a successful batch will be sent in the channel atmost once by the replayEvents()
// Such events will not exist and wouldve been already been sent successfully.
if os.IsNotExist(eErr) {
return nil
}
return eErr
}
if err := target.send(eventData, ch, confirms); err != nil {
return err
}
// Delete the event from store.
return target.store.Del(key)
}
// Close - does nothing and available for interface compatibility.
func (target *AMQPTarget) Close() error {
close(target.quitCh)
if target.conn != nil {
return target.conn.Close()
}
return nil
}
func (target *AMQPTarget) init() error {
return target.initOnce.Do(target.initAMQP)
}
func (target *AMQPTarget) initAMQP() error {
conn, err := amqp091.Dial(target.args.URL.String())
if err != nil {
if xnet.IsConnRefusedErr(err) || xnet.IsConnResetErr(err) {
target.loggerOnce(context.Background(), err, target.ID().String())
}
return err
}
target.conn = conn
return nil
}
// NewAMQPTarget - creates new AMQP target.
func NewAMQPTarget(id string, args AMQPArgs, loggerOnce logger.LogOnce) (*AMQPTarget, error) {
var queueStore store.Store[event.Event]
if args.QueueDir != "" {
queueDir := filepath.Join(args.QueueDir, storePrefix+"-amqp-"+id)
queueStore = store.NewQueueStore[event.Event](queueDir, args.QueueLimit, event.StoreExtension)
if err := queueStore.Open(); err != nil {
return nil, fmt.Errorf("unable to initialize the queue store of AMQP `%s`: %w", id, err)
}
}
target := &AMQPTarget{
id: event.TargetID{ID: id, Name: "amqp"},
args: args,
loggerOnce: loggerOnce,
store: queueStore,
quitCh: make(chan struct{}),
}
if target.store != nil {
store.StreamItems(target.store, target, target.quitCh, target.loggerOnce)
}
return target, nil
}