// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see http://www.gnu.org/licenses/.

package logger

import (
	"crypto/tls"
	"errors"
	"strconv"
	"strings"

	"github.com/minio/pkg/env"
	xnet "github.com/minio/pkg/net"

	"github.com/minio/minio/internal/config"
	"github.com/minio/minio/internal/logger/target/http"
	"github.com/minio/minio/internal/logger/target/kafka"
)

// Console holds the settings of the console logger target.
type Console struct {
	Enabled bool `json:"enabled"`
}

// Audit/Logger constants: configuration keys, the environment variables
// that override them, and the prefixes used to name targets.
const (
	// Keys shared by the logger webhook and audit webhook sub-systems.
	// Proxy is only part of the logger webhook defaults below.
	Endpoint   = "endpoint"
	AuthToken  = "auth_token"
	ClientCert = "client_cert"
	ClientKey  = "client_key"
	QueueSize  = "queue_size"
	QueueDir   = "queue_dir"
	Proxy      = "proxy"

	// Keys of the audit Kafka sub-system. KafkaQueueDir and KafkaQueueSize
	// carry the same string values as QueueDir and QueueSize.
	KafkaBrokers       = "brokers"
	KafkaTopic         = "topic"
	KafkaTLS           = "tls"
	KafkaTLSSkipVerify = "tls_skip_verify"
	KafkaTLSClientAuth = "tls_client_auth"
	KafkaSASL          = "sasl"
	KafkaSASLUsername  = "sasl_username"
	KafkaSASLPassword  = "sasl_password"
	KafkaSASLMechanism = "sasl_mechanism"
	KafkaClientTLSCert = "client_tls_cert"
	KafkaClientTLSKey  = "client_tls_key"
	KafkaVersion       = "version"
	KafkaQueueDir      = "queue_dir"
	KafkaQueueSize     = "queue_size"

	// Environment variables of the logger webhook targets. The lookup
	// functions in this file append config.Default and the target name
	// to address a non-default target.
	EnvLoggerWebhookEnable     = "MINIO_LOGGER_WEBHOOK_ENABLE"
	EnvLoggerWebhookEndpoint   = "MINIO_LOGGER_WEBHOOK_ENDPOINT"
	EnvLoggerWebhookAuthToken  = "MINIO_LOGGER_WEBHOOK_AUTH_TOKEN"
	EnvLoggerWebhookClientCert = "MINIO_LOGGER_WEBHOOK_CLIENT_CERT"
	EnvLoggerWebhookClientKey  = "MINIO_LOGGER_WEBHOOK_CLIENT_KEY"
	EnvLoggerWebhookProxy      = "MINIO_LOGGER_WEBHOOK_PROXY"
	EnvLoggerWebhookQueueSize  = "MINIO_LOGGER_WEBHOOK_QUEUE_SIZE"
	EnvLoggerWebhookQueueDir   = "MINIO_LOGGER_WEBHOOK_QUEUE_DIR"

	// Environment variables of the audit webhook targets.
	EnvAuditWebhookEnable     = "MINIO_AUDIT_WEBHOOK_ENABLE"
	EnvAuditWebhookEndpoint   = "MINIO_AUDIT_WEBHOOK_ENDPOINT"
	EnvAuditWebhookAuthToken  = "MINIO_AUDIT_WEBHOOK_AUTH_TOKEN"
	EnvAuditWebhookClientCert = "MINIO_AUDIT_WEBHOOK_CLIENT_CERT"
	EnvAuditWebhookClientKey  = "MINIO_AUDIT_WEBHOOK_CLIENT_KEY"
	EnvAuditWebhookQueueSize  = "MINIO_AUDIT_WEBHOOK_QUEUE_SIZE"
	EnvAuditWebhookQueueDir   = "MINIO_AUDIT_WEBHOOK_QUEUE_DIR"

	// Environment variables of the audit Kafka targets.
	EnvKafkaEnable        = "MINIO_AUDIT_KAFKA_ENABLE"
	EnvKafkaBrokers       = "MINIO_AUDIT_KAFKA_BROKERS"
	EnvKafkaTopic         = "MINIO_AUDIT_KAFKA_TOPIC"
	EnvKafkaTLS           = "MINIO_AUDIT_KAFKA_TLS"
	EnvKafkaTLSSkipVerify = "MINIO_AUDIT_KAFKA_TLS_SKIP_VERIFY"
	EnvKafkaTLSClientAuth = "MINIO_AUDIT_KAFKA_TLS_CLIENT_AUTH"
	EnvKafkaSASLEnable    = "MINIO_AUDIT_KAFKA_SASL"
	EnvKafkaSASLUsername  = "MINIO_AUDIT_KAFKA_SASL_USERNAME"
	EnvKafkaSASLPassword  = "MINIO_AUDIT_KAFKA_SASL_PASSWORD"
	EnvKafkaSASLMechanism = "MINIO_AUDIT_KAFKA_SASL_MECHANISM"
	EnvKafkaClientTLSCert = "MINIO_AUDIT_KAFKA_CLIENT_TLS_CERT"
	EnvKafkaClientTLSKey  = "MINIO_AUDIT_KAFKA_CLIENT_TLS_KEY"
	EnvKafkaVersion       = "MINIO_AUDIT_KAFKA_VERSION"
	EnvKafkaQueueDir      = "MINIO_AUDIT_KAFKA_QUEUE_DIR"
	EnvKafkaQueueSize     = "MINIO_AUDIT_KAFKA_QUEUE_SIZE"

	// Prefixes prepended to the target name to build the Name field of
	// the webhook target configs.
	loggerTargetNamePrefix = "logger-"
	auditTargetNamePrefix  = "audit-"
)

// Default KVS for the logger webhook, audit webhook and audit Kafka
// sub-systems. Every target is disabled by default and queue_size
// defaults to "100000".
var (
	// DefaultLoggerWebhookKVS is also the key set that stored logger
	// webhook configs are validated against.
	DefaultLoggerWebhookKVS = config.KVS{
		config.KV{
			Key:   config.Enable,
			Value: config.EnableOff,
		},
		config.KV{
			Key:   Endpoint,
			Value: "",
		},
		config.KV{
			Key:   AuthToken,
			Value: "",
		},
		config.KV{
			Key:   ClientCert,
			Value: "",
		},
		config.KV{
			Key:   ClientKey,
			Value: "",
		},
		config.KV{
			Key:   Proxy,
			Value: "",
		},
		config.KV{
			Key:   QueueSize,
			Value: "100000",
		},
		config.KV{
			Key:   QueueDir,
			Value: "",
		},
	}

	// DefaultAuditWebhookKVS mirrors the logger webhook defaults except
	// that it has no proxy key.
	DefaultAuditWebhookKVS = config.KVS{
		config.KV{
			Key:   config.Enable,
			Value: config.EnableOff,
		},
		config.KV{
			Key:   Endpoint,
			Value: "",
		},
		config.KV{
			Key:   AuthToken,
			Value: "",
		},
		config.KV{
			Key:   ClientCert,
			Value: "",
		},
		config.KV{
			Key:   ClientKey,
			Value: "",
		},
		config.KV{
			Key:   QueueSize,
			Value: "100000",
		},
		config.KV{
			Key:   QueueDir,
			Value: "",
		},
	}

	// DefaultAuditKafkaKVS is merged with the stored audit Kafka config
	// when the Kafka targets are looked up.
	DefaultAuditKafkaKVS = config.KVS{
		config.KV{
			Key:   config.Enable,
			Value: config.EnableOff,
		},
		config.KV{
			Key:   KafkaTopic,
			Value: "",
		},
		config.KV{
			Key:   KafkaBrokers,
			Value: "",
		},
		config.KV{
			Key:   KafkaSASLUsername,
			Value: "",
		},
		config.KV{
			Key:   KafkaSASLPassword,
			Value: "",
		},
		config.KV{
			Key:   KafkaSASLMechanism,
			Value: "plain",
		},
		config.KV{
			Key:   KafkaClientTLSCert,
			Value: "",
		},
		config.KV{
			Key:   KafkaClientTLSKey,
			Value: "",
		},
		// Parsed as an integer and converted to tls.ClientAuthType.
		config.KV{
			Key:   KafkaTLSClientAuth,
			Value: "0",
		},
		config.KV{
			Key:   KafkaSASL,
			Value: config.EnableOff,
		},
		config.KV{
			Key:   KafkaTLS,
			Value: config.EnableOff,
		},
		config.KV{
			Key:   KafkaTLSSkipVerify,
			Value: config.EnableOff,
		},
		config.KV{
			Key:   KafkaVersion,
			Value: "",
		},
		config.KV{
			Key:   QueueSize,
			Value: "100000",
		},
		config.KV{
			Key:   QueueDir,
			Value: "",
		},
	}
)

// Config holds the console target and the HTTP logger, audit webhook and
// audit Kafka targets, each map keyed by target name.
type Config struct {
	Console      Console                 `json:"console"`
	HTTP         map[string]http.Config  `json:"http"`
	AuditWebhook map[string]http.Config  `json:"audit"`
	AuditKafka   map[string]kafka.Config `json:"audit_kafka"`
}

// NewConfig - initialize new logger config.
func NewConfig() Config { cfg := Config{ // Console logging is on by default Console: Console{ Enabled: true, }, HTTP: make(map[string]http.Config), AuditWebhook: make(map[string]http.Config), AuditKafka: make(map[string]kafka.Config), } return cfg } func lookupLegacyConfigForSubSys(subSys string) Config { cfg := NewConfig() switch subSys { case config.LoggerWebhookSubSys: var loggerTargets []string envs := env.List(legacyEnvLoggerHTTPEndpoint) for _, k := range envs { target := strings.TrimPrefix(k, legacyEnvLoggerHTTPEndpoint+config.Default) if target == legacyEnvLoggerHTTPEndpoint { target = config.Default } loggerTargets = append(loggerTargets, target) } // Load HTTP logger from the environment if found for _, target := range loggerTargets { endpointEnv := legacyEnvLoggerHTTPEndpoint if target != config.Default { endpointEnv = legacyEnvLoggerHTTPEndpoint + config.Default + target } endpoint := env.Get(endpointEnv, "") if endpoint == "" { continue } cfg.HTTP[target] = http.Config{ Enabled: true, Endpoint: endpoint, } } case config.AuditWebhookSubSys: // List legacy audit ENVs if any. 
var loggerAuditTargets []string envs := env.List(legacyEnvAuditLoggerHTTPEndpoint) for _, k := range envs { target := strings.TrimPrefix(k, legacyEnvAuditLoggerHTTPEndpoint+config.Default) if target == legacyEnvAuditLoggerHTTPEndpoint { target = config.Default } loggerAuditTargets = append(loggerAuditTargets, target) } for _, target := range loggerAuditTargets { endpointEnv := legacyEnvAuditLoggerHTTPEndpoint if target != config.Default { endpointEnv = legacyEnvAuditLoggerHTTPEndpoint + config.Default + target } endpoint := env.Get(endpointEnv, "") if endpoint == "" { continue } cfg.AuditWebhook[target] = http.Config{ Enabled: true, Endpoint: endpoint, } } } return cfg } func lookupAuditKafkaConfig(scfg config.Config, cfg Config) (Config, error) { for k, kv := range config.Merge(scfg[config.AuditKafkaSubSys], EnvKafkaEnable, DefaultAuditKafkaKVS) { enableEnv := EnvKafkaEnable if k != config.Default { enableEnv = enableEnv + config.Default + k } enabled, err := config.ParseBool(env.Get(enableEnv, kv.Get(config.Enable))) if err != nil { return cfg, err } if !enabled { continue } var brokers []xnet.Host brokersEnv := EnvKafkaBrokers if k != config.Default { brokersEnv = brokersEnv + config.Default + k } kafkaBrokers := env.Get(brokersEnv, kv.Get(KafkaBrokers)) if len(kafkaBrokers) == 0 { return cfg, config.Errorf("kafka 'brokers' cannot be empty") } for _, s := range strings.Split(kafkaBrokers, config.ValueSeparator) { var host *xnet.Host host, err = xnet.ParseHost(s) if err != nil { break } brokers = append(brokers, *host) } if err != nil { return cfg, err } clientAuthEnv := EnvKafkaTLSClientAuth if k != config.Default { clientAuthEnv = clientAuthEnv + config.Default + k } clientAuth, err := strconv.Atoi(env.Get(clientAuthEnv, kv.Get(KafkaTLSClientAuth))) if err != nil { return cfg, err } topicEnv := EnvKafkaTopic if k != config.Default { topicEnv = topicEnv + config.Default + k } versionEnv := EnvKafkaVersion if k != config.Default { versionEnv = versionEnv + 
config.Default + k } kafkaArgs := kafka.Config{ Enabled: enabled, Brokers: brokers, Topic: env.Get(topicEnv, kv.Get(KafkaTopic)), Version: env.Get(versionEnv, kv.Get(KafkaVersion)), } tlsEnableEnv := EnvKafkaTLS if k != config.Default { tlsEnableEnv = tlsEnableEnv + config.Default + k } tlsSkipVerifyEnv := EnvKafkaTLSSkipVerify if k != config.Default { tlsSkipVerifyEnv = tlsSkipVerifyEnv + config.Default + k } tlsClientTLSCertEnv := EnvKafkaClientTLSCert if k != config.Default { tlsClientTLSCertEnv = tlsClientTLSCertEnv + config.Default + k } tlsClientTLSKeyEnv := EnvKafkaClientTLSKey if k != config.Default { tlsClientTLSKeyEnv = tlsClientTLSKeyEnv + config.Default + k } kafkaArgs.TLS.Enable = env.Get(tlsEnableEnv, kv.Get(KafkaTLS)) == config.EnableOn kafkaArgs.TLS.SkipVerify = env.Get(tlsSkipVerifyEnv, kv.Get(KafkaTLSSkipVerify)) == config.EnableOn kafkaArgs.TLS.ClientAuth = tls.ClientAuthType(clientAuth) kafkaArgs.TLS.ClientTLSCert = env.Get(tlsClientTLSCertEnv, kv.Get(KafkaClientTLSCert)) kafkaArgs.TLS.ClientTLSKey = env.Get(tlsClientTLSKeyEnv, kv.Get(KafkaClientTLSKey)) saslEnableEnv := EnvKafkaSASLEnable if k != config.Default { saslEnableEnv = saslEnableEnv + config.Default + k } saslUsernameEnv := EnvKafkaSASLUsername if k != config.Default { saslUsernameEnv = saslUsernameEnv + config.Default + k } saslPasswordEnv := EnvKafkaSASLPassword if k != config.Default { saslPasswordEnv = saslPasswordEnv + config.Default + k } saslMechanismEnv := EnvKafkaSASLMechanism if k != config.Default { saslMechanismEnv = saslMechanismEnv + config.Default + k } kafkaArgs.SASL.Enable = env.Get(saslEnableEnv, kv.Get(KafkaSASL)) == config.EnableOn kafkaArgs.SASL.User = env.Get(saslUsernameEnv, kv.Get(KafkaSASLUsername)) kafkaArgs.SASL.Password = env.Get(saslPasswordEnv, kv.Get(KafkaSASLPassword)) kafkaArgs.SASL.Mechanism = env.Get(saslMechanismEnv, kv.Get(KafkaSASLMechanism)) queueDirEnv := EnvKafkaQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k } 
kafkaArgs.QueueDir = env.Get(queueDirEnv, kv.Get(KafkaQueueDir)) queueSizeEnv := EnvKafkaQueueSize if k != config.Default { queueSizeEnv = queueSizeEnv + config.Default + k } queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, kv.Get(KafkaQueueSize))) if err != nil { return cfg, err } if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } kafkaArgs.QueueSize = queueSize cfg.AuditKafka[k] = kafkaArgs } return cfg, nil } func lookupLoggerWebhookConfig(scfg config.Config, cfg Config) (Config, error) { envs := env.List(EnvLoggerWebhookEndpoint) var loggerTargets []string for _, k := range envs { target := strings.TrimPrefix(k, EnvLoggerWebhookEndpoint+config.Default) if target == EnvLoggerWebhookEndpoint { target = config.Default } loggerTargets = append(loggerTargets, target) } // Load HTTP logger from the environment if found for _, target := range loggerTargets { if v, ok := cfg.HTTP[target]; ok && v.Enabled { // This target is already enabled using the // legacy environment variables, ignore. 
continue } enableEnv := EnvLoggerWebhookEnable if target != config.Default { enableEnv = EnvLoggerWebhookEnable + config.Default + target } enable, err := config.ParseBool(env.Get(enableEnv, "")) if err != nil || !enable { continue } endpointEnv := EnvLoggerWebhookEndpoint if target != config.Default { endpointEnv = EnvLoggerWebhookEndpoint + config.Default + target } authTokenEnv := EnvLoggerWebhookAuthToken if target != config.Default { authTokenEnv = EnvLoggerWebhookAuthToken + config.Default + target } clientCertEnv := EnvLoggerWebhookClientCert if target != config.Default { clientCertEnv = EnvLoggerWebhookClientCert + config.Default + target } clientKeyEnv := EnvLoggerWebhookClientKey if target != config.Default { clientKeyEnv = EnvLoggerWebhookClientKey + config.Default + target } err = config.EnsureCertAndKey(env.Get(clientCertEnv, ""), env.Get(clientKeyEnv, "")) if err != nil { return cfg, err } proxyEnv := EnvLoggerWebhookProxy queueSizeEnv := EnvLoggerWebhookQueueSize if target != config.Default { queueSizeEnv = EnvLoggerWebhookQueueSize + config.Default + target } queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) if err != nil { return cfg, err } if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } queueDirEnv := EnvLoggerWebhookQueueDir if target != config.Default { queueDirEnv = EnvLoggerWebhookQueueDir + config.Default + target } cfg.HTTP[target] = http.Config{ Enabled: true, Endpoint: env.Get(endpointEnv, ""), AuthToken: env.Get(authTokenEnv, ""), ClientCert: env.Get(clientCertEnv, ""), ClientKey: env.Get(clientKeyEnv, ""), Proxy: env.Get(proxyEnv, ""), QueueSize: queueSize, QueueDir: env.Get(queueDirEnv, ""), Name: loggerTargetNamePrefix + target, } } for starget, kv := range scfg[config.LoggerWebhookSubSys] { if l, ok := cfg.HTTP[starget]; ok && l.Enabled { // Ignore this HTTP logger config since there is // a target with the same name loaded and enabled // from the environment. 
continue } subSysTarget := config.LoggerWebhookSubSys if starget != config.Default { subSysTarget = config.LoggerWebhookSubSys + config.SubSystemSeparator + starget } if err := config.CheckValidKeys(subSysTarget, kv, DefaultLoggerWebhookKVS); err != nil { return cfg, err } enabled, err := config.ParseBool(kv.Get(config.Enable)) if err != nil { return cfg, err } if !enabled { continue } err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey)) if err != nil { return cfg, err } queueSize, err := strconv.Atoi(kv.Get(QueueSize)) if err != nil { return cfg, err } if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } cfg.HTTP[starget] = http.Config{ Enabled: true, Endpoint: kv.Get(Endpoint), AuthToken: kv.Get(AuthToken), ClientCert: kv.Get(ClientCert), ClientKey: kv.Get(ClientKey), Proxy: kv.Get(Proxy), QueueSize: queueSize, QueueDir: kv.Get(QueueDir), Name: loggerTargetNamePrefix + starget, } } return cfg, nil } func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) { var loggerAuditTargets []string envs := env.List(EnvAuditWebhookEndpoint) for _, k := range envs { target := strings.TrimPrefix(k, EnvAuditWebhookEndpoint+config.Default) if target == EnvAuditWebhookEndpoint { target = config.Default } loggerAuditTargets = append(loggerAuditTargets, target) } for _, target := range loggerAuditTargets { if v, ok := cfg.AuditWebhook[target]; ok && v.Enabled { // This target is already enabled using the // legacy environment variables, ignore. 
continue } enableEnv := EnvAuditWebhookEnable if target != config.Default { enableEnv = EnvAuditWebhookEnable + config.Default + target } enable, err := config.ParseBool(env.Get(enableEnv, "")) if err != nil || !enable { continue } endpointEnv := EnvAuditWebhookEndpoint if target != config.Default { endpointEnv = EnvAuditWebhookEndpoint + config.Default + target } authTokenEnv := EnvAuditWebhookAuthToken if target != config.Default { authTokenEnv = EnvAuditWebhookAuthToken + config.Default + target } clientCertEnv := EnvAuditWebhookClientCert if target != config.Default { clientCertEnv = EnvAuditWebhookClientCert + config.Default + target } clientKeyEnv := EnvAuditWebhookClientKey if target != config.Default { clientKeyEnv = EnvAuditWebhookClientKey + config.Default + target } err = config.EnsureCertAndKey(env.Get(clientCertEnv, ""), env.Get(clientKeyEnv, "")) if err != nil { return cfg, err } queueSizeEnv := EnvAuditWebhookQueueSize if target != config.Default { queueSizeEnv = EnvAuditWebhookQueueSize + config.Default + target } queueSize, err := strconv.Atoi(env.Get(queueSizeEnv, "100000")) if err != nil { return cfg, err } if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } queueDirEnv := EnvAuditWebhookQueueDir if target != config.Default { queueDirEnv = EnvAuditWebhookQueueDir + config.Default + target } cfg.AuditWebhook[target] = http.Config{ Enabled: true, Endpoint: env.Get(endpointEnv, ""), AuthToken: env.Get(authTokenEnv, ""), ClientCert: env.Get(clientCertEnv, ""), ClientKey: env.Get(clientKeyEnv, ""), QueueSize: queueSize, QueueDir: env.Get(queueDirEnv, ""), Name: auditTargetNamePrefix + target, } } for starget, kv := range scfg[config.AuditWebhookSubSys] { if l, ok := cfg.AuditWebhook[starget]; ok && l.Enabled { // Ignore this audit config since another target // with the same name is already loaded and enabled // in the shell environment. 
continue } subSysTarget := config.AuditWebhookSubSys if starget != config.Default { subSysTarget = config.AuditWebhookSubSys + config.SubSystemSeparator + starget } if err := config.CheckValidKeys(subSysTarget, kv, DefaultAuditWebhookKVS); err != nil { return cfg, err } enabled, err := config.ParseBool(kv.Get(config.Enable)) if err != nil { return cfg, err } if !enabled { continue } err = config.EnsureCertAndKey(kv.Get(ClientCert), kv.Get(ClientKey)) if err != nil { return cfg, err } queueSize, err := strconv.Atoi(kv.Get(QueueSize)) if err != nil { return cfg, err } if queueSize <= 0 { return cfg, errors.New("invalid queue_size value") } cfg.AuditWebhook[starget] = http.Config{ Enabled: true, Endpoint: kv.Get(Endpoint), AuthToken: kv.Get(AuthToken), ClientCert: kv.Get(ClientCert), ClientKey: kv.Get(ClientKey), QueueSize: queueSize, QueueDir: kv.Get(QueueDir), Name: auditTargetNamePrefix + starget, } } return cfg, nil } // LookupConfigForSubSys - lookup logger config, override with ENVs if set, for the given sub-system func LookupConfigForSubSys(scfg config.Config, subSys string) (cfg Config, err error) { switch subSys { case config.LoggerWebhookSubSys: cfg = lookupLegacyConfigForSubSys(config.LoggerWebhookSubSys) if cfg, err = lookupLoggerWebhookConfig(scfg, cfg); err != nil { return cfg, err } case config.AuditWebhookSubSys: cfg = lookupLegacyConfigForSubSys(config.AuditWebhookSubSys) if cfg, err = lookupAuditWebhookConfig(scfg, cfg); err != nil { return cfg, err } case config.AuditKafkaSubSys: cfg.AuditKafka = make(map[string]kafka.Config) if cfg, err = lookupAuditKafkaConfig(scfg, cfg); err != nil { return cfg, err } } return cfg, nil } // ValidateSubSysConfig - validates logger related config of given sub-system func ValidateSubSysConfig(scfg config.Config, subSys string) error { // Lookup for legacy environment variables first _, err := LookupConfigForSubSys(scfg, subSys) return err }