diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index d2acaed18..700785854 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -94,15 +94,15 @@ type allHealState struct { } // newHealState - initialize global heal state management -func newHealState() *allHealState { - healState := &allHealState{ +func newHealState(cleanup bool) *allHealState { + hstate := &allHealState{ healSeqMap: make(map[string]*healSequence), healLocalDisks: map[Endpoint]struct{}{}, } - - go healState.periodicHealSeqsClean(GlobalContext) - - return healState + if cleanup { + go hstate.periodicHealSeqsClean(GlobalContext) + } + return hstate } func (ahs *allHealState) healDriveCount() int { @@ -779,13 +779,18 @@ func (h *healSequence) healFromSourceCh() { } func (h *healSequence) healDiskMeta(objAPI ObjectLayer) error { - // Start healing the config prefix. - if err := h.healMinioSysMeta(objAPI, minioConfigPrefix)(); err != nil { - return err + // Try to pro-actively heal backend-encrypted file. + if err := h.queueHealTask(healSource{ + bucket: minioMetaBucket, + object: backendEncryptedFile, + }, madmin.HealItemBucketMetadata); err != nil { + if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + return err + } } - // Start healing the bucket config prefix. - return h.healMinioSysMeta(objAPI, bucketConfigPrefix)() + // Start healing the config prefix. + return h.healMinioSysMeta(objAPI, minioConfigPrefix)() } func (h *healSequence) healItems(objAPI ObjectLayer, bucketsOnly bool) error { diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index d41b7ea81..b86ca3d67 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -102,7 +102,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { case task.bucket == SlashSeparator: res, err = healDiskFormat(ctx, objAPI, task.opts) case task.bucket != "" && task.object == "": - res, err = objAPI.HealBucket(ctx, task.bucket, task.opts.DryRun, task.opts.Remove) + res, err = objAPI.HealBucket(ctx, task.bucket, task.opts) case task.bucket != "" && task.object != "": res, err = objAPI.HealObject(ctx, task.bucket, task.object, task.versionID, task.opts) } diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 0e273f075..faa63f6c1 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -46,16 +46,7 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { initBackgroundHealing(ctx, objAPI) // start quick background healing - var bgSeq *healSequence - var found bool - - for { - bgSeq, found = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) - if found { - break - } - time.Sleep(time.Second) - } + bgSeq := mustGetHealSequence(ctx) globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...) diff --git a/cmd/bucket-metadata-sys.go b/cmd/bucket-metadata-sys.go index e9f646f19..e1b8f99bc 100644 --- a/cmd/bucket-metadata-sys.go +++ b/cmd/bucket-metadata-sys.go @@ -440,6 +440,11 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck for index := range buckets { index := index g.Go(func() error { + if _, err := objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{ + Remove: true, + }); err != nil { + return err + } meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name) if err != nil { return err diff --git a/cmd/common-main.go b/cmd/common-main.go index 59e602317..59fda680f 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -48,6 +48,11 @@ func init() { rand.Seed(time.Now().UTC().UnixNano()) globalDNSCache = xhttp.NewDNSCache(3*time.Second, 10*time.Second) + initGlobalContext() + + globalReplicationState = newReplicationState() + globalTransitionState = newTransitionState() + gob.Register(StorageErr("")) } diff --git a/cmd/erasure-bucket.go b/cmd/erasure-bucket.go index 15d950a48..e6f338ba6 100644 --- a/cmd/erasure-bucket.go +++ b/cmd/erasure-bucket.go @@ -32,11 +32,16 @@ var bucketOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnform var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound) // MakeMultipleBuckets - create a list of buckets -func (er erasureObjects) MakeMultipleBuckets(ctx context.Context, buckets ...string) error { +func (er erasureObjects) MakeMultipleBuckets(ctx context.Context, bucketsInfo ...BucketInfo) error { storageDisks := er.getDisks() g := errgroup.WithNErrs(len(storageDisks)) + buckets := make([]string, len(bucketsInfo)) + for i := range bucketsInfo { + buckets[i] = bucketsInfo[i].Name + } + // Make a volume entry on all underlying storage disks. for index := range storageDisks { index := index diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 8d3399baf..0a8efa17f 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -32,9 +32,9 @@ import ( // Heals a bucket if it doesn't exist on one of the disks, additionally // also heals the missing entries for bucket metadata files // `policy.json, notification.xml, listeners.json`. -func (er erasureObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ( +func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) ( result madmin.HealResultItem, err error) { - if !dryRun { + if !opts.DryRun { defer ObjectPathUpdated(bucket) } @@ -45,7 +45,7 @@ func (er erasureObjects) HealBucket(ctx context.Context, bucket string, dryRun, writeQuorum := getWriteQuorum(len(storageDisks)) // Heal bucket. - return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, dryRun) + return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, opts.DryRun) } // Heal bucket - create buckets on disks where it does not exist. diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index f946db7b3..ef768c6fa 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -130,7 +130,10 @@ func TestHealing(t *testing.T) { t.Fatal(err) } // This would create the bucket. - _, err = er.HealBucket(ctx, bucket, false, false) + _, err = er.HealBucket(ctx, bucket, madmin.HealOpts{ + DryRun: false, + Remove: false, + }) if err != nil { t.Fatal(err) } diff --git a/cmd/erasure-server-sets.go b/cmd/erasure-server-sets.go index add3d4fb7..643d9d025 100644 --- a/cmd/erasure-server-sets.go +++ b/cmd/erasure-server-sets.go @@ -430,7 +430,7 @@ func (z *erasureServerPools) CrawlAndGetDataUsage(ctx context.Context, bf *bloom return firstErr } -func (z *erasureServerPools) MakeMultipleBuckets(ctx context.Context, buckets ...string) error { +func (z *erasureServerPools) MakeMultipleBuckets(ctx context.Context, buckets ...BucketInfo) error { g := errgroup.WithNErrs(len(z.serverPools)) // Create buckets in parallel across all sets. @@ -1175,14 +1175,22 @@ func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmi return r, nil } -func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) { +func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) { var r = madmin.HealResultItem{ Type: madmin.HealItemBucket, Bucket: bucket, } + // Ensure heal opts for bucket metadata be deep healed all the time. + opts.ScanMode = madmin.HealDeepScan + + // Ignore the results on purpose. + if _, err := z.HealObject(ctx, minioMetaBucket, pathJoin(bucketConfigPrefix, bucket), "", opts); err != nil { + logger.LogIf(ctx, fmt.Errorf("Healing bucket metadata for %s failed", bucket)) + } + for _, zone := range z.serverPools { - result, err := zone.HealBucket(ctx, bucket, dryRun, remove) + result, err := zone.HealBucket(ctx, bucket, opts) if err != nil { switch err.(type) { case BucketNotFound: diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index 8d6e2a691..919d00504 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -563,7 +563,7 @@ func (s *erasureSets) Shutdown(ctx context.Context) error { } // MakeMultipleBuckets - make many buckets at once. -func (s *erasureSets) MakeMultipleBuckets(ctx context.Context, buckets ...string) error { +func (s *erasureSets) MakeMultipleBuckets(ctx context.Context, buckets ...BucketInfo) error { g := errgroup.WithNErrs(len(s.sets)) // Create buckets in parallel across all sets. @@ -1286,7 +1286,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H } // HealBucket - heals inconsistent buckets and bucket metadata on all sets. -func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (result madmin.HealResultItem, err error) { +func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) { // Initialize heal result info result = madmin.HealResultItem{ Type: madmin.HealItemBucket, @@ -1297,7 +1297,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, rem for _, s := range s.sets { var healResult madmin.HealResultItem - healResult, err = s.HealBucket(ctx, bucket, dryRun, remove) + healResult, err = s.HealBucket(ctx, bucket, opts) if err != nil { return result, err } @@ -1333,6 +1333,11 @@ func (s *erasureSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) listBuckets = append(listBuckets, BucketInfo(v)) } sort.Sort(byBucketName(listBuckets)) + + if err := s.MakeMultipleBuckets(ctx, listBuckets...); err != nil { + return listBuckets, err + } + return listBuckets, nil } @@ -1387,19 +1392,7 @@ func (s *erasureSets) maintainMRFList() { // to find objects related to the new disk that needs to be healed. func (s *erasureSets) healMRFRoutine() { // Wait until background heal state is initialized - var bgSeq *healSequence - for { - if globalBackgroundHealState == nil { - time.Sleep(time.Second) - continue - } - var ok bool - bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) - if ok { - break - } - time.Sleep(time.Second) - } + bgSeq := mustGetHealSequence(GlobalContext) for e := range s.disksConnectEvent { // Get the list of objects related the er.set diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index b9d1a20ff..9d4578e54 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1540,7 +1540,7 @@ func (fs *FSObjects) HealObject(ctx context.Context, bucket, object, versionID s } // HealBucket - no-op for fs, Valid only for Erasure. -func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, +func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) { logger.LogIf(ctx, NotImplemented{}) return madmin.HealResultItem{}, NotImplemented{} @@ -1561,10 +1561,9 @@ func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, opt return NotImplemented{} } -// ListBucketsHeal - list all buckets to be healed. Valid only for Erasure +// ListBucketsHeal - list all buckets to be healed. Valid only for Erasure, returns ListBuckets() in single drive mode. func (fs *FSObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { - logger.LogIf(ctx, NotImplemented{}) - return []BucketInfo{}, NotImplemented{} + return fs.ListBuckets(ctx) } // GetMetrics - no op diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 4f115f124..cf0ad8e29 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -52,7 +52,7 @@ func (a GatewayUnsupported) SetDriveCount() int { } // MakeMultipleBuckets is dummy stub for gateway. -func (a GatewayUnsupported) MakeMultipleBuckets(ctx context.Context, buckets ...string) error { +func (a GatewayUnsupported) MakeMultipleBuckets(ctx context.Context, buckets ...BucketInfo) error { return NotImplemented{} } @@ -171,7 +171,7 @@ func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin } // HealBucket - Not implemented stub -func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) { +func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) { return madmin.HealResultItem{}, NotImplemented{} } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 5f1660dc4..221b95840 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -96,41 +96,50 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { }, true } -// healErasureSet lists and heals all objects in a specific erasure set -func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error { +func mustGetHealSequence(ctx context.Context) *healSequence { // Get background heal sequence to send elements to heal - var bgSeq *healSequence - var ok bool for { - bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) - if ok { - break - } - select { - case <-ctx.Done(): - return nil - case <-time.After(time.Second): + globalHealStateLK.RLock() + hstate := globalBackgroundHealState + globalHealStateLK.RUnlock() + + if hstate == nil { + time.Sleep(time.Second) continue } + + bgSeq, ok := hstate.getHealSequenceByToken(bgHealingUUID) + if !ok { + time.Sleep(time.Second) + continue + } + return bgSeq } +} + +// healErasureSet lists and heals all objects in a specific erasure set +func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error { + bgSeq := mustGetHealSequence(ctx) buckets = append(buckets, BucketInfo{ Name: pathJoin(minioMetaBucket, minioConfigPrefix), - }, BucketInfo{ - Name: pathJoin(minioMetaBucket, bucketConfigPrefix), - }) // add metadata .minio.sys/ bucket prefixes to heal + }) // Try to pro-actively heal backend-encrypted file. - bgSeq.sourceCh <- healSource{ + if err := bgSeq.queueHealTask(healSource{ bucket: minioMetaBucket, object: backendEncryptedFile, + }, madmin.HealItemMetadata); err != nil { + logger.LogIf(ctx, err) } // Heal all buckets with all objects for _, bucket := range buckets { // Heal current bucket - bgSeq.sourceCh <- healSource{ + if err := bgSeq.queueHealTask(healSource{ bucket: bucket.Name, + }, madmin.HealItemBucket); err != nil { + logger.LogIf(ctx, err) } var entryChs []FileInfoVersionsCh @@ -165,10 +174,12 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis } for _, version := range entry.Versions { - bgSeq.sourceCh <- healSource{ + if err := bgSeq.queueHealTask(healSource{ bucket: bucket.Name, object: version.Name, versionID: version.VersionID, + }, madmin.HealItemObject); err != nil { + logger.LogIf(ctx, err) } } } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index ccfcd5e62..45bc83f09 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -82,7 +82,7 @@ type ObjectLayer interface { StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) // local queries only local disks // Bucket operations. - MakeMultipleBuckets(ctx context.Context, buckets ...string) error + MakeMultipleBuckets(ctx context.Context, bucketInfo ...BucketInfo) error MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) @@ -122,7 +122,7 @@ type ObjectLayer interface { // Healing operations. HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) - HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) + HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn HealObjectFn) error ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) diff --git a/cmd/server-main.go b/cmd/server-main.go index 787afa903..e709a7d16 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -26,6 +26,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "time" @@ -162,7 +163,17 @@ func serverHandleEnvVars() { handleCommonEnvVars() } +var globalHealStateLK sync.RWMutex + func newAllSubsystems() { + if globalIsErasure { + globalHealStateLK.Lock() + // New global heal state + globalAllHealState = newHealState(true) + globalBackgroundHealState = newHealState(false) + globalHealStateLK.Unlock() + } + // Create new notification system and initialize notification targets globalNotificationSys = NewNotificationSys(globalEndpoints) @@ -296,29 +307,12 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) { // you want to add extra context to your error. This // ensures top level retry works accordingly. // List buckets to heal, and be re-used for loading configs. - var buckets []BucketInfo rquorum := InsufficientReadQuorum{} wquorum := InsufficientWriteQuorum{} - if globalIsErasure { - buckets, err = newObject.ListBucketsHeal(ctx) - if err != nil { - return fmt.Errorf("Unable to list buckets to heal: %w", err) - } - bucketNames := make([]string, len(buckets)) - for i := range buckets { - bucketNames[i] = buckets[i].Name - } - if err = newObject.MakeMultipleBuckets(ctx, bucketNames...); err != nil { - if errors.As(err, &wquorum) || errors.As(err, &rquorum) { - // Return the error upwards for the caller to retry. - return fmt.Errorf("Unable to heal buckets: %w", err) - } - } - } else { - buckets, err = newObject.ListBuckets(ctx) - if err != nil { - return fmt.Errorf("Unable to list buckets: %w", err) - } + + buckets, err := newObject.ListBucketsHeal(ctx) + if err != nil { + return fmt.Errorf("Unable to list buckets to heal: %w", err) } // Initialize config system. @@ -416,15 +410,6 @@ func serverMain(ctx *cli.Context) { // Set system resources to maximum. setMaxResources() - if globalIsErasure { - // New global heal state - globalAllHealState = newHealState() - globalBackgroundHealState = newHealState() - globalReplicationState = newReplicationState() - globalTransitionState = newTransitionState() - - } - // Configure server. handler, err := configureServerHandler(globalEndpoints) if err != nil { @@ -471,8 +456,6 @@ func serverMain(ctx *cli.Context) { logger.SetDeploymentID(globalDeploymentID) - initDataCrawler(GlobalContext, newObject) - // Enable background operations for erasure coding if globalIsErasure { initAutoHeal(GlobalContext, newObject) @@ -480,6 +463,8 @@ func serverMain(ctx *cli.Context) { initBackgroundTransition(GlobalContext, newObject) } + initDataCrawler(GlobalContext, newObject) + if err = initServer(GlobalContext, newObject); err != nil { var cerr config.Err // For any config error, we don't need to drop into safe-mode diff --git a/cmd/service.go b/cmd/service.go index 4bf3e6a8c..fe4c04e9c 100644 --- a/cmd/service.go +++ b/cmd/service.go @@ -45,11 +45,6 @@ var GlobalContext context.Context // cancelGlobalContext can be used to indicate server shutdown. var cancelGlobalContext context.CancelFunc -// Initialize service mutex once. -func init() { - initGlobalContext() -} - func initGlobalContext() { GlobalContext, cancelGlobalContext = context.WithCancel(context.Background()) GlobalServiceDoneCh = GlobalContext.Done() diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 8a3e6511c..89cb5bfcb 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -640,9 +640,9 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien healthClient.ExpectTimeouts = true restClient.HealthCheckFn = func() bool { ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout) + defer cancel() respBody, err := healthClient.Call(ctx, storageRESTMethodHealth, nil, nil, -1) xhttp.DrainBody(respBody) - cancel() return toStorageErr(err) != errDiskNotFound } } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index aa1d54454..6c4588387 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -428,7 +428,7 @@ func resetGlobalIsErasure() { func resetGlobalHealState() { // Init global heal state if globalAllHealState == nil { - globalAllHealState = newHealState() + globalAllHealState = newHealState(false) } else { globalAllHealState.Lock() for _, v := range globalAllHealState.healSeqMap { @@ -441,7 +441,7 @@ func resetGlobalHealState() { // Init background heal state if globalBackgroundHealState == nil { - globalBackgroundHealState = newHealState() + globalBackgroundHealState = newHealState(false) } else { globalBackgroundHealState.Lock() for _, v := range globalBackgroundHealState.healSeqMap {