mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix: the race in healing tracker code (#17048)
This commit is contained in:
parent
0db34e4b85
commit
224d9a752f
@ -26,6 +26,7 @@ import (
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
@ -43,7 +44,8 @@ const (
|
||||
|
||||
// healingTracker is used to persist healing information during a heal.
|
||||
type healingTracker struct {
|
||||
disk StorageAPI `msg:"-"`
|
||||
disk StorageAPI `msg:"-"`
|
||||
mu *sync.RWMutex `msg:"-"`
|
||||
|
||||
ID string
|
||||
PoolIndex int
|
||||
@ -112,22 +114,76 @@ func loadHealingTracker(ctx context.Context, disk StorageAPI) (*healingTracker,
|
||||
}
|
||||
h.disk = disk
|
||||
h.ID = diskID
|
||||
h.mu = &sync.RWMutex{}
|
||||
return &h, nil
|
||||
}
|
||||
|
||||
// newHealingTracker will create a new healing tracker for the disk.
|
||||
func newHealingTracker(disk StorageAPI, healID string) *healingTracker {
|
||||
diskID, _ := disk.GetDiskID()
|
||||
h := healingTracker{
|
||||
disk: disk,
|
||||
ID: diskID,
|
||||
HealID: healID,
|
||||
Path: disk.String(),
|
||||
Endpoint: disk.Endpoint().String(),
|
||||
Started: time.Now().UTC(),
|
||||
func newHealingTracker() *healingTracker {
|
||||
return &healingTracker{
|
||||
mu: &sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func initHealingTracker(disk StorageAPI, healID string) *healingTracker {
|
||||
h := newHealingTracker()
|
||||
diskID, _ := disk.GetDiskID()
|
||||
h.disk = disk
|
||||
h.ID = diskID
|
||||
h.HealID = healID
|
||||
h.Path = disk.String()
|
||||
h.Endpoint = disk.Endpoint().String()
|
||||
h.Started = time.Now().UTC()
|
||||
h.PoolIndex, h.SetIndex, h.DiskIndex = disk.GetDiskLoc()
|
||||
return &h
|
||||
return h
|
||||
}
|
||||
|
||||
func (h healingTracker) getLastUpdate() time.Time {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
return h.LastUpdate
|
||||
}
|
||||
|
||||
func (h healingTracker) getBucket() string {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
return h.Bucket
|
||||
}
|
||||
|
||||
func (h *healingTracker) setBucket(bucket string) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
h.Bucket = bucket
|
||||
}
|
||||
|
||||
func (h healingTracker) getObject() string {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
return h.Object
|
||||
}
|
||||
|
||||
func (h *healingTracker) setObject(object string) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
h.Object = object
|
||||
}
|
||||
|
||||
func (h *healingTracker) updateProgress(success bool, bytes uint64) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
if success {
|
||||
h.ItemsHealed++
|
||||
h.BytesDone += bytes
|
||||
} else {
|
||||
h.ItemsFailed++
|
||||
h.BytesFailed += bytes
|
||||
}
|
||||
}
|
||||
|
||||
// update will update the tracker on the disk.
|
||||
@ -136,15 +192,18 @@ func (h *healingTracker) update(ctx context.Context) error {
|
||||
if h.disk.Healing() == nil {
|
||||
return fmt.Errorf("healingTracker: drive %q is not marked as healing", h.ID)
|
||||
}
|
||||
h.mu.Lock()
|
||||
if h.ID == "" || h.PoolIndex < 0 || h.SetIndex < 0 || h.DiskIndex < 0 {
|
||||
h.ID, _ = h.disk.GetDiskID()
|
||||
h.PoolIndex, h.SetIndex, h.DiskIndex = h.disk.GetDiskLoc()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
return h.save(ctx)
|
||||
}
|
||||
|
||||
// save will unconditionally save the tracker and will be created if not existing.
|
||||
func (h *healingTracker) save(ctx context.Context) error {
|
||||
h.mu.Lock()
|
||||
if h.PoolIndex < 0 || h.SetIndex < 0 || h.DiskIndex < 0 {
|
||||
// Attempt to get location.
|
||||
if api := newObjectLayerFn(); api != nil {
|
||||
@ -155,6 +214,7 @@ func (h *healingTracker) save(ctx context.Context) error {
|
||||
}
|
||||
h.LastUpdate = time.Now().UTC()
|
||||
htrackerBytes, err := h.MarshalMsg(nil)
|
||||
h.mu.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -176,6 +236,8 @@ func (h *healingTracker) delete(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (h *healingTracker) isHealed(bucket string) bool {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
for _, v := range h.HealedBuckets {
|
||||
if v == bucket {
|
||||
return true
|
||||
@ -186,6 +248,9 @@ func (h *healingTracker) isHealed(bucket string) bool {
|
||||
|
||||
// resume will reset progress to the numbers at the start of the bucket.
|
||||
func (h *healingTracker) resume() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
h.ItemsHealed = h.ResumeItemsHealed
|
||||
h.ItemsFailed = h.ResumeItemsFailed
|
||||
h.BytesDone = h.ResumeBytesDone
|
||||
@ -195,6 +260,9 @@ func (h *healingTracker) resume() {
|
||||
// bucketDone should be called when a bucket is done healing.
|
||||
// Adds the bucket to the list of healed buckets and updates resume numbers.
|
||||
func (h *healingTracker) bucketDone(bucket string) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
h.ResumeItemsHealed = h.ItemsHealed
|
||||
h.ResumeItemsFailed = h.ItemsFailed
|
||||
h.ResumeBytesDone = h.BytesDone
|
||||
@ -211,6 +279,9 @@ func (h *healingTracker) bucketDone(bucket string) {
|
||||
// setQueuedBuckets will add buckets, but exclude any that is already in h.HealedBuckets.
|
||||
// Order is preserved.
|
||||
func (h *healingTracker) setQueuedBuckets(buckets []BucketInfo) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
s := set.CreateStringSet(h.HealedBuckets...)
|
||||
h.QueuedBuckets = make([]string, 0, len(buckets))
|
||||
for _, b := range buckets {
|
||||
@ -221,6 +292,9 @@ func (h *healingTracker) setQueuedBuckets(buckets []BucketInfo) {
|
||||
}
|
||||
|
||||
func (h *healingTracker) printTo(writer io.Writer) {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
b, err := json.MarshalIndent(h, "", " ")
|
||||
if err != nil {
|
||||
writer.Write([]byte(err.Error()))
|
||||
@ -230,6 +304,9 @@ func (h *healingTracker) printTo(writer io.Writer) {
|
||||
|
||||
// toHealingDisk converts the information to madmin.HealingDisk
|
||||
func (h *healingTracker) toHealingDisk() madmin.HealingDisk {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
return madmin.HealingDisk{
|
||||
ID: h.ID,
|
||||
HealID: h.HealID,
|
||||
@ -336,7 +413,7 @@ func healFreshDisk(ctx context.Context, z *erasureServerPools, endpoint Endpoint
|
||||
return nil
|
||||
}
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to load healing tracker on '%s': %w, re-initializing..", disk, err))
|
||||
tracker = newHealingTracker(disk, mustGetUUID())
|
||||
tracker = initHealingTracker(disk, mustGetUUID())
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("Healing drive '%s' - 'mc admin heal alias/ --verbose' to check the current status.", endpoint))
|
||||
|
@ -380,7 +380,7 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, healID string)
|
||||
disk.SetDiskID(diskID)
|
||||
if healID != "" {
|
||||
ctx := context.Background()
|
||||
ht := newHealingTracker(disk, healID)
|
||||
ht := initHealingTracker(disk, healID)
|
||||
return ht.save(ctx)
|
||||
}
|
||||
return nil
|
||||
|
@ -187,16 +187,16 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
}
|
||||
var forwardTo string
|
||||
// If we resume to the same bucket, forward to last known item.
|
||||
if tracker.Bucket != "" {
|
||||
if tracker.Bucket == bucket {
|
||||
forwardTo = tracker.Object
|
||||
if b := tracker.getBucket(); b != "" {
|
||||
if b == bucket {
|
||||
forwardTo = tracker.getObject()
|
||||
} else {
|
||||
// Reset to where last bucket ended if resuming.
|
||||
tracker.resume()
|
||||
}
|
||||
}
|
||||
tracker.Object = ""
|
||||
tracker.Bucket = bucket
|
||||
tracker.setObject("")
|
||||
tracker.setBucket(bucket)
|
||||
// Heal current bucket again in case if it is failed
|
||||
// in the being of erasure set healing
|
||||
if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{
|
||||
@ -208,7 +208,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
|
||||
if serverDebugLog {
|
||||
console.Debugf(color.Green("healDrive:")+" healing bucket %s content on %s erasure set\n",
|
||||
bucket, humanize.Ordinal(tracker.SetIndex+1))
|
||||
bucket, humanize.Ordinal(er.setIndex+1))
|
||||
}
|
||||
|
||||
disks, _ := er.getOnlineDisksWithHealing()
|
||||
@ -251,20 +251,14 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
go func() {
|
||||
for res := range results {
|
||||
if res.entryDone {
|
||||
tracker.Object = res.name
|
||||
if time.Since(tracker.LastUpdate) > time.Minute {
|
||||
tracker.setObject(res.name)
|
||||
if time.Since(tracker.getLastUpdate()) > time.Minute {
|
||||
logger.LogIf(ctx, tracker.update(ctx))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if res.success {
|
||||
tracker.ItemsHealed++
|
||||
tracker.BytesDone += res.bytes
|
||||
} else {
|
||||
tracker.ItemsFailed++
|
||||
tracker.BytesFailed += res.bytes
|
||||
}
|
||||
tracker.updateProgress(res.success, res.bytes)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -420,8 +414,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
logger.LogIf(ctx, tracker.update(ctx))
|
||||
}
|
||||
}
|
||||
tracker.Object = ""
|
||||
tracker.Bucket = ""
|
||||
|
||||
tracker.setObject("")
|
||||
tracker.setBucket("")
|
||||
|
||||
return retErr
|
||||
}
|
||||
|
@ -365,10 +365,10 @@ func (s *xlStorage) Healing() *healingTracker {
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
var h healingTracker
|
||||
h := newHealingTracker()
|
||||
_, err = h.UnmarshalMsg(b)
|
||||
logger.LogIf(GlobalContext, err)
|
||||
return &h
|
||||
return h
|
||||
}
|
||||
|
||||
// checkODirectDiskSupport asks the disk to write some data
|
||||
|
Loading…
Reference in New Issue
Block a user