mirror of
https://github.com/minio/minio.git
synced 2025-07-14 11:21:52 -04:00
add support for concurrent heals
This commit is contained in:
parent
5151c429e4
commit
745a4b31ba
@ -32,6 +32,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
@ -772,14 +773,25 @@ func (a adminAPIHandlers) HealSetsHandler(w http.ResponseWriter, r *http.Request
|
|||||||
|
|
||||||
buckets, _ := objectAPI.ListBucketsHeal(ctx)
|
buckets, _ := objectAPI.ListBucketsHeal(ctx)
|
||||||
ctx, opts.cancel = context.WithCancel(context.Background())
|
ctx, opts.cancel = context.WithCancel(context.Background())
|
||||||
for _, setNumber := range opts.setNumbers {
|
go func() {
|
||||||
go func(setNumber int) {
|
var wg sync.WaitGroup
|
||||||
lbDisks := z.serverSets[0].sets[setNumber].getOnlineDisks()
|
for _, setNumber := range opts.setNumbers {
|
||||||
if err := healErasureSet(ctx, setNumber, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil {
|
wg.Add(1)
|
||||||
logger.LogIf(ctx, err)
|
go func(setNumber int) {
|
||||||
}
|
defer wg.Done()
|
||||||
}(setNumber)
|
lbDisks := z.serverSets[0].sets[setNumber].getOnlineDisks()
|
||||||
}
|
if err := healErasureSet(ctx, setNumber, opts.sleepForIO, opts.sleepDuration, buckets, lbDisks); err != nil {
|
||||||
|
logger.LogIf(ctx, err)
|
||||||
|
}
|
||||||
|
}(setNumber)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
a.mu.Lock()
|
||||||
|
opts.cancel()
|
||||||
|
delete(a.healSetsMap, opts.taskUUID)
|
||||||
|
a.mu.Unlock()
|
||||||
|
logger.Info("Healing finished for %v", vars[healSetsList])
|
||||||
|
}()
|
||||||
|
|
||||||
a.mu.Lock()
|
a.mu.Lock()
|
||||||
a.healSetsMap[opts.taskUUID] = opts
|
a.healSetsMap[opts.taskUUID] = opts
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -420,7 +421,7 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
|
|||||||
clientToken := mustGetUUID()
|
clientToken := mustGetUUID()
|
||||||
|
|
||||||
return &healSequence{
|
return &healSequence{
|
||||||
respCh: make(chan healResult),
|
respCh: make(chan healResult, runtime.GOMAXPROCS(0)),
|
||||||
bucket: bucket,
|
bucket: bucket,
|
||||||
object: objPrefix,
|
object: objPrefix,
|
||||||
reportProgress: true,
|
reportProgress: true,
|
||||||
|
@ -19,6 +19,7 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"path"
|
"path"
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio/pkg/madmin"
|
"github.com/minio/minio/pkg/madmin"
|
||||||
@ -125,7 +126,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
|
|||||||
|
|
||||||
func newHealRoutine() *healRoutine {
|
func newHealRoutine() *healRoutine {
|
||||||
return &healRoutine{
|
return &healRoutine{
|
||||||
tasks: make(chan healTask),
|
tasks: make(chan healTask, runtime.GOMAXPROCS(0)),
|
||||||
doneCh: make(chan struct{}),
|
doneCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -42,8 +43,8 @@ func newBgHealSequence() *healSequence {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &healSequence{
|
return &healSequence{
|
||||||
sourceCh: make(chan healSource),
|
sourceCh: make(chan healSource, runtime.GOMAXPROCS(0)),
|
||||||
respCh: make(chan healResult),
|
respCh: make(chan healResult, runtime.GOMAXPROCS(0)),
|
||||||
startTime: UTCNow(),
|
startTime: UTCNow(),
|
||||||
clientToken: bgHealingUUID,
|
clientToken: bgHealingUUID,
|
||||||
// run-background heal with reserved bucket
|
// run-background heal with reserved bucket
|
||||||
|
Loading…
x
Reference in New Issue
Block a user