diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index bde90cd59..bac7a30cb 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -3163,10 +3163,11 @@ func (p *ReplicationPool) persistMRF() { if !ok { return } + entries[e.versionID] = e + if len(entries) >= mrfMaxEntries { saveMRFToDisk() } - entries[e.versionID] = e } } } @@ -3190,6 +3191,8 @@ func (p *ReplicationPool) queueMRFSave(entry MRFReplicateEntry) { select { case p.mrfSaveCh <- entry: default: + atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedCount, 1) + atomic.AddUint64(&globalReplicationStats.mrfStats.TotalDroppedBytes, uint64(entry.sz)) } } } @@ -3255,8 +3258,8 @@ func (p *ReplicationPool) loadMRF() (mrfRec MRFReplicateEntries, err error) { if !p.initialized() { return re, nil } - data := make([]byte, 4) - n, err := rc.Read(data) + var data [4]byte + n, err := rc.Read(data[:]) if err != nil { return re, err } @@ -3370,7 +3373,7 @@ func (p *ReplicationPool) initialized() bool { } // getMRF returns MRF entries for this node. -func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan madmin.ReplicationMRF, err error) { +func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch <-chan madmin.ReplicationMRF, err error) { mrfRec, err := p.loadMRF() if err != nil { return nil, err @@ -3380,7 +3383,7 @@ func (p *ReplicationPool) getMRF(ctx context.Context, bucket string) (ch chan ma go func() { defer close(mrfCh) for vID, e := range mrfRec.Entries { - if e.Bucket != bucket && bucket != "" { + if bucket != "" && e.Bucket != bucket { continue } select {