add cluster support for realtime bucket stats (#11963)

implementation in #11949 only catered from single
node, but we need cluster metrics by capturing
from all peers. introduce bucket stats API that
will be used for capturing in-line bucket usage
as well eventually
This commit is contained in:
Harshavardhana 2021-04-04 15:34:33 -07:00 committed by GitHub
parent d46386246f
commit 09ee303244
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 759 additions and 49 deletions

View File

@ -1244,7 +1244,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
return
}
}
globalReplicationStats.Delete(ctx, bucket)
// Write success response.
writeSuccessNoContent(w)
@ -1634,8 +1634,19 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW
return
}
metrics := globalReplicationStats.Get(bucket)
if err := json.NewEncoder(w).Encode(&metrics); err != nil {
bucketStats := globalNotificationSys.GetClusterBucketStats(r.Context(), bucket)
bucketReplStats := BucketReplicationStats{}
for _, bucketStat := range bucketStats {
bucketReplStats.FailedCount += bucketStat.ReplicationStats.FailedCount
bucketReplStats.FailedSize += bucketStat.ReplicationStats.FailedSize
bucketReplStats.PendingCount += bucketStat.ReplicationStats.PendingCount
bucketReplStats.PendingSize += bucketStat.ReplicationStats.PendingSize
bucketReplStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize
bucketReplStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize
}
if err := json.NewEncoder(w).Encode(&bucketReplStats); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}

View File

@ -24,23 +24,6 @@ import (
"github.com/minio/minio/pkg/bucket/replication"
)
// BucketReplicationStats represents inline replication statistics
// such as pending, failed and completed bytes in total for a bucket
type BucketReplicationStats struct {
// Pending size in bytes
PendingSize uint64 `json:"pendingReplicationSize"`
// Completed size in bytes
ReplicatedSize uint64 `json:"completedReplicationSize"`
// Total Replica size in bytes
ReplicaSize uint64 `json:"replicaSize"`
// Failed size in bytes
FailedSize uint64 `json:"failedReplicationSize"`
// Total number of pending operations including metadata updates
PendingCount uint64 `json:"pendingReplicationCount"`
// Total number of failed operations including metadata updates
FailedCount uint64 `json:"failedReplicationCount"`
}
func (b *BucketReplicationStats) hasReplicationUsage() bool {
return b.PendingSize > 0 ||
b.FailedSize > 0 ||
@ -57,7 +40,7 @@ type ReplicationStats struct {
}
// Delete deletes in-memory replication statistics for a bucket.
func (r *ReplicationStats) Delete(ctx context.Context, bucket string) {
func (r *ReplicationStats) Delete(bucket string) {
if r == nil {
return
}
@ -68,7 +51,7 @@ func (r *ReplicationStats) Delete(ctx context.Context, bucket string) {
}
// Update updates in-memory replication statistics with new values.
func (r *ReplicationStats) Update(ctx context.Context, bucket string, n int64, status, prevStatus replication.StatusType, opType replication.Type) {
func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus replication.StatusType, opType replication.Type) {
if r == nil {
return
}
@ -125,10 +108,12 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
r.RLock()
defer r.RUnlock()
st, ok := r.Cache[bucket]
if !ok {
return BucketReplicationStats{}
}
return BucketReplicationStats{
PendingSize: atomic.LoadUint64(&st.PendingSize),
FailedSize: atomic.LoadUint64(&st.FailedSize),

View File

@ -297,7 +297,8 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA
prevStatus = string(dobj.VersionPurgeStatus)
currStatus = string(versionPurgeStatus)
}
globalReplicationStats.Update(ctx, dobj.Bucket, 0, replication.StatusType(currStatus), replication.StatusType(prevStatus), replication.DeleteReplicationType) // to decrement pending count
// to decrement pending count later.
globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus), replication.StatusType(prevStatus), replication.DeleteReplicationType)
var eventName = event.ObjectReplicationComplete
if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed {
@ -754,7 +755,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
if rtype == replicateAll {
opType = replication.ObjectReplicationType
}
globalReplicationStats.Update(ctx, bucket, size, replicationStatus, prevReplStatus, opType)
globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType)
sendEvent(eventArgs{
EventName: eventName,
BucketName: bucket,
@ -1063,7 +1064,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
globalReplicationPool.queueReplicaTask(GlobalContext, objInfo)
}
if sz, err := objInfo.GetActualSize(); err == nil {
globalReplicationStats.Update(ctx, objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
}
}
@ -1073,5 +1074,5 @@ func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo,
} else {
globalReplicationPool.queueReplicaDeleteTask(GlobalContext, dv)
}
globalReplicationStats.Update(ctx, dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
}

41
cmd/bucket-stats.go Normal file
View File

@ -0,0 +1,41 @@
/*
* MinIO Cloud Storage, (C) 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
//go:generate msgp -file $GOFILE
// BucketStats bucket statistics
type BucketStats struct {
ReplicationStats BucketReplicationStats
}
// BucketReplicationStats represents inline replication statistics
// such as pending, failed and completed bytes in total for a bucket
type BucketReplicationStats struct {
// Pending size in bytes
PendingSize uint64 `json:"pendingReplicationSize"`
// Completed size in bytes
ReplicatedSize uint64 `json:"completedReplicationSize"`
// Total Replica size in bytes
ReplicaSize uint64 `json:"replicaSize"`
// Failed size in bytes
FailedSize uint64 `json:"failedReplicationSize"`
// Total number of pending operations including metadata updates
PendingCount uint64 `json:"pendingReplicationCount"`
// Total number of failed operations including metadata updates
FailedCount uint64 `json:"failedReplicationCount"`
}

342
cmd/bucket-stats_gen.go Normal file
View File

@ -0,0 +1,342 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *BucketReplicationStats) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "PendingSize":
z.PendingSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "PendingSize")
return
}
case "ReplicatedSize":
z.ReplicatedSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ReplicatedSize")
return
}
case "ReplicaSize":
z.ReplicaSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ReplicaSize")
return
}
case "FailedSize":
z.FailedSize, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "FailedSize")
return
}
case "PendingCount":
z.PendingCount, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "PendingCount")
return
}
case "FailedCount":
z.FailedCount, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "FailedCount")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *BucketReplicationStats) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 6
// write "PendingSize"
err = en.Append(0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.PendingSize)
if err != nil {
err = msgp.WrapError(err, "PendingSize")
return
}
// write "ReplicatedSize"
err = en.Append(0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.ReplicatedSize)
if err != nil {
err = msgp.WrapError(err, "ReplicatedSize")
return
}
// write "ReplicaSize"
err = en.Append(0xab, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.ReplicaSize)
if err != nil {
err = msgp.WrapError(err, "ReplicaSize")
return
}
// write "FailedSize"
err = en.Append(0xaa, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.FailedSize)
if err != nil {
err = msgp.WrapError(err, "FailedSize")
return
}
// write "PendingCount"
err = en.Append(0xac, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x75, 0x6e, 0x74)
if err != nil {
return
}
err = en.WriteUint64(z.PendingCount)
if err != nil {
err = msgp.WrapError(err, "PendingCount")
return
}
// write "FailedCount"
err = en.Append(0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74)
if err != nil {
return
}
err = en.WriteUint64(z.FailedCount)
if err != nil {
err = msgp.WrapError(err, "FailedCount")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketReplicationStats) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 6
// string "PendingSize"
o = append(o, 0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.PendingSize)
// string "ReplicatedSize"
o = append(o, 0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.ReplicatedSize)
// string "ReplicaSize"
o = append(o, 0xab, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.ReplicaSize)
// string "FailedSize"
o = append(o, 0xaa, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendUint64(o, z.FailedSize)
// string "PendingCount"
o = append(o, 0xac, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x75, 0x6e, 0x74)
o = msgp.AppendUint64(o, z.PendingCount)
// string "FailedCount"
o = append(o, 0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74)
o = msgp.AppendUint64(o, z.FailedCount)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *BucketReplicationStats) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "PendingSize":
z.PendingSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "PendingSize")
return
}
case "ReplicatedSize":
z.ReplicatedSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicatedSize")
return
}
case "ReplicaSize":
z.ReplicaSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicaSize")
return
}
case "FailedSize":
z.FailedSize, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "FailedSize")
return
}
case "PendingCount":
z.PendingCount, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "PendingCount")
return
}
case "FailedCount":
z.FailedCount, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "FailedCount")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketReplicationStats) Msgsize() (s int) {
s = 1 + 12 + msgp.Uint64Size + 15 + msgp.Uint64Size + 12 + msgp.Uint64Size + 11 + msgp.Uint64Size + 13 + msgp.Uint64Size + 12 + msgp.Uint64Size
return
}
// DecodeMsg implements msgp.Decodable
func (z *BucketStats) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
err = z.ReplicationStats.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "ReplicationStats")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *BucketStats) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "ReplicationStats"
err = en.Append(0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73)
if err != nil {
return
}
err = z.ReplicationStats.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "ReplicationStats")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketStats) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "ReplicationStats"
o = append(o, 0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73)
o, err = z.ReplicationStats.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "ReplicationStats")
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *BucketStats) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "ReplicationStats":
bts, err = z.ReplicationStats.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "ReplicationStats")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketStats) Msgsize() (s int) {
s = 1 + 17 + z.ReplicationStats.Msgsize()
return
}

View File

@ -0,0 +1,236 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalBucketReplicationStats(t *testing.T) {
v := BucketReplicationStats{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeBucketReplicationStats(t *testing.T) {
v := BucketReplicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeBucketReplicationStats Msgsize() is inaccurate")
}
vn := BucketReplicationStats{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeBucketReplicationStats(b *testing.B) {
v := BucketReplicationStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalBucketStats(t *testing.T) {
v := BucketStats{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgBucketStats(b *testing.B) {
v := BucketStats{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgBucketStats(b *testing.B) {
v := BucketStats{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalBucketStats(b *testing.B) {
v := BucketStats{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeBucketStats(t *testing.T) {
v := BucketStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeBucketStats Msgsize() is inaccurate")
}
vn := BucketStats{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeBucketStats(b *testing.B) {
v := BucketStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeBucketStats(b *testing.B) {
v := BucketStats{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -432,35 +432,56 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) {
}
// get the most current of in-memory replication stats and data usage info from crawler.
func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) BucketReplicationStats {
s := BucketReplicationStats{
PendingSize: u.ReplicationPendingSize,
FailedSize: u.ReplicationFailedSize,
ReplicatedSize: u.ReplicatedSize,
ReplicaSize: u.ReplicaSize,
PendingCount: u.ReplicationPendingCount,
FailedCount: u.ReplicationFailedCount,
func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) (s BucketReplicationStats) {
bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket)
replStats := BucketReplicationStats{}
for _, bucketStat := range bucketStats {
replStats.FailedCount += bucketStat.ReplicationStats.FailedCount
replStats.FailedSize += bucketStat.ReplicationStats.FailedSize
replStats.PendingCount += bucketStat.ReplicationStats.PendingCount
replStats.PendingSize += bucketStat.ReplicationStats.PendingSize
replStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize
replStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize
}
rStat := globalReplicationStats.Get(bucket)
// use in memory replication stats if it is ahead of usage info.
if rStat.ReplicatedSize > u.ReplicatedSize {
s.ReplicatedSize = rStat.ReplicatedSize
if replStats.ReplicatedSize > u.ReplicatedSize {
s.ReplicatedSize = replStats.ReplicatedSize
} else {
s.ReplicatedSize = u.ReplicatedSize
}
if rStat.PendingSize > u.ReplicationPendingSize {
s.PendingSize = rStat.PendingSize
if replStats.PendingSize > u.ReplicationPendingSize {
s.PendingSize = replStats.PendingSize
} else {
s.PendingSize = u.ReplicationPendingSize
}
if rStat.FailedSize > u.ReplicationFailedSize {
s.FailedSize = rStat.FailedSize
if replStats.FailedSize > u.ReplicationFailedSize {
s.FailedSize = replStats.FailedSize
} else {
s.FailedSize = u.ReplicationFailedSize
}
if rStat.ReplicaSize > u.ReplicaSize {
s.ReplicaSize = rStat.ReplicaSize
if replStats.ReplicaSize > u.ReplicaSize {
s.ReplicaSize = replStats.ReplicaSize
} else {
s.ReplicaSize = u.ReplicaSize
}
if rStat.PendingCount > u.ReplicationPendingCount {
s.PendingCount = rStat.PendingCount
if replStats.PendingCount > u.ReplicationPendingCount {
s.PendingCount = replStats.PendingCount
} else {
s.PendingCount = u.ReplicationPendingCount
}
if rStat.FailedCount > u.ReplicationFailedCount {
s.FailedCount = rStat.FailedCount
if replStats.FailedCount > u.ReplicationFailedCount {
s.FailedCount = replStats.FailedCount
} else {
s.FailedCount = u.ReplicationFailedCount
}
return s
}

View File

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
@ -615,6 +616,8 @@ func (sys *NotificationSys) findEarliestCleanBloomFilter(ctx context.Context, di
return best
}
var errPeerNotReachable = errors.New("peer is not reachable")
// GetLocks - makes GetLocks RPC call on all peers.
func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*PeerLocks {
locksResp := make([]*PeerLocks, len(sys.peerClients))
@ -623,7 +626,7 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe
index := index
g.Go(func() error {
if client == nil {
return nil
return errPeerNotReachable
}
serverLocksResp, err := sys.peerClients[index].GetLocks()
if err != nil {
@ -671,6 +674,7 @@ func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName s
// DeleteBucketMetadata - calls DeleteBucketMetadata call on all peers
func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName string) {
globalReplicationStats.Delete(bucketName)
globalBucketMetadataSys.Remove(bucketName)
if localMetacacheMgr != nil {
localMetacacheMgr.deleteBucketCache(bucketName)
@ -694,6 +698,36 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName
}
}
// GetClusterBucketStats - calls GetClusterBucketStats call on all peers for a cluster statistics view.
func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketName string) []BucketStats {
ng := WithNPeers(len(sys.peerClients))
bucketStats := make([]BucketStats, len(sys.peerClients))
for index, client := range sys.peerClients {
index := index
ng.Go(ctx, func() error {
if client == nil {
return errPeerNotReachable
}
bs, err := client.GetBucketStats(bucketName)
if err != nil {
return err
}
bucketStats[index] = bs
return nil
}, index, *client.host)
}
for _, nErr := range ng.Wait() {
reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
if nErr.Err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
}
}
bucketStats = append(bucketStats, BucketStats{
ReplicationStats: globalReplicationStats.Get(bucketName),
})
return bucketStats
}
// Loads notification policies for all buckets into NotificationSys.
func (sys *NotificationSys) load(buckets []BucketInfo) {
for _, bucket := range buckets {

View File

@ -434,6 +434,20 @@ func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err
return data, err
}
// GetBucketStats - load bucket statistics
func (client *peerRESTClient) GetBucketStats(bucket string) (BucketStats, error) {
values := make(url.Values)
values.Set(peerRESTBucket, bucket)
respBody, err := client.call(peerRESTMethodGetBucketStats, values, nil, -1)
if err != nil {
return BucketStats{}, err
}
var bs BucketStats
defer http.DrainBody(respBody)
return bs, msgp.Decode(respBody, &bs)
}
// LoadBucketMetadata - load bucket metadata
func (client *peerRESTClient) LoadBucketMetadata(bucket string) error {
values := make(url.Values)

View File

@ -17,7 +17,7 @@
package cmd
const (
peerRESTVersion = "v13" // Add storage tracing
peerRESTVersion = "v14" // Add GetBucketStats API
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@ -36,6 +36,7 @@ const (
peerRESTMethodDispatchNetInfo = "/dispatchnetinfo"
peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata"
peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata"
peerRESTMethodGetBucketStats = "/getbucketstats"
peerRESTMethodServerUpdate = "/serverupdate"
peerRESTMethodSignalService = "/signalservice"
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"

View File

@ -532,12 +532,35 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(w http.ResponseWriter, r *h
return
}
globalReplicationStats.Delete(bucketName)
globalBucketMetadataSys.Remove(bucketName)
if localMetacacheMgr != nil {
localMetacacheMgr.deleteBucketCache(bucketName)
}
}
// GetBucketStatsHandler - fetches current in-memory bucket stats, currently only
// returns BucketReplicationStatus
func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
vars := mux.Vars(r)
bucketName := vars[peerRESTBucket]
if bucketName == "" {
s.writeErrorResponse(w, errors.New("Bucket name is missing"))
return
}
bs := BucketStats{
ReplicationStats: globalReplicationStats.Get(bucketName),
}
defer w.(http.Flusher).Flush()
logger.LogIf(r.Context(), msgp.Encode(w, &bs))
}
// LoadBucketMetadataHandler - reloads in memory bucket metadata
func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -1092,6 +1115,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBucketStats).HandlerFunc(httpTraceHdrs(server.GetBucketStatsHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerUpdate).HandlerFunc(httpTraceHdrs(server.ServerUpdateHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeletePolicy).HandlerFunc(httpTraceAll(server.DeletePolicyHandler)).Queries(restQueries(peerRESTPolicy)...)