Heal corrupted formats of disks already containing objects (#2297)

This commit is contained in:
Anis Elleuch 2016-07-29 01:49:59 +02:00 committed by Harshavardhana
parent f239fcac67
commit 14cefd352c
3 changed files with 325 additions and 3 deletions

View File

@ -118,13 +118,19 @@ var errDiskOrderMismatch = errors.New("disk order mismatch")
func reduceFormatErrs(errs []error, diskCount int) (err error) { func reduceFormatErrs(errs []error, diskCount int) (err error) {
var errUnformattedDiskCount = 0 var errUnformattedDiskCount = 0
var errDiskNotFoundCount = 0 var errDiskNotFoundCount = 0
var errCorruptedFormatCount = 0
for _, dErr := range errs { for _, dErr := range errs {
if dErr == errUnformattedDisk { if dErr == errUnformattedDisk {
errUnformattedDiskCount++ errUnformattedDiskCount++
} else if dErr == errDiskNotFound { } else if dErr == errDiskNotFound {
errDiskNotFoundCount++ errDiskNotFoundCount++
} else if dErr == errCorruptedFormat {
errCorruptedFormatCount++
} }
} }
if errCorruptedFormatCount > 0 {
return errCorruptedFormat
}
// Unformatted disks found, we need to figure out if any disks are offline. // Unformatted disks found, we need to figure out if any disks are offline.
if errUnformattedDiskCount > 0 { if errUnformattedDiskCount > 0 {
// Returns errUnformattedDisk if all disks report unFormattedDisk. // Returns errUnformattedDisk if all disks report unFormattedDisk.
@ -222,8 +228,8 @@ func genericFormatCheck(formatConfigs []*formatConfigV1, sErrs []error) (err err
return errXLReadQuorum return errXLReadQuorum
} }
// One of the disk has corrupt format, return error. // Check if number of corrupted format under quorum
if errCorruptFormatCount > 0 { if errCorruptFormatCount > len(formatConfigs)-readQuorum {
return errCorruptedFormat return errCorruptedFormat
} }
@ -522,6 +528,180 @@ func healFormatXLFreshDisks(storageDisks []StorageAPI) error {
return saveFormatXL(orderedDisks, newFormatConfigs) return saveFormatXL(orderedDisks, newFormatConfigs)
} }
// splitDisksByUse partitions storageDisks into two groups, keeping the
// relative order of storageDisks in both results:
//   - assignedDisks:   disks that are equal (==) to some entry of orderedDisks
//   - unAssignedDisks: disks that match no entry of orderedDisks
func splitDisksByUse(storageDisks, orderedDisks []StorageAPI) (assignedDisks []StorageAPI, unAssignedDisks []StorageAPI) {
	// inOrdered reports whether disk occupies any slot of orderedDisks.
	inOrdered := func(disk StorageAPI) bool {
		for _, candidate := range orderedDisks {
			if candidate == disk {
				return true
			}
		}
		return false
	}
	for _, disk := range storageDisks {
		if inOrdered(disk) {
			assignedDisks = append(assignedDisks, disk)
		} else {
			unAssignedDisks = append(unAssignedDisks, disk)
		}
	}
	return assignedDisks, unAssignedDisks
}
// reorderDisksByInspection tries to place disks whose format configuration
// is missing (a nil entry in formatConfigs) into orderedDisks by looking at
// the data already stored on them.
//
// formatConfigs and storageDisks are treated as parallel slices: the disk for
// formatConfigs[i] is storageDisks[i]. For each disk without a format, the
// xl.json metadata of the first entry of the first listed volume is read and
// the position of Erasure.Index inside Erasure.Distribution is used as the
// disk's slot in orderedDisks. Disks that are nil, have no volumes, have no
// entries in the first volume, or have no metadata file are left unplaced.
//
// orderedDisks is updated in place and also returned. errCorruptedFormat is
// returned when the metadata does not identify a slot, identifies a slot
// outside of orderedDisks, or identifies a slot that is already occupied.
// Any other error from the storage layer is returned unchanged.
func reorderDisksByInspection(orderedDisks, storageDisks []StorageAPI, formatConfigs []*formatConfigV1) ([]StorageAPI, error) {
	for index, format := range formatConfigs {
		if format != nil {
			// Disk already has a usable format, nothing to guess.
			continue
		}
		disk := storageDisks[index]
		if disk == nil {
			// Nothing to inspect; calling a method on a nil disk would panic.
			continue
		}
		vols, err := disk.ListVols()
		if err != nil {
			return nil, err
		}
		if len(vols) == 0 {
			continue
		}
		objects, err := disk.ListDir(vols[0].Name, "")
		if err != nil {
			return nil, err
		}
		if len(objects) == 0 {
			continue
		}
		xlData, err := readXLMeta(disk, vols[0].Name, objects[0])
		if err != nil {
			if err == errFileNotFound {
				// No metadata for this entry, the disk cannot be placed by inspection.
				continue
			}
			return nil, err
		}
		// Locate this disk's index within the distribution; when the value
		// occurs more than once the last occurrence wins.
		diskIndex := -1
		for i, d := range xlData.Erasure.Distribution {
			if d == xlData.Erasure.Index {
				diskIndex = i
			}
		}
		// The slot comes from on-disk data, so it must be validated against
		// orderedDisks before indexing; an out-of-range slot would panic.
		if diskIndex == -1 || diskIndex >= len(orderedDisks) || orderedDisks[diskIndex] != nil {
			// Some inconsistent data are found, exit immediately.
			return nil, errCorruptedFormat
		}
		orderedDisks[diskIndex] = disk
	}
	return orderedDisks, nil
}
// healFormatXLCorruptedDisks rewrites `format.json` on every disk when some
// disks have a missing or corrupted format file.
//
// It returns nil without doing anything when a disk is nil, when a disk
// reports errDiskNotFound, or when every disk already has a loadable format.
// When no disk has a format, initialization is delegated to initFormatXL.
// Otherwise the disks are put in order using the surviving formats, then by
// inspecting stored data, and finally by filling the remaining slots with the
// disks that are still unassigned; new format configs are then saved on all
// disks.
func healFormatXLCorruptedDisks(storageDisks []StorageAPI) error {
	// Load `format.json` from all disks; entries stay nil for disks whose
	// format is missing or corrupted.
	formatConfigs := make([]*formatConfigV1, len(storageDisks))
	for i, disk := range storageDisks {
		if disk == nil {
			// One of the disks is offline or ignored, proceed without healing.
			return nil
		}
		formatXL, err := loadFormat(disk)
		switch err {
		case nil:
			formatConfigs[i] = formatXL
		case errUnformattedDisk, errCorruptedFormat:
			// format.json is missing or corrupted, this disk should be healed.
		case errDiskNotFound:
			// Valid case, proceed without healing.
			return nil
		default:
			// Unsupported error.
			return err
		}
	}

	switch {
	case isFormatFound(formatConfigs):
		// Every `format.json` was read successfully, nothing to heal.
		return nil
	case isFormatNotFound(formatConfigs):
		// All disks are fresh, format.json will be written by initFormatXL().
		return initFormatXL(storageDisks)
	}

	// Validate format configs for consistency in JBOD and disks.
	if err := checkFormatXL(formatConfigs); err != nil {
		return err
	}

	// The first available config is the template for the regenerated ones.
	var referenceConfig *formatConfigV1
	for _, candidate := range formatConfigs {
		if candidate != nil {
			referenceConfig = candidate
			break
		}
	}

	// Collect new JBOD.
	newJBOD := referenceConfig.XL.JBOD

	// Reorder the disks based on the JBOD order.
	orderedDisks, err := reorderDisks(storageDisks, formatConfigs)
	if err != nil {
		return err
	}

	// Every slot without a disk gets a fresh UUID.
	for slot := range orderedDisks {
		if orderedDisks[slot] == nil {
			newJBOD[slot] = getUUID()
		}
	}

	// For disks with corrupted formats, inspect the disks contents to guess
	// the disks order.
	orderedDisks, err = reorderDisksByInspection(orderedDisks, storageDisks, formatConfigs)
	if err != nil {
		return err
	}

	// Disks that still have no slot take the remaining empty slots, in the
	// order in which they appear in storageDisks.
	_, unAssignedDisks := splitDisksByUse(storageDisks, orderedDisks)
	next := 0
	for slot := range orderedDisks {
		if next == len(unAssignedDisks) {
			break
		}
		if orderedDisks[slot] == nil {
			orderedDisks[slot] = unAssignedDisks[next]
			next++
		}
	}

	// New configs are generated for every slot since the format is
	// re-populated across all disks.
	newFormatConfigs := make([]*formatConfigV1, len(orderedDisks))
	for slot := range orderedDisks {
		newFormatConfigs[slot] = &formatConfigV1{
			Version: referenceConfig.Version,
			Format:  referenceConfig.Format,
			XL: &xlFormat{
				Version: referenceConfig.XL.Version,
				Disk:    newJBOD[slot],
				JBOD:    newJBOD,
			},
		}
	}

	// Save new `format.json` across all disks, in JBOD order.
	return saveFormatXL(orderedDisks, newFormatConfigs)
}
// loadFormatXL - loads XL `format.json` and returns back properly // loadFormatXL - loads XL `format.json` and returns back properly
// ordered storage slice based on `format.json`. // ordered storage slice based on `format.json`.
func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) {

View File

@ -16,7 +16,10 @@
package main package main
import "testing" import (
"bytes"
"testing"
)
// generates a valid format.json for XL backend. // generates a valid format.json for XL backend.
func genFormatXLValid() []*formatConfigV1 { func genFormatXLValid() []*formatConfigV1 {
@ -142,6 +145,141 @@ func genFormatXLInvalidDisksOrder() []*formatConfigV1 {
return formatConfigs return formatConfigs
} }
// TestFormatXLHeal creates an XL backend, stores one object, deletes
// format.json on three disks, wipes the remaining content of one of them, and
// then checks that healFormatXLCorruptedDisks succeeds on a permuted disk
// list and that loadFormatXL accepts the healed formats.
func TestFormatXLHeal(t *testing.T) {
	// Create an instance of xl backend.
	obj, fsDirs, err := getXLObjectLayer()
	if err != nil {
		t.Fatal(err)
	}
	// Clean all export directories, also when the test fails: t.Fatal stops
	// the test goroutine but deferred calls still run.
	defer removeRoots(fsDirs)

	xl := obj.(xlObjects)

	bucket := "bucket"
	object := "object"

	err = obj.MakeBucket(bucket)
	if err != nil {
		t.Fatal(err)
	}

	_, err = obj.PutObject(bucket, object, int64(len("abcd")), bytes.NewReader([]byte("abcd")), nil)
	if err != nil {
		t.Fatal(err)
	}

	// Now, remove two format files.. Load them and reorder
	if err = xl.storageDisks[3].DeleteFile(".minio.sys", "format.json"); err != nil {
		t.Fatal(err)
	}
	if err = xl.storageDisks[11].DeleteFile(".minio.sys", "format.json"); err != nil {
		t.Fatal(err)
	}

	// Remove the content of export dir 10 but preserve .minio.sys because it is automatically
	// created when minio starts
	if err = xl.storageDisks[10].DeleteFile(".minio.sys", "format.json"); err != nil {
		t.Fatal(err)
	}
	if err = xl.storageDisks[10].DeleteFile(".minio.sys", "tmp"); err != nil {
		t.Fatal(err)
	}
	if err = xl.storageDisks[10].DeleteFile(bucket, object+"/xl.json"); err != nil {
		t.Fatal(err)
	}
	if err = xl.storageDisks[10].DeleteFile(bucket, object+"/part.1"); err != nil {
		t.Fatal(err)
	}
	if err = xl.storageDisks[10].DeleteVol(bucket); err != nil {
		t.Fatal(err)
	}

	// Hand the disks over in a shuffled order so healing has to reorder them.
	permutedStorageDisks := []StorageAPI{xl.storageDisks[1], xl.storageDisks[4],
		xl.storageDisks[2], xl.storageDisks[8], xl.storageDisks[6], xl.storageDisks[7],
		xl.storageDisks[0], xl.storageDisks[15], xl.storageDisks[13], xl.storageDisks[14],
		xl.storageDisks[3], xl.storageDisks[10], xl.storageDisks[12], xl.storageDisks[9],
		xl.storageDisks[5], xl.storageDisks[11]}

	// Start healing disks
	err = healFormatXLCorruptedDisks(permutedStorageDisks)
	if err != nil {
		t.Fatal("healing corrupted disk failed: ", err)
	}

	// Load again XL format.json to validate it
	_, err = loadFormatXL(permutedStorageDisks)
	if err != nil {
		t.Fatal("loading healed disk failed: ", err)
	}
}
// TestFormatXLReorderByInspection creates an XL backend, stores one object,
// deletes format.json on disks 3 and 5, and checks that reorderDisks followed
// by reorderDisksByInspection puts every placed disk of a permuted disk list
// back at its original position.
func TestFormatXLReorderByInspection(t *testing.T) {
	// Create an instance of xl backend.
	obj, fsDirs, err := getXLObjectLayer()
	if err != nil {
		t.Fatal(err)
	}
	// Clean all export directories, also when the test fails: t.Fatal stops
	// the test goroutine but deferred calls still run.
	defer removeRoots(fsDirs)

	xl := obj.(xlObjects)

	bucket := "bucket"
	object := "object"

	err = obj.MakeBucket(bucket)
	if err != nil {
		t.Fatal(err)
	}

	_, err = obj.PutObject(bucket, object, int64(len("abcd")), bytes.NewReader([]byte("abcd")), nil)
	if err != nil {
		t.Fatal(err)
	}

	// Now, remove two format files.. Load them and reorder
	if err = xl.storageDisks[3].DeleteFile(".minio.sys", "format.json"); err != nil {
		t.Fatal(err)
	}
	if err = xl.storageDisks[5].DeleteFile(".minio.sys", "format.json"); err != nil {
		t.Fatal(err)
	}

	// Hand the disks over in a shuffled order so they have to be reordered.
	permutedStorageDisks := []StorageAPI{xl.storageDisks[1], xl.storageDisks[4],
		xl.storageDisks[2], xl.storageDisks[8], xl.storageDisks[6], xl.storageDisks[7],
		xl.storageDisks[0], xl.storageDisks[15], xl.storageDisks[13], xl.storageDisks[14],
		xl.storageDisks[3], xl.storageDisks[10], xl.storageDisks[12], xl.storageDisks[9],
		xl.storageDisks[5], xl.storageDisks[11]}

	// Load errors are deliberately ignored: the two disks whose format.json
	// was deleted above are expected to fail loading.
	permutedFormatConfigs, _ := loadAllFormats(permutedStorageDisks)

	orderedDisks, err := reorderDisks(permutedStorageDisks, permutedFormatConfigs)
	if err != nil {
		t.Fatal("error reordering disks\n")
	}

	orderedDisks, err = reorderDisksByInspection(orderedDisks, permutedStorageDisks, permutedFormatConfigs)
	if err != nil {
		t.Fatal("failed to reorder disk by inspection")
	}

	// Check disks reordering: only slots 3 and 5 may be empty, and every
	// filled slot must hold the disk that originally sat at that position.
	for i := 0; i <= 15; i++ {
		if orderedDisks[i] == nil && i != 3 && i != 5 {
			t.Fatal("should not be nil")
		}
		if orderedDisks[i] != nil && orderedDisks[i] != xl.storageDisks[i] {
			t.Fatal("Disks were not ordered correctly.")
		}
	}
}
// Wrapper for calling FormatXL tests - currently validates // Wrapper for calling FormatXL tests - currently validates
// - valid format // - valid format
// - unrecognized version number // - unrecognized version number

View File

@ -145,6 +145,10 @@ func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) {
// Handles different cases properly. // Handles different cases properly.
switch reduceFormatErrs(sErrs, len(storageDisks)) { switch reduceFormatErrs(sErrs, len(storageDisks)) {
case errCorruptedFormat:
if err := healFormatXLCorruptedDisks(storageDisks); err != nil {
return nil, fmt.Errorf("Unable to repair corrupted format, %s", err)
}
case errUnformattedDisk: case errUnformattedDisk:
// All drives online but fresh, initialize format. // All drives online but fresh, initialize format.
if err := initFormatXL(storageDisks); err != nil { if err := initFormatXL(storageDisks); err != nil {