readiness returns an error quickly if any of the sets is down (#9662)

This PR adds a new configuration parameter that makes the readiness
check respond within 10 seconds by default. The deadline can be
reduced to a lower value if necessary using

```
mc admin config set api ready_deadline=5s
```

or

```
export MINIO_API_READY_DEADLINE=5s
```
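
The endpoint governed by this deadline is the readiness health check at
`/minio/health/ready` (see the help text further down in the diff). A minimal
client-side probe sketch, assuming a server on `localhost:9000` and a client
timeout a little above the configured deadline (both values are illustrative):

```
package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	// Give the server slightly longer than its ready_deadline to answer
	// before the client gives up.
	client := &http.Client{Timeout: 15 * time.Second}

	resp, err := client.Get("http://localhost:9000/minio/health/ready")
	if err != nil {
		fmt.Println("readiness probe failed:", err)
		return
	}
	defer resp.Body.Close()

	// 200 OK means the cluster is ready; any other status (for example 503)
	// means write quorum is not yet available.
	fmt.Println("readiness status:", resp.Status)
}
```
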
Author: Krishna Srinivas
Date: 2020-05-23 17:38:39 -07:00
Committed by: GitHub
Parent: 3f6d624c7b
Commit: 7d19ab9f62
17 changed files with 266 additions and 118 deletions

@@ -371,6 +371,7 @@ func lookupConfigs(s config.Config) {
}
globalAPIThrottling.init(apiRequestsMax, apiConfig.APIRequestsDeadline)
globalReadyDeadline = apiConfig.APIReadyDeadline
if globalIsXL {
globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default],

@@ -165,13 +165,14 @@ func readServerConfig(ctx context.Context, objAPI ObjectLayer) (config.Config, e
}
}
- var config = config.New()
+ var srvCfg = config.New()
var json = jsoniter.ConfigCompatibleWithStandardLibrary
- if err = json.Unmarshal(configData, &config); err != nil {
+ if err = json.Unmarshal(configData, &srvCfg); err != nil {
return nil, err
}
- return config, nil
+ // Add any missing entries
+ return srvCfg.Merge(), nil
}
// ConfigSys - config system.

@@ -29,6 +29,7 @@ import (
const (
apiRequestsMax = "requests_max"
apiRequestsDeadline = "requests_deadline"
apiReadyDeadline = "ready_deadline"
)
// DefaultKVS - default storage class config
@@ -42,6 +43,10 @@ var (
Key: apiRequestsDeadline,
Value: "10s",
},
config.KV{
Key: apiReadyDeadline,
Value: "10s",
},
}
)
@@ -49,6 +54,7 @@ var (
type Config struct {
APIRequestsMax int `json:"requests_max"`
APIRequestsDeadline time.Duration `json:"requests_deadline"`
APIReadyDeadline time.Duration `json:"ready_deadline"`
}
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
@@ -83,9 +89,15 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
return cfg, err
}
readyDeadline, err := time.ParseDuration(env.Get(config.EnvAPIReadyDeadline, kvs.Get(apiReadyDeadline)))
if err != nil {
return cfg, err
}
cfg = Config{
APIRequestsMax: requestsMax,
APIRequestsDeadline: requestsDeadline,
APIReadyDeadline: readyDeadline,
}
return cfg, nil
}
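
In the lookup above, the `MINIO_API_READY_DEADLINE` environment variable takes
precedence over the stored `ready_deadline` key, and the value is parsed as a
Go duration. A self-contained sketch of that precedence rule, using a plain
helper rather than MinIO's `env.Get`:

```
package main

import (
	"fmt"
	"os"
	"time"
)

// readyDeadline prefers the environment variable and falls back to the
// value stored under the ready_deadline config key.
func readyDeadline(stored string) (time.Duration, error) {
	v := os.Getenv("MINIO_API_READY_DEADLINE")
	if v == "" {
		v = stored
	}
	return time.ParseDuration(v)
}

func main() {
	d, err := readyDeadline("10s")
	if err != nil {
		fmt.Println("invalid duration:", err)
		return
	}
	fmt.Println("ready deadline:", d) // 10s, or 5s if the env var is set to 5s
}
```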

@@ -33,5 +33,11 @@ var (
Optional: true,
Type: "duration",
},
config.HelpKV{
Key: apiReadyDeadline,
Description: `set the deadline for health check API /minio/health/ready e.g. "1m"`,
Optional: true,
Type: "duration",
},
}
)

@@ -434,6 +434,26 @@ func LookupWorm() (bool, error) {
return ParseBool(env.Get(EnvWorm, EnableOff))
}
// Merge - merges a new config with all the
// missing values for default configs,
// returns a config.
func (c Config) Merge() Config {
cp := New()
for subSys, tgtKV := range c {
for tgt := range tgtKV {
ckvs := c[subSys][tgt]
for _, kv := range cp[subSys][Default] {
_, ok := c[subSys][tgt].Lookup(kv.Key)
if !ok {
ckvs.Set(kv.Key, kv.Value)
}
}
cp[subSys][tgt] = ckvs
}
}
return cp
}
// New - initialize a new server config.
func New() Config {
srvCfg := make(Config)
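
`Merge` is what lets a previously stored config pick up newly introduced keys
such as `ready_deadline` instead of failing the lookup. A simplified,
standalone sketch of the same default-filling idea, using plain maps as
stand-ins for MinIO's `config.Config`:

```
package main

import "fmt"

// fillDefaults copies every key missing from cfg out of defaults,
// leaving keys the user has already set untouched.
func fillDefaults(cfg, defaults map[string]string) map[string]string {
	merged := make(map[string]string, len(defaults))
	for k, v := range defaults {
		merged[k] = v
	}
	for k, v := range cfg {
		merged[k] = v // user-set values win over defaults
	}
	return merged
}

func main() {
	defaults := map[string]string{
		"requests_deadline": "10s",
		"ready_deadline":    "10s",
	}
	// An older stored config that predates the ready_deadline key.
	stored := map[string]string{"requests_deadline": "1m"}

	fmt.Println(fillDefaults(stored, defaults))
	// map[ready_deadline:10s requests_deadline:1m]
}
```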

@@ -37,6 +37,7 @@ const (
// API sub-system
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
EnvAPIReadyDeadline = "MINIO_API_READY_DEADLINE"
EnvUpdate = "MINIO_UPDATE"

@@ -275,6 +275,9 @@ var (
// If writes to FS backend should be O_SYNC.
globalFSOSync bool
// Deadline by which /minio/health/ready should respond.
globalReadyDeadline time.Duration
// Add new variable global values here.
)

@@ -18,15 +18,11 @@ package cmd
import (
"net/http"
"os"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
)
- // ReadinessCheckHandler -- Checks if the quorum number of disks are available.
+ // ReadinessCheckHandler returns if the server is ready to receive requests.
// For FS - Checks if the backend disk is available
// For Zones - Checks if all the zones have enough read quorum
// For Erasure backend - Checks if all the erasure sets are writable
func ReadinessCheckHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ReadinessCheckHandler")
@@ -40,61 +36,7 @@ func ReadinessCheckHandler(w http.ResponseWriter, r *http.Request) {
writeResponse(w, http.StatusOK, nil, mimeNone)
}
- // LivenessCheckHandler -- checks if server can reach its disks internally.
- // If not, server is considered to have failed and needs to be restarted.
- // Liveness probes are used to detect situations where application (minio)
- // has gone into a state where it can not recover except by being restarted.
+ // LivenessCheckHandler - Checks if the process is up. Always returns success.
func LivenessCheckHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "LivenessCheckHandler")
objLayer := newObjectLayerWithoutSafeModeFn()
// Service not initialized yet
if objLayer == nil {
// Respond with 200 OK while server initializes to ensure a distributed cluster
// is able to start on orchestration platforms like Docker Swarm.
// Refer https://github.com/minio/minio/issues/8140 for more details.
// Make sure to add server not initialized status in header
w.Header().Set(xhttp.MinIOServerStatus, "server-not-initialized")
writeSuccessResponseHeadersOnly(w)
return
}
if !globalIsXL && !globalIsDistXL {
s := objLayer.StorageInfo(ctx, false)
if s.Backend.Type == BackendGateway {
if !s.Backend.GatewayOnline {
writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone)
return
}
writeResponse(w, http.StatusOK, nil, mimeNone)
return
}
}
// For FS and Erasure backend, check if local disks are up.
var erroredDisks int
for _, ep := range globalEndpoints {
for _, endpoint := range ep.Endpoints {
// Check only if local disks are accessible, we do not have
// to reach to rest of the other servers in a distributed setup.
if !endpoint.IsLocal {
continue
}
// Attempt a stat to backend, any error resulting
// from this Stat() operation is considered as backend
// is not available, count them as errors.
if _, err := os.Stat(endpoint.Path); err != nil && os.IsNotExist(err) {
logger.LogIf(ctx, err)
erroredDisks++
}
}
}
// Any errored disks, we let orchestrators take us down.
if erroredDisks > 0 {
writeResponse(w, http.StatusServiceUnavailable, nil, mimeNone)
return
}
writeResponse(w, http.StatusOK, nil, mimeNone)
}

@@ -1163,6 +1163,29 @@ func (sys *NotificationSys) ServerInfo() []madmin.ServerProperties {
return reply
}
// GetLocalDiskIDs - return disk ids of the local disks of the peers.
func (sys *NotificationSys) GetLocalDiskIDs() []string {
var diskIDs []string
var mu sync.Mutex
var wg sync.WaitGroup
for _, client := range sys.peerClients {
if client == nil {
continue
}
wg.Add(1)
go func(client *peerRESTClient) {
defer wg.Done()
ids := client.GetLocalDiskIDs()
mu.Lock()
diskIDs = append(diskIDs, ids...)
mu.Unlock()
}(client)
}
wg.Wait()
return diskIDs
}
// NewNotificationSys - creates new notification system object.
func NewNotificationSys(endpoints EndpointZones) *NotificationSys {
// bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init()
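
`GetLocalDiskIDs` above fans out to every peer concurrently and merges the
replies under a mutex before returning. A generic sketch of the same
scatter-gather pattern, with illustrative peer names and fetch function:

```
package main

import (
	"fmt"
	"sync"
)

// gather queries every peer concurrently and merges the results; the mutex
// protects the shared slice while the goroutines append to it.
func gather(peers []string, fetch func(string) []string) []string {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		all []string
	)
	for _, peer := range peers {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			ids := fetch(p)
			mu.Lock()
			all = append(all, ids...)
			mu.Unlock()
		}(peer)
	}
	wg.Wait()
	return all
}

func main() {
	peers := []string{"peer-1", "peer-2"}
	fmt.Println(gather(peers, func(p string) []string {
		return []string{p + "-disk-a", p + "-disk-b"}
	}))
}
```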

@@ -676,6 +676,32 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error)
return state, err
}
// GetLocalDiskIDs - get a peer's local disks' IDs.
func (client *peerRESTClient) GetLocalDiskIDs() []string {
doneCh := make(chan struct{})
defer close(doneCh)
ctx, cancel := context.WithCancel(GlobalContext)
go func() {
select {
case <-doneCh:
return
case <-time.After(globalReadyDeadline):
cancel()
}
}()
respBody, err := client.callWithContext(ctx, peerRESTMethodGetLocalDiskIDs, nil, nil, -1)
if err != nil {
return nil
}
defer http.DrainBody(respBody)
var diskIDs []string
if err = gob.NewDecoder(respBody).Decode(&diskIDs); err != nil {
return nil
}
return diskIDs
}
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh <-chan struct{}, trcAll, trcErr bool) {
values := make(url.Values)
values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll))
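
The client above bounds the peer call by cancelling the request context once
`globalReadyDeadline` elapses, so a single slow or unreachable peer cannot
stall the readiness check. A standalone sketch of that bounded-call pattern
using `context.WithTimeout`; `fetchIDs` is a placeholder for the REST call:

```
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchIDs stands in for the peer REST call; it honours context cancellation.
func fetchIDs(ctx context.Context) ([]string, error) {
	select {
	case <-time.After(2 * time.Second): // simulated slow peer
		return []string{"disk-1", "disk-2"}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	readyDeadline := time.Second // plays the role of globalReadyDeadline

	ctx, cancel := context.WithTimeout(context.Background(), readyDeadline)
	defer cancel()

	ids, err := fetchIDs(ctx)
	if err != nil {
		// A peer that cannot answer in time simply contributes no disk IDs,
		// which lowers the up-count for its erasure sets.
		fmt.Println("peer skipped:", err)
		return
	}
	fmt.Println("disk IDs:", ids)
}
```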

@@ -54,6 +54,7 @@ const (
peerRESTMethodTrace = "/trace"
peerRESTMethodListen = "/listen"
peerRESTMethodLog = "/log"
peerRESTMethodGetLocalDiskIDs = "/getlocaldiskids"
)
const (

@@ -701,6 +701,62 @@ func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *
w.(http.Flusher).Flush()
}
// Return disk IDs of all the local disks.
func getLocalDiskIDs(z *xlZones) []string {
var ids []string
for zoneIdx := range z.zones {
for _, set := range z.zones[zoneIdx].sets {
disks := set.getDisks()
for _, disk := range disks {
if disk == nil {
continue
}
if disk.IsLocal() {
id, err := disk.GetDiskID()
if err != nil {
continue
}
if id == "" {
continue
}
ids = append(ids, id)
}
}
}
}
return ids
}
// GetLocalDiskIDs - Return disk IDs of all the local disks.
func (s *peerRESTServer) GetLocalDiskIDs(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "GetLocalDiskIDs")
objLayer := newObjectLayerWithoutSafeModeFn()
// Service not initialized yet
if objLayer == nil {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
z, ok := objLayer.(*xlZones)
if !ok {
s.writeErrorResponse(w, errServerNotInitialized)
return
}
ids := getLocalDiskIDs(z)
logger.LogIf(ctx, gob.NewEncoder(w).Encode(ids))
w.(http.Flusher).Flush()
}
// ServerUpdateHandler - updates the current server.
func (s *peerRESTServer) ServerUpdateHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@@ -996,4 +1052,5 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(httpTraceHdrs(server.ListenHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLog).HandlerFunc(server.ConsoleLogHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLocalDiskIDs).HandlerFunc(httpTraceHdrs(server.GetLocalDiskIDs))
}

@@ -56,7 +56,7 @@ func (p *posixDiskIDCheck) Close() error {
}
func (p *posixDiskIDCheck) GetDiskID() (string, error) {
- return p.diskID, nil
+ return p.storage.GetDiskID()
}
func (p *posixDiskIDCheck) SetDiskID(id string) {

@@ -1709,30 +1709,6 @@ func (s *xlSets) GetMetrics(ctx context.Context) (*Metrics, error) {
return &Metrics{}, NotImplemented{}
}
// IsReady - Returns true if atleast n/2 disks (read quorum) are online
func (s *xlSets) IsReady(_ context.Context) bool {
s.xlDisksMu.RLock()
defer s.xlDisksMu.RUnlock()
var activeDisks int
for i := 0; i < s.setCount; i++ {
for j := 0; j < s.drivesPerSet; j++ {
if s.xlDisks[i][j] == nil {
continue
}
if s.xlDisks[i][j].IsOnline() {
activeDisks++
}
// Return true if read quorum is available.
if activeDisks >= len(s.endpoints)/2 {
return true
}
}
}
// Disks are not ready
return false
}
// maintainMRFList gathers the list of successful partial uploads
// from all underlying xl sets and puts them in a global map which
// should not have more than 10000 entries.

@@ -27,6 +27,7 @@ import (
"time"
"github.com/minio/minio-go/v6/pkg/tags"
"github.com/minio/minio/cmd/config/storageclass"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/madmin"
@@ -1537,9 +1538,57 @@ func (z *xlZones) GetMetrics(ctx context.Context) (*Metrics, error) {
return &Metrics{}, NotImplemented{}
}
- // IsReady - Returns true if first zone returns true
func (z *xlZones) getZoneAndSet(id string) (int, int, error) {
for zoneIdx := range z.zones {
format := z.zones[zoneIdx].format
for setIdx, set := range format.XL.Sets {
for _, diskID := range set {
if diskID == id {
return zoneIdx, setIdx, nil
}
}
}
}
return 0, 0, errDiskNotFound
}
// IsReady - Returns true all the erasure sets are writable.
func (z *xlZones) IsReady(ctx context.Context) bool {
- return z.zones[0].IsReady(ctx)
erasureSetUpCount := make([][]int, len(z.zones))
for i := range z.zones {
erasureSetUpCount[i] = make([]int, len(z.zones[i].sets))
}
diskIDs := globalNotificationSys.GetLocalDiskIDs()
diskIDs = append(diskIDs, getLocalDiskIDs(z)...)
for _, id := range diskIDs {
zoneIdx, setIdx, err := z.getZoneAndSet(id)
if err != nil {
continue
}
erasureSetUpCount[zoneIdx][setIdx]++
}
for zoneIdx := range erasureSetUpCount {
parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
diskCount := len(z.zones[zoneIdx].format.XL.Sets[0])
if parityDrives == 0 {
parityDrives = getDefaultParityBlocks(diskCount)
}
dataDrives := diskCount - parityDrives
writeQuorum := dataDrives
if dataDrives == parityDrives {
writeQuorum++
}
for setIdx := range erasureSetUpCount[zoneIdx] {
if erasureSetUpCount[zoneIdx][setIdx] < writeQuorum {
return false
}
}
}
return true
}
// PutObjectTags - replace or add tags to an existing object
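
To make the quorum arithmetic in `IsReady` concrete: a 16-drive erasure set
with parity set to half the drives has 8 data and 8 parity drives, so the
write quorum becomes 9, and the set only counts as ready once at least 9 of
its drives have reported in. A small standalone sketch of that calculation
(treating half-the-drives as the default parity, an assumption about
`getDefaultParityBlocks`):

```
package main

import "fmt"

// writeQuorum mirrors the readiness calculation above: the data drives form
// the quorum, plus one extra drive when data and parity counts are equal.
func writeQuorum(diskCount, parityDrives int) int {
	if parityDrives == 0 {
		parityDrives = diskCount / 2 // assumed default-parity rule
	}
	dataDrives := diskCount - parityDrives
	quorum := dataDrives
	if dataDrives == parityDrives {
		quorum++
	}
	return quorum
}

func main() {
	fmt.Println(writeQuorum(16, 0)) // 9: 8 data + 8 parity, equal counts
	fmt.Println(writeQuorum(12, 4)) // 8: data drives alone form the quorum
}
```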