diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 3e2e12eac..38ad749f6 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -1244,7 +1244,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. return } } - globalReplicationStats.Delete(ctx, bucket) + // Write success response. writeSuccessNoContent(w) @@ -1634,8 +1634,19 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW return } - metrics := globalReplicationStats.Get(bucket) - if err := json.NewEncoder(w).Encode(&metrics); err != nil { + bucketStats := globalNotificationSys.GetClusterBucketStats(r.Context(), bucket) + bucketReplStats := BucketReplicationStats{} + + for _, bucketStat := range bucketStats { + bucketReplStats.FailedCount += bucketStat.ReplicationStats.FailedCount + bucketReplStats.FailedSize += bucketStat.ReplicationStats.FailedSize + bucketReplStats.PendingCount += bucketStat.ReplicationStats.PendingCount + bucketReplStats.PendingSize += bucketStat.ReplicationStats.PendingSize + bucketReplStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize + bucketReplStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize + } + + if err := json.NewEncoder(w).Encode(&bucketReplStats); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } diff --git a/cmd/bucket-replication-stats.go b/cmd/bucket-replication-stats.go index 844b1e6e9..4ce47ae48 100644 --- a/cmd/bucket-replication-stats.go +++ b/cmd/bucket-replication-stats.go @@ -24,23 +24,6 @@ import ( "github.com/minio/minio/pkg/bucket/replication" ) -// BucketReplicationStats represents inline replication statistics -// such as pending, failed and completed bytes in total for a bucket -type BucketReplicationStats struct { - // Pending size in bytes - PendingSize uint64 `json:"pendingReplicationSize"` - // Completed size in bytes - ReplicatedSize uint64 `json:"completedReplicationSize"` - // Total Replica size in bytes - ReplicaSize uint64 `json:"replicaSize"` - // Failed size in bytes - FailedSize uint64 `json:"failedReplicationSize"` - // Total number of pending operations including metadata updates - PendingCount uint64 `json:"pendingReplicationCount"` - // Total number of failed operations including metadata updates - FailedCount uint64 `json:"failedReplicationCount"` -} - func (b *BucketReplicationStats) hasReplicationUsage() bool { return b.PendingSize > 0 || b.FailedSize > 0 || @@ -57,7 +40,7 @@ type ReplicationStats struct { } // Delete deletes in-memory replication statistics for a bucket. -func (r *ReplicationStats) Delete(ctx context.Context, bucket string) { +func (r *ReplicationStats) Delete(bucket string) { if r == nil { return } @@ -68,7 +51,7 @@ func (r *ReplicationStats) Delete(ctx context.Context, bucket string) { } // Update updates in-memory replication statistics with new values. -func (r *ReplicationStats) Update(ctx context.Context, bucket string, n int64, status, prevStatus replication.StatusType, opType replication.Type) { +func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus replication.StatusType, opType replication.Type) { if r == nil { return } @@ -125,10 +108,12 @@ func (r *ReplicationStats) Get(bucket string) BucketReplicationStats { r.RLock() defer r.RUnlock() + st, ok := r.Cache[bucket] if !ok { return BucketReplicationStats{} } + return BucketReplicationStats{ PendingSize: atomic.LoadUint64(&st.PendingSize), FailedSize: atomic.LoadUint64(&st.FailedSize), diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 7ceb5a792..2ae4fb119 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -297,7 +297,8 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectA prevStatus = string(dobj.VersionPurgeStatus) currStatus = string(versionPurgeStatus) } - globalReplicationStats.Update(ctx, dobj.Bucket, 0, replication.StatusType(currStatus), replication.StatusType(prevStatus), replication.DeleteReplicationType) // to decrement pending count + // to decrement pending count later. + globalReplicationStats.Update(dobj.Bucket, 0, replication.StatusType(currStatus), replication.StatusType(prevStatus), replication.DeleteReplicationType) var eventName = event.ObjectReplicationComplete if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed { @@ -754,7 +755,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa if rtype == replicateAll { opType = replication.ObjectReplicationType } - globalReplicationStats.Update(ctx, bucket, size, replicationStatus, prevReplStatus, opType) + globalReplicationStats.Update(bucket, size, replicationStatus, prevReplStatus, opType) sendEvent(eventArgs{ EventName: eventName, BucketName: bucket, @@ -1063,7 +1064,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, globalReplicationPool.queueReplicaTask(GlobalContext, objInfo) } if sz, err := objInfo.GetActualSize(); err == nil { - globalReplicationStats.Update(ctx, objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType) + globalReplicationStats.Update(objInfo.Bucket, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType) } } @@ -1073,5 +1074,5 @@ func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, } else { globalReplicationPool.queueReplicaDeleteTask(GlobalContext, dv) } - globalReplicationStats.Update(ctx, dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) + globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) } diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go new file mode 100644 index 000000000..aba8921d7 --- /dev/null +++ b/cmd/bucket-stats.go @@ -0,0 +1,41 @@ +/* + * MinIO Cloud Storage, (C) 2021 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +//go:generate msgp -file $GOFILE + +// BucketStats bucket statistics +type BucketStats struct { + ReplicationStats BucketReplicationStats +} + +// BucketReplicationStats represents inline replication statistics +// such as pending, failed and completed bytes in total for a bucket +type BucketReplicationStats struct { + // Pending size in bytes + PendingSize uint64 `json:"pendingReplicationSize"` + // Completed size in bytes + ReplicatedSize uint64 `json:"completedReplicationSize"` + // Total Replica size in bytes + ReplicaSize uint64 `json:"replicaSize"` + // Failed size in bytes + FailedSize uint64 `json:"failedReplicationSize"` + // Total number of pending operations including metadata updates + PendingCount uint64 `json:"pendingReplicationCount"` + // Total number of failed operations including metadata updates + FailedCount uint64 `json:"failedReplicationCount"` +} diff --git a/cmd/bucket-stats_gen.go b/cmd/bucket-stats_gen.go new file mode 100644 index 000000000..278798104 --- /dev/null +++ b/cmd/bucket-stats_gen.go @@ -0,0 +1,342 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *BucketReplicationStats) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "PendingSize": + z.PendingSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "PendingSize") + return + } + case "ReplicatedSize": + z.ReplicatedSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + case "ReplicaSize": + z.ReplicaSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + case "FailedSize": + z.FailedSize, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + case "PendingCount": + z.PendingCount, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "PendingCount") + return + } + case "FailedCount": + z.FailedCount, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *BucketReplicationStats) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 6 + // write "PendingSize" + err = en.Append(0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.PendingSize) + if err != nil { + err = msgp.WrapError(err, "PendingSize") + return + } + // write "ReplicatedSize" + err = en.Append(0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.ReplicatedSize) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + // write "ReplicaSize" + err = en.Append(0xab, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.ReplicaSize) + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + // write "FailedSize" + err = en.Append(0xaa, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.FailedSize) + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + // write "PendingCount" + err = en.Append(0xac, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x75, 0x6e, 0x74) + if err != nil { + return + } + err = en.WriteUint64(z.PendingCount) + if err != nil { + err = msgp.WrapError(err, "PendingCount") + return + } + // write "FailedCount" + err = en.Append(0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74) + if err != nil { + return + } + err = en.WriteUint64(z.FailedCount) + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *BucketReplicationStats) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 6 + // string "PendingSize" + o = append(o, 0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendUint64(o, z.PendingSize) + // string "ReplicatedSize" + o = append(o, 0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendUint64(o, z.ReplicatedSize) + // string "ReplicaSize" + o = append(o, 0xab, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendUint64(o, z.ReplicaSize) + // string "FailedSize" + o = append(o, 0xaa, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendUint64(o, z.FailedSize) + // string "PendingCount" + o = append(o, 0xac, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x43, 0x6f, 0x75, 0x6e, 0x74) + o = msgp.AppendUint64(o, z.PendingCount) + // string "FailedCount" + o = append(o, 0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74) + o = msgp.AppendUint64(o, z.FailedCount) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BucketReplicationStats) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "PendingSize": + z.PendingSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "PendingSize") + return + } + case "ReplicatedSize": + z.ReplicatedSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + case "ReplicaSize": + z.ReplicaSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicaSize") + return + } + case "FailedSize": + z.FailedSize, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + case "PendingCount": + z.PendingCount, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "PendingCount") + return + } + case "FailedCount": + z.FailedCount, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *BucketReplicationStats) Msgsize() (s int) { + s = 1 + 12 + msgp.Uint64Size + 15 + msgp.Uint64Size + 12 + msgp.Uint64Size + 11 + msgp.Uint64Size + 13 + msgp.Uint64Size + 12 + msgp.Uint64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *BucketStats) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ReplicationStats": + err = z.ReplicationStats.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "ReplicationStats") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *BucketStats) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 1 + // write "ReplicationStats" + err = en.Append(0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73) + if err != nil { + return + } + err = z.ReplicationStats.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "ReplicationStats") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *BucketStats) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 1 + // string "ReplicationStats" + o = append(o, 0x81, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73) + o, err = z.ReplicationStats.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "ReplicationStats") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BucketStats) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ReplicationStats": + bts, err = z.ReplicationStats.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicationStats") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *BucketStats) Msgsize() (s int) { + s = 1 + 17 + z.ReplicationStats.Msgsize() + return +} diff --git a/cmd/bucket-stats_gen_test.go b/cmd/bucket-stats_gen_test.go new file mode 100644 index 000000000..6c435d5ec --- /dev/null +++ b/cmd/bucket-stats_gen_test.go @@ -0,0 +1,236 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalBucketReplicationStats(t *testing.T) { + v := BucketReplicationStats{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBucketReplicationStats(b *testing.B) { + v := BucketReplicationStats{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBucketReplicationStats(b *testing.B) { + v := BucketReplicationStats{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBucketReplicationStats(b *testing.B) { + v := BucketReplicationStats{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBucketReplicationStats(t *testing.T) { + v := BucketReplicationStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBucketReplicationStats Msgsize() is inaccurate") + } + + vn := BucketReplicationStats{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBucketReplicationStats(b *testing.B) { + v := BucketReplicationStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBucketReplicationStats(b *testing.B) { + v := BucketReplicationStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalBucketStats(t *testing.T) { + v := BucketStats{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBucketStats(b *testing.B) { + v := BucketStats{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBucketStats(b *testing.B) { + v := BucketStats{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBucketStats(b *testing.B) { + v := BucketStats{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBucketStats(t *testing.T) { + v := BucketStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBucketStats Msgsize() is inaccurate") + } + + vn := BucketStats{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBucketStats(b *testing.B) { + v := BucketStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBucketStats(b *testing.B) { + v := BucketStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/metrics.go b/cmd/metrics.go index 1f86be8c5..2eb0b5260 100644 --- a/cmd/metrics.go +++ b/cmd/metrics.go @@ -432,35 +432,56 @@ func networkMetricsPrometheus(ch chan<- prometheus.Metric) { } // get the most current of in-memory replication stats and data usage info from crawler. -func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) BucketReplicationStats { - s := BucketReplicationStats{ - PendingSize: u.ReplicationPendingSize, - FailedSize: u.ReplicationFailedSize, - ReplicatedSize: u.ReplicatedSize, - ReplicaSize: u.ReplicaSize, - PendingCount: u.ReplicationPendingCount, - FailedCount: u.ReplicationFailedCount, +func getLatestReplicationStats(bucket string, u madmin.BucketUsageInfo) (s BucketReplicationStats) { + bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) + + replStats := BucketReplicationStats{} + for _, bucketStat := range bucketStats { + replStats.FailedCount += bucketStat.ReplicationStats.FailedCount + replStats.FailedSize += bucketStat.ReplicationStats.FailedSize + replStats.PendingCount += bucketStat.ReplicationStats.PendingCount + replStats.PendingSize += bucketStat.ReplicationStats.PendingSize + replStats.ReplicaSize += bucketStat.ReplicationStats.ReplicaSize + replStats.ReplicatedSize += bucketStat.ReplicationStats.ReplicatedSize } - rStat := globalReplicationStats.Get(bucket) + // use in memory replication stats if it is ahead of usage info. - if rStat.ReplicatedSize > u.ReplicatedSize { - s.ReplicatedSize = rStat.ReplicatedSize + if replStats.ReplicatedSize > u.ReplicatedSize { + s.ReplicatedSize = replStats.ReplicatedSize + } else { + s.ReplicatedSize = u.ReplicatedSize } - if rStat.PendingSize > u.ReplicationPendingSize { - s.PendingSize = rStat.PendingSize + + if replStats.PendingSize > u.ReplicationPendingSize { + s.PendingSize = replStats.PendingSize + } else { + s.PendingSize = u.ReplicationPendingSize } - if rStat.FailedSize > u.ReplicationFailedSize { - s.FailedSize = rStat.FailedSize + + if replStats.FailedSize > u.ReplicationFailedSize { + s.FailedSize = replStats.FailedSize + } else { + s.FailedSize = u.ReplicationFailedSize } - if rStat.ReplicaSize > u.ReplicaSize { - s.ReplicaSize = rStat.ReplicaSize + + if replStats.ReplicaSize > u.ReplicaSize { + s.ReplicaSize = replStats.ReplicaSize + } else { + s.ReplicaSize = u.ReplicaSize } - if rStat.PendingCount > u.ReplicationPendingCount { - s.PendingCount = rStat.PendingCount + + if replStats.PendingCount > u.ReplicationPendingCount { + s.PendingCount = replStats.PendingCount + } else { + s.PendingCount = u.ReplicationPendingCount } - if rStat.FailedCount > u.ReplicationFailedCount { - s.FailedCount = rStat.FailedCount + + if replStats.FailedCount > u.ReplicationFailedCount { + s.FailedCount = replStats.FailedCount + } else { + s.FailedCount = u.ReplicationFailedCount } + return s } diff --git a/cmd/notification.go b/cmd/notification.go index 647f96456..7c7612d65 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -615,6 +616,8 @@ func (sys *NotificationSys) findEarliestCleanBloomFilter(ctx context.Context, di return best } +var errPeerNotReachable = errors.New("peer is not reachable") + // GetLocks - makes GetLocks RPC call on all peers. func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*PeerLocks { locksResp := make([]*PeerLocks, len(sys.peerClients)) @@ -623,7 +626,7 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe index := index g.Go(func() error { if client == nil { - return nil + return errPeerNotReachable } serverLocksResp, err := sys.peerClients[index].GetLocks() if err != nil { @@ -671,6 +674,7 @@ func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName s // DeleteBucketMetadata - calls DeleteBucketMetadata call on all peers func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName string) { + globalReplicationStats.Delete(bucketName) globalBucketMetadataSys.Remove(bucketName) if localMetacacheMgr != nil { localMetacacheMgr.deleteBucketCache(bucketName) @@ -694,6 +698,36 @@ func (sys *NotificationSys) DeleteBucketMetadata(ctx context.Context, bucketName } } +// GetClusterBucketStats - calls GetClusterBucketStats call on all peers for a cluster statistics view. +func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketName string) []BucketStats { + ng := WithNPeers(len(sys.peerClients)) + bucketStats := make([]BucketStats, len(sys.peerClients)) + for index, client := range sys.peerClients { + index := index + ng.Go(ctx, func() error { + if client == nil { + return errPeerNotReachable + } + bs, err := client.GetBucketStats(bucketName) + if err != nil { + return err + } + bucketStats[index] = bs + return nil + }, index, *client.host) + } + for _, nErr := range ng.Wait() { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String()) + if nErr.Err != nil { + logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err) + } + } + bucketStats = append(bucketStats, BucketStats{ + ReplicationStats: globalReplicationStats.Get(bucketName), + }) + return bucketStats +} + // Loads notification policies for all buckets into NotificationSys. func (sys *NotificationSys) load(buckets []BucketInfo) { for _, bucket := range buckets { diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 3e4ad01c9..adec89215 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -434,6 +434,20 @@ func (client *peerRESTClient) DownloadProfileData() (data map[string][]byte, err return data, err } +// GetBucketStats - load bucket statistics +func (client *peerRESTClient) GetBucketStats(bucket string) (BucketStats, error) { + values := make(url.Values) + values.Set(peerRESTBucket, bucket) + respBody, err := client.call(peerRESTMethodGetBucketStats, values, nil, -1) + if err != nil { + return BucketStats{}, err + } + + var bs BucketStats + defer http.DrainBody(respBody) + return bs, msgp.Decode(respBody, &bs) +} + // LoadBucketMetadata - load bucket metadata func (client *peerRESTClient) LoadBucketMetadata(bucket string) error { values := make(url.Values) diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 66cb8c4ad..3311df7f2 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - peerRESTVersion = "v13" // Add storage tracing + peerRESTVersion = "v14" // Add GetBucketStats API peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix @@ -36,6 +36,7 @@ const ( peerRESTMethodDispatchNetInfo = "/dispatchnetinfo" peerRESTMethodDeleteBucketMetadata = "/deletebucketmetadata" peerRESTMethodLoadBucketMetadata = "/loadbucketmetadata" + peerRESTMethodGetBucketStats = "/getbucketstats" peerRESTMethodServerUpdate = "/serverupdate" peerRESTMethodSignalService = "/signalservice" peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 19b2e2072..2372a9734 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -532,12 +532,35 @@ func (s *peerRESTServer) DeleteBucketMetadataHandler(w http.ResponseWriter, r *h return } + globalReplicationStats.Delete(bucketName) globalBucketMetadataSys.Remove(bucketName) if localMetacacheMgr != nil { localMetacacheMgr.deleteBucketCache(bucketName) } } +// GetBucketStatsHandler - fetches current in-memory bucket stats, currently only +// returns BucketReplicationStatus +func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("Invalid request")) + return + } + + vars := mux.Vars(r) + bucketName := vars[peerRESTBucket] + if bucketName == "" { + s.writeErrorResponse(w, errors.New("Bucket name is missing")) + return + } + + bs := BucketStats{ + ReplicationStats: globalReplicationStats.Get(bucketName), + } + defer w.(http.Flusher).Flush() + logger.LogIf(r.Context(), msgp.Encode(w, &bs)) +} + // LoadBucketMetadataHandler - reloads in memory bucket metadata func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1092,6 +1115,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCycleBloom).HandlerFunc(httpTraceHdrs(server.CycleServerBloomFilterHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteBucketMetadata).HandlerFunc(httpTraceHdrs(server.DeleteBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadBucketMetadata).HandlerFunc(httpTraceHdrs(server.LoadBucketMetadataHandler)).Queries(restQueries(peerRESTBucket)...) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetBucketStats).HandlerFunc(httpTraceHdrs(server.GetBucketStatsHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSignalService).HandlerFunc(httpTraceHdrs(server.SignalServiceHandler)).Queries(restQueries(peerRESTSignal)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerUpdate).HandlerFunc(httpTraceHdrs(server.ServerUpdateHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeletePolicy).HandlerFunc(httpTraceAll(server.DeletePolicyHandler)).Queries(restQueries(peerRESTPolicy)...)