serialize updates to healing tracker (#15647)

When healing is parallelized by setting the ` _MINIO_HEAL_WORKERS` 
environment variable, multiple goroutines may race while updating the disk's 
healing tracker. This change serializes only these concurrent updates using a
channel. Note, the healing tracker is still not concurrency safe in other contexts.
This commit is contained in:
Krishnan Parthasarathi 2022-09-07 08:47:21 -07:00 committed by GitHub
parent 8e997eba4a
commit 96bfa77856
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -243,6 +243,53 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
disks = disks[:3]
}
type healEntryResult struct {
bytes uint64
success bool
entryDone bool
name string
}
healEntryDone := func(name string) healEntryResult {
return healEntryResult{
entryDone: true,
name: name,
}
}
healEntrySuccess := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
success: true,
}
}
healEntryFailure := func(sz uint64) healEntryResult {
return healEntryResult{
bytes: sz,
}
}
// Collect updates to tracker from concurrent healEntry calls
results := make(chan healEntryResult)
go func() {
for res := range results {
if res.entryDone {
tracker.Object = res.name
if time.Since(tracker.LastUpdate) > time.Minute {
logger.LogIf(ctx, tracker.update(ctx))
}
continue
}
if res.success {
tracker.ItemsHealed++
tracker.BytesDone += res.bytes
} else {
tracker.ItemsFailed++
tracker.BytesFailed += res.bytes
}
}
}()
// Note: updates from healEntry to tracker must be sent on results channel.
healEntry := func(entry metaCacheEntry) {
defer jt.Give()
@ -269,6 +316,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
}
var result healEntryResult
fivs, err := entry.fileInfoVersions(bucket)
if err != nil {
err := bgSeq.queueHealTask(healSource{
@ -277,12 +325,18 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
versionID: "",
}, madmin.HealItemObject)
if err != nil {
tracker.ItemsFailed++
result = healEntryFailure(0)
logger.LogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, entry.name, err))
} else {
tracker.ItemsHealed++
result = healEntrySuccess(0)
}
bgSeq.logHeal(madmin.HealItemObject)
select {
case <-ctx.Done():
return
case results <- result:
}
return
}
@ -296,22 +350,27 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
Remove: healDeleteDangling,
}); err != nil {
// If not deleted, assume they failed.
tracker.ItemsFailed++
tracker.BytesFailed += uint64(version.Size)
result = healEntryFailure(uint64(version.Size))
if version.VersionID != "" {
logger.LogIf(ctx, fmt.Errorf("unable to heal object %s/%s-v(%s): %w", bucket, version.Name, version.VersionID, err))
} else {
logger.LogIf(ctx, fmt.Errorf("unable to heal object %s/%s: %w", bucket, version.Name, err))
}
} else {
tracker.ItemsHealed++
tracker.BytesDone += uint64(version.Size)
result = healEntrySuccess(uint64(version.Size))
}
bgSeq.logHeal(madmin.HealItemObject)
select {
case <-ctx.Done():
return
case results <- result:
}
}
tracker.Object = entry.name
if time.Since(tracker.LastUpdate) > time.Minute {
logger.LogIf(ctx, tracker.update(ctx))
select {
case <-ctx.Done():
return
case results <- healEntryDone(entry.name):
}
// Wait and proceed if there are active requests
@ -349,6 +408,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
finished: nil,
})
jt.Wait() // synchronize all the concurrent heal jobs
close(results)
if err != nil {
// Set this such that when we return this function
// we let the caller retry this disk again for the