mirror of
https://github.com/minio/minio.git
synced 2025-01-25 13:43:17 -05:00
Prioritize HTTP requests over Heal (#6468)
Additionally also heal 256 objects at any given time in parallel. Fixes #6196 Fixes #6241
This commit is contained in:
parent
b729a4e83c
commit
6fe9a613c0
@ -20,12 +20,14 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
"github.com/minio/minio/pkg/sync/errgroup"
|
||||
)
|
||||
|
||||
// healStatusSummary - overall short summary of a healing sequence
|
||||
@ -622,17 +624,40 @@ func (h *healSequence) healBucket(bucket string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
entries := runtime.NumCPU() * globalEndpoints.Nodes()
|
||||
|
||||
marker := ""
|
||||
isTruncated := true
|
||||
for isTruncated {
|
||||
if globalHTTPServer != nil {
|
||||
// Wait at max 1 minute for an inprogress request
|
||||
// before proceeding to heal
|
||||
waitCount := 60
|
||||
// Any requests in progress, delay the heal.
|
||||
for globalHTTPServer.GetRequestCount() > 0 && waitCount > 0 {
|
||||
waitCount--
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Heal numCPU * nodes objects at a time.
|
||||
objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, bucket,
|
||||
h.objPrefix, marker, "", 1000)
|
||||
h.objPrefix, marker, "", entries)
|
||||
if err != nil {
|
||||
return errFnHealFromAPIErr(err)
|
||||
}
|
||||
|
||||
for _, o := range objectInfos.Objects {
|
||||
if err := h.healObject(o.Bucket, o.Name); err != nil {
|
||||
g := errgroup.WithNErrs(len(objectInfos.Objects))
|
||||
for index := range objectInfos.Objects {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
o := objectInfos.Objects[index]
|
||||
return h.healObject(o.Bucket, o.Name)
|
||||
}, index)
|
||||
}
|
||||
|
||||
for _, err := range g.Wait() {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -168,6 +168,18 @@ func NewEndpoint(arg string) (ep Endpoint, e error) {
|
||||
// EndpointList - list of same type of endpoint.
|
||||
type EndpointList []Endpoint
|
||||
|
||||
// Nodes - returns number of unique servers.
|
||||
func (endpoints EndpointList) Nodes() int {
|
||||
uniqueNodes := set.NewStringSet()
|
||||
for _, endpoint := range endpoints {
|
||||
if uniqueNodes.Contains(endpoint.Host) {
|
||||
continue
|
||||
}
|
||||
uniqueNodes.Add(endpoint.Host)
|
||||
}
|
||||
return len(uniqueNodes)
|
||||
}
|
||||
|
||||
// IsHTTPS - returns true if secure for URLEndpointType.
|
||||
func (endpoints EndpointList) IsHTTPS() bool {
|
||||
return endpoints[0].IsHTTPS()
|
||||
|
Loading…
x
Reference in New Issue
Block a user