mirror of
https://github.com/minio/minio.git
synced 2025-01-12 07:23:23 -05:00
afbb63a197
This change moves external event notification functionality into `event-notification.go`. This simplifies notification related code.
833 lines
27 KiB
Go
833 lines
27 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/minio/madmin-go"
|
|
"github.com/minio/minio/internal/config"
|
|
"github.com/minio/minio/internal/config/api"
|
|
"github.com/minio/minio/internal/config/cache"
|
|
"github.com/minio/minio/internal/config/callhome"
|
|
"github.com/minio/minio/internal/config/compress"
|
|
"github.com/minio/minio/internal/config/dns"
|
|
"github.com/minio/minio/internal/config/etcd"
|
|
"github.com/minio/minio/internal/config/heal"
|
|
xldap "github.com/minio/minio/internal/config/identity/ldap"
|
|
"github.com/minio/minio/internal/config/identity/openid"
|
|
idplugin "github.com/minio/minio/internal/config/identity/plugin"
|
|
xtls "github.com/minio/minio/internal/config/identity/tls"
|
|
"github.com/minio/minio/internal/config/notify"
|
|
"github.com/minio/minio/internal/config/policy/opa"
|
|
polplugin "github.com/minio/minio/internal/config/policy/plugin"
|
|
"github.com/minio/minio/internal/config/scanner"
|
|
"github.com/minio/minio/internal/config/storageclass"
|
|
"github.com/minio/minio/internal/config/subnet"
|
|
"github.com/minio/minio/internal/crypto"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
"github.com/minio/minio/internal/kms"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/pkg/env"
|
|
)
|
|
|
|
func initHelp() {
|
|
kvs := map[string]config.KVS{
|
|
config.EtcdSubSys: etcd.DefaultKVS,
|
|
config.CacheSubSys: cache.DefaultKVS,
|
|
config.CompressionSubSys: compress.DefaultKVS,
|
|
config.IdentityLDAPSubSys: xldap.DefaultKVS,
|
|
config.IdentityOpenIDSubSys: openid.DefaultKVS,
|
|
config.IdentityTLSSubSys: xtls.DefaultKVS,
|
|
config.IdentityPluginSubSys: idplugin.DefaultKVS,
|
|
config.PolicyOPASubSys: opa.DefaultKVS,
|
|
config.PolicyPluginSubSys: polplugin.DefaultKVS,
|
|
config.SiteSubSys: config.DefaultSiteKVS,
|
|
config.RegionSubSys: config.DefaultRegionKVS,
|
|
config.APISubSys: api.DefaultKVS,
|
|
config.CredentialsSubSys: config.DefaultCredentialKVS,
|
|
config.LoggerWebhookSubSys: logger.DefaultLoggerWebhookKVS,
|
|
config.AuditWebhookSubSys: logger.DefaultAuditWebhookKVS,
|
|
config.AuditKafkaSubSys: logger.DefaultAuditKafkaKVS,
|
|
config.HealSubSys: heal.DefaultKVS,
|
|
config.ScannerSubSys: scanner.DefaultKVS,
|
|
config.SubnetSubSys: subnet.DefaultKVS,
|
|
config.CallhomeSubSys: callhome.DefaultKVS,
|
|
}
|
|
for k, v := range notify.DefaultNotificationKVS {
|
|
kvs[k] = v
|
|
}
|
|
if globalIsErasure {
|
|
kvs[config.StorageClassSubSys] = storageclass.DefaultKVS
|
|
}
|
|
config.RegisterDefaultKVS(kvs)
|
|
|
|
// Captures help for each sub-system
|
|
helpSubSys := config.HelpKVS{
|
|
config.HelpKV{
|
|
Key: config.SiteSubSys,
|
|
Description: "label the server and its location",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.CacheSubSys,
|
|
Description: "add caching storage tier",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.CompressionSubSys,
|
|
Description: "enable server side compression of objects",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.EtcdSubSys,
|
|
Description: "federate multiple clusters for IAM and Bucket DNS",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.IdentityOpenIDSubSys,
|
|
Description: "enable OpenID SSO support",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.IdentityLDAPSubSys,
|
|
Description: "enable LDAP SSO support",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.IdentityTLSSubSys,
|
|
Description: "enable X.509 TLS certificate SSO support",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.IdentityPluginSubSys,
|
|
Description: "enable Identity Plugin via external hook",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.PolicyPluginSubSys,
|
|
Description: "enable Access Management Plugin for policy enforcement",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.APISubSys,
|
|
Description: "manage global HTTP API call specific features, such as throttling, authentication types, etc.",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.HealSubSys,
|
|
Description: "manage object healing frequency and bitrot verification checks",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.ScannerSubSys,
|
|
Description: "manage namespace scanning for usage calculation, lifecycle, healing and more",
|
|
},
|
|
config.HelpKV{
|
|
Key: config.LoggerWebhookSubSys,
|
|
Description: "send server logs to webhook endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.AuditWebhookSubSys,
|
|
Description: "send audit logs to webhook endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.AuditKafkaSubSys,
|
|
Description: "send audit logs to kafka endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyWebhookSubSys,
|
|
Description: "publish bucket notifications to webhook endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyAMQPSubSys,
|
|
Description: "publish bucket notifications to AMQP endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyKafkaSubSys,
|
|
Description: "publish bucket notifications to Kafka endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyMQTTSubSys,
|
|
Description: "publish bucket notifications to MQTT endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyNATSSubSys,
|
|
Description: "publish bucket notifications to NATS endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyNSQSubSys,
|
|
Description: "publish bucket notifications to NSQ endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyMySQLSubSys,
|
|
Description: "publish bucket notifications to MySQL databases",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyPostgresSubSys,
|
|
Description: "publish bucket notifications to Postgres databases",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyESSubSys,
|
|
Description: "publish bucket notifications to Elasticsearch endpoints",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.NotifyRedisSubSys,
|
|
Description: "publish bucket notifications to Redis datastores",
|
|
MultipleTargets: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.SubnetSubSys,
|
|
Type: "string",
|
|
Description: "set subnet config for the cluster e.g. api key",
|
|
Optional: true,
|
|
},
|
|
config.HelpKV{
|
|
Key: config.CallhomeSubSys,
|
|
Type: "string",
|
|
Description: "enable callhome for the cluster",
|
|
Optional: true,
|
|
},
|
|
}
|
|
|
|
if globalIsErasure {
|
|
helpSubSys = append(helpSubSys, config.HelpKV{})
|
|
copy(helpSubSys[2:], helpSubSys[1:])
|
|
helpSubSys[1] = config.HelpKV{
|
|
Key: config.StorageClassSubSys,
|
|
Description: "define object level redundancy",
|
|
}
|
|
}
|
|
|
|
helpMap := map[string]config.HelpKVS{
|
|
"": helpSubSys, // Help for all sub-systems.
|
|
config.SiteSubSys: config.SiteHelp,
|
|
config.RegionSubSys: config.RegionHelp,
|
|
config.APISubSys: api.Help,
|
|
config.StorageClassSubSys: storageclass.Help,
|
|
config.EtcdSubSys: etcd.Help,
|
|
config.CacheSubSys: cache.Help,
|
|
config.CompressionSubSys: compress.Help,
|
|
config.HealSubSys: heal.Help,
|
|
config.ScannerSubSys: scanner.Help,
|
|
config.IdentityOpenIDSubSys: openid.Help,
|
|
config.IdentityLDAPSubSys: xldap.Help,
|
|
config.IdentityTLSSubSys: xtls.Help,
|
|
config.IdentityPluginSubSys: idplugin.Help,
|
|
config.PolicyOPASubSys: opa.Help,
|
|
config.PolicyPluginSubSys: polplugin.Help,
|
|
config.LoggerWebhookSubSys: logger.Help,
|
|
config.AuditWebhookSubSys: logger.HelpWebhook,
|
|
config.AuditKafkaSubSys: logger.HelpKafka,
|
|
config.NotifyAMQPSubSys: notify.HelpAMQP,
|
|
config.NotifyKafkaSubSys: notify.HelpKafka,
|
|
config.NotifyMQTTSubSys: notify.HelpMQTT,
|
|
config.NotifyNATSSubSys: notify.HelpNATS,
|
|
config.NotifyNSQSubSys: notify.HelpNSQ,
|
|
config.NotifyMySQLSubSys: notify.HelpMySQL,
|
|
config.NotifyPostgresSubSys: notify.HelpPostgres,
|
|
config.NotifyRedisSubSys: notify.HelpRedis,
|
|
config.NotifyWebhookSubSys: notify.HelpWebhook,
|
|
config.NotifyESSubSys: notify.HelpES,
|
|
config.SubnetSubSys: subnet.HelpSubnet,
|
|
config.CallhomeSubSys: callhome.HelpCallhome,
|
|
}
|
|
|
|
config.RegisterHelpSubSys(helpMap)
|
|
|
|
// save top-level help for deprecated sub-systems in a separate map.
|
|
deprecatedHelpKVMap := map[string]config.HelpKV{
|
|
config.RegionSubSys: {
|
|
Key: config.RegionSubSys,
|
|
Description: "[DEPRECATED - use `site` instead] label the location of the server",
|
|
},
|
|
config.PolicyOPASubSys: {
|
|
Key: config.PolicyOPASubSys,
|
|
Description: "[DEPRECATED - use `policy_plugin` instead] enable external OPA for policy enforcement",
|
|
},
|
|
}
|
|
|
|
config.RegisterHelpDeprecatedSubSys(deprecatedHelpKVMap)
|
|
}
|
|
|
|
var (
|
|
// globalServerConfig server config.
|
|
globalServerConfig config.Config
|
|
globalServerConfigMu sync.RWMutex
|
|
)
|
|
|
|
func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) error {
|
|
switch subSys {
|
|
case config.CredentialsSubSys:
|
|
if _, err := config.LookupCreds(s[config.CredentialsSubSys][config.Default]); err != nil {
|
|
return err
|
|
}
|
|
case config.SiteSubSys:
|
|
if _, err := config.LookupSite(s[config.SiteSubSys][config.Default], s[config.RegionSubSys][config.Default]); err != nil {
|
|
return err
|
|
}
|
|
case config.APISubSys:
|
|
if _, err := api.LookupConfig(s[config.APISubSys][config.Default]); err != nil {
|
|
return err
|
|
}
|
|
case config.StorageClassSubSys:
|
|
if globalIsErasure {
|
|
if objAPI == nil {
|
|
return errServerNotInitialized
|
|
}
|
|
for _, setDriveCount := range objAPI.SetDriveCounts() {
|
|
if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
case config.CacheSubSys:
|
|
if _, err := cache.LookupConfig(s[config.CacheSubSys][config.Default]); err != nil {
|
|
return err
|
|
}
|
|
case config.CompressionSubSys:
|
|
compCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if objAPI != nil {
|
|
if compCfg.Enabled && !objAPI.IsCompressionSupported() {
|
|
return fmt.Errorf("Backend does not support compression")
|
|
}
|
|
}
|
|
case config.HealSubSys:
|
|
if _, err := heal.LookupConfig(s[config.HealSubSys][config.Default]); err != nil {
|
|
return err
|
|
}
|
|
case config.ScannerSubSys:
|
|
if _, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default]); err != nil {
|
|
return err
|
|
}
|
|
case config.EtcdSubSys:
|
|
etcdCfg, err := etcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if etcdCfg.Enabled {
|
|
etcdClnt, err := etcd.New(etcdCfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
etcdClnt.Close()
|
|
}
|
|
case config.IdentityOpenIDSubSys:
|
|
if _, err := openid.LookupConfig(s,
|
|
NewGatewayHTTPTransport(), xhttp.DrainBody, globalSite.Region); err != nil {
|
|
return err
|
|
}
|
|
case config.IdentityLDAPSubSys:
|
|
cfg, err := xldap.Lookup(s[config.IdentityLDAPSubSys][config.Default], globalRootCAs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if cfg.Enabled {
|
|
conn, cerr := cfg.Connect()
|
|
if cerr != nil {
|
|
return cerr
|
|
}
|
|
conn.Close()
|
|
}
|
|
case config.IdentityTLSSubSys:
|
|
if _, err := xtls.Lookup(s[config.IdentityTLSSubSys][config.Default]); err != nil {
|
|
return err
|
|
}
|
|
case config.IdentityPluginSubSys:
|
|
if _, err := idplugin.LookupConfig(s[config.IdentityPluginSubSys][config.Default],
|
|
NewGatewayHTTPTransport(), xhttp.DrainBody, globalSite.Region); err != nil {
|
|
return err
|
|
}
|
|
case config.SubnetSubSys:
|
|
if _, err := subnet.LookupConfig(s[config.SubnetSubSys][config.Default], nil); err != nil {
|
|
return err
|
|
}
|
|
case config.CallhomeSubSys:
|
|
if _, err := callhome.LookupConfig(s[config.CallhomeSubSys][config.Default]); err != nil {
|
|
return err
|
|
}
|
|
case config.PolicyOPASubSys:
|
|
// In case legacy OPA config is being set, we treat it as if the
|
|
// AuthZPlugin is being set.
|
|
subSys = config.PolicyPluginSubSys
|
|
fallthrough
|
|
case config.PolicyPluginSubSys:
|
|
if ppargs, err := polplugin.LookupConfig(s[config.PolicyPluginSubSys][config.Default],
|
|
NewGatewayHTTPTransport(), xhttp.DrainBody); err != nil {
|
|
return err
|
|
} else if ppargs.URL == nil {
|
|
// Check if legacy opa is configured.
|
|
if _, err := opa.LookupConfig(s[config.PolicyOPASubSys][config.Default],
|
|
NewGatewayHTTPTransport(), xhttp.DrainBody); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
default:
|
|
if config.LoggerSubSystems.Contains(subSys) {
|
|
if err := logger.ValidateSubSysConfig(s, subSys); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if config.NotifySubSystems.Contains(subSys) {
|
|
if err := notify.TestSubSysNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalEventNotifier.ConfiguredTargetIDs(), subSys); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func validateConfig(s config.Config, subSys string) error {
|
|
objAPI := newObjectLayerFn()
|
|
|
|
// We must have a global lock for this so nobody else modifies env while we do.
|
|
defer env.LockSetEnv()()
|
|
|
|
// Disable merging env values with config for validation.
|
|
env.SetEnvOff()
|
|
|
|
// Enable env values to validate KMS.
|
|
defer env.SetEnvOn()
|
|
if subSys != "" {
|
|
return validateSubSysConfig(s, subSys, objAPI)
|
|
}
|
|
|
|
// No sub-system passed. Validate all of them.
|
|
for _, ss := range config.SubSystems.ToSlice() {
|
|
if err := validateSubSysConfig(s, ss, objAPI); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func lookupConfigs(s config.Config, objAPI ObjectLayer) {
|
|
ctx := GlobalContext
|
|
|
|
var err error
|
|
if !globalActiveCred.IsValid() {
|
|
// Env doesn't seem to be set, we fallback to lookup creds from the config.
|
|
globalActiveCred, err = config.LookupCreds(s[config.CredentialsSubSys][config.Default])
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Invalid credentials configuration: %w", err))
|
|
}
|
|
}
|
|
|
|
dnsURL, dnsUser, dnsPass, err := env.LookupEnv(config.EnvDNSWebhook)
|
|
if err != nil {
|
|
if globalIsGateway {
|
|
logger.FatalIf(err, "Unable to initialize remote webhook DNS config")
|
|
} else {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize remote webhook DNS config %w", err))
|
|
}
|
|
}
|
|
if err == nil && dnsURL != "" {
|
|
globalDNSConfig, err = dns.NewOperatorDNS(dnsURL,
|
|
dns.Authentication(dnsUser, dnsPass),
|
|
dns.RootCAs(globalRootCAs))
|
|
if err != nil {
|
|
if globalIsGateway {
|
|
logger.FatalIf(err, "Unable to initialize remote webhook DNS config")
|
|
} else {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize remote webhook DNS config %w", err))
|
|
}
|
|
}
|
|
}
|
|
|
|
etcdCfg, err := etcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs)
|
|
if err != nil {
|
|
if globalIsGateway {
|
|
logger.FatalIf(err, "Unable to initialize etcd config")
|
|
} else {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize etcd config: %w", err))
|
|
}
|
|
}
|
|
|
|
if etcdCfg.Enabled {
|
|
globalEtcdClient, err = etcd.New(etcdCfg)
|
|
if err != nil {
|
|
if globalIsGateway {
|
|
logger.FatalIf(err, "Unable to initialize etcd config")
|
|
} else {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize etcd config: %w", err))
|
|
}
|
|
}
|
|
|
|
if len(globalDomainNames) != 0 && !globalDomainIPs.IsEmpty() && globalEtcdClient != nil {
|
|
if globalDNSConfig != nil {
|
|
// if global DNS is already configured, indicate with a warning, incase
|
|
// users are confused.
|
|
logger.LogIf(ctx, fmt.Errorf("DNS store is already configured with %s, etcd is not used for DNS store", globalDNSConfig))
|
|
} else {
|
|
globalDNSConfig, err = dns.NewCoreDNS(etcdCfg.Config,
|
|
dns.DomainNames(globalDomainNames),
|
|
dns.DomainIPs(globalDomainIPs),
|
|
dns.DomainPort(globalMinioPort),
|
|
dns.CoreDNSPath(etcdCfg.CoreDNSPath),
|
|
)
|
|
if err != nil {
|
|
if globalIsGateway {
|
|
logger.FatalIf(err, "Unable to initialize DNS config")
|
|
} else {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize DNS config for %s: %w",
|
|
globalDomainNames, err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Bucket federation is 'true' only when IAM assets are not namespaced
|
|
// per tenant and all tenants interested in globally available users
|
|
// if namespace was requested such as specifying etcdPathPrefix then
|
|
// we assume that users are interested in global bucket support
|
|
// but not federation.
|
|
globalBucketFederation = etcdCfg.PathPrefix == "" && etcdCfg.Enabled
|
|
|
|
globalSite, err = config.LookupSite(s[config.SiteSubSys][config.Default], s[config.RegionSubSys][config.Default])
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Invalid site configuration: %w", err))
|
|
}
|
|
|
|
globalCacheConfig, err = cache.LookupConfig(s[config.CacheSubSys][config.Default])
|
|
if err != nil {
|
|
if globalIsGateway {
|
|
logger.FatalIf(err, "Unable to setup cache")
|
|
} else {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to setup cache: %w", err))
|
|
}
|
|
}
|
|
|
|
if globalCacheConfig.Enabled {
|
|
if cacheEncKey := env.Get(cache.EnvCacheEncryptionKey, ""); cacheEncKey != "" {
|
|
globalCacheKMS, err = kms.Parse(cacheEncKey)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to setup encryption cache: %w", err))
|
|
}
|
|
}
|
|
}
|
|
|
|
globalAutoEncryption = crypto.LookupAutoEncryption() // Enable auto-encryption if enabled
|
|
if globalAutoEncryption && GlobalKMS == nil {
|
|
logger.Fatal(errors.New("no KMS configured"), "MINIO_KMS_AUTO_ENCRYPTION requires a valid KMS configuration")
|
|
}
|
|
|
|
globalSTSTLSConfig, err = xtls.Lookup(s[config.IdentityTLSSubSys][config.Default])
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize X.509/TLS STS API: %w", err))
|
|
}
|
|
|
|
if globalSTSTLSConfig.InsecureSkipVerify {
|
|
logger.LogIf(ctx, fmt.Errorf("CRITICAL: enabling %s is not recommended in a production environment", xtls.EnvIdentityTLSSkipVerify))
|
|
}
|
|
|
|
globalSubnetConfig, err = subnet.LookupConfig(s[config.SubnetSubSys][config.Default], globalProxyTransport)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to parse subnet configuration: %w", err))
|
|
}
|
|
|
|
transport := NewGatewayHTTPTransport()
|
|
|
|
globalConfigTargetList, err = notify.GetNotificationTargets(GlobalContext, s, transport, false)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
|
|
}
|
|
|
|
globalEnvTargetList, err = notify.GetNotificationTargets(GlobalContext, newServerConfig(), transport, true)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
|
|
}
|
|
|
|
// Apply dynamic config values
|
|
if err := applyDynamicConfig(ctx, objAPI, s); err != nil {
|
|
if globalIsGateway {
|
|
logger.FatalIf(err, "Unable to initialize dynamic configuration")
|
|
} else {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s config.Config, subSys string) error {
|
|
switch subSys {
|
|
case config.APISubSys:
|
|
apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default])
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
|
|
}
|
|
var setDriveCounts []int
|
|
if objAPI != nil {
|
|
setDriveCounts = objAPI.SetDriveCounts()
|
|
}
|
|
globalAPIConfig.init(apiConfig, setDriveCounts)
|
|
|
|
// Initialize remote instance transport once.
|
|
getRemoteInstanceTransportOnce.Do(func() {
|
|
getRemoteInstanceTransport = newGatewayHTTPTransport(apiConfig.RemoteTransportDeadline)
|
|
})
|
|
case config.CompressionSubSys:
|
|
cmpCfg, err := compress.LookupConfig(s[config.CompressionSubSys][config.Default])
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to setup Compression: %w", err)
|
|
}
|
|
// Validate if the object layer supports compression.
|
|
if objAPI != nil {
|
|
if cmpCfg.Enabled && !objAPI.IsCompressionSupported() {
|
|
return fmt.Errorf("Backend does not support compression")
|
|
}
|
|
}
|
|
globalCompressConfigMu.Lock()
|
|
globalCompressConfig = cmpCfg
|
|
globalCompressConfigMu.Unlock()
|
|
case config.HealSubSys:
|
|
healCfg, err := heal.LookupConfig(s[config.HealSubSys][config.Default])
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to apply heal config: %w", err)
|
|
}
|
|
globalHealConfig.Update(healCfg)
|
|
case config.ScannerSubSys:
|
|
scannerCfg, err := scanner.LookupConfig(s[config.ScannerSubSys][config.Default])
|
|
if err != nil {
|
|
return fmt.Errorf("Unable to apply scanner config: %w", err)
|
|
}
|
|
// update dynamic scanner values.
|
|
scannerCycle.Store(scannerCfg.Cycle)
|
|
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
|
|
case config.LoggerWebhookSubSys:
|
|
loggerCfg, err := logger.LookupConfigForSubSys(s, config.LoggerWebhookSubSys)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to load logger webhook config: %w", err))
|
|
}
|
|
userAgent := getUserAgent(getMinioMode())
|
|
for n, l := range loggerCfg.HTTP {
|
|
if l.Enabled {
|
|
l.LogOnce = logger.LogOnceConsoleIf
|
|
l.UserAgent = userAgent
|
|
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
|
|
loggerCfg.HTTP[n] = l
|
|
}
|
|
}
|
|
if err = logger.UpdateSystemTargets(loggerCfg); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %w", err))
|
|
}
|
|
case config.AuditWebhookSubSys:
|
|
loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditWebhookSubSys)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to load audit webhook config: %w", err))
|
|
}
|
|
userAgent := getUserAgent(getMinioMode())
|
|
for n, l := range loggerCfg.AuditWebhook {
|
|
if l.Enabled {
|
|
l.LogOnce = logger.LogOnceConsoleIf
|
|
l.UserAgent = userAgent
|
|
l.Transport = NewGatewayHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
|
|
loggerCfg.AuditWebhook[n] = l
|
|
}
|
|
}
|
|
|
|
if err = logger.UpdateAuditWebhookTargets(loggerCfg); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %w", err))
|
|
}
|
|
case config.AuditKafkaSubSys:
|
|
loggerCfg, err := logger.LookupConfigForSubSys(s, config.AuditKafkaSubSys)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to load audit kafka config: %w", err))
|
|
}
|
|
for n, l := range loggerCfg.AuditKafka {
|
|
if l.Enabled {
|
|
l.LogOnce = logger.LogOnceIf
|
|
loggerCfg.AuditKafka[n] = l
|
|
}
|
|
}
|
|
if err = logger.UpdateAuditKafkaTargets(loggerCfg); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %w", err))
|
|
}
|
|
case config.StorageClassSubSys:
|
|
if globalIsErasure && objAPI != nil {
|
|
setDriveCounts := objAPI.SetDriveCounts()
|
|
for i, setDriveCount := range setDriveCounts {
|
|
sc, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize storage class config: %w", err))
|
|
break
|
|
}
|
|
// if we validated all setDriveCounts and it was successful
|
|
// proceed to store the correct storage class globally.
|
|
if i == len(setDriveCounts)-1 {
|
|
globalStorageClass.Update(sc)
|
|
}
|
|
}
|
|
}
|
|
case config.CallhomeSubSys:
|
|
callhomeCfg, err := callhome.LookupConfig(s[config.CallhomeSubSys][config.Default])
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to load callhome config: %w", err))
|
|
} else {
|
|
globalCallhomeConfig = callhomeCfg
|
|
updateCallhomeParams(ctx, objAPI)
|
|
}
|
|
}
|
|
globalServerConfigMu.Lock()
|
|
defer globalServerConfigMu.Unlock()
|
|
if globalServerConfig != nil {
|
|
globalServerConfig[subSys] = s[subSys]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// applyDynamicConfig will apply dynamic config values.
|
|
// Dynamic systems should be in config.SubSystemsDynamic as well.
|
|
func applyDynamicConfig(ctx context.Context, objAPI ObjectLayer, s config.Config) error {
|
|
for subSys := range config.SubSystemsDynamic {
|
|
err := applyDynamicConfigForSubSys(ctx, objAPI, s, subSys)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Help - return sub-system level help
|
|
type Help struct {
|
|
SubSys string `json:"subSys"`
|
|
Description string `json:"description"`
|
|
MultipleTargets bool `json:"multipleTargets"`
|
|
KeysHelp config.HelpKVS `json:"keysHelp"`
|
|
}
|
|
|
|
// GetHelp - returns help for sub-sys, a key for a sub-system or all the help.
|
|
func GetHelp(subSys, key string, envOnly bool) (Help, error) {
|
|
if len(subSys) == 0 {
|
|
return Help{KeysHelp: config.HelpSubSysMap[subSys]}, nil
|
|
}
|
|
subSystemValue := strings.SplitN(subSys, config.SubSystemSeparator, 2)
|
|
if len(subSystemValue) == 0 {
|
|
return Help{}, config.Errorf("invalid number of arguments %s", subSys)
|
|
}
|
|
|
|
subSys = subSystemValue[0]
|
|
|
|
subSysHelp, ok := config.HelpSubSysMap[""].Lookup(subSys)
|
|
if !ok {
|
|
subSysHelp, ok = config.HelpDeprecatedSubSysMap[subSys]
|
|
if !ok {
|
|
return Help{}, config.Errorf("unknown sub-system %s", subSys)
|
|
}
|
|
}
|
|
|
|
h, ok := config.HelpSubSysMap[subSys]
|
|
if !ok {
|
|
return Help{}, config.Errorf("unknown sub-system %s", subSys)
|
|
}
|
|
if key != "" {
|
|
value, ok := h.Lookup(key)
|
|
if !ok {
|
|
return Help{}, config.Errorf("unknown key %s for sub-system %s",
|
|
key, subSys)
|
|
}
|
|
h = config.HelpKVS{value}
|
|
}
|
|
|
|
envHelp := config.HelpKVS{}
|
|
if envOnly {
|
|
// Only for multiple targets, make sure
|
|
// to list the ENV, for regular k/v EnableKey is
|
|
// implicit, for ENVs we cannot make it implicit.
|
|
if subSysHelp.MultipleTargets {
|
|
envK := config.EnvPrefix + strings.ToTitle(subSys) + config.EnvWordDelimiter + strings.ToTitle(madmin.EnableKey)
|
|
envHelp = append(envHelp, config.HelpKV{
|
|
Key: envK,
|
|
Description: fmt.Sprintf("enable %s target, default is 'off'", subSys),
|
|
Optional: false,
|
|
Type: "on|off",
|
|
})
|
|
}
|
|
for _, hkv := range h {
|
|
envK := config.EnvPrefix + strings.ToTitle(subSys) + config.EnvWordDelimiter + strings.ToTitle(hkv.Key)
|
|
envHelp = append(envHelp, config.HelpKV{
|
|
Key: envK,
|
|
Description: hkv.Description,
|
|
Optional: hkv.Optional,
|
|
Type: hkv.Type,
|
|
})
|
|
}
|
|
h = envHelp
|
|
}
|
|
|
|
return Help{
|
|
SubSys: subSys,
|
|
Description: subSysHelp.Description,
|
|
MultipleTargets: subSysHelp.MultipleTargets,
|
|
KeysHelp: h,
|
|
}, nil
|
|
}
|
|
|
|
func newServerConfig() config.Config {
|
|
return config.New()
|
|
}
|
|
|
|
// newSrvConfig - initialize a new server config, saves env parameters if
|
|
// found, otherwise use default parameters
|
|
func newSrvConfig(objAPI ObjectLayer) error {
|
|
// Initialize server config.
|
|
srvCfg := newServerConfig()
|
|
|
|
// hold the mutex lock before a new config is assigned.
|
|
globalServerConfigMu.Lock()
|
|
globalServerConfig = srvCfg
|
|
globalServerConfigMu.Unlock()
|
|
|
|
// Save config into file.
|
|
return saveServerConfig(GlobalContext, objAPI, srvCfg)
|
|
}
|
|
|
|
func getValidConfig(objAPI ObjectLayer) (config.Config, error) {
|
|
return readServerConfig(GlobalContext, objAPI)
|
|
}
|
|
|
|
// loadConfig - loads a new config from disk, overrides params
|
|
// from env if found and valid
|
|
func loadConfig(objAPI ObjectLayer) error {
|
|
srvCfg, err := getValidConfig(objAPI)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Override any values from ENVs.
|
|
lookupConfigs(srvCfg, objAPI)
|
|
|
|
// hold the mutex lock before a new config is assigned.
|
|
globalServerConfigMu.Lock()
|
|
globalServerConfig = srvCfg
|
|
globalServerConfigMu.Unlock()
|
|
|
|
return nil
|
|
}
|