Periodically remove stale buckets from in-memory (#16597)

This commit is contained in:
Harshavardhana 2023-02-13 08:09:52 -08:00 committed by GitHub
parent 11fe2fd79a
commit ee6d96eb46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 13 deletions

View File

@ -26,6 +26,7 @@ import (
"time" "time"
"github.com/minio/madmin-go/v2" "github.com/minio/madmin-go/v2"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags" "github.com/minio/minio-go/v7/pkg/tags"
bucketsse "github.com/minio/minio/internal/bucket/encryption" bucketsse "github.com/minio/minio/internal/bucket/encryption"
"github.com/minio/minio/internal/bucket/lifecycle" "github.com/minio/minio/internal/bucket/lifecycle"
@ -56,13 +57,29 @@ func (sys *BucketMetadataSys) Count() int {
} }
// Remove bucket metadata from memory. // Remove bucket metadata from memory.
func (sys *BucketMetadataSys) Remove(bucket string) { func (sys *BucketMetadataSys) Remove(buckets ...string) {
sys.Lock() sys.Lock()
delete(sys.metadataMap, bucket) for _, bucket := range buckets {
globalBucketMonitor.DeleteBucket(bucket) delete(sys.metadataMap, bucket)
globalBucketMonitor.DeleteBucket(bucket)
}
sys.Unlock() sys.Unlock()
} }
// RemoveStaleBuckets removes all stale buckets in memory that are not on disk.
func (sys *BucketMetadataSys) RemoveStaleBuckets(diskBuckets set.StringSet) {
sys.Lock()
defer sys.Unlock()
for bucket := range sys.metadataMap {
if diskBuckets.Contains(bucket) {
continue
} // doesn't exist on disk remove from memory.
delete(sys.metadataMap, bucket)
globalBucketMonitor.DeleteBucket(bucket)
}
}
// Set - sets a new metadata in-memory. // Set - sets a new metadata in-memory.
// Only a shallow copy is saved and fields with references // Only a shallow copy is saved and fields with references
// cannot be modified without causing a race condition, // cannot be modified without causing a race condition,
@ -406,15 +423,13 @@ func (sys *BucketMetadataSys) loadBucketMetadata(ctx context.Context, bucket Buc
sys.metadataMap[bucket.Name] = meta sys.metadataMap[bucket.Name] = meta
sys.Unlock() sys.Unlock()
globalEventNotifier.set(bucket, meta) // set notification targets
globalBucketTargetSys.set(bucket, meta) // set remote replication targets
return nil return nil
} }
// concurrently load bucket metadata to speed up loading bucket metadata. // concurrently load bucket metadata to speed up loading bucket metadata.
func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo) { func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo) {
g := errgroup.WithNErrs(len(buckets)) g := errgroup.WithNErrs(len(buckets))
bucketMetas := make([]BucketMetadata, len(buckets))
for index := range buckets { for index := range buckets {
index := index index := index
g.Go(func() error { g.Go(func() error {
@ -423,14 +438,40 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
ScanMode: madmin.HealDeepScan, ScanMode: madmin.HealDeepScan,
Recreate: true, Recreate: true,
}) })
return sys.loadBucketMetadata(ctx, buckets[index]) meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[index].Name)
if err != nil {
return err
}
bucketMetas[index] = meta
return nil
}, index) }, index)
} }
for _, err := range g.Wait() {
errs := g.Wait()
for _, err := range errs {
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
} }
// Hold lock here to update in-memory map at once,
// instead of serializing the Go routines.
sys.Lock()
for i, meta := range bucketMetas {
if errs[i] != nil {
continue
}
sys.metadataMap[buckets[i].Name] = meta
}
sys.Unlock()
for i, meta := range bucketMetas {
if errs[i] != nil {
continue
}
globalEventNotifier.set(buckets[i], meta) // set notification targets
globalBucketTargetSys.set(buckets[i], meta) // set remote replication targets
}
} }
func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) { func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
@ -448,14 +489,24 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
} }
for i := range buckets {
err := sys.loadBucketMetadata(ctx, buckets[i]) // Handle if we have some buckets in-memory those are stale.
// first delete them and then replace the newer state()
// from disk.
diskBuckets := set.CreateStringSet()
for _, bucket := range buckets {
diskBuckets.Add(bucket.Name)
}
sys.RemoveStaleBuckets(diskBuckets)
for _, bucket := range buckets {
err := sys.loadBucketMetadata(ctx, bucket)
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
} }
// Check if there is a spare core, wait 100ms instead // Check if there is a spare procs, wait 100ms instead
waitForLowIO(runtime.NumCPU(), 100*time.Millisecond, currentHTTPIO) waitForLowIO(runtime.GOMAXPROCS(0), 100*time.Millisecond, currentHTTPIO)
} }
t.Reset(bucketMetadataRefresh) t.Reset(bucketMetadataRefresh)

View File

@ -28,6 +28,7 @@ import (
"fmt" "fmt"
"net/url" "net/url"
"reflect" "reflect"
"runtime"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -3537,7 +3538,7 @@ func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInf
return nil return nil
} }
const siteHealTimeInterval = 1 * time.Minute const siteHealTimeInterval = 30 * time.Second
func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI ObjectLayer) { func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI ObjectLayer) {
ctx, cancel := globalLeaderLock.GetLock(ctx) ctx, cancel := globalLeaderLock.GetLock(ctx)
@ -3546,6 +3547,8 @@ func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI Object
healTimer := time.NewTimer(siteHealTimeInterval) healTimer := time.NewTimer(siteHealTimeInterval)
defer healTimer.Stop() defer healTimer.Stop()
var maxRefreshDurationSecondsForLog float64 = 10 // 10 seconds..
for { for {
select { select {
case <-healTimer.C: case <-healTimer.C:
@ -3553,8 +3556,18 @@ func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI Object
enabled := c.enabled enabled := c.enabled
c.RUnlock() c.RUnlock()
if enabled { if enabled {
refreshStart := time.Now()
c.healIAMSystem(ctx, objAPI) // heal IAM system first c.healIAMSystem(ctx, objAPI) // heal IAM system first
c.healBuckets(ctx, objAPI) // heal buckets subsequently c.healBuckets(ctx, objAPI) // heal buckets subsequently
took := time.Since(refreshStart).Seconds()
if took > maxRefreshDurationSecondsForLog {
// Log if we took a lot of time.
logger.Info("Site replication healing refresh took %.2fs", took)
}
// wait for 200 millisecond, if we are experience lot of I/O
waitForLowIO(runtime.GOMAXPROCS(0), 200*time.Millisecond, currentHTTPIO)
} }
healTimer.Reset(siteHealTimeInterval) healTimer.Reset(siteHealTimeInterval)