mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
metrics: add replication metrics on proxied requests (#18957)
This commit is contained in:
parent
794a7993cb
commit
27d02ea6f7
@ -379,3 +379,145 @@ func (s *SMA) simpleMovingAvg() float64 {
|
||||
const (
|
||||
defaultWindowSize = 10
|
||||
)
|
||||
|
||||
type proxyStatsCache struct {
|
||||
srProxyStats ProxyMetric
|
||||
bucketStats map[string]ProxyMetric
|
||||
sync.RWMutex // mutex for proxy stats
|
||||
}
|
||||
|
||||
func newProxyStatsCache() proxyStatsCache {
|
||||
return proxyStatsCache{
|
||||
bucketStats: make(map[string]ProxyMetric),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proxyStatsCache) inc(bucket string, api replProxyAPI, isErr bool) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
v, ok := p.bucketStats[bucket]
|
||||
if !ok {
|
||||
v = ProxyMetric{}
|
||||
}
|
||||
switch api {
|
||||
case putObjectTaggingAPI:
|
||||
if !isErr {
|
||||
atomic.AddUint64(&v.PutTagTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.PutTagTotal, 1)
|
||||
} else {
|
||||
atomic.AddUint64(&v.PutTagFailedTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.PutTagFailedTotal, 1)
|
||||
}
|
||||
case getObjectTaggingAPI:
|
||||
if !isErr {
|
||||
atomic.AddUint64(&v.GetTagTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.GetTagTotal, 1)
|
||||
} else {
|
||||
atomic.AddUint64(&v.GetTagFailedTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.GetTagFailedTotal, 1)
|
||||
}
|
||||
case removeObjectTaggingAPI:
|
||||
if !isErr {
|
||||
atomic.AddUint64(&v.RmvTagTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.RmvTagTotal, 1)
|
||||
} else {
|
||||
atomic.AddUint64(&v.RmvTagFailedTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.RmvTagFailedTotal, 1)
|
||||
}
|
||||
case headObjectAPI:
|
||||
if !isErr {
|
||||
atomic.AddUint64(&v.HeadTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.HeadTotal, 1)
|
||||
} else {
|
||||
atomic.AddUint64(&v.HeadFailedTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.HeadFailedTotal, 1)
|
||||
}
|
||||
case getObjectAPI:
|
||||
if !isErr {
|
||||
atomic.AddUint64(&v.GetTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.GetTotal, 1)
|
||||
} else {
|
||||
atomic.AddUint64(&v.GetFailedTotal, 1)
|
||||
atomic.AddUint64(&p.srProxyStats.GetFailedTotal, 1)
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
p.bucketStats[bucket] = v
|
||||
}
|
||||
|
||||
func (p *proxyStatsCache) getBucketStats(bucket string) ProxyMetric {
|
||||
p.RLock()
|
||||
defer p.RUnlock()
|
||||
v, ok := p.bucketStats[bucket]
|
||||
|
||||
if !ok {
|
||||
return ProxyMetric{}
|
||||
}
|
||||
return ProxyMetric{
|
||||
PutTagTotal: atomic.LoadUint64(&v.PutTagTotal),
|
||||
GetTagTotal: atomic.LoadUint64(&v.GetTagTotal),
|
||||
RmvTagTotal: atomic.LoadUint64(&v.RmvTagTotal),
|
||||
HeadTotal: atomic.LoadUint64(&v.HeadTotal),
|
||||
GetTotal: atomic.LoadUint64(&v.GetTotal),
|
||||
|
||||
PutTagFailedTotal: atomic.LoadUint64(&v.PutTagFailedTotal),
|
||||
GetTagFailedTotal: atomic.LoadUint64(&v.GetTagFailedTotal),
|
||||
RmvTagFailedTotal: atomic.LoadUint64(&v.RmvTagFailedTotal),
|
||||
HeadFailedTotal: atomic.LoadUint64(&v.HeadFailedTotal),
|
||||
GetFailedTotal: atomic.LoadUint64(&v.GetFailedTotal),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proxyStatsCache) getSiteStats() ProxyMetric {
|
||||
v := p.srProxyStats
|
||||
return ProxyMetric{
|
||||
PutTagTotal: atomic.LoadUint64(&v.PutTagTotal),
|
||||
GetTagTotal: atomic.LoadUint64(&v.GetTagTotal),
|
||||
RmvTagTotal: atomic.LoadUint64(&v.RmvTagTotal),
|
||||
HeadTotal: atomic.LoadUint64(&v.HeadTotal),
|
||||
GetTotal: atomic.LoadUint64(&v.GetTotal),
|
||||
PutTagFailedTotal: atomic.LoadUint64(&v.PutTagFailedTotal),
|
||||
GetTagFailedTotal: atomic.LoadUint64(&v.GetTagFailedTotal),
|
||||
RmvTagFailedTotal: atomic.LoadUint64(&v.RmvTagFailedTotal),
|
||||
HeadFailedTotal: atomic.LoadUint64(&v.HeadFailedTotal),
|
||||
GetFailedTotal: atomic.LoadUint64(&v.GetFailedTotal),
|
||||
}
|
||||
}
|
||||
|
||||
type replProxyAPI string
|
||||
|
||||
const (
|
||||
putObjectTaggingAPI replProxyAPI = "PutObjectTagging"
|
||||
getObjectTaggingAPI replProxyAPI = "GetObjectTagging"
|
||||
removeObjectTaggingAPI replProxyAPI = "RemoveObjectTagging"
|
||||
headObjectAPI replProxyAPI = "HeadObject"
|
||||
getObjectAPI replProxyAPI = "GetObject"
|
||||
)
|
||||
|
||||
// ProxyMetric holds stats for replication proxying
|
||||
type ProxyMetric struct {
|
||||
PutTagTotal uint64 `json:"putTaggingProxyTotal" msg:"ptc"`
|
||||
GetTagTotal uint64 `json:"getTaggingProxyTotal" msg:"gtc"`
|
||||
RmvTagTotal uint64 `json:"removeTaggingProxyTotal" msg:"rtc"`
|
||||
GetTotal uint64 `json:"getProxyTotal" msg:"gc"`
|
||||
HeadTotal uint64 `json:"headProxyTotal" msg:"hc"`
|
||||
PutTagFailedTotal uint64 `json:"putTaggingProxyFailed" msg:"ptf"`
|
||||
GetTagFailedTotal uint64 `json:"getTaggingProxyFailed" msg:"gtf"`
|
||||
RmvTagFailedTotal uint64 `json:"removeTaggingProxyFailed" msg:"rtf"`
|
||||
GetFailedTotal uint64 `json:"getProxyFailed" msg:"gf"`
|
||||
HeadFailedTotal uint64 `json:"headProxyFailed" msg:"hf"`
|
||||
}
|
||||
|
||||
func (p *ProxyMetric) add(p2 ProxyMetric) {
|
||||
atomic.AddUint64(&p.GetTotal, p2.GetTotal)
|
||||
atomic.AddUint64(&p.HeadTotal, p2.HeadTotal)
|
||||
atomic.AddUint64(&p.GetTagTotal, p2.GetTagTotal)
|
||||
atomic.AddUint64(&p.PutTagTotal, p2.PutTagTotal)
|
||||
atomic.AddUint64(&p.RmvTagTotal, p2.RmvTagTotal)
|
||||
atomic.AddUint64(&p.GetFailedTotal, p2.GetFailedTotal)
|
||||
atomic.AddUint64(&p.HeadFailedTotal, p2.HeadFailedTotal)
|
||||
atomic.AddUint64(&p.GetTagFailedTotal, p2.GetTagFailedTotal)
|
||||
atomic.AddUint64(&p.PutTagFailedTotal, p2.PutTagFailedTotal)
|
||||
atomic.AddUint64(&p.RmvTagFailedTotal, p2.RmvTagFailedTotal)
|
||||
}
|
||||
|
@ -635,6 +635,334 @@ func (z InQueueStats) Msgsize() (s int) {
|
||||
return
|
||||
}
|
||||
|
||||
// DecodeMsg implements msgp.Decodable
|
||||
func (z *ProxyMetric) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
var field []byte
|
||||
_ = field
|
||||
var zb0001 uint32
|
||||
zb0001, err = dc.ReadMapHeader()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
for zb0001 > 0 {
|
||||
zb0001--
|
||||
field, err = dc.ReadMapKeyPtr()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
switch msgp.UnsafeString(field) {
|
||||
case "ptc":
|
||||
z.PutTagTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PutTagTotal")
|
||||
return
|
||||
}
|
||||
case "gtc":
|
||||
z.GetTagTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTagTotal")
|
||||
return
|
||||
}
|
||||
case "rtc":
|
||||
z.RmvTagTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "RmvTagTotal")
|
||||
return
|
||||
}
|
||||
case "gc":
|
||||
z.GetTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTotal")
|
||||
return
|
||||
}
|
||||
case "hc":
|
||||
z.HeadTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "HeadTotal")
|
||||
return
|
||||
}
|
||||
case "ptf":
|
||||
z.PutTagFailedTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PutTagFailedTotal")
|
||||
return
|
||||
}
|
||||
case "gtf":
|
||||
z.GetTagFailedTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTagFailedTotal")
|
||||
return
|
||||
}
|
||||
case "rtf":
|
||||
z.RmvTagFailedTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "RmvTagFailedTotal")
|
||||
return
|
||||
}
|
||||
case "gf":
|
||||
z.GetFailedTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetFailedTotal")
|
||||
return
|
||||
}
|
||||
case "hf":
|
||||
z.HeadFailedTotal, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "HeadFailedTotal")
|
||||
return
|
||||
}
|
||||
default:
|
||||
err = dc.Skip()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *ProxyMetric) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// map header, size 10
|
||||
// write "ptc"
|
||||
err = en.Append(0x8a, 0xa3, 0x70, 0x74, 0x63)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.PutTagTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PutTagTotal")
|
||||
return
|
||||
}
|
||||
// write "gtc"
|
||||
err = en.Append(0xa3, 0x67, 0x74, 0x63)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.GetTagTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTagTotal")
|
||||
return
|
||||
}
|
||||
// write "rtc"
|
||||
err = en.Append(0xa3, 0x72, 0x74, 0x63)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.RmvTagTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "RmvTagTotal")
|
||||
return
|
||||
}
|
||||
// write "gc"
|
||||
err = en.Append(0xa2, 0x67, 0x63)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.GetTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTotal")
|
||||
return
|
||||
}
|
||||
// write "hc"
|
||||
err = en.Append(0xa2, 0x68, 0x63)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.HeadTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "HeadTotal")
|
||||
return
|
||||
}
|
||||
// write "ptf"
|
||||
err = en.Append(0xa3, 0x70, 0x74, 0x66)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.PutTagFailedTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PutTagFailedTotal")
|
||||
return
|
||||
}
|
||||
// write "gtf"
|
||||
err = en.Append(0xa3, 0x67, 0x74, 0x66)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.GetTagFailedTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTagFailedTotal")
|
||||
return
|
||||
}
|
||||
// write "rtf"
|
||||
err = en.Append(0xa3, 0x72, 0x74, 0x66)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.RmvTagFailedTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "RmvTagFailedTotal")
|
||||
return
|
||||
}
|
||||
// write "gf"
|
||||
err = en.Append(0xa2, 0x67, 0x66)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.GetFailedTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetFailedTotal")
|
||||
return
|
||||
}
|
||||
// write "hf"
|
||||
err = en.Append(0xa2, 0x68, 0x66)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.HeadFailedTotal)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "HeadFailedTotal")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *ProxyMetric) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// map header, size 10
|
||||
// string "ptc"
|
||||
o = append(o, 0x8a, 0xa3, 0x70, 0x74, 0x63)
|
||||
o = msgp.AppendUint64(o, z.PutTagTotal)
|
||||
// string "gtc"
|
||||
o = append(o, 0xa3, 0x67, 0x74, 0x63)
|
||||
o = msgp.AppendUint64(o, z.GetTagTotal)
|
||||
// string "rtc"
|
||||
o = append(o, 0xa3, 0x72, 0x74, 0x63)
|
||||
o = msgp.AppendUint64(o, z.RmvTagTotal)
|
||||
// string "gc"
|
||||
o = append(o, 0xa2, 0x67, 0x63)
|
||||
o = msgp.AppendUint64(o, z.GetTotal)
|
||||
// string "hc"
|
||||
o = append(o, 0xa2, 0x68, 0x63)
|
||||
o = msgp.AppendUint64(o, z.HeadTotal)
|
||||
// string "ptf"
|
||||
o = append(o, 0xa3, 0x70, 0x74, 0x66)
|
||||
o = msgp.AppendUint64(o, z.PutTagFailedTotal)
|
||||
// string "gtf"
|
||||
o = append(o, 0xa3, 0x67, 0x74, 0x66)
|
||||
o = msgp.AppendUint64(o, z.GetTagFailedTotal)
|
||||
// string "rtf"
|
||||
o = append(o, 0xa3, 0x72, 0x74, 0x66)
|
||||
o = msgp.AppendUint64(o, z.RmvTagFailedTotal)
|
||||
// string "gf"
|
||||
o = append(o, 0xa2, 0x67, 0x66)
|
||||
o = msgp.AppendUint64(o, z.GetFailedTotal)
|
||||
// string "hf"
|
||||
o = append(o, 0xa2, 0x68, 0x66)
|
||||
o = msgp.AppendUint64(o, z.HeadFailedTotal)
|
||||
return
|
||||
}
|
||||
|
||||
// UnmarshalMsg implements msgp.Unmarshaler
|
||||
func (z *ProxyMetric) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
var field []byte
|
||||
_ = field
|
||||
var zb0001 uint32
|
||||
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
for zb0001 > 0 {
|
||||
zb0001--
|
||||
field, bts, err = msgp.ReadMapKeyZC(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
switch msgp.UnsafeString(field) {
|
||||
case "ptc":
|
||||
z.PutTagTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PutTagTotal")
|
||||
return
|
||||
}
|
||||
case "gtc":
|
||||
z.GetTagTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTagTotal")
|
||||
return
|
||||
}
|
||||
case "rtc":
|
||||
z.RmvTagTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "RmvTagTotal")
|
||||
return
|
||||
}
|
||||
case "gc":
|
||||
z.GetTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTotal")
|
||||
return
|
||||
}
|
||||
case "hc":
|
||||
z.HeadTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "HeadTotal")
|
||||
return
|
||||
}
|
||||
case "ptf":
|
||||
z.PutTagFailedTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PutTagFailedTotal")
|
||||
return
|
||||
}
|
||||
case "gtf":
|
||||
z.GetTagFailedTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetTagFailedTotal")
|
||||
return
|
||||
}
|
||||
case "rtf":
|
||||
z.RmvTagFailedTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "RmvTagFailedTotal")
|
||||
return
|
||||
}
|
||||
case "gf":
|
||||
z.GetFailedTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "GetFailedTotal")
|
||||
return
|
||||
}
|
||||
case "hf":
|
||||
z.HeadFailedTotal, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "HeadFailedTotal")
|
||||
return
|
||||
}
|
||||
default:
|
||||
bts, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
o = bts
|
||||
return
|
||||
}
|
||||
|
||||
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
|
||||
func (z *ProxyMetric) Msgsize() (s int) {
|
||||
s = 1 + 4 + msgp.Uint64Size + 4 + msgp.Uint64Size + 4 + msgp.Uint64Size + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size + 4 + msgp.Uint64Size + 4 + msgp.Uint64Size + 4 + msgp.Uint64Size + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size
|
||||
return
|
||||
}
|
||||
|
||||
// DecodeMsg implements msgp.Decodable
|
||||
func (z *QStat) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
var field []byte
|
||||
|
@ -348,6 +348,119 @@ func BenchmarkDecodeInQueueStats(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalProxyMetric(t *testing.T) {
|
||||
v := ProxyMetric{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
left, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
|
||||
}
|
||||
|
||||
left, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMarshalMsgProxyMetric(b *testing.B) {
|
||||
v := ProxyMetric{}
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.MarshalMsg(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendMsgProxyMetric(b *testing.B) {
|
||||
v := ProxyMetric{}
|
||||
bts := make([]byte, 0, v.Msgsize())
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalProxyMetric(b *testing.B) {
|
||||
v := ProxyMetric{}
|
||||
bts, _ := v.MarshalMsg(nil)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeDecodeProxyMetric(t *testing.T) {
|
||||
v := ProxyMetric{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
|
||||
m := v.Msgsize()
|
||||
if buf.Len() > m {
|
||||
t.Log("WARNING: TestEncodeDecodeProxyMetric Msgsize() is inaccurate")
|
||||
}
|
||||
|
||||
vn := ProxyMetric{}
|
||||
err := msgp.Decode(&buf, &vn)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
msgp.Encode(&buf, &v)
|
||||
err = msgp.NewReader(&buf).Skip()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncodeProxyMetric(b *testing.B) {
|
||||
v := ProxyMetric{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
en := msgp.NewWriter(msgp.Nowhere)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.EncodeMsg(en)
|
||||
}
|
||||
en.Flush()
|
||||
}
|
||||
|
||||
func BenchmarkDecodeProxyMetric(b *testing.B) {
|
||||
v := ProxyMetric{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
rd := msgp.NewEndlessReader(buf.Bytes(), b)
|
||||
dc := msgp.NewReader(rd)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := v.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalQStat(t *testing.T) {
|
||||
v := QStat{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
|
@ -45,6 +45,8 @@ type ReplicationStats struct {
|
||||
workers *ActiveWorkerStat
|
||||
// queue stats cache
|
||||
qCache queueCache
|
||||
|
||||
pCache proxyStatsCache
|
||||
// mrf backlog stats
|
||||
mrfStats ReplicationMRFStats
|
||||
// for bucket replication, continue to use existing cache
|
||||
@ -305,6 +307,7 @@ func (r *ReplicationStats) getSRMetricsForNode() SRMetricsSummary {
|
||||
Queued: r.qCache.getSiteStats(),
|
||||
ActiveWorkers: r.ActiveWorkers(),
|
||||
Metrics: r.srStats.get(),
|
||||
Proxied: r.pCache.getSiteStats(),
|
||||
ReplicaSize: atomic.LoadInt64(&r.srStats.ReplicaSize),
|
||||
ReplicaCount: atomic.LoadInt64(&r.srStats.ReplicaCount),
|
||||
}
|
||||
@ -333,6 +336,7 @@ func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *Replicatio
|
||||
rs := ReplicationStats{
|
||||
Cache: make(map[string]*BucketReplicationStats),
|
||||
qCache: newQueueCache(r),
|
||||
pCache: newProxyStatsCache(),
|
||||
srStats: newSRStats(),
|
||||
movingAvgTicker: time.NewTicker(2 * time.Second),
|
||||
wTimer: time.NewTicker(2 * time.Second),
|
||||
@ -371,6 +375,7 @@ func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucket
|
||||
Stats: make(map[string]*BucketReplicationStat),
|
||||
},
|
||||
QueueStats: ReplicationQueueStats{},
|
||||
ProxyStats: ProxyMetric{},
|
||||
}
|
||||
return bs
|
||||
}
|
||||
@ -430,11 +435,16 @@ func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucket
|
||||
for _, bs := range bucketStats {
|
||||
qs.Nodes = append(qs.Nodes, bs.QueueStats.Nodes...)
|
||||
}
|
||||
|
||||
qs.Uptime = UTCNow().Unix() - globalBootTime.Unix()
|
||||
|
||||
var ps ProxyMetric
|
||||
for _, bs := range bucketStats {
|
||||
ps.add(bs.ProxyStats)
|
||||
}
|
||||
bs = BucketStats{
|
||||
ReplicationStats: s,
|
||||
QueueStats: qs,
|
||||
ProxyStats: ps,
|
||||
}
|
||||
r.mostRecentStatsMu.Lock()
|
||||
if len(r.mostRecentStats.Stats) == 0 {
|
||||
@ -482,3 +492,12 @@ func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opTyp
|
||||
atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, -1*sz)
|
||||
atomic.AddInt64(&r.qCache.srQueueStats.nowCount, -1)
|
||||
}
|
||||
|
||||
// incProxy increments proxy metrics for proxied calls
|
||||
func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) {
|
||||
r.pCache.inc(bucket, api, isErr)
|
||||
}
|
||||
|
||||
func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric {
|
||||
return r.pCache.getBucketStats(bucket)
|
||||
}
|
||||
|
@ -2235,6 +2235,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa
|
||||
if opts.ProxyRequest || (opts.ProxyHeaderSet && !opts.ProxyRequest) { // true only when site B sets MinIOSourceProxyRequest header
|
||||
return nil, oi, proxy
|
||||
}
|
||||
var perr error
|
||||
for _, t := range proxyTargets.Targets {
|
||||
tgt = globalBucketTargetSys.GetRemoteTargetClient(bucket, t.Arn)
|
||||
if tgt == nil || globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
|
||||
@ -2264,6 +2265,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa
|
||||
|
||||
objInfo, err := tgt.StatObject(ctx, t.TargetBucket, object, gopts)
|
||||
if err != nil {
|
||||
perr = err
|
||||
if isErrInvalidRange(ErrorRespToObjectError(err, bucket, object)) {
|
||||
return nil, oi, proxyResult{Err: err}
|
||||
}
|
||||
@ -2300,6 +2302,7 @@ func proxyHeadToRepTarget(ctx context.Context, bucket, object string, rs *HTTPRa
|
||||
}
|
||||
return tgt, oi, proxyResult{Proxy: true}
|
||||
}
|
||||
proxy.Err = perr
|
||||
return nil, oi, proxy
|
||||
}
|
||||
|
||||
|
@ -154,6 +154,7 @@ type BucketStats struct {
|
||||
Uptime int64 `json:"uptime"`
|
||||
ReplicationStats BucketReplicationStats `json:"currStats"` // current replication stats since cluster startup
|
||||
QueueStats ReplicationQueueStats `json:"queueStats"` // replication queue stats
|
||||
ProxyStats ProxyMetric `json:"proxyStats"`
|
||||
}
|
||||
|
||||
// BucketReplicationStats represents inline replication statistics
|
||||
|
@ -1142,6 +1142,12 @@ func (z *BucketStats) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
case "ProxyStats":
|
||||
err = z.ProxyStats.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ProxyStats")
|
||||
return
|
||||
}
|
||||
default:
|
||||
err = dc.Skip()
|
||||
if err != nil {
|
||||
@ -1155,9 +1161,9 @@ func (z *BucketStats) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *BucketStats) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// map header, size 3
|
||||
// map header, size 4
|
||||
// write "Uptime"
|
||||
err = en.Append(0x83, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65)
|
||||
err = en.Append(0x84, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -1209,15 +1215,25 @@ func (z *BucketStats) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
err = msgp.WrapError(err, "QueueStats", "Uptime")
|
||||
return
|
||||
}
|
||||
// write "ProxyStats"
|
||||
err = en.Append(0xaa, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = z.ProxyStats.EncodeMsg(en)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ProxyStats")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *BucketStats) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// map header, size 3
|
||||
// map header, size 4
|
||||
// string "Uptime"
|
||||
o = append(o, 0x83, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65)
|
||||
o = append(o, 0x84, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65)
|
||||
o = msgp.AppendInt64(o, z.Uptime)
|
||||
// string "ReplicationStats"
|
||||
o = append(o, 0xb0, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x73)
|
||||
@ -1242,6 +1258,13 @@ func (z *BucketStats) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
// string "Uptime"
|
||||
o = append(o, 0xa6, 0x55, 0x70, 0x74, 0x69, 0x6d, 0x65)
|
||||
o = msgp.AppendInt64(o, z.QueueStats.Uptime)
|
||||
// string "ProxyStats"
|
||||
o = append(o, 0xaa, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x53, 0x74, 0x61, 0x74, 0x73)
|
||||
o, err = z.ProxyStats.MarshalMsg(o)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ProxyStats")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -1323,6 +1346,12 @@ func (z *BucketStats) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
case "ProxyStats":
|
||||
bts, err = z.ProxyStats.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ProxyStats")
|
||||
return
|
||||
}
|
||||
default:
|
||||
bts, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
@ -1341,7 +1370,7 @@ func (z *BucketStats) Msgsize() (s int) {
|
||||
for za0001 := range z.QueueStats.Nodes {
|
||||
s += z.QueueStats.Nodes[za0001].Msgsize()
|
||||
}
|
||||
s += 7 + msgp.Int64Size
|
||||
s += 7 + msgp.Int64Size + 11 + z.ProxyStats.Msgsize()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -229,6 +229,16 @@ const (
|
||||
avgInQueueBytes MetricName = "average_queued_bytes"
|
||||
maxInQueueCount MetricName = "max_queued_count"
|
||||
maxInQueueBytes MetricName = "max_queued_bytes"
|
||||
proxiedGetRequestsTotal MetricName = "proxied_get_requests_total"
|
||||
proxiedHeadRequestsTotal MetricName = "proxied_head_requests_total"
|
||||
proxiedPutTaggingRequestsTotal MetricName = "proxied_put_tagging_requests_total"
|
||||
proxiedGetTaggingRequestsTotal MetricName = "proxied_get_tagging_requests_total"
|
||||
proxiedDeleteTaggingRequestsTotal MetricName = "proxied_delete_tagging_requests_total"
|
||||
proxiedGetRequestsFailures MetricName = "proxied_get_requests_failures"
|
||||
proxiedHeadRequestsFailures MetricName = "proxied_head_requests_failures"
|
||||
proxiedPutTaggingRequestFailures MetricName = "proxied_put_tagging_requests_failures"
|
||||
proxiedGetTaggingRequestFailures MetricName = "proxied_get_tagging_requests_failures"
|
||||
proxiedDeleteTaggingRequestFailures MetricName = "proxied_delete_tagging_requests_failures"
|
||||
|
||||
freeBytes MetricName = "free_bytes"
|
||||
readBytes MetricName = "read_bytes"
|
||||
@ -1124,6 +1134,106 @@ func getClusterReplMaxTransferRateMD() MetricDescription {
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedGetOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedGetRequestsTotal,
|
||||
Help: "Number of GET requests proxied to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedHeadOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedHeadRequestsTotal,
|
||||
Help: "Number of HEAD requests proxied to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedPutTaggingOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedPutTaggingRequestsTotal,
|
||||
Help: "Number of PUT tagging requests proxied to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedGetTaggingOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedGetTaggingRequestsTotal,
|
||||
Help: "Number of GET tagging requests proxied to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedRmvTaggingOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedDeleteTaggingRequestsTotal,
|
||||
Help: "Number of DELETE tagging requests proxied to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedGetFailedOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedGetRequestsFailures,
|
||||
Help: "Number of failures in GET requests proxied to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedHeadFailedOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedHeadRequestsFailures,
|
||||
Help: "Number of failures in HEAD requests proxied to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedPutTaggingFailedOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedPutTaggingRequestFailures,
|
||||
Help: "Number of failures in PUT tagging proxy requests to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedGetTaggingFailedOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedGetTaggingRequestFailures,
|
||||
Help: "Number of failures in GET tagging proxy requests to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getClusterReplProxiedRmvTaggingFailedOperationsMD(ns MetricNamespace) MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: ns,
|
||||
Subsystem: replicationSubsystem,
|
||||
Name: proxiedDeleteTaggingRequestFailures,
|
||||
Help: "Number of failures in DELETE tagging proxy requests to replication target",
|
||||
Type: counterMetric,
|
||||
}
|
||||
}
|
||||
|
||||
func getBucketObjectDistributionMD() MetricDescription {
|
||||
return MetricDescription{
|
||||
Namespace: bucketMetricNamespace,
|
||||
@ -2381,6 +2491,46 @@ func getReplicationSiteMetrics(opts MetricsGroupOpts) *MetricsGroup {
|
||||
})
|
||||
}
|
||||
}
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedGetOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.GetTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedHeadOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.HeadTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedPutTaggingOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.PutTagTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedGetTaggingOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.GetTagTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedRmvTaggingOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.RmvTagTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedGetFailedOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.GetFailedTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedHeadFailedOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.HeadFailedTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedPutTaggingFailedOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.PutTagFailedTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedGetTaggingFailedOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.GetTagFailedTotal),
|
||||
})
|
||||
ml = append(ml, Metric{
|
||||
Description: getClusterReplProxiedRmvTaggingFailedOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(m.Proxied.RmvTagFailedTotal),
|
||||
})
|
||||
}
|
||||
|
||||
return ml
|
||||
@ -3098,6 +3248,51 @@ func getBucketUsageMetrics(opts MetricsGroupOpts) *MetricsGroup {
|
||||
Value: float64(stats.ReplicaCount),
|
||||
VariableLabels: map[string]string{"bucket": bucket},
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedGetOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.GetTotal),
|
||||
VariableLabels: map[string]string{"bucket": bucket},
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedHeadOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.HeadTotal),
|
||||
VariableLabels: map[string]string{"bucket": bucket},
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedPutTaggingOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.PutTagTotal),
|
||||
VariableLabels: map[string]string{"bucket": bucket},
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedGetTaggingOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.GetTagTotal),
|
||||
VariableLabels: map[string]string{"bucket": bucket},
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedRmvTaggingOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.RmvTagTotal),
|
||||
VariableLabels: map[string]string{"bucket": bucket},
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedGetFailedOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.GetFailedTotal),
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedHeadFailedOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.HeadFailedTotal),
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedPutTaggingFailedOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.PutTagFailedTotal),
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedGetTaggingFailedOperationsMD(bucketMetricNamespace),
|
||||
Value: float64(s.ProxyStats.GetTagFailedTotal),
|
||||
})
|
||||
metrics = append(metrics, Metric{
|
||||
Description: getClusterReplProxiedRmvTaggingFailedOperationsMD(clusterMetricNamespace),
|
||||
Value: float64(s.ProxyStats.RmvTagFailedTotal),
|
||||
})
|
||||
}
|
||||
if stats.hasReplicationUsage() {
|
||||
for arn, stat := range stats.Stats {
|
||||
|
@ -572,6 +572,7 @@ func (sys *NotificationSys) GetClusterAllBucketStats(ctx context.Context) []Buck
|
||||
for k, replicationStats := range replicationStatsList {
|
||||
bucketStatsMap.Stats[k] = BucketStats{
|
||||
ReplicationStats: replicationStats,
|
||||
ProxyStats: globalReplicationStats.getProxyStats(k),
|
||||
}
|
||||
}
|
||||
|
||||
@ -607,6 +608,7 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam
|
||||
bucketStats = append(bucketStats, BucketStats{
|
||||
ReplicationStats: globalReplicationStats.Get(bucketName),
|
||||
QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}},
|
||||
ProxyStats: globalReplicationStats.getProxyStats(bucketName),
|
||||
})
|
||||
return bucketStats
|
||||
}
|
||||
|
@ -490,9 +490,11 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
|
||||
)
|
||||
proxytgts := getProxyTargets(ctx, bucket, object, opts)
|
||||
if !proxytgts.Empty() {
|
||||
globalReplicationStats.incProxy(bucket, getObjectAPI, false)
|
||||
// proxy to replication target if active-active replication is in place.
|
||||
reader, proxy, perr = proxyGetToReplicationTarget(ctx, bucket, object, rs, r.Header, opts, proxytgts)
|
||||
if perr != nil {
|
||||
globalReplicationStats.incProxy(bucket, getObjectAPI, true)
|
||||
proxyGetErr := ErrorRespToObjectError(perr, bucket, object)
|
||||
if !isErrBucketNotFound(proxyGetErr) && !isErrObjectNotFound(proxyGetErr) && !isErrVersionNotFound(proxyGetErr) &&
|
||||
!isErrPreconditionFailed(proxyGetErr) && !isErrInvalidRange(proxyGetErr) {
|
||||
@ -1018,12 +1020,14 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
|
||||
// proxy HEAD to replication target if active-active replication configured on bucket
|
||||
proxytgts := getProxyTargets(ctx, bucket, object, opts)
|
||||
if !proxytgts.Empty() {
|
||||
globalReplicationStats.incProxy(bucket, headObjectAPI, false)
|
||||
var oi ObjectInfo
|
||||
oi, proxy = proxyHeadToReplicationTarget(ctx, bucket, object, rs, opts, proxytgts)
|
||||
if proxy.Proxy {
|
||||
objInfo = oi
|
||||
}
|
||||
if proxy.Err != nil {
|
||||
globalReplicationStats.incProxy(bucket, headObjectAPI, true)
|
||||
writeErrorResponseHeadersOnly(w, toAPIError(ctx, proxy.Err))
|
||||
return
|
||||
}
|
||||
@ -3310,9 +3314,11 @@ func (api objectAPIHandlers) GetObjectTaggingHandler(w http.ResponseWriter, r *h
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
proxytgts := getProxyTargets(ctx, bucket, object, opts)
|
||||
if !proxytgts.Empty() {
|
||||
globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, false)
|
||||
// proxy to replication target if site replication is in place.
|
||||
tags, gerr := proxyGetTaggingToRepTarget(ctx, bucket, object, opts, proxytgts)
|
||||
if gerr.Err != nil {
|
||||
globalReplicationStats.incProxy(bucket, getObjectTaggingAPI, true)
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, gerr.Err), r.URL)
|
||||
return
|
||||
} // overlay tags from peer site.
|
||||
@ -3411,9 +3417,11 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
proxytgts := getProxyTargets(ctx, bucket, object, opts)
|
||||
if !proxytgts.Empty() {
|
||||
globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, false)
|
||||
// proxy to replication target if site replication is in place.
|
||||
perr := proxyTaggingToRepTarget(ctx, bucket, object, tags, opts, proxytgts)
|
||||
if perr.Err != nil {
|
||||
globalReplicationStats.incProxy(bucket, putObjectTaggingAPI, true)
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL)
|
||||
return
|
||||
}
|
||||
@ -3506,9 +3514,11 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
proxytgts := getProxyTargets(ctx, bucket, object, opts)
|
||||
if !proxytgts.Empty() {
|
||||
globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, false)
|
||||
// proxy to replication target if active-active replication is in place.
|
||||
perr := proxyTaggingToRepTarget(ctx, bucket, object, nil, opts, proxytgts)
|
||||
if perr.Err != nil {
|
||||
globalReplicationStats.incProxy(bucket, removeObjectTaggingAPI, true)
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, perr.Err), r.URL)
|
||||
return
|
||||
}
|
||||
|
@ -557,6 +557,7 @@ func (s *peerRESTServer) GetAllBucketStatsHandler(w http.ResponseWriter, r *http
|
||||
for k, v := range replicationStats {
|
||||
bucketStatsMap[k] = BucketStats{
|
||||
ReplicationStats: v,
|
||||
ProxyStats: globalReplicationStats.getProxyStats(k),
|
||||
}
|
||||
}
|
||||
logger.LogIf(r.Context(), msgp.Encode(w, &BucketStatsMap{Stats: bucketStatsMap, Timestamp: UTCNow()}))
|
||||
@ -580,6 +581,7 @@ func (s *peerRESTServer) GetBucketStatsHandler(w http.ResponseWriter, r *http.Re
|
||||
bs := BucketStats{
|
||||
ReplicationStats: globalReplicationStats.Get(bucketName),
|
||||
QueueStats: ReplicationQueueStats{Nodes: []ReplQNodeStats{globalReplicationStats.getNodeQueueStats(bucketName)}},
|
||||
ProxyStats: globalReplicationStats.getProxyStats(bucketName),
|
||||
}
|
||||
logger.LogIf(r.Context(), msgp.Encode(w, &bs))
|
||||
}
|
||||
|
@ -282,6 +282,8 @@ type SRMetricsSummary struct {
|
||||
ReplicaCount int64 `json:"replicaCount"`
|
||||
// Queued operations
|
||||
Queued InQueueMetric `json:"queued"`
|
||||
// Proxy stats
|
||||
Proxied ProxyMetric `json:"proxied"`
|
||||
// replication metrics summary for each site replication peer
|
||||
Metrics map[string]SRMetric `json:"replMetrics"`
|
||||
// uptime of node being queried for site replication metrics
|
||||
|
@ -823,6 +823,12 @@ func (z *SRMetricsSummary) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
err = msgp.WrapError(err, "Queued")
|
||||
return
|
||||
}
|
||||
case "Proxied":
|
||||
err = z.Proxied.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "Proxied")
|
||||
return
|
||||
}
|
||||
case "Metrics":
|
||||
var zb0002 uint32
|
||||
zb0002, err = dc.ReadMapHeader()
|
||||
@ -872,9 +878,9 @@ func (z *SRMetricsSummary) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *SRMetricsSummary) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// map header, size 6
|
||||
// map header, size 7
|
||||
// write "ActiveWorkers"
|
||||
err = en.Append(0x86, 0xad, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73)
|
||||
err = en.Append(0x87, 0xad, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -913,6 +919,16 @@ func (z *SRMetricsSummary) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
err = msgp.WrapError(err, "Queued")
|
||||
return
|
||||
}
|
||||
// write "Proxied"
|
||||
err = en.Append(0xa7, 0x50, 0x72, 0x6f, 0x78, 0x69, 0x65, 0x64)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = z.Proxied.EncodeMsg(en)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "Proxied")
|
||||
return
|
||||
}
|
||||
// write "Metrics"
|
||||
err = en.Append(0xa7, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73)
|
||||
if err != nil {
|
||||
@ -951,9 +967,9 @@ func (z *SRMetricsSummary) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *SRMetricsSummary) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// map header, size 6
|
||||
// map header, size 7
|
||||
// string "ActiveWorkers"
|
||||
o = append(o, 0x86, 0xad, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73)
|
||||
o = append(o, 0x87, 0xad, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73)
|
||||
o, err = z.ActiveWorkers.MarshalMsg(o)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ActiveWorkers")
|
||||
@ -972,6 +988,13 @@ func (z *SRMetricsSummary) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err, "Queued")
|
||||
return
|
||||
}
|
||||
// string "Proxied"
|
||||
o = append(o, 0xa7, 0x50, 0x72, 0x6f, 0x78, 0x69, 0x65, 0x64)
|
||||
o, err = z.Proxied.MarshalMsg(o)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "Proxied")
|
||||
return
|
||||
}
|
||||
// string "Metrics"
|
||||
o = append(o, 0xa7, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73)
|
||||
o = msgp.AppendMapHeader(o, uint32(len(z.Metrics)))
|
||||
@ -1031,6 +1054,12 @@ func (z *SRMetricsSummary) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err, "Queued")
|
||||
return
|
||||
}
|
||||
case "Proxied":
|
||||
bts, err = z.Proxied.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "Proxied")
|
||||
return
|
||||
}
|
||||
case "Metrics":
|
||||
var zb0002 uint32
|
||||
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
|
||||
@ -1081,7 +1110,7 @@ func (z *SRMetricsSummary) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
|
||||
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
|
||||
func (z *SRMetricsSummary) Msgsize() (s int) {
|
||||
s = 1 + 14 + z.ActiveWorkers.Msgsize() + 12 + msgp.Int64Size + 13 + msgp.Int64Size + 7 + z.Queued.Msgsize() + 8 + msgp.MapHeaderSize
|
||||
s = 1 + 14 + z.ActiveWorkers.Msgsize() + 12 + msgp.Int64Size + 13 + msgp.Int64Size + 7 + z.Queued.Msgsize() + 8 + z.Proxied.Msgsize() + 8 + msgp.MapHeaderSize
|
||||
if z.Metrics != nil {
|
||||
for za0001, za0002 := range z.Metrics {
|
||||
_ = za0002
|
||||
|
@ -6016,6 +6016,7 @@ func (c *SiteReplicationSys) getSiteMetrics(ctx context.Context) (madmin.SRMetri
|
||||
}
|
||||
sm.ReplicaCount += peer.ReplicaCount
|
||||
sm.ReplicaSize += peer.ReplicaSize
|
||||
sm.Proxied.Add(madmin.ReplProxyMetric(peer.Proxied))
|
||||
for dID, v := range peer.Metrics {
|
||||
v2, ok := sm.Metrics[dID]
|
||||
if !ok {
|
||||
|
@ -109,6 +109,17 @@ For deployments with [bucket](https://min.io/docs/minio/linux/administration/buc
|
||||
| `minio_cluster_replication_sent_bytes` | (_Site Replication Only_) Total number of bytes replicated to the target cluster. |
|
||||
| `minio_cluster_replication_sent_count` | (_Site Replication Only_) Total number of objects replicated to the target cluster. |
|
||||
| `minio_cluster_replication_credential_errors` | (_Site Replication Only_) Total number of replication credential errors since server start |
|
||||
| `minio_cluster_replication_proxied_get_requests_total` | (_Site Replication Only_)Number of GET requests proxied to replication target |
|
||||
| `minio_cluster_replication_proxied_head_requests_total` | (_Site Replication Only_)Number of HEAD requests proxied to replication target |
|
||||
| `minio_cluster_replication_proxied_delete_tagging_requests_total` | (_Site Replication Only_)Number of DELETE tagging requests proxied to replication target |
|
||||
| `minio_cluster_replication_proxied_get_tagging_requests_total` | (_Site Replication Only_)Number of GET tagging requests proxied to replication target |
|
||||
| `minio_cluster_replication_proxied_put_tagging_requests_total` | (_Site Replication Only_)Number of PUT tagging requests proxied to replication target |
|
||||
| `minio_cluster_replication_proxied_get_requests_failures` | (_Site Replication Only_)Number of failures in GET requests proxied to replication target |
|
||||
| `minio_cluster_replication_proxied_head_requests_failures` | (_Site Replication Only_)Number of failures in HEAD requests proxied to replication target |
|
||||
| `minio_cluster_replication_proxied_delete_tagging_requests_failures` | (_Site Replication Only_)Number of failures proxying DELETE tagging requests to replication target |
|
||||
| `minio_cluster_replication_proxied_get_tagging_requests_failures` | (_Site Replication Only_)Number of failures proxying GET tagging requests to replication target |
|
||||
| `minio_cluster_replication_proxied_put_tagging_requests_failures` | (_Site Replication Only_)Number of failures proxying PUT tagging requests to replication target |
|
||||
|
||||
|
||||
## Healing Metrics
|
||||
|
||||
@ -290,6 +301,16 @@ For deployments with [Site Replication](https://min.io/docs/minio/linux/operatio
|
||||
| `minio_bucket_replication_sent_bytes` | Total number of bytes replicated to the target bucket. |
|
||||
| `minio_bucket_replication_sent_count` | Total number of objects replicated to the target bucket. |
|
||||
| `minio_bucket_replication_credential_errors` | Total number of replication credential errors since server start |
|
||||
| `minio_bucket_replication_proxied_get_requests_total` | Number of GET requests proxied to replication target |
|
||||
| `minio_bucket_replication_proxied_head_requests_total` | Number of HEAD requests proxied to replication target |
|
||||
| `minio_bucket_replication_proxied_delete_tagging_requests_total` | Number of DELETE tagging requests proxied to replication target |
|
||||
| `minio_bucket_replication_proxied_get_tagging_requests_total` | Number of GET tagging requests proxied to replication target |
|
||||
| `minio_bucket_replication_proxied_put_tagging_requests_total` | Number of PUT tagging requests proxied to replication target |
|
||||
| `minio_bucket_replication_proxied_get_requests_failures` | Number of failures in GET requests proxied to replication target |
|
||||
| `minio_bucket_replication_proxied_head_requests_failures` | Number of failures in HEAD requests proxied to replication target |
|
||||
| `minio_bucket_replication_proxied_delete_tagging_requests_failures` | Number of failures in DELETE tagging proxy requests to replication target |
|
||||
| `minio_bucket_replication_proxied_get_tagging_requests_failures` |Number of failures in GET tagging proxy requests to replication target |
|
||||
| `minio_bucket_replication_proxied_put_tagging_requests_failures` | Number of failures in PUT tagging proxy requests to replication target |
|
||||
|
||||
## Traffic Metrics
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -260,3 +260,5 @@ require (
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
|
||||
)
|
||||
|
||||
replace github.com/minio/madmin-go/v3 => github.com/poornas/madmin-go/v3 v3.0.0-20240205194748-c24ddca6b68a
|
||||
|
4
go.sum
4
go.sum
@ -443,8 +443,6 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
|
||||
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
|
||||
github.com/minio/kes-go v0.2.0 h1:HA33arq9s3MErbsj3PAXFVfFo4U4yw7lTKQ5kWFrpCA=
|
||||
github.com/minio/kes-go v0.2.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo=
|
||||
github.com/minio/madmin-go/v3 v3.0.43 h1:AkniczVEkBErQ94MikyPINGoOjFWhuP8xH5KmFpAaO0=
|
||||
github.com/minio/madmin-go/v3 v3.0.43/go.mod h1:ZDF7kf5fhmxLhbGTqyq5efs4ao0v4eWf7nOuef/ljJs=
|
||||
github.com/minio/mc v0.0.0-20240129194012-12f446e1de57 h1:FO4a9XVuLcIS5s11efycWkBNrfIz4HtDQgUhR+xmLsQ=
|
||||
github.com/minio/mc v0.0.0-20240129194012-12f446e1de57/go.mod h1:MmDLdb7NWd/OYhcKcXKvwErq2GNa/Zq6xtTWuhdC4II=
|
||||
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
|
||||
@ -548,6 +546,8 @@ github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE=
|
||||
github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/poornas/madmin-go/v3 v3.0.0-20240205194748-c24ddca6b68a h1:AR4wv/f8sXMwPlM2SBN3pNkuluftGrEhInr+/WbBaso=
|
||||
github.com/poornas/madmin-go/v3 v3.0.0-20240205194748-c24ddca6b68a/go.mod h1:ZDF7kf5fhmxLhbGTqyq5efs4ao0v4eWf7nOuef/ljJs=
|
||||
github.com/posener/complete v1.2.3 h1:NP0eAhjcjImqslEwo/1hq7gpajME0fTLTezBKDqfXqo=
|
||||
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
||||
|
Loading…
Reference in New Issue
Block a user