bootstrap: Speed up bucket metadata loading (#19969)

Currently, bucket metadata is being loaded serially inside ListBuckets
Objet API. Fix that by loading the bucket metadata as the number of
erasure sets * 10, which is a good approximation.
This commit is contained in:
Anis Eleuch 2024-06-21 23:22:24 +01:00 committed by GitHub
parent 2d7a3d1516
commit 4d7d008741
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 71 additions and 48 deletions

View File

@ -91,7 +91,7 @@ const (
// -- If IP of the entry doesn't match, this means entry is // -- If IP of the entry doesn't match, this means entry is
// //
// for another instance. Log an error to console. // for another instance. Log an error to console.
func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) { func initFederatorBackend(buckets []string, objLayer ObjectLayer) {
if len(buckets) == 0 { if len(buckets) == 0 {
return return
} }
@ -112,10 +112,10 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
domainMissing := err == dns.ErrDomainMissing domainMissing := err == dns.ErrDomainMissing
if dnsBuckets != nil { if dnsBuckets != nil {
for _, bucket := range buckets { for _, bucket := range buckets {
bucketsSet.Add(bucket.Name) bucketsSet.Add(bucket)
r, ok := dnsBuckets[bucket.Name] r, ok := dnsBuckets[bucket]
if !ok { if !ok {
bucketsToBeUpdated.Add(bucket.Name) bucketsToBeUpdated.Add(bucket)
continue continue
} }
if !globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() { if !globalDomainIPs.Intersection(set.CreateStringSet(getHostsSlice(r)...)).IsEmpty() {
@ -134,7 +134,7 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
// but if we do see a difference with local domain IPs with // but if we do see a difference with local domain IPs with
// hostSlice from etcd then we should update with newer // hostSlice from etcd then we should update with newer
// domainIPs, we proceed to do that here. // domainIPs, we proceed to do that here.
bucketsToBeUpdated.Add(bucket.Name) bucketsToBeUpdated.Add(bucket)
continue continue
} }
@ -143,7 +143,7 @@ func initFederatorBackend(buckets []BucketInfo, objLayer ObjectLayer) {
// bucket names are globally unique in federation at a given // bucket names are globally unique in federation at a given
// path prefix, name collision is not allowed. We simply log // path prefix, name collision is not allowed. We simply log
// an error and continue. // an error and continue.
bucketsInConflict.Add(bucket.Name) bucketsInConflict.Add(bucket)
} }
} }

View File

@ -392,9 +392,7 @@ func (sys *BucketMetadataSys) GetReplicationConfig(ctx context.Context, bucket s
return nil, time.Time{}, BucketReplicationConfigNotFound{Bucket: bucket} return nil, time.Time{}, BucketReplicationConfigNotFound{Bucket: bucket}
} }
if reloaded { if reloaded {
globalBucketTargetSys.set(BucketInfo{ globalBucketTargetSys.set(bucket, meta)
Name: bucket,
}, meta)
} }
return meta.replicationConfig, meta.ReplicationConfigUpdatedAt, nil return meta.replicationConfig, meta.ReplicationConfigUpdatedAt, nil
} }
@ -413,9 +411,7 @@ func (sys *BucketMetadataSys) GetBucketTargetsConfig(bucket string) (*madmin.Buc
return nil, BucketRemoteTargetNotFound{Bucket: bucket} return nil, BucketRemoteTargetNotFound{Bucket: bucket}
} }
if reloaded { if reloaded {
globalBucketTargetSys.set(BucketInfo{ globalBucketTargetSys.set(bucket, meta)
Name: bucket,
}, meta)
} }
return meta.bucketTargetConfig, nil return meta.bucketTargetConfig, nil
} }
@ -471,7 +467,7 @@ func (sys *BucketMetadataSys) GetConfig(ctx context.Context, bucket string) (met
} }
// Init - initializes bucket metadata system for all buckets. // Init - initializes bucket metadata system for all buckets.
func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []string, objAPI ObjectLayer) error {
if objAPI == nil { if objAPI == nil {
return errServerNotInitialized return errServerNotInitialized
} }
@ -484,7 +480,7 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob
} }
// concurrently load bucket metadata to speed up loading bucket metadata. // concurrently load bucket metadata to speed up loading bucket metadata.
func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, failedBuckets map[string]struct{}) { func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []string, failedBuckets map[string]struct{}) {
g := errgroup.WithNErrs(len(buckets)) g := errgroup.WithNErrs(len(buckets))
bucketMetas := make([]BucketMetadata, len(buckets)) bucketMetas := make([]BucketMetadata, len(buckets))
for index := range buckets { for index := range buckets {
@ -494,8 +490,8 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
// herd upon start up sequence. // herd upon start up sequence.
time.Sleep(25*time.Millisecond + time.Duration(rand.Int63n(int64(100*time.Millisecond)))) time.Sleep(25*time.Millisecond + time.Duration(rand.Int63n(int64(100*time.Millisecond))))
_, _ = sys.objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{Recreate: true}) _, _ = sys.objAPI.HealBucket(ctx, buckets[index], madmin.HealOpts{Recreate: true})
meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[index].Name) meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[index])
if err != nil { if err != nil {
return err return err
} }
@ -508,7 +504,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
for index, err := range errs { for index, err := range errs {
if err != nil { if err != nil {
internalLogOnceIf(ctx, fmt.Errorf("Unable to load bucket metadata, will be retried: %w", err), internalLogOnceIf(ctx, fmt.Errorf("Unable to load bucket metadata, will be retried: %w", err),
"load-bucket-metadata-"+buckets[index].Name, logger.WarningKind) "load-bucket-metadata-"+buckets[index], logger.WarningKind)
} }
} }
@ -519,7 +515,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
if errs[i] != nil { if errs[i] != nil {
continue continue
} }
sys.metadataMap[buckets[i].Name] = meta sys.metadataMap[buckets[i]] = meta
} }
sys.Unlock() sys.Unlock()
@ -528,7 +524,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
if failedBuckets == nil { if failedBuckets == nil {
failedBuckets = make(map[string]struct{}) failedBuckets = make(map[string]struct{})
} }
failedBuckets[buckets[i].Name] = struct{}{} failedBuckets[buckets[i]] = struct{}{}
continue continue
} }
globalEventNotifier.set(buckets[i], meta) // set notification targets globalEventNotifier.set(buckets[i], meta) // set notification targets
@ -548,7 +544,7 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa
case <-ctx.Done(): case <-ctx.Done():
return return
case <-t.C: case <-t.C:
buckets, err := sys.objAPI.ListBuckets(ctx, BucketOptions{}) buckets, err := sys.objAPI.ListBuckets(ctx, BucketOptions{NoMetadata: true})
if err != nil { if err != nil {
internalLogIf(ctx, err, logger.WarningKind) internalLogIf(ctx, err, logger.WarningKind)
break break
@ -579,8 +575,8 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, fa
// Initialize the failed buckets // Initialize the failed buckets
if _, ok := failedBuckets[buckets[i].Name]; ok { if _, ok := failedBuckets[buckets[i].Name]; ok {
globalEventNotifier.set(buckets[i], meta) globalEventNotifier.set(buckets[i].Name, meta)
globalBucketTargetSys.set(buckets[i], meta) globalBucketTargetSys.set(buckets[i].Name, meta)
delete(failedBuckets, buckets[i].Name) delete(failedBuckets, buckets[i].Name)
} }
@ -600,8 +596,8 @@ func (sys *BucketMetadataSys) Initialized() bool {
} }
// Loads bucket metadata for all buckets into BucketMetadataSys. // Loads bucket metadata for all buckets into BucketMetadataSys.
func (sys *BucketMetadataSys) init(ctx context.Context, buckets []BucketInfo) { func (sys *BucketMetadataSys) init(ctx context.Context, buckets []string) {
count := 100 // load 100 bucket metadata at a time. count := globalEndpoints.ESCount() * 10
failedBuckets := make(map[string]struct{}) failedBuckets := make(map[string]struct{})
for { for {
if len(buckets) < count { if len(buckets) < count {

View File

@ -3086,7 +3086,7 @@ func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket strin
} }
// initResync - initializes bucket replication resync for all buckets. // initResync - initializes bucket replication resync for all buckets.
func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { func (p *ReplicationPool) initResync(ctx context.Context, buckets []string, objAPI ObjectLayer) error {
if objAPI == nil { if objAPI == nil {
return errServerNotInitialized return errServerNotInitialized
} }
@ -3095,7 +3095,7 @@ func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo,
return nil return nil
} }
func (p *ReplicationPool) startResyncRoutine(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { func (p *ReplicationPool) startResyncRoutine(ctx context.Context, buckets []string, objAPI ObjectLayer) {
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Run the replication resync in a loop // Run the replication resync in a loop
for { for {
@ -3113,13 +3113,13 @@ func (p *ReplicationPool) startResyncRoutine(ctx context.Context, buckets []Buck
} }
// Loads bucket replication resync statuses into memory. // Loads bucket replication resync statuses into memory.
func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { func (p *ReplicationPool) loadResync(ctx context.Context, buckets []string, objAPI ObjectLayer) error {
// Make sure only one node running resync on the cluster. // Make sure only one node running resync on the cluster.
ctx, cancel := globalLeaderLock.GetLock(ctx) ctx, cancel := globalLeaderLock.GetLock(ctx)
defer cancel() defer cancel()
for index := range buckets { for index := range buckets {
bucket := buckets[index].Name bucket := buckets[index]
meta, err := loadBucketResyncMetadata(ctx, bucket, objAPI) meta, err := loadBucketResyncMetadata(ctx, bucket, objAPI)
if err != nil { if err != nil {

View File

@ -612,7 +612,7 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT
} }
// create minio-go clients for buckets having remote targets // create minio-go clients for buckets having remote targets
func (sys *BucketTargetSys) set(bucket BucketInfo, meta BucketMetadata) { func (sys *BucketTargetSys) set(bucket string, meta BucketMetadata) {
cfg := meta.bucketTargetConfig cfg := meta.bucketTargetConfig
if cfg == nil || cfg.Empty() { if cfg == nil || cfg.Empty() {
return return
@ -626,9 +626,9 @@ func (sys *BucketTargetSys) set(bucket BucketInfo, meta BucketMetadata) {
continue continue
} }
sys.arnRemotesMap[tgt.Arn] = arnTarget{Client: tgtClient} sys.arnRemotesMap[tgt.Arn] = arnTarget{Client: tgtClient}
sys.updateBandwidthLimit(bucket.Name, tgt.Arn, tgt.BandwidthLimit) sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit)
} }
sys.targetsMap[bucket.Name] = cfg.Targets sys.targetsMap[bucket] = cfg.Targets
} }
// Returns a minio-go Client configured to access remote host described in replication target config. // Returns a minio-go Client configured to access remote host described in replication target config.

View File

@ -253,6 +253,14 @@ type PoolEndpoints struct {
// EndpointServerPools - list of list of endpoints // EndpointServerPools - list of list of endpoints
type EndpointServerPools []PoolEndpoints type EndpointServerPools []PoolEndpoints
// ESCount returns the total number of erasure sets in this cluster
func (l EndpointServerPools) ESCount() (count int) {
for _, p := range l {
count += p.SetCount
}
return
}
// GetNodes returns a sorted list of nodes in this cluster // GetNodes returns a sorted list of nodes in this cluster
func (l EndpointServerPools) GetNodes() (nodes []Node) { func (l EndpointServerPools) GetNodes() (nodes []Node) {
nodesMap := make(map[string]Node) nodesMap := make(map[string]Node)

View File

@ -2008,12 +2008,14 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !opts.NoMetadata {
for i := range buckets { for i := range buckets {
createdAt, err := globalBucketMetadataSys.CreatedAt(buckets[i].Name) createdAt, err := globalBucketMetadataSys.CreatedAt(buckets[i].Name)
if err == nil { if err == nil {
buckets[i].Created = createdAt buckets[i].Created = createdAt
} }
} }
}
return buckets, nil return buckets, nil
}, },
) )
@ -2025,12 +2027,15 @@ func (z *erasureServerPools) ListBuckets(ctx context.Context, opts BucketOptions
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !opts.NoMetadata {
for i := range buckets { for i := range buckets {
createdAt, err := globalBucketMetadataSys.CreatedAt(buckets[i].Name) createdAt, err := globalBucketMetadataSys.CreatedAt(buckets[i].Name)
if err == nil { if err == nil {
buckets[i].Created = createdAt buckets[i].Created = createdAt
} }
} }
}
return buckets, nil return buckets, nil
} }

View File

@ -69,7 +69,7 @@ func (evnot *EventNotifier) GetARNList() []string {
} }
// Loads notification policies for all buckets into EventNotifier. // Loads notification policies for all buckets into EventNotifier.
func (evnot *EventNotifier) set(bucket BucketInfo, meta BucketMetadata) { func (evnot *EventNotifier) set(bucket string, meta BucketMetadata) {
config := meta.notificationConfig config := meta.notificationConfig
if config == nil { if config == nil {
return return
@ -81,7 +81,7 @@ func (evnot *EventNotifier) set(bucket BucketInfo, meta BucketMetadata) {
internalLogIf(GlobalContext, err) internalLogIf(GlobalContext, err)
} }
} }
evnot.AddRulesMap(bucket.Name, config.ToRulesMap()) evnot.AddRulesMap(bucket, config.ToRulesMap())
} }
// Targets returns all the registered targets // Targets returns all the registered targets

View File

@ -181,6 +181,7 @@ type DeleteBucketOptions struct {
type BucketOptions struct { type BucketOptions struct {
Deleted bool // true only when site replication is enabled Deleted bool // true only when site replication is enabled
Cached bool // true only when we are requesting a cached response instead of hitting the disk for example ListBuckets() call. Cached bool // true only when we are requesting a cached response instead of hitting the disk for example ListBuckets() call.
NoMetadata bool
} }
// SetReplicaStatus sets replica status and timestamp for delete operations in ObjectOptions // SetReplicaStatus sets replica status and timestamp for delete operations in ObjectOptions

View File

@ -9,13 +9,16 @@ import (
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z BucketOptions) MarshalMsg(b []byte) (o []byte, err error) { func (z BucketOptions) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// map header, size 2 // map header, size 3
// string "Deleted" // string "Deleted"
o = append(o, 0x82, 0xa7, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64) o = append(o, 0x83, 0xa7, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64)
o = msgp.AppendBool(o, z.Deleted) o = msgp.AppendBool(o, z.Deleted)
// string "Cached" // string "Cached"
o = append(o, 0xa6, 0x43, 0x61, 0x63, 0x68, 0x65, 0x64) o = append(o, 0xa6, 0x43, 0x61, 0x63, 0x68, 0x65, 0x64)
o = msgp.AppendBool(o, z.Cached) o = msgp.AppendBool(o, z.Cached)
// string "NoMetadata"
o = append(o, 0xaa, 0x4e, 0x6f, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61)
o = msgp.AppendBool(o, z.NoMetadata)
return return
} }
@ -49,6 +52,12 @@ func (z *BucketOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Cached") err = msgp.WrapError(err, "Cached")
return return
} }
case "NoMetadata":
z.NoMetadata, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "NoMetadata")
return
}
default: default:
bts, err = msgp.Skip(bts) bts, err = msgp.Skip(bts)
if err != nil { if err != nil {
@ -63,7 +72,7 @@ func (z *BucketOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z BucketOptions) Msgsize() (s int) { func (z BucketOptions) Msgsize() (s int) {
s = 1 + 8 + msgp.BoolSize + 7 + msgp.BoolSize s = 1 + 8 + msgp.BoolSize + 7 + msgp.BoolSize + 11 + msgp.BoolSize
return return
} }

View File

@ -1053,11 +1053,11 @@ func serverMain(ctx *cli.Context) {
bootLogIf(GlobalContext, globalEventNotifier.InitBucketTargets(GlobalContext, newObject)) bootLogIf(GlobalContext, globalEventNotifier.InitBucketTargets(GlobalContext, newObject))
}) })
var buckets []BucketInfo var buckets []string
// List buckets to initialize bucket metadata sub-sys. // List buckets to initialize bucket metadata sub-sys.
bootstrapTrace("listBuckets", func() { bootstrapTrace("listBuckets", func() {
for { for {
buckets, err = newObject.ListBuckets(GlobalContext, BucketOptions{}) bucketsList, err := newObject.ListBuckets(GlobalContext, BucketOptions{NoMetadata: true})
if err != nil { if err != nil {
if configRetriableErrors(err) { if configRetriableErrors(err) {
logger.Info("Waiting for list buckets to succeed to initialize buckets.. possible cause (%v)", err) logger.Info("Waiting for list buckets to succeed to initialize buckets.. possible cause (%v)", err)
@ -1067,6 +1067,10 @@ func serverMain(ctx *cli.Context) {
bootLogIf(GlobalContext, fmt.Errorf("Unable to list buckets to initialize bucket metadata sub-system: %w", err)) bootLogIf(GlobalContext, fmt.Errorf("Unable to list buckets to initialize bucket metadata sub-system: %w", err))
} }
buckets = make([]string, len(bucketsList))
for i := range bucketsList {
buckets[i] = bucketsList[i].Name
}
break break
} }
}) })