metrics: Add replication latency metrics (#13515)

Add a new Prometheus metric for bucket replication latency

e.g.:
minio_bucket_replication_latency_ns{
    bucket="testbucket",
    operation="upload",
    range="LESS_THAN_1_MiB",
    server="127.0.0.1:9001",
    targetArn="arn:minio:replication::45da043c-14f5-4da4-9316-aba5f77bf730:testbucket"} 2.2015663e+07

Co-authored-by: Klaus Post <klauspost@gmail.com>
This commit is contained in:
Anis Elleuch 2021-11-17 21:10:57 +01:00 committed by GitHub
parent 5b68f8ea6a
commit 4caed7cc0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1237 additions and 34 deletions

View File

@ -20,7 +20,6 @@ package cmd
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/minio/minio/internal/bucket/replication"
@ -68,16 +67,18 @@ func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) {
if !ok {
bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
}
atomic.AddInt64(&bs.ReplicaSize, n)
bs.ReplicaSize += n
r.Cache[bucket] = bs
}
// Update updates in-memory replication statistics with new values.
func (r *ReplicationStats) Update(bucket string, arn string, n int64, status, prevStatus replication.StatusType, opType replication.Type) {
func (r *ReplicationStats) Update(bucket string, arn string, n int64, duration time.Duration, status, prevStatus replication.StatusType, opType replication.Type) {
if r == nil {
return
}
r.RLock()
r.Lock()
defer r.Unlock()
bs, ok := r.Cache[bucket]
if !ok {
bs = &BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)}
@ -86,36 +87,37 @@ func (r *ReplicationStats) Update(bucket string, arn string, n int64, status, pr
if !ok {
b = &BucketReplicationStat{}
}
r.RUnlock()
switch status {
case replication.Completed:
switch prevStatus { // adjust counters based on previous state
case replication.Failed:
atomic.AddInt64(&b.FailedCount, -1)
b.FailedCount--
}
if opType == replication.ObjectReplicationType {
atomic.AddInt64(&b.ReplicatedSize, n)
b.ReplicatedSize += n
switch prevStatus {
case replication.Failed:
atomic.AddInt64(&b.FailedSize, -1*n)
b.FailedSize -= n
}
if duration > 0 {
b.Latency.update(n, duration)
}
}
case replication.Failed:
if opType == replication.ObjectReplicationType {
if prevStatus == replication.Pending {
atomic.AddInt64(&b.FailedSize, n)
atomic.AddInt64(&b.FailedCount, 1)
b.FailedSize += n
b.FailedCount++
}
}
case replication.Replica:
if opType == replication.ObjectReplicationType {
atomic.AddInt64(&b.ReplicaSize, n)
b.ReplicaSize += n
}
}
r.Lock()
bs.Stats[arn] = b
r.Cache[bucket] = bs
r.Unlock()
}
// GetInitialUsage get replication metrics available at the time of cluster initialization

View File

@ -34,6 +34,7 @@ import (
type replicatedTargetInfo struct {
Arn string
Size int64
Duration time.Duration
ReplicationAction replicationAction // full or metadata only
OpType replication.Type // whether incoming replication, existing object, healing etc..
ReplicationStatus replication.StatusType

View File

@ -433,7 +433,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
// to decrement pending count later.
for _, rinfo := range rinfos.Targets {
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, replicationStatus,
globalReplicationStats.Update(dobj.Bucket, rinfo.Arn, 0, 0, replicationStatus,
prevStatus, replication.DeleteReplicationType)
}
}
@ -938,7 +938,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
}
for _, rinfo := range rinfos.Targets {
if rinfo.ReplicationStatus != rinfo.PrevReplicationStatus {
globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType)
globalReplicationStats.Update(bucket, rinfo.Arn, rinfo.Size, rinfo.Duration, rinfo.ReplicationStatus, rinfo.PrevReplicationStatus, opType)
}
}
}
@ -963,6 +963,7 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
// replicateObjectToTarget replicates the specified version of the object to destination bucket
// The source object is then updated to reflect the replication status.
func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) {
startTime := time.Now()
objInfo := ri.ObjectInfo.Clone()
bucket := objInfo.Bucket
object := objInfo.Name
@ -1100,6 +1101,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID)
rinfo.ReplicationResynced = true
}
rinfo.Duration = time.Since(startTime)
}()
// use core client to avoid doing multipart on PUT
c := &miniogo.Core{Client: tgt.Client}
@ -1634,7 +1636,7 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
}
if sz, err := objInfo.GetActualSize(); err == nil {
for arn := range dsc.targetsMap {
globalReplicationStats.Update(objInfo.Bucket, arn, sz, objInfo.ReplicationStatus, replication.StatusType(""), opType)
globalReplicationStats.Update(objInfo.Bucket, arn, sz, 0, objInfo.ReplicationStatus, replication.StatusType(""), opType)
}
}
}
@ -1642,10 +1644,10 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer,
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer) {
globalReplicationPool.queueReplicaDeleteTask(dv)
for arn := range dv.ReplicationState.Targets {
globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
}
for arn := range dv.ReplicationState.PurgeTargets {
globalReplicationStats.Update(dv.Bucket, arn, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
globalReplicationStats.Update(dv.Bucket, arn, 0, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType)
}
}

View File

@ -19,10 +19,46 @@ package cmd
import (
"sync/atomic"
"time"
)
//go:generate msgp -file $GOFILE
// ReplicationLatency holds information of bucket operations latency, such us uploads
type ReplicationLatency struct {
// Single & Multipart PUTs latency
UploadHistogram LastMinuteLatencies
}
// Merge two replication latency into a new one
func (rl ReplicationLatency) merge(other ReplicationLatency) (newReplLatency ReplicationLatency) {
newReplLatency.UploadHistogram = rl.UploadHistogram.Merge(other.UploadHistogram)
return
}
// Get upload latency of each object size range
func (rl ReplicationLatency) getUploadLatency() (ret map[string]uint64) {
ret = make(map[string]uint64)
avg := rl.UploadHistogram.GetAvg()
for k, v := range avg {
// Convert nanoseconds to milliseconds
ret[sizeTagToString(k)] = v.avg() / uint64(time.Millisecond)
}
return
}
// Update replication upload latency with a new value
func (rl *ReplicationLatency) update(size int64, duration time.Duration) {
rl.UploadHistogram.Add(size, duration)
}
// Clone replication latency
func (rl ReplicationLatency) clone() ReplicationLatency {
return ReplicationLatency{
UploadHistogram: rl.UploadHistogram.Clone(),
}
}
// BucketStats bucket statistics
type BucketStats struct {
ReplicationStats BucketReplicationStats
@ -65,6 +101,7 @@ func (brs BucketReplicationStats) Clone() BucketReplicationStats {
FailedCount: atomic.LoadInt64(&st.FailedCount),
PendingSize: atomic.LoadInt64(&st.PendingSize),
PendingCount: atomic.LoadInt64(&st.PendingCount),
Latency: st.Latency.clone(),
}
}
// update total counts across targets
@ -93,6 +130,8 @@ type BucketReplicationStat struct {
PendingCount int64 `json:"pendingReplicationCount"`
// Total number of failed operations including metadata updates
FailedCount int64 `json:"failedReplicationCount"`
// Replication latency information
Latency ReplicationLatency `json:"replicationLatency"`
}
func (bs *BucketReplicationStat) hasReplicationUsage() bool {

View File

@ -60,6 +60,35 @@ func (z *BucketReplicationStat) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "FailedCount")
return
}
case "Latency":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
for zb0002 > 0 {
zb0002--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
switch msgp.UnsafeString(field) {
case "UploadHistogram":
err = z.Latency.UploadHistogram.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
}
}
default:
err = dc.Skip()
if err != nil {
@ -73,9 +102,9 @@ func (z *BucketReplicationStat) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *BucketReplicationStat) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 6
// map header, size 7
// write "PendingSize"
err = en.Append(0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
err = en.Append(0x87, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
@ -134,15 +163,31 @@ func (z *BucketReplicationStat) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "FailedCount")
return
}
// write "Latency"
err = en.Append(0xa7, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79)
if err != nil {
return
}
// map header, size 1
// write "UploadHistogram"
err = en.Append(0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d)
if err != nil {
return
}
err = z.Latency.UploadHistogram.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *BucketReplicationStat) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 6
// map header, size 7
// string "PendingSize"
o = append(o, 0x86, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
o = append(o, 0x87, 0xab, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendInt64(o, z.PendingSize)
// string "ReplicatedSize"
o = append(o, 0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65)
@ -159,6 +204,16 @@ func (z *BucketReplicationStat) MarshalMsg(b []byte) (o []byte, err error) {
// string "FailedCount"
o = append(o, 0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74)
o = msgp.AppendInt64(o, z.FailedCount)
// string "Latency"
o = append(o, 0xa7, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x79)
// map header, size 1
// string "UploadHistogram"
o = append(o, 0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d)
o, err = z.Latency.UploadHistogram.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram")
return
}
return
}
@ -216,6 +271,35 @@ func (z *BucketReplicationStat) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "FailedCount")
return
}
case "Latency":
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
for zb0002 > 0 {
zb0002--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
switch msgp.UnsafeString(field) {
case "UploadHistogram":
bts, err = z.Latency.UploadHistogram.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "Latency", "UploadHistogram")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, "Latency")
return
}
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@ -230,7 +314,7 @@ func (z *BucketReplicationStat) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *BucketReplicationStat) Msgsize() (s int) {
s = 1 + 12 + msgp.Int64Size + 15 + msgp.Int64Size + 12 + msgp.Int64Size + 11 + msgp.Int64Size + 13 + msgp.Int64Size + 12 + msgp.Int64Size
s = 1 + 12 + msgp.Int64Size + 15 + msgp.Int64Size + 12 + msgp.Int64Size + 11 + msgp.Int64Size + 13 + msgp.Int64Size + 12 + msgp.Int64Size + 8 + 1 + 16 + z.Latency.UploadHistogram.Msgsize()
return
}
@ -707,3 +791,110 @@ func (z *BucketStats) Msgsize() (s int) {
s = 1 + 17 + z.ReplicationStats.Msgsize()
return
}
// DecodeMsg implements msgp.Decodable
func (z *ReplicationLatency) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "UploadHistogram":
err = z.UploadHistogram.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *ReplicationLatency) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1
// write "UploadHistogram"
err = en.Append(0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d)
if err != nil {
return
}
err = z.UploadHistogram.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *ReplicationLatency) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 1
// string "UploadHistogram"
o = append(o, 0x81, 0xaf, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x48, 0x69, 0x73, 0x74, 0x6f, 0x67, 0x72, 0x61, 0x6d)
o, err = z.UploadHistogram.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram")
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *ReplicationLatency) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "UploadHistogram":
bts, err = z.UploadHistogram.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "UploadHistogram")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *ReplicationLatency) Msgsize() (s int) {
s = 1 + 16 + z.UploadHistogram.Msgsize()
return
}

View File

@ -347,3 +347,116 @@ func BenchmarkDecodeBucketStats(b *testing.B) {
}
}
}
func TestMarshalUnmarshalReplicationLatency(t *testing.T) {
v := ReplicationLatency{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeReplicationLatency(t *testing.T) {
v := ReplicationLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeReplicationLatency Msgsize() is inaccurate")
}
vn := ReplicationLatency{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeReplicationLatency(b *testing.B) {
v := ReplicationLatency{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

187
cmd/last-minute.go Normal file
View File

@ -0,0 +1,187 @@
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"time"
)
const (
sizeLessThan1KiB = iota
sizeLessThan1MiB
sizeLessThan10MiB
sizeLessThan100MiB
sizeLessThan1GiB
sizeGreaterThan1GiB
// Add new entries here
sizeLastElemMarker
)
// sizeToTag converts a size to a tag.
func sizeToTag(size int64) int {
switch {
case size < 1024:
return sizeLessThan1KiB
case size < 1024*1024:
return sizeLessThan1MiB
case size < 10*1024*1024:
return sizeLessThan10MiB
case size < 100*1024*1024:
return sizeLessThan100MiB
case size < 1024*1024*1024:
return sizeLessThan1GiB
default:
return sizeGreaterThan1GiB
}
}
func sizeTagToString(tag int) string {
switch tag {
case sizeLessThan1KiB:
return "LESS_THAN_1_KiB"
case sizeLessThan1MiB:
return "LESS_THAN_1_MiB"
case sizeLessThan10MiB:
return "LESS_THAN_10_MiB"
case sizeLessThan100MiB:
return "LESS_THAN_100_MiB"
case sizeLessThan1GiB:
return "LESS_THAN_1_GiB"
case sizeGreaterThan1GiB:
return "GREATER_THAN_1_GiB"
default:
return "unknown"
}
}
// AccElem holds information for calculating an average value
type AccElem struct {
Total int64
N int64
}
// add dur to a as a single element.
func (a *AccElem) add(dur time.Duration) {
a.Total += int64(dur)
a.N++
}
// merge b into a.
func (a *AccElem) merge(b AccElem) {
a.N += b.N
a.Total += b.Total
}
// avg converts total to average.
func (a *AccElem) avg() uint64 {
if a.N >= 1 && a.Total > 0 {
return uint64(a.Total / a.N)
}
return 0
}
// LastMinuteLatencies keeps track of last minute latencies.
type LastMinuteLatencies struct {
Totals [60][sizeLastElemMarker]AccElem
LastSec int64
}
// Clone safely returns a copy for a LastMinuteLatencies structure
func (l *LastMinuteLatencies) Clone() LastMinuteLatencies {
n := LastMinuteLatencies{}
n.LastSec = l.LastSec
for i := range l.Totals {
for j := range l.Totals[i] {
n.Totals[i][j] = AccElem{
Total: l.Totals[i][j].Total,
N: l.Totals[i][j].N,
}
}
}
return n
}
// Merge safely merges two LastMinuteLatencies structures into one
func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) {
cl := l.Clone()
co := o.Clone()
if cl.LastSec > co.LastSec {
co.forwardTo(cl.LastSec)
merged.LastSec = cl.LastSec
} else {
cl.forwardTo(co.LastSec)
merged.LastSec = co.LastSec
}
for i := range cl.Totals {
for j := range cl.Totals[i] {
merged.Totals[i][j] = AccElem{
Total: cl.Totals[i][j].Total + co.Totals[i][j].Total,
N: cl.Totals[i][j].N + co.Totals[i][j].N,
}
}
}
return merged
}
// Add latency t from object with the specified size.
func (l *LastMinuteLatencies) Add(size int64, t time.Duration) {
tag := sizeToTag(size)
// Update...
sec := time.Now().Unix()
l.forwardTo(sec)
winIdx := sec % 60
l.Totals[winIdx][tag].add(t)
l.LastSec = sec
}
// GetAvg will return the average for each bucket from the last time minute.
// The number of objects is also included.
func (l *LastMinuteLatencies) GetAvg() [sizeLastElemMarker]AccElem {
var res [sizeLastElemMarker]AccElem
sec := time.Now().Unix()
l.forwardTo(sec)
for _, elems := range l.Totals[:] {
for j := range elems {
res[j].merge(elems[j])
}
}
return res
}
// forwardTo time t, clearing any entries in between.
func (l *LastMinuteLatencies) forwardTo(t int64) {
if l.LastSec >= t {
return
}
if t-l.LastSec >= 60 {
l.Totals = [60][sizeLastElemMarker]AccElem{}
return
}
for l.LastSec != t {
// Clear next element.
idx := (l.LastSec + 1) % 60
l.Totals[idx] = [sizeLastElemMarker]AccElem{}
l.LastSec++
}
}

411
cmd/last-minute_gen.go Normal file
View File

@ -0,0 +1,411 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *AccElem) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Total":
z.Total, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Total")
return
}
case "N":
z.N, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "N")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z AccElem) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "Total"
err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
return
}
err = en.WriteInt64(z.Total)
if err != nil {
err = msgp.WrapError(err, "Total")
return
}
// write "N"
err = en.Append(0xa1, 0x4e)
if err != nil {
return
}
err = en.WriteInt64(z.N)
if err != nil {
err = msgp.WrapError(err, "N")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z AccElem) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "Total"
o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendInt64(o, z.Total)
// string "N"
o = append(o, 0xa1, 0x4e)
o = msgp.AppendInt64(o, z.N)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *AccElem) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Total":
z.Total, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Total")
return
}
case "N":
z.N, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "N")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z AccElem) Msgsize() (s int) {
s = 1 + 6 + msgp.Int64Size + 2 + msgp.Int64Size
return
}
// DecodeMsg implements msgp.Decodable
func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Totals":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Totals")
return
}
if zb0002 != uint32(60) {
err = msgp.ArrayError{Wanted: uint32(60), Got: zb0002}
return
}
for za0001 := range z.Totals {
var zb0003 uint32
zb0003, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001)
return
}
if zb0003 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003}
return
}
for za0002 := range z.Totals[za0001] {
var zb0004 uint32
zb0004, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
return
}
for zb0004 > 0 {
zb0004--
field, err = dc.ReadMapKeyPtr()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
return
}
switch msgp.UnsafeString(field) {
case "Total":
z.Totals[za0001][za0002].Total, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "Total")
return
}
case "N":
z.Totals[za0001][za0002].N, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "N")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
return
}
}
}
}
}
case "LastSec":
z.LastSec, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "LastSec")
return
}
default:
err = dc.Skip()
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "Totals"
err = en.Append(0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(60))
if err != nil {
err = msgp.WrapError(err, "Totals")
return
}
for za0001 := range z.Totals {
err = en.WriteArrayHeader(uint32(sizeLastElemMarker))
if err != nil {
err = msgp.WrapError(err, "Totals", za0001)
return
}
for za0002 := range z.Totals[za0001] {
// map header, size 2
// write "Total"
err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
return
}
err = en.WriteInt64(z.Totals[za0001][za0002].Total)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "Total")
return
}
// write "N"
err = en.Append(0xa1, 0x4e)
if err != nil {
return
}
err = en.WriteInt64(z.Totals[za0001][za0002].N)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "N")
return
}
}
}
// write "LastSec"
err = en.Append(0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63)
if err != nil {
return
}
err = en.WriteInt64(z.LastSec)
if err != nil {
err = msgp.WrapError(err, "LastSec")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "Totals"
o = append(o, 0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73)
o = msgp.AppendArrayHeader(o, uint32(60))
for za0001 := range z.Totals {
o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker))
for za0002 := range z.Totals[za0001] {
// map header, size 2
// string "Total"
o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendInt64(o, z.Totals[za0001][za0002].Total)
// string "N"
o = append(o, 0xa1, 0x4e)
o = msgp.AppendInt64(o, z.Totals[za0001][za0002].N)
}
}
// string "LastSec"
o = append(o, 0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63)
o = msgp.AppendInt64(o, z.LastSec)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "Totals":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals")
return
}
if zb0002 != uint32(60) {
err = msgp.ArrayError{Wanted: uint32(60), Got: zb0002}
return
}
for za0001 := range z.Totals {
var zb0003 uint32
zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001)
return
}
if zb0003 != uint32(sizeLastElemMarker) {
err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003}
return
}
for za0002 := range z.Totals[za0001] {
var zb0004 uint32
zb0004, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
return
}
for zb0004 > 0 {
zb0004--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
return
}
switch msgp.UnsafeString(field) {
case "Total":
z.Totals[za0001][za0002].Total, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "Total")
return
}
case "N":
z.Totals[za0001][za0002].N, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002, "N")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, za0002)
return
}
}
}
}
}
case "LastSec":
z.LastSec, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "LastSec")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *LastMinuteLatencies) Msgsize() (s int) {
s = 1 + 7 + msgp.ArrayHeaderSize + (60 * (sizeLastElemMarker * (9 + msgp.Int64Size + msgp.Int64Size))) + 8 + msgp.Int64Size
return
}

236
cmd/last-minute_gen_test.go Normal file
View File

@ -0,0 +1,236 @@
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshalAccElem(t *testing.T) {
v := AccElem{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgAccElem(b *testing.B) {
v := AccElem{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgAccElem(b *testing.B) {
v := AccElem{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalAccElem(b *testing.B) {
v := AccElem{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeAccElem(t *testing.T) {
v := AccElem{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeAccElem Msgsize() is inaccurate")
}
vn := AccElem{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeAccElem(b *testing.B) {
v := AccElem{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeAccElem(b *testing.B) {
v := AccElem{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalLastMinuteLatencies(t *testing.T) {
v := LastMinuteLatencies{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodeLastMinuteLatencies(t *testing.T) {
v := LastMinuteLatencies{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeLastMinuteLatencies Msgsize() is inaccurate")
}
vn := LastMinuteLatencies{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodeLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodeLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}

View File

@ -99,17 +99,18 @@ const (
total MetricName = "total"
freeInodes MetricName = "free_inodes"
failedCount MetricName = "failed_count"
failedBytes MetricName = "failed_bytes"
freeBytes MetricName = "free_bytes"
readBytes MetricName = "read_bytes"
rcharBytes MetricName = "rchar_bytes"
receivedBytes MetricName = "received_bytes"
sentBytes MetricName = "sent_bytes"
totalBytes MetricName = "total_bytes"
usedBytes MetricName = "used_bytes"
writeBytes MetricName = "write_bytes"
wcharBytes MetricName = "wchar_bytes"
failedCount MetricName = "failed_count"
failedBytes MetricName = "failed_bytes"
freeBytes MetricName = "free_bytes"
readBytes MetricName = "read_bytes"
rcharBytes MetricName = "rchar_bytes"
receivedBytes MetricName = "received_bytes"
latencyMilliSec MetricName = "latency_ms"
sentBytes MetricName = "sent_bytes"
totalBytes MetricName = "total_bytes"
usedBytes MetricName = "used_bytes"
writeBytes MetricName = "write_bytes"
wcharBytes MetricName = "wchar_bytes"
usagePercent MetricName = "update_percent"
@ -409,6 +410,16 @@ func getBucketUsageObjectsTotalMD() MetricDescription {
}
}
func getBucketRepLatencyMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
Subsystem: replicationSubsystem,
Name: latencyMilliSec,
Help: "Replication latency in milliseconds.",
Type: histogramMetric,
}
}
func getBucketRepFailedBytesMD() MetricDescription {
return MetricDescription{
Namespace: bucketMetricNamespace,
@ -1482,6 +1493,13 @@ func getBucketUsageMetrics() MetricsGroup {
Value: float64(stat.FailedCount),
VariableLabels: map[string]string{"bucket": bucket, "targetArn": arn},
})
metrics = append(metrics, Metric{
Description: getBucketRepLatencyMD(),
HistogramBucketLabel: "range",
Histogram: stat.Latency.getUploadLatency(),
VariableLabels: map[string]string{"bucket": bucket, "operation": "upload", "targetArn": arn},
})
}
}

View File

@ -454,6 +454,7 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
FailedCount: stat.FailedCount + oldst.FailedCount,
FailedSize: stat.FailedSize + oldst.FailedSize,
ReplicatedSize: stat.ReplicatedSize + oldst.ReplicatedSize,
Latency: stat.Latency.merge(oldst.Latency),
}
}
}
@ -502,6 +503,8 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic
// happen since data usage picture can lag behind actual usage state at the time of cluster start
st.FailedSize = int64(math.Max(float64(tgtstat.FailedSize), 0))
st.FailedCount = int64(math.Max(float64(tgtstat.FailedCount), 0))
st.Latency = tgtstat.Latency
s.Stats[arn] = &st
s.FailedSize += st.FailedSize
s.FailedCount += st.FailedCount