fetch bucket replication stats across peers in single call (#14956)

current implementation relied on recursively calling one bucket
at a time across all peers, this would be very slow and chatty
when there are 100's of buckets which would mean 100*peerCount
amount of network operations.

This PR attempts to reduce this entire call into `peerCount`
amount of network calls only. This functionality addresses also a
concern where the Prometheus metrics would significantly slow
down when one of the peers is offline.
This commit is contained in:
Harshavardhana 2022-05-23 09:15:30 -07:00 committed by GitHub
parent 90a52a29c5
commit f8650a3493
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 411 additions and 6 deletions

View File

@ -135,6 +135,23 @@ func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats
return st.Clone()
}
// GetAll returns replication metrics for all buckets at once.
func (r *ReplicationStats) GetAll() map[string]BucketReplicationStats {
if r == nil {
return map[string]BucketReplicationStats{}
}
r.RLock()
defer r.RUnlock()
bucketReplicationStats := make(map[string]BucketReplicationStats, len(r.Cache))
for k, v := range r.Cache {
bucketReplicationStats[k] = v.Clone()
}
return bucketReplicationStats
}
// Get replication metrics for a bucket from this node since this node came up.
func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
if r == nil {

View File

@ -1845,9 +1845,26 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim
return
}
// get the most current of in-memory replication stats and data usage info from crawler.
func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
func getAllLatestReplicationStats(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketReplicationStats) {
peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext)
bucketsReplicationStats = make(map[string]BucketReplicationStats, len(bucketsUsage))
for bucket, u := range bucketsUsage {
bucketStats := make([]BucketStats, len(peerBucketStatsList))
for i, peerBucketStats := range peerBucketStatsList {
bucketStat, ok := peerBucketStats[bucket]
if !ok {
continue
}
bucketStats[i] = bucketStat
}
bucketsReplicationStats[bucket] = calculateBucketReplicationStats(bucket, u, bucketStats)
}
return bucketsReplicationStats
}
func calculateBucketReplicationStats(bucket string, u BucketUsageInfo, bucketStats []BucketStats) (s BucketReplicationStats) {
// accumulate cluster bucket stats
stats := make(map[string]*BucketReplicationStat)
var totReplicaSize int64
@ -1916,6 +1933,12 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
return s
}
// get the most current of in-memory replication stats and data usage info from crawler.
func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplicationStats) {
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
return calculateBucketReplicationStats(bucket, u, bucketStats)
}
const resyncTimeInterval = time.Minute * 10
// periodicResyncMetaSave saves in-memory resync meta stats to disk in periodic intervals

View File

@ -51,6 +51,9 @@ func (rl *ReplicationLatency) update(size int64, duration time.Duration) {
rl.UploadHistogram.Add(size, duration)
}
// BucketStatsMap captures bucket statistics for all buckets
type BucketStatsMap map[string]BucketStats
// BucketStats bucket statistics
type BucketStats struct {
ReplicationStats BucketReplicationStats

View File

@ -792,6 +792,183 @@ func (z *BucketStats) Msgsize() (s int) {
return
}
// DecodeMsg implements msgp.Decodable
func (z *BucketStatsMap) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0003 uint32
zb0003, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
if (*z) == nil {
(*z) = make(BucketStatsMap, zb0003)
} else if len((*z)) > 0 {
for key := range *z {
delete((*z), key)
}
}
for zb0003 > 0 {
zb0003--
var zb0001 string
var zb0002 BucketStats
zb0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err)
return
}
var field []byte
_ = field
var zb0004 uint32
zb0004, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
for zb0004 > 0 {
zb0004--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
err = zb0002.ReplicationStats.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, zb0001, "ReplicationStats")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
}
}
(*z)[zb0001] = zb0002
}
return
}
// EncodeMsg implements msgp.Encodable
func (z BucketStatsMap) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteMapHeader(uint32(len(z)))
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0005, zb0006 := range z {
err = en.WriteString(zb0005)
if err != nil {
err = msgp.WrapError(err)
return
}
// map header, size 1
// write "ReplicationStats"
err = en.Append(0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73)
if err != nil {
return
}
err = zb0006.ReplicationStats.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, zb0005, "ReplicationStats")
return
}
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z BucketStatsMap) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendMapHeader(o, uint32(len(z)))
for zb0005, zb0006 := range z {
o = msgp.AppendString(o, zb0005)
// map header, size 1
// string "ReplicationStats"
o = append(o, 0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73)
o, err = zb0006.ReplicationStats.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, zb0005, "ReplicationStats")
return
}
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *BucketStatsMap) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0003 uint32
zb0003, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
if (*z) == nil {
(*z) = make(BucketStatsMap, zb0003)
} else if len((*z)) > 0 {
for key := range *z {
delete((*z), key)
}
}
for zb0003 > 0 {
var zb0001 string
var zb0002 BucketStats
zb0003--
zb0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
var field []byte
_ = field
var zb0004 uint32
zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
for zb0004 > 0 {
zb0004--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
bts, err = zb0002.ReplicationStats.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, zb0001, "ReplicationStats")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, zb0001)
return
}
}
}
(*z)[zb0001] = zb0002
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z BucketStatsMap) Msgsize() (s int) {
s = msgp.MapHeaderSize
if z != nil {
for zb0005, zb0006 := range z {
_ = zb0006
s += msgp.StringPrefixSize + len(zb0005) + 1 + 17 + zb0006.ReplicationStats.Msgsize()
}
}
return
}
// DecodeMsg implements msgp.Decodable
func (z *ReplicationLatency) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte

View File

@ -348,6 +348,119 @@ func BenchmarkDecodeBucketStats(b *testing.B) {
}
}
func TestMarshalUnmarshalBucketStatsMap(t *testing.T) {
v := BucketStatsMap{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgBucketStatsMap(b *testing.B) {
v := BucketStatsMap{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgBucketStatsMap(b *testing.B) {
v := BucketStatsMap{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalBucketStatsMap(b *testing.B) {
v := BucketStatsMap{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeBucketStatsMap(t *testing.T) {
v := BucketStatsMap{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeBucketStatsMap Msgsize() is inaccurate")
}
vn := BucketStatsMap{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeBucketStatsMap(b *testing.B) {
v := BucketStatsMap{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeBucketStatsMap(b *testing.B) {
v := BucketStatsMap{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalReplicationLatency(t *testing.T) {
v := ReplicationLatency{}
bts, err := v.MarshalMsg(nil)

View File

@ -1604,8 +1604,9 @@ func getBucketUsageMetrics() *MetricsGroup {
Value: float64(time.Since(dataUsageInfo.LastUpdate)),
})
bucketReplStats := getAllLatestReplicationStats(dataUsageInfo.BucketsUsage)
for bucket, usage := range dataUsageInfo.BucketsUsage {
stats := getLatestReplicationStats(bucket, usage)
stats := bucketReplStats[bucket]
quota, _ := globalBucketQuotaSys.Get(ctx, bucket)

View File

@ -601,6 +601,44 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName
}
}
// GetClusterAllBucketStats - returns bucket stats for all buckets from all remote peers.
func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []BucketStatsMap {
ng := WithNPeers(len(sys.peerClients))
replicationStats := make([]BucketStatsMap, len(sys.peerClients))
for index, client := range sys.peerClients {
index := index
client := client
ng.Go(ctx, func() error {
if client == nil {
return errPeerNotReachable
}
bsMap, err := client.GetAllBucketStats()
if err != nil {
return err
}
replicationStats[index] = bsMap
return nil
}, index, *client.host)
}
for _, nErr := range ng.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
if nErr.Err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
}
}
replicationStatsList := globalReplicationStats.GetAll()
bucketStatsMap := make(map[string]BucketStats, len(replicationStatsList))
for k, replicationStats := range replicationStatsList {
bucketStatsMap[k] = BucketStats{
ReplicationStats: replicationStats,
}
}
replicationStats = append(replicationStats, BucketStatsMap(bucketStatsMap))
return replicationStats
}
// GetClusterBucketStats - calls GetClusterBucketStats call on all peers for a cluster statistics view.
func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketName string) []BucketStats {
ng := WithNPeers(len(sys.peerClients))

View File

@ -510,6 +510,19 @@ func (client *peerRESTClient) GetBucketStats(bucket string) (BucketStats, error)
return bs, msgp.Decode(respBody, &bs)
}
// GetAllBucketStats - load replication stats for all buckets
func (client *peerRESTClient) GetAllBucketStats() (BucketStatsMap, error) {
values := make(url.Values)
respBody, err := client.call(peerRESTMethodGetAllBucketStats, values, nil, -1)
if err != nil {
return nil, err
}
bsMap := BucketStatsMap{}
defer http.DrainBody(respBody)
return bsMap, msgp.Decode(respBody, &bsMap)
}
// LoadBucketMetadata - load bucket metadata
func (client *peerRESTClient) LoadBucketMetadata(bucket string) error {
values := make(url.Values)

View File

@ -18,7 +18,7 @@
package cmd
const (
peerRESTVersion = "v21" // Add netperf
peerRESTVersion = "v22" // Add bulk GetBucketStats
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@ -41,6 +41,7 @@ const (
peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata"
peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata"
peerRESTMethodGetBucketStats = "/getbucketstats"
peerRESTMethodGetAllBucketStats = "/getallbucketstats"
peerRESTMethodServerUpdate = "/serverupdate"
peerRESTMethodSignalService = "/signalservice"
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"

View File

@ -561,8 +561,26 @@ func (s *peerRESTServer) ReloadSiteReplicationConfigHandler(w http.ResponseWrite
logger.LogIf(r.Context(), globalSiteReplicationSys.Init(ctx, objAPI))
}
// GetAllBucketStatsHandler - fetches bucket replication stats for all buckets from this peer.
func (s *peerRESTServer) GetAllBucketStatsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
replicationStats := globalReplicationStats.GetAll()
bucketStatsMap := make(map[string]BucketStats, len(replicationStats))
for k, v := range replicationStats {
bucketStatsMap[k] = BucketStats{
ReplicationStats: v,
}
}
logger.LogIf(r.Context(), msgp.Encode(w, BucketStatsMap(bucketStatsMap)))
}
// GetBucketStatsHandler - fetches current in-memory bucket stats, currently only
// returns BucketReplicationStatus
// returns BucketStats, that currently includes ReplicationStats.
func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
@ -1316,6 +1334,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodNetInfo).HandlerFunc(httpTraceHdrs(server.NetInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDispatchNetInfo).HandlerFunc(httpTraceHdrs(server.DispatchNetInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetAllBucketStats).HandlerFunc(httpTraceHdrs(server.GetAllBucketStatsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBucketStats).HandlerFunc(httpTraceHdrs(server.GetBucketStatsHandler)).Queries(restQueries(peerRESTBucket)...)