From b0f0e53bba345dcb3f3f13c039f534d65e33c822 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Tue, 1 Aug 2023 10:54:26 -0700
Subject: [PATCH] fix: make sure to correctly initialize health checks (#17765)

health checks were missing for replaced drives, since

- HealFormat() would replace drives without starting a health check
- disconnected drives that reconnect via connectEndpoint() lost their
  health checks, and the reconnect loop also lost them for local disks;
  merge both situations into a single code path.

- other than this, separate the cleanUp and healthCheck variables to
  avoid overloading them with similar requirements.

- also ensure that disk monitoring selects on the context, so that
  canceled disks do not linger waiting for the ticker to trigger.

- allow disabling active monitoring.
---
 cmd/bucket-targets.go           |  6 ++--
 cmd/erasure-sets.go             | 36 +++++++++++++-----------
 cmd/format-erasure.go           |  4 +--
 cmd/format-erasure_test.go      |  4 +--
 cmd/generic-handlers.go         |  2 +-
 cmd/globals.go                  |  2 +-
 cmd/object-api-common.go        | 13 ++++++---
 cmd/prepare-storage.go          |  5 ++--
 cmd/routers.go                  |  2 +-
 cmd/storage-rest-client.go      |  4 +--
 cmd/xl-storage-disk-id-check.go | 50 +++++++++++++++++++++++++--------
 11 files changed, 83 insertions(+), 45 deletions(-)

diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go
index e1249465b..26a862267 100644
--- a/cmd/bucket-targets.go
+++ b/cmd/bucket-targets.go
@@ -58,7 +58,7 @@ type epHealth struct {
 }
 
 // isOffline returns current liveness result of remote target. Add endpoint to
-// healthcheck map if missing and default to online status
+// healthCheck map if missing and default to online status
 func (sys *BucketTargetSys) isOffline(ep *url.URL) bool {
 	sys.hMutex.RLock()
 	defer sys.hMutex.RUnlock()
@@ -126,7 +126,7 @@ func (sys *BucketTargetSys) heartBeat(ctx context.Context) {
 	}
 }
 
-// periodically rebuild the healthcheck map from list of targets to clear
+// periodically rebuild the healthCheck map from list of targets to clear
 // out stale endpoints
 func (sys *BucketTargetSys) reloadHealthCheckers(ctx context.Context) {
 	m := make(map[string]epHealth)
@@ -362,7 +362,7 @@ func NewBucketTargetSys(ctx context.Context) *BucketTargetSys {
 		hc:       make(map[string]epHealth),
 		hcClient: newHCClient(),
 	}
-	// reload healthcheck endpoints map periodically to remove stale endpoints from the map.
+	// reload healthCheck endpoints map periodically to remove stale endpoints from the map.
 	go func() {
 		rTimer := time.NewTimer(defaultHealthCheckReloadDuration)
 		defer rTimer.Stop()
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index db63c3116..2f56eb4f6 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -116,7 +116,10 @@ func (s *erasureSets) getDiskMap() map[Endpoint]StorageAPI {
 
 // Initializes a new StorageAPI from the endpoint argument, returns
 // StorageAPI and also `format` which exists on the disk.
 func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
-	disk, err := newStorageAPI(endpoint, false)
+	disk, err := newStorageAPI(endpoint, storageOpts{
+		cleanUp:     false,
+		healthCheck: false,
+	})
 	if err != nil {
 		return nil, nil, err
 	}
@@ -132,6 +135,15 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
 		return nil, nil, fmt.Errorf("Drive: %s returned %w", disk, err) // make sure to '%w' to wrap the error
 	}
 
+	disk.Close()
+	disk, err = newStorageAPI(endpoint, storageOpts{
+		cleanUp:     true,
+		healthCheck: true,
+	})
+	if err != nil {
+		return nil, nil, err
+	}
+
 	return disk, format, nil
 }
 
@@ -241,21 +253,10 @@ func (s *erasureSets) connectDisks() {
 				}
 				s.erasureDisks[setIndex][diskIndex].Close()
 			}
-			if disk.IsLocal() {
-				disk.SetDiskID(format.Erasure.This)
-				s.erasureDisks[setIndex][diskIndex] = disk
-			} else {
-				// Enable healthcheck disk for remote endpoint.
-				disk, err = newStorageAPI(endpoint, true)
-				if err != nil {
-					printEndpointError(endpoint, err, false)
-					s.erasureDisksMu.Unlock()
-					return
-				}
-				disk.SetDiskID(format.Erasure.This)
-				s.erasureDisks[setIndex][diskIndex] = disk
-			}
+
+			disk.SetDiskID(format.Erasure.This)
 			disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
+			s.erasureDisks[setIndex][diskIndex] = disk
 			s.erasureDisksMu.Unlock()
 		}(endpoint)
 	}
@@ -1055,7 +1056,10 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
 
 // HealFormat - heals missing `format.json` on fresh unformatted disks.
 func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
-	storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, false)
+	storageDisks, _ := initStorageDisksWithErrors(s.endpoints.Endpoints, storageOpts{
+		cleanUp:     true,
+		healthCheck: true,
+	})
 
 	defer func(storageDisks []StorageAPI) {
 		if err != nil {
diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go
index 874737cb3..cd85e3e5c 100644
--- a/cmd/format-erasure.go
+++ b/cmd/format-erasure.go
@@ -663,14 +663,14 @@ func closeStorageDisks(storageDisks ...StorageAPI) {
 
 // Initialize storage disks for each endpoint.
 // Errors are returned for each endpoint with matching index.
-func initStorageDisksWithErrors(endpoints Endpoints, healthCheck bool) ([]StorageAPI, []error) {
+func initStorageDisksWithErrors(endpoints Endpoints, opts storageOpts) ([]StorageAPI, []error) {
 	// Bootstrap disks.
 	storageDisks := make([]StorageAPI, len(endpoints))
 	g := errgroup.WithNErrs(len(endpoints))
 	for index := range endpoints {
 		index := index
 		g.Go(func() (err error) {
-			storageDisks[index], err = newStorageAPI(endpoints[index], healthCheck)
+			storageDisks[index], err = newStorageAPI(endpoints[index], opts)
 			return err
 		}, index)
 	}
diff --git a/cmd/format-erasure_test.go b/cmd/format-erasure_test.go
index eb380f37c..2fe317045 100644
--- a/cmd/format-erasure_test.go
+++ b/cmd/format-erasure_test.go
@@ -38,7 +38,7 @@ func TestFixFormatV3(t *testing.T) {
 	}
 	endpoints := mustGetNewEndpoints(0, 8, erasureDirs...)
-	storageDisks, errs := initStorageDisksWithErrors(endpoints, false)
+	storageDisks, errs := initStorageDisksWithErrors(endpoints, storageOpts{cleanUp: false, healthCheck: false})
 	for _, err := range errs {
 		if err != nil && err != errDiskNotFound {
 			t.Fatal(err)
 		}
 	}
@@ -560,7 +560,7 @@ func benchmarkInitStorageDisksN(b *testing.B, nDisks int) {
 	b.RunParallel(func(pb *testing.PB) {
 		endpoints := endpoints
 		for pb.Next() {
-			initStorageDisksWithErrors(endpoints, false)
+			initStorageDisksWithErrors(endpoints, storageOpts{cleanUp: false, healthCheck: false})
 		}
 	})
 }
diff --git a/cmd/generic-handlers.go b/cmd/generic-handlers.go
index 4d56c7b98..506e8c561 100644
--- a/cmd/generic-handlers.go
+++ b/cmd/generic-handlers.go
@@ -207,7 +207,7 @@ func getRedirectLocation(r *http.Request) *xnet.URL {
 }
 
 // guessIsHealthCheckReq - returns true if incoming request looks
-// like healthcheck request
+// like healthCheck request
 func guessIsHealthCheckReq(req *http.Request) bool {
 	if req == nil {
 		return false
diff --git a/cmd/globals.go b/cmd/globals.go
index c10bcffe8..88911ebfe 100644
--- a/cmd/globals.go
+++ b/cmd/globals.go
@@ -193,7 +193,7 @@ var (
 	globalBucketSSEConfigSys *BucketSSEConfigSys
 	globalBucketTargetSys    *BucketTargetSys
 	// globalAPIConfig controls S3 API requests throttling,
-	// healthcheck readiness deadlines and cors settings.
+	// healthCheck readiness deadlines and cors settings.
 	globalAPIConfig = apiConfig{listQuorum: "strict", rootAccess: true}
 
 	globalStorageClass storageclass.Config
diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go
index e1a4b9a79..eec84200c 100644
--- a/cmd/object-api-common.go
+++ b/cmd/object-api-common.go
@@ -55,15 +55,20 @@ var globalObjectAPI ObjectLayer
 // Global cacheObjects, only accessed by newCacheObjectsFn().
 var globalCacheObjectAPI CacheObjectLayer
 
+type storageOpts struct {
+	cleanUp     bool
+	healthCheck bool
+}
+
 // Depending on the disk type network or local, initialize storage API.
-func newStorageAPI(endpoint Endpoint, healthCheck bool) (storage StorageAPI, err error) {
+func newStorageAPI(endpoint Endpoint, opts storageOpts) (storage StorageAPI, err error) {
 	if endpoint.IsLocal {
-		storage, err := newXLStorage(endpoint, healthCheck)
+		storage, err := newXLStorage(endpoint, opts.cleanUp)
 		if err != nil {
 			return nil, err
 		}
-		return newXLStorageDiskIDCheck(storage, healthCheck), nil
+		return newXLStorageDiskIDCheck(storage, opts.healthCheck), nil
 	}
-	return newStorageRESTClient(endpoint, healthCheck), nil
+	return newStorageRESTClient(endpoint, opts.healthCheck), nil
 }
diff --git a/cmd/prepare-storage.go b/cmd/prepare-storage.go
index d29b612aa..cf7e7544b 100644
--- a/cmd/prepare-storage.go
+++ b/cmd/prepare-storage.go
@@ -101,11 +101,12 @@ func bgFormatErasureCleanupTmp(diskPath string) {
 	// Delete all temporary files created for DirectIO write check
 	files, _ := filepath.Glob(filepath.Join(diskPath, ".writable-check-*.tmp"))
 	for _, file := range files {
-		removeAll(file)
+		go removeAll(file)
 	}
 
 	// Remove the entire folder in case there are leftovers that didn't get cleaned up before restart.
 	go removeAll(pathJoin(diskPath, minioMetaTmpBucket+"-old"))
 
+	// Renames and schedules for purging all bucket metacache.
 	go renameAllBucketMetacache(diskPath)
 }
 
@@ -155,7 +156,7 @@ func isServerResolvable(endpoint Endpoint, timeout time.Duration) error {
 // time. additionally make sure to close all the disks used in this attempt.
 func connectLoadInitFormats(verboseLogging bool, firstDisk bool, endpoints Endpoints, poolCount, setCount, setDriveCount int, deploymentID, distributionAlgo string) (storageDisks []StorageAPI, format *formatErasureV3, err error) {
 	// Initialize all storage disks
-	storageDisks, errs := initStorageDisksWithErrors(endpoints, true)
+	storageDisks, errs := initStorageDisksWithErrors(endpoints, storageOpts{cleanUp: true, healthCheck: true})
 
 	defer func(storageDisks []StorageAPI) {
 		if err != nil {
diff --git a/cmd/routers.go b/cmd/routers.go
index 55e5ef62c..47d52df9c 100644
--- a/cmd/routers.go
+++ b/cmd/routers.go
@@ -85,7 +85,7 @@ func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handl
 	// Add Admin router, all APIs are enabled in server mode.
 	registerAdminRouter(router, true)
 
-	// Add healthcheck router
+	// Add healthCheck router
 	registerHealthCheckRouter(router)
 
 	// Add server metrics router
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 47be43dd6..56d542f65 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -828,7 +828,7 @@ func (client *storageRESTClient) Close() error {
 }
 
 // Returns a storage rest client.
-func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClient {
+func newStorageRESTClient(endpoint Endpoint, healthCheck bool) *storageRESTClient {
 	serverURL := &url.URL{
 		Scheme: endpoint.Scheme,
 		Host:   endpoint.Host,
@@ -837,7 +837,7 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien
 
 	restClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
 
-	if healthcheck {
+	if healthCheck {
 		// Use a separate client to avoid recursive calls.
 		healthClient := rest.NewClient(serverURL, globalInternodeTransport, newCachedAuthToken())
 		healthClient.NoMetrics = true
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index 3df1a69f4..24758a241 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -31,6 +31,7 @@ import (
 	"time"
 
 	"github.com/minio/madmin-go/v3"
+	"github.com/minio/minio/internal/config"
 	xioutil "github.com/minio/minio/internal/ioutil"
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/env"
@@ -176,7 +177,7 @@ func newXLStorageDiskIDCheck(storage *xlStorage, healthCheck bool) *xlStorageDis
 	for i := range xl.apiLatencies[:] {
 		xl.apiLatencies[i] = &lockedLastMinuteLatency{}
 	}
-	if healthCheck {
+	if healthCheck && diskActiveMonitoring {
 		go xl.monitorDiskWritable(xl.diskCtx)
 	}
 	return &xl
@@ -718,6 +719,9 @@ var diskStartChecking = 32
 
 // offline under active monitoring.
 var diskMaxTimeout = 2 * time.Minute
 
+// diskActiveMonitoring indicates if we have enabled "active" disk monitoring
+var diskActiveMonitoring = true
+
 func init() {
 	s := env.Get("_MINIO_DISK_MAX_CONCURRENT", "")
 	if s != "" {
 		diskMaxConcurrent, _ = strconv.Atoi(s)
 		if diskMaxConcurrent <= 0 {
 			diskMaxConcurrent = 512
 		}
 	}
+
 	d := env.Get("_MINIO_DISK_MAX_TIMEOUT", "")
 	if d != "" {
 		timeoutOperation, _ := time.ParseDuration(d)
 		if timeoutOperation < time.Second {
 			diskMaxTimeout = time.Second
 		} else {
 			diskMaxTimeout = timeoutOperation
 		}
 	}
 
+	diskActiveMonitoring = env.Get("_MINIO_DISK_ACTIVE_MONITORING", config.EnableOn) == config.EnableOn
+
 	diskStartChecking = 16 + diskMaxConcurrent/8
 	if diskStartChecking > diskMaxConcurrent {
 		diskStartChecking = diskMaxConcurrent
 	}
@@ -997,41 +1004,48 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
 	toWrite := []byte{xioutil.DirectioAlignSize + 1: 42}
 	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
 
-	for range t.C {
+	monitor := func() bool {
 		if contextCanceled(ctx) {
-			break
+			return false
 		}
+
 		if atomic.LoadInt32(&p.health.status) != diskHealthOK {
-			continue
+			return true
 		}
+
 		if time.Since(time.Unix(0, atomic.LoadInt64(&p.health.lastSuccess))) < skipIfSuccessBefore {
 			// We recently saw a success - no need to check.
-			continue
+			return true
 		}
+
 		goOffline := func(err error, spent time.Duration) {
 			if atomic.CompareAndSwapInt32(&p.health.status, diskHealthOK, diskHealthFaulty) {
 				logger.LogAlwaysIf(ctx, fmt.Errorf("node(%s): taking drive %s offline: %v", globalLocalNodeName, p.storage.String(), err))
 				go p.monitorDiskStatus(spent)
 			}
 		}
+
 		// Offset checks a bit.
 		time.Sleep(time.Duration(rng.Int63n(int64(1 * time.Second))))
-		done := make(chan struct{})
+
+		dctx, dcancel := context.WithCancel(ctx)
 		started := time.Now()
 		go func() {
 			timeout := time.NewTimer(diskMaxTimeout)
 			select {
-			case <-timeout.C:
-				spent := time.Since(started)
-				goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)), spent)
-			case <-done:
+			case <-dctx.Done():
 				if !timeout.Stop() {
 					<-timeout.C
 				}
+			case <-timeout.C:
+				spent := time.Since(started)
+				goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)), spent)
 			}
 		}()
+
 		func() {
-			defer close(done)
+			defer dcancel()
+
 			err := p.storage.WriteAll(ctx, minioMetaTmpBucket, fn, toWrite)
 			if err != nil {
 				if osErrToFileErr(err) == errFaultyDisk {
@@ -1047,6 +1061,20 @@ func (p *xlStorageDiskIDCheck) monitorDiskWritable(ctx context.Context) {
 				return
 			}
 		}()
+
+		// Continue to monitor
+		return true
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			if !monitor() {
+				return
+			}
+		}
 	}
 }
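Note on the storageOpts change: connectEndpoint() now opens a drive twice, first as a probe with cleanUp and healthCheck disabled, used only to read and validate format.json, and then a second time with both enabled once the drive is known good, so replaced or reconnected drives always come back with monitoring attached. The following is a minimal, self-contained sketch of that probe-then-reopen flow; drive, openDrive, loadFormat and the "/mnt/drive1" path are illustrative stand-ins, not the real StorageAPI/newStorageAPI/loadFormatErasure code paths.

    package main

    import (
        "errors"
        "fmt"
    )

    // storageOpts mirrors the options struct introduced by this patch.
    type storageOpts struct {
        cleanUp     bool // scrub leftover temporary data on open
        healthCheck bool // start health monitoring for the drive
    }

    // drive is a hypothetical stand-in for a StorageAPI implementation.
    type drive struct {
        path string
        opts storageOpts
    }

    func (d *drive) Close() error { return nil }

    // openDrive stands in for newStorageAPI().
    func openDrive(path string, opts storageOpts) (*drive, error) {
        return &drive{path: path, opts: opts}, nil
    }

    // loadFormat stands in for loading and validating format.json.
    func loadFormat(d *drive) error {
        if d.path == "" {
            return errors.New("unformatted drive")
        }
        return nil
    }

    // connect probes the drive without monitoring, then reopens it with
    // cleanup and health checks enabled, mirroring connectEndpoint().
    func connect(path string) (*drive, error) {
        probe, err := openDrive(path, storageOpts{cleanUp: false, healthCheck: false})
        if err != nil {
            return nil, err
        }
        if err := loadFormat(probe); err != nil {
            probe.Close()
            return nil, fmt.Errorf("drive %s returned %w", path, err)
        }
        // Format looks good: drop the probe handle and reopen the drive
        // with cleanup and health checks enabled.
        probe.Close()
        return openDrive(path, storageOpts{cleanUp: true, healthCheck: true})
    }

    func main() {
        d, err := connect("/mnt/drive1") // hypothetical mount path, for illustration only
        fmt.Println(d, err)
    }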
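Note on disabling active monitoring: diskActiveMonitoring defaults to true and is only switched off when _MINIO_DISK_ACTIVE_MONITORING is set to a value other than "on" (config.EnableOn); when it is off, newXLStorageDiskIDCheck() no longer starts monitorDiskWritable(). Below is a rough approximation of that toggle using only the standard library; the real code goes through MinIO's env and config helpers.

    package main

    import (
        "fmt"
        "os"
    )

    // enableOn mirrors config.EnableOn ("on") from MinIO's config package.
    const enableOn = "on"

    // envOrDefault returns the value of key, or def when the variable is unset.
    func envOrDefault(key, def string) string {
        if v, ok := os.LookupEnv(key); ok {
            return v
        }
        return def
    }

    func main() {
        // Monitoring stays enabled unless the variable is explicitly set to
        // something other than "on" (for example "off").
        diskActiveMonitoring := envOrDefault("_MINIO_DISK_ACTIVE_MONITORING", enableOn) == enableOn
        fmt.Println("active disk monitoring:", diskActiveMonitoring)
    }

In the patch the flag only gates the monitorDiskWritable() goroutine, so the regular request-path health accounting in xlStorageDiskIDCheck is unaffected; only the periodic write+read probe is skipped.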
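Note on the monitorDiskWritable() rewrite: the body of the old "for range t.C" loop becomes a monitor() closure, and the new outer loop selects on both the ticker and the context, so a canceled drive (for example one that was just replaced and reconnected) exits immediately instead of waiting for the next tick; each probe also gets a derived dctx/dcancel pair that shuts down the per-probe timeout watchdog. A condensed, self-contained sketch of that control flow, with the real WriteAll/ReadAll/Delete probe replaced by a placeholder sleep:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // probeTimeout stands in for diskMaxTimeout.
    const probeTimeout = 2 * time.Second

    // goOffline stands in for the real "take drive offline" path.
    func goOffline(err error, spent time.Duration) {
        fmt.Printf("taking drive offline after %v: %v\n", spent.Round(time.Millisecond), err)
    }

    // monitorWritable runs one probe per tick and returns as soon as ctx is
    // canceled, mirroring the loop structure introduced by this patch.
    func monitorWritable(ctx context.Context, interval time.Duration) {
        t := time.NewTicker(interval)
        defer t.Stop()

        probe := func() {
            // Derived context: canceling it stops the watchdog below once the
            // probe finishes (the patch uses dctx/dcancel the same way).
            dctx, dcancel := context.WithCancel(ctx)
            defer dcancel()

            started := time.Now()
            go func() {
                timer := time.NewTimer(probeTimeout)
                defer timer.Stop()
                select {
                case <-dctx.Done():
                    // Probe finished (or parent canceled) before the timeout.
                case <-timer.C:
                    spent := time.Since(started)
                    goOffline(fmt.Errorf("unable to write+read for %v", spent.Round(time.Millisecond)), spent)
                }
            }()

            // Placeholder for the real write+read+delete round trip.
            time.Sleep(10 * time.Millisecond)
        }

        for {
            select {
            case <-ctx.Done():
                // Drive was replaced or the server is shutting down: stop now,
                // don't wait for the next tick.
                return
            case <-t.C:
                probe()
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
        defer cancel()
        monitorWritable(ctx, 100*time.Millisecond)
    }

Canceling the parent context, as main does above, makes the loop return right away instead of lingering until the next tick, which is the behaviour the commit message calls out for replaced drives.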