mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
load all blocking metadata in background (#10298)
most of this metadata already has fallbacks and there is no good reason to load them in blocking fashion
This commit is contained in:
parent
75d44b3bae
commit
59352d0ac2
@ -24,6 +24,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/minio/minio-go/v7/pkg/tags"
|
"github.com/minio/minio-go/v7/pkg/tags"
|
||||||
|
"github.com/minio/minio/cmd/logger"
|
||||||
bucketsse "github.com/minio/minio/pkg/bucket/encryption"
|
bucketsse "github.com/minio/minio/pkg/bucket/encryption"
|
||||||
"github.com/minio/minio/pkg/bucket/lifecycle"
|
"github.com/minio/minio/pkg/bucket/lifecycle"
|
||||||
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
|
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
|
||||||
@ -176,6 +177,13 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat
|
|||||||
// If no metadata exists errConfigNotFound is returned and a new metadata is returned.
|
// If no metadata exists errConfigNotFound is returned and a new metadata is returned.
|
||||||
// Only a shallow copy is returned, so referenced data should not be modified,
|
// Only a shallow copy is returned, so referenced data should not be modified,
|
||||||
// but can be replaced atomically.
|
// but can be replaced atomically.
|
||||||
|
//
|
||||||
|
// This function should only be used with
|
||||||
|
// - GetBucketInfo
|
||||||
|
// - ListBuckets
|
||||||
|
// - ListBucketsHeal (only in case of erasure coding mode)
|
||||||
|
// For all other bucket specific metadata, use the relevant
|
||||||
|
// calls implemented specifically for each of those features.
|
||||||
func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) {
|
func (sys *BucketMetadataSys) Get(bucket string) (BucketMetadata, error) {
|
||||||
if globalIsGateway || bucket == minioMetaBucket {
|
if globalIsGateway || bucket == minioMetaBucket {
|
||||||
return newBucketMetadata(bucket), errConfigNotFound
|
return newBucketMetadata(bucket), errConfigNotFound
|
||||||
@ -397,8 +405,9 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load PolicySys once during boot.
|
// Load bucket metadata sys in background
|
||||||
return sys.load(ctx, buckets, objAPI)
|
go logger.LogIf(ctx, sys.load(ctx, buckets, objAPI))
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// concurrently load bucket metadata to speed up loading bucket metadata.
|
// concurrently load bucket metadata to speed up loading bucket metadata.
|
||||||
|
@ -199,8 +199,8 @@ func (sys *BucketTargetSys) Init(ctx context.Context, buckets []BucketInfo, objA
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load bucket targets once during boot.
|
// Load bucket targets once during boot in background.
|
||||||
sys.load(ctx, buckets, objAPI)
|
go sys.load(ctx, buckets, objAPI)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,7 +315,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal(err, "Unable to list buckets")
|
logger.Fatal(err, "Unable to list buckets")
|
||||||
}
|
}
|
||||||
logger.FatalIf(globalNotificationSys.Init(buckets, newObject), "Unable to initialize notification system")
|
logger.FatalIf(globalNotificationSys.Init(GlobalContext, buckets, newObject), "Unable to initialize notification system")
|
||||||
}
|
}
|
||||||
|
|
||||||
if globalEtcdClient != nil {
|
if globalEtcdClient != nil {
|
||||||
|
@ -588,31 +588,6 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRemoteTarget - adds event rules map, HTTP/PeerRPC client target to bucket name.
|
|
||||||
func (sys *NotificationSys) AddRemoteTarget(bucketName string, target event.Target, rulesMap event.RulesMap) error {
|
|
||||||
if err := sys.targetList.Add(target); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
sys.Lock()
|
|
||||||
targetMap := sys.bucketRemoteTargetRulesMap[bucketName]
|
|
||||||
if targetMap == nil {
|
|
||||||
targetMap = make(map[event.TargetID]event.RulesMap)
|
|
||||||
}
|
|
||||||
|
|
||||||
rulesMap = rulesMap.Clone()
|
|
||||||
targetMap[target.ID()] = rulesMap
|
|
||||||
sys.bucketRemoteTargetRulesMap[bucketName] = targetMap
|
|
||||||
|
|
||||||
rulesMap = rulesMap.Clone()
|
|
||||||
rulesMap.Add(sys.bucketRulesMap[bucketName])
|
|
||||||
sys.bucketRulesMap[bucketName] = rulesMap
|
|
||||||
|
|
||||||
sys.Unlock()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Loads notification policies for all buckets into NotificationSys.
|
// Loads notification policies for all buckets into NotificationSys.
|
||||||
func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
|
func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error {
|
||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
@ -634,7 +609,7 @@ func (sys *NotificationSys) load(buckets []BucketInfo, objAPI ObjectLayer) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Init - initializes notification system from notification.xml and listenxl.meta of all buckets.
|
// Init - initializes notification system from notification.xml and listenxl.meta of all buckets.
|
||||||
func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error {
|
func (sys *NotificationSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
|
||||||
if objAPI == nil {
|
if objAPI == nil {
|
||||||
return errServerNotInitialized
|
return errServerNotInitialized
|
||||||
}
|
}
|
||||||
@ -657,13 +632,13 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
|
|||||||
if res.Err != nil {
|
if res.Err != nil {
|
||||||
reqInfo := &logger.ReqInfo{}
|
reqInfo := &logger.ReqInfo{}
|
||||||
reqInfo.AppendTags("targetID", res.ID.Name)
|
reqInfo.AppendTags("targetID", res.ID.Name)
|
||||||
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
|
logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID)
|
||||||
logger.LogOnceIf(ctx, res.Err, res.ID)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return sys.load(buckets, objAPI)
|
go logger.LogIf(ctx, sys.load(buckets, objAPI))
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRulesMap - adds rules map for bucket name.
|
// AddRulesMap - adds rules map for bucket name.
|
||||||
|
@ -354,12 +354,12 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize notification system.
|
// Initialize notification system.
|
||||||
if err = globalNotificationSys.Init(buckets, newObject); err != nil {
|
if err = globalNotificationSys.Init(ctx, buckets, newObject); err != nil {
|
||||||
return fmt.Errorf("Unable to initialize notification system: %w", err)
|
return fmt.Errorf("Unable to initialize notification system: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize bucket targets sub-system.
|
// Initialize bucket targets sub-system.
|
||||||
if err = globalBucketTargetSys.Init(GlobalContext, buckets, newObject); err != nil {
|
if err = globalBucketTargetSys.Init(ctx, buckets, newObject); err != nil {
|
||||||
return fmt.Errorf("Unable to initialize bucket target sub-system: %w", err)
|
return fmt.Errorf("Unable to initialize bucket target sub-system: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user