diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 616e3c01c..946310afc 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -48,18 +48,23 @@ const ( type healingTracker struct { disk StorageAPI `msg:"-"` - ID string - PoolIndex int - SetIndex int - DiskIndex int - Path string - Endpoint string - Started time.Time - LastUpdate time.Time - ObjectsHealed uint64 - ObjectsFailed uint64 - BytesDone uint64 - BytesFailed uint64 + ID string + PoolIndex int + SetIndex int + DiskIndex int + Path string + Endpoint string + Started time.Time + LastUpdate time.Time + + ObjectsTotalCount uint64 + ObjectsTotalSize uint64 + + ItemsHealed uint64 + ItemsFailed uint64 + + BytesDone uint64 + BytesFailed uint64 // Last object scanned. Bucket string `json:"-"` @@ -67,10 +72,10 @@ type healingTracker struct { // Numbers when current bucket started healing, // for resuming with correct numbers. - ResumeObjectsHealed uint64 `json:"-"` - ResumeObjectsFailed uint64 `json:"-"` - ResumeBytesDone uint64 `json:"-"` - ResumeBytesFailed uint64 `json:"-"` + ResumeItemsHealed uint64 `json:"-"` + ResumeItemsFailed uint64 `json:"-"` + ResumeBytesDone uint64 `json:"-"` + ResumeBytesFailed uint64 `json:"-"` // Filled on startup/restarts. QueuedBuckets []string @@ -175,8 +180,8 @@ func (h *healingTracker) isHealed(bucket string) bool { // resume will reset progress to the numbers at the start of the bucket. func (h *healingTracker) resume() { - h.ObjectsHealed = h.ResumeObjectsHealed - h.ObjectsFailed = h.ResumeObjectsFailed + h.ItemsHealed = h.ResumeItemsHealed + h.ItemsFailed = h.ResumeItemsFailed h.BytesDone = h.ResumeBytesDone h.BytesFailed = h.ResumeBytesFailed } @@ -184,8 +189,8 @@ func (h *healingTracker) resume() { // bucketDone should be called when a bucket is done healing. // Adds the bucket to the list of healed buckets and updates resume numbers. func (h *healingTracker) bucketDone(bucket string) { - h.ResumeObjectsHealed = h.ObjectsHealed - h.ResumeObjectsFailed = h.ObjectsFailed + h.ResumeItemsHealed = h.ItemsHealed + h.ResumeItemsFailed = h.ItemsFailed h.ResumeBytesDone = h.BytesDone h.ResumeBytesFailed = h.BytesFailed h.HealedBuckets = append(h.HealedBuckets, bucket) @@ -220,22 +225,28 @@ func (h *healingTracker) printTo(writer io.Writer) { // toHealingDisk converts the information to madmin.HealingDisk func (h *healingTracker) toHealingDisk() madmin.HealingDisk { return madmin.HealingDisk{ - ID: h.ID, - Endpoint: h.Endpoint, - PoolIndex: h.PoolIndex, - SetIndex: h.SetIndex, - DiskIndex: h.DiskIndex, - Path: h.Path, - Started: h.Started.UTC(), - LastUpdate: h.LastUpdate.UTC(), - ObjectsHealed: h.ObjectsHealed, - ObjectsFailed: h.ObjectsFailed, - BytesDone: h.BytesDone, - BytesFailed: h.BytesFailed, - Bucket: h.Bucket, - Object: h.Object, - QueuedBuckets: h.QueuedBuckets, - HealedBuckets: h.HealedBuckets, + ID: h.ID, + Endpoint: h.Endpoint, + PoolIndex: h.PoolIndex, + SetIndex: h.SetIndex, + DiskIndex: h.DiskIndex, + Path: h.Path, + Started: h.Started.UTC(), + LastUpdate: h.LastUpdate.UTC(), + ObjectsTotalCount: h.ObjectsTotalCount, + ObjectsTotalSize: h.ObjectsTotalSize, + ItemsHealed: h.ItemsHealed, + ItemsFailed: h.ItemsFailed, + BytesDone: h.BytesDone, + BytesFailed: h.BytesFailed, + Bucket: h.Bucket, + Object: h.Object, + QueuedBuckets: h.QueuedBuckets, + HealedBuckets: h.HealedBuckets, + + ObjectsHealed: h.ItemsHealed, // Deprecated July 2021 + ObjectsFailed: h.ItemsFailed, // Deprecated July 2021 + } } @@ -413,6 +424,14 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq tracker = newHealingTracker(disk) } + // Load bucket totals + cache := dataUsageCache{} + if err := cache.load(ctx, z.serverPools[i].sets[setIndex], dataUsageCacheName); err == nil { + dataUsageInfo := cache.dui(dataUsageRoot, nil) + tracker.ObjectsTotalCount = dataUsageInfo.ObjectsTotalCount + tracker.ObjectsTotalSize = dataUsageInfo.ObjectsTotalSize + } + tracker.PoolIndex, tracker.SetIndex, tracker.DiskIndex = disk.GetDiskLoc() tracker.setQueuedBuckets(buckets) if err := tracker.save(ctx); err != nil { diff --git a/cmd/background-newdisks-heal-ops_gen.go b/cmd/background-newdisks-heal-ops_gen.go index 13ca63c32..0b34df90c 100644 --- a/cmd/background-newdisks-heal-ops_gen.go +++ b/cmd/background-newdisks-heal-ops_gen.go @@ -72,16 +72,28 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "LastUpdate") return } - case "ObjectsHealed": - z.ObjectsHealed, err = dc.ReadUint64() + case "ObjectsTotalCount": + z.ObjectsTotalCount, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "ObjectsHealed") + err = msgp.WrapError(err, "ObjectsTotalCount") return } - case "ObjectsFailed": - z.ObjectsFailed, err = dc.ReadUint64() + case "ObjectsTotalSize": + z.ObjectsTotalSize, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "ObjectsFailed") + err = msgp.WrapError(err, "ObjectsTotalSize") + return + } + case "ItemsHealed": + z.ItemsHealed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ItemsHealed") + return + } + case "ItemsFailed": + z.ItemsFailed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ItemsFailed") return } case "BytesDone": @@ -108,16 +120,16 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "Object") return } - case "ResumeObjectsHealed": - z.ResumeObjectsHealed, err = dc.ReadUint64() + case "ResumeItemsHealed": + z.ResumeItemsHealed, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "ResumeObjectsHealed") + err = msgp.WrapError(err, "ResumeItemsHealed") return } - case "ResumeObjectsFailed": - z.ResumeObjectsFailed, err = dc.ReadUint64() + case "ResumeItemsFailed": + z.ResumeItemsFailed, err = dc.ReadUint64() if err != nil { - err = msgp.WrapError(err, "ResumeObjectsFailed") + err = msgp.WrapError(err, "ResumeItemsFailed") return } case "ResumeBytesDone": @@ -183,9 +195,9 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 20 + // map header, size 22 // write "ID" - err = en.Append(0xde, 0x0, 0x14, 0xa2, 0x49, 0x44) + err = en.Append(0xde, 0x0, 0x16, 0xa2, 0x49, 0x44) if err != nil { return } @@ -264,24 +276,44 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "LastUpdate") return } - // write "ObjectsHealed" - err = en.Append(0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + // write "ObjectsTotalCount" + err = en.Append(0xb1, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74) if err != nil { return } - err = en.WriteUint64(z.ObjectsHealed) + err = en.WriteUint64(z.ObjectsTotalCount) if err != nil { - err = msgp.WrapError(err, "ObjectsHealed") + err = msgp.WrapError(err, "ObjectsTotalCount") return } - // write "ObjectsFailed" - err = en.Append(0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + // write "ObjectsTotalSize" + err = en.Append(0xb0, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65) if err != nil { return } - err = en.WriteUint64(z.ObjectsFailed) + err = en.WriteUint64(z.ObjectsTotalSize) if err != nil { - err = msgp.WrapError(err, "ObjectsFailed") + err = msgp.WrapError(err, "ObjectsTotalSize") + return + } + // write "ItemsHealed" + err = en.Append(0xab, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ItemsHealed) + if err != nil { + err = msgp.WrapError(err, "ItemsHealed") + return + } + // write "ItemsFailed" + err = en.Append(0xab, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ItemsFailed) + if err != nil { + err = msgp.WrapError(err, "ItemsFailed") return } // write "BytesDone" @@ -324,24 +356,24 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "Object") return } - // write "ResumeObjectsHealed" - err = en.Append(0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + // write "ResumeItemsHealed" + err = en.Append(0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) if err != nil { return } - err = en.WriteUint64(z.ResumeObjectsHealed) + err = en.WriteUint64(z.ResumeItemsHealed) if err != nil { - err = msgp.WrapError(err, "ResumeObjectsHealed") + err = msgp.WrapError(err, "ResumeItemsHealed") return } - // write "ResumeObjectsFailed" - err = en.Append(0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + // write "ResumeItemsFailed" + err = en.Append(0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) if err != nil { return } - err = en.WriteUint64(z.ResumeObjectsFailed) + err = en.WriteUint64(z.ResumeItemsFailed) if err != nil { - err = msgp.WrapError(err, "ResumeObjectsFailed") + err = msgp.WrapError(err, "ResumeItemsFailed") return } // write "ResumeBytesDone" @@ -404,9 +436,9 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 20 + // map header, size 22 // string "ID" - o = append(o, 0xde, 0x0, 0x14, 0xa2, 0x49, 0x44) + o = append(o, 0xde, 0x0, 0x16, 0xa2, 0x49, 0x44) o = msgp.AppendString(o, z.ID) // string "PoolIndex" o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) @@ -429,12 +461,18 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { // string "LastUpdate" o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) o = msgp.AppendTime(o, z.LastUpdate) - // string "ObjectsHealed" - o = append(o, 0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) - o = msgp.AppendUint64(o, z.ObjectsHealed) - // string "ObjectsFailed" - o = append(o, 0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) - o = msgp.AppendUint64(o, z.ObjectsFailed) + // string "ObjectsTotalCount" + o = append(o, 0xb1, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74) + o = msgp.AppendUint64(o, z.ObjectsTotalCount) + // string "ObjectsTotalSize" + o = append(o, 0xb0, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendUint64(o, z.ObjectsTotalSize) + // string "ItemsHealed" + o = append(o, 0xab, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ItemsHealed) + // string "ItemsFailed" + o = append(o, 0xab, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ItemsFailed) // string "BytesDone" o = append(o, 0xa9, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) o = msgp.AppendUint64(o, z.BytesDone) @@ -447,12 +485,12 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { // string "Object" o = append(o, 0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) o = msgp.AppendString(o, z.Object) - // string "ResumeObjectsHealed" - o = append(o, 0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) - o = msgp.AppendUint64(o, z.ResumeObjectsHealed) - // string "ResumeObjectsFailed" - o = append(o, 0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) - o = msgp.AppendUint64(o, z.ResumeObjectsFailed) + // string "ResumeItemsHealed" + o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeItemsHealed) + // string "ResumeItemsFailed" + o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeItemsFailed) // string "ResumeBytesDone" o = append(o, 0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) o = msgp.AppendUint64(o, z.ResumeBytesDone) @@ -540,16 +578,28 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "LastUpdate") return } - case "ObjectsHealed": - z.ObjectsHealed, bts, err = msgp.ReadUint64Bytes(bts) + case "ObjectsTotalCount": + z.ObjectsTotalCount, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "ObjectsHealed") + err = msgp.WrapError(err, "ObjectsTotalCount") return } - case "ObjectsFailed": - z.ObjectsFailed, bts, err = msgp.ReadUint64Bytes(bts) + case "ObjectsTotalSize": + z.ObjectsTotalSize, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "ObjectsFailed") + err = msgp.WrapError(err, "ObjectsTotalSize") + return + } + case "ItemsHealed": + z.ItemsHealed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ItemsHealed") + return + } + case "ItemsFailed": + z.ItemsFailed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ItemsFailed") return } case "BytesDone": @@ -576,16 +626,16 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Object") return } - case "ResumeObjectsHealed": - z.ResumeObjectsHealed, bts, err = msgp.ReadUint64Bytes(bts) + case "ResumeItemsHealed": + z.ResumeItemsHealed, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "ResumeObjectsHealed") + err = msgp.WrapError(err, "ResumeItemsHealed") return } - case "ResumeObjectsFailed": - z.ResumeObjectsFailed, bts, err = msgp.ReadUint64Bytes(bts) + case "ResumeItemsFailed": + z.ResumeItemsFailed, bts, err = msgp.ReadUint64Bytes(bts) if err != nil { - err = msgp.WrapError(err, "ResumeObjectsFailed") + err = msgp.WrapError(err, "ResumeItemsFailed") return } case "ResumeBytesDone": @@ -652,7 +702,7 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *healingTracker) Msgsize() (s int) { - s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 14 + msgp.Uint64Size + 14 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 20 + msgp.Uint64Size + 20 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize + s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 18 + msgp.Uint64Size + 17 + msgp.Uint64Size + 12 + msgp.Uint64Size + 12 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 18 + msgp.Uint64Size + 18 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize for za0001 := range z.QueuedBuckets { s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001]) } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 9db82f8ff..c24da8a71 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -945,7 +945,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str if disk != nil && disk.IsOnline() { continue } - er.addPartial(bucket, object, fi.VersionID) + er.addPartial(bucket, object, fi.VersionID, fi.Size) break } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 5a70e6802..99b7f8d9c 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -826,16 +826,6 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st return ObjectInfo{}, toObjectErr(err, bucket, object) } - // Whether a disk was initially or becomes offline - // during this upload, send it to the MRF list. - for i := 0; i < len(onlineDisks); i++ { - if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { - continue - } - er.addPartial(bucket, object, fi.VersionID) - break - } - for i := 0; i < len(onlineDisks); i++ { if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { // Object info is the same in all disks, so we can pick @@ -844,6 +834,17 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st break } } + + // Whether a disk was initially or becomes offline + // during this upload, send it to the MRF list. + for i := 0; i < len(onlineDisks); i++ { + if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { + continue + } + er.addPartial(bucket, object, fi.VersionID, fi.Size) + break + } + online = countOnlineDisks(onlineDisks) return fi.ToObjectInfo(bucket, object), nil @@ -1029,7 +1030,7 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec // all other direct versionId references we should // ensure no dangling file is left over. - er.addPartial(bucket, version.Name, version.VersionID) + er.addPartial(bucket, version.Name, version.VersionID, -1) break } } @@ -1177,7 +1178,7 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string if disk != nil && disk.IsOnline() { continue } - er.addPartial(bucket, object, opts.VersionID) + er.addPartial(bucket, object, opts.VersionID, -1) break } @@ -1192,11 +1193,15 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string // Send the successful but partial upload/delete, however ignore // if the channel is blocked by other items. -func (er erasureObjects) addPartial(bucket, object, versionID string) { - select { - case er.mrfOpCh <- partialOperation{bucket: bucket, object: object, versionID: versionID}: - default: - } +func (er erasureObjects) addPartial(bucket, object, versionID string, size int64) { + globalMRFState.addPartialOp(partialOperation{ + bucket: bucket, + object: object, + versionID: versionID, + size: size, + setIndex: er.setIndex, + poolIndex: er.poolIndex, + }) } func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) { @@ -1421,7 +1426,7 @@ func (er erasureObjects) TransitionObject(ctx context.Context, bucket, object st if disk != nil && disk.IsOnline() { continue } - er.addPartial(bucket, object, opts.VersionID) + er.addPartial(bucket, object, opts.VersionID, -1) break } // Notify object deleted event. diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index caff485e3..a1d3fd0fb 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -96,8 +96,6 @@ type erasureSets struct { disksStorageInfoCache timedValue - mrfMU sync.Mutex - mrfOperations map[healSource]int lastConnectDisksOpTime time.Time } @@ -268,20 +266,11 @@ func (s *erasureSets) connectDisks() { wg.Wait() go func() { - idler := time.NewTimer(100 * time.Millisecond) - defer idler.Stop() - for setIndex, justConnected := range setsJustConnected { if !justConnected { continue } - - // Send a new set connect event with a timeout - idler.Reset(100 * time.Millisecond) - select { - case s.setReconnectEvent <- setIndex: - case <-idler.C: - } + globalMRFState.newSetReconnected(setIndex, s.poolIndex) } }() } @@ -375,7 +364,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto setReconnectEvent: make(chan int), distributionAlgo: format.Erasure.DistributionAlgo, deploymentID: uuid.MustParse(format.ID), - mrfOperations: make(map[healSource]int), poolIndex: poolIdx, } @@ -446,7 +434,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto nsMutex: mutex, bp: bp, bpOld: bpOld, - mrfOpCh: make(chan partialOperation, 10000), } } @@ -466,8 +453,6 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto // Start the disk monitoring and connect routine. go s.monitorAndConnectEndpoints(ctx, defaultMonitorConnectEndpointInterval) - go s.maintainMRFList() - go s.healMRFRoutine() return s, nil } @@ -1388,71 +1373,6 @@ func (s *erasureSets) GetObjectTags(ctx context.Context, bucket, object string, return er.GetObjectTags(ctx, bucket, object, opts) } -// maintainMRFList gathers the list of successful partial uploads -// from all underlying er.sets and puts them in a global map which -// should not have more than 10000 entries. -func (s *erasureSets) maintainMRFList() { - var agg = make(chan partialOperation, 10000) - for i, er := range s.sets { - go func(c <-chan partialOperation, setIndex int) { - for msg := range c { - msg.failedSet = setIndex - select { - case agg <- msg: - default: - } - } - }(er.mrfOpCh, i) - } - - for fOp := range agg { - s.mrfMU.Lock() - if len(s.mrfOperations) > 10000 { - s.mrfMU.Unlock() - continue - } - s.mrfOperations[healSource{ - bucket: fOp.bucket, - object: fOp.object, - versionID: fOp.versionID, - opts: &madmin.HealOpts{Remove: true}, - }] = fOp.failedSet - s.mrfMU.Unlock() - } -} - -// healMRFRoutine monitors new disks connection, sweep the MRF list -// to find objects related to the new disk that needs to be healed. -func (s *erasureSets) healMRFRoutine() { - // Wait until background heal state is initialized - bgSeq := mustGetHealSequence(GlobalContext) - - for setIndex := range s.setReconnectEvent { - // Get the list of objects related the er.set - // to which the connected disk belongs. - var mrfOperations []healSource - s.mrfMU.Lock() - for k, v := range s.mrfOperations { - if v == setIndex { - mrfOperations = append(mrfOperations, k) - } - } - s.mrfMU.Unlock() - - // Heal objects - for _, u := range mrfOperations { - waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep) - - // Send an object to background heal - bgSeq.sourceCh <- u - - s.mrfMU.Lock() - delete(s.mrfOperations, u) - s.mrfMU.Unlock() - } - } -} - // TransitionObject - transition object content to target tier. func (s *erasureSets) TransitionObject(ctx context.Context, bucket, object string, opts ObjectOptions) error { return s.getHashedSet(object).TransitionObject(ctx, bucket, object, opts) diff --git a/cmd/erasure.go b/cmd/erasure.go index d2c200e29..dca91febf 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -39,15 +39,6 @@ import ( // OfflineDisk represents an unavailable disk. var OfflineDisk StorageAPI // zero value is nil -// partialOperation is a successful upload/delete of an object -// but not written in all disks (having quorum) -type partialOperation struct { - bucket string - object string - versionID string - failedSet int -} - // erasureObjects - Implements ER object layer. type erasureObjects struct { GatewayUnsupported @@ -78,8 +69,6 @@ type erasureObjects struct { // legacy objects. bpOld *bpool.BytePoolCap - mrfOpCh chan partialOperation - deletedCleanupSleeper *dynamicSleeper } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 3326347b0..39326c5c1 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -68,6 +68,17 @@ func newBgHealSequence() *healSequence { } } +func getCurrentMRFStatus() madmin.MRFStatus { + mrfInfo := globalMRFState.getCurrentMRFRoundInfo() + return madmin.MRFStatus{ + BytesHealed: mrfInfo.bytesHealed, + ItemsHealed: mrfInfo.itemsHealed, + TotalItems: mrfInfo.itemsHealed + mrfInfo.pendingItems, + TotalBytes: mrfInfo.bytesHealed + mrfInfo.pendingBytes, + Started: mrfInfo.triggeredAt, + } +} + // getBackgroundHealStatus will return the func getBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.BgHealState, bool) { if globalBackgroundHealState == nil { @@ -79,13 +90,20 @@ func getBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.BgHealS return madmin.BgHealState{}, false } + status := madmin.BgHealState{ + ScannedItemsCount: bgSeq.getScannedItemsCount(), + } + + if globalMRFState != nil { + status.MRF = map[string]madmin.MRFStatus{ + globalLocalNodeName: getCurrentMRFStatus(), + } + } + var healDisksMap = map[string]struct{}{} for _, ep := range getLocalDisksToHeal() { healDisksMap[ep.String()] = struct{}{} } - status := madmin.BgHealState{ - ScannedItemsCount: bgSeq.getScannedItemsCount(), - } if o == nil { healing := globalBackgroundHealState.getLocalHealingDisks() @@ -225,12 +243,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { // If not deleted, assume they failed. - tracker.ObjectsFailed++ + tracker.ItemsFailed++ tracker.BytesFailed += uint64(version.Size) logger.LogIf(ctx, err) } } else { - tracker.ObjectsHealed++ + tracker.ItemsHealed++ tracker.BytesDone += uint64(version.Size) } bgSeq.logHeal(madmin.HealItemObject) diff --git a/cmd/globals.go b/cmd/globals.go index 5e513be4c..34baf3614 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -293,6 +293,8 @@ var ( globalBackgroundHealRoutine *healRoutine globalBackgroundHealState *allHealState + globalMRFState *mrfState + // If writes to FS backend should be O_SYNC. globalFSOSync bool diff --git a/cmd/mrf.go b/cmd/mrf.go new file mode 100644 index 000000000..00e53ce5d --- /dev/null +++ b/cmd/mrf.go @@ -0,0 +1,242 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "sync" + "time" + + "github.com/minio/madmin-go" + "github.com/minio/minio/internal/logger" +) + +var mrfHealingOpts = madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling} + +const ( + mrfInfoResetInterval = 10 * time.Second + mrfOpsQueueSize = 10000 +) + +// partialOperation is a successful upload/delete of an object +// but not written in all disks (having quorum) +type partialOperation struct { + bucket string + object string + versionID string + size int64 + setIndex int + poolIndex int +} + +type setInfo struct { + index, pool int +} + +type mrfStats struct { + triggeredAt time.Time + + itemsHealed uint64 + bytesHealed uint64 + + pendingItems uint64 + pendingBytes uint64 +} + +// mrfState sncapsulates all the information +// related to the global background MRF. +type mrfState struct { + ctx context.Context + objectAPI ObjectLayer + + mu sync.Mutex + opCh chan partialOperation + pendingOps map[partialOperation]setInfo + setReconnectEvent chan setInfo + + itemsHealed uint64 + bytesHealed uint64 + pendingItems uint64 + pendingBytes uint64 + + triggeredAt time.Time +} + +// Add a partial S3 operation (put/delete) when one or more disks are offline. +func (m *mrfState) addPartialOp(op partialOperation) { + if m == nil { + return + } + + select { + case m.opCh <- op: + default: + } +} + +// Receive the new set (disk) reconnection event +func (m *mrfState) newSetReconnected(pool, set int) { + if m == nil { + return + } + + idler := time.NewTimer(100 * time.Millisecond) + defer idler.Stop() + + select { + case m.setReconnectEvent <- setInfo{index: set, pool: pool}: + case <-idler.C: + } +} + +// Get current MRF stats +func (m *mrfState) getCurrentMRFRoundInfo() mrfStats { + m.mu.Lock() + triggeredAt := m.triggeredAt + itemsHealed := m.itemsHealed + bytesHealed := m.bytesHealed + pendingItems := m.pendingItems + pendingBytes := m.pendingBytes + m.mu.Unlock() + + if pendingItems == 0 { + return mrfStats{} + } + + return mrfStats{ + triggeredAt: triggeredAt, + itemsHealed: itemsHealed, + bytesHealed: bytesHealed, + pendingItems: pendingItems, + pendingBytes: pendingBytes, + } +} + +// maintainMRFList gathers the list of successful partial uploads +// from all underlying er.sets and puts them in a global map which +// should not have more than 10000 entries. +func (m *mrfState) maintainMRFList() { + for fOp := range m.opCh { + m.mu.Lock() + if len(m.pendingOps) > mrfOpsQueueSize { + m.mu.Unlock() + continue + } + + m.pendingOps[fOp] = setInfo{index: fOp.setIndex, pool: fOp.poolIndex} + m.pendingItems++ + if fOp.size > 0 { + m.pendingBytes += uint64(fOp.size) + } + + m.mu.Unlock() + } +} + +// Reset current MRF stats +func (m *mrfState) resetMRFInfoIfNoPendingOps() { + m.mu.Lock() + defer m.mu.Unlock() + + if m.pendingItems > 0 { + return + } + + m.itemsHealed = 0 + m.bytesHealed = 0 + m.pendingItems = 0 + m.pendingBytes = 0 + m.triggeredAt = time.Time{} +} + +// healRoutine listens to new disks reconnection events and +// issues healing requests for queued objects belonging to the +// corresponding erasure set +func (m *mrfState) healRoutine() { + idler := time.NewTimer(mrfInfoResetInterval) + defer idler.Stop() + + for { + idler.Reset(mrfInfoResetInterval) + select { + case <-m.ctx.Done(): + return + case <-idler.C: + m.resetMRFInfoIfNoPendingOps() + case setInfo := <-m.setReconnectEvent: + // Get the list of objects related the er.set + // to which the connected disk belongs. + var mrfOperations []partialOperation + m.mu.Lock() + for k, v := range m.pendingOps { + if v == setInfo { + mrfOperations = append(mrfOperations, k) + } + } + m.mu.Unlock() + + if len(mrfOperations) == 0 { + continue + } + + m.mu.Lock() + m.triggeredAt = time.Now().UTC() + m.mu.Unlock() + + // Heal objects + for _, u := range mrfOperations { + waitForLowHTTPReq(globalHealConfig.IOCount, globalHealConfig.Sleep) + if _, err := m.objectAPI.HealObject(m.ctx, u.bucket, u.object, u.versionID, mrfHealingOpts); err != nil { + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + // If not deleted, assume they failed. + logger.LogIf(m.ctx, err) + } else { + m.mu.Lock() + m.itemsHealed++ + m.pendingItems-- + m.mu.Unlock() + } + } else { + m.mu.Lock() + m.itemsHealed++ + m.pendingItems-- + m.bytesHealed += uint64(u.size) + m.pendingBytes -= uint64(u.size) + m.mu.Unlock() + } + + m.mu.Lock() + delete(m.pendingOps, u) + m.mu.Unlock() + } + } + } +} + +// Initialize healing MRF +func initHealMRF(ctx context.Context, obj ObjectLayer) { + globalMRFState = &mrfState{ + ctx: ctx, + objectAPI: obj, + opCh: make(chan partialOperation, mrfOpsQueueSize), + pendingOps: make(map[partialOperation]setInfo), + setReconnectEvent: make(chan setInfo), + } + go globalMRFState.maintainMRFList() + go globalMRFState.healRoutine() +} diff --git a/cmd/server-main.go b/cmd/server-main.go index a84fa96d2..0de6868e1 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -537,6 +537,7 @@ func serverMain(ctx *cli.Context) { // Enable background operations for erasure coding if globalIsErasure { initAutoHeal(GlobalContext, newObject) + initHealMRF(GlobalContext, newObject) initBackgroundTransition(GlobalContext, newObject) } diff --git a/go.mod b/go.mod index bf122ea8f..571e6a605 100644 --- a/go.mod +++ b/go.mod @@ -75,6 +75,7 @@ require ( github.com/secure-io/sio-go v0.3.1 github.com/streadway/amqp v1.0.0 github.com/tinylib/msgp v1.1.6-0.20210521143832-0becd170c402 + github.com/ulikunitz/xz v0.5.10 // indirect github.com/valyala/bytebufferpool v1.0.0 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c diff --git a/go.sum b/go.sum index 365142efc..b6fa32169 100644 --- a/go.sum +++ b/go.sum @@ -1410,6 +1410,8 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ulikunitz/xz v0.5.6/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8= github.com/ulikunitz/xz v0.5.7/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= +github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8= +github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/ultraware/funlen v0.0.2/go.mod h1:Dp4UiAus7Wdb9KUZsYWZEWiRzGuM2kXM1lPbfaF6xhA= github.com/ultraware/whitespace v0.0.4/go.mod h1:aVMh/gQve5Maj9hQ/hg+F75lr/X5A89uZnzAmWSineA= github.com/unrolled/secure v1.0.7 h1:BcQHp3iKZyZCKj5gRqwQG+5urnGBF00wGgoPPwtheVQ=