mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
Load bucket configs during the metadata refresh (#18449)
This patch takes care of loading the bucket configs of failed buckets during the periodic refresh. This makes sure the event notifiers and remote bucket targets are properly initialized.
This commit is contained in:
parent
5573986e8e
commit
38f35463b7
@ -437,21 +437,8 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sys *BucketMetadataSys) loadBucketMetadata(ctx context.Context, bucket BucketInfo) error {
|
||||
meta, err := loadBucketMetadata(ctx, sys.objAPI, bucket.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sys.Lock()
|
||||
sys.metadataMap[bucket.Name] = meta
|
||||
sys.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// concurrently load bucket metadata to speed up loading bucket metadata.
|
||||
func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo) {
|
||||
func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, failedBuckets map[string]struct{}) {
|
||||
g := errgroup.WithNErrs(len(buckets))
|
||||
bucketMetas := make([]BucketMetadata, len(buckets))
|
||||
for index := range buckets {
|
||||
@ -491,6 +478,10 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
|
||||
|
||||
for i, meta := range bucketMetas {
|
||||
if errs[i] != nil {
|
||||
if failedBuckets == nil {
|
||||
failedBuckets = make(map[string]struct{})
|
||||
}
|
||||
failedBuckets[buckets[i].Name] = struct{}{}
|
||||
continue
|
||||
}
|
||||
globalEventNotifier.set(buckets[i], meta) // set notification targets
|
||||
@ -498,7 +489,7 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
|
||||
}
|
||||
}
|
||||
|
||||
func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
|
||||
func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context, failedBuckets map[string]struct{}) {
|
||||
const bucketMetadataRefresh = 15 * time.Minute
|
||||
|
||||
sleeper := newDynamicSleeper(2, 150*time.Millisecond, false)
|
||||
@ -513,7 +504,7 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
|
||||
buckets, err := sys.objAPI.ListBuckets(ctx, BucketOptions{})
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
break
|
||||
}
|
||||
|
||||
// Handle if we have some buckets in-memory those are stale.
|
||||
@ -525,38 +516,49 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
|
||||
}
|
||||
sys.RemoveStaleBuckets(diskBuckets)
|
||||
|
||||
for _, bucket := range buckets {
|
||||
for i := range buckets {
|
||||
wait := sleeper.Timer(ctx)
|
||||
|
||||
err := sys.loadBucketMetadata(ctx, bucket)
|
||||
meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[i].Name)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
wait() // wait to proceed to next entry.
|
||||
continue
|
||||
}
|
||||
|
||||
sys.Lock()
|
||||
sys.metadataMap[buckets[i].Name] = meta
|
||||
sys.Unlock()
|
||||
|
||||
// Initialize the failed buckets
|
||||
if _, ok := failedBuckets[buckets[i].Name]; ok {
|
||||
globalEventNotifier.set(buckets[i], meta)
|
||||
globalBucketTargetSys.set(buckets[i], meta)
|
||||
delete(failedBuckets, buckets[i].Name)
|
||||
}
|
||||
|
||||
wait() // wait to proceed to next entry.
|
||||
}
|
||||
|
||||
t.Reset(bucketMetadataRefresh)
|
||||
}
|
||||
t.Reset(bucketMetadataRefresh)
|
||||
}
|
||||
}
|
||||
|
||||
// Loads bucket metadata for all buckets into BucketMetadataSys.
|
||||
func (sys *BucketMetadataSys) init(ctx context.Context, buckets []BucketInfo) {
|
||||
count := 100 // load 100 bucket metadata at a time.
|
||||
failedBuckets := make(map[string]struct{})
|
||||
for {
|
||||
if len(buckets) < count {
|
||||
sys.concurrentLoad(ctx, buckets)
|
||||
sys.concurrentLoad(ctx, buckets, failedBuckets)
|
||||
break
|
||||
}
|
||||
sys.concurrentLoad(ctx, buckets[:count])
|
||||
sys.concurrentLoad(ctx, buckets[:count], failedBuckets)
|
||||
buckets = buckets[count:]
|
||||
}
|
||||
|
||||
if globalIsDistErasure {
|
||||
go sys.refreshBucketsMetadataLoop(ctx)
|
||||
go sys.refreshBucketsMetadataLoop(ctx, failedBuckets)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,40 +128,6 @@ func (evnot *EventNotifier) AddRulesMap(bucketName string, rulesMap event.RulesM
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveRulesMap - removes rules map for bucket name.
|
||||
func (evnot *EventNotifier) RemoveRulesMap(bucketName string, rulesMap event.RulesMap) {
|
||||
evnot.Lock()
|
||||
defer evnot.Unlock()
|
||||
|
||||
evnot.bucketRulesMap[bucketName].Remove(rulesMap)
|
||||
if len(evnot.bucketRulesMap[bucketName]) == 0 {
|
||||
delete(evnot.bucketRulesMap, bucketName)
|
||||
}
|
||||
}
|
||||
|
||||
// ConfiguredTargetIDs - returns list of configured target id's
|
||||
func (evnot *EventNotifier) ConfiguredTargetIDs() []event.TargetID {
|
||||
if evnot == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
evnot.RLock()
|
||||
defer evnot.RUnlock()
|
||||
|
||||
var targetIDs []event.TargetID
|
||||
for _, rmap := range evnot.bucketRulesMap {
|
||||
for _, rules := range rmap {
|
||||
for _, targetSet := range rules {
|
||||
for id := range targetSet {
|
||||
targetIDs = append(targetIDs, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return targetIDs
|
||||
}
|
||||
|
||||
// RemoveNotification - removes all notification configuration for bucket name.
|
||||
func (evnot *EventNotifier) RemoveNotification(bucketName string) {
|
||||
evnot.Lock()
|
||||
|
Loading…
x
Reference in New Issue
Block a user