mirror of
https://github.com/minio/minio.git
synced 2025-11-09 05:34:56 -05:00
heal: Add healing support for bucket, bucket metadata files. (#3252)
This patch implements healing in general but it is only used as part of quickHeal(). Fixes #3237
This commit is contained in:
@@ -16,9 +16,47 @@
|
||||
|
||||
package cmd
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Heals a bucket if it doesn't exist on one of the disks.
|
||||
// healFormatXL - heals missing `format.json` on freshly or corrupted
|
||||
// disks (missing format.json but does have erasure coded data in it).
|
||||
func healFormatXL(storageDisks []StorageAPI) (err error) {
|
||||
// Attempt to load all `format.json`.
|
||||
formatConfigs, sErrs := loadAllFormats(storageDisks)
|
||||
|
||||
// Generic format check validates
|
||||
// if (no quorum) return error
|
||||
// if (disks not recognized) // Always error.
|
||||
if err = genericFormatCheck(formatConfigs, sErrs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Handles different cases properly.
|
||||
switch reduceFormatErrs(sErrs, len(storageDisks)) {
|
||||
case errCorruptedFormat:
|
||||
if err = healFormatXLCorruptedDisks(storageDisks); err != nil {
|
||||
return fmt.Errorf("Unable to repair corrupted format, %s", err)
|
||||
}
|
||||
case errSomeDiskUnformatted:
|
||||
// All drives online but some report missing format.json.
|
||||
if err = healFormatXLFreshDisks(storageDisks); err != nil {
|
||||
// There was an unexpected unrecoverable error during healing.
|
||||
return fmt.Errorf("Unable to heal backend %s", err)
|
||||
}
|
||||
case errSomeDiskOffline:
|
||||
// FIXME: in future.
|
||||
return fmt.Errorf("Unable to initialize format %s and %s", errSomeDiskOffline, errSomeDiskUnformatted)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Heals a bucket if it doesn't exist on one of the disks, additionally
|
||||
// also heals the missing entries for bucket metadata files
|
||||
// `policy.json, notification.xml, listeners.json`.
|
||||
func (xl xlObjects) HealBucket(bucket string) error {
|
||||
// Verify if bucket is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
@@ -30,8 +68,17 @@ func (xl xlObjects) HealBucket(bucket string) error {
|
||||
return traceError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
|
||||
// Heal bucket - create buckets on disks where it does not exist.
|
||||
// Heal bucket.
|
||||
if err := healBucket(xl.storageDisks, bucket, xl.writeQuorum); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Proceed to heal bucket metadata.
|
||||
return healBucketMetadata(xl.storageDisks, bucket)
|
||||
}
|
||||
|
||||
func healBucket(storageDisks []StorageAPI, bucket string, writeQuorum int) error {
|
||||
// Heal bucket - create buckets on disks where it does not exist.
|
||||
bucketLock := nsMutex.NewNSLock(bucket, "")
|
||||
bucketLock.Lock()
|
||||
defer bucketLock.Unlock()
|
||||
@@ -40,10 +87,10 @@ func (xl xlObjects) HealBucket(bucket string) error {
|
||||
var wg = &sync.WaitGroup{}
|
||||
|
||||
// Initialize list of errors.
|
||||
var dErrs = make([]error, len(xl.storageDisks))
|
||||
var dErrs = make([]error, len(storageDisks))
|
||||
|
||||
// Make a volume entry on all underlying storage disks.
|
||||
for index, disk := range xl.storageDisks {
|
||||
for index, disk := range storageDisks {
|
||||
if disk == nil {
|
||||
dErrs[index] = traceError(errDiskNotFound)
|
||||
continue
|
||||
@@ -68,9 +115,9 @@ func (xl xlObjects) HealBucket(bucket string) error {
|
||||
wg.Wait()
|
||||
|
||||
// Do we have write quorum?.
|
||||
if !isDiskQuorum(dErrs, xl.writeQuorum) {
|
||||
if !isDiskQuorum(dErrs, writeQuorum) {
|
||||
// Purge successfully created buckets if we don't have writeQuorum.
|
||||
xl.undoMakeBucket(bucket)
|
||||
undoMakeBucket(storageDisks, bucket)
|
||||
return toObjectErr(traceError(errXLWriteQuorum), bucket)
|
||||
}
|
||||
|
||||
@@ -85,26 +132,101 @@ func (xl xlObjects) HealBucket(bucket string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// HealObject heals a given object for all its missing entries.
|
||||
// FIXME: If an object object was deleted and one disk was down, and later the disk comes back
|
||||
// up again, heal on the object should delete it.
|
||||
func (xl xlObjects) HealObject(bucket, object string) error {
|
||||
// Verify if bucket is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return traceError(BucketNameInvalid{Bucket: bucket})
|
||||
// Heals all the metadata associated for a given bucket, this function
|
||||
// heals `policy.json`, `notification.xml` and `listeners.json`.
|
||||
func healBucketMetadata(storageDisks []StorageAPI, bucket string) error {
|
||||
healBucketMetaFn := func(metaPath string) error {
|
||||
metaLock := nsMutex.NewNSLock(minioMetaBucket, metaPath)
|
||||
metaLock.RLock()
|
||||
defer metaLock.RUnlock()
|
||||
// Heals the metaPath.
|
||||
if err := healObject(storageDisks, minioMetaBucket, metaPath); err != nil && !isErrObjectNotFound(err) {
|
||||
return err
|
||||
} // Success.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verify if object is valid.
|
||||
if !IsValidObjectName(object) {
|
||||
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
// Heal `policy.json` for missing entries, ignores if `policy.json` is not found.
|
||||
policyPath := pathJoin(bucketConfigPrefix, bucket, policyJSON)
|
||||
if err := healBucketMetaFn(policyPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Lock the object before healing.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
// Heal `notification.xml` for missing entries, ignores if `notification.xml` is not found.
|
||||
nConfigPath := path.Join(bucketConfigPrefix, bucket, bucketNotificationConfig)
|
||||
if err := healBucketMetaFn(nConfigPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
|
||||
// Heal `listeners.json` for missing entries, ignores if `listeners.json` is not found.
|
||||
lConfigPath := path.Join(bucketConfigPrefix, bucket, bucketListenerConfig)
|
||||
return healBucketMetaFn(lConfigPath)
|
||||
}
|
||||
|
||||
// listBucketNames list all bucket names from all disks to heal.
|
||||
func listBucketNames(storageDisks []StorageAPI) (bucketNames map[string]struct{}, err error) {
|
||||
bucketNames = make(map[string]struct{})
|
||||
for _, disk := range storageDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
var volsInfo []VolInfo
|
||||
volsInfo, err = disk.ListVols()
|
||||
if err == nil {
|
||||
for _, volInfo := range volsInfo {
|
||||
// StorageAPI can send volume names which are
|
||||
// incompatible with buckets, handle it and skip them.
|
||||
if !IsValidBucketName(volInfo.Name) {
|
||||
continue
|
||||
}
|
||||
// Ignore the volume special bucket.
|
||||
if volInfo.Name == minioMetaBucket {
|
||||
continue
|
||||
}
|
||||
bucketNames[volInfo.Name] = struct{}{}
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Ignore any disks not found.
|
||||
if isErrIgnored(err, bucketMetadataOpIgnoredErrs) {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return bucketNames, err
|
||||
}
|
||||
|
||||
// This function is meant for all the healing that needs to be done
|
||||
// during startup i.e healing of buckets, bucket metadata (policy.json,
|
||||
// notification.xml, listeners.json) etc. Currently this function
|
||||
// supports quick healing of buckets, bucket metadata.
|
||||
//
|
||||
// TODO :-
|
||||
// - add support for healing dangling `uploads.json`.
|
||||
// - add support for healing dangling `xl.json`.
|
||||
func quickHeal(storageDisks []StorageAPI, writeQuorum int) error {
|
||||
// List all bucket names from all disks.
|
||||
bucketNames, err := listBucketNames(storageDisks)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// All bucket names and bucket metadata should be healed.
|
||||
for bucketName := range bucketNames {
|
||||
// Heal bucket and then proceed to heal bucket metadata.
|
||||
if err = healBucket(storageDisks, bucketName, writeQuorum); err == nil {
|
||||
if err = healBucketMetadata(storageDisks, bucketName); err == nil {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Heals an object only the corrupted/missing erasure blocks.
|
||||
func healObject(storageDisks []StorageAPI, bucket string, object string) error {
|
||||
partsMetadata, errs := readAllXLMetadata(storageDisks, bucket, object)
|
||||
if err := reduceErrs(errs, nil); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
@@ -115,9 +237,9 @@ func (xl xlObjects) HealObject(bucket, object string) error {
|
||||
}
|
||||
|
||||
// List of disks having latest version of the object.
|
||||
latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
|
||||
latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
||||
// List of disks having outdated version of the object or missing object.
|
||||
outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
|
||||
outDatedDisks := outDatedDisks(storageDisks, partsMetadata, errs)
|
||||
// Latest xlMetaV1 for reference.
|
||||
latestMeta := pickValidXLMeta(partsMetadata, modTime)
|
||||
|
||||
@@ -217,3 +339,27 @@ func (xl xlObjects) HealObject(bucket, object string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HealObject heals a given object for all its missing entries.
|
||||
// FIXME: If an object object was deleted and one disk was down,
|
||||
// and later the disk comes back up again, heal on the object
|
||||
// should delete it.
|
||||
func (xl xlObjects) HealObject(bucket, object string) error {
|
||||
// Verify if bucket is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return traceError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
|
||||
// Verify if object is valid.
|
||||
if !IsValidObjectName(object) {
|
||||
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
|
||||
// Lock the object before healing.
|
||||
objectLock := nsMutex.NewNSLock(bucket, object)
|
||||
objectLock.RLock()
|
||||
defer objectLock.RUnlock()
|
||||
|
||||
// Heal the object.
|
||||
return healObject(xl.storageDisks, bucket, object)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user