mirror of https://github.com/minio/minio.git
fix: cache usage deserialization from v5 to v6 (#13258)
This commit is contained in:
parent
565d95a377
commit
0b55a0423e
|
@ -57,6 +57,26 @@ type dataUsageEntry struct {
|
|||
Compacted bool
|
||||
}
|
||||
|
||||
//msgp:tuple replicationStatsV1
|
||||
type replicationStatsV1 struct {
|
||||
PendingSize uint64
|
||||
ReplicatedSize uint64
|
||||
FailedSize uint64
|
||||
ReplicaSize uint64
|
||||
FailedCount uint64
|
||||
PendingCount uint64
|
||||
MissedThresholdSize uint64
|
||||
AfterThresholdSize uint64
|
||||
MissedThresholdCount uint64
|
||||
AfterThresholdCount uint64
|
||||
}
|
||||
|
||||
func (rsv1 replicationStatsV1) Empty() bool {
|
||||
return rsv1.ReplicatedSize == 0 &&
|
||||
rsv1.FailedSize == 0 &&
|
||||
rsv1.FailedCount == 0
|
||||
}
|
||||
|
||||
//msgp:tuple replicationStats
|
||||
type replicationStats struct {
|
||||
PendingSize uint64
|
||||
|
@ -79,7 +99,7 @@ func (rs replicationStats) Empty() bool {
|
|||
//msgp:tuple replicationAllStats
|
||||
type replicationAllStats struct {
|
||||
Targets map[string]replicationStats
|
||||
ReplicaSize uint64
|
||||
ReplicaSize uint64 `msg:"ReplicaSize,omitempty"`
|
||||
}
|
||||
|
||||
//msgp:encode ignore dataUsageEntryV2 dataUsageEntryV3 dataUsageEntryV4
|
||||
|
@ -114,7 +134,7 @@ type dataUsageEntryV4 struct {
|
|||
Size int64
|
||||
Objects uint64
|
||||
ObjSizes sizeHistogram
|
||||
ReplicationStats replicationStats
|
||||
ReplicationStats replicationStatsV1
|
||||
}
|
||||
|
||||
//msgp:tuple dataUsageEntryV5
|
||||
|
@ -125,7 +145,7 @@ type dataUsageEntryV5 struct {
|
|||
Objects uint64
|
||||
Versions uint64 // Versions that are not delete markers.
|
||||
ObjSizes sizeHistogram
|
||||
ReplicationStats *replicationStats
|
||||
ReplicationStats *replicationStatsV1
|
||||
Compacted bool
|
||||
}
|
||||
|
||||
|
@ -968,7 +988,7 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
|
|||
ObjSizes: v.ObjSizes,
|
||||
Children: v.Children,
|
||||
}
|
||||
empty := replicationStats{}
|
||||
empty := replicationStatsV1{}
|
||||
|
||||
if v.ReplicationStats != empty {
|
||||
due.ReplicationStats = &replicationAllStats{
|
||||
|
@ -978,7 +998,13 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
due.ReplicationStats.Targets[cfg.RoleArn] = v.ReplicationStats
|
||||
due.ReplicationStats.Targets[cfg.RoleArn] = replicationStats{
|
||||
ReplicatedSize: v.ReplicationStats.ReplicatedSize,
|
||||
FailedSize: v.ReplicationStats.FailedSize,
|
||||
FailedCount: v.ReplicationStats.FailedCount,
|
||||
PendingSize: v.ReplicationStats.PendingSize,
|
||||
}
|
||||
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
|
||||
}
|
||||
due.Compacted = len(due.Children) == 0 && k != d.Info.Name
|
||||
|
||||
|
@ -1030,7 +1056,13 @@ func (d *dataUsageCache) deserialize(r io.Reader) error {
|
|||
}
|
||||
|
||||
if arn != "" {
|
||||
due.ReplicationStats.Targets[arn] = *v.ReplicationStats
|
||||
due.ReplicationStats.Targets[arn] = replicationStats{
|
||||
ReplicatedSize: v.ReplicationStats.ReplicatedSize,
|
||||
FailedSize: v.ReplicationStats.FailedSize,
|
||||
FailedCount: v.ReplicationStats.FailedCount,
|
||||
PendingSize: v.ReplicationStats.PendingSize,
|
||||
}
|
||||
due.ReplicationStats.ReplicaSize = v.ReplicationStats.ReplicaSize
|
||||
}
|
||||
}
|
||||
due.Compacted = len(due.Children) == 0 && k != d.Info.Name
|
||||
|
|
|
@ -1909,7 +1909,7 @@ func (z *dataUsageEntryV5) DecodeMsg(dc *msgp.Reader) (err error) {
|
|||
z.ReplicationStats = nil
|
||||
} else {
|
||||
if z.ReplicationStats == nil {
|
||||
z.ReplicationStats = new(replicationStats)
|
||||
z.ReplicationStats = new(replicationStatsV1)
|
||||
}
|
||||
err = z.ReplicationStats.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
|
@ -2071,7 +2071,7 @@ func (z *dataUsageEntryV5) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
|||
z.ReplicationStats = nil
|
||||
} else {
|
||||
if z.ReplicationStats == nil {
|
||||
z.ReplicationStats = new(replicationStats)
|
||||
z.ReplicationStats = new(replicationStatsV1)
|
||||
}
|
||||
bts, err = z.ReplicationStats.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
|
@ -2513,6 +2513,221 @@ func (z *replicationStats) Msgsize() (s int) {
|
|||
return
|
||||
}
|
||||
|
||||
// DecodeMsg implements msgp.Decodable
|
||||
func (z *replicationStatsV1) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
var zb0001 uint32
|
||||
zb0001, err = dc.ReadArrayHeader()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
if zb0001 != 10 {
|
||||
err = msgp.ArrayError{Wanted: 10, Got: zb0001}
|
||||
return
|
||||
}
|
||||
z.PendingSize, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PendingSize")
|
||||
return
|
||||
}
|
||||
z.ReplicatedSize, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ReplicatedSize")
|
||||
return
|
||||
}
|
||||
z.FailedSize, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "FailedSize")
|
||||
return
|
||||
}
|
||||
z.ReplicaSize, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ReplicaSize")
|
||||
return
|
||||
}
|
||||
z.FailedCount, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "FailedCount")
|
||||
return
|
||||
}
|
||||
z.PendingCount, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PendingCount")
|
||||
return
|
||||
}
|
||||
z.MissedThresholdSize, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MissedThresholdSize")
|
||||
return
|
||||
}
|
||||
z.AfterThresholdSize, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "AfterThresholdSize")
|
||||
return
|
||||
}
|
||||
z.MissedThresholdCount, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MissedThresholdCount")
|
||||
return
|
||||
}
|
||||
z.AfterThresholdCount, err = dc.ReadUint64()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "AfterThresholdCount")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *replicationStatsV1) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// array header, size 10
|
||||
err = en.Append(0x9a)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.PendingSize)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PendingSize")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.ReplicatedSize)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ReplicatedSize")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.FailedSize)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "FailedSize")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.ReplicaSize)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ReplicaSize")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.FailedCount)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "FailedCount")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.PendingCount)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PendingCount")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.MissedThresholdSize)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MissedThresholdSize")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.AfterThresholdSize)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "AfterThresholdSize")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.MissedThresholdCount)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MissedThresholdCount")
|
||||
return
|
||||
}
|
||||
err = en.WriteUint64(z.AfterThresholdCount)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "AfterThresholdCount")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *replicationStatsV1) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// array header, size 10
|
||||
o = append(o, 0x9a)
|
||||
o = msgp.AppendUint64(o, z.PendingSize)
|
||||
o = msgp.AppendUint64(o, z.ReplicatedSize)
|
||||
o = msgp.AppendUint64(o, z.FailedSize)
|
||||
o = msgp.AppendUint64(o, z.ReplicaSize)
|
||||
o = msgp.AppendUint64(o, z.FailedCount)
|
||||
o = msgp.AppendUint64(o, z.PendingCount)
|
||||
o = msgp.AppendUint64(o, z.MissedThresholdSize)
|
||||
o = msgp.AppendUint64(o, z.AfterThresholdSize)
|
||||
o = msgp.AppendUint64(o, z.MissedThresholdCount)
|
||||
o = msgp.AppendUint64(o, z.AfterThresholdCount)
|
||||
return
|
||||
}
|
||||
|
||||
// UnmarshalMsg implements msgp.Unmarshaler
|
||||
func (z *replicationStatsV1) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
var zb0001 uint32
|
||||
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
if zb0001 != 10 {
|
||||
err = msgp.ArrayError{Wanted: 10, Got: zb0001}
|
||||
return
|
||||
}
|
||||
z.PendingSize, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PendingSize")
|
||||
return
|
||||
}
|
||||
z.ReplicatedSize, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ReplicatedSize")
|
||||
return
|
||||
}
|
||||
z.FailedSize, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "FailedSize")
|
||||
return
|
||||
}
|
||||
z.ReplicaSize, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "ReplicaSize")
|
||||
return
|
||||
}
|
||||
z.FailedCount, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "FailedCount")
|
||||
return
|
||||
}
|
||||
z.PendingCount, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "PendingCount")
|
||||
return
|
||||
}
|
||||
z.MissedThresholdSize, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MissedThresholdSize")
|
||||
return
|
||||
}
|
||||
z.AfterThresholdSize, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "AfterThresholdSize")
|
||||
return
|
||||
}
|
||||
z.MissedThresholdCount, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "MissedThresholdCount")
|
||||
return
|
||||
}
|
||||
z.AfterThresholdCount, bts, err = msgp.ReadUint64Bytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "AfterThresholdCount")
|
||||
return
|
||||
}
|
||||
o = bts
|
||||
return
|
||||
}
|
||||
|
||||
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
|
||||
func (z *replicationStatsV1) Msgsize() (s int) {
|
||||
s = 1 + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size + msgp.Uint64Size
|
||||
return
|
||||
}
|
||||
|
||||
// DecodeMsg implements msgp.Decodable
|
||||
func (z *sizeHistogram) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
var zb0001 uint32
|
||||
|
|
|
@ -687,6 +687,119 @@ func BenchmarkDecodereplicationStats(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalreplicationStatsV1(t *testing.T) {
|
||||
v := replicationStatsV1{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
left, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
|
||||
}
|
||||
|
||||
left, err = msgp.Skip(bts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(left) > 0 {
|
||||
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMarshalMsgreplicationStatsV1(b *testing.B) {
|
||||
v := replicationStatsV1{}
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.MarshalMsg(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendMsgreplicationStatsV1(b *testing.B) {
|
||||
v := replicationStatsV1{}
|
||||
bts := make([]byte, 0, v.Msgsize())
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
bts, _ = v.MarshalMsg(bts[0:0])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUnmarshalreplicationStatsV1(b *testing.B) {
|
||||
v := replicationStatsV1{}
|
||||
bts, _ := v.MarshalMsg(nil)
|
||||
b.ReportAllocs()
|
||||
b.SetBytes(int64(len(bts)))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := v.UnmarshalMsg(bts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncodeDecodereplicationStatsV1(t *testing.T) {
|
||||
v := replicationStatsV1{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
|
||||
m := v.Msgsize()
|
||||
if buf.Len() > m {
|
||||
t.Log("WARNING: TestEncodeDecodereplicationStatsV1 Msgsize() is inaccurate")
|
||||
}
|
||||
|
||||
vn := replicationStatsV1{}
|
||||
err := msgp.Decode(&buf, &vn)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
msgp.Encode(&buf, &v)
|
||||
err = msgp.NewReader(&buf).Skip()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEncodereplicationStatsV1(b *testing.B) {
|
||||
v := replicationStatsV1{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
en := msgp.NewWriter(msgp.Nowhere)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
v.EncodeMsg(en)
|
||||
}
|
||||
en.Flush()
|
||||
}
|
||||
|
||||
func BenchmarkDecodereplicationStatsV1(b *testing.B) {
|
||||
v := replicationStatsV1{}
|
||||
var buf bytes.Buffer
|
||||
msgp.Encode(&buf, &v)
|
||||
b.SetBytes(int64(buf.Len()))
|
||||
rd := msgp.NewEndlessReader(buf.Bytes(), b)
|
||||
dc := msgp.NewReader(rd)
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := v.DecodeMsg(dc)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalUnmarshalsizeHistogram(t *testing.T) {
|
||||
v := sizeHistogram{}
|
||||
bts, err := v.MarshalMsg(nil)
|
||||
|
|
|
@ -186,7 +186,12 @@ mc replicate add myminio/srcbucket/Tax --priority 1 --remote-bucket "arn:minio:r
|
|||
Replication configuration applied successfully to myminio/srcbucket.
|
||||
```
|
||||
|
||||
> NOTE: Both source and target instance need to be upgraded to latest release to take advantage of Delete marker replication.
|
||||
> NOTE: In mc versions RELEASE.2021-09-02T09-21-27Z and older, the remote target ARN needs to be passed in the --arn flag and actual remote bucket name in --remote-bucket flag of `mc replicate add`. For example, with the ARN above the replication configuration used to be added with
|
||||
```
|
||||
mc replicate add myminio/srcbucket/Tax --priority 1 --remote-bucket destbucket --arn "arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket" --tags "Year=2019&Company=AcmeCorp" --storage-class "STANDARD" --replicate "delete,delete-marker"
|
||||
Replication configuration applied successfully to myminio/srcbucket.
|
||||
```
|
||||
Also note that for `mc` version `RELEASE.2021-09-02T09-21-27Z` or older supports only a single remote target per bucket. To take advantage of multiple destination replication, use the latest version of `mc`
|
||||
|
||||
Status of delete marker replication can be viewed by doing a GET/HEAD on the object version - it will return a `X-Minio-Replication-DeleteMarker-Status` header and http response code of `405`. In the case of permanent deletes, if the delete replication is pending or failed to propagate to the target cluster, GET/HEAD will return additional `X-Minio-Replication-Delete-Status` header and a http response code of `405`.
|
||||
|
||||
|
|
Loading…
Reference in New Issue