Add periodic callhome functionality (#14918)

* Add periodic callhome functionality

Periodically (every 24hrs by default), fetch callhome information and
upload it to SUBNET.

New config keys under the `callhome` subsystem:

enable - Set to `on` for enabling callhome. Default `off`
frequency - Interval between callhome cycles. Default `24h`

* Improvements based on review comments

- Update `enableCallhome` safely
- Rename pctx to ctx
- Block during execution of callhome
- Store parsed proxy URL in global subnet config
- Store callhome URL(s) in constants
- Use existing global transport
- Pass auth token to subnetPostReq
- Use `config.EnableOn` instead of `"on"`

* Use atomic package instead of lock

* Use uber atomic package

* Use `Cancel` instead of `cancel`

Co-authored-by: Harshavardhana <harsha@minio.io>

Co-authored-by: Harshavardhana <harsha@minio.io>
Co-authored-by: Aditya Manthramurthy <donatello@users.noreply.github.com>
This commit is contained in:
Shireesh Anjal 2022-06-07 04:44:52 +05:30 committed by GitHub
parent df9eeb7f8f
commit 4ce81fd07f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 401 additions and 38 deletions

View File

@ -31,7 +31,10 @@ import (
// local endpoints from given list of endpoints
func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Request) madmin.ServerProperties {
var localEndpoints Endpoints
addr := r.Host
addr := globalLocalNodeName
if r != nil {
addr = r.Host
}
if globalIsDistErasure {
addr = globalLocalNodeName
}
@ -40,7 +43,7 @@ func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Req
for _, endpoint := range ep.Endpoints {
nodeName := endpoint.Host
if nodeName == "" {
nodeName = r.Host
nodeName = addr
}
if endpoint.IsLocal {
// Only proceed for local endpoints

136
cmd/callhome.go Normal file
View File

@ -0,0 +1,136 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/logger"
uatomic "go.uber.org/atomic"
)
const (
// callhomeSchemaVersion1 is callhome schema version 1
callhomeSchemaVersion1 = "1"
// callhomeSchemaVersion is current callhome schema version.
callhomeSchemaVersion = callhomeSchemaVersion1
// callhomeCycleDefault is the default interval between two callhome cycles (24hrs)
callhomeCycleDefault = 24 * time.Hour
)
// CallhomeInfo - Contains callhome information
type CallhomeInfo struct {
SchemaVersion string `json:"schema_version"`
AdminInfo madmin.InfoMessage `json:"admin_info"`
}
var (
enableCallhome = uatomic.NewBool(false)
callhomeLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
callhomeFreq = uatomic.NewDuration(callhomeCycleDefault)
)
func updateCallhomeParams(ctx context.Context, objAPI ObjectLayer) {
alreadyEnabled := enableCallhome.Load()
enableCallhome.Store(globalCallhomeConfig.Enable)
callhomeFreq.Store(globalCallhomeConfig.Frequency)
// If callhome was disabled earlier and has now been enabled,
// initialize the callhome process again.
if !alreadyEnabled && enableCallhome.Load() {
initCallhome(ctx, objAPI)
}
}
// initCallhome will start the callhome task in the background.
func initCallhome(ctx context.Context, objAPI ObjectLayer) {
go func() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Leader node (that successfully acquires the lock inside runCallhome)
// will keep performing the callhome. If the leader goes down for some reason,
// the lock will be released and another node will acquire it and take over
// because of this loop.
for {
runCallhome(ctx, objAPI)
if !enableCallhome.Load() {
return
}
// callhome running on a different node.
// sleep for some time and try again.
duration := time.Duration(r.Float64() * float64(callhomeFreq.Load()))
if duration < time.Second {
// Make sure to sleep atleast a second to avoid high CPU ticks.
duration = time.Second
}
time.Sleep(duration)
if !enableCallhome.Load() {
return
}
}
}()
}
func runCallhome(ctx context.Context, objAPI ObjectLayer) {
// Make sure only 1 callhome is running on the cluster.
locker := objAPI.NewNSLock(minioMetaBucket, "callhome/runCallhome.lock")
lkctx, err := locker.GetLock(ctx, callhomeLeaderLockTimeout)
if err != nil {
return
}
ctx = lkctx.Context()
defer locker.Unlock(lkctx.Cancel)
callhomeTimer := time.NewTimer(callhomeFreq.Load())
defer callhomeTimer.Stop()
for {
select {
case <-ctx.Done():
return
case <-callhomeTimer.C:
if !enableCallhome.Load() {
// Stop the processing as callhome got disabled
return
}
performCallhome(ctx)
// Reset the timer for next cycle.
callhomeTimer.Reset(callhomeFreq.Load())
}
}
}
func performCallhome(ctx context.Context) {
err := sendCallhomeInfo(
CallhomeInfo{
SchemaVersion: callhomeSchemaVersion,
AdminInfo: getServerInfo(ctx, nil),
})
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to perform callhome: %w", err))
}
}

View File

@ -230,8 +230,8 @@ func minioConfigToConsoleFeatures() {
if globalSubnetConfig.APIKey != "" {
os.Setenv("CONSOLE_SUBNET_API_KEY", globalSubnetConfig.APIKey)
}
if globalSubnetConfig.Proxy != "" {
os.Setenv("CONSOLE_SUBNET_PROXY", globalSubnetConfig.Proxy)
if globalSubnetConfig.ProxyURL != nil {
os.Setenv("CONSOLE_SUBNET_PROXY", globalSubnetConfig.ProxyURL.String())
}
}

View File

@ -28,6 +28,7 @@ import (
"github.com/minio/minio/internal/config"
"github.com/minio/minio/internal/config/api"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/callhome"
"github.com/minio/minio/internal/config/compress"
"github.com/minio/minio/internal/config/dns"
"github.com/minio/minio/internal/config/etcd"
@ -70,6 +71,7 @@ func initHelp() {
config.HealSubSys: heal.DefaultKVS,
config.ScannerSubSys: scanner.DefaultKVS,
config.SubnetSubSys: subnet.DefaultKVS,
config.CallhomeSubSys: callhome.DefaultKVS,
}
for k, v := range notify.DefaultNotificationKVS {
kvs[k] = v
@ -201,6 +203,12 @@ func initHelp() {
Description: "set subnet config for the cluster e.g. api key",
Optional: true,
},
config.HelpKV{
Key: config.CallhomeSubSys,
Type: "string",
Description: "enable callhome for the cluster",
Optional: true,
},
}
if globalIsErasure {
@ -243,6 +251,7 @@ func initHelp() {
config.NotifyWebhookSubSys: notify.HelpWebhook,
config.NotifyESSubSys: notify.HelpES,
config.SubnetSubSys: subnet.HelpSubnet,
config.CallhomeSubSys: callhome.HelpCallhome,
}
config.RegisterHelpSubSys(helpMap)
@ -358,6 +367,10 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er
if _, err := subnet.LookupConfig(s[config.SubnetSubSys][config.Default]); err != nil {
return err
}
case config.CallhomeSubSys:
if _, err := callhome.LookupConfig(s[config.CallhomeSubSys][config.Default]); err != nil {
return err
}
case config.PolicyOPASubSys:
// In case legacy OPA config is being set, we treat it as if the
// AuthZPlugin is being set.
@ -649,7 +662,7 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
return fmt.Errorf("Unable to apply scanner config: %w", err)
}
// update dynamic scanner values.
scannerCycle.Update(scannerCfg.Cycle)
scannerCycle.Store(scannerCfg.Cycle)
logger.LogIf(ctx, scannerSleeper.Update(scannerCfg.Delay, scannerCfg.MaxWait))
case config.LoggerWebhookSubSys:
loggerCfg, err := logger.LookupConfigForSubSys(s, config.LoggerWebhookSubSys)
@ -719,6 +732,14 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
}
}
}
case config.CallhomeSubSys:
callhomeCfg, err := callhome.LookupConfig(s[config.CallhomeSubSys][config.Default])
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to load callhome config: %w", err))
} else {
globalCallhomeConfig = callhomeCfg
updateCallhomeParams(ctx, objAPI)
}
}
globalServerConfigMu.Lock()
defer globalServerConfigMu.Unlock()

View File

@ -45,6 +45,7 @@ import (
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/console"
uatomic "go.uber.org/atomic"
)
const (
@ -66,9 +67,7 @@ var (
dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
// Sleeper values are updated when config is loaded.
scannerSleeper = newDynamicSleeper(10, 10*time.Second)
scannerCycle = &safeDuration{
t: dataScannerStartDelay,
}
scannerCycle = uatomic.NewDuration(dataScannerStartDelay)
)
// initDataScanner will start the scanner in the background.
@ -78,7 +77,7 @@ func initDataScanner(ctx context.Context, objAPI ObjectLayer) {
// Run the data scanner in a loop
for {
runDataScanner(ctx, objAPI)
duration := time.Duration(r.Float64() * float64(scannerCycle.Get()))
duration := time.Duration(r.Float64() * float64(scannerCycle.Load()))
if duration < time.Second {
// Make sure to sleep atleast a second to avoid high CPU ticks.
duration = time.Second
@ -88,23 +87,6 @@ func initDataScanner(ctx context.Context, objAPI ObjectLayer) {
}()
}
type safeDuration struct {
sync.Mutex
t time.Duration
}
func (s *safeDuration) Update(t time.Duration) {
s.Lock()
defer s.Unlock()
s.t = t
}
func (s *safeDuration) Get() time.Duration {
s.Lock()
defer s.Unlock()
return s.t
}
func getCycleScanMode(currentCycle, bitrotStartCycle uint64, bitrotStartTime time.Time) madmin.HealScanMode {
bitrotCycle := globalHealConfig.BitrotScanCycle()
switch bitrotCycle {
@ -189,7 +171,7 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
}
}
scannerTimer := time.NewTimer(scannerCycle.Get())
scannerTimer := time.NewTimer(scannerCycle.Load())
defer scannerTimer.Stop()
for {
@ -231,7 +213,7 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
}
// Reset the timer for next cycle.
scannerTimer.Reset(scannerCycle.Get())
scannerTimer.Reset(scannerCycle.Load())
}
}
}

View File

@ -37,6 +37,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/auth"
"github.com/minio/minio/internal/config/cache"
"github.com/minio/minio/internal/config/callhome"
"github.com/minio/minio/internal/config/compress"
"github.com/minio/minio/internal/config/dns"
xldap "github.com/minio/minio/internal/config/identity/ldap"
@ -243,6 +244,9 @@ var (
// The global subnet config
globalSubnetConfig subnet.Config
// The global callhome config
globalCallhomeConfig callhome.Config
globalRemoteEndpoints map[string]Endpoint
// Global server's network statistics

102
cmd/subnet-utils.go Normal file
View File

@ -0,0 +1,102 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
xhttp "github.com/minio/minio/internal/http"
)
const (
subnetRespBodyLimit = 1 << 20 // 1 MiB
callhomeURL = "https://subnet.min.io/api/callhome"
callhomeURLDev = "http://localhost:9000/api/callhome"
)
func httpClient(timeout time.Duration) *http.Client {
return &http.Client{
Timeout: timeout,
Transport: globalProxyTransport,
}
}
func subnetHTTPDo(req *http.Request) (*http.Response, error) {
client := httpClient(10 * time.Second)
if globalSubnetConfig.ProxyURL != nil {
client.Transport.(*http.Transport).Proxy = http.ProxyURL((*url.URL)(globalSubnetConfig.ProxyURL))
}
return client.Do(req)
}
func subnetReqDo(r *http.Request, authToken string) (string, error) {
r.Header.Set("Authorization", authToken)
r.Header.Set("Content-Type", "application/json")
resp, err := subnetHTTPDo(r)
if resp != nil {
defer xhttp.DrainBody(resp.Body)
}
if err != nil {
return "", err
}
respBytes, err := ioutil.ReadAll(io.LimitReader(resp.Body, subnetRespBodyLimit))
if err != nil {
return "", err
}
respStr := string(respBytes)
if resp.StatusCode == http.StatusOK {
return respStr, nil
}
return respStr, fmt.Errorf("SUBNET request failed with code %d and error: %s", resp.StatusCode, respStr)
}
func subnetPostReq(reqURL string, payload interface{}, authToken string) (string, error) {
body, err := json.Marshal(payload)
if err != nil {
return "", err
}
r, err := http.NewRequest(http.MethodPost, reqURL, bytes.NewReader(body))
if err != nil {
return "", err
}
return subnetReqDo(r, authToken)
}
func sendCallhomeInfo(ch CallhomeInfo) error {
if len(globalSubnetConfig.APIKey) == 0 {
return errors.New("Cluster is not registered with SUBNET. Please register by running 'mc support register ALIAS'")
}
url := callhomeURL
if globalIsCICD {
url = callhomeURLDev
}
_, err := subnetPostReq(url, ch, globalSubnetConfig.APIKey)
return err
}

View File

@ -0,0 +1,65 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package callhome
import (
"time"
"github.com/minio/minio/internal/config"
"github.com/minio/pkg/env"
)
// Callhome related keys
const (
Enable = "enable"
Frequency = "frequency"
)
// DefaultKVS - default KV config for subnet settings
var DefaultKVS = config.KVS{
config.KV{
Key: Enable,
Value: "off",
},
config.KV{
Key: Frequency,
Value: "24h",
},
}
// Config represents the subnet related configuration
type Config struct {
// Flag indicating whether callhome is enabled.
Enable bool `json:"enable"`
// The interval between callhome cycles
Frequency time.Duration `json:"frequency"`
}
// LookupConfig - lookup config and override with valid environment settings if any.
func LookupConfig(kvs config.KVS) (cfg Config, err error) {
if err = config.CheckValidKeys(config.CallhomeSubSys, kvs, DefaultKVS); err != nil {
return cfg, err
}
cfg.Enable = env.Get(config.EnvMinIOCallhomeEnable,
kvs.GetWithDefault(Enable, DefaultKVS)) == config.EnableOn
cfg.Frequency, err = time.ParseDuration(env.Get(config.EnvMinIOCallhomeFrequency,
kvs.GetWithDefault(Frequency, DefaultKVS)))
return cfg, err
}

View File

@ -0,0 +1,42 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package callhome
import "github.com/minio/minio/internal/config"
var (
defaultHelpPostfix = func(key string) string {
return config.DefaultHelpPostfix(DefaultKVS, key)
}
// HelpCallhome - provides help for callhome config
HelpCallhome = config.HelpKVS{
config.HelpKV{
Key: Enable,
Type: "on|off",
Description: "set to enable callhome" + defaultHelpPostfix(Enable),
Optional: true,
},
config.HelpKV{
Key: Frequency,
Type: "duration",
Description: "time duration between callhome cycles e.g. 24h" + defaultHelpPostfix(Frequency),
Optional: true,
},
}
)

View File

@ -88,6 +88,7 @@ const (
ScannerSubSys = "scanner"
CrawlerSubSys = "crawler"
SubnetSubSys = "subnet"
CallhomeSubSys = "callhome"
// Add new constants here if you add new fields to config.
)
@ -161,6 +162,7 @@ var SubSystems = set.CreateStringSet(
NotifyRedisSubSys,
NotifyWebhookSubSys,
SubnetSubSys,
CallhomeSubSys,
)
// SubSystemsDynamic - all sub-systems that have dynamic config.
@ -170,6 +172,7 @@ var SubSystemsDynamic = set.CreateStringSet(
ScannerSubSys,
HealSubSys,
SubnetSubSys,
CallhomeSubSys,
LoggerWebhookSubSys,
AuditWebhookSubSys,
AuditKafkaSubSys,

View File

@ -56,6 +56,10 @@ const (
EnvMinIOSubnetLicense = "MINIO_SUBNET_LICENSE" // Deprecated Dec 2021
EnvMinIOSubnetAPIKey = "MINIO_SUBNET_API_KEY"
EnvMinIOSubnetProxy = "MINIO_SUBNET_PROXY"
EnvMinIOCallhomeEnable = "MINIO_CALLHOME_ENABLE"
EnvMinIOCallhomeFrequency = "MINIO_CALLHOME_FREQUENCY"
EnvMinIOServerURL = "MINIO_SERVER_URL"
EnvMinIOBrowserRedirectURL = "MINIO_BROWSER_REDIRECT_URL"
EnvRootDiskThresholdSize = "MINIO_ROOTDISK_THRESHOLD_SIZE"

View File

@ -18,10 +18,9 @@
package subnet
import (
"net/url"
"github.com/minio/minio/internal/config"
"github.com/minio/pkg/env"
xnet "github.com/minio/pkg/net"
)
// DefaultKVS - default KV config for subnet settings
@ -49,7 +48,7 @@ type Config struct {
APIKey string `json:"api_key"`
// The HTTP(S) proxy URL to use for connecting to SUBNET
Proxy string `json:"proxy"`
ProxyURL *xnet.URL `json:"proxy_url"`
}
// LookupConfig - lookup config and override with valid environment settings if any.
@ -58,11 +57,13 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
return cfg, err
}
cfg.Proxy = env.Get(config.EnvMinIOSubnetProxy, kvs.Get(config.Proxy))
_, err = url.Parse(cfg.Proxy)
proxy := env.Get(config.EnvMinIOSubnetProxy, kvs.Get(config.Proxy))
if len(proxy) > 0 {
cfg.ProxyURL, err = xnet.ParseHTTPURL(proxy)
if err != nil {
return cfg, err
}
}
cfg.License = env.Get(config.EnvMinIOSubnetLicense, kvs.Get(config.License))
cfg.APIKey = env.Get(config.EnvMinIOSubnetAPIKey, kvs.Get(config.APIKey))