Periodically refresh buckets metadata from the backend disks (#16561)

fixes #16553

parent 1141187bf2
commit c8ffa59d28
@@ -22,6 +22,7 @@ import (
 	"fmt"
+	"runtime"
 	"strconv"
 	"time"
 
 	"github.com/minio/madmin-go/v2"
 	"github.com/minio/minio/internal/logger"
@@ -60,13 +61,43 @@ func activeListeners() int {
 	return int(globalHTTPListen.Subscribers()) + int(globalTrace.Subscribers())
 }
 
-func waitForLowHTTPReq() {
-	var currentIO func() int
-	if httpServer := newHTTPServerFn(); httpServer != nil {
-		currentIO = httpServer.GetRequestCount
+func waitForLowIO(maxIO int, maxWait time.Duration, currentIO func() int) {
+	// No need to wait run at full speed.
+	if maxIO <= 0 {
+		return
 	}
-	globalHealConfig.Wait(currentIO, activeListeners)
+
+	const waitTick = 100 * time.Millisecond
+
+	tmpMaxWait := maxWait
+
+	for currentIO() >= maxIO {
+		if tmpMaxWait > 0 {
+			if tmpMaxWait < waitTick {
+				time.Sleep(tmpMaxWait)
+			} else {
+				time.Sleep(waitTick)
+			}
+			tmpMaxWait -= waitTick
+		}
+		if tmpMaxWait <= 0 {
+			return
+		}
+	}
 }
 
+func currentHTTPIO() int {
+	httpServer := newHTTPServerFn()
+	if httpServer == nil {
+		return 0
+	}
+
+	return httpServer.GetRequestCount() - activeListeners()
+}
+
+func waitForLowHTTPReq() {
+	maxIO, maxWait, _ := globalHealConfig.Clone()
+	waitForLowIO(maxIO, maxWait, currentHTTPIO)
+}
+
 func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
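These first two hunks replace the old waitForLowHTTPReq, which slept inside globalHealConfig.Wait, with a reusable waitForLowIO(maxIO, maxWait, currentIO) helper: it polls currentIO() in 100ms ticks until load drops below maxIO or the maxWait budget is spent, so callers are throttled but never blocked indefinitely; waitForLowHTTPReq now just snapshots the limits via globalHealConfig.Clone (shown in the final hunk) and delegates. Below is a self-contained sketch of that bounded-wait pattern; the helper body mirrors the commit, while the atomic request counter and the main harness are illustrative additions, not part of the commit.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// inFlight stands in for the server's in-flight request counter
// (illustrative only; MinIO reads httpServer.GetRequestCount).
var inFlight atomic.Int64

// waitForLowIO mirrors the helper added by this commit: poll currentIO()
// in 100ms ticks until it drops below maxIO, or until maxWait is spent.
func waitForLowIO(maxIO int, maxWait time.Duration, currentIO func() int) {
	if maxIO <= 0 {
		return // throttling disabled, run at full speed
	}

	const waitTick = 100 * time.Millisecond
	tmpMaxWait := maxWait

	for currentIO() >= maxIO {
		if tmpMaxWait > 0 {
			if tmpMaxWait < waitTick {
				time.Sleep(tmpMaxWait)
			} else {
				time.Sleep(waitTick)
			}
			tmpMaxWait -= waitTick
		}
		if tmpMaxWait <= 0 {
			return // wait budget exhausted: proceed anyway rather than stall
		}
	}
}

func main() {
	inFlight.Store(5)
	go func() { // simulate load draining away after 250ms
		time.Sleep(250 * time.Millisecond)
		inFlight.Store(0)
	}()

	start := time.Now()
	waitForLowIO(1, time.Second, func() int { return int(inFlight.Load()) })
	fmt.Printf("proceeded after ~%v\n", time.Since(start).Round(50*time.Millisecond))
}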
@@ -21,6 +21,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"runtime"
 	"sync"
 	"time"
 
@@ -40,6 +41,8 @@ import (
 
 // BucketMetadataSys captures all bucket metadata for a given cluster.
 type BucketMetadataSys struct {
+	objAPI ObjectLayer
+
 	sync.RWMutex
 	metadataMap map[string]BucketMetadata
 }
@@ -386,39 +389,41 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob
 		return errServerNotInitialized
 	}
 
+	sys.objAPI = objAPI
+
 	// Load bucket metadata sys in background
-	go sys.load(ctx, buckets, objAPI)
+	go sys.init(ctx, buckets)
 	return nil
 }
 
+func (sys *BucketMetadataSys) loadBucketMetadata(ctx context.Context, bucket BucketInfo) error {
+	meta, err := loadBucketMetadata(ctx, sys.objAPI, bucket.Name)
+	if err != nil {
+		return err
+	}
+
+	sys.Lock()
+	sys.metadataMap[bucket.Name] = meta
+	sys.Unlock()
+
+	globalEventNotifier.set(bucket, meta)   // set notification targets
+	globalBucketTargetSys.set(bucket, meta) // set remote replication targets
+
+	return nil
+}
+
 // concurrently load bucket metadata to speed up loading bucket metadata.
-func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
+func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo) {
 	g := errgroup.WithNErrs(len(buckets))
 	for index := range buckets {
 		index := index
 		g.Go(func() error {
-			_, _ = objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{
+			_, _ = sys.objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{
 				// Ensure heal opts for bucket metadata be deep healed all the time.
 				ScanMode: madmin.HealDeepScan,
 				Recreate: true,
 			})
-			meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name)
-			if err != nil {
-				if !globalIsErasure && !globalIsDistErasure && errors.Is(err, errVolumeNotFound) {
-					meta = newBucketMetadata(buckets[index].Name)
-				} else {
-					return err
-				}
-			}
-			sys.Lock()
-			sys.metadataMap[buckets[index].Name] = meta
-			sys.Unlock()
-
-			globalEventNotifier.set(buckets[index], meta) // set notification targets
-
-			globalBucketTargetSys.set(buckets[index], meta) // set remote replication targets
-
-			return nil
+			return sys.loadBucketMetadata(ctx, buckets[index])
 		}, index)
 	}
 	for _, err := range g.Wait() {
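The per-bucket goroutine body shrinks to a heal call plus the shared loadBucketMetadata; the fan-out itself, one goroutine per bucket with minio's internal errgroup.WithNErrs collecting one error per index, is unchanged. For readers unfamiliar with the shape, here is a rough equivalent built on the standard golang.org/x/sync/errgroup instead; the bucket names and the loadOne helper are placeholders, and x/sync's Wait reports only the first error rather than one per index.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// loadOne is a hypothetical stand-in for sys.loadBucketMetadata: load and
// cache the metadata of a single bucket.
func loadOne(ctx context.Context, bucket string) error {
	fmt.Println("loaded metadata for", bucket)
	return nil
}

func main() {
	buckets := []string{"alpha", "beta", "gamma"}

	g, ctx := errgroup.WithContext(context.Background())
	for _, b := range buckets {
		b := b // capture the loop variable, as the diff does with index := index
		g.Go(func() error { return loadOne(ctx, b) })
	}
	// minio's errgroup.WithNErrs returns a slice with one error per bucket;
	// x/sync's Wait collapses that to the first non-nil error.
	if err := g.Wait(); err != nil {
		fmt.Println("load failed:", err)
	}
}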
@@ -428,17 +433,51 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
 	}
 }
 
+func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
+	const bucketMetadataRefresh = 15 * time.Minute
+
+	t := time.NewTimer(bucketMetadataRefresh)
+	defer t.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			buckets, err := sys.objAPI.ListBuckets(ctx, BucketOptions{})
+			if err != nil {
+				logger.LogIf(ctx, err)
+				continue
+			}
+			for i := range buckets {
+				err := sys.loadBucketMetadata(ctx, buckets[i])
+				if err != nil {
+					logger.LogIf(ctx, err)
+					continue
+				}
+				// Check if there is a spare core, wait 100ms instead
+				waitForLowIO(runtime.NumCPU(), 100*time.Millisecond, currentHTTPIO)
+			}
+
+			t.Reset(bucketMetadataRefresh)
+		}
+	}
+}
+
 // Loads bucket metadata for all buckets into BucketMetadataSys.
-func (sys *BucketMetadataSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
+func (sys *BucketMetadataSys) init(ctx context.Context, buckets []BucketInfo) {
 	count := 100 // load 100 bucket metadata at a time.
 	for {
 		if len(buckets) < count {
-			sys.concurrentLoad(ctx, buckets, objAPI)
-			return
+			sys.concurrentLoad(ctx, buckets)
+			break
 		}
-		sys.concurrentLoad(ctx, buckets[:count], objAPI)
+		sys.concurrentLoad(ctx, buckets[:count])
 		buckets = buckets[count:]
 	}
+
+	if globalIsDistErasure {
+		go sys.refreshBucketsMetadataLoop(ctx)
+	}
 }
 
 // Reset the state of the BucketMetadataSys.
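This hunk is the core of the fix for #16553: on distributed erasure setups, refreshBucketsMetadataLoop re-lists the buckets every 15 minutes and reloads each one's metadata from the backend disks, yielding via waitForLowIO whenever in-flight HTTP requests reach the core count. The timer is re-armed only after a full pass completes, so a slow refresh delays the next tick instead of overlapping it; note, though, that the ListBuckets error path continues without re-arming the timer, which looks like it would stop further refreshes until shutdown. Below is a minimal sketch of the re-arm-after-work timer pattern, with a placeholder refresh function.

package main

import (
	"context"
	"fmt"
	"time"
)

// runRefreshLoop demonstrates the timer pattern used by
// refreshBucketsMetadataLoop: the timer is re-armed only after the work
// finishes, so passes never overlap or queue up behind each other.
func runRefreshLoop(ctx context.Context, interval time.Duration, refresh func(context.Context) error) {
	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return // server shutdown cancels the loop
		case <-t.C:
			if err := refresh(ctx); err != nil {
				fmt.Println("refresh failed:", err) // the commit logs via logger.LogIf
			}
			t.Reset(interval) // arm the next pass only once this one is done
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	runRefreshLoop(ctx, 100*time.Millisecond, func(context.Context) error {
		fmt.Println("refreshing bucket metadata...")
		return nil
	})
}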
@@ -70,39 +70,11 @@ func (opts Config) BitrotScanCycle() (d time.Duration) {
 	return opts.cache.bitrotCycle
 }
 
-// Wait waits for IOCount to go down or max sleep to elapse before returning.
-// usually used in healing paths to wait for specified amount of time to
-// throttle healing.
-func (opts Config) Wait(currentIO func() int, activeListeners func() int) {
+// Clone safely the heal configuration
+func (opts Config) Clone() (int, time.Duration, string) {
 	configMutex.RLock()
-	maxIO, maxWait := opts.IOCount, opts.Sleep
-	configMutex.RUnlock()
-
-	// No need to wait run at full speed.
-	if maxIO <= 0 {
-		return
-	}
-
-	// At max 10 attempts to wait with 100 millisecond interval before proceeding
-	waitTick := 100 * time.Millisecond
-
-	tmpMaxWait := maxWait
-
-	if currentIO != nil {
-		for currentIO() >= maxIO+activeListeners() {
-			if tmpMaxWait > 0 {
-				if tmpMaxWait < waitTick {
-					time.Sleep(tmpMaxWait)
-				} else {
-					time.Sleep(waitTick)
-				}
-				tmpMaxWait -= waitTick
-			}
-			if tmpMaxWait <= 0 {
-				return
-			}
-		}
-	}
+	defer configMutex.RUnlock()
+	return opts.IOCount, opts.Sleep, opts.Bitrot
 }
 
 // Update updates opts with nopts
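With the waiting moved into waitForLowIO, the heal Config sheds its Wait method and gains Clone, which simply returns a consistent snapshot of IOCount, Sleep and Bitrot under the same read lock that Update takes for writes; callers now drive the throttling themselves. A minimal sketch of that snapshot accessor follows; the Config type and configMutex here are illustrative stand-ins for minio's internal/config/heal package.

package main

import (
	"fmt"
	"sync"
	"time"
)

// configMutex guards the shared heal configuration, as in the commit.
var configMutex sync.RWMutex

// Config is an illustrative stand-in for internal/config/heal.Config.
type Config struct {
	IOCount int
	Sleep   time.Duration
	Bitrot  string
}

// Clone returns a snapshot of the three fields; taking the read lock keeps
// readers ordered against Update, which holds the write lock while mutating.
func (opts Config) Clone() (int, time.Duration, string) {
	configMutex.RLock()
	defer configMutex.RUnlock()
	return opts.IOCount, opts.Sleep, opts.Bitrot
}

func main() {
	cfg := Config{IOCount: 8, Sleep: 250 * time.Millisecond, Bitrot: "on"}
	maxIO, maxWait, bitrot := cfg.Clone()
	fmt.Println(maxIO, maxWait, bitrot)
}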