diff --git a/cmd/bucket-stats.go b/cmd/bucket-stats.go index da94be00b..eef2b2aed 100644 --- a/cmd/bucket-stats.go +++ b/cmd/bucket-stats.go @@ -38,7 +38,7 @@ func (rl ReplicationLatency) merge(other ReplicationLatency) (newReplLatency Rep // Get upload latency of each object size range func (rl ReplicationLatency) getUploadLatency() (ret map[string]uint64) { ret = make(map[string]uint64) - avg := rl.UploadHistogram.GetAvg() + avg := rl.UploadHistogram.GetAvgData() for k, v := range avg { // Convert nanoseconds to milliseconds ret[sizeTagToString(k)] = v.avg() / uint64(time.Millisecond) diff --git a/cmd/last-minute.go b/cmd/last-minute.go index d87c589fa..6d715c14e 100644 --- a/cmd/last-minute.go +++ b/cmd/last-minute.go @@ -15,6 +15,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +//go:generate msgp -file=$GOFILE -unexported + package cmd import ( @@ -76,34 +78,34 @@ type AccElem struct { N int64 } -// add dur to a as a single element. +// Add a duration to a single element. func (a *AccElem) add(dur time.Duration) { a.Total += int64(dur) a.N++ } -// merge b into a. +// Merge b into a. func (a *AccElem) merge(b AccElem) { a.N += b.N a.Total += b.Total } -// avg converts total to average. -func (a *AccElem) avg() uint64 { +// Avg converts total to average. +func (a AccElem) avg() uint64 { if a.N >= 1 && a.Total > 0 { return uint64(a.Total / a.N) } return 0 } -// LastMinuteLatencies keeps track of last minute latencies. -type LastMinuteLatencies struct { - Totals [60][sizeLastElemMarker]AccElem +// lastMinuteLatency keeps track of last minute latency. +type lastMinuteLatency struct { + Totals [60]AccElem LastSec int64 } -// Merge safely merges two LastMinuteLatencies structures into one -func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) { +// Merge data of two lastMinuteLatency structure +func (l lastMinuteLatency) merge(o lastMinuteLatency) (merged lastMinuteLatency) { if l.LastSec > o.LastSec { o.forwardTo(l.LastSec) merged.LastSec = l.LastSec @@ -113,57 +115,73 @@ func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLate } for i := range merged.Totals { - for j := range merged.Totals[i] { - merged.Totals[i][j] = AccElem{ - Total: l.Totals[i][j].Total + o.Totals[i][j].Total, - N: l.Totals[i][j].N + o.Totals[i][j].N, - } + merged.Totals[i] = AccElem{ + Total: l.Totals[i].Total + o.Totals[i].Total, + N: l.Totals[i].N + o.Totals[i].N, } } return merged } +// Add a new duration data +func (l *lastMinuteLatency) add(t time.Duration) { + sec := time.Now().Unix() + l.forwardTo(sec) + winIdx := sec % 60 + l.Totals[winIdx].add(t) + l.LastSec = sec +} + +// Merge all recorded latencies of last minute into one +func (l *lastMinuteLatency) getAvgData() AccElem { + var res AccElem + sec := time.Now().Unix() + l.forwardTo(sec) + for _, elem := range l.Totals[:] { + res.merge(elem) + } + return res +} + +// forwardTo time t, clearing any entries in between. +func (l *lastMinuteLatency) forwardTo(t int64) { + if l.LastSec >= t { + return + } + if t-l.LastSec >= 60 { + l.Totals = [60]AccElem{} + return + } + for l.LastSec != t { + // Clear next element. + idx := (l.LastSec + 1) % 60 + l.Totals[idx] = AccElem{} + l.LastSec++ + } +} + +// LastMinuteLatencies keeps track of last minute latencies. +type LastMinuteLatencies [sizeLastElemMarker]lastMinuteLatency + +// Merge safely merges two LastMinuteLatencies structures into one +func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) { + for i := range l { + merged[i] = l[i].merge(o[i]) + } + return merged +} + // Add latency t from object with the specified size. func (l *LastMinuteLatencies) Add(size int64, t time.Duration) { - tag := sizeToTag(size) - - // Update... - sec := time.Now().Unix() - l.forwardTo(sec) - - winIdx := sec % 60 - l.Totals[winIdx][tag].add(t) - - l.LastSec = sec + l[sizeToTag(size)].add(t) } -// GetAvg will return the average for each bucket from the last time minute. +// GetAvgData will return the average for each bucket from the last time minute. // The number of objects is also included. -func (l *LastMinuteLatencies) GetAvg() [sizeLastElemMarker]AccElem { +func (l *LastMinuteLatencies) GetAvgData() [sizeLastElemMarker]AccElem { var res [sizeLastElemMarker]AccElem - sec := time.Now().Unix() - l.forwardTo(sec) - for _, elems := range l.Totals[:] { - for j := range elems { - res[j].merge(elems[j]) - } + for i, elem := range l[:] { + res[i] = elem.getAvgData() } return res } - -// forwardTo time t, clearing any entries in between. -func (l *LastMinuteLatencies) forwardTo(t int64) { - if l.LastSec >= t { - return - } - if t-l.LastSec >= 60 { - l.Totals = [60][sizeLastElemMarker]AccElem{} - return - } - for l.LastSec != t { - // Clear next element. - idx := (l.LastSec + 1) % 60 - l.Totals[idx] = [sizeLastElemMarker]AccElem{} - l.LastSec++ - } -} diff --git a/cmd/last-minute_gen.go b/cmd/last-minute_gen.go index 2cb1bdf2d..c1a5742d1 100644 --- a/cmd/last-minute_gen.go +++ b/cmd/last-minute_gen.go @@ -136,6 +136,282 @@ func (z AccElem) Msgsize() (s int) { // DecodeMsg implements msgp.Decodable func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0001 uint32 + zb0001, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != uint32(sizeLastElemMarker) { + err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0001} + return + } + for za0001 := range z { + var field []byte + _ = field + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, za0001) + return + } + for zb0002 > 0 { + zb0002-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, za0001) + return + } + switch msgp.UnsafeString(field) { + case "Totals": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, za0001, "Totals") + return + } + if zb0003 != uint32(60) { + err = msgp.ArrayError{Wanted: uint32(60), Got: zb0003} + return + } + for za0002 := range z[za0001].Totals { + var zb0004 uint32 + zb0004, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002) + return + } + for zb0004 > 0 { + zb0004-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002) + return + } + switch msgp.UnsafeString(field) { + case "Total": + z[za0001].Totals[za0002].Total, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002, "Total") + return + } + case "N": + z[za0001].Totals[za0002].N, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002, "N") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002) + return + } + } + } + } + case "LastSec": + z[za0001].LastSec, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, za0001, "LastSec") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, za0001) + return + } + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteArrayHeader(uint32(sizeLastElemMarker)) + if err != nil { + err = msgp.WrapError(err) + return + } + for za0001 := range z { + // map header, size 2 + // write "Totals" + err = en.Append(0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(60)) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals") + return + } + for za0002 := range z[za0001].Totals { + // map header, size 2 + // write "Total" + err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + if err != nil { + return + } + err = en.WriteInt64(z[za0001].Totals[za0002].Total) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002, "Total") + return + } + // write "N" + err = en.Append(0xa1, 0x4e) + if err != nil { + return + } + err = en.WriteInt64(z[za0001].Totals[za0002].N) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002, "N") + return + } + } + // write "LastSec" + err = en.Append(0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63) + if err != nil { + return + } + err = en.WriteInt64(z[za0001].LastSec) + if err != nil { + err = msgp.WrapError(err, za0001, "LastSec") + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker)) + for za0001 := range z { + // map header, size 2 + // string "Totals" + o = append(o, 0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73) + o = msgp.AppendArrayHeader(o, uint32(60)) + for za0002 := range z[za0001].Totals { + // map header, size 2 + // string "Total" + o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + o = msgp.AppendInt64(o, z[za0001].Totals[za0002].Total) + // string "N" + o = append(o, 0xa1, 0x4e) + o = msgp.AppendInt64(o, z[za0001].Totals[za0002].N) + } + // string "LastSec" + o = append(o, 0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63) + o = msgp.AppendInt64(o, z[za0001].LastSec) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0001 uint32 + zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if zb0001 != uint32(sizeLastElemMarker) { + err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0001} + return + } + for za0001 := range z { + var field []byte + _ = field + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, za0001) + return + } + for zb0002 > 0 { + zb0002-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, za0001) + return + } + switch msgp.UnsafeString(field) { + case "Totals": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals") + return + } + if zb0003 != uint32(60) { + err = msgp.ArrayError{Wanted: uint32(60), Got: zb0003} + return + } + for za0002 := range z[za0001].Totals { + var zb0004 uint32 + zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002) + return + } + for zb0004 > 0 { + zb0004-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002) + return + } + switch msgp.UnsafeString(field) { + case "Total": + z[za0001].Totals[za0002].Total, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002, "Total") + return + } + case "N": + z[za0001].Totals[za0002].N, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002, "N") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, za0001, "Totals", za0002) + return + } + } + } + } + case "LastSec": + z[za0001].LastSec, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, za0001, "LastSec") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, za0001) + return + } + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *LastMinuteLatencies) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + (sizeLastElemMarker * (16 + (60 * (9 + msgp.Int64Size + msgp.Int64Size)) + msgp.Int64Size)) + return +} + +// DecodeMsg implements msgp.Decodable +func (z *lastMinuteLatency) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte _ = field var zb0001 uint32 @@ -165,48 +441,36 @@ func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) { } for za0001 := range z.Totals { var zb0003 uint32 - zb0003, err = dc.ReadArrayHeader() + zb0003, err = dc.ReadMapHeader() if err != nil { err = msgp.WrapError(err, "Totals", za0001) return } - if zb0003 != uint32(sizeLastElemMarker) { - err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003} - return - } - for za0002 := range z.Totals[za0001] { - var zb0004 uint32 - zb0004, err = dc.ReadMapHeader() + for zb0003 > 0 { + zb0003-- + field, err = dc.ReadMapKeyPtr() if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002) + err = msgp.WrapError(err, "Totals", za0001) return } - for zb0004 > 0 { - zb0004-- - field, err = dc.ReadMapKeyPtr() + switch msgp.UnsafeString(field) { + case "Total": + z.Totals[za0001].Total, err = dc.ReadInt64() if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002) + err = msgp.WrapError(err, "Totals", za0001, "Total") return } - switch msgp.UnsafeString(field) { - case "Total": - z.Totals[za0001][za0002].Total, err = dc.ReadInt64() - if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002, "Total") - return - } - case "N": - z.Totals[za0001][za0002].N, err = dc.ReadInt64() - if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002, "N") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002) - return - } + case "N": + z.Totals[za0001].N, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, "N") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err, "Totals", za0001) + return } } } @@ -229,7 +493,7 @@ func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) { +func (z *lastMinuteLatency) EncodeMsg(en *msgp.Writer) (err error) { // map header, size 2 // write "Totals" err = en.Append(0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73) @@ -242,33 +506,26 @@ func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) { return } for za0001 := range z.Totals { - err = en.WriteArrayHeader(uint32(sizeLastElemMarker)) + // map header, size 2 + // write "Total" + err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) if err != nil { - err = msgp.WrapError(err, "Totals", za0001) return } - for za0002 := range z.Totals[za0001] { - // map header, size 2 - // write "Total" - err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) - if err != nil { - return - } - err = en.WriteInt64(z.Totals[za0001][za0002].Total) - if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002, "Total") - return - } - // write "N" - err = en.Append(0xa1, 0x4e) - if err != nil { - return - } - err = en.WriteInt64(z.Totals[za0001][za0002].N) - if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002, "N") - return - } + err = en.WriteInt64(z.Totals[za0001].Total) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, "Total") + return + } + // write "N" + err = en.Append(0xa1, 0x4e) + if err != nil { + return + } + err = en.WriteInt64(z.Totals[za0001].N) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, "N") + return } } // write "LastSec" @@ -285,23 +542,20 @@ func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) { } // MarshalMsg implements msgp.Marshaler -func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) { +func (z *lastMinuteLatency) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) // map header, size 2 // string "Totals" o = append(o, 0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73) o = msgp.AppendArrayHeader(o, uint32(60)) for za0001 := range z.Totals { - o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker)) - for za0002 := range z.Totals[za0001] { - // map header, size 2 - // string "Total" - o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) - o = msgp.AppendInt64(o, z.Totals[za0001][za0002].Total) - // string "N" - o = append(o, 0xa1, 0x4e) - o = msgp.AppendInt64(o, z.Totals[za0001][za0002].N) - } + // map header, size 2 + // string "Total" + o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c) + o = msgp.AppendInt64(o, z.Totals[za0001].Total) + // string "N" + o = append(o, 0xa1, 0x4e) + o = msgp.AppendInt64(o, z.Totals[za0001].N) } // string "LastSec" o = append(o, 0xa7, 0x4c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x63) @@ -310,7 +564,7 @@ func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) { } // UnmarshalMsg implements msgp.Unmarshaler -func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) { +func (z *lastMinuteLatency) UnmarshalMsg(bts []byte) (o []byte, err error) { var field []byte _ = field var zb0001 uint32 @@ -340,48 +594,36 @@ func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) { } for za0001 := range z.Totals { var zb0003 uint32 - zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) if err != nil { err = msgp.WrapError(err, "Totals", za0001) return } - if zb0003 != uint32(sizeLastElemMarker) { - err = msgp.ArrayError{Wanted: uint32(sizeLastElemMarker), Got: zb0003} - return - } - for za0002 := range z.Totals[za0001] { - var zb0004 uint32 - zb0004, bts, err = msgp.ReadMapHeaderBytes(bts) + for zb0003 > 0 { + zb0003-- + field, bts, err = msgp.ReadMapKeyZC(bts) if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002) + err = msgp.WrapError(err, "Totals", za0001) return } - for zb0004 > 0 { - zb0004-- - field, bts, err = msgp.ReadMapKeyZC(bts) + switch msgp.UnsafeString(field) { + case "Total": + z.Totals[za0001].Total, bts, err = msgp.ReadInt64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002) + err = msgp.WrapError(err, "Totals", za0001, "Total") return } - switch msgp.UnsafeString(field) { - case "Total": - z.Totals[za0001][za0002].Total, bts, err = msgp.ReadInt64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002, "Total") - return - } - case "N": - z.Totals[za0001][za0002].N, bts, err = msgp.ReadInt64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002, "N") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err, "Totals", za0001, za0002) - return - } + case "N": + z.Totals[za0001].N, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001, "N") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err, "Totals", za0001) + return } } } @@ -405,7 +647,7 @@ func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *LastMinuteLatencies) Msgsize() (s int) { - s = 1 + 7 + msgp.ArrayHeaderSize + (60 * (sizeLastElemMarker * (9 + msgp.Int64Size + msgp.Int64Size))) + 8 + msgp.Int64Size +func (z *lastMinuteLatency) Msgsize() (s int) { + s = 1 + 7 + msgp.ArrayHeaderSize + (60 * (9 + msgp.Int64Size + msgp.Int64Size)) + 8 + msgp.Int64Size return } diff --git a/cmd/last-minute_gen_test.go b/cmd/last-minute_gen_test.go index d54aed0e3..6c8de2d26 100644 --- a/cmd/last-minute_gen_test.go +++ b/cmd/last-minute_gen_test.go @@ -234,3 +234,116 @@ func BenchmarkDecodeLastMinuteLatencies(b *testing.B) { } } } + +func TestMarshalUnmarshallastMinuteLatency(t *testing.T) { + v := lastMinuteLatency{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsglastMinuteLatency(b *testing.B) { + v := lastMinuteLatency{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsglastMinuteLatency(b *testing.B) { + v := lastMinuteLatency{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshallastMinuteLatency(b *testing.B) { + v := lastMinuteLatency{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodelastMinuteLatency(t *testing.T) { + v := lastMinuteLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodelastMinuteLatency Msgsize() is inaccurate") + } + + vn := lastMinuteLatency{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodelastMinuteLatency(b *testing.B) { + v := lastMinuteLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodelastMinuteLatency(b *testing.B) { + v := lastMinuteLatency{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/metrics-v2.go b/cmd/metrics-v2.go index ff1bc2642..bf73dc176 100644 --- a/cmd/metrics-v2.go +++ b/cmd/metrics-v2.go @@ -70,6 +70,7 @@ func init() { nodeCollector = newMinioCollectorNode([]*MetricsGroup{ getNodeHealthMetrics(), + getLocalDiskStorageMetrics(), getCacheMetrics(), getHTTPMetrics(), getNetworkMetrics(), @@ -157,6 +158,8 @@ const ( writeBytes MetricName = "write_bytes" wcharBytes MetricName = "wchar_bytes" + apiLatencyMicroSec MetricName = "latency_us" + usagePercent MetricName = "update_percent" commitInfo MetricName = "commit_info" @@ -315,6 +318,16 @@ func getClusterCapacityUsageFreeBytesMD() MetricDescription { } } +func getNodeDiskAPILatencyMD() MetricDescription { + return MetricDescription{ + Namespace: nodeMetricNamespace, + Subsystem: diskSubsystem, + Name: apiLatencyMicroSec, + Help: "Average last minute latency in µs for disk API storage operations.", + Type: gaugeMetric, + } +} + func getNodeDiskUsedBytesMD() MetricDescription { return MetricDescription{ Namespace: nodeMetricNamespace, @@ -1583,6 +1596,35 @@ func getLocalStorageMetrics() *MetricsGroup { Value: float64(disk.FreeInodes), VariableLabels: map[string]string{"disk": disk.DrivePath}, }) + + } + return + }) + return mg +} + +func getLocalDiskStorageMetrics() *MetricsGroup { + mg := &MetricsGroup{ + cacheInterval: 3 * time.Second, + } + mg.RegisterRead(func(ctx context.Context) (metrics []Metric) { + objLayer := newObjectLayerFn() + // Service not initialized yet + if objLayer == nil || globalIsGateway { + return + } + + metrics = make([]Metric, 0, 50) + storageInfo, _ := objLayer.LocalStorageInfo(ctx) + for _, disk := range storageInfo.Disks { + for apiName, latency := range disk.Metrics.APILatencies { + val := latency.(uint64) + metrics = append(metrics, Metric{ + Description: getNodeDiskAPILatencyMD(), + Value: float64(val / 1000), + VariableLabels: map[string]string{"disk": disk.DrivePath, "api": "storage." + apiName}, + }) + } } return }) diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index e571a118f..99059dc65 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -47,7 +47,7 @@ type DiskInfo struct { // the number of calls of each API and the moving average of // the duration of each API. type DiskMetrics struct { - APILatencies map[string]string `json:"apiLatencies,omitempty"` + APILatencies map[string]uint64 `json:"apiLatencies,omitempty"` APICalls map[string]uint64 `json:"apiCalls,omitempty"` } diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 5d40784b3..26837980b 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -299,7 +299,7 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) { return } if z.APILatencies == nil { - z.APILatencies = make(map[string]string, zb0002) + z.APILatencies = make(map[string]uint64, zb0002) } else if len(z.APILatencies) > 0 { for key := range z.APILatencies { delete(z.APILatencies, key) @@ -308,13 +308,13 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) { for zb0002 > 0 { zb0002-- var za0001 string - var za0002 string + var za0002 uint64 za0001, err = dc.ReadString() if err != nil { err = msgp.WrapError(err, "APILatencies") return } - za0002, err = dc.ReadString() + za0002, err = dc.ReadUint64() if err != nil { err = msgp.WrapError(err, "APILatencies", za0001) return @@ -381,7 +381,7 @@ func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "APILatencies") return } - err = en.WriteString(za0002) + err = en.WriteUint64(za0002) if err != nil { err = msgp.WrapError(err, "APILatencies", za0001) return @@ -421,7 +421,7 @@ func (z *DiskMetrics) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.AppendMapHeader(o, uint32(len(z.APILatencies))) for za0001, za0002 := range z.APILatencies { o = msgp.AppendString(o, za0001) - o = msgp.AppendString(o, za0002) + o = msgp.AppendUint64(o, za0002) } // string "APICalls" o = append(o, 0xa8, 0x41, 0x50, 0x49, 0x43, 0x61, 0x6c, 0x6c, 0x73) @@ -459,7 +459,7 @@ func (z *DiskMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) { return } if z.APILatencies == nil { - z.APILatencies = make(map[string]string, zb0002) + z.APILatencies = make(map[string]uint64, zb0002) } else if len(z.APILatencies) > 0 { for key := range z.APILatencies { delete(z.APILatencies, key) @@ -467,14 +467,14 @@ func (z *DiskMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) { } for zb0002 > 0 { var za0001 string - var za0002 string + var za0002 uint64 zb0002-- za0001, bts, err = msgp.ReadStringBytes(bts) if err != nil { err = msgp.WrapError(err, "APILatencies") return } - za0002, bts, err = msgp.ReadStringBytes(bts) + za0002, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { err = msgp.WrapError(err, "APILatencies", za0001) return @@ -529,7 +529,7 @@ func (z *DiskMetrics) Msgsize() (s int) { if z.APILatencies != nil { for za0001, za0002 := range z.APILatencies { _ = za0002 - s += msgp.StringPrefixSize + len(za0001) + msgp.StringPrefixSize + len(za0002) + s += msgp.StringPrefixSize + len(za0001) + msgp.Uint64Size } } s += 9 + msgp.MapHeaderSize diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index af9477ae0..de4cf0e12 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -25,7 +25,6 @@ import ( "sync/atomic" "time" - "github.com/VividCortex/ewma" "github.com/minio/madmin-go" ) @@ -71,18 +70,18 @@ type xlStorageDiskIDCheck struct { // please use `fieldalignment ./...` to check // if your changes are not causing any problems. storage StorageAPI - apiLatencies [storageMetricLast]ewma.MovingAverage + apiLatencies [storageMetricLast]*lockedLastMinuteLatency diskID string apiCalls [storageMetricLast]uint64 } func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics { diskMetric := DiskMetrics{ - APILatencies: make(map[string]string), + APILatencies: make(map[string]uint64), APICalls: make(map[string]uint64), } for i, v := range p.apiLatencies { - diskMetric.APILatencies[storageMetric(i).String()] = time.Duration(v.Value()).String() + diskMetric.APILatencies[storageMetric(i).String()] = v.value() } for i := range p.apiCalls { diskMetric.APICalls[storageMetric(i).String()] = atomic.LoadUint64(&p.apiCalls[i]) @@ -90,28 +89,21 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics { return diskMetric } -type lockedSimpleEWMA struct { - sync.RWMutex - *ewma.SimpleEWMA +type lockedLastMinuteLatency struct { + sync.Mutex + lastMinuteLatency } -func (e *lockedSimpleEWMA) Add(value float64) { +func (e *lockedLastMinuteLatency) add(value time.Duration) { e.Lock() defer e.Unlock() - e.SimpleEWMA.Add(value) + e.lastMinuteLatency.add(value) } -func (e *lockedSimpleEWMA) Set(value float64) { +func (e *lockedLastMinuteLatency) value() uint64 { e.Lock() defer e.Unlock() - - e.SimpleEWMA.Set(value) -} - -func (e *lockedSimpleEWMA) Value() float64 { - e.RLock() - defer e.RUnlock() - return e.SimpleEWMA.Value() + return e.lastMinuteLatency.getAvgData().avg() } func newXLStorageDiskIDCheck(storage *xlStorage) *xlStorageDiskIDCheck { @@ -119,9 +111,7 @@ func newXLStorageDiskIDCheck(storage *xlStorage) *xlStorageDiskIDCheck { storage: storage, } for i := range xl.apiLatencies[:] { - xl.apiLatencies[i] = &lockedSimpleEWMA{ - SimpleEWMA: new(ewma.SimpleEWMA), - } + xl.apiLatencies[i] = &lockedLastMinuteLatency{} } return &xl } @@ -582,7 +572,7 @@ func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...st duration := time.Since(startTime) atomic.AddUint64(&p.apiCalls[s], 1) - p.apiLatencies[s].Add(float64(duration)) + p.apiLatencies[s].add(duration) if trace { globalTrace.Publish(storageTrace(s, startTime, duration, strings.Join(paths, " ")))