add a way to avoid blocking queueHealTask() depending on caller (#16433)

This commit is contained in:
Harshavardhana 2023-01-19 18:50:54 +05:30 committed by GitHub
parent d98116559b
commit d08e3cc895
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 10 deletions

View File

@ -397,6 +397,7 @@ type healSource struct {
bucket string
object string
versionID string
noWait bool // a non blocking call, if task queue is full return right away.
opts *madmin.HealOpts // optional heal option overrides default setting
}
@ -700,7 +701,6 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
object: source.object,
versionID: source.versionID,
opts: h.settings,
respCh: h.respCh,
}
if source.opts != nil {
task.opts = *source.opts
@ -713,15 +713,32 @@ func (h *healSequence) queueHealTask(source healSource, healType madmin.HealItem
h.lastHealActivity = UTCNow()
h.mutex.Unlock()
select {
case globalBackgroundHealRoutine.tasks <- task:
if serverDebugLog {
logger.Info("Task in the queue: %#v", task)
if source.noWait {
select {
case globalBackgroundHealRoutine.tasks <- task:
if serverDebugLog {
logger.Info("Task in the queue: %#v", task)
}
case <-h.ctx.Done():
return nil
default:
// task queue is full, no more workers, we shall move on and heal later.
return nil
}
} else {
// respCh must be set for guaranteed result
task.respCh = h.respCh
select {
case globalBackgroundHealRoutine.tasks <- task:
if serverDebugLog {
logger.Info("Task in the queue: %#v", task)
}
case <-h.ctx.Done():
return nil
}
case <-h.ctx.Done():
return nil
}
// task queued, now wait for the response.
select {
case res := <-h.respCh:
if !h.reportProgress {

View File

@ -88,8 +88,7 @@ func (h *healRoutine) AddWorker(ctx context.Context, objAPI ObjectLayer) {
var err error
switch task.bucket {
case nopHeal:
task.respCh <- healResult{err: errSkipFile}
continue
err = errSkipFile
case SlashSeparator:
res, err = healDiskFormat(ctx, objAPI, task.opts)
default:
@ -100,7 +99,10 @@ func (h *healRoutine) AddWorker(ctx context.Context, objAPI ObjectLayer) {
}
}
task.respCh <- healResult{result: res, err: err}
if task.respCh != nil {
task.respCh <- healResult{result: res, err: err}
}
case <-ctx.Done():
return
}

View File

@ -449,6 +449,7 @@ func healObject(bucket, object, versionID string, scan madmin.HealScanMode) {
bucket: bucket,
object: object,
versionID: versionID,
noWait: true, // do not block callers.
opts: &madmin.HealOpts{
Remove: healDeleteDangling, // if found dangling purge it.
ScanMode: scan,