Cluster healthcheck improvements (#10408)

- do not fail the healthcheck if heal status
  was not obtained from one of the nodes,
  if many nodes fail then report this as a
  catastrophic error.
- add "x-minio-write-quorum" value to match
  the write tolerance supported by server.
- admin info now states if a drive is healing
  where madmin.Disk.Healing is set to true
  and madmin.Disk.State is "ok"
This commit is contained in:
Harshavardhana
2020-09-02 22:54:56 -07:00
committed by GitHub
parent 650dccfa9e
commit 8a291e1dc0
5 changed files with 49 additions and 25 deletions

View File

@@ -799,14 +799,12 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
keepConnLive(w, r, respCh)
}
func getAggregatedBackgroundHealState(ctx context.Context, failOnErr bool) (madmin.BgHealState, error) {
func getAggregatedBackgroundHealState(ctx context.Context) (madmin.BgHealState, error) {
var bgHealStates []madmin.BgHealState
localHealState, ok := getLocalBackgroundHealStatus()
if !ok {
if failOnErr {
return madmin.BgHealState{}, errServerNotInitialized
}
return madmin.BgHealState{}, errServerNotInitialized
}
// Get local heal status first
@@ -815,14 +813,16 @@ func getAggregatedBackgroundHealState(ctx context.Context, failOnErr bool) (madm
if globalIsDistErasure {
// Get heal status from other peers
peersHealStates, nerrs := globalNotificationSys.BackgroundHealStatus()
var errCount int
for _, nerr := range nerrs {
if nerr.Err != nil {
if failOnErr {
return madmin.BgHealState{}, nerr.Err
}
logger.LogIf(ctx, nerr.Err)
errCount++
}
}
if errCount == len(nerrs) {
return madmin.BgHealState{}, fmt.Errorf("all remote servers failed to report heal status, cluster is unhealthy")
}
bgHealStates = append(bgHealStates, peersHealStates...)
}
@@ -868,7 +868,12 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *
return
}
aggregateHealStateResult, _ := getAggregatedBackgroundHealState(r.Context(), false)
aggregateHealStateResult, err := getAggregatedBackgroundHealState(r.Context())
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
if err := json.NewEncoder(w).Encode(aggregateHealStateResult); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
@@ -1489,20 +1494,34 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
Notifications: notifyTarget,
}
// Collect any disk healing.
healing, _ := getAggregatedBackgroundHealState(ctx)
healDisks := make(map[string]struct{}, len(healing.HealDisks))
for _, disk := range healing.HealDisks {
healDisks[disk] = struct{}{}
}
// find all disks which belong to each respective endpoints
for i := range servers {
for _, disk := range storageInfo.Disks {
if strings.Contains(disk.Endpoint, servers[i].Endpoint) {
if _, ok := healDisks[disk.Endpoint]; ok {
disk.Healing = true
}
servers[i].Disks = append(servers[i].Disks, disk)
}
}
}
// add all the disks local to this server.
for _, disk := range storageInfo.Disks {
if disk.DrivePath == "" && disk.Endpoint == "" {
continue
}
if disk.Endpoint == disk.DrivePath {
if _, ok := healDisks[disk.Endpoint]; ok {
disk.Healing = true
}
servers[len(servers)-1].Disks = append(servers[len(servers)-1].Disks, disk)
}
}