diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index b18d1bf1b..84e1c715e 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -26,6 +26,7 @@ import ( "os" "sort" "strings" + "sync" "time" "github.com/dustin/go-humanize" @@ -43,7 +44,8 @@ const ( // healingTracker is used to persist healing information during a heal. type healingTracker struct { - disk StorageAPI `msg:"-"` + disk StorageAPI `msg:"-"` + mu *sync.RWMutex `msg:"-"` ID string PoolIndex int @@ -112,22 +114,76 @@ func loadHealingTracker(ctx context.Context, disk StorageAPI) (*healingTracker, } h.disk = disk h.ID = diskID + h.mu = &sync.RWMutex{} return &h, nil } // newHealingTracker will create a new healing tracker for the disk. -func newHealingTracker(disk StorageAPI, healID string) *healingTracker { - diskID, _ := disk.GetDiskID() - h := healingTracker{ - disk: disk, - ID: diskID, - HealID: healID, - Path: disk.String(), - Endpoint: disk.Endpoint().String(), - Started: time.Now().UTC(), +func newHealingTracker() *healingTracker { + return &healingTracker{ + mu: &sync.RWMutex{}, } +} + +func initHealingTracker(disk StorageAPI, healID string) *healingTracker { + h := newHealingTracker() + diskID, _ := disk.GetDiskID() + h.disk = disk + h.ID = diskID + h.HealID = healID + h.Path = disk.String() + h.Endpoint = disk.Endpoint().String() + h.Started = time.Now().UTC() h.PoolIndex, h.SetIndex, h.DiskIndex = disk.GetDiskLoc() - return &h + return h +} + +func (h healingTracker) getLastUpdate() time.Time { + h.mu.RLock() + defer h.mu.RUnlock() + + return h.LastUpdate +} + +func (h healingTracker) getBucket() string { + h.mu.RLock() + defer h.mu.RUnlock() + + return h.Bucket +} + +func (h *healingTracker) setBucket(bucket string) { + h.mu.Lock() + defer h.mu.Unlock() + + h.Bucket = bucket +} + +func (h healingTracker) getObject() string { + h.mu.RLock() + defer h.mu.RUnlock() + + return h.Object +} + +func (h *healingTracker) setObject(object string) { + h.mu.Lock() + defer h.mu.Unlock() + + h.Object = object +} + +func (h *healingTracker) updateProgress(success bool, bytes uint64) { + h.mu.Lock() + defer h.mu.Unlock() + + if success { + h.ItemsHealed++ + h.BytesDone += bytes + } else { + h.ItemsFailed++ + h.BytesFailed += bytes + } } // update will update the tracker on the disk. @@ -136,15 +192,18 @@ func (h *healingTracker) update(ctx context.Context) error { if h.disk.Healing() == nil { return fmt.Errorf("healingTracker: drive %q is not marked as healing", h.ID) } + h.mu.Lock() if h.ID == "" || h.PoolIndex < 0 || h.SetIndex < 0 || h.DiskIndex < 0 { h.ID, _ = h.disk.GetDiskID() h.PoolIndex, h.SetIndex, h.DiskIndex = h.disk.GetDiskLoc() } + h.mu.Unlock() return h.save(ctx) } // save will unconditionally save the tracker and will be created if not existing. func (h *healingTracker) save(ctx context.Context) error { + h.mu.Lock() if h.PoolIndex < 0 || h.SetIndex < 0 || h.DiskIndex < 0 { // Attempt to get location. if api := newObjectLayerFn(); api != nil { @@ -155,6 +214,7 @@ func (h *healingTracker) save(ctx context.Context) error { } h.LastUpdate = time.Now().UTC() htrackerBytes, err := h.MarshalMsg(nil) + h.mu.Unlock() if err != nil { return err } @@ -176,6 +236,8 @@ func (h *healingTracker) delete(ctx context.Context) error { } func (h *healingTracker) isHealed(bucket string) bool { + h.mu.RLock() + defer h.mu.RUnlock() for _, v := range h.HealedBuckets { if v == bucket { return true @@ -186,6 +248,9 @@ func (h *healingTracker) isHealed(bucket string) bool { // resume will reset progress to the numbers at the start of the bucket. func (h *healingTracker) resume() { + h.mu.Lock() + defer h.mu.Unlock() + h.ItemsHealed = h.ResumeItemsHealed h.ItemsFailed = h.ResumeItemsFailed h.BytesDone = h.ResumeBytesDone @@ -195,6 +260,9 @@ func (h *healingTracker) resume() { // bucketDone should be called when a bucket is done healing. // Adds the bucket to the list of healed buckets and updates resume numbers. func (h *healingTracker) bucketDone(bucket string) { + h.mu.Lock() + defer h.mu.Unlock() + h.ResumeItemsHealed = h.ItemsHealed h.ResumeItemsFailed = h.ItemsFailed h.ResumeBytesDone = h.BytesDone @@ -211,6 +279,9 @@ func (h *healingTracker) bucketDone(bucket string) { // setQueuedBuckets will add buckets, but exclude any that is already in h.HealedBuckets. // Order is preserved. func (h *healingTracker) setQueuedBuckets(buckets []BucketInfo) { + h.mu.Lock() + defer h.mu.Unlock() + s := set.CreateStringSet(h.HealedBuckets...) h.QueuedBuckets = make([]string, 0, len(buckets)) for _, b := range buckets { @@ -221,6 +292,9 @@ func (h *healingTracker) setQueuedBuckets(buckets []BucketInfo) { } func (h *healingTracker) printTo(writer io.Writer) { + h.mu.RLock() + defer h.mu.RUnlock() + b, err := json.MarshalIndent(h, "", " ") if err != nil { writer.Write([]byte(err.Error())) @@ -230,6 +304,9 @@ func (h *healingTracker) printTo(writer io.Writer) { // toHealingDisk converts the information to madmin.HealingDisk func (h *healingTracker) toHealingDisk() madmin.HealingDisk { + h.mu.RLock() + defer h.mu.RUnlock() + return madmin.HealingDisk{ ID: h.ID, HealID: h.HealID, @@ -336,7 +413,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint return nil } logger.LogIf(ctx, fmt.Errorf("Unable to load healing tracker on '%s': %w, re-initializing..", disk, err)) - tracker = newHealingTracker(disk, mustGetUUID()) + tracker = initHealingTracker(disk, mustGetUUID()) } logger.Info(fmt.Sprintf("Healing drive '%s' - 'mc admin heal alias/ --verbose' to check the current status.", endpoint)) diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go index d535039b8..0702d244c 100644 --- a/cmd/format-erasure.go +++ b/cmd/format-erasure.go @@ -380,7 +380,7 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string) disk.SetDiskID(diskID) if healID != "" { ctx := context.Background() - ht := newHealingTracker(disk, healID) + ht := initHealingTracker(disk, healID) return ht.save(ctx) } return nil diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 320032c3d..d92e2ab57 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -187,16 +187,16 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, } var forwardTo string // If we resume to the same bucket, forward to last known item. - if tracker.Bucket != "" { - if tracker.Bucket == bucket { - forwardTo = tracker.Object + if b := tracker.getBucket(); b != "" { + if b == bucket { + forwardTo = tracker.getObject() } else { // Reset to where last bucket ended if resuming. tracker.resume() } } - tracker.Object = "" - tracker.Bucket = bucket + tracker.setObject("") + tracker.setBucket(bucket) // Heal current bucket again in case if it is failed // in the being of erasure set healing if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{ @@ -208,7 +208,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, if serverDebugLog { console.Debugf(color.Green("healDrive:")+" healing bucket %s content on %s erasure set\n", - bucket, humanize.Ordinal(tracker.SetIndex+1)) + bucket, humanize.Ordinal(er.setIndex+1)) } disks, _ := er.getOnlineDisksWithHealing() @@ -251,20 +251,14 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, go func() { for res := range results { if res.entryDone { - tracker.Object = res.name - if time.Since(tracker.LastUpdate) > time.Minute { + tracker.setObject(res.name) + if time.Since(tracker.getLastUpdate()) > time.Minute { logger.LogIf(ctx, tracker.update(ctx)) } continue } - if res.success { - tracker.ItemsHealed++ - tracker.BytesDone += res.bytes - } else { - tracker.ItemsFailed++ - tracker.BytesFailed += res.bytes - } + tracker.updateProgress(res.success, res.bytes) } }() @@ -420,8 +414,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string, logger.LogIf(ctx, tracker.update(ctx)) } } - tracker.Object = "" - tracker.Bucket = "" + + tracker.setObject("") + tracker.setBucket("") return retErr } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 85b01152d..8e635a3fd 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -365,10 +365,10 @@ func (s *xlStorage) Healing() *healingTracker { if err != nil { return nil } - var h healingTracker + h := newHealingTracker() _, err = h.UnmarshalMsg(b) logger.LogIf(GlobalContext, err) - return &h + return h } // checkODirectDiskSupport asks the disk to write some data