mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: healing deadlocks and ordering (#16643)
This commit is contained in:
parent
98a84d88e2
commit
84bb7d05a9
@ -407,9 +407,6 @@ type healSequence struct {
|
|||||||
// bucket, and object on which heal seq. was initiated
|
// bucket, and object on which heal seq. was initiated
|
||||||
bucket, object string
|
bucket, object string
|
||||||
|
|
||||||
// A channel of entities with heal result
|
|
||||||
respCh chan healResult
|
|
||||||
|
|
||||||
// Report healing progress
|
// Report healing progress
|
||||||
reportProgress bool
|
reportProgress bool
|
||||||
|
|
||||||
@ -472,7 +469,6 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
|
|||||||
clientToken := mustGetUUID()
|
clientToken := mustGetUUID()
|
||||||
|
|
||||||
return &healSequence{
|
return &healSequence{
|
||||||
respCh: make(chan healResult),
|
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
object: objPrefix,
|
object: objPrefix,
|
||||||
reportProgress: true,
|
reportProgress: true,
|
||||||
@ -719,28 +715,30 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
|
|||||||
if serverDebugLog {
|
if serverDebugLog {
|
||||||
logger.Info("Task in the queue: %#v", task)
|
logger.Info("Task in the queue: %#v", task)
|
||||||
}
|
}
|
||||||
case <-h.ctx.Done():
|
|
||||||
return nil
|
|
||||||
default:
|
default:
|
||||||
// task queue is full, no more workers, we shall move on and heal later.
|
// task queue is full, no more workers, we shall move on and heal later.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
} else {
|
// Don't wait for result
|
||||||
// respCh must be set for guaranteed result
|
return nil
|
||||||
task.respCh = h.respCh
|
}
|
||||||
select {
|
|
||||||
case globalBackgroundHealRoutine.tasks <- task:
|
// respCh must be set to wait for result.
|
||||||
if serverDebugLog {
|
// We make it size 1, so a result can always be written
|
||||||
logger.Info("Task in the queue: %#v", task)
|
// even if we aren't listening.
|
||||||
}
|
task.respCh = make(chan healResult, 1)
|
||||||
case <-h.ctx.Done():
|
select {
|
||||||
return nil
|
case globalBackgroundHealRoutine.tasks <- task:
|
||||||
|
if serverDebugLog {
|
||||||
|
logger.Info("Task in the queue: %#v", task)
|
||||||
}
|
}
|
||||||
|
case <-h.ctx.Done():
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// task queued, now wait for the response.
|
// task queued, now wait for the response.
|
||||||
select {
|
select {
|
||||||
case res := <-h.respCh:
|
case res := <-task.respCh:
|
||||||
if !h.reportProgress {
|
if !h.reportProgress {
|
||||||
if errors.Is(res.err, errSkipFile) { // this is only sent usually by nopHeal
|
if errors.Is(res.err, errSkipFile) { // this is only sent usually by nopHeal
|
||||||
return nil
|
return nil
|
||||||
|
@ -48,7 +48,6 @@ func newBgHealSequence() *healSequence {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &healSequence{
|
return &healSequence{
|
||||||
respCh: make(chan healResult),
|
|
||||||
startTime: UTCNow(),
|
startTime: UTCNow(),
|
||||||
clientToken: bgHealingUUID,
|
clientToken: bgHealingUUID,
|
||||||
// run-background heal with reserved bucket
|
// run-background heal with reserved bucket
|
||||||
|
Loading…
Reference in New Issue
Block a user