mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
speed-up startup time, do not block on ListBuckets() (#14240)
Bonus fixes #13816
This commit is contained in:
parent
2480c66857
commit
0cac868a36
@ -78,7 +78,7 @@ func prepareAdminErasureTestBed(ctx context.Context) (*adminErasureTestBed, erro
|
|||||||
|
|
||||||
initConfigSubsystem(ctx, objLayer)
|
initConfigSubsystem(ctx, objLayer)
|
||||||
|
|
||||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, globalNotificationSys, 2*time.Second)
|
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||||
|
|
||||||
// Setup admin mgmt REST API handlers.
|
// Setup admin mgmt REST API handlers.
|
||||||
adminRouter := mux.NewRouter()
|
adminRouter := mux.NewRouter()
|
||||||
|
@ -369,7 +369,7 @@ func TestIsReqAuthenticated(t *testing.T) {
|
|||||||
|
|
||||||
initConfigSubsystem(ctx, objLayer)
|
initConfigSubsystem(ctx, objLayer)
|
||||||
|
|
||||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, globalNotificationSys, 2*time.Second)
|
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||||
|
|
||||||
creds, err := auth.CreateCredentials("myuser", "mypassword")
|
creds, err := auth.CreateCredentials("myuser", "mypassword")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -459,7 +459,7 @@ func TestValidateAdminSignature(t *testing.T) {
|
|||||||
|
|
||||||
initConfigSubsystem(ctx, objLayer)
|
initConfigSubsystem(ctx, objLayer)
|
||||||
|
|
||||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, globalNotificationSys, 2*time.Second)
|
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||||
|
|
||||||
creds, err := auth.CreateCredentials("admin", "mypassword")
|
creds, err := auth.CreateCredentials("admin", "mypassword")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -205,11 +205,11 @@ func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointS
|
|||||||
for onlineServers < len(clnts)/2 {
|
for onlineServers < len(clnts)/2 {
|
||||||
for _, clnt := range clnts {
|
for _, clnt := range clnts {
|
||||||
if err := clnt.Verify(ctx, srcCfg); err != nil {
|
if err := clnt.Verify(ctx, srcCfg); err != nil {
|
||||||
if isNetworkError(err) {
|
if !isNetworkError(err) {
|
||||||
offlineEndpoints = append(offlineEndpoints, clnt.String())
|
logger.Info(fmt.Errorf("%s has incorrect configuration: %w", clnt.String(), err).Error())
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
return fmt.Errorf("%s as has incorrect configuration: %w", clnt.String(), err)
|
offlineEndpoints = append(offlineEndpoints, clnt.String())
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
onlineServers++
|
onlineServers++
|
||||||
}
|
}
|
||||||
|
@ -434,6 +434,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
|
|||||||
_, _ = objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{
|
_, _ = objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{
|
||||||
// Ensure heal opts for bucket metadata be deep healed all the time.
|
// Ensure heal opts for bucket metadata be deep healed all the time.
|
||||||
ScanMode: madmin.HealDeepScan,
|
ScanMode: madmin.HealDeepScan,
|
||||||
|
Recreate: true,
|
||||||
})
|
})
|
||||||
meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name)
|
meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -298,15 +298,17 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
|||||||
globalObjectAPI = newObject
|
globalObjectAPI = newObject
|
||||||
globalObjLayerMutex.Unlock()
|
globalObjLayerMutex.Unlock()
|
||||||
|
|
||||||
|
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
|
||||||
|
|
||||||
if gatewayName == NASBackendGateway {
|
if gatewayName == NASBackendGateway {
|
||||||
buckets, err := newObject.ListBuckets(GlobalContext)
|
buckets, err := newObject.ListBuckets(GlobalContext)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal(err, "Unable to list buckets")
|
logger.Fatal(err, "Unable to list buckets")
|
||||||
}
|
}
|
||||||
logger.FatalIf(globalNotificationSys.Init(GlobalContext, buckets, newObject), "Unable to initialize notification system")
|
logger.FatalIf(globalBucketMetadataSys.Init(GlobalContext, buckets, newObject), "Unable to initialize bucket metadata")
|
||||||
}
|
|
||||||
|
|
||||||
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalNotificationSys, globalRefreshIAMInterval)
|
logger.FatalIf(globalNotificationSys.Init(GlobalContext, newObject), "Unable to initialize notification system")
|
||||||
|
}
|
||||||
|
|
||||||
if globalCacheConfig.Enabled {
|
if globalCacheConfig.Enabled {
|
||||||
// initialize the new disk cache objects.
|
// initialize the new disk cache objects.
|
||||||
|
20
cmd/iam.go
20
cmd/iam.go
@ -66,7 +66,6 @@ type IAMSys struct {
|
|||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
iamRefreshInterval time.Duration
|
iamRefreshInterval time.Duration
|
||||||
notificationSys *NotificationSys
|
|
||||||
|
|
||||||
usersSysType UsersSysType
|
usersSysType UsersSysType
|
||||||
|
|
||||||
@ -198,12 +197,11 @@ func (sys *IAMSys) Load(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Init - initializes config system by reading entries from config/iam
|
// Init - initializes config system by reading entries from config/iam
|
||||||
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etcd.Client, nSys *NotificationSys, iamRefreshInterval time.Duration) {
|
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etcd.Client, iamRefreshInterval time.Duration) {
|
||||||
sys.Lock()
|
sys.Lock()
|
||||||
defer sys.Unlock()
|
defer sys.Unlock()
|
||||||
|
|
||||||
sys.iamRefreshInterval = iamRefreshInterval
|
sys.iamRefreshInterval = iamRefreshInterval
|
||||||
sys.notificationSys = nSys
|
|
||||||
|
|
||||||
// Initialize IAM store
|
// Initialize IAM store
|
||||||
sys.initStore(objAPI, etcdClient)
|
sys.initStore(objAPI, etcdClient)
|
||||||
@ -466,7 +464,7 @@ func (sys *IAMSys) DeletePolicy(ctx context.Context, policyName string, notifyPe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Notify all other MinIO peers to delete policy
|
// Notify all other MinIO peers to delete policy
|
||||||
for _, nerr := range sys.notificationSys.DeletePolicy(policyName) {
|
for _, nerr := range globalNotificationSys.DeletePolicy(policyName) {
|
||||||
if nerr.Err != nil {
|
if nerr.Err != nil {
|
||||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
@ -524,7 +522,7 @@ func (sys *IAMSys) SetPolicy(ctx context.Context, policyName string, p iampolicy
|
|||||||
|
|
||||||
if !sys.HasWatcher() {
|
if !sys.HasWatcher() {
|
||||||
// Notify all other MinIO peers to reload policy
|
// Notify all other MinIO peers to reload policy
|
||||||
for _, nerr := range sys.notificationSys.LoadPolicy(policyName) {
|
for _, nerr := range globalNotificationSys.LoadPolicy(policyName) {
|
||||||
if nerr.Err != nil {
|
if nerr.Err != nil {
|
||||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
@ -546,7 +544,7 @@ func (sys *IAMSys) DeleteUser(ctx context.Context, accessKey string, notifyPeers
|
|||||||
|
|
||||||
// Notify all other MinIO peers to delete user.
|
// Notify all other MinIO peers to delete user.
|
||||||
if notifyPeers && !sys.HasWatcher() {
|
if notifyPeers && !sys.HasWatcher() {
|
||||||
for _, nerr := range sys.notificationSys.DeleteUser(accessKey) {
|
for _, nerr := range globalNotificationSys.DeleteUser(accessKey) {
|
||||||
if nerr.Err != nil {
|
if nerr.Err != nil {
|
||||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
@ -572,7 +570,7 @@ func (sys *IAMSys) CurrentPolicies(policyName string) string {
|
|||||||
func (sys *IAMSys) notifyForUser(ctx context.Context, accessKey string, isTemp bool) {
|
func (sys *IAMSys) notifyForUser(ctx context.Context, accessKey string, isTemp bool) {
|
||||||
// Notify all other MinIO peers to reload user.
|
// Notify all other MinIO peers to reload user.
|
||||||
if !sys.HasWatcher() {
|
if !sys.HasWatcher() {
|
||||||
for _, nerr := range sys.notificationSys.LoadUser(accessKey, isTemp) {
|
for _, nerr := range globalNotificationSys.LoadUser(accessKey, isTemp) {
|
||||||
if nerr.Err != nil {
|
if nerr.Err != nil {
|
||||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
@ -735,7 +733,7 @@ func (sys *IAMSys) SetUserStatus(ctx context.Context, accessKey string, status m
|
|||||||
func (sys *IAMSys) notifyForServiceAccount(ctx context.Context, accessKey string) {
|
func (sys *IAMSys) notifyForServiceAccount(ctx context.Context, accessKey string) {
|
||||||
// Notify all other Minio peers to reload the service account
|
// Notify all other Minio peers to reload the service account
|
||||||
if !sys.HasWatcher() {
|
if !sys.HasWatcher() {
|
||||||
for _, nerr := range sys.notificationSys.LoadServiceAccount(accessKey) {
|
for _, nerr := range globalNotificationSys.LoadServiceAccount(accessKey) {
|
||||||
if nerr.Err != nil {
|
if nerr.Err != nil {
|
||||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
@ -940,7 +938,7 @@ func (sys *IAMSys) DeleteServiceAccount(ctx context.Context, accessKey string, n
|
|||||||
}
|
}
|
||||||
|
|
||||||
if notifyPeers && !sys.HasWatcher() {
|
if notifyPeers && !sys.HasWatcher() {
|
||||||
for _, nerr := range sys.notificationSys.DeleteServiceAccount(accessKey) {
|
for _, nerr := range globalNotificationSys.DeleteServiceAccount(accessKey) {
|
||||||
if nerr.Err != nil {
|
if nerr.Err != nil {
|
||||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
@ -1149,7 +1147,7 @@ func (sys *IAMSys) GetUser(ctx context.Context, accessKey string) (cred auth.Cre
|
|||||||
// Notify all other MinIO peers to load group.
|
// Notify all other MinIO peers to load group.
|
||||||
func (sys *IAMSys) notifyForGroup(ctx context.Context, group string) {
|
func (sys *IAMSys) notifyForGroup(ctx context.Context, group string) {
|
||||||
if !sys.HasWatcher() {
|
if !sys.HasWatcher() {
|
||||||
for _, nerr := range sys.notificationSys.LoadGroup(group) {
|
for _, nerr := range globalNotificationSys.LoadGroup(group) {
|
||||||
if nerr.Err != nil {
|
if nerr.Err != nil {
|
||||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
@ -1256,7 +1254,7 @@ func (sys *IAMSys) PolicyDBSet(ctx context.Context, name, policy string, isGroup
|
|||||||
|
|
||||||
// Notify all other MinIO peers to reload policy
|
// Notify all other MinIO peers to reload policy
|
||||||
if !sys.HasWatcher() {
|
if !sys.HasWatcher() {
|
||||||
for _, nerr := range sys.notificationSys.LoadPolicyMapping(name, isGroup) {
|
for _, nerr := range globalNotificationSys.LoadPolicyMapping(name, isGroup) {
|
||||||
if nerr.Err != nil {
|
if nerr.Err != nil {
|
||||||
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
|
||||||
logger.LogIf(ctx, nerr.Err)
|
logger.LogIf(ctx, nerr.Err)
|
||||||
|
@ -674,7 +674,7 @@ func (sys *NotificationSys) set(bucket BucketInfo, meta BucketMetadata) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Init - initializes notification system from notification.xml and listenxl.meta of all buckets.
|
// Init - initializes notification system from notification.xml and listenxl.meta of all buckets.
|
||||||
func (sys *NotificationSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
|
func (sys *NotificationSys) Init(ctx context.Context, objAPI ObjectLayer) error {
|
||||||
if objAPI == nil {
|
if objAPI == nil {
|
||||||
return errServerNotInitialized
|
return errServerNotInitialized
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/cli"
|
"github.com/minio/cli"
|
||||||
"github.com/minio/madmin-go"
|
|
||||||
"github.com/minio/minio/internal/auth"
|
"github.com/minio/minio/internal/auth"
|
||||||
"github.com/minio/minio/internal/bucket/bandwidth"
|
"github.com/minio/minio/internal/bucket/bandwidth"
|
||||||
"github.com/minio/minio/internal/color"
|
"github.com/minio/minio/internal/color"
|
||||||
@ -43,7 +42,6 @@ import (
|
|||||||
xhttp "github.com/minio/minio/internal/http"
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
"github.com/minio/minio/internal/rest"
|
"github.com/minio/minio/internal/rest"
|
||||||
"github.com/minio/minio/internal/sync/errgroup"
|
|
||||||
"github.com/minio/pkg/certs"
|
"github.com/minio/pkg/certs"
|
||||||
"github.com/minio/pkg/env"
|
"github.com/minio/pkg/env"
|
||||||
)
|
)
|
||||||
@ -300,7 +298,7 @@ func configRetriableErrors(err error) bool {
|
|||||||
errors.Is(err, os.ErrDeadlineExceeded)
|
errors.Is(err, os.ErrDeadlineExceeded)
|
||||||
}
|
}
|
||||||
|
|
||||||
func initServer(ctx context.Context, newObject ObjectLayer) ([]BucketInfo, error) {
|
func initServer(ctx context.Context, newObject ObjectLayer) error {
|
||||||
// Once the config is fully loaded, initialize the new object layer.
|
// Once the config is fully loaded, initialize the new object layer.
|
||||||
setObjectLayer(newObject)
|
setObjectLayer(newObject)
|
||||||
|
|
||||||
@ -316,7 +314,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) ([]BucketInfo, error
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
// Retry was canceled successfully.
|
// Retry was canceled successfully.
|
||||||
return nil, fmt.Errorf("Initializing sub-systems stopped gracefully %w", ctx.Err())
|
return fmt.Errorf("Initializing sub-systems stopped gracefully %w", ctx.Err())
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -348,16 +346,14 @@ func initServer(ctx context.Context, newObject ObjectLayer) ([]BucketInfo, error
|
|||||||
if err = handleEncryptedConfigBackend(newObject); err == nil {
|
if err = handleEncryptedConfigBackend(newObject); err == nil {
|
||||||
// Upon success migrating the config, initialize all sub-systems
|
// Upon success migrating the config, initialize all sub-systems
|
||||||
// if all sub-systems initialized successfully return right away
|
// if all sub-systems initialized successfully return right away
|
||||||
var buckets []BucketInfo
|
if err = initConfigSubsystem(lkctx.Context(), newObject); err == nil {
|
||||||
buckets, err = initConfigSubsystem(lkctx.Context(), newObject)
|
|
||||||
if err == nil {
|
|
||||||
txnLk.Unlock(lkctx.Cancel)
|
txnLk.Unlock(lkctx.Cancel)
|
||||||
// All successful return.
|
// All successful return.
|
||||||
if globalIsDistErasure {
|
if globalIsDistErasure {
|
||||||
// These messages only meant primarily for distributed setup, so only log during distributed setup.
|
// These messages only meant primarily for distributed setup, so only log during distributed setup.
|
||||||
logger.Info("All MinIO sub-systems initialized successfully")
|
logger.Info("All MinIO sub-systems initialized successfully")
|
||||||
}
|
}
|
||||||
return buckets, nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -371,58 +367,28 @@ func initServer(ctx context.Context, newObject ObjectLayer) ([]BucketInfo, error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Any other unhandled return right here.
|
// Any other unhandled return right here.
|
||||||
return nil, fmt.Errorf("Unable to initialize sub-systems: %w", err)
|
return fmt.Errorf("Unable to initialize sub-systems: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func initConfigSubsystem(ctx context.Context, newObject ObjectLayer) ([]BucketInfo, error) {
|
func initConfigSubsystem(ctx context.Context, newObject ObjectLayer) error {
|
||||||
// %w is used by all error returns here to make sure
|
// %w is used by all error returns here to make sure
|
||||||
// we wrap the underlying error, make sure when you
|
// we wrap the underlying error, make sure when you
|
||||||
// are modifying this code that you do so, if and when
|
// are modifying this code that you do so, if and when
|
||||||
// you want to add extra context to your error. This
|
// you want to add extra context to your error. This
|
||||||
// ensures top level retry works accordingly.
|
// ensures top level retry works accordingly.
|
||||||
// List buckets to heal, and be re-used for loading configs.
|
|
||||||
|
|
||||||
buckets, err := newObject.ListBuckets(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Unable to list buckets to heal: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if globalIsErasure {
|
|
||||||
if len(buckets) > 0 {
|
|
||||||
if len(buckets) == 1 {
|
|
||||||
logger.Info(fmt.Sprintf("Verifying if %d bucket is consistent across drives...", len(buckets)))
|
|
||||||
} else {
|
|
||||||
logger.Info(fmt.Sprintf("Verifying if %d buckets are consistent across drives...", len(buckets)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Limit to no more than 50 concurrent buckets.
|
|
||||||
g := errgroup.WithNErrs(len(buckets)).WithConcurrency(50)
|
|
||||||
for index := range buckets {
|
|
||||||
index := index
|
|
||||||
g.Go(func() error {
|
|
||||||
_, herr := newObject.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{Recreate: true})
|
|
||||||
return herr
|
|
||||||
}, index)
|
|
||||||
}
|
|
||||||
for _, err := range g.Wait() {
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Unable to list buckets to heal: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize config system.
|
// Initialize config system.
|
||||||
if err = globalConfigSys.Init(newObject); err != nil {
|
if err := globalConfigSys.Init(newObject); err != nil {
|
||||||
if configRetriableErrors(err) {
|
if configRetriableErrors(err) {
|
||||||
return nil, fmt.Errorf("Unable to initialize config system: %w", err)
|
return fmt.Errorf("Unable to initialize config system: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Any other config errors we simply print a message and proceed forward.
|
// Any other config errors we simply print a message and proceed forward.
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize config, some features may be missing %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize config, some features may be missing %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return buckets, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// serverMain handler called for 'minio server' command.
|
// serverMain handler called for 'minio server' command.
|
||||||
@ -516,18 +482,9 @@ func serverMain(ctx *cli.Context) {
|
|||||||
setHTTPServer(httpServer)
|
setHTTPServer(httpServer)
|
||||||
|
|
||||||
if globalIsDistErasure && globalEndpoints.FirstLocal() {
|
if globalIsDistErasure && globalEndpoints.FirstLocal() {
|
||||||
for {
|
// Additionally in distributed setup, validate the setup and configuration.
|
||||||
// Additionally in distributed setup, validate the setup and configuration.
|
if err := verifyServerSystemConfig(GlobalContext, globalEndpoints); err != nil {
|
||||||
err := verifyServerSystemConfig(GlobalContext, globalEndpoints)
|
logger.Fatal(err, "Unable to start the server")
|
||||||
if err == nil || errors.Is(err, context.Canceled) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
logger.LogIf(GlobalContext, err, "Unable to initialize distributed setup, retrying.. after 5 seconds")
|
|
||||||
select {
|
|
||||||
case <-GlobalContext.Done():
|
|
||||||
return
|
|
||||||
case <-time.After(500 * time.Millisecond):
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -546,8 +503,17 @@ func serverMain(ctx *cli.Context) {
|
|||||||
|
|
||||||
initBackgroundExpiry(GlobalContext, newObject)
|
initBackgroundExpiry(GlobalContext, newObject)
|
||||||
|
|
||||||
buckets, err := initServer(GlobalContext, newObject)
|
if globalActiveCred.Equal(auth.DefaultCredentials) {
|
||||||
if err != nil {
|
msg := fmt.Sprintf("WARNING: Detected default credentials '%s', we recommend that you change these values with 'MINIO_ROOT_USER' and 'MINIO_ROOT_PASSWORD' environment variables",
|
||||||
|
globalActiveCred)
|
||||||
|
logStartupMessage(color.RedBold(msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
if !globalCLIContext.StrictS3Compat {
|
||||||
|
logStartupMessage(color.RedBold("WARNING: Strict AWS S3 compatible incoming PUT, POST content payload validation is turned off, caution is advised do not use in production"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = initServer(GlobalContext, newObject); err != nil {
|
||||||
var cerr config.Err
|
var cerr config.Err
|
||||||
// For any config error, we don't need to drop into safe-mode
|
// For any config error, we don't need to drop into safe-mode
|
||||||
// instead its a user error and should be fixed by user.
|
// instead its a user error and should be fixed by user.
|
||||||
@ -576,67 +542,67 @@ func serverMain(ctx *cli.Context) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Populate existing buckets to the etcd backend
|
|
||||||
if globalDNSConfig != nil {
|
|
||||||
// Background this operation.
|
|
||||||
go initFederatorBackend(buckets, newObject)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize bucket metadata sub-system.
|
|
||||||
globalBucketMetadataSys.Init(GlobalContext, buckets, newObject)
|
|
||||||
|
|
||||||
// Initialize bucket notification sub-system.
|
|
||||||
globalNotificationSys.Init(GlobalContext, buckets, newObject)
|
|
||||||
|
|
||||||
// Initialize site replication manager.
|
|
||||||
globalSiteReplicationSys.Init(GlobalContext, newObject)
|
|
||||||
|
|
||||||
// Initialize quota manager.
|
|
||||||
globalBucketQuotaSys.Init(newObject)
|
|
||||||
|
|
||||||
// Initialize users credentials and policies in background right after config has initialized.
|
// Initialize users credentials and policies in background right after config has initialized.
|
||||||
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalNotificationSys, globalRefreshIAMInterval)
|
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
|
||||||
|
|
||||||
initDataScanner(GlobalContext, newObject)
|
// Background all other operations such as initializing bucket metadata etc.
|
||||||
|
go func() {
|
||||||
|
// Initialize transition tier configuration manager
|
||||||
|
if globalIsErasure {
|
||||||
|
initBackgroundReplication(GlobalContext, newObject)
|
||||||
|
initBackgroundTransition(GlobalContext, newObject)
|
||||||
|
|
||||||
// Initialize transition tier configuration manager
|
go func() {
|
||||||
if globalIsErasure {
|
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
|
||||||
initBackgroundReplication(GlobalContext, newObject)
|
logger.LogIf(GlobalContext, err)
|
||||||
initBackgroundTransition(GlobalContext, newObject)
|
}
|
||||||
|
|
||||||
go func() {
|
globalTierJournal, err = initTierDeletionJournal(GlobalContext)
|
||||||
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
|
if err != nil {
|
||||||
logger.LogIf(GlobalContext, err)
|
logger.FatalIf(err, "Unable to initialize remote tier pending deletes journal")
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
globalTierJournal, err = initTierDeletionJournal(GlobalContext)
|
// Initialize site replication manager.
|
||||||
if err != nil {
|
globalSiteReplicationSys.Init(GlobalContext, newObject)
|
||||||
logger.FatalIf(err, "Unable to initialize remote tier pending deletes journal")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// initialize the new disk cache objects.
|
// Initialize quota manager.
|
||||||
if globalCacheConfig.Enabled {
|
globalBucketQuotaSys.Init(newObject)
|
||||||
logStartupMessage(color.Yellow("WARNING: Disk caching is deprecated for single/multi drive MinIO setups. Please migrate to using MinIO S3 gateway instead of disk caching"))
|
|
||||||
var cacheAPI CacheObjectLayer
|
|
||||||
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
|
|
||||||
logger.FatalIf(err, "Unable to initialize disk caching")
|
|
||||||
|
|
||||||
setCacheObjectLayer(cacheAPI)
|
initDataScanner(GlobalContext, newObject)
|
||||||
}
|
|
||||||
|
|
||||||
// Prints the formatted startup message, if err is not nil then it prints additional information as well.
|
// List buckets to heal, and be re-used for loading configs.
|
||||||
go printStartupMessage(getAPIEndpoints(), err)
|
buckets, err := newObject.ListBuckets(GlobalContext)
|
||||||
|
if err != nil {
|
||||||
|
logger.LogIf(GlobalContext, fmt.Errorf("Unable to list buckets to heal: %w", err))
|
||||||
|
}
|
||||||
|
|
||||||
if globalActiveCred.Equal(auth.DefaultCredentials) {
|
// Populate existing buckets to the etcd backend
|
||||||
msg := fmt.Sprintf("WARNING: Detected default credentials '%s', we recommend that you change these values with 'MINIO_ROOT_USER' and 'MINIO_ROOT_PASSWORD' environment variables", globalActiveCred)
|
if globalDNSConfig != nil {
|
||||||
logStartupMessage(color.RedBold(msg))
|
// Background this operation.
|
||||||
}
|
go initFederatorBackend(buckets, newObject)
|
||||||
|
}
|
||||||
|
|
||||||
if !globalCLIContext.StrictS3Compat {
|
// Initialize bucket metadata sub-system.
|
||||||
logStartupMessage(color.RedBold("WARNING: Strict AWS S3 compatible incoming PUT, POST content payload validation is turned off, caution is advised do not use in production"))
|
globalBucketMetadataSys.Init(GlobalContext, buckets, newObject)
|
||||||
}
|
|
||||||
|
// Initialize bucket notification sub-system.
|
||||||
|
globalNotificationSys.Init(GlobalContext, newObject)
|
||||||
|
|
||||||
|
// initialize the new disk cache objects.
|
||||||
|
if globalCacheConfig.Enabled {
|
||||||
|
logStartupMessage(color.Yellow("WARNING: Disk caching is deprecated for single/multi drive MinIO setups. Please migrate to using MinIO S3 gateway instead of disk caching"))
|
||||||
|
var cacheAPI CacheObjectLayer
|
||||||
|
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
|
||||||
|
logger.FatalIf(err, "Unable to initialize disk caching")
|
||||||
|
|
||||||
|
setCacheObjectLayer(cacheAPI)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prints the formatted startup message, if err is not nil then it prints additional information as well.
|
||||||
|
printStartupMessage(getAPIEndpoints(), err)
|
||||||
|
}()
|
||||||
|
|
||||||
if serverDebugLog {
|
if serverDebugLog {
|
||||||
logger.Info("== DEBUG Mode enabled ==")
|
logger.Info("== DEBUG Mode enabled ==")
|
||||||
@ -656,6 +622,7 @@ func serverMain(ctx *cli.Context) {
|
|||||||
}
|
}
|
||||||
logger.Info("======")
|
logger.Info("======")
|
||||||
}
|
}
|
||||||
|
|
||||||
<-globalOSSignalCh
|
<-globalOSSignalCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ func TestCheckValid(t *testing.T) {
|
|||||||
|
|
||||||
initConfigSubsystem(ctx, objLayer)
|
initConfigSubsystem(ctx, objLayer)
|
||||||
|
|
||||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, globalNotificationSys, 2*time.Second)
|
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||||
|
|
||||||
req, err := newTestRequest(http.MethodGet, "http://example.com:9000/bucket/object", 0, nil)
|
req, err := newTestRequest(http.MethodGet, "http://example.com:9000/bucket/object", 0, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -361,7 +361,7 @@ func initTestServerWithBackend(ctx context.Context, t TestErrHandler, testServer
|
|||||||
|
|
||||||
initConfigSubsystem(ctx, objLayer)
|
initConfigSubsystem(ctx, objLayer)
|
||||||
|
|
||||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, globalNotificationSys, 2*time.Second)
|
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||||
|
|
||||||
return testServer
|
return testServer
|
||||||
}
|
}
|
||||||
@ -1522,7 +1522,7 @@ func initAPIHandlerTest(ctx context.Context, obj ObjectLayer, endpoints []string
|
|||||||
|
|
||||||
initConfigSubsystem(ctx, obj)
|
initConfigSubsystem(ctx, obj)
|
||||||
|
|
||||||
globalIAMSys.Init(ctx, obj, globalEtcdClient, globalNotificationSys, 2*time.Second)
|
globalIAMSys.Init(ctx, obj, globalEtcdClient, 2*time.Second)
|
||||||
|
|
||||||
// get random bucket name.
|
// get random bucket name.
|
||||||
bucketName := getRandomBucketName()
|
bucketName := getRandomBucketName()
|
||||||
@ -1807,7 +1807,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
|
|||||||
t.Fatal("Unexpected error", err)
|
t.Fatal("Unexpected error", err)
|
||||||
}
|
}
|
||||||
initConfigSubsystem(ctx, objLayer)
|
initConfigSubsystem(ctx, objLayer)
|
||||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, globalNotificationSys, 2*time.Second)
|
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||||
|
|
||||||
// Executing the object layer tests for single node setup.
|
// Executing the object layer tests for single node setup.
|
||||||
objTest(objLayer, FSTestStr, t)
|
objTest(objLayer, FSTestStr, t)
|
||||||
@ -1832,7 +1832,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
|
|||||||
}
|
}
|
||||||
setObjectLayer(objLayer)
|
setObjectLayer(objLayer)
|
||||||
initConfigSubsystem(ctx, objLayer)
|
initConfigSubsystem(ctx, objLayer)
|
||||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, globalNotificationSys, 2*time.Second)
|
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||||
|
|
||||||
// Executing the object layer tests for Erasure.
|
// Executing the object layer tests for Erasure.
|
||||||
objTest(objLayer, ErasureTestStr, t)
|
objTest(objLayer, ErasureTestStr, t)
|
||||||
|
Loading…
Reference in New Issue
Block a user