XL: Simplify heal-format operations

This is in preparation for updated admin heal API.

* Improve case analysis of healFormatXL() - fixes a case where disks
  could have unhandled errors.

* Simplify healFormatXLFreshDisks() and healFormatXLCorruptedDisks()
  to share more code and handle fewer cases for improved simplicity
  and reduced code repetition.

* Fix test cases.
This commit is contained in:
Aditya Manthramurthy 2017-08-03 17:07:02 +05:30 committed by Harshavardhana
parent b10fa507b2
commit 32da1aa9d6
4 changed files with 221 additions and 433 deletions

View File

@ -241,48 +241,32 @@ else // No healing at this point forward, some disks are offline or dead.
fi
*/
// error returned when some disks are found to be unformatted.
var errSomeDiskUnformatted = errors.New("some disks are found to be unformatted")
// error returned when some disks are offline.
var errSomeDiskOffline = errors.New("some disks are offline")
// errDiskOrderMismatch - returned when disk UUID is not in consistent JBOD order.
var errDiskOrderMismatch = errors.New("disk order mismatch")
// Returns error slice into understandable errors.
func reduceFormatErrs(errs []error, diskCount int) (err error) {
var errUnformattedDiskCount = 0
var errDiskNotFoundCount = 0
var errCorruptedFormatCount = 0
for _, dErr := range errs {
if dErr == errUnformattedDisk {
errUnformattedDiskCount++
} else if dErr == errDiskNotFound {
errDiskNotFoundCount++
} else if dErr == errCorruptedFormat {
errCorruptedFormatCount++
// formatErrsSummary - summarizes errors into different classes
func formatErrsSummary(errs []error) (formatCount, unformattedDiskCount,
diskNotFoundCount, corruptedFormatCount, otherErrCount int) {
for _, err := range errs {
switch err {
case errDiskNotFound:
diskNotFoundCount++
case errUnformattedDisk:
unformattedDiskCount++
case errCorruptedFormat:
corruptedFormatCount++
case nil:
// implies that format is not nil
formatCount++
default:
otherErrCount++
}
}
if errCorruptedFormatCount > 0 {
return errCorruptedFormat
}
// Unformatted disks found, we need to figure out if any disks are offline.
if errUnformattedDiskCount > 0 {
// Returns errUnformattedDisk if all disks report unFormattedDisk.
if errUnformattedDiskCount < diskCount {
if errDiskNotFoundCount > 0 {
// Only some disks are fresh but some disks are offline as well.
return errSomeDiskOffline
}
// Some disks are fresh disks an unformatted, not disks are offline.
return errSomeDiskUnformatted
}
// All disks returned unformatted, all disks must be fresh.
return errUnformattedDisk
}
// No unformatted disks found no need to handle disk not found case, return success here.
return nil
return
}
// loadAllFormats - load all format config from all input disks in parallel.
@ -469,29 +453,50 @@ func findDiskIndex(disk string, jbod []string) int {
return -1
}
// reorderDisks - reorder disks in JBOD order.
func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1) ([]StorageAPI, error) {
var savedJBOD []string
for _, format := range formatConfigs {
if format == nil {
continue
}
savedJBOD = format.XL.JBOD
// reorderDisks - reorder disks in JBOD order, and return reference
// format-config. If assignUUIDs is true, it assigns UUIDs to disks
// with missing format configurations in the reference configuration.
func reorderDisks(bootstrapDisks []StorageAPI,
formatConfigs []*formatConfigV1, assignUUIDs bool) (*formatConfigV1,
[]StorageAPI, error) {
// Pick first non-nil format-cfg as reference
var refCfg *formatConfigV1
for _, formatConfig := range formatConfigs {
if formatConfig != nil {
refCfg = formatConfig
break
}
// Pick the first JBOD list to verify the order and construct new set of disk slice.
}
if refCfg == nil {
return nil, nil, fmt.Errorf("could not find any valid config")
}
refJBOD := refCfg.XL.JBOD
// construct reordered disk slice
var newDisks = make([]StorageAPI, len(bootstrapDisks))
for fIndex, format := range formatConfigs {
if format == nil {
continue
}
jIndex := findDiskIndex(format.XL.Disk, savedJBOD)
jIndex := findDiskIndex(format.XL.Disk, refJBOD)
if jIndex == -1 {
return nil, errors.New("Unrecognized uuid " + format.XL.Disk + " found")
return nil, nil, errors.New("Unrecognized uuid " + format.XL.Disk + " found")
}
newDisks[jIndex] = bootstrapDisks[fIndex]
}
return newDisks, nil
if assignUUIDs {
// Based on orderedDisks generate new UUIDs in the ref. config
// for disks without format-configs.
for index, disk := range newDisks {
if disk == nil {
refCfg.XL.JBOD[index] = mustGetUUID()
}
}
}
return refCfg, newDisks, nil
}
// loadFormat - loads format.json from disk.
@ -550,81 +555,14 @@ func isFormatFound(formats []*formatConfigV1) bool {
return true
}
// Heals any missing format.json on the drives. Returns error only for unexpected errors
// as regular errors can be ignored since there might be enough quorum to be operational.
// Heals only fresh disks.
func healFormatXLFreshDisks(storageDisks []StorageAPI) error {
formatConfigs := make([]*formatConfigV1, len(storageDisks))
var referenceConfig *formatConfigV1
// Loads `format.json` from all disks.
for index, disk := range storageDisks {
// Disk not found or ignored is a valid case.
if disk == nil {
// Return nil, one of the disk is offline.
return nil
}
formatXL, err := loadFormat(disk)
if err != nil {
if err == errUnformattedDisk {
// format.json is missing, should be healed.
continue
} else if err == errDiskNotFound { // Is a valid case we
// can proceed without healing.
return nil
}
// Return error for unsupported errors.
return err
} // Success.
formatConfigs[index] = formatXL
}
// All `format.json` has been read successfully, previously completed.
if isFormatFound(formatConfigs) {
// Return success.
return nil
}
// All disks are fresh, format.json will be written by initFormatXL()
if isFormatNotFound(formatConfigs) {
return initFormatXL(storageDisks)
}
// Validate format configs for consistency in JBOD and disks.
if err := checkFormatXL(formatConfigs); err != nil {
return err
}
if referenceConfig == nil {
// This config will be used to update the drives missing format.json.
for _, formatConfig := range formatConfigs {
if formatConfig == nil {
continue
}
referenceConfig = formatConfig
break
}
}
// Collect new JBOD.
newJBOD := referenceConfig.XL.JBOD
// Reorder the disks based on the JBOD order.
orderedDisks, err := reorderDisks(storageDisks, formatConfigs)
if err != nil {
return err
}
// From ordered disks fill the UUID position.
for index, disk := range orderedDisks {
if disk == nil {
newJBOD[index] = mustGetUUID()
}
}
// Collect new format configs.
var newFormatConfigs = make([]*formatConfigV1, len(orderedDisks))
// collectNSaveNewFormatConfigs - creates new format configs based on
// the reference config and saves it on all disks, this is to be
// called from healFormatXL* functions.
func collectNSaveNewFormatConfigs(referenceConfig *formatConfigV1,
orderedDisks []StorageAPI) error {
// Collect new format configs that need to be written.
var newFormatConfigs = make([]*formatConfigV1, len(orderedDisks))
for index := range orderedDisks {
// New configs are generated since we are going
// to re-populate across all disks.
@ -633,13 +571,35 @@ func healFormatXLFreshDisks(storageDisks []StorageAPI) error {
Format: referenceConfig.Format,
XL: &xlFormat{
Version: referenceConfig.XL.Version,
Disk: newJBOD[index],
JBOD: newJBOD,
Disk: referenceConfig.XL.JBOD[index],
JBOD: referenceConfig.XL.JBOD,
},
}
newFormatConfigs[index] = config
}
// Initialize meta volume, if volume already exists ignores it.
if err := initMetaVolume(orderedDisks); err != nil {
return fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
}
// Save new `format.json` across all disks, in JBOD order.
return saveFormatXL(orderedDisks, newFormatConfigs)
}
// Heals any missing format.json on the drives. Returns error only for
// unexpected errors as regular errors can be ignored since there
// might be enough quorum to be operational. Heals only fresh disks.
func healFormatXLFreshDisks(storageDisks []StorageAPI,
formatConfigs []*formatConfigV1) error {
// Reorder the disks based on the JBOD order.
referenceConfig, orderedDisks, err := reorderDisks(storageDisks,
formatConfigs, true)
if err != nil {
return err
}
// Fill in the missing disk back from format configs.
// We need to make sure we have kept the previous order
// and allowed fresh disks to be arranged anywhere.
@ -658,37 +618,37 @@ func healFormatXLFreshDisks(storageDisks []StorageAPI) error {
}
}
// Initialize meta volume, if volume already exists ignores it.
if err := initMetaVolume(orderedDisks); err != nil {
return fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
}
// Save new `format.json` across all disks, in JBOD order.
return saveFormatXL(orderedDisks, newFormatConfigs)
// apply new format config and save to all disks
return collectNSaveNewFormatConfigs(referenceConfig, orderedDisks)
}
// Disks from storageDiks are put in assignedDisks if found in orderedDisks and in unAssignedDisks otherwise
func splitDisksByUse(storageDisks, orderedDisks []StorageAPI) (assignedDisks []StorageAPI, unAssignedDisks []StorageAPI) {
// Populate unAssignDisks
// collectUnAssignedDisks - collect disks unassigned to orderedDisks
// from storageDisks and return them.
func collectUnAssignedDisks(storageDisks, orderedDisks []StorageAPI) (
uDisks []StorageAPI) {
// search for each disk from storageDisks in orderedDisks
for i := range storageDisks {
found := false
for j := range orderedDisks {
if storageDisks[i] == orderedDisks[j] {
found = true
assignedDisks = append(assignedDisks, storageDisks[i])
break
}
}
if !found {
unAssignedDisks = append(unAssignedDisks, storageDisks[i])
// append not found disk to result
uDisks = append(uDisks, storageDisks[i])
}
}
return assignedDisks, unAssignedDisks
return uDisks
}
// Inspect the content of all disks to guess the right order according to the format files.
// The right order is represented in orderedDisks
func reorderDisksByInspection(orderedDisks, storageDisks []StorageAPI, formatConfigs []*formatConfigV1) ([]StorageAPI, error) {
// Inspect the content of all disks to guess the right order according
// to the format files. The right order is represented in orderedDisks
func reorderDisksByInspection(orderedDisks, storageDisks []StorageAPI,
formatConfigs []*formatConfigV1) ([]StorageAPI, error) {
for index, format := range formatConfigs {
if format != nil {
continue
@ -701,7 +661,8 @@ func reorderDisksByInspection(orderedDisks, storageDisks []StorageAPI, formatCon
continue
}
volName := ""
// Avoid picking minioMetaBucket because ListVols() returns a non ordered list
// Avoid picking minioMetaBucket because ListVols()
// returns a non ordered list
for i := range vols {
if vols[i].Name != minioMetaBucket {
volName = vols[i].Name
@ -742,84 +703,28 @@ func reorderDisksByInspection(orderedDisks, storageDisks []StorageAPI, formatCon
}
// Heals corrupted format json in all disks
func healFormatXLCorruptedDisks(storageDisks []StorageAPI) error {
formatConfigs := make([]*formatConfigV1, len(storageDisks))
var referenceConfig *formatConfigV1
// Loads `format.json` from all disks.
for index, disk := range storageDisks {
// Disk not found or ignored is a valid case.
if disk == nil {
// Return nil, one of the disk is offline.
return nil
}
formatXL, err := loadFormat(disk)
if err != nil {
if err == errUnformattedDisk || err == errCorruptedFormat {
// format.json is missing or corrupted, should be healed.
continue
} else if err == errDiskNotFound { // Is a valid case we
// can proceed without healing.
return nil
}
// Return error for unsupported errors.
return err
} // Success.
formatConfigs[index] = formatXL
}
// All `format.json` has been read successfully, previously completed.
if isFormatFound(formatConfigs) {
// Return success.
return nil
}
// All disks are fresh, format.json will be written by initFormatXL()
if isFormatNotFound(formatConfigs) {
return initFormatXL(storageDisks)
}
// Validate format configs for consistency in JBOD and disks.
if err := checkFormatXL(formatConfigs); err != nil {
return err
}
if referenceConfig == nil {
// This config will be used to update the drives missing format.json.
for _, formatConfig := range formatConfigs {
if formatConfig == nil {
continue
}
referenceConfig = formatConfig
break
}
}
// Collect new JBOD.
newJBOD := referenceConfig.XL.JBOD
func healFormatXLCorruptedDisks(storageDisks []StorageAPI,
formatConfigs []*formatConfigV1) error {
// Reorder the disks based on the JBOD order.
orderedDisks, err := reorderDisks(storageDisks, formatConfigs)
referenceConfig, orderedDisks, err := reorderDisks(storageDisks,
formatConfigs, true)
if err != nil {
return err
}
// From ordered disks fill the UUID position.
for index, disk := range orderedDisks {
if disk == nil {
newJBOD[index] = mustGetUUID()
}
}
// For disks with corrupted formats, inspect the disks contents to guess the disks order
orderedDisks, err = reorderDisksByInspection(orderedDisks, storageDisks, formatConfigs)
// For disks with corrupted formats, inspect the disks
// contents to guess the disks order
orderedDisks, err = reorderDisksByInspection(orderedDisks, storageDisks,
formatConfigs)
if err != nil {
return err
}
// At this stage, all disks with corrupted formats but with objects inside found their way.
// Now take care of unformatted disks, which are the `unAssignedDisks`
_, unAssignedDisks := splitDisksByUse(storageDisks, orderedDisks)
// At this stage, all disks with corrupted formats but with
// objects inside found their way. Now take care of
// unformatted disks, which are the `unAssignedDisks`
unAssignedDisks := collectUnAssignedDisks(storageDisks, orderedDisks)
// Assign unassigned disks to nil elements in orderedDisks
for i, disk := range orderedDisks {
@ -829,27 +734,8 @@ func healFormatXLCorruptedDisks(storageDisks []StorageAPI) error {
}
}
// Collect new format configs.
var newFormatConfigs = make([]*formatConfigV1, len(orderedDisks))
// Collect new format configs that need to be written.
for index := range orderedDisks {
// New configs are generated since we are going
// to re-populate across all disks.
config := &formatConfigV1{
Version: referenceConfig.Version,
Format: referenceConfig.Format,
XL: &xlFormat{
Version: referenceConfig.XL.Version,
Disk: newJBOD[index],
JBOD: newJBOD,
},
}
newFormatConfigs[index] = config
}
// Save new `format.json` across all disks, in JBOD order.
return saveFormatXL(orderedDisks, newFormatConfigs)
// apply new format config and save to all disks
return collectNSaveNewFormatConfigs(referenceConfig, orderedDisks)
}
// loadFormatXL - loads XL `format.json` and returns back properly
@ -900,8 +786,11 @@ func loadFormatXL(bootstrapDisks []StorageAPI, readQuorum int) (disks []StorageA
if err = checkFormatXL(formatConfigs); err != nil {
return nil, err
}
// Erasure code requires disks to be presented in the same order each time.
return reorderDisks(bootstrapDisks, formatConfigs)
// Erasure code requires disks to be presented in the same
// order each time.
_, orderedDisks, err := reorderDisks(bootstrapDisks, formatConfigs,
false)
return orderedDisks, err
}
func checkFormatXLValue(formatXL *formatConfigV1) error {

View File

@ -288,8 +288,12 @@ func TestFormatXLHealFreshDisks(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// Attempt to load all `format.json`.
formatConfigs, _ := loadAllFormats(storageDisks)
// Start healing disks
err = healFormatXLFreshDisks(storageDisks)
err = healFormatXLFreshDisks(storageDisks, formatConfigs)
if err != nil {
t.Fatal("healing corrupted disk failed: ", err)
}
@ -304,42 +308,6 @@ func TestFormatXLHealFreshDisks(t *testing.T) {
removeRoots(fsDirs)
}
func TestFormatXLHealFreshDisksErrorExpected(t *testing.T) {
nDisks := 16
fsDirs, err := getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
// Create an instance of xl backend.
obj, _, err := initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
t.Error(err)
}
storageDisks, err := prepareFormatXLHealFreshDisks(obj)
if err != nil {
t.Fatal(err)
}
// Prepares all disks are offline.
prepareNOfflineDisks(storageDisks, 16, t)
// Load again XL format.json to validate it
_, err = loadFormatXL(storageDisks, 8)
if err == nil {
t.Fatal("loading format disk error")
}
storageDisks[3] = nil
err = healFormatXLFreshDisks(storageDisks)
if err != nil {
t.Fatal("didn't get nil when one disk is offline")
}
// Clean all
removeRoots(fsDirs)
}
// Simulate XL disks creation, delete some format.json and remove the content of
// a given disk to test healing a corrupted disk
func TestFormatXLHealCorruptedDisks(t *testing.T) {
@ -397,8 +365,10 @@ func TestFormatXLHealCorruptedDisks(t *testing.T) {
xl.storageDisks[3], xl.storageDisks[10], xl.storageDisks[12], xl.storageDisks[9],
xl.storageDisks[5], xl.storageDisks[11]}
formatConfigs, _ := loadAllFormats(permutedStorageDisks)
// Start healing disks
err = healFormatXLCorruptedDisks(permutedStorageDisks)
err = healFormatXLCorruptedDisks(permutedStorageDisks, formatConfigs)
if err != nil {
t.Fatal("healing corrupted disk failed: ", err)
}
@ -454,7 +424,7 @@ func TestFormatXLReorderByInspection(t *testing.T) {
permutedFormatConfigs, _ := loadAllFormats(permutedStorageDisks)
orderedDisks, err := reorderDisks(permutedStorageDisks, permutedFormatConfigs)
_, orderedDisks, err := reorderDisks(permutedStorageDisks, permutedFormatConfigs, false)
if err != nil {
t.Fatal("error reordering disks\n")
}
@ -629,27 +599,29 @@ func TestInitFormatXLErrors(t *testing.T) {
}
}
// Test for reduceFormatErrs()
func TestReduceFormatErrs(t *testing.T) {
// No error founds
if err := reduceFormatErrs([]error{nil, nil, nil, nil}, 4); err != nil {
t.Fatal("Err should be nil, found: ", err)
// Test formatErrsSummary()
func TestFormatErrsSummary(t *testing.T) {
type errSummary struct {
fc, unfmt, ntfnd, crrptd, othr int
}
// One corrupted format
if err := reduceFormatErrs([]error{nil, nil, errCorruptedFormat, nil}, 4); err != errCorruptedFormat {
t.Fatal("Got a different error: ", err)
testCases := []struct {
errs []error
expected errSummary
}{
{nil, errSummary{0, 0, 0, 0, 0}},
{[]error{errDiskNotFound, errUnformattedDisk, errCorruptedFormat, nil, errFaultyDisk},
errSummary{1, 1, 1, 1, 1}},
{[]error{errDiskNotFound, errDiskNotFound, errCorruptedFormat, nil, nil},
errSummary{2, 0, 2, 1, 0}},
}
// All disks unformatted
if err := reduceFormatErrs([]error{errUnformattedDisk, errUnformattedDisk, errUnformattedDisk, errUnformattedDisk}, 4); err != errUnformattedDisk {
t.Fatal("Got a different error: ", err)
for i, testCase := range testCases {
a, b, c, d, e := formatErrsSummary(testCase.errs)
got := errSummary{a, b, c, d, e}
if got != testCase.expected {
t.Errorf("Test %d: Got wrong results: %#v %#v", i+1,
got, testCase.expected)
}
// Some disks unformatted
if err := reduceFormatErrs([]error{nil, nil, errUnformattedDisk, errUnformattedDisk}, 4); err != errSomeDiskUnformatted {
t.Fatal("Got a different error: ", err)
}
// Some disks offline
if err := reduceFormatErrs([]error{nil, nil, errDiskNotFound, errUnformattedDisk}, 4); err != errSomeDiskOffline {
t.Fatal("Got a different error: ", err)
}
}
@ -959,8 +931,10 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) {
if err != nil {
t.Fatal(err)
}
xl := obj.(*xlObjects)
if err = healFormatXLCorruptedDisks(xl.storageDisks); err != nil {
formatConfigs, _ := loadAllFormats(xl.storageDisks)
if err = healFormatXLCorruptedDisks(xl.storageDisks, formatConfigs); err != nil {
t.Fatal("Got an unexpected error: ", err)
}
@ -971,25 +945,6 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) {
t.Fatal(err)
}
// Disks 0..15 are nil
obj, _, err = initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
t.Fatal(err)
}
xl = obj.(*xlObjects)
for i := 0; i <= 15; i++ {
xl.storageDisks[i] = nil
}
if err = healFormatXLCorruptedDisks(xl.storageDisks); err != nil {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
fsDirs, err = getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
// One disk returns Faulty Disk
obj, _, err = initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
@ -1001,45 +956,8 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) {
t.Fatal("storage disk is not *retryStorage type")
}
xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errFaultyDisk)
if err = healFormatXLCorruptedDisks(xl.storageDisks); err != errFaultyDisk {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
fsDirs, err = getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
// One disk is not found, heal corrupted disks should return nil
obj, _, err = initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
t.Fatal(err)
}
xl = obj.(*xlObjects)
xl.storageDisks[0] = nil
if err = healFormatXLCorruptedDisks(xl.storageDisks); err != nil {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
fsDirs, err = getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
// Remove format.json of all disks
obj, _, err = initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
t.Fatal(err)
}
xl = obj.(*xlObjects)
for i := 0; i <= 15; i++ {
if err = xl.storageDisks[i].DeleteFile(minioMetaBucket, formatConfigFile); err != nil {
t.Fatal(err)
}
}
if err = healFormatXLCorruptedDisks(xl.storageDisks); err != nil {
formatConfigs, _ = loadAllFormats(xl.storageDisks)
if err = healFormatXLCorruptedDisks(xl.storageDisks, formatConfigs); err != errFaultyDisk {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
@ -1060,7 +978,8 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) {
t.Fatal(err)
}
}
if err = healFormatXLCorruptedDisks(xl.storageDisks); err == nil {
formatConfigs, _ = loadAllFormats(xl.storageDisks)
if err = healFormatXLCorruptedDisks(xl.storageDisks, formatConfigs); err == nil {
t.Fatal("Should get a json parsing error, ")
}
removeRoots(fsDirs)
@ -1086,26 +1005,8 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) {
t.Fatal(err)
}
xl := obj.(*xlObjects)
if err = healFormatXLFreshDisks(xl.storageDisks); err != nil {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
fsDirs, err = getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
// Disks 0..15 are nil
obj, _, err = initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
t.Fatal(err)
}
xl = obj.(*xlObjects)
for i := 0; i <= 15; i++ {
xl.storageDisks[i] = nil
}
if err = healFormatXLFreshDisks(xl.storageDisks); err != nil {
formatConfigs, _ := loadAllFormats(xl.storageDisks)
if err = healFormatXLFreshDisks(xl.storageDisks, formatConfigs); err != nil {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
@ -1126,7 +1027,8 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) {
t.Fatal("storage disk is not *retryStorage type")
}
xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errFaultyDisk)
if err = healFormatXLFreshDisks(xl.storageDisks); err != errFaultyDisk {
formatConfigs, _ = loadAllFormats(xl.storageDisks)
if err = healFormatXLFreshDisks(xl.storageDisks, formatConfigs); err != errFaultyDisk {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
@ -1143,59 +1045,9 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) {
}
xl = obj.(*xlObjects)
xl.storageDisks[0] = nil
if err = healFormatXLFreshDisks(xl.storageDisks); err != nil {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
fsDirs, err = getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
// Remove format.json of all disks
obj, _, err = initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
t.Fatal(err)
}
xl = obj.(*xlObjects)
for i := 0; i <= 15; i++ {
if err = xl.storageDisks[i].DeleteFile(minioMetaBucket, formatConfigFile); err != nil {
t.Fatal(err)
}
}
if err = healFormatXLFreshDisks(xl.storageDisks); err != nil {
formatConfigs, _ = loadAllFormats(xl.storageDisks)
if err = healFormatXLFreshDisks(xl.storageDisks, formatConfigs); err != nil {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
}
// Tests for isFormatFound()
func TestIsFormatFound(t *testing.T) {
formats := genFormatXLValid()
if found := isFormatFound(formats); !found {
t.Fatal("isFormatFound() should not return false")
}
formats[0] = nil
if found := isFormatFound(formats); found {
t.Fatal("isFormatFound() should not return true")
}
}
// Tests for isFormatNotFound()
func TestIsFormatNotFound(t *testing.T) {
formats := genFormatXLValid()
if found := isFormatNotFound(formats); found {
t.Fatal("isFormatFound() should not return true")
}
formats[0] = nil
if found := isFormatNotFound(formats); found {
t.Fatal("isFormatFound() should not return true")
}
for idx := range formats {
formats[idx] = nil
}
if found := isFormatNotFound(formats); !found {
t.Fatal("isFormatFound() should not return false")
}
}

View File

@ -36,21 +36,37 @@ func healFormatXL(storageDisks []StorageAPI) (err error) {
return err
}
// Handles different cases properly.
switch reduceFormatErrs(sErrs, len(storageDisks)) {
case errCorruptedFormat:
if err = healFormatXLCorruptedDisks(storageDisks); err != nil {
numDisks := len(storageDisks)
_, unformattedDiskCount, diskNotFoundCount,
corruptedFormatCount, otherErrCount := formatErrsSummary(sErrs)
switch {
case unformattedDiskCount == numDisks:
// all unformatted.
if err = initFormatXL(storageDisks); err != nil {
return err
}
case diskNotFoundCount > 0:
return fmt.Errorf("cannot proceed with heal as %s",
errSomeDiskOffline)
case otherErrCount > 0:
return fmt.Errorf("cannot proceed with heal as some disks had unhandled errors")
case corruptedFormatCount > 0:
if err = healFormatXLCorruptedDisks(storageDisks, formatConfigs); err != nil {
return fmt.Errorf("Unable to repair corrupted format, %s", err)
}
case errSomeDiskUnformatted:
case unformattedDiskCount > 0:
// All drives online but some report missing format.json.
if err = healFormatXLFreshDisks(storageDisks); err != nil {
// There was an unexpected unrecoverable error during healing.
if err = healFormatXLFreshDisks(storageDisks, formatConfigs); err != nil {
// There was an unexpected unrecoverable error
// during healing.
return fmt.Errorf("Unable to heal backend %s", err)
}
case errSomeDiskOffline:
// FIXME: in future.
return fmt.Errorf("Unable to initialize format %s and %s", errSomeDiskOffline, errSomeDiskUnformatted)
}
return nil
}

View File

@ -97,14 +97,15 @@ func TestHealFormatXL(t *testing.T) {
t.Fatal(err)
}
// One disk is not found, heal corrupted disks should return nil
// One disk is not found, heal corrupted disks should return
// error for offline disk
obj, _, err = initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
t.Fatal(err)
}
xl = obj.(*xlObjects)
xl.storageDisks[0] = nil
if err = healFormatXL(xl.storageDisks); err != nil {
if err = healFormatXL(xl.storageDisks); err != nil && err.Error() != "cannot proceed with heal as some disks are offline" {
t.Fatal("Got an unexpected error: ", err)
}
removeRoots(fsDirs)
@ -193,7 +194,37 @@ func TestHealFormatXL(t *testing.T) {
t.Fatal("storage disk is not *retryStorage type")
}
xl.storageDisks[3] = newNaughtyDisk(posixDisk, nil, errDiskNotFound)
expectedErr := fmt.Errorf("Unable to initialize format %s and %s", errSomeDiskOffline, errSomeDiskUnformatted)
expectedErr := fmt.Errorf("cannot proceed with heal as %s", errSomeDiskOffline)
if err = healFormatXL(xl.storageDisks); err != nil {
if err.Error() != expectedErr.Error() {
t.Fatal("Got an unexpected error: ", err)
}
}
removeRoots(fsDirs)
fsDirs, err = getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
// One disk has access denied error, heal should return
// appropriate error
obj, _, err = initObjectLayer(mustGetNewEndpointList(fsDirs...))
if err != nil {
t.Fatal(err)
}
xl = obj.(*xlObjects)
for i := 0; i <= 2; i++ {
if err = xl.storageDisks[i].DeleteFile(minioMetaBucket, formatConfigFile); err != nil {
t.Fatal(err)
}
}
posixDisk, ok = xl.storageDisks[3].(*retryStorage)
if !ok {
t.Fatal("storage disk is not *retryStorage type")
}
xl.storageDisks[3] = newNaughtyDisk(posixDisk, nil, errDiskAccessDenied)
expectedErr = fmt.Errorf("cannot proceed with heal as some disks had unhandled errors")
if err = healFormatXL(xl.storageDisks); err != nil {
if err.Error() != expectedErr.Error() {
t.Fatal("Got an unexpected error: ", err)