mirror of
https://github.com/minio/minio.git
synced 2025-04-09 06:00:12 -04:00
Make audit webhook and kafka config dynamic (#14390)
This commit is contained in:
parent
0913eb6655
commit
3934700a08
@ -2222,7 +2222,7 @@ func fetchKMSStatus() madmin.KMS {
|
|||||||
func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) {
|
func fetchLoggerInfo() ([]madmin.Logger, []madmin.Audit) {
|
||||||
var loggerInfo []madmin.Logger
|
var loggerInfo []madmin.Logger
|
||||||
var auditloggerInfo []madmin.Audit
|
var auditloggerInfo []madmin.Audit
|
||||||
for _, target := range logger.Targets() {
|
for _, target := range logger.SystemTargets() {
|
||||||
if target.Endpoint() != "" {
|
if target.Endpoint() != "" {
|
||||||
tgt := target.String()
|
tgt := target.String()
|
||||||
err := checkConnection(target.Endpoint(), 15*time.Second)
|
err := checkConnection(target.Endpoint(), 15*time.Second)
|
||||||
|
@ -44,8 +44,6 @@ import (
|
|||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
"github.com/minio/minio/internal/kms"
|
"github.com/minio/minio/internal/kms"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/minio/internal/logger/target/http"
|
|
||||||
"github.com/minio/minio/internal/logger/target/kafka"
|
|
||||||
"github.com/minio/pkg/env"
|
"github.com/minio/pkg/env"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -353,12 +351,6 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.LoggerSubSystems.Contains(subSys) {
|
|
||||||
if err := logger.ValidateSubSysConfig(s, subSys); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.NotifySubSystems.Contains(subSys) {
|
if config.NotifySubSystems.Contains(subSys) {
|
||||||
if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs(), subSys); err != nil {
|
if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs(), subSys); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -566,36 +558,6 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
|
|||||||
logger.LogIf(ctx, fmt.Errorf("Unable to parse subnet configuration: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to parse subnet configuration: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load logger targets based on user's configuration
|
|
||||||
loggerUserAgent := getUserAgent(getMinioMode())
|
|
||||||
|
|
||||||
loggerCfg, err := logger.LookupConfig(s)
|
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize logger/audit targets: %w", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, l := range loggerCfg.AuditWebhook {
|
|
||||||
if l.Enabled {
|
|
||||||
l.LogOnce = logger.LogOnceIf
|
|
||||||
l.UserAgent = loggerUserAgent
|
|
||||||
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
|
|
||||||
// Enable http audit logging
|
|
||||||
if err = logger.AddAuditTarget(http.New(l)); err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize server audit HTTP target: %w", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, l := range loggerCfg.AuditKafka {
|
|
||||||
if l.Enabled {
|
|
||||||
l.LogOnce = logger.LogOnceIf
|
|
||||||
// Enable Kafka audit logging
|
|
||||||
if err = logger.AddAuditTarget(kafka.New(l)); err != nil {
|
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize server audit Kafka target: %w", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
globalConfigTargetList, err = notify.GetNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), false)
|
globalConfigTargetList, err = notify.GetNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
|
||||||
@ -657,7 +619,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
|
|||||||
scannerCycle.Update(scannerCfg.Cycle)
|
scannerCycle.Update(scannerCfg.Cycle)
|
||||||
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
||||||
case config.LoggerWebhookSubSys:
|
case config.LoggerWebhookSubSys:
|
||||||
loggerCfg, err := logger.LookupConfig(s)
|
loggerCfg, err := logger.LookupConfigForSubSys(s, config.LoggerWebhookSubSys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err))
|
||||||
}
|
}
|
||||||
@ -670,10 +632,44 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
|
|||||||
loggerCfg.HTTP[n] = l
|
loggerCfg.HTTP[n] = l
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = logger.UpdateTargets(loggerCfg)
|
err = logger.UpdateSystemTargets(loggerCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err))
|
||||||
}
|
}
|
||||||
|
case config.AuditWebhookSubSys:
|
||||||
|
loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditWebhookSubSys)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("Unable to load audit webhook config: %w", err))
|
||||||
|
}
|
||||||
|
userAgent := getUserAgent(getMinioMode())
|
||||||
|
for n, l := range loggerCfg.AuditWebhook {
|
||||||
|
if l.Enabled {
|
||||||
|
l.LogOnce = logger.LogOnceIf
|
||||||
|
l.UserAgent = userAgent
|
||||||
|
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
|
||||||
|
loggerCfg.AuditWebhook[n] = l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = logger.UpdateAuditWebhookTargets(loggerCfg)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %w", err))
|
||||||
|
}
|
||||||
|
case config.AuditKafkaSubSys:
|
||||||
|
loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditKafkaSubSys)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("Unable to load audit kafka config: %w", err))
|
||||||
|
}
|
||||||
|
for n, l := range loggerCfg.AuditKafka {
|
||||||
|
if l.Enabled {
|
||||||
|
l.LogOnce = logger.LogOnceIf
|
||||||
|
loggerCfg.AuditKafka[n] = l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = logger.UpdateAuditKafkaTargets(loggerCfg)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %w", err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
globalServerConfigMu.Lock()
|
globalServerConfigMu.Lock()
|
||||||
defer globalServerConfigMu.Unlock()
|
defer globalServerConfigMu.Unlock()
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/minio/internal/logger/message/log"
|
"github.com/minio/minio/internal/logger/message/log"
|
||||||
"github.com/minio/minio/internal/logger/target/console"
|
"github.com/minio/minio/internal/logger/target/console"
|
||||||
|
"github.com/minio/minio/internal/logger/target/types"
|
||||||
"github.com/minio/minio/internal/pubsub"
|
"github.com/minio/minio/internal/pubsub"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
)
|
)
|
||||||
@ -77,7 +78,7 @@ func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool {
|
|||||||
func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) {
|
func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) {
|
||||||
// Enable console logging for remote client.
|
// Enable console logging for remote client.
|
||||||
if !sys.HasLogListeners() {
|
if !sys.HasLogListeners() {
|
||||||
logger.AddTarget(sys)
|
logger.AddSystemTarget(sys)
|
||||||
}
|
}
|
||||||
|
|
||||||
cnt := 0
|
cnt := 0
|
||||||
@ -154,6 +155,11 @@ func (sys *HTTPConsoleLoggerSys) Content() (logs []log.Entry) {
|
|||||||
func (sys *HTTPConsoleLoggerSys) Cancel() {
|
func (sys *HTTPConsoleLoggerSys) Cancel() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Type - returns type of the target
|
||||||
|
func (sys *HTTPConsoleLoggerSys) Type() types.TargetType {
|
||||||
|
return types.TargetConsole
|
||||||
|
}
|
||||||
|
|
||||||
// Send log message 'e' to console and publish to console
|
// Send log message 'e' to console and publish to console
|
||||||
// log pubsub system
|
// log pubsub system
|
||||||
func (sys *HTTPConsoleLoggerSys) Send(e interface{}, logKind string) error {
|
func (sys *HTTPConsoleLoggerSys) Send(e interface{}, logKind string) error {
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
|
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/minio/internal/logger/message/log"
|
"github.com/minio/minio/internal/logger/message/log"
|
||||||
|
"github.com/minio/minio/internal/logger/target/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testLoggerI interface {
|
type testLoggerI interface {
|
||||||
@ -58,6 +59,10 @@ func (t *testingLogger) Init() error {
|
|||||||
func (t *testingLogger) Cancel() {
|
func (t *testingLogger) Cancel() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *testingLogger) Type() types.TargetType {
|
||||||
|
return types.TargetHTTP
|
||||||
|
}
|
||||||
|
|
||||||
func (t *testingLogger) Send(entry interface{}, errKind string) error {
|
func (t *testingLogger) Send(entry interface{}, errKind string) error {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
@ -76,7 +81,7 @@ func (t *testingLogger) Send(entry interface{}, errKind string) error {
|
|||||||
|
|
||||||
func addTestingLogging(t testLoggerI) func() {
|
func addTestingLogging(t testLoggerI) func() {
|
||||||
tl := &testingLogger{t: t}
|
tl := &testingLogger{t: t}
|
||||||
logger.AddTarget(tl)
|
logger.AddSystemTarget(tl)
|
||||||
return func() {
|
return func() {
|
||||||
tl.mu.Lock()
|
tl.mu.Lock()
|
||||||
defer tl.mu.Unlock()
|
defer tl.mu.Unlock()
|
||||||
|
@ -176,7 +176,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
|||||||
|
|
||||||
// Initialize globalConsoleSys system
|
// Initialize globalConsoleSys system
|
||||||
globalConsoleSys = NewConsoleLogger(GlobalContext)
|
globalConsoleSys = NewConsoleLogger(GlobalContext)
|
||||||
logger.AddTarget(globalConsoleSys)
|
logger.AddSystemTarget(globalConsoleSys)
|
||||||
|
|
||||||
// Handle common command args.
|
// Handle common command args.
|
||||||
handleCommonCmdArgs(ctx)
|
handleCommonCmdArgs(ctx)
|
||||||
|
@ -401,7 +401,7 @@ func serverMain(ctx *cli.Context) {
|
|||||||
|
|
||||||
// Initialize globalConsoleSys system
|
// Initialize globalConsoleSys system
|
||||||
globalConsoleSys = NewConsoleLogger(GlobalContext)
|
globalConsoleSys = NewConsoleLogger(GlobalContext)
|
||||||
logger.AddTarget(globalConsoleSys)
|
logger.AddSystemTarget(globalConsoleSys)
|
||||||
|
|
||||||
// Perform any self-tests
|
// Perform any self-tests
|
||||||
bitrotSelfTest()
|
bitrotSelfTest()
|
||||||
|
@ -167,6 +167,8 @@ var SubSystemsDynamic = set.CreateStringSet(
|
|||||||
HealSubSys,
|
HealSubSys,
|
||||||
SubnetSubSys,
|
SubnetSubSys,
|
||||||
LoggerWebhookSubSys,
|
LoggerWebhookSubSys,
|
||||||
|
AuditWebhookSubSys,
|
||||||
|
AuditKafkaSubSys,
|
||||||
)
|
)
|
||||||
|
|
||||||
// SubSystemsSingleTargets - subsystems which only support single target.
|
// SubSystemsSingleTargets - subsystems which only support single target.
|
||||||
|
@ -221,62 +221,65 @@ func NewConfig() Config {
|
|||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
func lookupLegacyConfig() (Config, error) {
|
func lookupLegacyConfigForSubSys(subSys string) Config {
|
||||||
cfg := NewConfig()
|
cfg := NewConfig()
|
||||||
|
switch subSys {
|
||||||
var loggerTargets []string
|
case config.LoggerWebhookSubSys:
|
||||||
envs := env.List(legacyEnvLoggerHTTPEndpoint)
|
var loggerTargets []string
|
||||||
for _, k := range envs {
|
envs := env.List(legacyEnvLoggerHTTPEndpoint)
|
||||||
target := strings.TrimPrefix(k, legacyEnvLoggerHTTPEndpoint+config.Default)
|
for _, k := range envs {
|
||||||
if target == legacyEnvLoggerHTTPEndpoint {
|
target := strings.TrimPrefix(k, legacyEnvLoggerHTTPEndpoint+config.Default)
|
||||||
target = config.Default
|
if target == legacyEnvLoggerHTTPEndpoint {
|
||||||
|
target = config.Default
|
||||||
|
}
|
||||||
|
loggerTargets = append(loggerTargets, target)
|
||||||
}
|
}
|
||||||
loggerTargets = append(loggerTargets, target)
|
|
||||||
|
// Load HTTP logger from the environment if found
|
||||||
|
for _, target := range loggerTargets {
|
||||||
|
endpointEnv := legacyEnvLoggerHTTPEndpoint
|
||||||
|
if target != config.Default {
|
||||||
|
endpointEnv = legacyEnvLoggerHTTPEndpoint + config.Default + target
|
||||||
|
}
|
||||||
|
endpoint := env.Get(endpointEnv, "")
|
||||||
|
if endpoint == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cfg.HTTP[target] = http.Config{
|
||||||
|
Enabled: true,
|
||||||
|
Endpoint: endpoint,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case config.AuditWebhookSubSys:
|
||||||
|
// List legacy audit ENVs if any.
|
||||||
|
var loggerAuditTargets []string
|
||||||
|
envs := env.List(legacyEnvAuditLoggerHTTPEndpoint)
|
||||||
|
for _, k := range envs {
|
||||||
|
target := strings.TrimPrefix(k, legacyEnvAuditLoggerHTTPEndpoint+config.Default)
|
||||||
|
if target == legacyEnvAuditLoggerHTTPEndpoint {
|
||||||
|
target = config.Default
|
||||||
|
}
|
||||||
|
loggerAuditTargets = append(loggerAuditTargets, target)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, target := range loggerAuditTargets {
|
||||||
|
endpointEnv := legacyEnvAuditLoggerHTTPEndpoint
|
||||||
|
if target != config.Default {
|
||||||
|
endpointEnv = legacyEnvAuditLoggerHTTPEndpoint + config.Default + target
|
||||||
|
}
|
||||||
|
endpoint := env.Get(endpointEnv, "")
|
||||||
|
if endpoint == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cfg.AuditWebhook[target] = http.Config{
|
||||||
|
Enabled: true,
|
||||||
|
Endpoint: endpoint,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
return cfg
|
||||||
// Load HTTP logger from the environment if found
|
|
||||||
for _, target := range loggerTargets {
|
|
||||||
endpointEnv := legacyEnvLoggerHTTPEndpoint
|
|
||||||
if target != config.Default {
|
|
||||||
endpointEnv = legacyEnvLoggerHTTPEndpoint + config.Default + target
|
|
||||||
}
|
|
||||||
endpoint := env.Get(endpointEnv, "")
|
|
||||||
if endpoint == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cfg.HTTP[target] = http.Config{
|
|
||||||
Enabled: true,
|
|
||||||
Endpoint: endpoint,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// List legacy audit ENVs if any.
|
|
||||||
var loggerAuditTargets []string
|
|
||||||
envs = env.List(legacyEnvAuditLoggerHTTPEndpoint)
|
|
||||||
for _, k := range envs {
|
|
||||||
target := strings.TrimPrefix(k, legacyEnvAuditLoggerHTTPEndpoint+config.Default)
|
|
||||||
if target == legacyEnvAuditLoggerHTTPEndpoint {
|
|
||||||
target = config.Default
|
|
||||||
}
|
|
||||||
loggerAuditTargets = append(loggerAuditTargets, target)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, target := range loggerAuditTargets {
|
|
||||||
endpointEnv := legacyEnvAuditLoggerHTTPEndpoint
|
|
||||||
if target != config.Default {
|
|
||||||
endpointEnv = legacyEnvAuditLoggerHTTPEndpoint + config.Default + target
|
|
||||||
}
|
|
||||||
endpoint := env.Get(endpointEnv, "")
|
|
||||||
if endpoint == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
cfg.AuditWebhook[target] = http.Config{
|
|
||||||
Enabled: true,
|
|
||||||
Endpoint: endpoint,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return cfg, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAuditKafka - returns a map of registered notification 'kafka' targets
|
// GetAuditKafka - returns a map of registered notification 'kafka' targets
|
||||||
@ -604,6 +607,7 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
|
|||||||
if queueSize <= 0 {
|
if queueSize <= 0 {
|
||||||
return cfg, errors.New("invalid queue_size value")
|
return cfg, errors.New("invalid queue_size value")
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg.AuditWebhook[starget] = http.Config{
|
cfg.AuditWebhook[starget] = http.Config{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
Endpoint: kv.Get(Endpoint),
|
Endpoint: kv.Get(Endpoint),
|
||||||
@ -617,33 +621,21 @@ func lookupAuditWebhookConfig(scfg config.Config, cfg Config) (Config, error) {
|
|||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LookupConfig - lookup logger config, override with ENVs if set.
|
// LookupConfigForSubSys - lookup logger config, override with ENVs if set, for the given sub-system
|
||||||
func LookupConfig(scfg config.Config) (Config, error) {
|
func LookupConfigForSubSys(scfg config.Config, subSys string) (cfg Config, err error) {
|
||||||
// Lookup for legacy environment variables first
|
|
||||||
cfg, err := lookupLegacyConfig()
|
|
||||||
if err != nil {
|
|
||||||
return cfg, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, ss := range config.LoggerSubSystems.ToSlice() {
|
|
||||||
lookupConfigForSubSys(scfg, cfg, ss)
|
|
||||||
}
|
|
||||||
|
|
||||||
return cfg, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func lookupConfigForSubSys(scfg config.Config, cfg Config, subSys string) (Config, error) {
|
|
||||||
switch subSys {
|
switch subSys {
|
||||||
case config.LoggerWebhookSubSys:
|
case config.LoggerWebhookSubSys:
|
||||||
if _, err := lookupLoggerWebhookConfig(scfg, cfg); err != nil {
|
cfg = lookupLegacyConfigForSubSys(config.LoggerWebhookSubSys)
|
||||||
|
if cfg, err = lookupLoggerWebhookConfig(scfg, cfg); err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
case config.AuditWebhookSubSys:
|
case config.AuditWebhookSubSys:
|
||||||
if _, err := lookupAuditWebhookConfig(scfg, cfg); err != nil {
|
cfg = lookupLegacyConfigForSubSys(config.AuditWebhookSubSys)
|
||||||
|
if cfg, err = lookupAuditWebhookConfig(scfg, cfg); err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
case config.AuditKafkaSubSys:
|
case config.AuditKafkaSubSys:
|
||||||
if _, err := GetAuditKafka(scfg[config.AuditKafkaSubSys]); err != nil {
|
if _, err = GetAuditKafka(scfg[config.AuditKafkaSubSys]); err != nil {
|
||||||
return cfg, err
|
return cfg, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -653,10 +645,6 @@ func lookupConfigForSubSys(scfg config.Config, cfg Config, subSys string) (Confi
|
|||||||
// ValidateSubSysConfig - validates logger related config of given sub-system
|
// ValidateSubSysConfig - validates logger related config of given sub-system
|
||||||
func ValidateSubSysConfig(scfg config.Config, subSys string) error {
|
func ValidateSubSysConfig(scfg config.Config, subSys string) error {
|
||||||
// Lookup for legacy environment variables first
|
// Lookup for legacy environment variables first
|
||||||
cfg, err := lookupLegacyConfig()
|
_, err := LookupConfigForSubSys(scfg, subSys)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = lookupConfigForSubSys(scfg, cfg, subSys)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -358,7 +358,7 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Iterate over all logger targets to send the log entry
|
// Iterate over all logger targets to send the log entry
|
||||||
for _, t := range Targets() {
|
for _, t := range SystemTargets() {
|
||||||
if err := t.Send(entry, entry.LogKind); err != nil {
|
if err := t.Send(entry, entry.LogKind); err != nil {
|
||||||
LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Logger target (%v): %v", entry, t, err), entry.LogKind)
|
LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Logger target (%v): %v", entry, t, err), entry.LogKind)
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
|
"github.com/minio/minio/internal/logger/target/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Timeout for the webhook http call
|
// Timeout for the webhook http call
|
||||||
@ -222,3 +223,8 @@ func (h *Target) Cancel() {
|
|||||||
}
|
}
|
||||||
h.wg.Wait()
|
h.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Type - returns type of the target
|
||||||
|
func (h *Target) Type() types.TargetType {
|
||||||
|
return types.TargetHTTP
|
||||||
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright (c) 2015-2021 MinIO, Inc.
|
// Copyright (c) 2015-2022 MinIO, Inc.
|
||||||
//
|
//
|
||||||
// This file is part of MinIO Object Storage stack
|
// This file is part of MinIO Object Storage stack
|
||||||
//
|
//
|
||||||
@ -24,16 +24,22 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
sarama "github.com/Shopify/sarama"
|
sarama "github.com/Shopify/sarama"
|
||||||
saramatls "github.com/Shopify/sarama/tools/tls"
|
saramatls "github.com/Shopify/sarama/tools/tls"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/logger/message/audit"
|
"github.com/minio/minio/internal/logger/message/audit"
|
||||||
|
"github.com/minio/minio/internal/logger/target/types"
|
||||||
xnet "github.com/minio/pkg/net"
|
xnet "github.com/minio/pkg/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Target - Kafka target.
|
// Target - Kafka target.
|
||||||
type Target struct {
|
type Target struct {
|
||||||
|
status int32
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
// Channel of log entries
|
// Channel of log entries
|
||||||
logCh chan interface{}
|
logCh chan interface{}
|
||||||
|
|
||||||
@ -55,30 +61,37 @@ func (h *Target) Send(entry interface{}, errKind string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Target) logEntry(entry interface{}) {
|
||||||
|
h.wg.Add(1)
|
||||||
|
defer h.wg.Done()
|
||||||
|
|
||||||
|
logJSON, err := json.Marshal(&entry)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ae, ok := entry.(audit.Entry)
|
||||||
|
if ok {
|
||||||
|
msg := sarama.ProducerMessage{
|
||||||
|
Topic: h.kconfig.Topic,
|
||||||
|
Key: sarama.StringEncoder(ae.RequestID),
|
||||||
|
Value: sarama.ByteEncoder(logJSON),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err = h.producer.SendMessage(&msg)
|
||||||
|
if err != nil {
|
||||||
|
h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Target) startKakfaLogger() {
|
func (h *Target) startKakfaLogger() {
|
||||||
// Create a routine which sends json logs received
|
// Create a routine which sends json logs received
|
||||||
// from an internal channel.
|
// from an internal channel.
|
||||||
go func() {
|
go func() {
|
||||||
for entry := range h.logCh {
|
for entry := range h.logCh {
|
||||||
logJSON, err := json.Marshal(&entry)
|
h.logEntry(entry)
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
ae, ok := entry.(audit.Entry)
|
|
||||||
if ok {
|
|
||||||
msg := sarama.ProducerMessage{
|
|
||||||
Topic: h.kconfig.Topic,
|
|
||||||
Key: sarama.StringEncoder(ae.RequestID),
|
|
||||||
Value: sarama.ByteEncoder(logJSON),
|
|
||||||
}
|
|
||||||
|
|
||||||
_, _, err = h.producer.SendMessage(&msg)
|
|
||||||
if err != nil {
|
|
||||||
h.kconfig.LogOnce(context.Background(), err, h.kconfig.Topic)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -193,12 +206,17 @@ func (h *Target) Init() error {
|
|||||||
|
|
||||||
h.producer = producer
|
h.producer = producer
|
||||||
|
|
||||||
|
h.status = 1
|
||||||
go h.startKakfaLogger()
|
go h.startKakfaLogger()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cancel - cancels the target
|
// Cancel - cancels the target
|
||||||
func (h *Target) Cancel() {
|
func (h *Target) Cancel() {
|
||||||
|
if atomic.CompareAndSwapInt32(&h.status, 1, 0) {
|
||||||
|
close(h.logCh)
|
||||||
|
}
|
||||||
|
h.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// New initializes a new logger target which
|
// New initializes a new logger target which
|
||||||
@ -210,3 +228,8 @@ func New(config Config) *Target {
|
|||||||
}
|
}
|
||||||
return target
|
return target
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Type - returns type of the target
|
||||||
|
func (h *Target) Type() types.TargetType {
|
||||||
|
return types.TargetKafka
|
||||||
|
}
|
||||||
|
29
internal/logger/target/types/types.go
Normal file
29
internal/logger/target/types/types.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
// Copyright (c) 2015-2022 MinIO, Inc.
|
||||||
|
//
|
||||||
|
// This file is part of MinIO Object Storage stack
|
||||||
|
//
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// This program is distributed in the hope that it will be useful
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package types
|
||||||
|
|
||||||
|
// TargetType indicates type of the target e.g. console, http, kafka
|
||||||
|
type TargetType uint8
|
||||||
|
|
||||||
|
// Constants for target types
|
||||||
|
const (
|
||||||
|
_ TargetType = iota
|
||||||
|
TargetConsole
|
||||||
|
TargetHTTP
|
||||||
|
TargetKafka
|
||||||
|
)
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright (c) 2015-2021 MinIO, Inc.
|
// Copyright (c) 2015-2022 MinIO, Inc.
|
||||||
//
|
//
|
||||||
// This file is part of MinIO Object Storage stack
|
// This file is part of MinIO Object Storage stack
|
||||||
//
|
//
|
||||||
@ -22,6 +22,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/minio/minio/internal/logger/target/http"
|
"github.com/minio/minio/internal/logger/target/http"
|
||||||
|
"github.com/minio/minio/internal/logger/target/kafka"
|
||||||
|
"github.com/minio/minio/internal/logger/target/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Target is the entity that we will receive
|
// Target is the entity that we will receive
|
||||||
@ -33,28 +35,29 @@ type Target interface {
|
|||||||
Init() error
|
Init() error
|
||||||
Cancel()
|
Cancel()
|
||||||
Send(entry interface{}, errKind string) error
|
Send(entry interface{}, errKind string) error
|
||||||
|
Type() types.TargetType
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// swapMu must be held while reading slice info or swapping targets or auditTargets.
|
// swapMu must be held while reading slice info or swapping targets or auditTargets.
|
||||||
swapMu sync.Mutex
|
swapMu sync.Mutex
|
||||||
|
|
||||||
// targets is the set of enabled loggers.
|
// systemTargets is the set of enabled loggers.
|
||||||
// Must be immutable at all times.
|
// Must be immutable at all times.
|
||||||
// Can be swapped to another while holding swapMu
|
// Can be swapped to another while holding swapMu
|
||||||
targets = []Target{}
|
systemTargets = []Target{}
|
||||||
nTargets int32 // atomic count of len(targets)
|
nTargets int32 // atomic count of len(targets)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Targets returns active targets.
|
// SystemTargets returns active targets.
|
||||||
// Returned slice may not be modified in any way.
|
// Returned slice may not be modified in any way.
|
||||||
func Targets() []Target {
|
func SystemTargets() []Target {
|
||||||
if atomic.LoadInt32(&nTargets) == 0 {
|
if atomic.LoadInt32(&nTargets) == 0 {
|
||||||
// Lock free if none...
|
// Lock free if none...
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
swapMu.Lock()
|
swapMu.Lock()
|
||||||
res := targets
|
res := systemTargets
|
||||||
swapMu.Unlock()
|
swapMu.Unlock()
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
@ -80,46 +83,30 @@ var (
|
|||||||
nAuditTargets int32 // atomic count of len(auditTargets)
|
nAuditTargets int32 // atomic count of len(auditTargets)
|
||||||
)
|
)
|
||||||
|
|
||||||
// AddAuditTarget adds a new audit logger target to the
|
// AddSystemTarget adds a new logger target to the
|
||||||
// list of enabled loggers
|
// list of enabled loggers
|
||||||
func AddAuditTarget(t Target) error {
|
func AddSystemTarget(t Target) error {
|
||||||
if err := t.Init(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
swapMu.Lock()
|
|
||||||
updated := append(make([]Target, 0, len(auditTargets)+1), auditTargets...)
|
|
||||||
updated = append(updated, t)
|
|
||||||
auditTargets = updated
|
|
||||||
atomic.StoreInt32(&nAuditTargets, int32(len(updated)))
|
|
||||||
swapMu.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddTarget adds a new logger target to the
|
|
||||||
// list of enabled loggers
|
|
||||||
func AddTarget(t Target) error {
|
|
||||||
if err := t.Init(); err != nil {
|
if err := t.Init(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
swapMu.Lock()
|
swapMu.Lock()
|
||||||
updated := append(make([]Target, 0, len(targets)+1), targets...)
|
updated := append(make([]Target, 0, len(systemTargets)+1), systemTargets...)
|
||||||
updated = append(updated, t)
|
updated = append(updated, t)
|
||||||
targets = updated
|
systemTargets = updated
|
||||||
atomic.StoreInt32(&nTargets, int32(len(updated)))
|
atomic.StoreInt32(&nTargets, int32(len(updated)))
|
||||||
swapMu.Unlock()
|
swapMu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func cancelAllTargets() {
|
func cancelAllSystemTargets() {
|
||||||
for _, tgt := range targets {
|
for _, tgt := range systemTargets {
|
||||||
tgt.Cancel()
|
tgt.Cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func initTargets(cfg Config) (tgts []Target, err error) {
|
func initSystemTargets(cfgMap map[string]http.Config) (tgts []Target, err error) {
|
||||||
for _, l := range cfg.HTTP {
|
for _, l := range cfgMap {
|
||||||
if l.Enabled {
|
if l.Enabled {
|
||||||
t := http.New(l)
|
t := http.New(l)
|
||||||
if err = t.Init(); err != nil {
|
if err = t.Init(); err != nil {
|
||||||
@ -131,25 +118,90 @@ func initTargets(cfg Config) (tgts []Target, err error) {
|
|||||||
return tgts, err
|
return tgts, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateTargets swaps targets with newly loaded ones from the cfg
|
func initKafkaTargets(cfgMap map[string]kafka.Config) (tgts []Target, err error) {
|
||||||
func UpdateTargets(cfg Config) error {
|
for _, l := range cfgMap {
|
||||||
updated, err := initTargets(cfg)
|
if l.Enabled {
|
||||||
|
t := kafka.New(l)
|
||||||
|
if err = t.Init(); err != nil {
|
||||||
|
return tgts, err
|
||||||
|
}
|
||||||
|
tgts = append(tgts, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tgts, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateSystemTargets swaps targets with newly loaded ones from the cfg
|
||||||
|
func UpdateSystemTargets(cfg Config) error {
|
||||||
|
updated, err := initSystemTargets(cfg.HTTP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
swapMu.Lock()
|
swapMu.Lock()
|
||||||
for _, tgt := range targets {
|
for _, tgt := range systemTargets {
|
||||||
// Preserve console target when dynamically updating
|
// Preserve console target when dynamically updating
|
||||||
// other HTTP targets, console target is always present.
|
// other HTTP targets, console target is always present.
|
||||||
if tgt.String() == ConsoleLoggerTgt {
|
if tgt.Type() == types.TargetConsole {
|
||||||
updated = append(updated, tgt)
|
updated = append(updated, tgt)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
atomic.StoreInt32(&nTargets, int32(len(updated)))
|
atomic.StoreInt32(&nTargets, int32(len(updated)))
|
||||||
cancelAllTargets() // cancel running targets
|
cancelAllSystemTargets() // cancel running targets
|
||||||
targets = updated
|
systemTargets = updated
|
||||||
|
swapMu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func cancelAuditTargetType(t types.TargetType) {
|
||||||
|
for _, tgt := range auditTargets {
|
||||||
|
if tgt.Type() == t {
|
||||||
|
tgt.Cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func existingAuditTargets(t types.TargetType) []Target {
|
||||||
|
tgts := make([]Target, 0, len(auditTargets))
|
||||||
|
for _, tgt := range auditTargets {
|
||||||
|
if tgt.Type() == t {
|
||||||
|
tgts = append(tgts, tgt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tgts
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg
|
||||||
|
func UpdateAuditWebhookTargets(cfg Config) error {
|
||||||
|
updated, err := initSystemTargets(cfg.AuditWebhook)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// retain kafka targets
|
||||||
|
updated = append(existingAuditTargets(types.TargetKafka), updated...)
|
||||||
|
|
||||||
|
swapMu.Lock()
|
||||||
|
atomic.StoreInt32(&nAuditTargets, int32(len(updated)))
|
||||||
|
cancelAuditTargetType(types.TargetHTTP) // cancel running targets
|
||||||
|
auditTargets = updated
|
||||||
|
swapMu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateAuditKafkaTargets swaps audit kafka targets with newly loaded ones from the cfg
|
||||||
|
func UpdateAuditKafkaTargets(cfg Config) error {
|
||||||
|
updated, err := initKafkaTargets(cfg.AuditKafka)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// retain HTTP targets
|
||||||
|
updated = append(existingAuditTargets(types.TargetHTTP), updated...)
|
||||||
|
|
||||||
|
swapMu.Lock()
|
||||||
|
atomic.StoreInt32(&nAuditTargets, int32(len(updated)))
|
||||||
|
cancelAuditTargetType(types.TargetKafka) // cancel running targets
|
||||||
|
auditTargets = updated
|
||||||
swapMu.Unlock()
|
swapMu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user