fix: heal bucket metadata right before healing bucket (#11097)

This is mainly an optimization to avoid listing the entire
`.minio.sys/buckets/.minio.sys` directory, which can grow
very large and get in the way of startup routines. The
contents of `.minio.sys/buckets/.minio.sys` are rather
transient and do not need to be healed.
This commit is contained in:
Harshavardhana 2020-12-13 11:57:08 -08:00 committed by GitHub
parent 705e196b6c
commit 2eb52ca5f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 117 additions and 112 deletions

View File

@ -94,15 +94,15 @@ type allHealState struct {
}
// newHealState - initialize global heal state management
func newHealState() *allHealState {
healState := &allHealState{
func newHealState(cleanup bool) *allHealState {
hstate := &allHealState{
healSeqMap: make(map[string]*healSequence),
healLocalDisks: map[Endpoint]struct{}{},
}
go healState.periodicHealSeqsClean(GlobalContext)
return healState
if cleanup {
go hstate.periodicHealSeqsClean(GlobalContext)
}
return hstate
}
func (ahs *allHealState) healDriveCount() int {
@ -779,13 +779,18 @@ func (h *healSequence) healFromSourceCh() {
}
func (h *healSequence) healDiskMeta(objAPI ObjectLayer) error {
// Start healing the config prefix.
if err := h.healMinioSysMeta(objAPI, minioConfigPrefix)(); err != nil {
// Try to pro-actively heal backend-encrypted file.
if err := h.queueHealTask(healSource{
bucket: minioMetaBucket,
object: backendEncryptedFile,
}, madmin.HealItemBucketMetadata); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
return err
}
}
// Start healing the bucket config prefix.
return h.healMinioSysMeta(objAPI, bucketConfigPrefix)()
// Start healing the config prefix.
return h.healMinioSysMeta(objAPI, minioConfigPrefix)()
}
func (h *healSequence) healItems(objAPI ObjectLayer, bucketsOnly bool) error {

View File

@ -102,7 +102,7 @@ func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
case task.bucket == SlashSeparator:
res, err = healDiskFormat(ctx, objAPI, task.opts)
case task.bucket != "" && task.object == "":
res, err = objAPI.HealBucket(ctx, task.bucket, task.opts.DryRun, task.opts.Remove)
res, err = objAPI.HealBucket(ctx, task.bucket, task.opts)
case task.bucket != "" && task.object != "":
res, err = objAPI.HealObject(ctx, task.bucket, task.object, task.versionID, task.opts)
}

View File

@ -46,16 +46,7 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
initBackgroundHealing(ctx, objAPI) // start quick background healing
var bgSeq *healSequence
var found bool
for {
bgSeq, found = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if found {
break
}
time.Sleep(time.Second)
}
bgSeq := mustGetHealSequence(ctx)
globalBackgroundHealState.pushHealLocalDisks(getLocalDisksToHeal()...)

View File

@ -440,6 +440,11 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
for index := range buckets {
index := index
g.Go(func() error {
if _, err := objAPI.HealBucket(ctx, buckets[index].Name, madmin.HealOpts{
Remove: true,
}); err != nil {
return err
}
meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name)
if err != nil {
return err

View File

@ -48,6 +48,11 @@ func init() {
rand.Seed(time.Now().UTC().UnixNano())
globalDNSCache = xhttp.NewDNSCache(3*time.Second, 10*time.Second)
initGlobalContext()
globalReplicationState = newReplicationState()
globalTransitionState = newTransitionState()
gob.Register(StorageErr(""))
}

View File

@ -32,11 +32,16 @@ var bucketOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errUnform
var bucketMetadataOpIgnoredErrs = append(bucketOpIgnoredErrs, errVolumeNotFound)
// MakeMultipleBuckets - create a list of buckets
func (er erasureObjects) MakeMultipleBuckets(ctx context.Context, buckets ...string) error {
func (er erasureObjects) MakeMultipleBuckets(ctx context.Context, bucketsInfo ...BucketInfo) error {
storageDisks := er.getDisks()
g := errgroup.WithNErrs(len(storageDisks))
buckets := make([]string, len(bucketsInfo))
for i := range bucketsInfo {
buckets[i] = bucketsInfo[i].Name
}
// Make a volume entry on all underlying storage disks.
for index := range storageDisks {
index := index

View File

@ -32,9 +32,9 @@ import (
// Heals a bucket if it doesn't exist on one of the disks, additionally
// also heals the missing entries for bucket metadata files
// `policy.json, notification.xml, listeners.json`.
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
result madmin.HealResultItem, err error) {
if !dryRun {
if !opts.DryRun {
defer ObjectPathUpdated(bucket)
}
@ -45,7 +45,7 @@ func (er erasureObjects) HealBucket(ctx context.Context, bucket string, dryRun,
writeQuorum := getWriteQuorum(len(storageDisks))
// Heal bucket.
return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, dryRun)
return healBucket(ctx, storageDisks, storageEndpoints, bucket, writeQuorum, opts.DryRun)
}
// Heal bucket - create buckets on disks where it does not exist.

View File

@ -130,7 +130,10 @@ func TestHealing(t *testing.T) {
t.Fatal(err)
}
// This would create the bucket.
_, err = er.HealBucket(ctx, bucket, false, false)
_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
DryRun: false,
Remove: false,
})
if err != nil {
t.Fatal(err)
}

View File

@ -430,7 +430,7 @@ func (z *erasureServerPools) CrawlAndGetDataUsage(ctx context.Context, bf *bloom
return firstErr
}
func (z *erasureServerPools) MakeMultipleBuckets(ctx context.Context, buckets ...string) error {
func (z *erasureServerPools) MakeMultipleBuckets(ctx context.Context, buckets ...BucketInfo) error {
g := errgroup.WithNErrs(len(z.serverPools))
// Create buckets in parallel across all sets.
@ -1175,14 +1175,22 @@ func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmi
return r, nil
}
func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) {
func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
var r = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
}
// Ensure heal opts for bucket metadata be deep healed all the time.
opts.ScanMode = madmin.HealDeepScan
// Ignore the results on purpose.
if _, err := z.HealObject(ctx, minioMetaBucket, pathJoin(bucketConfigPrefix, bucket), "", opts); err != nil {
logger.LogIf(ctx, fmt.Errorf("Healing bucket metadata for %s failed", bucket))
}
for _, zone := range z.serverPools {
result, err := zone.HealBucket(ctx, bucket, dryRun, remove)
result, err := zone.HealBucket(ctx, bucket, opts)
if err != nil {
switch err.(type) {
case BucketNotFound:

View File

@ -563,7 +563,7 @@ func (s *erasureSets) Shutdown(ctx context.Context) error {
}
// MakeMultipleBuckets - make many buckets at once.
func (s *erasureSets) MakeMultipleBuckets(ctx context.Context, buckets ...string) error {
func (s *erasureSets) MakeMultipleBuckets(ctx context.Context, buckets ...BucketInfo) error {
g := errgroup.WithNErrs(len(s.sets))
// Create buckets in parallel across all sets.
@ -1286,7 +1286,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
}
// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (result madmin.HealResultItem, err error) {
func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
// Initialize heal result info
result = madmin.HealResultItem{
Type: madmin.HealItemBucket,
@ -1297,7 +1297,7 @@ func (s *erasureSets) HealBucket(ctx context.Context, bucket string, dryRun, rem
for _, s := range s.sets {
var healResult madmin.HealResultItem
healResult, err = s.HealBucket(ctx, bucket, dryRun, remove)
healResult, err = s.HealBucket(ctx, bucket, opts)
if err != nil {
return result, err
}
@ -1333,6 +1333,11 @@ func (s *erasureSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error)
listBuckets = append(listBuckets, BucketInfo(v))
}
sort.Sort(byBucketName(listBuckets))
if err := s.MakeMultipleBuckets(ctx, listBuckets...); err != nil {
return listBuckets, err
}
return listBuckets, nil
}
@ -1387,19 +1392,7 @@ func (s *erasureSets) maintainMRFList() {
// to find objects related to the new disk that needs to be healed.
func (s *erasureSets) healMRFRoutine() {
// Wait until background heal state is initialized
var bgSeq *healSequence
for {
if globalBackgroundHealState == nil {
time.Sleep(time.Second)
continue
}
var ok bool
bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if ok {
break
}
time.Sleep(time.Second)
}
bgSeq := mustGetHealSequence(GlobalContext)
for e := range s.disksConnectEvent {
// Get the list of objects related the er.set

View File

@ -1540,7 +1540,7 @@ func (fs *FSObjects) HealObject(ctx context.Context, bucket, object, versionID s
}
// HealBucket - no-op for fs, Valid only for Erasure.
func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem,
func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem,
error) {
logger.LogIf(ctx, NotImplemented{})
return madmin.HealResultItem{}, NotImplemented{}
@ -1561,10 +1561,9 @@ func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, opt
return NotImplemented{}
}
// ListBucketsHeal - list all buckets to be healed. Valid only for Erasure
// ListBucketsHeal - list all buckets to be healed. Valid only for Erasure, returns ListBuckets() in single drive mode.
func (fs *FSObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
logger.LogIf(ctx, NotImplemented{})
return []BucketInfo{}, NotImplemented{}
return fs.ListBuckets(ctx)
}
// GetMetrics - no op

View File

@ -52,7 +52,7 @@ func (a GatewayUnsupported) SetDriveCount() int {
}
// MakeMultipleBuckets is dummy stub for gateway.
func (a GatewayUnsupported) MakeMultipleBuckets(ctx context.Context, buckets ...string) error {
func (a GatewayUnsupported) MakeMultipleBuckets(ctx context.Context, buckets ...BucketInfo) error {
return NotImplemented{}
}
@ -171,7 +171,7 @@ func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin
}
// HealBucket - Not implemented stub
func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) {
func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
return madmin.HealResultItem{}, NotImplemented{}
}

View File

@ -96,41 +96,50 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
}, true
}
// healErasureSet lists and heals all objects in a specific erasure set
func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error {
func mustGetHealSequence(ctx context.Context) *healSequence {
// Get background heal sequence to send elements to heal
var bgSeq *healSequence
var ok bool
for {
bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
if ok {
break
}
select {
case <-ctx.Done():
return nil
case <-time.After(time.Second):
globalHealStateLK.RLock()
hstate := globalBackgroundHealState
globalHealStateLK.RUnlock()
if hstate == nil {
time.Sleep(time.Second)
continue
}
bgSeq, ok := hstate.getHealSequenceByToken(bgHealingUUID)
if !ok {
time.Sleep(time.Second)
continue
}
return bgSeq
}
}
// healErasureSet lists and heals all objects in a specific erasure set
func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error {
bgSeq := mustGetHealSequence(ctx)
buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
}, BucketInfo{
Name: pathJoin(minioMetaBucket, bucketConfigPrefix),
}) // add metadata .minio.sys/ bucket prefixes to heal
})
// Try to pro-actively heal backend-encrypted file.
bgSeq.sourceCh <- healSource{
if err := bgSeq.queueHealTask(healSource{
bucket: minioMetaBucket,
object: backendEncryptedFile,
}, madmin.HealItemMetadata); err != nil {
logger.LogIf(ctx, err)
}
// Heal all buckets with all objects
for _, bucket := range buckets {
// Heal current bucket
bgSeq.sourceCh <- healSource{
if err := bgSeq.queueHealTask(healSource{
bucket: bucket.Name,
}, madmin.HealItemBucket); err != nil {
logger.LogIf(ctx, err)
}
var entryChs []FileInfoVersionsCh
@ -165,10 +174,12 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis
}
for _, version := range entry.Versions {
bgSeq.sourceCh <- healSource{
if err := bgSeq.queueHealTask(healSource{
bucket: bucket.Name,
object: version.Name,
versionID: version.VersionID,
}, madmin.HealItemObject); err != nil {
logger.LogIf(ctx, err)
}
}
}

View File

@ -82,7 +82,7 @@ type ObjectLayer interface {
StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) // local queries only local disks
// Bucket operations.
MakeMultipleBuckets(ctx context.Context, buckets ...string) error
MakeMultipleBuckets(ctx context.Context, bucketInfo ...BucketInfo) error
MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error
GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error)
ListBuckets(ctx context.Context) (buckets []BucketInfo, err error)
@ -122,7 +122,7 @@ type ObjectLayer interface {
// Healing operations.
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error)
HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn HealObjectFn) error
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)

View File

@ -26,6 +26,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
@ -162,7 +163,17 @@ func serverHandleEnvVars() {
handleCommonEnvVars()
}
var globalHealStateLK sync.RWMutex
func newAllSubsystems() {
if globalIsErasure {
globalHealStateLK.Lock()
// New global heal state
globalAllHealState = newHealState(true)
globalBackgroundHealState = newHealState(false)
globalHealStateLK.Unlock()
}
// Create new notification system and initialize notification targets
globalNotificationSys = NewNotificationSys(globalEndpoints)
@ -296,30 +307,13 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
// you want to add extra context to your error. This
// ensures top level retry works accordingly.
// List buckets to heal, and be re-used for loading configs.
var buckets []BucketInfo
rquorum := InsufficientReadQuorum{}
wquorum := InsufficientWriteQuorum{}
if globalIsErasure {
buckets, err = newObject.ListBucketsHeal(ctx)
buckets, err := newObject.ListBucketsHeal(ctx)
if err != nil {
return fmt.Errorf("Unable to list buckets to heal: %w", err)
}
bucketNames := make([]string, len(buckets))
for i := range buckets {
bucketNames[i] = buckets[i].Name
}
if err = newObject.MakeMultipleBuckets(ctx, bucketNames...); err != nil {
if errors.As(err, &wquorum) || errors.As(err, &rquorum) {
// Return the error upwards for the caller to retry.
return fmt.Errorf("Unable to heal buckets: %w", err)
}
}
} else {
buckets, err = newObject.ListBuckets(ctx)
if err != nil {
return fmt.Errorf("Unable to list buckets: %w", err)
}
}
// Initialize config system.
if err = globalConfigSys.Init(newObject); err != nil {
@ -416,15 +410,6 @@ func serverMain(ctx *cli.Context) {
// Set system resources to maximum.
setMaxResources()
if globalIsErasure {
// New global heal state
globalAllHealState = newHealState()
globalBackgroundHealState = newHealState()
globalReplicationState = newReplicationState()
globalTransitionState = newTransitionState()
}
// Configure server.
handler, err := configureServerHandler(globalEndpoints)
if err != nil {
@ -471,8 +456,6 @@ func serverMain(ctx *cli.Context) {
logger.SetDeploymentID(globalDeploymentID)
initDataCrawler(GlobalContext, newObject)
// Enable background operations for erasure coding
if globalIsErasure {
initAutoHeal(GlobalContext, newObject)
@ -480,6 +463,8 @@ func serverMain(ctx *cli.Context) {
initBackgroundTransition(GlobalContext, newObject)
}
initDataCrawler(GlobalContext, newObject)
if err = initServer(GlobalContext, newObject); err != nil {
var cerr config.Err
// For any config error, we don't need to drop into safe-mode

View File

@ -45,11 +45,6 @@ var GlobalContext context.Context
// cancelGlobalContext can be used to indicate server shutdown.
var cancelGlobalContext context.CancelFunc
// Initialize service mutex once.
func init() {
initGlobalContext()
}
func initGlobalContext() {
GlobalContext, cancelGlobalContext = context.WithCancel(context.Background())
GlobalServiceDoneCh = GlobalContext.Done()

View File

@ -640,9 +640,9 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien
healthClient.ExpectTimeouts = true
restClient.HealthCheckFn = func() bool {
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
defer cancel()
respBody, err := healthClient.Call(ctx, storageRESTMethodHealth, nil, nil, -1)
xhttp.DrainBody(respBody)
cancel()
return toStorageErr(err) != errDiskNotFound
}
}

View File

@ -428,7 +428,7 @@ func resetGlobalIsErasure() {
func resetGlobalHealState() {
// Init global heal state
if globalAllHealState == nil {
globalAllHealState = newHealState()
globalAllHealState = newHealState(false)
} else {
globalAllHealState.Lock()
for _, v := range globalAllHealState.healSeqMap {
@ -441,7 +441,7 @@ func resetGlobalHealState() {
// Init background heal state
if globalBackgroundHealState == nil {
globalBackgroundHealState = newHealState()
globalBackgroundHealState = newHealState(false)
} else {
globalBackgroundHealState.Lock()
for _, v := range globalBackgroundHealState.healSeqMap {