mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
allow bootstrap to capture time-spent for each initializers (#17900)
This commit is contained in:
parent
adb8be069e
commit
af564b8ba0
@ -19,98 +19,51 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/madmin-go/v3"
|
||||
"github.com/minio/minio/internal/pubsub"
|
||||
)
|
||||
|
||||
const bootstrapMsgsLimit = 4 << 10
|
||||
const bootstrapTraceLimit = 4 << 10
|
||||
|
||||
type bootstrapInfo struct {
|
||||
msg string
|
||||
ts time.Time
|
||||
source string
|
||||
}
|
||||
type bootstrapTracer struct {
|
||||
mu sync.RWMutex
|
||||
idx int
|
||||
info [bootstrapMsgsLimit]bootstrapInfo
|
||||
lastUpdate time.Time
|
||||
mu sync.RWMutex
|
||||
info []madmin.TraceInfo
|
||||
}
|
||||
|
||||
var globalBootstrapTracer = &bootstrapTracer{}
|
||||
|
||||
func (bs *bootstrapTracer) DropEvents() {
|
||||
func (bs *bootstrapTracer) Record(info madmin.TraceInfo) {
|
||||
bs.mu.Lock()
|
||||
defer bs.mu.Unlock()
|
||||
|
||||
if time.Now().UTC().Sub(bs.lastUpdate) > 24*time.Hour {
|
||||
bs.info = [4096]bootstrapInfo{}
|
||||
bs.idx = 0
|
||||
if len(bs.info) > bootstrapTraceLimit {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *bootstrapTracer) Empty() bool {
|
||||
var empty bool
|
||||
bs.mu.RLock()
|
||||
empty = bs.info[0].msg == ""
|
||||
bs.mu.RUnlock()
|
||||
|
||||
return empty
|
||||
}
|
||||
|
||||
func (bs *bootstrapTracer) Record(msg string, skip int) {
|
||||
source := getSource(skip + 1)
|
||||
bs.mu.Lock()
|
||||
now := time.Now().UTC()
|
||||
bs.info[bs.idx] = bootstrapInfo{
|
||||
msg: msg,
|
||||
ts: now,
|
||||
source: source,
|
||||
}
|
||||
bs.lastUpdate = now
|
||||
bs.idx = (bs.idx + 1) % bootstrapMsgsLimit
|
||||
bs.mu.Unlock()
|
||||
bs.info = append(bs.info, info)
|
||||
}
|
||||
|
||||
func (bs *bootstrapTracer) Events() []madmin.TraceInfo {
|
||||
traceInfo := make([]madmin.TraceInfo, 0, bootstrapMsgsLimit)
|
||||
|
||||
// Add all messages in order
|
||||
addAll := func(info []bootstrapInfo) {
|
||||
for _, msg := range info {
|
||||
if msg.ts.IsZero() {
|
||||
continue // skip empty events
|
||||
}
|
||||
traceInfo = append(traceInfo, madmin.TraceInfo{
|
||||
TraceType: madmin.TraceBootstrap,
|
||||
Time: msg.ts,
|
||||
NodeName: globalLocalNodeName,
|
||||
FuncName: "BOOTSTRAP",
|
||||
Message: fmt.Sprintf("%s %s", msg.source, msg.msg),
|
||||
})
|
||||
}
|
||||
}
|
||||
traceInfo := make([]madmin.TraceInfo, 0, bootstrapTraceLimit)
|
||||
|
||||
bs.mu.RLock()
|
||||
addAll(bs.info[bs.idx:])
|
||||
addAll(bs.info[:bs.idx])
|
||||
for _, i := range bs.info {
|
||||
traceInfo = append(traceInfo, i)
|
||||
}
|
||||
bs.mu.RUnlock()
|
||||
|
||||
return traceInfo
|
||||
}
|
||||
|
||||
func (bs *bootstrapTracer) Publish(ctx context.Context, trace *pubsub.PubSub[madmin.TraceInfo, madmin.TraceType]) {
|
||||
if bs.Empty() {
|
||||
return
|
||||
}
|
||||
for _, bsEvent := range bs.Events() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
default:
|
||||
trace.Publish(bsEvent)
|
||||
if bsEvent.Message != "" {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
default:
|
||||
trace.Publish(bsEvent)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,60 +0,0 @@
|
||||
// Copyright (c) 2015-2023 MinIO, Inc.
|
||||
//
|
||||
// This file is part of MinIO Object Storage stack
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestBootstrap(t *testing.T) {
|
||||
// Bootstrap events exceed bootstrap messages limit
|
||||
bsTracer := &bootstrapTracer{}
|
||||
for i := 0; i < bootstrapMsgsLimit+10; i++ {
|
||||
bsTracer.Record(fmt.Sprintf("msg-%d", i), 1)
|
||||
}
|
||||
|
||||
traceInfos := bsTracer.Events()
|
||||
if len(traceInfos) != bootstrapMsgsLimit {
|
||||
t.Fatalf("Expected length of events %d but got %d", bootstrapMsgsLimit, len(traceInfos))
|
||||
}
|
||||
|
||||
// Simulate the case where bootstrap events were updated a day ago
|
||||
bsTracer.lastUpdate = time.Now().UTC().Add(-25 * time.Hour)
|
||||
bsTracer.DropEvents()
|
||||
if !bsTracer.Empty() {
|
||||
t.Fatalf("Expected all bootstrap events to have been dropped, but found %d events", len(bsTracer.Events()))
|
||||
}
|
||||
|
||||
// Fewer than 4K bootstrap events
|
||||
for i := 0; i < 10; i++ {
|
||||
bsTracer.Record(fmt.Sprintf("msg-%d", i), 1)
|
||||
}
|
||||
events := bsTracer.Events()
|
||||
if len(events) != 10 {
|
||||
t.Fatalf("Expected length of events %d but got %d", 10, len(events))
|
||||
}
|
||||
for i, traceInfo := range bsTracer.Events() {
|
||||
msg := fmt.Sprintf("msg-%d", i)
|
||||
if !strings.HasSuffix(traceInfo.Message, msg) {
|
||||
t.Fatalf("Expected %s but got %s", msg, traceInfo.Message)
|
||||
}
|
||||
}
|
||||
}
|
@ -221,7 +221,7 @@ func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointS
|
||||
for onlineServers < len(clnts)/2 {
|
||||
for _, clnt := range clnts {
|
||||
if err := clnt.Verify(ctx, srcCfg); err != nil {
|
||||
bootstrapTrace(fmt.Sprintf("clnt.Verify: %v, endpoint: %v", err, clnt.endpoint))
|
||||
bootstrapTraceMsg(fmt.Sprintf("clnt.Verify: %v, endpoint: %v", err, clnt.endpoint))
|
||||
if !isNetworkError(err) {
|
||||
logger.LogOnceIf(ctx, fmt.Errorf("%s has incorrect configuration: %w", clnt.String(), err), clnt.String())
|
||||
incorrectConfigs = append(incorrectConfigs, fmt.Errorf("%s has incorrect configuration: %w", clnt.String(), err))
|
||||
|
@ -440,7 +440,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize remote webhook DNS config %w", err))
|
||||
}
|
||||
if err == nil && dnsURL != "" {
|
||||
bootstrapTrace("initialize remote bucket DNS store")
|
||||
bootstrapTraceMsg("initialize remote bucket DNS store")
|
||||
globalDNSConfig, err = dns.NewOperatorDNS(dnsURL,
|
||||
dns.Authentication(dnsUser, dnsPass),
|
||||
dns.RootCAs(globalRootCAs))
|
||||
@ -455,7 +455,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
|
||||
}
|
||||
|
||||
if etcdCfg.Enabled {
|
||||
bootstrapTrace("initialize etcd store")
|
||||
bootstrapTraceMsg("initialize etcd store")
|
||||
globalEtcdClient, err = etcd.New(etcdCfg)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize etcd config: %w", err))
|
||||
@ -514,19 +514,19 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
|
||||
|
||||
transport := NewHTTPTransport()
|
||||
|
||||
bootstrapTrace("initialize the event notification targets")
|
||||
bootstrapTraceMsg("initialize the event notification targets")
|
||||
globalNotifyTargetList, err = notify.FetchEnabledTargets(GlobalContext, s, transport)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
|
||||
}
|
||||
|
||||
bootstrapTrace("initialize the lambda targets")
|
||||
bootstrapTraceMsg("initialize the lambda targets")
|
||||
globalLambdaTargetList, err = lambda.FetchEnabledTargets(GlobalContext, s, transport)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize lambda target(s): %w", err))
|
||||
}
|
||||
|
||||
bootstrapTrace("applying the dynamic configuration")
|
||||
bootstrapTraceMsg("applying the dynamic configuration")
|
||||
// Apply dynamic config values
|
||||
if err := applyDynamicConfig(ctx, objAPI, s); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
@ -786,13 +786,13 @@ func getValidConfig(objAPI ObjectLayer) (config.Config, error) {
|
||||
// from env if found and valid
|
||||
// data is optional. If nil it will be loaded from backend.
|
||||
func loadConfig(objAPI ObjectLayer, data []byte) error {
|
||||
bootstrapTrace("load the configuration")
|
||||
bootstrapTraceMsg("load the configuration")
|
||||
srvCfg, err := readServerConfig(GlobalContext, objAPI, data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bootstrapTrace("lookup the configuration")
|
||||
bootstrapTraceMsg("lookup the configuration")
|
||||
// Override any values from ENVs.
|
||||
lookupConfigs(srvCfg, objAPI)
|
||||
|
||||
|
@ -2423,7 +2423,7 @@ func migrateV27ToV28() error {
|
||||
// Migrates ${HOME}/.minio/config.json to '<export_path>/.minio.sys/config/config.json'
|
||||
// if etcd is configured then migrates /config/config.json to '<export_path>/.minio.sys/config/config.json'
|
||||
func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
|
||||
bootstrapTrace("migrate config to .minio.sys/config/config.json")
|
||||
bootstrapTraceMsg("migrate config to .minio.sys/config/config.json")
|
||||
// Construct path to config.json for the given bucket.
|
||||
configFile := path.Join(minioConfigPrefix, minioConfigFile)
|
||||
|
||||
|
@ -197,10 +197,10 @@ func NewConfigSys() *ConfigSys {
|
||||
|
||||
// Initialize and load config from remote etcd or local config directory
|
||||
func initConfig(objAPI ObjectLayer) (err error) {
|
||||
bootstrapTrace("load the configuration")
|
||||
bootstrapTraceMsg("load the configuration")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
bootstrapTrace(fmt.Sprintf("loading configuration failed: %v", err))
|
||||
bootstrapTraceMsg(fmt.Sprintf("loading configuration failed: %v", err))
|
||||
}
|
||||
}()
|
||||
|
||||
@ -213,7 +213,7 @@ func initConfig(objAPI ObjectLayer) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
bootstrapTrace("lookup the configuration")
|
||||
bootstrapTraceMsg("lookup the configuration")
|
||||
|
||||
// Override any values from ENVs.
|
||||
lookupConfigs(srvCfg, objAPI)
|
||||
|
@ -384,10 +384,12 @@ func (iamOS *IAMObjectStore) listAllIAMConfigItems(ctx context.Context) (map[str
|
||||
|
||||
// Assumes cache is locked by caller.
|
||||
func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iamCache) error {
|
||||
bootstrapTrace("loading all IAM items")
|
||||
if iamOS.objAPI == nil {
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
bootstrapTraceMsg("loading all IAM items")
|
||||
|
||||
listedConfigItems, err := iamOS.listAllIAMConfigItems(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to list IAM data: %w", err)
|
||||
@ -395,7 +397,7 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
|
||||
|
||||
// Loads things in the same order as `LoadIAMCache()`
|
||||
|
||||
bootstrapTrace("loading policy documents")
|
||||
bootstrapTraceMsg("loading policy documents")
|
||||
|
||||
policiesList := listedConfigItems[policiesListKey]
|
||||
for _, item := range policiesList {
|
||||
@ -407,7 +409,7 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
|
||||
setDefaultCannedPolicies(cache.iamPolicyDocsMap)
|
||||
|
||||
if iamOS.usersSysType == MinIOUsersSysType {
|
||||
bootstrapTrace("loading regular IAM users")
|
||||
bootstrapTraceMsg("loading regular IAM users")
|
||||
regUsersList := listedConfigItems[usersListKey]
|
||||
for _, item := range regUsersList {
|
||||
userName := path.Dir(item)
|
||||
@ -416,7 +418,7 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
|
||||
}
|
||||
}
|
||||
|
||||
bootstrapTrace("loading regular IAM groups")
|
||||
bootstrapTraceMsg("loading regular IAM groups")
|
||||
groupsList := listedConfigItems[groupsListKey]
|
||||
for _, item := range groupsList {
|
||||
group := path.Dir(item)
|
||||
@ -426,7 +428,7 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
|
||||
}
|
||||
}
|
||||
|
||||
bootstrapTrace("loading user policy mapping")
|
||||
bootstrapTraceMsg("loading user policy mapping")
|
||||
userPolicyMappingsList := listedConfigItems[policyDBUsersListKey]
|
||||
for _, item := range userPolicyMappingsList {
|
||||
userName := strings.TrimSuffix(item, ".json")
|
||||
@ -435,7 +437,7 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
|
||||
}
|
||||
}
|
||||
|
||||
bootstrapTrace("loading group policy mapping")
|
||||
bootstrapTraceMsg("loading group policy mapping")
|
||||
groupPolicyMappingsList := listedConfigItems[policyDBGroupsListKey]
|
||||
for _, item := range groupPolicyMappingsList {
|
||||
groupName := strings.TrimSuffix(item, ".json")
|
||||
@ -444,7 +446,7 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
|
||||
}
|
||||
}
|
||||
|
||||
bootstrapTrace("loading service accounts")
|
||||
bootstrapTraceMsg("loading service accounts")
|
||||
svcAccList := listedConfigItems[svcAccListKey]
|
||||
for _, item := range svcAccList {
|
||||
userName := path.Dir(item)
|
||||
@ -453,7 +455,7 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
|
||||
}
|
||||
}
|
||||
|
||||
bootstrapTrace("loading STS users")
|
||||
bootstrapTraceMsg("loading STS users")
|
||||
stsUsersList := listedConfigItems[stsListKey]
|
||||
for _, item := range stsUsersList {
|
||||
userName := path.Dir(item)
|
||||
@ -462,7 +464,7 @@ func (iamOS *IAMObjectStore) loadAllFromObjStore(ctx context.Context, cache *iam
|
||||
}
|
||||
}
|
||||
|
||||
bootstrapTrace("loading STS policy mapping")
|
||||
bootstrapTraceMsg("loading STS policy mapping")
|
||||
stsPolicyMappingsList := listedConfigItems[policyDBSTSUsersListKey]
|
||||
for _, item := range stsPolicyMappingsList {
|
||||
stsName := strings.TrimSuffix(item, ".json")
|
||||
|
@ -109,7 +109,7 @@ func getUserIdentityPath(user string, userType IAMUserType) string {
|
||||
}
|
||||
|
||||
func saveIAMFormat(ctx context.Context, store IAMStorageAPI) error {
|
||||
bootstrapTrace("Load IAM format file")
|
||||
bootstrapTraceMsg("Load IAM format file")
|
||||
var iamFmt iamFormat
|
||||
path := getIAMFormatFilePath()
|
||||
if err := store.loadIAMConfig(ctx, &iamFmt, path); err != nil {
|
||||
@ -127,7 +127,7 @@ func saveIAMFormat(ctx context.Context, store IAMStorageAPI) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
bootstrapTrace("Write IAM format file")
|
||||
bootstrapTraceMsg("Write IAM format file")
|
||||
// Save iam format to version 1.
|
||||
if err := store.saveIAMConfig(ctx, newIAMFormatVersion1(), path); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
@ -462,7 +462,7 @@ func setDefaultCannedPolicies(policies map[string]PolicyDoc) {
|
||||
// LoadIAMCache reads all IAM items and populates a new iamCache object and
|
||||
// replaces the in-memory cache object.
|
||||
func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error {
|
||||
bootstrapTrace("loading IAM data")
|
||||
bootstrapTraceMsg("loading IAM data")
|
||||
|
||||
newCache := newIamCache()
|
||||
|
||||
@ -475,7 +475,7 @@ func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error {
|
||||
}
|
||||
} else {
|
||||
|
||||
bootstrapTrace("loading policy documents")
|
||||
bootstrapTraceMsg("loading policy documents")
|
||||
if err := store.loadPolicyDocs(ctx, newCache.iamPolicyDocsMap); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -484,41 +484,41 @@ func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error {
|
||||
setDefaultCannedPolicies(newCache.iamPolicyDocsMap)
|
||||
|
||||
if store.getUsersSysType() == MinIOUsersSysType {
|
||||
bootstrapTrace("loading regular users")
|
||||
bootstrapTraceMsg("loading regular users")
|
||||
if err := store.loadUsers(ctx, regUser, newCache.iamUsersMap); err != nil {
|
||||
return err
|
||||
}
|
||||
bootstrapTrace("loading regular groups")
|
||||
bootstrapTraceMsg("loading regular groups")
|
||||
if err := store.loadGroups(ctx, newCache.iamGroupsMap); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
bootstrapTrace("loading user policy mapping")
|
||||
bootstrapTraceMsg("loading user policy mapping")
|
||||
// load polices mapped to users
|
||||
if err := store.loadMappedPolicies(ctx, regUser, false, newCache.iamUserPolicyMap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bootstrapTrace("loading group policy mapping")
|
||||
bootstrapTraceMsg("loading group policy mapping")
|
||||
// load policies mapped to groups
|
||||
if err := store.loadMappedPolicies(ctx, regUser, true, newCache.iamGroupPolicyMap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bootstrapTrace("loading service accounts")
|
||||
bootstrapTraceMsg("loading service accounts")
|
||||
// load service accounts
|
||||
if err := store.loadUsers(ctx, svcUser, newCache.iamUsersMap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bootstrapTrace("loading STS users")
|
||||
bootstrapTraceMsg("loading STS users")
|
||||
// load STS temp users
|
||||
if err := store.loadUsers(ctx, stsUser, newCache.iamUsersMap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bootstrapTrace("loading STS policy mapping")
|
||||
bootstrapTraceMsg("loading STS policy mapping")
|
||||
// load STS policy mappings
|
||||
if err := store.loadMappedPolicies(ctx, stsUser, false, newCache.iamUserPolicyMap); err != nil {
|
||||
return err
|
||||
|
30
cmd/iam.go
30
cmd/iam.go
@ -187,7 +187,7 @@ func (sys *IAMSys) Initialized() bool {
|
||||
}
|
||||
|
||||
// Load - loads all credentials, policies and policy mappings.
|
||||
func (sys *IAMSys) Load(ctx context.Context) error {
|
||||
func (sys *IAMSys) Load(ctx context.Context, firstTime bool) error {
|
||||
loadStartTime := time.Now()
|
||||
err := sys.store.LoadIAMCache(ctx)
|
||||
if err != nil {
|
||||
@ -200,6 +200,10 @@ func (sys *IAMSys) Load(ctx context.Context) error {
|
||||
atomic.StoreUint64(&sys.LastRefreshTimeUnixNano, uint64(loadStartTime.Add(loadDuration).UnixNano()))
|
||||
atomic.AddUint64(&sys.TotalRefreshSuccesses, 1)
|
||||
|
||||
if firstTime {
|
||||
bootstrapTraceMsg(fmt.Sprintf("globalIAMSys.Load(): (duration: %s)", loadDuration))
|
||||
}
|
||||
|
||||
select {
|
||||
case <-sys.configLoaded:
|
||||
default:
|
||||
@ -210,7 +214,7 @@ func (sys *IAMSys) Load(ctx context.Context) error {
|
||||
|
||||
// Init - initializes config system by reading entries from config/iam
|
||||
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etcd.Client, iamRefreshInterval time.Duration) {
|
||||
bootstrapTrace("IAM initialization started")
|
||||
bootstrapTraceMsg("IAM initialization started")
|
||||
globalServerConfigMu.RLock()
|
||||
s := globalServerConfig
|
||||
globalServerConfigMu.RUnlock()
|
||||
@ -300,7 +304,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc
|
||||
|
||||
// Load IAM data from storage.
|
||||
for {
|
||||
if err := sys.Load(retryCtx); err != nil {
|
||||
if err := sys.Load(retryCtx, true); err != nil {
|
||||
if configRetriableErrors(err) {
|
||||
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
|
||||
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
|
||||
@ -313,8 +317,6 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc
|
||||
break
|
||||
}
|
||||
|
||||
bootstrapTrace("finishing IAM loading")
|
||||
|
||||
refreshInterval := sys.iamRefreshInterval
|
||||
|
||||
// Set up polling for expired accounts and credentials purging.
|
||||
@ -371,6 +373,8 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc
|
||||
}
|
||||
|
||||
sys.printIAMRoles()
|
||||
|
||||
bootstrapTraceMsg("finishing IAM loading")
|
||||
}
|
||||
|
||||
func (sys *IAMSys) validateAndAddRolePolicyMappings(ctx context.Context, m map[arn.ARN]string) {
|
||||
@ -446,7 +450,7 @@ func (sys *IAMSys) watch(ctx context.Context) {
|
||||
select {
|
||||
case <-timer.C:
|
||||
refreshStart := time.Now()
|
||||
if err := sys.Load(ctx); err != nil {
|
||||
if err := sys.Load(ctx, false); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Failure in periodic refresh for IAM (took %.2fs): %v", time.Since(refreshStart).Seconds(), err))
|
||||
} else {
|
||||
took := time.Since(refreshStart).Seconds()
|
||||
@ -590,12 +594,7 @@ func (sys *IAMSys) ListPolicies(ctx context.Context, bucketName string) (map[str
|
||||
return nil, errServerNotInitialized
|
||||
}
|
||||
|
||||
select {
|
||||
case <-sys.configLoaded:
|
||||
return sys.store.ListPolicies(ctx, bucketName)
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return sys.store.ListPolicies(ctx, bucketName)
|
||||
}
|
||||
|
||||
// ListPolicyDocs - lists all canned policy docs.
|
||||
@ -604,12 +603,7 @@ func (sys *IAMSys) ListPolicyDocs(ctx context.Context, bucketName string) (map[s
|
||||
return nil, errServerNotInitialized
|
||||
}
|
||||
|
||||
select {
|
||||
case <-sys.configLoaded:
|
||||
return sys.store.ListPolicyDocs(ctx, bucketName)
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return sys.store.ListPolicyDocs(ctx, bucketName)
|
||||
}
|
||||
|
||||
// SetPolicy - sets a new named policy.
|
||||
|
@ -392,23 +392,51 @@ func configRetriableErrors(err error) bool {
|
||||
errors.Is(err, os.ErrDeadlineExceeded)
|
||||
}
|
||||
|
||||
func bootstrapTrace(msg string) {
|
||||
globalBootstrapTracer.Record(msg, 2)
|
||||
func bootstrapTraceMsg(msg string) {
|
||||
info := madmin.TraceInfo{
|
||||
TraceType: madmin.TraceBootstrap,
|
||||
Time: UTCNow(),
|
||||
NodeName: globalLocalNodeName,
|
||||
FuncName: "BOOTSTRAP",
|
||||
Message: fmt.Sprintf("%s %s", getSource(2), msg),
|
||||
}
|
||||
globalBootstrapTracer.Record(info)
|
||||
|
||||
if serverDebugLog {
|
||||
logger.Info(fmt.Sprint(time.Now().Round(time.Millisecond).Format(time.RFC3339), " bootstrap: ", msg))
|
||||
}
|
||||
|
||||
noSubs := globalTrace.NumSubscribers(madmin.TraceBootstrap) == 0
|
||||
if noSubs {
|
||||
return
|
||||
}
|
||||
|
||||
globalTrace.Publish(info)
|
||||
}
|
||||
|
||||
func bootstrapTrace(msg string, worker func()) {
|
||||
if serverDebugLog {
|
||||
logger.Info(fmt.Sprint(time.Now().Round(time.Millisecond).Format(time.RFC3339), " bootstrap: ", msg))
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
worker()
|
||||
dur := time.Since(now)
|
||||
|
||||
info := madmin.TraceInfo{
|
||||
TraceType: madmin.TraceBootstrap,
|
||||
Time: UTCNow(),
|
||||
NodeName: globalLocalNodeName,
|
||||
FuncName: "BOOTSTRAP",
|
||||
Message: fmt.Sprintf("%s %s (duration: %s)", getSource(2), msg, dur),
|
||||
}
|
||||
globalBootstrapTracer.Record(info)
|
||||
|
||||
if globalTrace.NumSubscribers(madmin.TraceBootstrap) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
globalTrace.Publish(madmin.TraceInfo{
|
||||
TraceType: madmin.TraceBootstrap,
|
||||
Time: time.Now().UTC(),
|
||||
NodeName: globalLocalNodeName,
|
||||
FuncName: "BOOTSTRAP",
|
||||
Message: fmt.Sprintf("%s %s", getSource(2), msg),
|
||||
})
|
||||
globalTrace.Publish(info)
|
||||
}
|
||||
|
||||
func initServerConfig(ctx context.Context, newObject ObjectLayer) error {
|
||||
@ -548,40 +576,40 @@ func serverMain(ctx *cli.Context) {
|
||||
|
||||
setDefaultProfilerRates()
|
||||
|
||||
// Initialize globalConsoleSys system
|
||||
bootstrapTrace("newConsoleLogger")
|
||||
globalConsoleSys = NewConsoleLogger(GlobalContext)
|
||||
logger.AddSystemTarget(GlobalContext, globalConsoleSys)
|
||||
|
||||
// Perform any self-tests
|
||||
bootstrapTrace("selftests")
|
||||
bitrotSelfTest()
|
||||
erasureSelfTest()
|
||||
compressSelfTest()
|
||||
|
||||
// Handle all server environment vars.
|
||||
bootstrapTrace("serverHandleEnvVars")
|
||||
serverHandleEnvVars()
|
||||
|
||||
// Handle all server command args.
|
||||
bootstrapTrace("serverHandleCmdArgs")
|
||||
serverHandleCmdArgs(ctx)
|
||||
bootstrapTrace("serverHandleCmdArgs", func() {
|
||||
serverHandleCmdArgs(ctx)
|
||||
})
|
||||
|
||||
// Initialize globalConsoleSys system
|
||||
bootstrapTrace("newConsoleLogger", func() {
|
||||
globalConsoleSys = NewConsoleLogger(GlobalContext)
|
||||
logger.AddSystemTarget(GlobalContext, globalConsoleSys)
|
||||
|
||||
// Set node name, only set for distributed setup.
|
||||
globalConsoleSys.SetNodeName(globalLocalNodeName)
|
||||
})
|
||||
|
||||
// Perform any self-tests
|
||||
bootstrapTrace("selftests", func() {
|
||||
bitrotSelfTest()
|
||||
erasureSelfTest()
|
||||
compressSelfTest()
|
||||
})
|
||||
|
||||
// Initialize KMS configuration
|
||||
bootstrapTrace("handleKMSConfig")
|
||||
handleKMSConfig()
|
||||
|
||||
// Set node name, only set for distributed setup.
|
||||
bootstrapTrace("setNodeName")
|
||||
globalConsoleSys.SetNodeName(globalLocalNodeName)
|
||||
bootstrapTrace("handleKMSConfig", handleKMSConfig)
|
||||
|
||||
// Initialize all help
|
||||
bootstrapTrace("initHelp")
|
||||
initHelp()
|
||||
bootstrapTrace("initHelp", initHelp)
|
||||
|
||||
// Initialize all sub-systems
|
||||
bootstrapTrace("initAllSubsystems")
|
||||
initAllSubsystems(GlobalContext)
|
||||
bootstrapTrace("initAllSubsystems", func() {
|
||||
initAllSubsystems(GlobalContext)
|
||||
})
|
||||
|
||||
// Is distributed setup, error out if no certificates are found for HTTPS endpoints.
|
||||
if globalIsDistErasure {
|
||||
@ -597,14 +625,16 @@ func serverMain(ctx *cli.Context) {
|
||||
go func() {
|
||||
if !globalCLIContext.Quiet && !globalInplaceUpdateDisabled {
|
||||
// Check for new updates from dl.min.io.
|
||||
bootstrapTrace("checkUpdate")
|
||||
checkUpdate(getMinioMode())
|
||||
bootstrapTrace("checkUpdate", func() {
|
||||
checkUpdate(getMinioMode())
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
// Set system resources to maximum.
|
||||
bootstrapTrace("setMaxResources")
|
||||
setMaxResources()
|
||||
bootstrapTrace("setMaxResources", func() {
|
||||
_ = setMaxResources()
|
||||
})
|
||||
|
||||
// Verify kernel release and version.
|
||||
if oldLinux() {
|
||||
@ -617,64 +647,65 @@ func serverMain(ctx *cli.Context) {
|
||||
logger.Info(color.RedBoldf("WARNING: Detected GOMAXPROCS(%d) < NumCPU(%d), please make sure to provide all PROCS to MinIO for optimal performance", maxProcs, cpuProcs))
|
||||
}
|
||||
|
||||
// Configure server.
|
||||
bootstrapTrace("configureServerHandler")
|
||||
handler, err := configureServerHandler(globalEndpoints)
|
||||
if err != nil {
|
||||
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
|
||||
}
|
||||
|
||||
var getCert certs.GetCertificateFunc
|
||||
if globalTLSCerts != nil {
|
||||
getCert = globalTLSCerts.GetCertificate
|
||||
}
|
||||
|
||||
bootstrapTrace("xhttp.NewServer")
|
||||
httpServer := xhttp.NewServer(getServerListenAddrs()).
|
||||
UseHandler(setCriticalErrorHandler(corsHandler(handler))).
|
||||
UseTLSConfig(newTLSConfig(getCert)).
|
||||
UseShutdownTimeout(ctx.Duration("shutdown-timeout")).
|
||||
UseIdleTimeout(ctx.Duration("idle-timeout")).
|
||||
UseReadHeaderTimeout(ctx.Duration("read-header-timeout")).
|
||||
UseBaseContext(GlobalContext).
|
||||
UseCustomLogger(log.New(io.Discard, "", 0)). // Turn-off random logging by Go stdlib
|
||||
UseTCPOptions(globalTCPOptions)
|
||||
|
||||
httpServer.TCPOptions.Trace = bootstrapTrace
|
||||
go func() {
|
||||
serveFn, err := httpServer.Init(GlobalContext, func(listenAddr string, err error) {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("Unable to listen on `%s`: %v", listenAddr, err))
|
||||
})
|
||||
// Configure server.
|
||||
bootstrapTrace("configureServer", func() {
|
||||
handler, err := configureServerHandler(globalEndpoints)
|
||||
if err != nil {
|
||||
globalHTTPServerErrorCh <- err
|
||||
return
|
||||
logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services")
|
||||
}
|
||||
globalHTTPServerErrorCh <- serveFn()
|
||||
}()
|
||||
|
||||
bootstrapTrace("setHTTPServer")
|
||||
setHTTPServer(httpServer)
|
||||
httpServer := xhttp.NewServer(getServerListenAddrs()).
|
||||
UseHandler(setCriticalErrorHandler(corsHandler(handler))).
|
||||
UseTLSConfig(newTLSConfig(getCert)).
|
||||
UseShutdownTimeout(ctx.Duration("shutdown-timeout")).
|
||||
UseIdleTimeout(ctx.Duration("idle-timeout")).
|
||||
UseReadHeaderTimeout(ctx.Duration("read-header-timeout")).
|
||||
UseBaseContext(GlobalContext).
|
||||
UseCustomLogger(log.New(io.Discard, "", 0)). // Turn-off random logging by Go stdlib
|
||||
UseTCPOptions(globalTCPOptions)
|
||||
|
||||
httpServer.TCPOptions.Trace = bootstrapTraceMsg
|
||||
go func() {
|
||||
serveFn, err := httpServer.Init(GlobalContext, func(listenAddr string, err error) {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("Unable to listen on `%s`: %v", listenAddr, err))
|
||||
})
|
||||
if err != nil {
|
||||
globalHTTPServerErrorCh <- err
|
||||
return
|
||||
}
|
||||
globalHTTPServerErrorCh <- serveFn()
|
||||
}()
|
||||
|
||||
setHTTPServer(httpServer)
|
||||
})
|
||||
|
||||
if globalIsDistErasure {
|
||||
bootstrapTrace("verifying system configuration")
|
||||
// Additionally in distributed setup, validate the setup and configuration.
|
||||
if err := verifyServerSystemConfig(GlobalContext, globalEndpoints); err != nil {
|
||||
logger.Fatal(err, "Unable to start the server")
|
||||
}
|
||||
bootstrapTrace("verifying system configuration", func() {
|
||||
// Additionally in distributed setup, validate the setup and configuration.
|
||||
if err := verifyServerSystemConfig(GlobalContext, globalEndpoints); err != nil {
|
||||
logger.Fatal(err, "Unable to start the server")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
if !globalDisableFreezeOnBoot {
|
||||
// Freeze the services until the bucket notification subsystem gets initialized.
|
||||
bootstrapTrace("freezeServices")
|
||||
freezeServices()
|
||||
bootstrapTrace("freezeServices", freezeServices)
|
||||
}
|
||||
|
||||
bootstrapTrace("newObjectLayer")
|
||||
newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
|
||||
if err != nil {
|
||||
logFatalErrs(err, Endpoint{}, true)
|
||||
}
|
||||
bootstrapTrace("newObjectLayer (initialized)")
|
||||
var newObject ObjectLayer
|
||||
bootstrapTrace("newObjectLayer", func() {
|
||||
var err error
|
||||
newObject, err = newObjectLayer(GlobalContext, globalEndpoints)
|
||||
if err != nil {
|
||||
logFatalErrs(err, Endpoint{}, true)
|
||||
}
|
||||
})
|
||||
|
||||
xhttp.SetDeploymentID(globalDeploymentID)
|
||||
xhttp.SetMinIOVersion(Version)
|
||||
@ -688,43 +719,49 @@ func serverMain(ctx *cli.Context) {
|
||||
globalNodeNamesHex[hex.EncodeToString(nodeNameSum[:])] = struct{}{}
|
||||
}
|
||||
|
||||
bootstrapTrace("newSharedLock")
|
||||
globalLeaderLock = newSharedLock(GlobalContext, newObject, "leader.lock")
|
||||
bootstrapTrace("newSharedLock", func() {
|
||||
globalLeaderLock = newSharedLock(GlobalContext, newObject, "leader.lock")
|
||||
})
|
||||
|
||||
// Enable background operations on
|
||||
//
|
||||
// - Disk auto healing
|
||||
// - MRF (most recently failed) healing
|
||||
// - Background expiration routine for lifecycle policies
|
||||
bootstrapTrace("initAutoHeal")
|
||||
initAutoHeal(GlobalContext, newObject)
|
||||
bootstrapTrace("initAutoHeal", func() {
|
||||
initAutoHeal(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
bootstrapTrace("initHealMRF")
|
||||
initHealMRF(GlobalContext, newObject)
|
||||
bootstrapTrace("initHealMRF", func() {
|
||||
initHealMRF(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
bootstrapTrace("initBackgroundExpiry")
|
||||
initBackgroundExpiry(GlobalContext, newObject)
|
||||
bootstrapTrace("initBackgroundExpiry", func() {
|
||||
initBackgroundExpiry(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
bootstrapTrace("initServerConfig")
|
||||
if err = initServerConfig(GlobalContext, newObject); err != nil {
|
||||
var cerr config.Err
|
||||
// For any config error, we don't need to drop into safe-mode
|
||||
// instead its a user error and should be fixed by user.
|
||||
if errors.As(err, &cerr) {
|
||||
logger.FatalIf(err, "Unable to initialize the server")
|
||||
var err error
|
||||
bootstrapTrace("initServerConfig", func() {
|
||||
if err = initServerConfig(GlobalContext, newObject); err != nil {
|
||||
var cerr config.Err
|
||||
// For any config error, we don't need to drop into safe-mode
|
||||
// instead its a user error and should be fixed by user.
|
||||
if errors.As(err, &cerr) {
|
||||
logger.FatalIf(err, "Unable to initialize the server")
|
||||
}
|
||||
|
||||
// If context was canceled
|
||||
if errors.Is(err, context.Canceled) {
|
||||
logger.FatalIf(err, "Server startup canceled upon user request")
|
||||
}
|
||||
|
||||
logger.LogIf(GlobalContext, err)
|
||||
}
|
||||
|
||||
// If context was canceled
|
||||
if errors.Is(err, context.Canceled) {
|
||||
logger.FatalIf(err, "Server startup canceled upon user request")
|
||||
if !globalCLIContext.StrictS3Compat {
|
||||
logger.Info(color.RedBold("WARNING: Strict AWS S3 compatible incoming PUT, POST content payload validation is turned off, caution is advised do not use in production"))
|
||||
}
|
||||
|
||||
logger.LogIf(GlobalContext, err)
|
||||
}
|
||||
|
||||
if !globalCLIContext.StrictS3Compat {
|
||||
logger.Info(color.RedBold("WARNING: Strict AWS S3 compatible incoming PUT, POST content payload validation is turned off, caution is advised do not use in production"))
|
||||
}
|
||||
})
|
||||
|
||||
if globalActiveCred.Equal(auth.DefaultCredentials) {
|
||||
msg := fmt.Sprintf("WARNING: Detected default credentials '%s', we recommend that you change these values with 'MINIO_ROOT_USER' and 'MINIO_ROOT_PASSWORD' environment variables",
|
||||
@ -734,43 +771,44 @@ func serverMain(ctx *cli.Context) {
|
||||
|
||||
// Initialize users credentials and policies in background right after config has initialized.
|
||||
go func() {
|
||||
bootstrapTrace("globalIAMSys.Init")
|
||||
globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
|
||||
bootstrapTrace("globalIAMSys.Initialized")
|
||||
bootstrapTrace("globalIAMSys.Init", func() {
|
||||
globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
|
||||
})
|
||||
|
||||
// Initialize Console UI
|
||||
if globalBrowserEnabled {
|
||||
bootstrapTrace("initConsoleServer")
|
||||
srv, err := initConsoleServer()
|
||||
if err != nil {
|
||||
logger.FatalIf(err, "Unable to initialize console service")
|
||||
}
|
||||
bootstrapTrace("initConsoleServer", func() {
|
||||
srv, err := initConsoleServer()
|
||||
if err != nil {
|
||||
logger.FatalIf(err, "Unable to initialize console service")
|
||||
}
|
||||
|
||||
bootstrapTrace("setConsoleSrv")
|
||||
setConsoleSrv(srv)
|
||||
setConsoleSrv(srv)
|
||||
|
||||
go func() {
|
||||
logger.FatalIf(newConsoleServerFn().Serve(), "Unable to initialize console server")
|
||||
}()
|
||||
go func() {
|
||||
logger.FatalIf(newConsoleServerFn().Serve(), "Unable to initialize console server")
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
// if we see FTP args, start FTP if possible
|
||||
if len(ctx.StringSlice("ftp")) > 0 {
|
||||
bootstrapTrace("startFTPServer")
|
||||
go startFTPServer(ctx)
|
||||
bootstrapTrace("go startFTPServer", func() {
|
||||
go startFTPServer(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
// If we see SFTP args, start SFTP if possible
|
||||
if len(ctx.StringSlice("sftp")) > 0 {
|
||||
bootstrapTrace("startFTPServer")
|
||||
go startSFTPServer(ctx)
|
||||
bootstrapTrace("go startFTPServer", func() {
|
||||
go startSFTPServer(ctx)
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
if !globalDisableFreezeOnBoot {
|
||||
defer unfreezeServices()
|
||||
defer bootstrapTrace("unfreezeServices")
|
||||
defer bootstrapTrace("unfreezeServices", unfreezeServices)
|
||||
t := time.AfterFunc(5*time.Minute, func() {
|
||||
logger.Info(color.Yellow("WARNING: Taking more time to initialize the config subsystem. Please set '_MINIO_DISABLE_API_FREEZE_ON_BOOT=true' to not freeze the APIs"))
|
||||
})
|
||||
@ -778,32 +816,38 @@ func serverMain(ctx *cli.Context) {
|
||||
}
|
||||
|
||||
// Initialize data scanner.
|
||||
bootstrapTrace("initDataScanner")
|
||||
initDataScanner(GlobalContext, newObject)
|
||||
bootstrapTrace("initDataScanner", func() {
|
||||
initDataScanner(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
// Initialize background replication
|
||||
bootstrapTrace("initBackgroundReplication")
|
||||
initBackgroundReplication(GlobalContext, newObject)
|
||||
bootstrapTrace("initBackgroundReplication", func() {
|
||||
initBackgroundReplication(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
bootstrapTrace("globalTransitionState.Init")
|
||||
globalTransitionState.Init(newObject)
|
||||
bootstrapTrace("globalTransitionState.Init", func() {
|
||||
globalTransitionState.Init(newObject)
|
||||
})
|
||||
|
||||
// Initialize batch job pool.
|
||||
bootstrapTrace("newBatchJobPool")
|
||||
globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100)
|
||||
bootstrapTrace("newBatchJobPool", func() {
|
||||
globalBatchJobPool = newBatchJobPool(GlobalContext, newObject, 100)
|
||||
})
|
||||
|
||||
// Initialize the license update job
|
||||
bootstrapTrace("initLicenseUpdateJob")
|
||||
initLicenseUpdateJob(GlobalContext, newObject)
|
||||
bootstrapTrace("initLicenseUpdateJob", func() {
|
||||
initLicenseUpdateJob(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
go func() {
|
||||
// Initialize transition tier configuration manager
|
||||
bootstrapTrace("globalTierConfigMgr.Init")
|
||||
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
|
||||
logger.LogIf(GlobalContext, err)
|
||||
} else {
|
||||
logger.FatalIf(globalTierJournal.Init(GlobalContext), "Unable to initialize remote tier pending deletes journal")
|
||||
}
|
||||
bootstrapTrace("globalTierConfigMgr.Init", func() {
|
||||
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
|
||||
logger.LogIf(GlobalContext, err)
|
||||
} else {
|
||||
logger.FatalIf(globalTierJournal.Init(GlobalContext), "Unable to initialize remote tier pending deletes journal")
|
||||
}
|
||||
})
|
||||
}()
|
||||
|
||||
// initialize the new disk cache objects.
|
||||
@ -817,40 +861,45 @@ func serverMain(ctx *cli.Context) {
|
||||
}
|
||||
|
||||
// Initialize bucket notification system.
|
||||
bootstrapTrace("initBucketTargets")
|
||||
logger.LogIf(GlobalContext, globalEventNotifier.InitBucketTargets(GlobalContext, newObject))
|
||||
bootstrapTrace("initBucketTargets", func() {
|
||||
logger.LogIf(GlobalContext, globalEventNotifier.InitBucketTargets(GlobalContext, newObject))
|
||||
})
|
||||
|
||||
var buckets []BucketInfo
|
||||
// List buckets to initialize bucket metadata sub-sys.
|
||||
bootstrapTrace("listBuckets")
|
||||
buckets, err := newObject.ListBuckets(GlobalContext, BucketOptions{})
|
||||
if err != nil {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("Unable to list buckets to initialize bucket metadata sub-system: %w", err))
|
||||
}
|
||||
bootstrapTrace("listBuckets", func() {
|
||||
buckets, err = newObject.ListBuckets(GlobalContext, BucketOptions{})
|
||||
if err != nil {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("Unable to list buckets to initialize bucket metadata sub-system: %w", err))
|
||||
}
|
||||
})
|
||||
|
||||
// Initialize bucket metadata sub-system.
|
||||
bootstrapTrace("globalBucketMetadataSys.Init")
|
||||
globalBucketMetadataSys.Init(GlobalContext, buckets, newObject)
|
||||
bootstrapTrace("globalBucketMetadataSys.Initialized")
|
||||
bootstrapTrace("globalBucketMetadataSys.Init", func() {
|
||||
globalBucketMetadataSys.Init(GlobalContext, buckets, newObject)
|
||||
})
|
||||
|
||||
// initialize replication resync state.
|
||||
bootstrapTrace("initResync")
|
||||
globalReplicationPool.initResync(GlobalContext, buckets, newObject)
|
||||
bootstrapTrace("initResync", func() {
|
||||
globalReplicationPool.initResync(GlobalContext, buckets, newObject)
|
||||
})
|
||||
|
||||
// Initialize site replication manager after bucket metadata
|
||||
bootstrapTrace("globalSiteReplicationSys.Init")
|
||||
globalSiteReplicationSys.Init(GlobalContext, newObject)
|
||||
bootstrapTrace("globalSiteReplicationSys.Initialized")
|
||||
bootstrapTrace("globalSiteReplicationSys.Init", func() {
|
||||
globalSiteReplicationSys.Init(GlobalContext, newObject)
|
||||
})
|
||||
|
||||
// Initialize quota manager.
|
||||
bootstrapTrace("globalBucketQuotaSys.Init")
|
||||
globalBucketQuotaSys.Init(newObject)
|
||||
bootstrapTrace("globalBucketQuotaSys.Initialized")
|
||||
bootstrapTrace("globalBucketQuotaSys.Init", func() {
|
||||
globalBucketQuotaSys.Init(newObject)
|
||||
})
|
||||
|
||||
// Populate existing buckets to the etcd backend
|
||||
if globalDNSConfig != nil {
|
||||
// Background this operation.
|
||||
bootstrapTrace("go initFederatorBackend")
|
||||
go initFederatorBackend(buckets, newObject)
|
||||
bootstrapTrace("go initFederatorBackend", func() {
|
||||
go initFederatorBackend(buckets, newObject)
|
||||
})
|
||||
}
|
||||
|
||||
// Prints the formatted startup message, if err is not nil then it prints additional information as well.
|
||||
@ -866,14 +915,15 @@ func serverMain(ctx *cli.Context) {
|
||||
if region == "" {
|
||||
region = "us-east-1"
|
||||
}
|
||||
bootstrapTrace("globalMinioClient")
|
||||
globalMinioClient, err = minio.New(globalLocalNodeName, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(globalActiveCred.AccessKey, globalActiveCred.SecretKey, ""),
|
||||
Secure: globalIsTLS,
|
||||
Transport: globalProxyTransport,
|
||||
Region: region,
|
||||
bootstrapTrace("globalMinioClient", func() {
|
||||
globalMinioClient, err = minio.New(globalLocalNodeName, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(globalActiveCred.AccessKey, globalActiveCred.SecretKey, ""),
|
||||
Secure: globalIsTLS,
|
||||
Transport: globalProxyTransport,
|
||||
Region: region,
|
||||
})
|
||||
logger.FatalIf(err, "Unable to initialize MinIO client")
|
||||
})
|
||||
logger.FatalIf(err, "Unable to initialize MinIO client")
|
||||
|
||||
// Add User-Agent to differentiate the requests.
|
||||
globalMinioClient.SetAppInfo("minio-perf-test", ReleaseTag)
|
||||
|
Loading…
x
Reference in New Issue
Block a user