mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
Revamp bucket metadata healing (#7208)
Bucket metadata healing in the current code was executed multiple times each time for a given set. Bucket metadata just like objects are hashed in accordance with its name on any given set, to allow hashing to play a role we should let the top level code decide where to navigate. Current code also had 3 bucket metadata files hardcoded, whereas we should make it generic by listing and navigating the .minio.sys to heal such objects. We also had another bug where due to isObjectDangling changes without pre-existing bucket metadata files, we were erroneously reporting it as grey/corrupted objects. This PR fixes all of the above items.
This commit is contained in:
parent
9600e2b35e
commit
082f777281
@ -546,8 +546,11 @@ func (h *healSequence) traverseAndHeal() {
|
||||
// Start with format healing
|
||||
checkErr(h.healDiskFormat)
|
||||
|
||||
// Start healing the config.
|
||||
checkErr(h.healConfig)
|
||||
// Start healing the config prefix.
|
||||
checkErr(h.healMinioSysMeta(minioConfigPrefix))
|
||||
|
||||
// Start healing the bucket config prefix.
|
||||
checkErr(h.healMinioSysMeta(bucketConfigPrefix))
|
||||
|
||||
// Heal buckets and objects
|
||||
checkErr(h.healBuckets)
|
||||
@ -559,63 +562,65 @@ func (h *healSequence) traverseAndHeal() {
|
||||
close(h.traverseAndHealDoneCh)
|
||||
}
|
||||
|
||||
// healConfig - heals config.json, retrun value indicates if a failure occurred.
|
||||
func (h *healSequence) healConfig() error {
|
||||
// Get current object layer instance.
|
||||
objectAPI := newObjectLayerFn()
|
||||
if objectAPI == nil {
|
||||
return errServerNotInitialized
|
||||
// healMinioSysMeta - heals all files under a given meta prefix, returns a function
|
||||
// which in-turn heals the respective meta directory path and any files in int.
|
||||
func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
|
||||
return func() error {
|
||||
// Get current object layer instance.
|
||||
objectAPI := newObjectLayerFn()
|
||||
if objectAPI == nil {
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
// NOTE: Healing on meta is run regardless
|
||||
// of any bucket being selected, this is to ensure that
|
||||
// meta are always upto date and correct.
|
||||
marker := ""
|
||||
isTruncated := true
|
||||
for isTruncated {
|
||||
if globalHTTPServer != nil {
|
||||
// Wait at max 1 minute for an inprogress request
|
||||
// before proceeding to heal
|
||||
waitCount := 60
|
||||
// Any requests in progress, delay the heal.
|
||||
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
|
||||
waitCount--
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Lists all objects under `config` prefix.
|
||||
objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, minioMetaBucket, metaPrefix,
|
||||
marker, "", 1000)
|
||||
if err != nil {
|
||||
return errFnHealFromAPIErr(h.ctx, err)
|
||||
}
|
||||
|
||||
for index := range objectInfos.Objects {
|
||||
if h.isQuitting() {
|
||||
return errHealStopSignalled
|
||||
}
|
||||
o := objectInfos.Objects[index]
|
||||
res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun, h.settings.Remove)
|
||||
// Object might have been deleted, by the time heal
|
||||
// was attempted we ignore this file an move on.
|
||||
if isErrObjectNotFound(herr) {
|
||||
continue
|
||||
}
|
||||
if herr != nil {
|
||||
return herr
|
||||
}
|
||||
res.Type = madmin.HealItemBucketMetadata
|
||||
if err = h.pushHealResultItem(res); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
isTruncated = objectInfos.IsTruncated
|
||||
marker = objectInfos.NextMarker
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NOTE: Healing on configs is run regardless
|
||||
// of any bucket being selected, this is to ensure that
|
||||
// configs are always uptodate and correct.
|
||||
marker := ""
|
||||
isTruncated := true
|
||||
for isTruncated {
|
||||
if globalHTTPServer != nil {
|
||||
// Wait at max 1 minute for an inprogress request
|
||||
// before proceeding to heal
|
||||
waitCount := 60
|
||||
// Any requests in progress, delay the heal.
|
||||
for globalHTTPServer.GetRequestCount() > 2 && waitCount > 0 {
|
||||
waitCount--
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Lists all objects under `config` prefix.
|
||||
objectInfos, err := objectAPI.ListObjectsHeal(h.ctx, minioMetaBucket, minioConfigPrefix,
|
||||
marker, "", 1000)
|
||||
if err != nil {
|
||||
return errFnHealFromAPIErr(h.ctx, err)
|
||||
}
|
||||
|
||||
for index := range objectInfos.Objects {
|
||||
if h.isQuitting() {
|
||||
return errHealStopSignalled
|
||||
}
|
||||
o := objectInfos.Objects[index]
|
||||
res, herr := objectAPI.HealObject(h.ctx, o.Bucket, o.Name, h.settings.DryRun, h.settings.Remove)
|
||||
// Object might have been deleted, by the time heal
|
||||
// was attempted we ignore this file an move on.
|
||||
if isErrObjectNotFound(herr) {
|
||||
continue
|
||||
}
|
||||
if herr != nil {
|
||||
return herr
|
||||
}
|
||||
res.Type = madmin.HealItemBucketMetadata
|
||||
if err = h.pushHealResultItem(res); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
isTruncated = objectInfos.IsTruncated
|
||||
marker = objectInfos.NextMarker
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// healDiskFormat - heals format.json, return value indicates if a
|
||||
@ -692,18 +697,16 @@ func (h *healSequence) healBucket(bucket string) error {
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
results, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun, h.settings.Remove)
|
||||
// push any available results before checking for error
|
||||
for _, result := range results {
|
||||
if perr := h.pushHealResultItem(result); perr != nil {
|
||||
return perr
|
||||
}
|
||||
}
|
||||
result, err := objectAPI.HealBucket(h.ctx, bucket, h.settings.DryRun, h.settings.Remove)
|
||||
// handle heal-bucket error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = h.pushHealResultItem(result); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !h.settings.Recursive {
|
||||
if h.objPrefix != "" {
|
||||
// Check if an object named as the objPrefix exists,
|
||||
|
@ -119,7 +119,7 @@ func (api *DummyObjectLayer) HealFormat(ctx context.Context, dryRun bool) (item
|
||||
return
|
||||
}
|
||||
|
||||
func (api *DummyObjectLayer) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (items []madmin.HealResultItem, err error) {
|
||||
func (api *DummyObjectLayer) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (items madmin.HealResultItem, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1252,10 +1252,10 @@ func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, dryR
|
||||
}
|
||||
|
||||
// HealBucket - no-op for fs, Valid only for XL.
|
||||
func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem,
|
||||
func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem,
|
||||
error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return nil, NotImplemented{}
|
||||
return madmin.HealResultItem{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// ListObjectsHeal - list all objects to be healed. Valid only for XL
|
||||
|
@ -92,8 +92,8 @@ func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin
|
||||
}
|
||||
|
||||
// HealBucket - Not implemented stub
|
||||
func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem, error) {
|
||||
return nil, NotImplemented{}
|
||||
func (a GatewayUnsupported) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) {
|
||||
return madmin.HealResultItem{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// ListBucketsHeal - Not implemented stub
|
||||
|
@ -83,7 +83,7 @@ type ObjectLayer interface {
|
||||
// Healing operations.
|
||||
ReloadFormat(ctx context.Context, dryRun bool) error
|
||||
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
|
||||
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) ([]madmin.HealResultItem, error)
|
||||
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
|
||||
HealObject(ctx context.Context, bucket, object string, dryRun, remove bool) (madmin.HealResultItem, error)
|
||||
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
|
||||
ListObjectsHeal(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
|
||||
|
@ -1232,15 +1232,15 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
|
||||
}
|
||||
|
||||
// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
|
||||
func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (results []madmin.HealResultItem, err error) {
|
||||
func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (result madmin.HealResultItem, err error) {
|
||||
bucketLock := globalNSMutex.NewNSLock(bucket, "")
|
||||
if err := bucketLock.GetLock(globalHealingTimeout); err != nil {
|
||||
return nil, err
|
||||
return result, err
|
||||
}
|
||||
defer bucketLock.Unlock()
|
||||
|
||||
// Initialize heal result info
|
||||
res := madmin.HealResultItem{
|
||||
result = madmin.HealResultItem{
|
||||
Type: madmin.HealItemBucket,
|
||||
Bucket: bucket,
|
||||
DiskCount: s.setCount * s.drivesPerSet,
|
||||
@ -1248,25 +1248,22 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove b
|
||||
}
|
||||
|
||||
for _, s := range s.sets {
|
||||
var setResults []madmin.HealResultItem
|
||||
setResults, _ = s.HealBucket(ctx, bucket, dryRun, remove)
|
||||
for _, setResult := range setResults {
|
||||
if setResult.Type == madmin.HealItemBucket {
|
||||
for _, v := range setResult.Before.Drives {
|
||||
res.Before.Drives = append(res.Before.Drives, v)
|
||||
}
|
||||
for _, v := range setResult.After.Drives {
|
||||
res.After.Drives = append(res.After.Drives, v)
|
||||
}
|
||||
continue
|
||||
}
|
||||
results = append(results, setResult)
|
||||
var healResult madmin.HealResultItem
|
||||
healResult, err = s.HealBucket(ctx, bucket, dryRun, remove)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
for _, v := range healResult.Before.Drives {
|
||||
result.Before.Drives = append(result.Before.Drives, v)
|
||||
}
|
||||
for _, v := range healResult.After.Drives {
|
||||
result.After.Drives = append(result.After.Drives, v)
|
||||
}
|
||||
}
|
||||
|
||||
for _, endpoint := range s.endpoints {
|
||||
var foundBefore bool
|
||||
for _, v := range res.Before.Drives {
|
||||
for _, v := range result.Before.Drives {
|
||||
if endpoint.IsLocal {
|
||||
if v.Endpoint == endpoint.Path {
|
||||
foundBefore = true
|
||||
@ -1278,14 +1275,14 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove b
|
||||
}
|
||||
}
|
||||
if !foundBefore {
|
||||
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
|
||||
result.Before.Drives = append(result.Before.Drives, madmin.HealDriveInfo{
|
||||
UUID: "",
|
||||
Endpoint: endpoint.String(),
|
||||
State: madmin.DriveStateOffline,
|
||||
})
|
||||
}
|
||||
var foundAfter bool
|
||||
for _, v := range res.After.Drives {
|
||||
for _, v := range result.After.Drives {
|
||||
if endpoint.IsLocal {
|
||||
if v.Endpoint == endpoint.Path {
|
||||
foundAfter = true
|
||||
@ -1297,7 +1294,7 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove b
|
||||
}
|
||||
}
|
||||
if !foundAfter {
|
||||
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
|
||||
result.After.Drives = append(result.After.Drives, madmin.HealDriveInfo{
|
||||
UUID: "",
|
||||
Endpoint: endpoint.String(),
|
||||
State: madmin.DriveStateOffline,
|
||||
@ -1306,14 +1303,12 @@ func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove b
|
||||
}
|
||||
|
||||
// Check if we had quorum to write, if not return an appropriate error.
|
||||
_, afterDriveOnline := res.GetOnlineCounts()
|
||||
_, afterDriveOnline := result.GetOnlineCounts()
|
||||
if afterDriveOnline < ((s.setCount*s.drivesPerSet)/2)+1 {
|
||||
return nil, toObjectErr(errXLWriteQuorum, bucket)
|
||||
return result, toObjectErr(errXLWriteQuorum, bucket)
|
||||
}
|
||||
|
||||
results = append(results, res)
|
||||
|
||||
return results, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// HealObject - heals inconsistent object on a hashedSet based on object name.
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -42,7 +41,7 @@ func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRes
|
||||
// also heals the missing entries for bucket metadata files
|
||||
// `policy.json, notification.xml, listeners.json`.
|
||||
func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (
|
||||
results []madmin.HealResultItem, err error) {
|
||||
result madmin.HealResultItem, err error) {
|
||||
|
||||
storageDisks := xl.getDisks()
|
||||
|
||||
@ -50,17 +49,7 @@ func (xl xlObjects) HealBucket(ctx context.Context, bucket string, dryRun, remov
|
||||
writeQuorum := len(storageDisks)/2 + 1
|
||||
|
||||
// Heal bucket.
|
||||
var result madmin.HealResultItem
|
||||
result, err = healBucket(ctx, storageDisks, bucket, writeQuorum, dryRun)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
results = append(results, result)
|
||||
|
||||
// Proceed to heal bucket metadata.
|
||||
metaResults, err := healBucketMetadata(xl, bucket, dryRun, remove)
|
||||
results = append(results, metaResults...)
|
||||
return results, err
|
||||
return healBucket(ctx, storageDisks, bucket, writeQuorum, dryRun)
|
||||
}
|
||||
|
||||
// Heal bucket - create buckets on disks where it does not exist.
|
||||
@ -157,51 +146,6 @@ func healBucket(ctx context.Context, storageDisks []StorageAPI, bucket string, w
|
||||
return res, reducedErr
|
||||
}
|
||||
|
||||
// Heals all the metadata associated for a given bucket, this function
|
||||
// heals `policy.json`, `notification.xml` and `listeners.json`.
|
||||
func healBucketMetadata(xl xlObjects, bucket string, dryRun, remove bool) (
|
||||
results []madmin.HealResultItem, err error) {
|
||||
|
||||
healBucketMetaFn := func(metaPath string) error {
|
||||
reqInfo := &logger.ReqInfo{BucketName: bucket}
|
||||
ctx := logger.SetReqInfo(context.Background(), reqInfo)
|
||||
result, healErr := xl.HealObject(ctx, minioMetaBucket, metaPath, dryRun, remove)
|
||||
// If object is not found, skip the file.
|
||||
if isErrObjectNotFound(healErr) {
|
||||
return nil
|
||||
}
|
||||
if healErr != nil {
|
||||
return healErr
|
||||
}
|
||||
result.Type = madmin.HealItemBucketMetadata
|
||||
results = append(results, result)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Heal `policy.json` for missing entries, ignores if
|
||||
// `policy.json` is not found.
|
||||
policyPath := pathJoin(bucketConfigPrefix, bucket, bucketPolicyConfig)
|
||||
err = healBucketMetaFn(policyPath)
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
// Heal `notification.xml` for missing entries, ignores if
|
||||
// `notification.xml` is not found.
|
||||
nConfigPath := path.Join(bucketConfigPrefix, bucket,
|
||||
bucketNotificationConfig)
|
||||
err = healBucketMetaFn(nConfigPath)
|
||||
if err != nil {
|
||||
return results, err
|
||||
}
|
||||
|
||||
// Heal `listeners.json` for missing entries, ignores if
|
||||
// `listeners.json` is not found.
|
||||
lConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
||||
err = healBucketMetaFn(lConfigPath)
|
||||
return results, err
|
||||
}
|
||||
|
||||
// listAllBuckets lists all buckets from all disks. It also
|
||||
// returns the occurrence of each buckets in all disks
|
||||
func listAllBuckets(storageDisks []StorageAPI) (buckets map[string]VolInfo,
|
||||
|
Loading…
x
Reference in New Issue
Block a user