diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index 8da234720..6ba50fb49 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -26,6 +26,7 @@ import ( "time" "github.com/minio/madmin-go/v2" + "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio-go/v7/pkg/tags" bucketsse "github.com/minio/minio/internal/bucket/encryption" "github.com/minio/minio/internal/bucket/lifecycle" @@ -56,13 +57,29 @@ func (sys *BucketMetadataSys) Count() int { } // Remove bucket metadata from memory. -func (sys *BucketMetadataSys) Remove(bucket string) { +func (sys *BucketMetadataSys) Remove(buckets ...string) { sys.Lock() - delete(sys.metadataMap, bucket) - globalBucketMonitor.DeleteBucket(bucket) + for _, bucket := range buckets { + delete(sys.metadataMap, bucket) + globalBucketMonitor.DeleteBucket(bucket) + } sys.Unlock() } +// RemoveStaleBuckets removes all stale buckets in memory that are not on disk. +func (sys *BucketMetadataSys) RemoveStaleBuckets(diskBuckets set.StringSet) { + sys.Lock() + defer sys.Unlock() + + for bucket := range sys.metadataMap { + if diskBuckets.Contains(bucket) { + continue + } // doesn't exist on disk remove from memory. + delete(sys.metadataMap, bucket) + globalBucketMonitor.DeleteBucket(bucket) + } +} + // Set - sets a new metadata in-memory. // Only a shallow copy is saved and fields with references // cannot be modified without causing a race condition, @@ -406,15 +423,13 @@ func (sys *BucketMetadataSys) loadBucketMetadata(ctx context.Context, bucket Buc sys.metadataMap[bucket.Name] = meta sys.Unlock() - globalEventNotifier.set(bucket, meta) // set notification targets - globalBucketTargetSys.set(bucket, meta) // set remote replication targets - return nil } // concurrently load bucket metadata to speed up loading bucket metadata. func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []BucketInfo) { g := errgroup.WithNErrs(len(buckets)) + bucketMetas := make([]BucketMetadata, len(buckets)) for index := range buckets { index := index g.Go(func() error { @@ -423,14 +438,40 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck ScanMode: madmin.HealDeepScan, Recreate: true, }) - return sys.loadBucketMetadata(ctx, buckets[index]) + meta, err := loadBucketMetadata(ctx, sys.objAPI, buckets[index].Name) + if err != nil { + return err + } + bucketMetas[index] = meta + return nil }, index) } - for _, err := range g.Wait() { + + errs := g.Wait() + for _, err := range errs { if err != nil { logger.LogIf(ctx, err) } } + + // Hold lock here to update in-memory map at once, + // instead of serializing the Go routines. + sys.Lock() + for i, meta := range bucketMetas { + if errs[i] != nil { + continue + } + sys.metadataMap[buckets[i].Name] = meta + } + sys.Unlock() + + for i, meta := range bucketMetas { + if errs[i] != nil { + continue + } + globalEventNotifier.set(buckets[i], meta) // set notification targets + globalBucketTargetSys.set(buckets[i], meta) // set remote replication targets + } } func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) { @@ -448,14 +489,24 @@ func (sys *BucketMetadataSys) refreshBucketsMetadataLoop(ctx context.Context) { logger.LogIf(ctx, err) continue } - for i := range buckets { - err := sys.loadBucketMetadata(ctx, buckets[i]) + + // Handle if we have some buckets in-memory those are stale. + // first delete them and then replace the newer state() + // from disk. + diskBuckets := set.CreateStringSet() + for _, bucket := range buckets { + diskBuckets.Add(bucket.Name) + } + sys.RemoveStaleBuckets(diskBuckets) + + for _, bucket := range buckets { + err := sys.loadBucketMetadata(ctx, bucket) if err != nil { logger.LogIf(ctx, err) continue } - // Check if there is a spare core, wait 100ms instead - waitForLowIO(runtime.NumCPU(), 100*time.Millisecond, currentHTTPIO) + // Check if there is a spare procs, wait 100ms instead + waitForLowIO(runtime.GOMAXPROCS(0), 100*time.Millisecond, currentHTTPIO) } t.Reset(bucketMetadataRefresh) diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 6d3318039..d8b8faa0e 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -28,6 +28,7 @@ import ( "fmt" "net/url" "reflect" + "runtime" "sort" "strings" "sync" @@ -3537,7 +3538,7 @@ func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInf return nil } -const siteHealTimeInterval = 1 * time.Minute +const siteHealTimeInterval = 30 * time.Second func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI ObjectLayer) { ctx, cancel := globalLeaderLock.GetLock(ctx) @@ -3546,6 +3547,8 @@ func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI Object healTimer := time.NewTimer(siteHealTimeInterval) defer healTimer.Stop() + var maxRefreshDurationSecondsForLog float64 = 10 // 10 seconds.. + for { select { case <-healTimer.C: @@ -3553,8 +3556,18 @@ func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI Object enabled := c.enabled c.RUnlock() if enabled { + refreshStart := time.Now() c.healIAMSystem(ctx, objAPI) // heal IAM system first c.healBuckets(ctx, objAPI) // heal buckets subsequently + + took := time.Since(refreshStart).Seconds() + if took > maxRefreshDurationSecondsForLog { + // Log if we took a lot of time. + logger.Info("Site replication healing refresh took %.2fs", took) + } + + // wait for 200 millisecond, if we are experience lot of I/O + waitForLowIO(runtime.GOMAXPROCS(0), 200*time.Millisecond, currentHTTPIO) } healTimer.Reset(siteHealTimeInterval)