mirror of
https://github.com/minio/minio.git
synced 2025-01-22 20:23:14 -05:00
fix: hold lock while serializing replication stats (#16007)
This commit is contained in:
parent
797fa7f97b
commit
2894dd4d1a
@ -19,6 +19,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
@ -37,12 +38,12 @@ func (b *BucketReplicationStats) hasReplicationUsage() bool {
|
||||
|
||||
// ReplicationStats holds the global in-memory replication stats
|
||||
type ReplicationStats struct {
|
||||
Cache map[string]*BucketReplicationStats
|
||||
UsageCache map[string]*BucketReplicationStats
|
||||
mostRecentStats BucketStatsMap
|
||||
sync.RWMutex // mutex for Cache
|
||||
ulock sync.RWMutex // mutex for UsageCache
|
||||
dlock sync.RWMutex // mutex for mostRecentStats
|
||||
Cache map[string]*BucketReplicationStats
|
||||
UsageCache map[string]*BucketReplicationStats
|
||||
mostRecentStats BucketStatsMap
|
||||
sync.RWMutex // mutex for Cache
|
||||
ulock sync.RWMutex // mutex for UsageCache
|
||||
mostRecentStatsMu sync.Mutex // mutex for mostRecentStats
|
||||
}
|
||||
|
||||
// Delete deletes in-memory replication statistics for a bucket.
|
||||
@ -245,13 +246,24 @@ outer:
|
||||
r.ulock.Unlock()
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) getAllCachedLatest() BucketStatsMap {
|
||||
// serializeStats will serialize the current stats.
|
||||
// Will return (nil, nil) if no data.
|
||||
func (r *ReplicationStats) serializeStats() ([]byte, error) {
|
||||
if r == nil {
|
||||
return BucketStatsMap{}
|
||||
return nil, nil
|
||||
}
|
||||
r.dlock.RLock()
|
||||
defer r.dlock.RUnlock()
|
||||
return r.mostRecentStats
|
||||
r.mostRecentStatsMu.Lock()
|
||||
defer r.mostRecentStatsMu.Unlock()
|
||||
bsm := r.mostRecentStats
|
||||
if len(bsm.Stats) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
data := make([]byte, 4, 4+bsm.Msgsize())
|
||||
// Add the replication stats meta header.
|
||||
binary.LittleEndian.PutUint16(data[0:2], replStatsMetaFormat)
|
||||
binary.LittleEndian.PutUint16(data[2:4], replStatsVersion)
|
||||
// Add data
|
||||
return r.mostRecentStats.MarshalMsg(data)
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) {
|
||||
@ -356,13 +368,13 @@ func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, u Buck
|
||||
// normalize overall stats
|
||||
s.ReplicaSize = int64(math.Max(float64(totReplicaSize), float64(u.ReplicaSize)))
|
||||
s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize)))
|
||||
r.dlock.Lock()
|
||||
r.mostRecentStatsMu.Lock()
|
||||
if len(r.mostRecentStats.Stats) == 0 {
|
||||
r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()}
|
||||
}
|
||||
r.mostRecentStats.Stats[bucket] = BucketStats{ReplicationStats: s}
|
||||
r.mostRecentStats.Timestamp = UTCNow()
|
||||
r.dlock.Unlock()
|
||||
r.mostRecentStatsMu.Unlock()
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -2934,17 +2934,9 @@ func (p *ReplicationPool) saveStats(ctx context.Context) error {
|
||||
if !p.initialized() {
|
||||
return nil
|
||||
}
|
||||
bsm := globalReplicationStats.getAllCachedLatest()
|
||||
if len(bsm.Stats) == 0 {
|
||||
return nil
|
||||
}
|
||||
data := make([]byte, 4, 4+bsm.Msgsize())
|
||||
// Add the replication stats meta header.
|
||||
binary.LittleEndian.PutUint16(data[0:2], replStatsMetaFormat)
|
||||
binary.LittleEndian.PutUint16(data[2:4], replStatsVersion)
|
||||
// Add data
|
||||
data, err := bsm.MarshalMsg(data)
|
||||
if err != nil {
|
||||
|
||||
data, err := globalReplicationStats.serializeStats()
|
||||
if data == nil {
|
||||
return err
|
||||
}
|
||||
return saveConfig(ctx, p.objLayer, getReplicationStatsPath(globalLocalNodeName), data)
|
||||
|
Loading…
x
Reference in New Issue
Block a user