fix: make sure to correctly initialize health checks (#17765)

health checks were missing for replaced or reconnected drives:

- HealFormat() would replace drives without attaching a health check
- drives that reconnect via connectEndpoint() lost their health checks;
  the reconnect loop also dropped them for local disks and merged local
  and remote handling into a single code path.
- beyond that, keep the cleanUp and healthCheck variables separate to
  avoid overloading a single flag with two different requirements.
- also make sure disk monitoring completes via a context select (see the
  sketch below), so that canceled disks don't linger waiting for the
  ticker to trigger.
- allow disabling active monitoring.
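
A minimal sketch of the pattern the last two bullets describe, using simplified, hypothetical names (monitorWritable, check) rather than the actual MinIO functions: the loop selects on the context as well as the ticker, so a canceled disk exits immediately instead of waiting for the next tick, and active monitoring can be skipped entirely.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // monitorWritable is a simplified stand-in for a per-disk monitor goroutine:
    // selecting on the context as well as the ticker means cancellation is seen
    // immediately instead of only at the next tick.
    func monitorWritable(ctx context.Context, interval time.Duration, active bool, check func(context.Context) bool) {
        if !active {
            return // active monitoring disabled, e.g. via an environment toggle
        }
        t := time.NewTicker(interval)
        defer t.Stop()
        for {
            select {
            case <-ctx.Done():
                return // disk canceled/closed: stop right away
            case <-t.C:
                if !check(ctx) {
                    return
                }
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
        defer cancel()
        monitorWritable(ctx, 100*time.Millisecond, true, func(ctx context.Context) bool {
            fmt.Println("write+read probe ok")
            return ctx.Err() == nil
        })
    }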
Harshavardhana 2023-08-01 10:54:26 -07:00 committed by GitHub
parent 004f1e2f66
commit b0f0e53bba
11 changed files with 83 additions and 45 deletions

View File

@@ -58,7 +58,7 @@ type epHealth struct {
 }
 
 // isOffline returns current liveness result of remote target. Add endpoint to
-// healthcheck map if missing and default to online status
+// healthCheck map if missing and default to online status
 func (sys *BucketTargetSys) isOffline(ep *url.URL) bool {
 	sys.hMutex.RLock()
 	defer sys.hMutex.RUnlock()
@@ -126,7 +126,7 @@ func (sys *BucketTargetSys) heartBeat(ctx context.Context) {
 	}
 }
 
-// periodically rebuild the healthcheck map from list of targets to clear
+// periodically rebuild the healthCheck map from list of targets to clear
 // out stale endpoints
 func (sys *BucketTargetSys) reloadHealthCheckers(ctx context.Context) {
 	m := make(map[string]epHealth)
@@ -362,7 +362,7 @@ func NewBucketTargetSys(ctx context.Context) *BucketTargetSys {
 		hc: make(map[string]epHealth),
 		hcClient: newHCClient(),
 	}
-	// reload healthcheck endpoints map periodically to remove stale endpoints from the map.
+	// reload healthCheck endpoints map periodically to remove stale endpoints from the map.
 	go func() {
 		rTimer := time.NewTimer(defaultHealthCheckReloadDuration)
 		defer rTimer.Stop()

View File

@@ -116,7 +116,10 @@ func (s *erasureSets) getDiskMap() map[Endpoint]StorageAPI {
 // Initializes a new StorageAPI from the endpoint argument, returns
 // StorageAPI and also `format` which exists on the disk.
 func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
-	disk, err := newStorageAPI(endpoint, false)
+	disk, err := newStorageAPI(endpoint, storageOpts{
+		cleanUp: false,
+		healthCheck: false,
+	})
 	if err != nil {
 		return nil, nil, err
 	}
@@ -132,6 +135,15 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
 		return nil, nil, fmt.Errorf("Drive: %s returned %w", disk, err) // make sure to '%w' to wrap the error
 	}
 
+	disk.Close()
+	disk, err = newStorageAPI(endpoint, storageOpts{
+		cleanUp: true,
+		healthCheck: true,
+	})
+	if err != nil {
+		return nil, nil, err
+	}
+
 	return disk, format, nil
 }
@@ -241,21 +253,10 @@ func (s *erasureSets) connectDisks() {
 				}
 				s.erasureDisks[setIndex][diskIndex].Close()
 			}
-			if disk.IsLocal() {
-				disk.SetDiskID(format.Erasure.This)
-				s.erasureDisks[setIndex][diskIndex] = disk
-			} else {
-				// Enable healthcheck disk for remote endpoint.
-				disk, err = newStorageAPI(endpoint, true)
-				if err != nil {
-					printEndpointError(endpoint, err, false)
-					s.erasureDisksMu.Unlock()
-					return
-				}
-				disk.SetDiskID(format.Erasure.This)
-				s.erasureDisks[setIndex][diskIndex] = disk
-			}
+			disk.SetDiskID(format.Erasure.This)
 			disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
+			s.erasureDisks[setIndex][diskIndex] = disk
 			s.erasureDisksMu.Unlock()
 		}(endpoint)
 	}
@@ -1055,7 +1056,10 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
 // HealFormat - heals missing `format.json` on fresh unformatted disks.
 func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
-	storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, false)
+	storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, storageOpts{
+		cleanUp: true,
+		healthCheck: true,
+	})
 	defer func(storageDisks []StorageAPI) {
 		if err != nil {

View File

@@ -663,14 +663,14 @@ func closeStorageDisks(storageDisks ...StorageAPI) {
 // Initialize storage disks for each endpoint.
 // Errors are returned for each endpoint with matching index.
-func initStorageDisksWithErrors(endpoints Endpoints, healthCheck bool) ([]StorageAPI, []error) {
+func initStorageDisksWithErrors(endpoints Endpoints, opts storageOpts) ([]StorageAPI, []error) {
 	// Bootstrap disks.
 	storageDisks := make([]StorageAPI, len(endpoints))
 	g := errgroup.WithNErrs(len(endpoints))
 	for index := range endpoints {
 		index := index
 		g.Go(func() (err error) {
-			storageDisks[index], err = newStorageAPI(endpoints[index], healthCheck)
+			storageDisks[index], err = newStorageAPI(endpoints[index], opts)
 			return err
 		}, index)
 	}

View File

@@ -38,7 +38,7 @@ func TestFixFormatV3(t *testing.T) {
 	}
 	endpoints := mustGetNewEndpoints(0, 8, erasureDirs...)
-	storageDisks, errs := initStorageDisksWithErrors(endpoints, false)
+	storageDisks, errs := initStorageDisksWithErrors(endpoints, storageOpts{cleanUp: false, healthCheck: false})
 	for _, err := range errs {
 		if err != nil && err != errDiskNotFound {
 			t.Fatal(err)
@@ -560,7 +560,7 @@ func benchmarkInitStorageDisksN(b *testing.B, nDisks int) {
 	b.RunParallel(func(pb *testing.PB) {
 		endpoints := endpoints
 		for pb.Next() {
-			initStorageDisksWithErrors(endpoints, false)
+			initStorageDisksWithErrors(endpoints, storageOpts{cleanUp: false, healthCheck: false})
 		}
 	})
 }

View File

@@ -207,7 +207,7 @@ func getRedirectLocation(r *http.Request) *xnet.URL {
 }
 
 // guessIsHealthCheckReq - returns true if incoming request looks
-// like healthcheck request
+// like healthCheck request
 func guessIsHealthCheckReq(req *http.Request) bool {
 	if req == nil {
 		return false

View File

@@ -193,7 +193,7 @@ var (
 	globalBucketSSEConfigSys *BucketSSEConfigSys
 	globalBucketTargetSys *BucketTargetSys
 
 	// globalAPIConfig controls S3 API requests throttling,
-	// healthcheck readiness deadlines and cors settings.
+	// healthCheck readiness deadlines and cors settings.
 	globalAPIConfig = apiConfig{listQuorum: "strict", rootAccess: true}
 
 	globalStorageClass storageclass.Config

View File

@@ -55,15 +55,20 @@ var globalObjectAPI ObjectLayer
 // Global cacheObjects, only accessed by newCacheObjectsFn().
 var globalCacheObjectAPI CacheObjectLayer
 
+type storageOpts struct {
+	cleanUp bool
+	healthCheck bool
+}
+
 // Depending on the disk type network or local, initialize storage API.
-func newStorageAPI(endpoint Endpoint, healthCheck bool) (storage StorageAPI, err error) {
+func newStorageAPI(endpoint Endpoint, opts storageOpts) (storage StorageAPI, err error) {
 	if endpoint.IsLocal {
-		storage, err := newXLStorage(endpoint, healthCheck)
+		storage, err := newXLStorage(endpoint, opts.cleanUp)
 		if err != nil {
 			return nil, err
 		}
-		return newXLStorageDiskIDCheck(storage, healthCheck), nil
+		return newXLStorageDiskIDCheck(storage, opts.healthCheck), nil
 	}
-	return newStorageRESTClient(endpoint, healthCheck), nil
+	return newStorageRESTClient(endpoint, opts.healthCheck), nil
 }

View File

@@ -101,11 +101,12 @@ func bgFormatErasureCleanupTmp(diskPath string) {
 	// Delete all temporary files created for DirectIO write check
 	files, _ := filepath.Glob(filepath.Join(diskPath, ".writable-check-*.tmp"))
 	for _, file := range files {
-		removeAll(file)
+		go removeAll(file)
 	}
 
 	// Remove the entire folder in case there are leftovers that didn't get cleaned up before restart.
 	go removeAll(pathJoin(diskPath, minioMetaTmpBucket+"-old"))
 
 	// Renames and schedules for purging all bucket metacache.
 	go renameAllBucketMetacache(diskPath)
 }
@@ -155,7 +156,7 @@ func isServerResolvable(endpoint Endpoint, timeout time.Duration) error {
 // time. additionally make sure to close all the disks used in this attempt.
 func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpoints, poolCount, setCount, setDriveCount int, deploymentID, distributionAlgo string) (storageDisks []StorageAPI, format *formatErasureV3, err error) {
 	// Initialize all storage disks
-	storageDisks, errs := initStorageDisksWithErrors(endpoints, true)
+	storageDisks, errs := initStorageDisksWithErrors(endpoints, storageOpts{cleanUp: true, healthCheck: true})
 	defer func(storageDisks []StorageAPI) {
 		if err != nil {

View File

@@ -85,7 +85,7 @@ func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handl
 	// Add Admin router, all APIs are enabled in server mode.
 	registerAdminRouter(router, true)
 
-	// Add healthcheck router
+	// Add healthCheck router
 	registerHealthCheckRouter(router)
 
 	// Add server metrics router

View File

@@ -828,7 +828,7 @@ func (client *storageRESTClient) Close() error {
 }
 
 // Returns a storage rest client.
-func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClient {
+func newStorageRESTClient(endpoint Endpoint, healthCheck bool) *storageRESTClient {
 	serverURL := &url.URL{
 		Scheme: endpoint.Scheme,
 		Host: endpoint.Host,
@@ -837,7 +837,7 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien
 	restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
 
-	if healthcheck {
+	if healthCheck {
 		// Use a separate client to avoid recursive calls.
 		healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
 		healthClient.NoMetrics = true

View File

@@ -31,6 +31,7 @@ import (
 	"time"
 
 	"github.com/minio/madmin-go/v3"
+	"github.com/minio/minio/internal/config"
 	xioutil "github.com/minio/minio/internal/ioutil"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/env"
@@ -176,7 +177,7 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis
 	for i := range xl.apiLatencies[:] {
 		xl.apiLatencies[i] = &lockedLastMinuteLatency{}
 	}
-	if healthCheck {
+	if healthCheck && diskActiveMonitoring {
 		go xl.monitorDiskWritable(xl.diskCtx)
 	}
 	return &xl
@@ -718,6 +719,9 @@ var diskStartChecking = 32
 // offline under active monitoring.
 var diskMaxTimeout = 2 * time.Minute
 
+// diskActiveMonitoring indicates if we have enabled "active" disk monitoring
+var diskActiveMonitoring = true
+
 func init() {
 	s := env.Get("_MINIO_DISK_MAX_CONCURRENT", "")
 	if s != "" {
@@ -727,6 +731,7 @@ func init() {
 			diskMaxConcurrent = 512
 		}
 	}
 
 	d := env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
 	if d != "" {
 		timeoutOperation, _ := time.ParseDuration(d)
@@ -737,6 +742,8 @@ func init() {
 		}
 	}
 
+	diskActiveMonitoring = env.Get("_MINIO_DISK_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn
+
 	diskStartChecking = 16 + diskMaxConcurrent/8
 	if diskStartChecking > diskMaxConcurrent {
 		diskStartChecking = diskMaxConcurrent
@@ -997,41 +1004,48 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
 	toWrite := []byte{xioutil.DirectioAlignSize + 1: 42}
 	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
 
-	for range t.C {
+	monitor := func() bool {
 		if contextCanceled(ctx) {
-			break
+			return false
 		}
 		if atomic.LoadInt32(&p.health.status) != diskHealthOK {
-			continue
+			return true
 		}
 		if time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))) < skipIfSuccessBefore {
 			// We recently saw a success - no need to check.
-			continue
+			return true
 		}
 		goOffline := func(err error, spent time.Duration) {
 			if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
 				logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline: %v", globalLocalNodeName, p.storage.String(), err))
 				go p.monitorDiskStatus(spent)
 			}
		}
 		// Offset checks a bit.
 		time.Sleep(time.Duration(rng.Int63n(int64(1 * time.Second))))
-		done := make(chan struct{})
+		dctx, dcancel := context.WithCancel(ctx)
 		started := time.Now()
 		go func() {
 			timeout := time.NewTimer(diskMaxTimeout)
 			select {
-			case <-timeout.C:
-				spent := time.Since(started)
-				goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)), spent)
-			case <-done:
+			case <-dctx.Done():
 				if !timeout.Stop() {
 					<-timeout.C
 				}
+			case <-timeout.C:
+				spent := time.Since(started)
+				goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)), spent)
 			}
 		}()
 		func() {
-			defer close(done)
+			defer dcancel()
 			err := p.storage.WriteAll(ctx, minioMetaTmpBucket, fn, toWrite)
 			if err != nil {
 				if osErrToFileErr(err) == errFaultyDisk {
@@ -1047,6 +1061,20 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
 				return
 			}
 		}()
+
+		// Continue to monitor
+		return true
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			if !monitor() {
+				return
+			}
+		}
 	}
 }
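
To make the timeout handling in the hunk above easier to follow in isolation, here is a hedged, self-contained sketch of the same watchdog idea with hypothetical names (probeWithWatchdog, probe, offline); it is not the MinIO implementation, only the pattern: a watchdog goroutine waits on either the probe's own context being canceled (probe finished) or the deadline firing (drive considered stalled).

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // probeWithWatchdog runs a single probe while a watchdog goroutine waits on
    // either the probe finishing (dctx canceled via defer) or the deadline firing.
    func probeWithWatchdog(ctx context.Context, deadline time.Duration, probe func() error, offline func(spent time.Duration)) error {
        dctx, dcancel := context.WithCancel(ctx)
        defer dcancel()
        started := time.Now()
        go func() {
            timeout := time.NewTimer(deadline)
            select {
            case <-dctx.Done():
                // Probe finished (or parent canceled) before the deadline.
                if !timeout.Stop() {
                    <-timeout.C
                }
            case <-timeout.C:
                // Probe is still running past the deadline.
                offline(time.Since(started))
            }
        }()
        return probe()
    }

    func main() {
        err := probeWithWatchdog(context.Background(), 2*time.Second,
            func() error { time.Sleep(100 * time.Millisecond); return nil },
            func(spent time.Duration) { fmt.Println("taking drive offline after", spent) },
        )
        fmt.Println("probe result:", err)
    }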