heal: Add skipped objects to the heal summary (#19142)

New disk healing code skips/expires objects that ILM supposed to expire.
Add more visibility to the user about this activity by calculating those
objects and print it at the end of healing activity.
This commit is contained in:
Anis Eleuch
2024-02-28 18:05:40 +01:00
committed by GitHub
parent 9a012a53ef
commit 2bdb9511bd
5 changed files with 84 additions and 15 deletions

View File

@@ -87,6 +87,8 @@ type healingTracker struct {
// ID of the current healing operation
HealID string
ItemsSkipped uint64
BytesSkipped uint64
// Add future tracking capabilities
// Be sure that they are included in toHealingDisk
}
@@ -175,14 +177,18 @@ func (h *healingTracker) setObject(object string) {
h.Object = object
}
func (h *healingTracker) updateProgress(success bool, bytes uint64) {
func (h *healingTracker) updateProgress(success, skipped bool, bytes uint64) {
h.mu.Lock()
defer h.mu.Unlock()
if success {
switch {
case success:
h.ItemsHealed++
h.BytesDone += bytes
} else {
case skipped:
h.ItemsSkipped++
h.BytesSkipped += bytes
default:
h.ItemsFailed++
h.BytesFailed += bytes
}
@@ -323,8 +329,10 @@ func (h *healingTracker) toHealingDisk() madmin.HealingDisk {
ObjectsTotalCount: h.ObjectsTotalCount,
ObjectsTotalSize: h.ObjectsTotalSize,
ItemsHealed: h.ItemsHealed,
ItemsSkipped: h.ItemsSkipped,
ItemsFailed: h.ItemsFailed,
BytesDone: h.BytesDone,
BytesSkipped: h.BytesSkipped,
BytesFailed: h.BytesFailed,
Bucket: h.Bucket,
Object: h.Object,
@@ -441,11 +449,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
return err
}
if tracker.ItemsFailed > 0 {
logger.Event(ctx, "Healing of drive '%s' failed (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
} else {
logger.Event(ctx, "Healing of drive '%s' complete (healed: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsFailed)
}
logger.Event(ctx, "Healing of drive '%s' is finished (healed: %d, skipped: %d, failed: %d).", disk, tracker.ItemsHealed, tracker.ItemsSkipped, tracker.ItemsFailed)
if len(tracker.QueuedBuckets) > 0 {
return fmt.Errorf("not all buckets were healed: %v", tracker.QueuedBuckets)

View File

@@ -188,6 +188,18 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "HealID")
return
}
case "ItemsSkipped":
z.ItemsSkipped, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ItemsSkipped")
return
}
case "BytesSkipped":
z.BytesSkipped, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "BytesSkipped")
return
}
default:
err = dc.Skip()
if err != nil {
@@ -201,9 +213,9 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 23
// map header, size 25
// write "ID"
err = en.Append(0xde, 0x0, 0x17, 0xa2, 0x49, 0x44)
err = en.Append(0xde, 0x0, 0x19, 0xa2, 0x49, 0x44)
if err != nil {
return
}
@@ -446,15 +458,35 @@ func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "HealID")
return
}
// write "ItemsSkipped"
err = en.Append(0xac, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ItemsSkipped)
if err != nil {
err = msgp.WrapError(err, "ItemsSkipped")
return
}
// write "BytesSkipped"
err = en.Append(0xac, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.BytesSkipped)
if err != nil {
err = msgp.WrapError(err, "BytesSkipped")
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 23
// map header, size 25
// string "ID"
o = append(o, 0xde, 0x0, 0x17, 0xa2, 0x49, 0x44)
o = append(o, 0xde, 0x0, 0x19, 0xa2, 0x49, 0x44)
o = msgp.AppendString(o, z.ID)
// string "PoolIndex"
o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78)
@@ -528,6 +560,12 @@ func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
// string "HealID"
o = append(o, 0xa6, 0x48, 0x65, 0x61, 0x6c, 0x49, 0x44)
o = msgp.AppendString(o, z.HealID)
// string "ItemsSkipped"
o = append(o, 0xac, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ItemsSkipped)
// string "BytesSkipped"
o = append(o, 0xac, 0x42, 0x79, 0x74, 0x65, 0x73, 0x53, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64)
o = msgp.AppendUint64(o, z.BytesSkipped)
return
}
@@ -713,6 +751,18 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "HealID")
return
}
case "ItemsSkipped":
z.ItemsSkipped, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ItemsSkipped")
return
}
case "BytesSkipped":
z.BytesSkipped, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "BytesSkipped")
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@@ -735,6 +785,6 @@ func (z *healingTracker) Msgsize() (s int) {
for za0002 := range z.HealedBuckets {
s += msgp.StringPrefixSize + len(z.HealedBuckets[za0002])
}
s += 7 + msgp.StringPrefixSize + len(z.HealID)
s += 7 + msgp.StringPrefixSize + len(z.HealID) + 13 + msgp.Uint64Size + 13 + msgp.Uint64Size
return
}

View File

@@ -238,6 +238,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
type healEntryResult struct {
bytes uint64
success bool
skipped bool
entryDone bool
name string
}
@@ -258,6 +259,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
bytes: sz,
}
}
healEntrySkipped := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
skipped: true,
}
}
filterLifecycle := func(bucket, object string, fi FileInfo) bool {
if lc == nil {
@@ -291,13 +298,16 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
continue
}
tracker.updateProgress(res.success, res.bytes)
tracker.updateProgress(res.success, res.skipped, res.bytes)
}
}()
send := func(result healEntryResult) bool {
select {
case <-ctx.Done():
if !contextCanceled(ctx) {
logger.LogIf(ctx, ctx.Err())
}
return false
case results <- result:
return true
@@ -369,6 +379,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
// Apply lifecycle rules on the objects that are expired.
if filterLifecycle(bucket, version.Name, version) {
versionNotFound++
if !send(healEntrySkipped(uint64(version.Size))) {
return
}
continue
}