From 57acacd5a7ccc0380e669474762c8033abf07813 Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Tue, 9 May 2023 09:50:31 +0530 Subject: [PATCH] Support persistent queue store for loggers (#17121) --- cmd/admin-handlers.go | 4 +- cmd/config-current.go | 6 +- cmd/consolelogger.go | 8 +- cmd/server-main.go | 2 +- cmd/server-startup-msg.go | 2 +- internal/event/target/amqp.go | 9 +- internal/event/target/elasticsearch.go | 9 +- internal/event/target/kafka.go | 9 +- internal/event/target/mqtt.go | 9 +- internal/event/target/mysql.go | 9 +- internal/event/target/nats.go | 9 +- internal/event/target/nsq.go | 9 +- internal/event/target/postgresql.go | 9 +- internal/event/target/redis.go | 9 +- internal/event/target/webhook.go | 9 +- internal/event/targetlist.go | 2 +- internal/event/targetlist_test.go | 4 +- internal/logger/audit.go | 2 +- internal/logger/config.go | 70 ++++- internal/logger/logger.go | 6 +- internal/logger/target/http/http.go | 234 ++++++++------ internal/logger/target/kafka/kafka.go | 290 +++++++++++------- internal/logger/targets.go | 37 +-- .../target/lazyinit.go => once/init.go} | 36 ++- internal/store/store.go | 23 +- 25 files changed, 516 insertions(+), 300 deletions(-) rename internal/{event/target/lazyinit.go => once/init.go} (57%) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 07affb86d..5344404a0 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -2511,13 +2511,13 @@ func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) { var auditloggerInfo []madmin.Audit for _, tgt := range logger.SystemTargets() { if tgt.Endpoint() != "" { - loggerInfo = append(loggerInfo, madmin.Logger{tgt.String(): logger.TargetStatus(tgt)}) + loggerInfo = append(loggerInfo, madmin.Logger{tgt.String(): logger.TargetStatus(GlobalContext, tgt)}) } } for _, tgt := range logger.AuditTargets() { if tgt.Endpoint() != "" { - auditloggerInfo = append(auditloggerInfo, madmin.Audit{tgt.String(): logger.TargetStatus(tgt)}) + auditloggerInfo = append(auditloggerInfo, madmin.Audit{tgt.String(): logger.TargetStatus(GlobalContext, tgt)}) } } diff --git a/cmd/config-current.go b/cmd/config-current.go index ace156d48..4c453cc8c 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -588,7 +588,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf } loggerCfg.HTTP[n] = l } - if errs := logger.UpdateSystemTargets(loggerCfg); len(errs) > 0 { + if errs := logger.UpdateSystemTargets(ctx, loggerCfg); len(errs) > 0 { logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs)) } case config.AuditWebhookSubSys: @@ -606,7 +606,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf loggerCfg.AuditWebhook[n] = l } - if errs := logger.UpdateAuditWebhookTargets(loggerCfg); len(errs) > 0 { + if errs := logger.UpdateAuditWebhookTargets(ctx, loggerCfg); len(errs) > 0 { logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs)) } case config.AuditKafkaSubSys: @@ -620,7 +620,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf loggerCfg.AuditKafka[n] = l } } - if errs := logger.UpdateAuditKafkaTargets(loggerCfg); len(errs) > 0 { + if errs := logger.UpdateAuditKafkaTargets(ctx, loggerCfg); len(errs) > 0 { logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %v", errs)) } case config.StorageClassSubSys: diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go index f7a25be43..43b77d244 100644 --- a/cmd/consolelogger.go +++ 
b/cmd/consolelogger.go @@ -58,7 +58,7 @@ func NewConsoleLogger(ctx context.Context) *HTTPConsoleLoggerSys { } // IsOnline always true in case of console logger -func (sys *HTTPConsoleLoggerSys) IsOnline() bool { +func (sys *HTTPConsoleLoggerSys) IsOnline(_ context.Context) bool { return true } @@ -87,7 +87,7 @@ func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool { func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan log.Info, doneCh <-chan struct{}, node string, last int, logKind madmin.LogMask, filter func(entry log.Info) bool) error { // Enable console logging for remote client. if !sys.HasLogListeners() { - logger.AddSystemTarget(sys) + logger.AddSystemTarget(GlobalContext, sys) } cnt := 0 @@ -128,7 +128,7 @@ func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan log.Info, doneCh <-chan st } // Init if HTTPConsoleLoggerSys is valid, always returns nil right now -func (sys *HTTPConsoleLoggerSys) Init() error { +func (sys *HTTPConsoleLoggerSys) Init(_ context.Context) error { return nil } @@ -180,7 +180,7 @@ func (sys *HTTPConsoleLoggerSys) Type() types.TargetType { // Send log message 'e' to console and publish to console // log pubsub system -func (sys *HTTPConsoleLoggerSys) Send(entry interface{}) error { +func (sys *HTTPConsoleLoggerSys) Send(ctx context.Context, entry interface{}) error { var lg log.Info switch e := entry.(type) { case log.Entry: diff --git a/cmd/server-main.go b/cmd/server-main.go index 0c1f84c23..e212e13e2 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -516,7 +516,7 @@ func serverMain(ctx *cli.Context) { // Initialize globalConsoleSys system globalConsoleSys = NewConsoleLogger(GlobalContext) - logger.AddSystemTarget(globalConsoleSys) + logger.AddSystemTarget(GlobalContext, globalConsoleSys) // Perform any self-tests bitrotSelfTest() diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go index 4ad80139e..a432efa64 100644 --- a/cmd/server-startup-msg.go +++ b/cmd/server-startup-msg.go @@ -42,7 +42,7 @@ func printStartupMessage(apiEndpoints []string, err error) { logger.Info(color.Bold("MinIO Object Storage Server")) if err != nil { if globalConsoleSys != nil { - globalConsoleSys.Send(fmt.Sprintf("Server startup failed with '%v', some features may be missing", err)) + globalConsoleSys.Send(GlobalContext, fmt.Sprintf("Server startup failed with '%v', some features may be missing", err)) } } diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index 2762f48de..fb59cdd25 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -30,6 +30,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" "github.com/rabbitmq/amqp091-go" @@ -112,7 +113,7 @@ func (a *AMQPArgs) Validate() error { // AMQPTarget - AMQP target type AMQPTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args AMQPArgs @@ -289,8 +290,8 @@ func (target *AMQPTarget) Save(eventData event.Event) error { return target.send(eventData, ch, confirms) } -// Send - sends event to AMQP091. -func (target *AMQPTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to AMQP091. 
+func (target *AMQPTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -329,7 +330,7 @@ func (target *AMQPTarget) Close() error { } func (target *AMQPTarget) init() error { - return target.lazyInit.Do(target.initAMQP) + return target.initOnce.Do(target.initAMQP) } func (target *AMQPTarget) initAMQP() error { diff --git a/internal/event/target/elasticsearch.go b/internal/event/target/elasticsearch.go index 5e91b1bb9..282a81deb 100644 --- a/internal/event/target/elasticsearch.go +++ b/internal/event/target/elasticsearch.go @@ -36,6 +36,7 @@ import ( "github.com/minio/highwayhash" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" "github.com/pkg/errors" @@ -153,7 +154,7 @@ func (a ElasticsearchArgs) Validate() error { // ElasticsearchTarget - Elasticsearch target. type ElasticsearchTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args ElasticsearchArgs @@ -263,8 +264,8 @@ func (target *ElasticsearchTarget) send(eventData event.Event) error { return nil } -// Send - reads an event from store and sends it to Elasticsearch. -func (target *ElasticsearchTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to Elasticsearch. +func (target *ElasticsearchTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -344,7 +345,7 @@ func (target *ElasticsearchTarget) checkAndInitClient(ctx context.Context) error } func (target *ElasticsearchTarget) init() error { - return target.lazyInit.Do(target.initElasticsearch) + return target.initOnce.Do(target.initElasticsearch) } func (target *ElasticsearchTarget) initElasticsearch() error { diff --git a/internal/event/target/kafka.go b/internal/event/target/kafka.go index a8e128b1e..84842b216 100644 --- a/internal/event/target/kafka.go +++ b/internal/event/target/kafka.go @@ -32,6 +32,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" @@ -125,7 +126,7 @@ func (k KafkaArgs) Validate() error { // KafkaTarget - Kafka target. type KafkaTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args KafkaArgs @@ -208,8 +209,8 @@ func (target *KafkaTarget) send(eventData event.Event) error { return err } -// Send - reads an event from store and sends it to Kafka. -func (target *KafkaTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to Kafka. 
+func (target *KafkaTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -278,7 +279,7 @@ func (k KafkaArgs) pingBrokers() bool { } func (target *KafkaTarget) init() error { - return target.lazyInit.Do(target.initKafka) + return target.initOnce.Do(target.initKafka) } func (target *KafkaTarget) initKafka() error { diff --git a/internal/event/target/mqtt.go b/internal/event/target/mqtt.go index f20b10adf..171486e25 100644 --- a/internal/event/target/mqtt.go +++ b/internal/event/target/mqtt.go @@ -31,6 +31,7 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -107,7 +108,7 @@ func (m MQTTArgs) Validate() error { // MQTTTarget - MQTT target. type MQTTTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args MQTTArgs @@ -167,8 +168,8 @@ func (target *MQTTTarget) send(eventData event.Event) error { return token.Error() } -// Send - reads an event from store and sends it to MQTT. -func (target *MQTTTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to MQTT. +func (target *MQTTTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -224,7 +225,7 @@ func (target *MQTTTarget) Close() error { } func (target *MQTTTarget) init() error { - return target.lazyInit.Do(target.initMQTT) + return target.initOnce.Do(target.initMQTT) } func (target *MQTTTarget) initMQTT() error { diff --git a/internal/event/target/mysql.go b/internal/event/target/mysql.go index ffae34cb9..bc6c53c84 100644 --- a/internal/event/target/mysql.go +++ b/internal/event/target/mysql.go @@ -33,6 +33,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -146,7 +147,7 @@ func (m MySQLArgs) Validate() error { // MySQLTarget - MySQL target. type MySQLTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args MySQLArgs @@ -252,8 +253,8 @@ func (target *MySQLTarget) send(eventData event.Event) error { return nil } -// Send - reads an event from store and sends it to MySQL. -func (target *MySQLTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to MySQL. +func (target *MySQLTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -349,7 +350,7 @@ func (target *MySQLTarget) executeStmts() error { } func (target *MySQLTarget) init() error { - return target.lazyInit.Do(target.initMySQL) + return target.initOnce.Do(target.initMySQL) } func (target *MySQLTarget) initMySQL() error { diff --git a/internal/event/target/nats.go b/internal/event/target/nats.go index e38fb7396..c39774498 100644 --- a/internal/event/target/nats.go +++ b/internal/event/target/nats.go @@ -31,6 +31,7 @@ import ( "github.com/google/uuid" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" "github.com/nats-io/nats.go" @@ -216,7 +217,7 @@ func (n NATSArgs) connectStan() (stan.Conn, error) { // NATSTarget - NATS target. 
type NATSTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args NATSArgs @@ -333,8 +334,8 @@ func (target *NATSTarget) send(eventData event.Event) error { return err } -// Send - sends event to Nats. -func (target *NATSTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to Nats. +func (target *NATSTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -380,7 +381,7 @@ func (target *NATSTarget) Close() (err error) { } func (target *NATSTarget) init() error { - return target.lazyInit.Do(target.initNATS) + return target.initOnce.Do(target.initNATS) } func (target *NATSTarget) initNATS() error { diff --git a/internal/event/target/nsq.go b/internal/event/target/nsq.go index d77707bae..01540e9ac 100644 --- a/internal/event/target/nsq.go +++ b/internal/event/target/nsq.go @@ -31,6 +31,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -90,7 +91,7 @@ func (n NSQArgs) Validate() error { // NSQTarget - NSQ target. type NSQTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args NSQArgs @@ -176,8 +177,8 @@ func (target *NSQTarget) send(eventData event.Event) error { return target.producer.Publish(target.args.Topic, data) } -// Send - reads an event from store and sends it to NSQ. -func (target *NSQTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to NSQ. +func (target *NSQTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -216,7 +217,7 @@ func (target *NSQTarget) Close() (err error) { } func (target *NSQTarget) init() error { - return target.lazyInit.Do(target.initNSQ) + return target.initOnce.Do(target.initNSQ) } func (target *NSQTarget) initNSQ() error { diff --git a/internal/event/target/postgresql.go b/internal/event/target/postgresql.go index 240444bab..132b6d40c 100644 --- a/internal/event/target/postgresql.go +++ b/internal/event/target/postgresql.go @@ -34,6 +34,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -138,7 +139,7 @@ func (p PostgreSQLArgs) Validate() error { // PostgreSQLTarget - PostgreSQL target. type PostgreSQLTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args PostgreSQLArgs @@ -249,8 +250,8 @@ func (target *PostgreSQLTarget) send(eventData event.Event) error { return nil } -// Send - reads an event from store and sends it to PostgreSQL. -func (target *PostgreSQLTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to PostgreSQL. 
+func (target *PostgreSQLTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -345,7 +346,7 @@ func (target *PostgreSQLTarget) executeStmts() error { } func (target *PostgreSQLTarget) init() error { - return target.lazyInit.Do(target.initPostgreSQL) + return target.initOnce.Do(target.initPostgreSQL) } func (target *PostgreSQLTarget) initPostgreSQL() error { diff --git a/internal/event/target/redis.go b/internal/event/target/redis.go index b85b80222..ad8d2a917 100644 --- a/internal/event/target/redis.go +++ b/internal/event/target/redis.go @@ -31,6 +31,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) @@ -118,7 +119,7 @@ func (r RedisArgs) validateFormat(c redis.Conn) error { // RedisTarget - Redis target. type RedisTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args RedisArgs @@ -221,8 +222,8 @@ func (target *RedisTarget) send(eventData event.Event) error { return nil } -// Send - reads an event from store and sends it to redis. -func (target *RedisTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to redis. +func (target *RedisTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -276,7 +277,7 @@ func (target *RedisTarget) Close() error { } func (target *RedisTarget) init() error { - return target.lazyInit.Do(target.initRedis) + return target.initOnce.Do(target.initRedis) } func (target *RedisTarget) initRedis() error { diff --git a/internal/event/target/webhook.go b/internal/event/target/webhook.go index fbce36c6a..7383ddd29 100644 --- a/internal/event/target/webhook.go +++ b/internal/event/target/webhook.go @@ -35,6 +35,7 @@ import ( "github.com/minio/minio/internal/event" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/once" "github.com/minio/minio/internal/store" "github.com/minio/pkg/certs" xnet "github.com/minio/pkg/net" @@ -91,7 +92,7 @@ func (w WebhookArgs) Validate() error { // WebhookTarget - Webhook target. type WebhookTarget struct { - lazyInit lazyInit + initOnce once.Init id event.TargetID args WebhookArgs @@ -222,8 +223,8 @@ func (target *WebhookTarget) send(eventData event.Event) error { return nil } -// Send - reads an event from store and sends it to webhook. -func (target *WebhookTarget) Send(eventKey string) error { +// SendFromStore - reads an event from store and sends it to webhook. 
+func (target *WebhookTarget) SendFromStore(eventKey string) error { if err := target.init(); err != nil { return err } @@ -256,7 +257,7 @@ func (target *WebhookTarget) Close() error { } func (target *WebhookTarget) init() error { - return target.lazyInit.Do(target.initWebhook) + return target.initOnce.Do(target.initWebhook) } // Only called from init() diff --git a/internal/event/targetlist.go b/internal/event/targetlist.go index 596f9e920..9da477be1 100644 --- a/internal/event/targetlist.go +++ b/internal/event/targetlist.go @@ -34,7 +34,7 @@ type Target interface { ID() TargetID IsActive() (bool, error) Save(Event) error - Send(string) error + SendFromStore(string) error Close() error Store() TargetStore } diff --git a/internal/event/targetlist_test.go b/internal/event/targetlist_test.go index 07e85594b..9a3bbda8b 100644 --- a/internal/event/targetlist_test.go +++ b/internal/event/targetlist_test.go @@ -60,8 +60,8 @@ func (target ExampleTarget) send(eventData Event) error { return nil } -// Send - interface compatible method does no-op. -func (target ExampleTarget) Send(eventKey string) error { +// SendFromStore - interface compatible method does no-op. +func (target *ExampleTarget) SendFromStore(eventKey string) error { return nil } diff --git a/internal/logger/audit.go b/internal/logger/audit.go index 5b80024d2..2b00133a3 100644 --- a/internal/logger/audit.go +++ b/internal/logger/audit.go @@ -139,7 +139,7 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl // Send audit logs only to http targets. for _, t := range auditTgts { - if err := t.Send(entry); err != nil { + if err := t.Send(ctx, entry); err != nil { LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), madmin.LogKindAll) } } diff --git a/internal/logger/config.go b/internal/logger/config.go index df2c1b402..ec058cd05 100644 --- a/internal/logger/config.go +++ b/internal/logger/config.go @@ -43,6 +43,7 @@ const ( ClientCert = "client_cert" ClientKey = "client_key" QueueSize = "queue_size" + QueueDir = "queue_dir" Proxy = "proxy" KafkaBrokers = "brokers" @@ -57,6 +58,8 @@ const ( KafkaClientTLSCert = "client_tls_cert" KafkaClientTLSKey = "client_tls_key" KafkaVersion = "version" + KafkaQueueDir = "queue_dir" + KafkaQueueSize = "queue_size" EnvLoggerWebhookEnable = "MINIO_LOGGER_WEBHOOK_ENABLE" EnvLoggerWebhookEndpoint = "MINIO_LOGGER_WEBHOOK_ENDPOINT" @@ -65,6 +68,7 @@ const ( EnvLoggerWebhookClientKey = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY" EnvLoggerWebhookProxy = "MINIO_LOGGER_WEBHOOK_PROXY" EnvLoggerWebhookQueueSize = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE" + EnvLoggerWebhookQueueDir = "MINIO_LOGGER_WEBHOOK_QUEUE_DIR" EnvAuditWebhookEnable = "MINIO_AUDIT_WEBHOOK_ENABLE" EnvAuditWebhookEndpoint = "MINIO_AUDIT_WEBHOOK_ENDPOINT" @@ -72,6 +76,7 @@ const ( EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT" EnvAuditWebhookClientKey = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY" EnvAuditWebhookQueueSize = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE" + EnvAuditWebhookQueueDir = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR" EnvKafkaEnable = "MINIO_AUDIT_KAFKA_ENABLE" EnvKafkaBrokers = "MINIO_AUDIT_KAFKA_BROKERS" @@ -86,6 +91,11 @@ const ( EnvKafkaClientTLSCert = "MINIO_AUDIT_KAFKA_CLIENT_TLS_CERT" EnvKafkaClientTLSKey = "MINIO_AUDIT_KAFKA_CLIENT_TLS_KEY" EnvKafkaVersion = "MINIO_AUDIT_KAFKA_VERSION" + EnvKafkaQueueDir = "MINIO_AUDIT_KAFKA_QUEUE_DIR" + EnvKafkaQueueSize = "MINIO_AUDIT_KAFKA_QUEUE_SIZE" + + loggerTargetNamePrefix = "logger-" + auditTargetNamePrefix = "audit-" ) // 
Default KVS for loggerHTTP and loggerAuditHTTP @@ -119,6 +129,10 @@ var ( Key: QueueSize, Value: "100000", }, + config.KV{ + Key: QueueDir, + Value: "", + }, } DefaultAuditWebhookKVS = config.KVS{ @@ -146,6 +160,10 @@ var ( Key: QueueSize, Value: "100000", }, + config.KV{ + Key: QueueDir, + Value: "", + }, } DefaultAuditKafkaKVS = config.KVS{ @@ -201,6 +219,14 @@ var ( Key: KafkaVersion, Value: "", }, + config.KV{ + Key: QueueSize, + Value: "100000", + }, + config.KV{ + Key: QueueDir, + Value: "", + }, } ) @@ -395,6 +421,25 @@ func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) { kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(KafkaSASLPassword)) kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(KafkaSASLMechanism)) + queueDirEnv := EnvKafkaQueueDir + if k != config.Default { + queueDirEnv = queueDirEnv + config.Default + k + } + kafkaArgs.QueueDir = env.Get(queueDirEnv, kv.Get(KafkaQueueDir)) + + queueSizeEnv := EnvKafkaQueueSize + if k != config.Default { + queueSizeEnv = queueSizeEnv + config.Default + k + } + queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, kv.Get(KafkaQueueSize))) + if err != nil { + return cfg, err + } + if queueSize <= 0 { + return cfg, errors.New("invalid queue_size value") + } + kafkaArgs.QueueSize = queueSize + cfg.AuditKafka[k] = kafkaArgs } @@ -448,9 +493,9 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { return cfg, err } proxyEnv := EnvLoggerWebhookProxy - queueSizeEnv := EnvAuditWebhookQueueSize + queueSizeEnv := EnvLoggerWebhookQueueSize if target != config.Default { - queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target + queueSizeEnv = EnvLoggerWebhookQueueSize + config.Default + target } queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) if err != nil { @@ -459,6 +504,10 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } + queueDirEnv := EnvLoggerWebhookQueueDir + if target != config.Default { + queueDirEnv = EnvLoggerWebhookQueueDir + config.Default + target + } cfg.HTTP[target] = http.Config{ Enabled: true, Endpoint: env.Get(endpointEnv, ""), @@ -467,7 +516,8 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { ClientKey: env.Get(clientKeyEnv, ""), Proxy: env.Get(proxyEnv, ""), QueueSize: queueSize, - Name: target, + QueueDir: env.Get(queueDirEnv, ""), + Name: loggerTargetNamePrefix + target, } } @@ -511,7 +561,8 @@ func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { ClientKey: kv.Get(ClientKey), Proxy: kv.Get(Proxy), QueueSize: queueSize, - Name: starget, + QueueDir: kv.Get(QueueDir), + Name: loggerTargetNamePrefix + starget, } } @@ -574,6 +625,10 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } + queueDirEnv := EnvAuditWebhookQueueDir + if target != config.Default { + queueDirEnv = EnvAuditWebhookQueueDir + config.Default + target + } cfg.AuditWebhook[target] = http.Config{ Enabled: true, Endpoint: env.Get(endpointEnv, ""), @@ -581,7 +636,8 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { ClientCert: env.Get(clientCertEnv, ""), ClientKey: env.Get(clientKeyEnv, ""), QueueSize: queueSize, - Name: target, + QueueDir: env.Get(queueDirEnv, ""), + Name: auditTargetNamePrefix + target, } } @@ -617,7 +673,6 @@ func lookupAuditWebhookConfig(scfg 
config.Config, cfg Config) (Config, error) { if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } - cfg.AuditWebhook[starget] = http.Config{ Enabled: true, Endpoint: kv.Get(Endpoint), @@ -625,7 +680,8 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { ClientCert: kv.Get(ClientCert), ClientKey: kv.Get(ClientKey), QueueSize: queueSize, - Name: starget, + QueueDir: kv.Get(QueueDir), + Name: auditTargetNamePrefix + starget, } } diff --git a/internal/logger/logger.go b/internal/logger/logger.go index c0f63b218..c0f258317 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -347,7 +347,7 @@ func consoleLogIf(ctx context.Context, err error, errKind ...interface{}) { if consoleTgt != nil { entry := errToEntry(ctx, err, errKind...) - consoleTgt.Send(entry) + consoleTgt.Send(ctx, entry) } } @@ -366,10 +366,10 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) { entry := errToEntry(ctx, err, errKind...) // Iterate over all logger targets to send the log entry for _, t := range systemTgts { - if err := t.Send(entry); err != nil { + if err := t.Send(ctx, entry); err != nil { if consoleTgt != nil { entry.Trace.Message = fmt.Sprintf("event(%#v) was not sent to Logger target (%#v): %#v", entry, t, err) - consoleTgt.Send(entry) + consoleTgt.Send(ctx, entry) } } } diff --git a/internal/logger/target/http/http.go b/internal/logger/target/http/http.go index b0fee62ea..ebc195e70 100644 --- a/internal/logger/target/http/http.go +++ b/internal/logger/target/http/http.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -26,13 +26,17 @@ import ( "math" "net/http" "net/url" - "strings" + "os" + "path/filepath" "sync" "sync/atomic" "time" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger/target/types" + "github.com/minio/minio/internal/once" + "github.com/minio/minio/internal/store" + xnet "github.com/minio/pkg/net" ) const ( @@ -41,6 +45,9 @@ const ( // maxWorkers is the maximum number of concurrent operations. maxWorkers = 16 + + // the suffix for the configured queue dir where the logs will be persisted. + httpLoggerExtension = ".http.log" ) const ( @@ -59,6 +66,7 @@ type Config struct { ClientCert string `json:"clientCert"` ClientKey string `json:"clientKey"` QueueSize int `json:"queueSize"` + QueueDir string `json:"queueDir"` Proxy string `json:"string"` Transport http.RoundTripper `json:"-"` @@ -93,10 +101,21 @@ type Target struct { // will attempt to establish the connection. revive sync.Once + // store to persist and replay the logs to the target + // to avoid missing events when the target is down. + store store.Store[interface{}] + storeCtxCancel context.CancelFunc + initQueueStoreOnce once.Init + config Config client *http.Client } +// Name returns the name of the target +func (h *Target) Name() string { + return "minio-http-" + h.config.Name +} + // Endpoint returns the backend endpoint func (h *Target) Endpoint() string { return h.config.Endpoint @@ -106,9 +125,9 @@ func (h *Target) String() string { return h.config.Name } -// IsOnline returns true if the initialization was successful -func (h *Target) IsOnline() bool { - return atomic.LoadInt32(&h.status) == statusOnline +// IsOnline returns true if the target is reachable. +func (h *Target) IsOnline(ctx context.Context) bool { + return h.isAlive(ctx) == nil } // Stats returns the target statistics. 
@@ -125,8 +144,34 @@ func (h *Target) Stats() types.TargetStats { return stats } +// This will check if we can reach the remote. +func (h *Target) isAlive(ctx context.Context) (err error) { + return h.send(ctx, []byte(`{}`), 2*webhookCallTimeout) +} + // Init validate and initialize the http target -func (h *Target) Init() (err error) { +func (h *Target) Init(ctx context.Context) (err error) { + if h.config.QueueDir != "" { + return h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore) + } + return h.initLogChannel(ctx) +} + +func (h *Target) initQueueStore(ctx context.Context) (err error) { + var queueStore store.Store[interface{}] + queueDir := filepath.Join(h.config.QueueDir, h.Name()) + queueStore = store.NewQueueStore[interface{}](queueDir, uint64(h.config.QueueSize), httpLoggerExtension) + if err = queueStore.Open(); err != nil { + return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err) + } + ctx, cancel := context.WithCancel(ctx) + h.store = queueStore + h.storeCtxCancel = cancel + store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnce) + return +} + +func (h *Target) initLogChannel(ctx context.Context) (err error) { switch atomic.LoadInt32(&h.status) { case statusOnline: return nil @@ -134,45 +179,7 @@ func (h *Target) Init() (err error) { return errors.New("target is closed") } - // This will check if we can reach the remote. - checkAlive := func() error { - ctx, cancel := context.WithTimeout(context.Background(), 2*webhookCallTimeout) - defer cancel() - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.config.Endpoint, strings.NewReader(`{}`)) - if err != nil { - return err - } - - req.Header.Set(xhttp.ContentType, "application/json") - - // Set user-agent to indicate MinIO release - // version to the configured log endpoint - req.Header.Set("User-Agent", h.config.UserAgent) - - if h.config.AuthToken != "" { - req.Header.Set("Authorization", h.config.AuthToken) - } - - resp, err := h.client.Do(req) - if err != nil { - return err - } - // Drain any response. - xhttp.DrainBody(resp.Body) - - if !acceptedResponseStatusCode(resp.StatusCode) { - if resp.StatusCode == http.StatusForbidden { - return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", - h.config.Endpoint, resp.Status) - } - return fmt.Errorf("%s returned '%s', please check your endpoint configuration", - h.config.Endpoint, resp.Status) - } - return nil - } - - err = checkAlive() + err = h.isAlive(ctx) if err != nil { // Start a goroutine that will continue to check if we can reach h.revive.Do(func() { @@ -183,14 +190,14 @@ func (h *Target) Init() (err error) { if atomic.LoadInt32(&h.status) != statusOffline { return } - if err := checkAlive(); err == nil { + if err := h.isAlive(ctx); err == nil { // We are online. 
if atomic.CompareAndSwapInt32(&h.status, statusOffline, statusOnline) { h.workerStartMu.Lock() h.lastStarted = time.Now() h.workerStartMu.Unlock() atomic.AddInt64(&h.workers, 1) - go h.startHTTPLogger() + go h.startHTTPLogger(ctx) } return } @@ -205,19 +212,51 @@ func (h *Target) Init() (err error) { h.lastStarted = time.Now() h.workerStartMu.Unlock() atomic.AddInt64(&h.workers, 1) - go h.startHTTPLogger() + go h.startHTTPLogger(ctx) } return nil } -// Accepted HTTP Status Codes -var acceptedStatusCodeMap = map[int]bool{http.StatusOK: true, http.StatusCreated: true, http.StatusAccepted: true, http.StatusNoContent: true} +func (h *Target) send(ctx context.Context, payload []byte, timeout time.Duration) (err error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + h.config.Endpoint, bytes.NewReader(payload)) + if err != nil { + return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err) + } + req.Header.Set(xhttp.ContentType, "application/json") + req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) + req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID) -func acceptedResponseStatusCode(code int) bool { - return acceptedStatusCodeMap[code] + // Set user-agent to indicate MinIO release + // version to the configured log endpoint + req.Header.Set("User-Agent", h.config.UserAgent) + + if h.config.AuthToken != "" { + req.Header.Set("Authorization", h.config.AuthToken) + } + + resp, err := h.client.Do(req) + if err != nil { + return fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err) + } + + // Drain any response. + xhttp.DrainBody(resp.Body) + + switch resp.StatusCode { + case http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent: + // accepted HTTP status codes. 
+ return nil + case http.StatusForbidden: + return fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status) + default: + return fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status) + } } -func (h *Target) logEntry(entry interface{}) { +func (h *Target) logEntry(ctx context.Context, entry interface{}) { logJSON, err := json.Marshal(&entry) if err != nil { atomic.AddInt64(&h.failedMessages, 1) @@ -239,52 +278,14 @@ func (h *Target) logEntry(entry interface{}) { time.Sleep(sleep) } tries++ - ctx, cancel := context.WithTimeout(context.Background(), webhookCallTimeout) - defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodPost, - h.config.Endpoint, bytes.NewReader(logJSON)) - if err != nil { - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) + if err := h.send(ctx, logJSON, webhookCallTimeout); err != nil { + h.config.LogOnce(ctx, err, h.config.Endpoint) atomic.AddInt64(&h.failedMessages, 1) - continue - } - req.Header.Set(xhttp.ContentType, "application/json") - req.Header.Set(xhttp.MinIOVersion, xhttp.GlobalMinIOVersion) - req.Header.Set(xhttp.MinioDeploymentID, xhttp.GlobalDeploymentID) - - // Set user-agent to indicate MinIO release - // version to the configured log endpoint - req.Header.Set("User-Agent", h.config.UserAgent) - - if h.config.AuthToken != "" { - req.Header.Set("Authorization", h.config.AuthToken) - } - - resp, err := h.client.Do(req) - if err != nil { - atomic.AddInt64(&h.failedMessages, 1) - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%w', please check your endpoint configuration", h.config.Endpoint, err), h.config.Endpoint) - continue - } - - // Drain any response. - xhttp.DrainBody(resp.Body) - - if acceptedResponseStatusCode(resp.StatusCode) { - return - } - // Log failure, retry - atomic.AddInt64(&h.failedMessages, 1) - switch resp.StatusCode { - case http.StatusForbidden: - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check if your auth token is correctly set", h.config.Endpoint, resp.Status), h.config.Endpoint) - default: - h.config.LogOnce(ctx, fmt.Errorf("%s returned '%s', please check your endpoint configuration", h.config.Endpoint, resp.Status), h.config.Endpoint) } } } -func (h *Target) startHTTPLogger() { +func (h *Target) startHTTPLogger(ctx context.Context) { h.logChMu.RLock() logCh := h.logCh if logCh != nil { @@ -302,7 +303,7 @@ func (h *Target) startHTTPLogger() { // Send messages until channel is closed. for entry := range logCh { atomic.AddInt64(&h.totalMessages, 1) - h.logEntry(entry) + h.logEntry(ctx, entry) } } @@ -328,10 +329,41 @@ func New(config Config) *Target { return h } +// SendFromStore - reads the log from store and sends it to webhook. +func (h *Target) SendFromStore(key string) (err error) { + var eventData interface{} + eventData, err = h.store.Get(key) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + atomic.AddInt64(&h.totalMessages, 1) + logJSON, err := json.Marshal(&eventData) + if err != nil { + atomic.AddInt64(&h.failedMessages, 1) + return + } + if err := h.send(context.Background(), logJSON, webhookCallTimeout); err != nil { + atomic.AddInt64(&h.failedMessages, 1) + if xnet.IsNetworkOrHostDown(err, true) { + return store.ErrNotConnected + } + return err + } + // Delete the event from store. + return h.store.Del(key) +} + // Send log message 'e' to http target. 
// If servers are offline messages are queued until queue is full. // If Cancel has been called the message is ignored. -func (h *Target) Send(entry interface{}) error { +func (h *Target) Send(ctx context.Context, entry interface{}) error { + if h.store != nil { + // save the entry to the queue store which will be replayed to the target. + return h.store.Put(entry) + } if atomic.LoadInt32(&h.status) == statusClosed { return nil } @@ -345,7 +377,7 @@ func (h *Target) Send(entry interface{}) error { case h.logCh <- entry: default: // Drop messages until we are online. - if !h.IsOnline() { + if !h.IsOnline(ctx) { return errors.New("log buffer full and remote offline") } nWorkers := atomic.LoadInt64(&h.workers) @@ -358,7 +390,7 @@ func (h *Target) Send(entry interface{}) error { if atomic.CompareAndSwapInt64(&h.workers, nWorkers, nWorkers+1) { // Start another logger. h.lastStarted = time.Now() - go h.startHTTPLogger() + go h.startHTTPLogger(ctx) } } h.logCh <- entry @@ -380,6 +412,12 @@ func (h *Target) Send(entry interface{}) error { func (h *Target) Cancel() { atomic.StoreInt32(&h.status, statusClosed) + // If the queue store is configured, cancel its context to + // stop the replay goroutine. + if h.store != nil { + h.storeCtxCancel() + } + // Set logch to nil and close it. // This will block all Send operations, // and finish the existing ones. diff --git a/internal/logger/target/kafka/kafka.go b/internal/logger/target/kafka/kafka.go index c80a72bb8..4edb12de3 100644 --- a/internal/logger/target/kafka/kafka.go +++ b/internal/logger/target/kafka/kafka.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2022 MinIO, Inc. +// Copyright (c) 2015-2023 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -23,7 +23,10 @@ import ( "crypto/x509" "encoding/json" "errors" + "fmt" "net" + "os" + "path/filepath" "sync" "sync/atomic" "time" @@ -34,94 +37,13 @@ import ( saramatls "github.com/Shopify/sarama/tools/tls" "github.com/minio/minio/internal/logger/target/types" + "github.com/minio/minio/internal/once" + "github.com/minio/minio/internal/store" xnet "github.com/minio/pkg/net" ) -// Target - Kafka target. -type Target struct { - totalMessages int64 - failedMessages int64 - - wg sync.WaitGroup - doneCh chan struct{} - - // Channel of log entries - logCh chan audit.Entry - - // is the target online? - online bool - - producer sarama.SyncProducer - kconfig Config - config *sarama.Config -} - -// Send log message 'e' to kafka target.
-func (h *Target) Send(entry interface{}) error { - if !h.online { - return nil - } - - select { - case <-h.doneCh: - return nil - default: - } - - if e, ok := entry.(audit.Entry); ok { - select { - case <-h.doneCh: - case h.logCh <- e: - default: - // log channel is full, do not wait and return - // an error immediately to the caller - atomic.AddInt64(&h.totalMessages, 1) - atomic.AddInt64(&h.failedMessages, 1) - return errors.New("log buffer full") - } - } - - return nil -} - -func (h *Target) logEntry(entry audit.Entry) { - atomic.AddInt64(&h.totalMessages, 1) - logJSON, err := json.Marshal(&entry) - if err != nil { - atomic.AddInt64(&h.failedMessages, 1) - return - } - msg := sarama.ProducerMessage{ - Topic: h.kconfig.Topic, - Key: sarama.StringEncoder(entry.RequestID), - Value: sarama.ByteEncoder(logJSON), - } - - _, _, err = h.producer.SendMessage(&msg) - if err != nil { - atomic.AddInt64(&h.failedMessages, 1) - h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic) - return - } -} - -func (h *Target) startKakfaLogger() { - // Create a routine which sends json logs received - // from an internal channel. - h.wg.Add(1) - go func() { - defer h.wg.Done() - - for { - select { - case entry := <-h.logCh: - h.logEntry(entry) - case <-h.doneCh: - return - } - } - }() -} +// the suffix for the configured queue dir where the logs will be persisted. +const kafkaLoggerExtension = ".kafka.log" // Config - kafka target arguments. type Config struct { @@ -143,6 +65,9 @@ type Config struct { Password string `json:"password"` Mechanism string `json:"mechanism"` } `json:"sasl"` + // Queue store + QueueSize int `json:"queueSize"` + QueueDir string `json:"queueDir"` // Custom logger LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"` @@ -161,13 +86,44 @@ func (k Config) pingBrokers() (err error) { return nil } -// Stats returns the target statistics. -func (h *Target) Stats() types.TargetStats { - return types.TargetStats{ - TotalMessages: atomic.LoadInt64(&h.totalMessages), - FailedMessages: atomic.LoadInt64(&h.failedMessages), - QueueLength: len(h.logCh), +// Target - Kafka target. +type Target struct { + totalMessages int64 + failedMessages int64 + + wg sync.WaitGroup + doneCh chan struct{} + + // Channel of log entries + logCh chan audit.Entry + + // store to persist and replay the logs to the target + // to avoid missing events when the target is down. + store store.Store[audit.Entry] + storeCtxCancel context.CancelFunc + initKafkaOnce once.Init + initQueueStoreOnce once.Init + + producer sarama.SyncProducer + kconfig Config + config *sarama.Config +} + +func (h *Target) validate() error { + if len(h.kconfig.Brokers) == 0 { + return errors.New("no broker address found") } + for _, b := range h.kconfig.Brokers { + if _, err := xnet.ParseHost(b.String()); err != nil { + return err + } + } + return nil +} + +// Name returns the name of the target +func (h *Target) Name() string { + return "minio-kafka-audit" } // Endpoint - return kafka target @@ -180,24 +136,95 @@ func (h *Target) String() string { return "kafka" } -// IsOnline returns true if the initialization was successful -func (h *Target) IsOnline() bool { - return h.online +// Stats returns the target statistics. 
+func (h *Target) Stats() types.TargetStats { + return types.TargetStats{ + TotalMessages: atomic.LoadInt64(&h.totalMessages), + FailedMessages: atomic.LoadInt64(&h.failedMessages), + QueueLength: len(h.logCh), + } } // Init initialize kafka target -func (h *Target) Init() error { +func (h *Target) Init(ctx context.Context) error { if !h.kconfig.Enabled { return nil } - if len(h.kconfig.Brokers) == 0 { - return errors.New("no broker address found") + if err := h.validate(); err != nil { + return err } - for _, b := range h.kconfig.Brokers { - if _, err := xnet.ParseHost(b.String()); err != nil { + if h.kconfig.QueueDir != "" { + if err := h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore); err != nil { return err } + return h.initKafkaOnce.Do(h.init) } + if err := h.init(); err != nil { + return err + } + go h.startKafkaLogger() + return nil +} + +func (h *Target) initQueueStore(ctx context.Context) (err error) { + var queueStore store.Store[audit.Entry] + queueDir := filepath.Join(h.kconfig.QueueDir, h.Name()) + queueStore = store.NewQueueStore[audit.Entry](queueDir, uint64(h.kconfig.QueueSize), kafkaLoggerExtension) + if err = queueStore.Open(); err != nil { + return fmt.Errorf("unable to initialize the queue store of %s: %w", h.Name(), err) + } + ctx, cancel := context.WithCancel(ctx) + h.store = queueStore + h.storeCtxCancel = cancel + store.StreamItems(h.store, h, ctx.Done(), h.kconfig.LogOnce) + return +} + +func (h *Target) startKafkaLogger() { + // Create a routine which sends json logs received + // from an internal channel. + h.wg.Add(1) + go func() { + defer h.wg.Done() + + for { + select { + case entry := <-h.logCh: + h.logEntry(entry) + case <-h.doneCh: + return + } + } + }() +} + +func (h *Target) logEntry(entry audit.Entry) { + atomic.AddInt64(&h.totalMessages, 1) + if err := h.send(entry); err != nil { + atomic.AddInt64(&h.failedMessages, 1) + h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic) + } +} + +func (h *Target) send(entry audit.Entry) error { + if err := h.initKafkaOnce.Do(h.init); err != nil { + return err + } + logJSON, err := json.Marshal(&entry) + if err != nil { + return err + } + msg := sarama.ProducerMessage{ + Topic: h.kconfig.Topic, + Key: sarama.StringEncoder(entry.RequestID), + Value: sarama.ByteEncoder(logJSON), + } + _, _, err = h.producer.SendMessage(&msg) + return err +} + +// init initializes the kafka producer. +func (h *Target) init() error { if err := h.kconfig.pingBrokers(); err != nil { return err } @@ -244,15 +271,73 @@ } h.producer = producer - h.online = true - go h.startKakfaLogger() return nil } +// IsOnline returns true if the target is online. +func (h *Target) IsOnline(_ context.Context) bool { + if err := h.initKafkaOnce.Do(h.init); err != nil { + return false + } + return h.kconfig.pingBrokers() == nil +} + +// Send log message 'e' to kafka target. +func (h *Target) Send(ctx context.Context, entry interface{}) error { + if auditEntry, ok := entry.(audit.Entry); ok { + if h.store != nil { + // save the entry to the queue store which will be replayed to the target.
+ return h.store.Put(auditEntry) + } + if err := h.initKafkaOnce.Do(h.init); err != nil { + return err + } + select { + case <-h.doneCh: + case h.logCh <- auditEntry: + default: + // log channel is full, do not wait and return + // an error immediately to the caller + atomic.AddInt64(&h.totalMessages, 1) + atomic.AddInt64(&h.failedMessages, 1) + return errors.New("log buffer full") + } + } + return nil +} + +// SendFromStore - reads the log from store and sends it to kafka. +func (h *Target) SendFromStore(key string) (err error) { + var auditEntry audit.Entry + auditEntry, err = h.store.Get(key) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + atomic.AddInt64(&h.totalMessages, 1) + err = h.send(auditEntry) + if err != nil { + atomic.AddInt64(&h.failedMessages, 1) + return + } + // Delete the event from store. + return h.store.Del(key) +} + // Cancel - cancels the target func (h *Target) Cancel() { close(h.doneCh) close(h.logCh) + // If the queue store is configured, cancel its context to + // stop the replay goroutine. + if h.store != nil { + h.storeCtxCancel() + } + if h.producer != nil { + h.producer.Close() + } h.wg.Wait() } @@ -263,7 +348,6 @@ func New(config Config) *Target { logCh: make(chan audit.Entry, 10000), doneCh: make(chan struct{}), kconfig: config, - online: false, } return target } diff --git a/internal/logger/targets.go b/internal/logger/targets.go index 4abe913df..da39c6184 100644 --- a/internal/logger/targets.go +++ b/internal/logger/targets.go @@ -18,6 +18,7 @@ package logger import ( + "context" "fmt" "strings" "sync" @@ -36,10 +37,10 @@ type Target interface { String() string Endpoint() string Stats() types.TargetStats - Init() error - IsOnline() bool + Init(ctx context.Context) error + IsOnline(ctx context.Context) bool Cancel() - Send(entry interface{}) error + Send(ctx context.Context, entry interface{}) error Type() types.TargetType } @@ -57,12 +58,12 @@ var ( ) // TargetStatus returns status of the target (online|offline) -func TargetStatus(h Target) madmin.Status { - if h.IsOnline() { +func TargetStatus(ctx context.Context, h Target) madmin.Status { + if h.IsOnline(ctx) { return madmin.Status{Status: string(madmin.ItemOnline)} } // Previous initialization had failed. Try again.
- if e := h.Init(); e == nil { + if e := h.Init(ctx); e == nil { return madmin.Status{Status: string(madmin.ItemOnline)} } return madmin.Status{Status: string(madmin.ItemOffline)} @@ -124,8 +125,8 @@ var ( // AddSystemTarget adds a new logger target to the // list of enabled loggers -func AddSystemTarget(t Target) error { - if err := t.Init(); err != nil { +func AddSystemTarget(ctx context.Context, t Target) error { + if err := t.Init(ctx); err != nil { return err } @@ -144,7 +145,7 @@ func AddSystemTarget(t Target) error { return nil } -func initSystemTargets(cfgMap map[string]http.Config) ([]Target, []error) { +func initSystemTargets(ctx context.Context, cfgMap map[string]http.Config) ([]Target, []error) { tgts := []Target{} errs := []error{} for _, l := range cfgMap { @@ -152,7 +153,7 @@ func initSystemTargets(cfgMap map[string]http.Config) ([]Target, []error) { t := http.New(l) tgts = append(tgts, t) - e := t.Init() + e := t.Init(ctx) if e != nil { errs = append(errs, e) } @@ -161,7 +162,7 @@ func initSystemTargets(cfgMap map[string]http.Config) ([]Target, []error) { return tgts, errs } -func initKafkaTargets(cfgMap map[string]kafka.Config) ([]Target, []error) { +func initKafkaTargets(ctx context.Context, cfgMap map[string]kafka.Config) ([]Target, []error) { tgts := []Target{} errs := []error{} for _, l := range cfgMap { @@ -169,7 +170,7 @@ func initKafkaTargets(cfgMap map[string]kafka.Config) ([]Target, []error) { t := kafka.New(l) tgts = append(tgts, t) - e := t.Init() + e := t.Init(ctx) if e != nil { errs = append(errs, e) } @@ -200,8 +201,8 @@ func cancelTargets(targets []Target) { } // UpdateSystemTargets swaps targets with newly loaded ones from the cfg -func UpdateSystemTargets(cfg Config) []error { - newTgts, errs := initSystemTargets(cfg.HTTP) +func UpdateSystemTargets(ctx context.Context, cfg Config) []error { + newTgts, errs := initSystemTargets(ctx, cfg.HTTP) swapSystemMuRW.Lock() consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole) @@ -214,8 +215,8 @@ func UpdateSystemTargets(cfg Config) []error { } // UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg -func UpdateAuditWebhookTargets(cfg Config) []error { - newWebhookTgts, errs := initSystemTargets(cfg.AuditWebhook) +func UpdateAuditWebhookTargets(ctx context.Context, cfg Config) []error { + newWebhookTgts, errs := initSystemTargets(ctx, cfg.AuditWebhook) swapAuditMuRW.Lock() // Retain kafka targets @@ -229,8 +230,8 @@ func UpdateAuditWebhookTargets(cfg Config) []error { } // UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg -func UpdateAuditKafkaTargets(cfg Config) []error { - newKafkaTgts, errs := initKafkaTargets(cfg.AuditKafka) +func UpdateAuditKafkaTargets(ctx context.Context, cfg Config) []error { + newKafkaTgts, errs := initKafkaTargets(ctx, cfg.AuditKafka) swapAuditMuRW.Lock() // Retain webhook targets diff --git a/internal/event/target/lazyinit.go b/internal/once/init.go similarity index 57% rename from internal/event/target/lazyinit.go rename to internal/once/init.go index c7abf38d9..4bd02ac6b 100644 --- a/internal/event/target/lazyinit.go +++ b/internal/once/init.go @@ -15,9 +15,10 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package target +package once import ( + "context" "sync" "sync/atomic" ) @@ -25,19 +26,23 @@ import ( // Inspired from Golang sync.Once but it is only marked // initialized when the provided function returns nil. 
-type lazyInit struct { +// Init is a sync.Once-style guard that is marked done only when the provided function succeeds. +type Init struct { done uint32 m sync.Mutex } -func (l *lazyInit) Do(f func() error) error { +// Do is similar to sync.Once.Do, but it keeps invoking the +// function on every call until one invocation returns nil; +// after that it becomes a no-op. +func (l *Init) Do(f func() error) error { if atomic.LoadUint32(&l.done) == 0 { - return l.doSlow(f) + return l.do(f) } return nil } -func (l *lazyInit) doSlow(f func() error) error { +func (l *Init) do(f func() error) error { l.m.Lock() defer l.m.Unlock() if atomic.LoadUint32(&l.done) == 0 { @@ -49,3 +54,24 @@ } return nil } + +// DoWithContext is similar to Do except that it passes the provided context to the function. +func (l *Init) DoWithContext(ctx context.Context, f func(context.Context) error) error { + if atomic.LoadUint32(&l.done) == 0 { + return l.doWithContext(ctx, f) + } + return nil +} + +func (l *Init) doWithContext(ctx context.Context, f func(context.Context) error) error { + l.m.Lock() + defer l.m.Unlock() + if atomic.LoadUint32(&l.done) == 0 { + if err := f(ctx); err != nil { + return err + } + // Mark as done only when f() is successful + atomic.StoreUint32(&l.done, 1) + } + return nil +} diff --git a/internal/store/store.go b/internal/store/store.go index b8428a71f..c90c609bf 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -24,7 +24,6 @@ import ( "strings" "time" - "github.com/minio/minio/internal/logger" xnet "github.com/minio/pkg/net" ) @@ -32,13 +31,15 @@ const ( retryInterval = 3 * time.Second ) +type logger = func(ctx context.Context, err error, id string, errKind ...interface{}) + // ErrNotConnected - indicates that the target connection is not active. var ErrNotConnected = errors.New("not connected to target server/service") // Target - store target interface type Target interface { Name() string - Send(key string) error + SendFromStore(key string) error } // Store - Used to persist items. @@ -53,7 +54,7 @@ type Store[I any] interface { } // replayItems - Reads the items from the store and replays. -func replayItems[I any](store Store[I], doneCh <-chan struct{}, loggerOnce logger.LogOnce, id string) <-chan string { +func replayItems[I any](store Store[I], doneCh <-chan struct{}, logger logger, id string) <-chan string { itemKeyCh := make(chan string) go func() { @@ -65,7 +66,7 @@ for { names, err := store.List() if err != nil { - loggerOnce(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id) + logger(context.Background(), fmt.Errorf("store.List() failed with: %w", err), id) } else { for _, name := range names { select { @@ -89,20 +90,20 @@ } // sendItems - Reads items from the store and re-plays.
-func sendItems(target Target, itemKeyCh <-chan string, doneCh <-chan struct{}, loggerOnce logger.LogOnce) { +func sendItems(target Target, itemKeyCh <-chan string, doneCh <-chan struct{}, logger logger) { retryTicker := time.NewTicker(retryInterval) defer retryTicker.Stop() send := func(itemKey string) bool { for { - err := target.Send(itemKey) + err := target.SendFromStore(itemKey) if err == nil { break } if err != ErrNotConnected && !xnet.IsConnResetErr(err) { - loggerOnce(context.Background(), - fmt.Errorf("target.Send() failed with '%w'", err), + logger(context.Background(), + fmt.Errorf("target.SendFromStore() failed with '%w'", err), target.Name()) } @@ -135,11 +136,11 @@ func sendItems(target Target, itemKeyCh <-chan string, doneCh <-chan struct{}, l } // StreamItems reads the keys from the store and replays the corresponding item to the target. -func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, loggerOnce logger.LogOnce) { +func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, logger logger) { go func() { // Replays the items from the store. - itemKeyCh := replayItems(store, doneCh, loggerOnce, target.Name()) + itemKeyCh := replayItems(store, doneCh, logger, target.Name()) // Send items from the store. - sendItems(target, itemKeyCh, doneCh, loggerOnce) + sendItems(target, itemKeyCh, doneCh, logger) }() }
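For context, the once.Init guard introduced above is what lets the logger and audit targets retry their lazy initialization: the guard is only marked done after the wrapped function returns nil, unlike sync.Once. The following is a minimal, illustrative sketch of that behaviour, not part of the diff; it assumes it is built inside the minio module, since internal/once cannot be imported from outside it.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/minio/minio/internal/once"
)

func main() {
	var guard once.Init
	attempts := 0

	connect := func() error {
		attempts++
		if attempts == 1 {
			return errors.New("broker not reachable")
		}
		return nil
	}

	fmt.Println(guard.Do(connect)) // first call fails, guard stays "not done"
	fmt.Println(guard.Do(connect)) // second call succeeds, guard is marked done
	fmt.Println(guard.Do(connect)) // no-op, connect is not called again
	fmt.Println(attempts)          // 2

	// DoWithContext forwards a context to the function (as initQueueStore does
	// in the http and kafka targets); the guard is already done here, so this
	// call is also a no-op.
	_ = guard.DoWithContext(context.Background(), func(ctx context.Context) error {
		return nil
	})
}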