XL/format: Fix a bug in checkDisksConsistency. (#1894)

This commit is contained in:
Harshavardhana 2016-06-14 13:42:15 +05:30 committed by Anand Babu (AB) Periasamy
parent ed4fe689b4
commit 23c88ffb1d
3 changed files with 282 additions and 19 deletions

View File

@ -224,10 +224,46 @@ func genericFormatCheck(formatConfigs []*formatConfigV1, sErrs []error) (err err
return nil return nil
} }
// errDiskOrderMismatch - returned when disk UUID is not in consistent JBOD order.
var errDiskOrderMismatch = errors.New("disk order mismatch")
// isSavedUUIDInOrder - validates if disk uuid is present and valid in all
// available format config JBOD. This function also validates if the disk UUID
// is always available on all JBOD under the same order.
func isSavedUUIDInOrder(uuid string, formatConfigs []*formatConfigV1) bool {
var orderIndexes []int
// Validate each for format.json for relevant uuid.
for _, formatConfig := range formatConfigs {
if formatConfig == nil {
continue
}
// Validate if UUID is present in JBOD.
uuidIndex := findDiskIndex(uuid, formatConfig.XL.JBOD)
if uuidIndex == -1 {
// UUID not found.
errorIf(errDiskNotFound, "Disk %s not found in JBOD list", uuid)
return false
}
// Save the position of UUID present in JBOD.
orderIndexes = append(orderIndexes, uuidIndex+1)
}
// Once uuid is found, verify if the uuid
// present in same order across all format configs.
prevOrderIndex := orderIndexes[0]
for _, orderIndex := range orderIndexes {
if prevOrderIndex != orderIndex {
errorIf(errDiskOrderMismatch, "Disk %s is in wrong order wanted %d, saw %d ", uuid, prevOrderIndex, orderIndex)
return false
}
}
// Returns success, when we have verified if uuid
// is consistent and valid across all format configs.
return true
}
// checkDisksConsistency - checks if all disks are consistent with all JBOD entries on all disks. // checkDisksConsistency - checks if all disks are consistent with all JBOD entries on all disks.
func checkDisksConsistency(formatConfigs []*formatConfigV1) error { func checkDisksConsistency(formatConfigs []*formatConfigV1) error {
var disks = make([]string, len(formatConfigs)) var disks = make([]string, len(formatConfigs))
var disksFound = make(map[string]bool)
// Collect currently available disk uuids. // Collect currently available disk uuids.
for index, formatConfig := range formatConfigs { for index, formatConfig := range formatConfigs {
if formatConfig == nil { if formatConfig == nil {
@ -237,21 +273,13 @@ func checkDisksConsistency(formatConfigs []*formatConfigV1) error {
disks[index] = formatConfig.XL.Disk disks[index] = formatConfig.XL.Disk
} }
// Validate collected uuids and verify JBOD. // Validate collected uuids and verify JBOD.
for index, uuid := range disks { for _, uuid := range disks {
if uuid == "" { if uuid == "" {
continue continue
} }
var formatConfig = formatConfigs[index] // Is uuid present on all JBOD ?.
for _, savedUUID := range formatConfig.XL.JBOD { if !isSavedUUIDInOrder(uuid, formatConfigs) {
if savedUUID == uuid { return fmt.Errorf("%s disk not found in JBOD", uuid)
disksFound[uuid] = true
}
}
}
// Check if all disks are found.
for _, value := range disksFound {
if !value {
return errors.New("Some disks not found in JBOD.")
} }
} }
return nil return nil
@ -259,16 +287,15 @@ func checkDisksConsistency(formatConfigs []*formatConfigV1) error {
// checkJBODConsistency - validate xl jbod order if they are consistent. // checkJBODConsistency - validate xl jbod order if they are consistent.
func checkJBODConsistency(formatConfigs []*formatConfigV1) error { func checkJBODConsistency(formatConfigs []*formatConfigV1) error {
var firstJBOD []string var jbodStr string
// Extract first valid JBOD. // Extract first valid JBOD.
for _, format := range formatConfigs { for _, format := range formatConfigs {
if format == nil { if format == nil {
continue continue
} }
firstJBOD = format.XL.JBOD jbodStr = strings.Join(format.XL.JBOD, ".")
break break
} }
jbodStr := strings.Join(firstJBOD, ".")
for _, format := range formatConfigs { for _, format := range formatConfigs {
if format == nil { if format == nil {
continue continue
@ -281,7 +308,8 @@ func checkJBODConsistency(formatConfigs []*formatConfigV1) error {
return nil return nil
} }
func findIndex(disk string, jbod []string) int { // findDiskIndex returns position of disk in JBOD.
func findDiskIndex(disk string, jbod []string) int {
for index, uuid := range jbod { for index, uuid := range jbod {
if uuid == disk { if uuid == disk {
return index return index
@ -306,7 +334,7 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1)
if format == nil { if format == nil {
continue continue
} }
jIndex := findIndex(format.XL.Disk, savedJBOD) jIndex := findDiskIndex(format.XL.Disk, savedJBOD)
if jIndex == -1 { if jIndex == -1 {
return nil, errors.New("Unrecognized uuid " + format.XL.Disk + " found") return nil, errors.New("Unrecognized uuid " + format.XL.Disk + " found")
} }

235
format-config-v1_test.go Normal file
View File

@ -0,0 +1,235 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import "testing"
// generates a valid format.json for XL backend.
func genFormatXLValid() []*formatConfigV1 {
jbod := make([]string, 8)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
return formatConfigs
}
// generates a invalid format.json version for XL backend.
func genFormatXLInvalidVersion() []*formatConfigV1 {
jbod := make([]string, 8)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
// Corrupt version numbers.
formatConfigs[0].Version = "2"
formatConfigs[3].Version = "-1"
return formatConfigs
}
// generates a invalid format.json JBOD for XL backend.
func genFormatXLInvalidJBOD() []*formatConfigV1 {
jbod := make([]string, 8)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
for index := range jbod {
jbod[index] = getUUID()
}
// Corrupt JBOD entries on disk 6 and disk 8.
formatConfigs[5].XL.JBOD = jbod
formatConfigs[7].XL.JBOD = jbod
return formatConfigs
}
// generates a invalid format.json Disk UUID for XL backend.
func genFormatXLInvalidDisks() []*formatConfigV1 {
jbod := make([]string, 8)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
// Make disk 5 and disk 8 have inconsistent disk uuid's.
formatConfigs[4].XL.Disk = getUUID()
formatConfigs[7].XL.Disk = getUUID()
return formatConfigs
}
// generates a invalid format.json Disk UUID in wrong order for XL backend.
func genFormatXLInvalidDisksOrder() []*formatConfigV1 {
jbod := make([]string, 8)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
// Re order jbod for failure case.
var jbod1 = make([]string, 8)
for i, j := range jbod {
jbod1[i] = j
}
jbod1[1], jbod1[2] = jbod[2], jbod[1]
formatConfigs[2].XL.JBOD = jbod1
return formatConfigs
}
// Wrapper for calling FormatXL tests - currently validates
// - valid format
// - unrecognized version number
// - invalid JBOD
// - invalid Disk uuid
func TestFormatXL(t *testing.T) {
formatInputCases := [][]*formatConfigV1{
genFormatXLValid(),
genFormatXLInvalidVersion(),
genFormatXLInvalidJBOD(),
genFormatXLInvalidDisks(),
genFormatXLInvalidDisksOrder(),
}
testCases := []struct {
formatConfigs []*formatConfigV1
shouldPass bool
}{
{
formatConfigs: formatInputCases[0],
shouldPass: true,
},
{
formatConfigs: formatInputCases[1],
shouldPass: false,
},
{
formatConfigs: formatInputCases[2],
shouldPass: false,
},
{
formatConfigs: formatInputCases[3],
shouldPass: false,
},
{
formatConfigs: formatInputCases[4],
shouldPass: false,
},
}
for i, testCase := range testCases {
err := checkFormatXL(testCase.formatConfigs)
if err != nil && testCase.shouldPass {
t.Errorf("Test %d: Expected to pass but failed with %s", i+1, err)
}
if err == nil && !testCase.shouldPass {
t.Errorf("Test %d: Expected to fail but passed instead", i+1)
}
}
}
// Tests uuid order verification function.
func TestSavedUUIDOrder(t *testing.T) {
uuidTestCases := make([]struct {
uuid string
shouldPass bool
}, 8)
jbod := make([]string, 8)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
uuidTestCases[index].uuid = jbod[index]
uuidTestCases[index].shouldPass = true
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
// Re order jbod for failure case.
var jbod1 = make([]string, 8)
for i, j := range jbod {
jbod1[i] = j
}
jbod1[1], jbod1[2] = jbod[2], jbod[1]
formatConfigs[2].XL.JBOD = jbod1
uuidTestCases[1].shouldPass = false
uuidTestCases[2].shouldPass = false
for i, testCase := range uuidTestCases {
// Is uuid present on all JBOD ?.
if testCase.shouldPass != isSavedUUIDInOrder(testCase.uuid, formatConfigs) {
t.Errorf("Test %d: Expected to pass but failed", i+1)
}
}
}

View File

@ -119,7 +119,7 @@ func newXLObjects(disks []string) (ObjectLayer, error) {
// Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc. // Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc.
xlHouseKeeping(storageDisks) xlHouseKeeping(storageDisks)
// Attempt to load all `format.json` // Attempt to load all `format.json`.
formatConfigs, sErrs := loadAllFormats(storageDisks) formatConfigs, sErrs := loadAllFormats(storageDisks)
// Generic format check validates all necessary cases. // Generic format check validates all necessary cases.