mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
de-couple bucket metadata loading with lock context (#13679)
avoid passing lock context while loading bucket metadata, refactor such that we can de-couple things for subsystem loading.
This commit is contained in:
parent
4caed7cc0d
commit
20c43c447d
@ -72,7 +72,7 @@ func prepareAdminErasureTestBed(ctx context.Context) (*adminErasureTestBed, erro
|
||||
|
||||
newAllSubsystems()
|
||||
|
||||
initAllSubsystems(ctx, objLayer)
|
||||
initConfigSubsystem(ctx, objLayer)
|
||||
|
||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||
|
||||
|
@ -367,7 +367,7 @@ func TestIsReqAuthenticated(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
initAllSubsystems(ctx, objLayer)
|
||||
initConfigSubsystem(ctx, objLayer)
|
||||
|
||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||
|
||||
@ -457,7 +457,7 @@ func TestValidateAdminSignature(t *testing.T) {
|
||||
|
||||
newAllSubsystems()
|
||||
|
||||
initAllSubsystems(ctx, objLayer)
|
||||
initConfigSubsystem(ctx, objLayer)
|
||||
|
||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||
|
||||
|
@ -287,7 +287,7 @@ func configRetriableErrors(err error) bool {
|
||||
errors.Is(err, os.ErrDeadlineExceeded)
|
||||
}
|
||||
|
||||
func initServer(ctx context.Context, newObject ObjectLayer) error {
|
||||
func initServer(ctx context.Context, newObject ObjectLayer) ([]BucketInfo, error) {
|
||||
// Once the config is fully loaded, initialize the new object layer.
|
||||
setObjectLayer(newObject)
|
||||
|
||||
@ -310,7 +310,7 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Retry was canceled successfully.
|
||||
return fmt.Errorf("Initializing sub-systems stopped gracefully %w", ctx.Err())
|
||||
return nil, fmt.Errorf("Initializing sub-systems stopped gracefully %w", ctx.Err())
|
||||
default:
|
||||
}
|
||||
|
||||
@ -335,14 +335,16 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
||||
if err = handleEncryptedConfigBackend(newObject); err == nil {
|
||||
// Upon success migrating the config, initialize all sub-systems
|
||||
// if all sub-systems initialized successfully return right away
|
||||
if err = initAllSubsystems(lkctx.Context(), newObject); err == nil {
|
||||
var buckets []BucketInfo
|
||||
buckets, err = initConfigSubsystem(lkctx.Context(), newObject)
|
||||
if err == nil {
|
||||
txnLk.Unlock(lkctx.Cancel)
|
||||
// All successful return.
|
||||
if globalIsDistErasure {
|
||||
// These messages only meant primarily for distributed setup, so only log during distributed setup.
|
||||
logger.Info("All MinIO sub-systems initialized successfully")
|
||||
}
|
||||
return nil
|
||||
return buckets, nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -356,11 +358,11 @@ func initServer(ctx context.Context, newObject ObjectLayer) error {
|
||||
}
|
||||
|
||||
// Any other unhandled return right here.
|
||||
return fmt.Errorf("Unable to initialize sub-systems: %w", err)
|
||||
return nil, fmt.Errorf("Unable to initialize sub-systems: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
func initConfigSubsystem(ctx context.Context, newObject ObjectLayer) ([]BucketInfo, error) {
|
||||
// %w is used by all error returns here to make sure
|
||||
// we wrap the underlying error, make sure when you
|
||||
// are modifying this code that you do so, if and when
|
||||
@ -370,7 +372,7 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
|
||||
buckets, err := newObject.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to list buckets to heal: %w", err)
|
||||
return nil, fmt.Errorf("Unable to list buckets to heal: %w", err)
|
||||
}
|
||||
|
||||
if globalIsErasure {
|
||||
@ -393,7 +395,7 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
}
|
||||
for _, err := range g.Wait() {
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to list buckets to heal: %w", err)
|
||||
return nil, fmt.Errorf("Unable to list buckets to heal: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -401,34 +403,13 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
// Initialize config system.
|
||||
if err = globalConfigSys.Init(newObject); err != nil {
|
||||
if configRetriableErrors(err) {
|
||||
return fmt.Errorf("Unable to initialize config system: %w", err)
|
||||
return nil, fmt.Errorf("Unable to initialize config system: %w", err)
|
||||
}
|
||||
// Any other config errors we simply print a message and proceed forward.
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize config, some features may be missing %w", err))
|
||||
}
|
||||
|
||||
// Populate existing buckets to the etcd backend
|
||||
if globalDNSConfig != nil {
|
||||
// Background this operation.
|
||||
go initFederatorBackend(buckets, newObject)
|
||||
}
|
||||
|
||||
// Initialize bucket metadata sub-system.
|
||||
globalBucketMetadataSys.Init(ctx, buckets, newObject)
|
||||
|
||||
// Initialize bucket notification sub-system.
|
||||
globalNotificationSys.Init(ctx, buckets, newObject)
|
||||
|
||||
// Initialize site replication manager.
|
||||
globalSiteReplicationSys.Init(ctx, newObject)
|
||||
|
||||
if globalIsErasure {
|
||||
// Initialize transition tier configuration manager
|
||||
if err = globalTierConfigMgr.Init(ctx, newObject); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
type nullWriter struct{}
|
||||
@ -553,7 +534,8 @@ func serverMain(ctx *cli.Context) {
|
||||
|
||||
initBackgroundExpiry(GlobalContext, newObject)
|
||||
|
||||
if err = initServer(GlobalContext, newObject); err != nil {
|
||||
buckets, err := initServer(GlobalContext, newObject)
|
||||
if err != nil {
|
||||
var cerr config.Err
|
||||
// For any config error, we don't need to drop into safe-mode
|
||||
// instead its a user error and should be fixed by user.
|
||||
@ -569,9 +551,31 @@ func serverMain(ctx *cli.Context) {
|
||||
logger.LogIf(GlobalContext, err)
|
||||
}
|
||||
|
||||
// Populate existing buckets to the etcd backend
|
||||
if globalDNSConfig != nil {
|
||||
// Background this operation.
|
||||
go initFederatorBackend(buckets, newObject)
|
||||
}
|
||||
|
||||
// Initialize bucket metadata sub-system.
|
||||
globalBucketMetadataSys.Init(GlobalContext, buckets, newObject)
|
||||
|
||||
// Initialize bucket notification sub-system.
|
||||
globalNotificationSys.Init(GlobalContext, buckets, newObject)
|
||||
|
||||
// Initialize site replication manager.
|
||||
globalSiteReplicationSys.Init(GlobalContext, newObject)
|
||||
|
||||
// Initialize users credentials and policies in background right after config has initialized.
|
||||
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
|
||||
|
||||
// Initialize transition tier configuration manager
|
||||
if globalIsErasure {
|
||||
if err := globalTierConfigMgr.Init(GlobalContext, newObject); err != nil {
|
||||
logger.LogIf(GlobalContext, err)
|
||||
}
|
||||
}
|
||||
|
||||
initDataScanner(GlobalContext, newObject)
|
||||
|
||||
if globalIsErasure { // to be done after config init
|
||||
|
@ -44,7 +44,7 @@ func TestCheckValid(t *testing.T) {
|
||||
|
||||
newAllSubsystems()
|
||||
|
||||
initAllSubsystems(ctx, objLayer)
|
||||
initConfigSubsystem(ctx, objLayer)
|
||||
|
||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||
|
||||
|
@ -357,7 +357,7 @@ func initTestServerWithBackend(ctx context.Context, t TestErrHandler, testServer
|
||||
|
||||
globalEtcdClient = nil
|
||||
|
||||
initAllSubsystems(ctx, objLayer)
|
||||
initConfigSubsystem(ctx, objLayer)
|
||||
|
||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||
|
||||
@ -1521,7 +1521,7 @@ func removeDiskN(disks []string, n int) {
|
||||
func initAPIHandlerTest(ctx context.Context, obj ObjectLayer, endpoints []string) (string, http.Handler, error) {
|
||||
newAllSubsystems()
|
||||
|
||||
initAllSubsystems(ctx, obj)
|
||||
initConfigSubsystem(ctx, obj)
|
||||
|
||||
globalIAMSys.Init(ctx, obj, globalEtcdClient, 2*time.Second)
|
||||
|
||||
@ -1808,7 +1808,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
|
||||
if err = newTestConfig(globalMinioDefaultRegion, objLayer); err != nil {
|
||||
t.Fatal("Unexpected error", err)
|
||||
}
|
||||
initAllSubsystems(ctx, objLayer)
|
||||
initConfigSubsystem(ctx, objLayer)
|
||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||
|
||||
// Executing the object layer tests for single node setup.
|
||||
@ -1833,7 +1833,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
|
||||
t.Fatalf("Initialization of object layer failed for Erasure setup: %s", err)
|
||||
}
|
||||
setObjectLayer(objLayer)
|
||||
initAllSubsystems(ctx, objLayer)
|
||||
initConfigSubsystem(ctx, objLayer)
|
||||
globalIAMSys.Init(ctx, objLayer, globalEtcdClient, 2*time.Second)
|
||||
|
||||
// Executing the object layer tests for Erasure.
|
||||
|
Loading…
x
Reference in New Issue
Block a user