make subnet subsys dynamic and simplify callhome (#15927)

This commit is contained in:
Harshavardhana 2022-10-27 00:20:01 -07:00 committed by GitHub
parent 86420a1f46
commit ec77d28e62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 136 additions and 60 deletions

View File

@ -25,7 +25,6 @@ import (
"github.com/minio/madmin-go" "github.com/minio/madmin-go"
"github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/logger"
uatomic "go.uber.org/atomic"
) )
const ( const (
@ -34,9 +33,6 @@ const (
// callhomeSchemaVersion is current callhome schema version. // callhomeSchemaVersion is current callhome schema version.
callhomeSchemaVersion = callhomeSchemaVersion1 callhomeSchemaVersion = callhomeSchemaVersion1
// callhomeCycleDefault is the default interval between two callhome cycles (24hrs)
callhomeCycleDefault = 24 * time.Hour
) )
// CallhomeInfo - Contains callhome information // CallhomeInfo - Contains callhome information
@ -45,26 +41,14 @@ type CallhomeInfo struct {
AdminInfo madmin.InfoMessage `json:"admin_info"` AdminInfo madmin.InfoMessage `json:"admin_info"`
} }
var ( var callhomeLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
enableCallhome = uatomic.NewBool(false)
callhomeLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
callhomeFreq = uatomic.NewDuration(callhomeCycleDefault)
)
func updateCallhomeParams(ctx context.Context, objAPI ObjectLayer) {
alreadyEnabled := enableCallhome.Load()
enableCallhome.Store(globalCallhomeConfig.Enable)
callhomeFreq.Store(globalCallhomeConfig.Frequency)
// If callhome was disabled earlier and has now been enabled,
// initialize the callhome process again.
if !alreadyEnabled && enableCallhome.Load() {
initCallhome(ctx, objAPI)
}
}
// initCallhome will start the callhome task in the background. // initCallhome will start the callhome task in the background.
func initCallhome(ctx context.Context, objAPI ObjectLayer) { func initCallhome(ctx context.Context, objAPI ObjectLayer) {
if !globalCallhomeConfig.Enabled() {
return
}
go func() { go func() {
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Leader node (that successfully acquires the lock inside runCallhome) // Leader node (that successfully acquires the lock inside runCallhome)
@ -72,54 +56,63 @@ func initCallhome(ctx context.Context, objAPI ObjectLayer) {
// the lock will be released and another node will acquire it and take over // the lock will be released and another node will acquire it and take over
// because of this loop. // because of this loop.
for { for {
runCallhome(ctx, objAPI) if !globalCallhomeConfig.Enabled() {
if !enableCallhome.Load() { return
}
if !runCallhome(ctx, objAPI) {
// callhome was disabled or context was canceled
return return
} }
// callhome running on a different node. // callhome running on a different node.
// sleep for some time and try again. // sleep for some time and try again.
duration := time.Duration(r.Float64() * float64(callhomeFreq.Load())) duration := time.Duration(r.Float64() * float64(globalCallhomeConfig.FrequencyDur()))
if duration < time.Second { if duration < time.Second {
// Make sure to sleep atleast a second to avoid high CPU ticks. // Make sure to sleep atleast a second to avoid high CPU ticks.
duration = time.Second duration = time.Second
} }
time.Sleep(duration) time.Sleep(duration)
if !enableCallhome.Load() {
return
}
} }
}() }()
} }
func runCallhome(ctx context.Context, objAPI ObjectLayer) { func runCallhome(ctx context.Context, objAPI ObjectLayer) bool {
// Make sure only 1 callhome is running on the cluster. // Make sure only 1 callhome is running on the cluster.
locker := objAPI.NewNSLock(minioMetaBucket, "callhome/runCallhome.lock") locker := objAPI.NewNSLock(minioMetaBucket, "callhome/runCallhome.lock")
lkctx, err := locker.GetLock(ctx, callhomeLeaderLockTimeout) lkctx, err := locker.GetLock(ctx, callhomeLeaderLockTimeout)
if err != nil { if err != nil {
return // lock timedout means some other node is the leader,
// cycle back return 'true'
return true
} }
ctx = lkctx.Context() ctx = lkctx.Context()
defer locker.Unlock(lkctx.Cancel) defer locker.Unlock(lkctx.Cancel)
callhomeTimer := time.NewTimer(callhomeFreq.Load()) callhomeTimer := time.NewTimer(globalCallhomeConfig.FrequencyDur())
defer callhomeTimer.Stop() defer callhomeTimer.Stop()
for { for {
if !globalCallhomeConfig.Enabled() {
// Stop the processing as callhome got disabled
return false
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
return // indicates that we do not need to run callhome anymore
return false
case <-callhomeTimer.C: case <-callhomeTimer.C:
if !enableCallhome.Load() { if !globalCallhomeConfig.Enabled() {
// Stop the processing as callhome got disabled // Stop the processing as callhome got disabled
return return false
} }
performCallhome(ctx) performCallhome(ctx)
// Reset the timer for next cycle. // Reset the timer for next cycle.
callhomeTimer.Reset(callhomeFreq.Load()) callhomeTimer.Reset(globalCallhomeConfig.FrequencyDur())
} }
} }
} }

View File

@ -209,15 +209,8 @@ func minioConfigToConsoleFeatures() {
} }
os.Setenv("CONSOLE_MINIO_REGION", globalSite.Region) os.Setenv("CONSOLE_MINIO_REGION", globalSite.Region)
os.Setenv("CONSOLE_CERT_PASSWD", env.Get("MINIO_CERT_PASSWD", "")) os.Setenv("CONSOLE_CERT_PASSWD", env.Get("MINIO_CERT_PASSWD", ""))
if globalSubnetConfig.License != "" {
os.Setenv("CONSOLE_SUBNET_LICENSE", globalSubnetConfig.License) globalSubnetConfig.ApplyEnv()
}
if globalSubnetConfig.APIKey != "" {
os.Setenv("CONSOLE_SUBNET_API_KEY", globalSubnetConfig.APIKey)
}
if globalSubnetConfig.ProxyURL != nil {
os.Setenv("CONSOLE_SUBNET_PROXY", globalSubnetConfig.ProxyURL.String())
}
} }
func buildOpenIDConsoleConfig() consoleoauth2.OpenIDPCfg { func buildOpenIDConsoleConfig() consoleoauth2.OpenIDPCfg {

View File

@ -360,9 +360,14 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er
return err return err
} }
case config.CallhomeSubSys: case config.CallhomeSubSys:
if _, err := callhome.LookupConfig(s[config.CallhomeSubSys][config.Default]); err != nil { cfg, err := callhome.LookupConfig(s[config.CallhomeSubSys][config.Default])
if err != nil {
return err return err
} }
// callhome cannot be enabled if license is not registered yet, throw an error.
if cfg.Enabled() && !globalSubnetConfig.Registered() {
return errors.New("Deployment is not registered with SUBNET. Please register the deployment via 'mc license register ALIAS'")
}
case config.PolicyOPASubSys: case config.PolicyOPASubSys:
// In case legacy OPA config is being set, we treat it as if the // In case legacy OPA config is being set, we treat it as if the
// AuthZPlugin is being set. // AuthZPlugin is being set.
@ -516,11 +521,6 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
logger.LogIf(ctx, fmt.Errorf("CRITICAL: enabling %s is not recommended in a production environment", xtls.EnvIdentityTLSSkipVerify)) logger.LogIf(ctx, fmt.Errorf("CRITICAL: enabling %s is not recommended in a production environment", xtls.EnvIdentityTLSSkipVerify))
} }
globalSubnetConfig, err = subnet.LookupConfig(s[config.SubnetSubSys][config.Default], globalProxyTransport)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to parse subnet configuration: %w", err))
}
transport := NewHTTPTransport() transport := NewHTTPTransport()
globalConfigTargetList, err = notify.FetchEnabledTargets(GlobalContext, s, transport) globalConfigTargetList, err = notify.FetchEnabledTargets(GlobalContext, s, transport)
@ -641,13 +641,24 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
globalStorageClass.Update(sc) globalStorageClass.Update(sc)
} }
} }
case config.SubnetSubSys:
subnetConfig, err := subnet.LookupConfig(s[config.SubnetSubSys][config.Default], globalProxyTransport)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to parse subnet configuration: %w", err))
} else {
globalSubnetConfig.Update(subnetConfig)
globalSubnetConfig.ApplyEnv() // update environment settings for Console UI
}
case config.CallhomeSubSys: case config.CallhomeSubSys:
callhomeCfg, err := callhome.LookupConfig(s[config.CallhomeSubSys][config.Default]) callhomeCfg, err := callhome.LookupConfig(s[config.CallhomeSubSys][config.Default])
if err != nil { if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to load callhome config: %w", err)) logger.LogIf(ctx, fmt.Errorf("Unable to load callhome config: %w", err))
} else { } else {
globalCallhomeConfig = callhomeCfg enable := callhomeCfg.Enable && !globalCallhomeConfig.Enabled()
updateCallhomeParams(ctx, objAPI) globalCallhomeConfig.Update(callhomeCfg)
if enable {
initCallhome(ctx, objAPI)
}
} }
} }
globalServerConfigMu.Lock() globalServerConfigMu.Lock()

View File

@ -51,7 +51,7 @@ func printStartupMessage(apiEndpoints []string, err error) {
} }
} }
if len(globalSubnetConfig.APIKey) == 0 && err == nil { if !globalSubnetConfig.Registered() {
var builder strings.Builder var builder strings.Builder
startupBanner(&builder) startupBanner(&builder)
logger.Info(builder.String()) logger.Info(builder.String())

View File

@ -18,6 +18,7 @@
package callhome package callhome
import ( import (
"sync"
"time" "time"
"github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config"
@ -42,6 +43,9 @@ var DefaultKVS = config.KVS{
}, },
} }
// callhomeCycleDefault is the default interval between two callhome cycles (24hrs)
const callhomeCycleDefault = 24 * time.Hour
// Config represents the subnet related configuration // Config represents the subnet related configuration
type Config struct { type Config struct {
// Flag indicating whether callhome is enabled. // Flag indicating whether callhome is enabled.
@ -51,6 +55,37 @@ type Config struct {
Frequency time.Duration `json:"frequency"` Frequency time.Duration `json:"frequency"`
} }
var configLock sync.RWMutex
// Enabled - indicates if callhome is enabled or not
func (c *Config) Enabled() bool {
configLock.RLock()
defer configLock.RUnlock()
return c.Enable
}
// FrequencyDur - returns the currently configured callhome frequency
func (c *Config) FrequencyDur() time.Duration {
configLock.RLock()
defer configLock.RUnlock()
if c.Frequency == 0 {
return callhomeCycleDefault
}
return c.Frequency
}
// Update updates new callhome frequency
func (c *Config) Update(ncfg Config) {
configLock.Lock()
defer configLock.Unlock()
c.Enable = ncfg.Enable
c.Frequency = ncfg.Frequency
}
// LookupConfig - lookup config and override with valid environment settings if any. // LookupConfig - lookup config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) { func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err = config.CheckValidKeys(config.CallhomeSubSys, kvs, DefaultKVS); err != nil { if err = config.CheckValidKeys(config.CallhomeSubSys, kvs, DefaultKVS); err != nil {

View File

@ -20,7 +20,9 @@ package subnet
import ( import (
"net/http" "net/http"
"net/url" "net/url"
"os"
"strings" "strings"
"sync"
"github.com/minio/minio/internal/config" "github.com/minio/minio/internal/config"
"github.com/minio/pkg/env" "github.com/minio/pkg/env"
@ -49,24 +51,62 @@ type Config struct {
License string `json:"license"` License string `json:"license"`
// The subnet api key // The subnet api key
APIKey string `json:"api_key"` APIKey string `json:"apiKey"`
// The HTTP(S) proxy URL to use for connecting to SUBNET // The HTTP(S) proxy URL to use for connecting to SUBNET
ProxyURL *xnet.URL `json:"proxy_url"` Proxy string `json:"proxy"`
// Transport configured with proxy_url if set optionally. // Transport configured with proxy_url if set optionally.
transport http.RoundTripper transport http.RoundTripper
} }
var configLock sync.RWMutex
// Registered indicates if cluster is registered or not
func (c *Config) Registered() bool {
configLock.RLock()
defer configLock.RUnlock()
return len(c.APIKey) > 0
}
// ApplyEnv - applies the current subnet config to Console UI specific environment variables.
func (c *Config) ApplyEnv() {
configLock.RLock()
defer configLock.RUnlock()
if c.License != "" {
os.Setenv("CONSOLE_SUBNET_LICENSE", c.License)
}
if c.APIKey != "" {
os.Setenv("CONSOLE_SUBNET_API_KEY", c.APIKey)
}
if c.Proxy != "" {
os.Setenv("CONSOLE_SUBNET_PROXY", c.Proxy)
}
}
// Update - in-place update with new license and registration information.
func (c *Config) Update(ncfg Config) {
configLock.Lock()
defer configLock.Unlock()
c.License = ncfg.License
c.APIKey = ncfg.APIKey
c.Proxy = ncfg.Proxy
c.transport = ncfg.transport
}
// LookupConfig - lookup config and override with valid environment settings if any. // LookupConfig - lookup config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS, transport http.RoundTripper) (cfg Config, err error) { func LookupConfig(kvs config.KVS, transport http.RoundTripper) (cfg Config, err error) {
if err = config.CheckValidKeys(config.SubnetSubSys, kvs, DefaultKVS); err != nil { if err = config.CheckValidKeys(config.SubnetSubSys, kvs, DefaultKVS); err != nil {
return cfg, err return cfg, err
} }
var proxyURL *xnet.URL
proxy := env.Get(config.EnvMinIOSubnetProxy, kvs.Get(config.Proxy)) proxy := env.Get(config.EnvMinIOSubnetProxy, kvs.Get(config.Proxy))
if len(proxy) > 0 { if len(proxy) > 0 {
cfg.ProxyURL, err = xnet.ParseHTTPURL(proxy) proxyURL, err = xnet.ParseHTTPURL(proxy)
if err != nil { if err != nil {
return cfg, err return cfg, err
} }
@ -75,6 +115,7 @@ func LookupConfig(kvs config.KVS, transport http.RoundTripper) (cfg Config, err
cfg.License = strings.TrimSpace(env.Get(config.EnvMinIOSubnetLicense, kvs.Get(config.License))) cfg.License = strings.TrimSpace(env.Get(config.EnvMinIOSubnetLicense, kvs.Get(config.License)))
cfg.APIKey = strings.TrimSpace(env.Get(config.EnvMinIOSubnetAPIKey, kvs.Get(config.APIKey))) cfg.APIKey = strings.TrimSpace(env.Get(config.EnvMinIOSubnetAPIKey, kvs.Get(config.APIKey)))
cfg.Proxy = proxy
if transport == nil { if transport == nil {
// when transport is nil, it means we are just validating the // when transport is nil, it means we are just validating the
@ -83,9 +124,9 @@ func LookupConfig(kvs config.KVS, transport http.RoundTripper) (cfg Config, err
} }
// Make sure to clone the transport before editing the ProxyURL // Make sure to clone the transport before editing the ProxyURL
if cfg.ProxyURL != nil { if proxyURL != nil {
ctransport := transport.(*http.Transport).Clone() ctransport := transport.(*http.Transport).Clone()
ctransport.Proxy = http.ProxyURL((*url.URL)(cfg.ProxyURL)) ctransport.Proxy = http.ProxyURL((*url.URL)(proxyURL))
cfg.transport = ctransport cfg.transport = ctransport
} else { } else {
cfg.transport = transport cfg.transport = transport

View File

@ -35,8 +35,8 @@ const (
// Post submit 'payload' to specified URL // Post submit 'payload' to specified URL
func (c Config) Post(reqURL string, payload interface{}) (string, error) { func (c Config) Post(reqURL string, payload interface{}) (string, error) {
if len(c.APIKey) == 0 { if !c.Registered() {
return "", errors.New("Deployment is not registered with SUBNET. Please register the deployment via 'mc support register ALIAS'") return "", errors.New("Deployment is not registered with SUBNET. Please register the deployment via 'mc license register ALIAS'")
} }
body, err := json.Marshal(payload) body, err := json.Marshal(payload)
if err != nil { if err != nil {
@ -47,7 +47,10 @@ func (c Config) Post(reqURL string, payload interface{}) (string, error) {
return "", err return "", err
} }
configLock.RLock()
r.Header.Set("Authorization", c.APIKey) r.Header.Set("Authorization", c.APIKey)
configLock.RUnlock()
r.Header.Set("Content-Type", "application/json") r.Header.Set("Content-Type", "application/json")
client := &http.Client{ client := &http.Client{