heal: Add finished flag to .healing.bin to avoid removing this latter (#20250)

Sometimes, we need historical information in .healing.bin, such as the
number of expired objects that the healing avoids to heal and that can
create drive usage disparency in the same erasure set. For that reason,
this commit will not remove .healing.bin anymore and it will have a new
field called Finished so we know healing is finished in that drive.
This commit is contained in:
Anis Eleuch 2024-08-20 16:42:49 +01:00 committed by GitHub
parent 37383ecd09
commit 85c3db3a93
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 121 additions and 21 deletions

View File

@ -72,10 +72,12 @@ type healingTracker struct {
// Numbers when current bucket started healing,
// for resuming with correct numbers.
ResumeItemsHealed uint64 `json:"-"`
ResumeItemsFailed uint64 `json:"-"`
ResumeBytesDone uint64 `json:"-"`
ResumeBytesFailed uint64 `json:"-"`
ResumeItemsHealed uint64 `json:"-"`
ResumeItemsFailed uint64 `json:"-"`
ResumeItemsSkipped uint64 `json:"-"`
ResumeBytesDone uint64 `json:"-"`
ResumeBytesFailed uint64 `json:"-"`
ResumeBytesSkipped uint64 `json:"-"`
// Filled on startup/restarts.
QueuedBuckets []string
@ -90,6 +92,9 @@ type healingTracker struct {
BytesSkipped uint64
RetryAttempts uint64
Finished bool // finished healing, whether with errors or not
// Add future tracking capabilities
// Be sure that they are included in toHealingDisk
}
@ -262,8 +267,10 @@ func (h *healingTracker) resume() {
h.ItemsHealed = h.ResumeItemsHealed
h.ItemsFailed = h.ResumeItemsFailed
h.ItemsSkipped = h.ResumeItemsSkipped
h.BytesDone = h.ResumeBytesDone
h.BytesFailed = h.ResumeBytesFailed
h.BytesSkipped = h.ResumeBytesSkipped
}
// bucketDone should be called when a bucket is done healing.
@ -274,8 +281,10 @@ func (h *healingTracker) bucketDone(bucket string) {
h.ResumeItemsHealed = h.ItemsHealed
h.ResumeItemsFailed = h.ItemsFailed
h.ResumeItemsSkipped = h.ItemsSkipped
h.ResumeBytesDone = h.BytesDone
h.ResumeBytesFailed = h.BytesFailed
h.ResumeBytesSkipped = h.BytesSkipped
h.HealedBuckets = append(h.HealedBuckets, bucket)
for i, b := range h.QueuedBuckets {
if b == bucket {
@ -324,6 +333,7 @@ func (h *healingTracker) toHealingDisk() madmin.HealingDisk {
PoolIndex: h.PoolIndex,
SetIndex: h.SetIndex,
DiskIndex: h.DiskIndex,
Finished: h.Finished,
Path: h.Path,
Started: h.Started.UTC(),
LastUpdate: h.LastUpdate.UTC(),
@ -373,7 +383,7 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) {
disksToHeal = append(disksToHeal, disk.Endpoint())
continue
}
if disk.Healing() != nil {
if h := disk.Healing(); h != nil && !h.Finished {
disksToHeal = append(disksToHeal, disk.Endpoint())
}
}
@ -519,7 +529,8 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
continue
}
if t.HealID == tracker.HealID {
t.delete(ctx)
t.Finished = true
t.update(ctx)
}
}

View File

@ -132,6 +132,12 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "ResumeItemsFailed")
return
}
case "ResumeItemsSkipped":
z.ResumeItemsSkipped, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ResumeItemsSkipped")
return
}
case "ResumeBytesDone":
z.ResumeBytesDone, err = dc.ReadUint64()
if err != nil {
@ -144,6 +150,12 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "ResumeBytesFailed")
return
}
case "ResumeBytesSkipped":
z.ResumeBytesSkipped, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ResumeBytesSkipped")
return
}
case "QueuedBuckets":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
@ -206,6 +218,12 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "RetryAttempts")
return
}
case "Finished":
z.Finished, err = dc.ReadBool()
if err != nil {
err = msgp.WrapError(err, "Finished")
return
}
default:
err = dc.Skip()
if err != nil {
@ -219,9 +237,9 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 26
// map header, size 29
// write "ID"
err = en.Append(0xde, 0x0, 0x1a, 0xa2, 0x49, 0x44)
err = en.Append(0xde, 0x0, 0x1d, 0xa2, 0x49, 0x44)
if err != nil {
return
}
@ -400,6 +418,16 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "ResumeItemsFailed")
return
}
// write "ResumeItemsSkipped"
err = en.Append(0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ResumeItemsSkipped)
if err != nil {
err = msgp.WrapError(err, "ResumeItemsSkipped")
return
}
// write "ResumeBytesDone"
err = en.Append(0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65)
if err != nil {
@ -420,6 +448,16 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "ResumeBytesFailed")
return
}
// write "ResumeBytesSkipped"
err = en.Append(0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ResumeBytesSkipped)
if err != nil {
err = msgp.WrapError(err, "ResumeBytesSkipped")
return
}
// write "QueuedBuckets"
err = en.Append(0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73)
if err != nil {
@ -494,15 +532,25 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "RetryAttempts")
return
}
// write "Finished"
err = en.Append(0xa8, 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteBool(z.Finished)
if err != nil {
err = msgp.WrapError(err, "Finished")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 26
// map header, size 29
// string "ID"
o = append(o, 0xde, 0x0, 0x1a, 0xa2, 0x49, 0x44)
o = append(o, 0xde, 0x0, 0x1d, 0xa2, 0x49, 0x44)
o = msgp.AppendString(o, z.ID)
// string "PoolIndex"
o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78)
@ -555,12 +603,18 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
// string "ResumeItemsFailed"
o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeItemsFailed)
// string "ResumeItemsSkipped"
o = append(o, 0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeItemsSkipped)
// string "ResumeBytesDone"
o = append(o, 0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65)
o = msgp.AppendUint64(o, z.ResumeBytesDone)
// string "ResumeBytesFailed"
o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeBytesFailed)
// string "ResumeBytesSkipped"
o = append(o, 0xb2, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeBytesSkipped)
// string "QueuedBuckets"
o = append(o, 0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.QueuedBuckets)))
@ -585,6 +639,9 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
// string "RetryAttempts"
o = append(o, 0xad, 0x52, 0x65, 0x74, 0x72, 0x79, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x73)
o = msgp.AppendUint64(o, z.RetryAttempts)
// string "Finished"
o = append(o, 0xa8, 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64)
o = msgp.AppendBool(o, z.Finished)
return
}
@ -714,6 +771,12 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "ResumeItemsFailed")
return
}
case "ResumeItemsSkipped":
z.ResumeItemsSkipped, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ResumeItemsSkipped")
return
}
case "ResumeBytesDone":
z.ResumeBytesDone, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
@ -726,6 +789,12 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "ResumeBytesFailed")
return
}
case "ResumeBytesSkipped":
z.ResumeBytesSkipped, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ResumeBytesSkipped")
return
}
case "QueuedBuckets":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
@ -788,6 +857,12 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "RetryAttempts")
return
}
case "Finished":
z.Finished, bts, err = msgp.ReadBoolBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Finished")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@ -802,7 +877,7 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *healingTracker) Msgsize() (s int) {
s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 18 + msgp.Uint64Size + 17 + msgp.Uint64Size + 12 + msgp.Uint64Size + 12 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 18 + msgp.Uint64Size + 18 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize
s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 18 + msgp.Uint64Size + 17 + msgp.Uint64Size + 12 + msgp.Uint64Size + 12 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 18 + msgp.Uint64Size + 18 + msgp.Uint64Size + 19 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 19 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize
for za0001 := range z.QueuedBuckets {
s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001])
}
@ -810,6 +885,6 @@ func (z *healingTracker) Msgsize() (s int) {
for za0002 := range z.HealedBuckets {
s += msgp.StringPrefixSize + len(z.HealedBuckets[za0002])
}
s += 7 + msgp.StringPrefixSize + len(z.HealID) + 13 + msgp.Uint64Size + 13 + msgp.Uint64Size + 14 + msgp.Uint64Size
s += 7 + msgp.StringPrefixSize + len(z.HealID) + 13 + msgp.Uint64Size + 13 + msgp.Uint64Size + 14 + msgp.Uint64Size + 9 + msgp.BoolSize
return
}

View File

@ -230,8 +230,11 @@ func (s *erasureSets) connectDisks(log bool) {
}
return
}
if disk.IsLocal() && disk.Healing() != nil {
globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint())
if disk.IsLocal() {
h := disk.Healing()
if h != nil && !h.Finished {
globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint())
}
}
s.erasureDisksMu.Lock()
setIndex, diskIndex, err := findDiskIndex(s.format, format)

View File

@ -203,11 +203,9 @@ func getDisksInfo(disks []StorageAPI, endpoints []Endpoint, metrics bool) (disks
di.State = diskErrToDriveState(err)
di.FreeInodes = info.FreeInodes
di.UsedInodes = info.UsedInodes
if info.Healing {
if hi := disks[index].Healing(); hi != nil {
hd := hi.toHealingDisk()
di.HealInfo = &hd
}
if hi := disks[index].Healing(); hi != nil {
hd := hi.toHealingDisk()
di.HealInfo = &hd
}
di.Metrics = &madmin.DiskMetrics{
LastMinute: make(map[string]madmin.TimedAction, len(info.Metrics.LastMinute)),

View File

@ -227,7 +227,11 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
// Collect updates to tracker from concurrent healEntry calls
results := make(chan healEntryResult, 1000)
defer close(results)
quitting := make(chan struct{})
defer func() {
close(results)
<-quitting
}()
go func() {
for res := range results {
@ -241,6 +245,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
tracker.updateProgress(res.success, res.skipped, res.bytes)
}
healingLogIf(ctx, tracker.update(ctx))
close(quitting)
}()
var retErr error

View File

@ -341,7 +341,13 @@ func newXLStorage(ep Endpoint, cleanUp bool) (s *xlStorage, err error) {
// Healing is 'true' when
// - if we found an unformatted disk (no 'format.json')
// - if we found healing tracker 'healing.bin'
dcinfo.Healing = errors.Is(err, errUnformattedDisk) || (s.Healing() != nil)
dcinfo.Healing = errors.Is(err, errUnformattedDisk)
if !dcinfo.Healing {
if hi := s.Healing(); hi != nil && !hi.Finished {
dcinfo.Healing = true
}
}
dcinfo.ID = diskID
return dcinfo, err
},