Support persistent queue store for loggers (#17121)
@@ -1,4 +1,4 @@
-// Copyright (c) 2015-2022 MinIO, Inc.
+// Copyright (c) 2015-2023 MinIO, Inc.
 //
 // This file is part of MinIO Object Storage stack
 //
@@ -23,7 +23,10 @@ import (
 	"crypto/x509"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"net"
+	"os"
+	"path/filepath"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -34,94 +37,13 @@ import (
 	saramatls "github.com/Shopify/sarama/tools/tls"
 
 	"github.com/minio/minio/internal/logger/target/types"
+	"github.com/minio/minio/internal/once"
+	"github.com/minio/minio/internal/store"
 	xnet "github.com/minio/pkg/net"
 )
 
-// Target - Kafka target.
-type Target struct {
-	totalMessages  int64
-	failedMessages int64
-
-	wg     sync.WaitGroup
-	doneCh chan struct{}
-
-	// Channel of log entries
-	logCh chan audit.Entry
-
-	// is the target online?
-	online bool
-
-	producer sarama.SyncProducer
-	kconfig  Config
-	config   *sarama.Config
-}
-
-// Send log message 'e' to kafka target.
-func (h *Target) Send(entry interface{}) error {
-	if !h.online {
-		return nil
-	}
-
-	select {
-	case <-h.doneCh:
-		return nil
-	default:
-	}
-
-	if e, ok := entry.(audit.Entry); ok {
-		select {
-		case <-h.doneCh:
-		case h.logCh <- e:
-		default:
-			// log channel is full, do not wait and return
-			// an error immediately to the caller
-			atomic.AddInt64(&h.totalMessages, 1)
-			atomic.AddInt64(&h.failedMessages, 1)
-			return errors.New("log buffer full")
-		}
-	}
-
-	return nil
-}
-
-func (h *Target) logEntry(entry audit.Entry) {
-	atomic.AddInt64(&h.totalMessages, 1)
-	logJSON, err := json.Marshal(&entry)
-	if err != nil {
-		atomic.AddInt64(&h.failedMessages, 1)
-		return
-	}
-	msg := sarama.ProducerMessage{
-		Topic: h.kconfig.Topic,
-		Key:   sarama.StringEncoder(entry.RequestID),
-		Value: sarama.ByteEncoder(logJSON),
-	}
-
-	_, _, err = h.producer.SendMessage(&msg)
-	if err != nil {
-		atomic.AddInt64(&h.failedMessages, 1)
-		h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic)
-		return
-	}
-}
-
-func (h *Target) startKakfaLogger() {
-	// Create a routine which sends json logs received
-	// from an internal channel.
-	h.wg.Add(1)
-	go func() {
-		defer h.wg.Done()
-
-		for {
-			select {
-			case entry := <-h.logCh:
-				h.logEntry(entry)
-			case <-h.doneCh:
-				return
-			}
-		}
-	}()
-}
+// the suffix for the configured queue dir where the logs will be persisted.
+const kafkaLoggerExtension = ".kafka.log"
 
 // Config - kafka target arguments.
 type Config struct {
@@ -143,6 +65,9 @@ type Config struct {
 		Password  string `json:"password"`
 		Mechanism string `json:"mechanism"`
 	} `json:"sasl"`
+	// Queue store
+	QueueSize int    `json:"queueSize"`
+	QueueDir  string `json:"queueDir"`
 
 	// Custom logger
 	LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"`
@@ -161,13 +86,44 @@ func (k Config) pingBrokers() (err error) {
 	return nil
 }
 
-// Stats returns the target statistics.
-func (h *Target) Stats() types.TargetStats {
-	return types.TargetStats{
-		TotalMessages:  atomic.LoadInt64(&h.totalMessages),
-		FailedMessages: atomic.LoadInt64(&h.failedMessages),
-		QueueLength:    len(h.logCh),
-	}
-}
+// Target - Kafka target.
+type Target struct {
+	totalMessages  int64
+	failedMessages int64
+
+	wg     sync.WaitGroup
+	doneCh chan struct{}
+
+	// Channel of log entries
+	logCh chan audit.Entry
+
+	// store to persist and replay the logs to the target
+	// to avoid missing events when the target is down.
+	store              store.Store[audit.Entry]
+	storeCtxCancel     context.CancelFunc
+	initKafkaOnce      once.Init
+	initQueueStoreOnce once.Init
+
+	producer sarama.SyncProducer
+	kconfig  Config
+	config   *sarama.Config
+}
+
+func (h *Target) validate() error {
+	if len(h.kconfig.Brokers) == 0 {
+		return errors.New("no broker address found")
+	}
+	for _, b := range h.kconfig.Brokers {
+		if _, err := xnet.ParseHost(b.String()); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Name returns the name of the target
+func (h *Target) Name() string {
+	return "minio-kafka-audit"
+}
 
 // Endpoint - return kafka target
@@ -180,24 +136,95 @@ func (h *Target) String() string {
 	return "kafka"
 }
 
-// IsOnline returns true if the initialization was successful
-func (h *Target) IsOnline() bool {
-	return h.online
+// Stats returns the target statistics.
+func (h *Target) Stats() types.TargetStats {
+	return types.TargetStats{
+		TotalMessages:  atomic.LoadInt64(&h.totalMessages),
+		FailedMessages: atomic.LoadInt64(&h.failedMessages),
+		QueueLength:    len(h.logCh),
+	}
 }
 
 // Init initialize kafka target
-func (h *Target) Init() error {
+func (h *Target) Init(ctx context.Context) error {
 	if !h.kconfig.Enabled {
 		return nil
 	}
-	if len(h.kconfig.Brokers) == 0 {
-		return errors.New("no broker address found")
+	if err := h.validate(); err != nil {
+		return err
 	}
-	for _, b := range h.kconfig.Brokers {
-		if _, err := xnet.ParseHost(b.String()); err != nil {
+	if h.kconfig.QueueDir != "" {
+		if err := h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore); err != nil {
 			return err
 		}
+		return h.initKafkaOnce.Do(h.init)
 	}
+	if err := h.init(); err != nil {
+		return err
+	}
+	go h.startKakfaLogger()
+	return nil
+}
+
+func (h *Target) initQueueStore(ctx context.Context) (err error) {
+	var queueStore store.Store[audit.Entry]
+	queueDir := filepath.Join(h.kconfig.QueueDir, h.Name())
+	queueStore = store.NewQueueStore[audit.Entry](queueDir, uint64(h.kconfig.QueueSize), kafkaLoggerExtension)
+	if err = queueStore.Open(); err != nil {
+		return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err)
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	h.store = queueStore
+	h.storeCtxCancel = cancel
+	store.StreamItems(h.store, h, ctx.Done(), h.kconfig.LogOnce)
+	return
+}
+
+func (h *Target) startKakfaLogger() {
+	// Create a routine which sends json logs received
+	// from an internal channel.
+	h.wg.Add(1)
+	go func() {
+		defer h.wg.Done()
+
+		for {
+			select {
+			case entry := <-h.logCh:
+				h.logEntry(entry)
+			case <-h.doneCh:
+				return
+			}
+		}
+	}()
+}
+
+func (h *Target) logEntry(entry audit.Entry) {
+	atomic.AddInt64(&h.totalMessages, 1)
+	if err := h.send(entry); err != nil {
+		atomic.AddInt64(&h.failedMessages, 1)
+		h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic)
+	}
+}
+
+func (h *Target) send(entry audit.Entry) error {
+	if err := h.initKafkaOnce.Do(h.init); err != nil {
+		return err
+	}
+	logJSON, err := json.Marshal(&entry)
+	if err != nil {
+		return err
+	}
+	msg := sarama.ProducerMessage{
+		Topic: h.kconfig.Topic,
+		Key:   sarama.StringEncoder(entry.RequestID),
+		Value: sarama.ByteEncoder(logJSON),
+	}
+	_, _, err = h.producer.SendMessage(&msg)
+	return err
+}
+
+// Init initialize kafka target
+func (h *Target) init() error {
 	if err := h.kconfig.pingBrokers(); err != nil {
 		return err
 	}
@@ -244,15 +271,73 @@ func (h *Target) Init() error {
 	}
 
 	h.producer = producer
-	h.online = true
-	go h.startKakfaLogger()
 	return nil
 }
 
+// IsOnline returns true if the target is online.
+func (h *Target) IsOnline(_ context.Context) bool {
+	if err := h.initKafkaOnce.Do(h.init); err != nil {
+		return false
+	}
+	return h.kconfig.pingBrokers() == nil
+}
+
+// Send log message 'e' to kafka target.
+func (h *Target) Send(ctx context.Context, entry interface{}) error {
+	if auditEntry, ok := entry.(audit.Entry); ok {
+		if h.store != nil {
+			// save the entry to the queue store which will be replayed to the target.
+			return h.store.Put(auditEntry)
+		}
+		if err := h.initKafkaOnce.Do(h.init); err != nil {
+			return err
+		}
+		select {
+		case <-h.doneCh:
+		case h.logCh <- auditEntry:
+		default:
+			// log channel is full, do not wait and return
+			// an error immediately to the caller
+			atomic.AddInt64(&h.totalMessages, 1)
+			atomic.AddInt64(&h.failedMessages, 1)
+			return errors.New("log buffer full")
+		}
+	}
+	return nil
+}
+
+// SendFromStore - reads the log from store and sends it to kafka.
+func (h *Target) SendFromStore(key string) (err error) {
+	var auditEntry audit.Entry
+	auditEntry, err = h.store.Get(key)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil
+		}
+		return err
+	}
+	atomic.AddInt64(&h.totalMessages, 1)
+	err = h.send(auditEntry)
+	if err != nil {
+		atomic.AddInt64(&h.failedMessages, 1)
+		return
+	}
+	// Delete the event from store.
+	return h.store.Del(key)
+}
+
 // Cancel - cancels the target
 func (h *Target) Cancel() {
 	close(h.doneCh)
 	close(h.logCh)
+	// If queuestore is configured, cancel it's context to
+	// stop the replay go-routine.
+	if h.store != nil {
+		h.storeCtxCancel()
+	}
+	if h.producer != nil {
+		h.producer.Close()
+	}
 	h.wg.Wait()
 }
@@ -263,7 +348,6 @@ func New(config Config) *Target {
 		logCh:   make(chan audit.Entry, 10000),
 		doneCh:  make(chan struct{}),
 		kconfig: config,
-		online:  false,
 	}
 	return target
 }
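
Usage sketch (not part of the commit): the hunks above show the target's whole surface — New(config Config) *Target, Init(ctx), Send(ctx, entry), Cancel() — so a caller could wire up the persistent path roughly as below. This is a minimal illustration under stated assumptions: Config.Brokers is taken to be a slice of xnet.Host (the diff only shows that each broker has String() and round-trips through xnet.ParseHost), audit.Entry is reduced to the one field the diff uses (RequestID), and the import paths are the repository's internal packages, so this only compiles inside the minio module.

package main

import (
	"context"
	"log"

	"github.com/minio/minio/internal/logger/message/audit"
	"github.com/minio/minio/internal/logger/target/kafka"
	xnet "github.com/minio/pkg/net"
)

func main() {
	// Build the broker list the same way validate() checks it,
	// via xnet.ParseHost. The []xnet.Host element type is inferred,
	// not confirmed, by this diff.
	broker, err := xnet.ParseHost("localhost:9092")
	if err != nil {
		log.Fatal(err)
	}

	target := kafka.New(kafka.Config{
		Enabled: true,
		Brokers: []xnet.Host{*broker},
		Topic:   "minio-audit",
		// A non-empty QueueDir selects the new persistent path: Send
		// only calls store.Put, and store.StreamItems replays entries
		// through SendFromStore until Kafka accepts them.
		QueueDir:  "/var/lib/minio/audit-kafka-queue", // hypothetical path
		QueueSize: 10000,
		LogOnce: func(ctx context.Context, err error, id string, _ ...interface{}) {
			log.Println(id, err)
		},
	})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Init opens the on-disk queue and starts the replay goroutine;
	// the Kafka producer itself is dialed lazily via initKafkaOnce.
	if err := target.Init(ctx); err != nil {
		log.Fatal(err)
	}
	defer target.Cancel()

	// With the store configured this persists the entry and returns;
	// without QueueDir it falls back to the in-memory logCh path.
	if err := target.Send(ctx, audit.Entry{RequestID: "req-123"}); err != nil {
		log.Println("send:", err)
	}
}

The design point worth noting: Init no longer dials Kafka eagerly, and both Send and IsOnline funnel through initKafkaOnce.Do(h.init), so with QueueDir set an unreachable broker degrades to disk-buffered, replay-later delivery instead of dropped audit entries.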