Add naughty disk to simulate disk errors (#2645)

This commit is contained in:
Anis Elleuch 2016-09-09 20:53:09 +01:00 committed by Harshavardhana
parent 400e9309f1
commit 421cccb1d7
4 changed files with 350 additions and 150 deletions

View File

@ -1,115 +0,0 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
// Simulates disk returning errFaultyDisk on all methods of StorageAPI
// interface after successCount number of successes.
type faultyDisk struct {
disk *posix
successCount int
}
// instantiates a faulty
func newFaultyDisk(disk *posix, n int) *faultyDisk {
return &faultyDisk{disk: disk, successCount: n}
}
func (f *faultyDisk) MakeVol(volume string) (err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.MakeVol(volume)
}
return errFaultyDisk
}
func (f *faultyDisk) ListVols() (vols []VolInfo, err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.ListVols()
}
return nil, errFaultyDisk
}
func (f *faultyDisk) StatVol(volume string) (volInfo VolInfo, err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.StatVol(volume)
}
return VolInfo{}, errFaultyDisk
}
func (f *faultyDisk) DeleteVol(volume string) (err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.DeleteVol(volume)
}
return errFaultyDisk
}
func (f *faultyDisk) ListDir(volume, path string) (entries []string, err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.ListDir(volume, path)
}
return []string{}, errFaultyDisk
}
func (f *faultyDisk) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.ReadFile(volume, path, offset, buf)
}
return 0, errFaultyDisk
}
func (f *faultyDisk) AppendFile(volume, path string, buf []byte) error {
if f.successCount > 0 {
f.successCount--
return f.disk.AppendFile(volume, path, buf)
}
return errFaultyDisk
}
func (f *faultyDisk) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
if f.successCount > 0 {
f.successCount--
return f.disk.RenameFile(srcVolume, srcPath, dstVolume, dstPath)
}
return errFaultyDisk
}
func (f *faultyDisk) StatFile(volume string, path string) (file FileInfo, err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.StatFile(volume, path)
}
return FileInfo{}, errFaultyDisk
}
func (f *faultyDisk) DeleteFile(volume string, path string) (err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.DeleteFile(volume, path)
}
return errFaultyDisk
}
func (f *faultyDisk) ReadAll(volume string, path string) (buf []byte, err error) {
if f.successCount > 0 {
f.successCount--
return f.disk.ReadAll(volume, path)
}
return nil, errFaultyDisk
}

View File

@ -66,6 +66,75 @@ func genFormatXLInvalidVersion() []*formatConfigV1 {
return formatConfigs
}
// generates a invalid format.json version for XL backend.
func genFormatXLInvalidFormat() []*formatConfigV1 {
jbod := make([]string, 8)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
// Corrupt version numbers.
formatConfigs[0].Format = "lx"
formatConfigs[3].Format = "lx"
return formatConfigs
}
// generates a invalid format.json version for XL backend.
func genFormatXLInvalidXLVersion() []*formatConfigV1 {
jbod := make([]string, 8)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
// Corrupt version numbers.
formatConfigs[0].XL.Version = "10"
formatConfigs[3].XL.Version = "-1"
return formatConfigs
}
// generates a invalid format.json version for XL backend.
func genFormatXLInvalidJBODCount() []*formatConfigV1 {
jbod := make([]string, 7)
formatConfigs := make([]*formatConfigV1, 8)
for index := range jbod {
jbod[index] = getUUID()
}
for index := range jbod {
formatConfigs[index] = &formatConfigV1{
Version: "1",
Format: "xl",
XL: &xlFormat{
Version: "1",
Disk: jbod[index],
JBOD: jbod,
},
}
}
return formatConfigs
}
// generates a invalid format.json JBOD for XL backend.
func genFormatXLInvalidJBOD() []*formatConfigV1 {
jbod := make([]string, 8)
@ -145,18 +214,14 @@ func genFormatXLInvalidDisksOrder() []*formatConfigV1 {
return formatConfigs
}
func TestFormatXLHealFreshDisks(t *testing.T) {
// Create an instance of xl backend.
obj, fsDirs, err := getXLObjectLayer()
if err != nil {
t.Fatal(err)
}
func prepareFormatXLHealFreshDisks(obj ObjectLayer) ([]StorageAPI, error) {
var err error
xl := obj.(xlObjects)
err = obj.MakeBucket("bucket")
if err != nil {
t.Fatal(err)
return []StorageAPI{}, err
}
bucket := "bucket"
@ -164,34 +229,26 @@ func TestFormatXLHealFreshDisks(t *testing.T) {
_, err = obj.PutObject(bucket, object, int64(len("abcd")), bytes.NewReader([]byte("abcd")), nil)
if err != nil {
t.Fatal(err)
return []StorageAPI{}, err
}
/* // Now, remove two format files.. Load them and reorder
if err = xl.storageDisks[3].DeleteFile(".minio.sys", "format.json"); err != nil {
t.Fatal(err)
}
if err = xl.storageDisks[11].DeleteFile(".minio.sys", "format.json"); err != nil {
t.Fatal(err)
} */
// Remove the content of export dir 10 but preserve .minio.sys because it is automatically
// created when minio starts
for i := 3; i <= 5; i++ {
if err = xl.storageDisks[i].DeleteFile(".minio.sys", "format.json"); err != nil {
t.Fatal(err)
return []StorageAPI{}, err
}
if err = xl.storageDisks[i].DeleteFile(".minio.sys", "tmp"); err != nil {
t.Fatal(err)
return []StorageAPI{}, err
}
if err = xl.storageDisks[i].DeleteFile(bucket, object+"/xl.json"); err != nil {
t.Fatal(err)
return []StorageAPI{}, err
}
if err = xl.storageDisks[i].DeleteFile(bucket, object+"/part.1"); err != nil {
t.Fatal(err)
return []StorageAPI{}, err
}
if err = xl.storageDisks[i].DeleteVol(bucket); err != nil {
t.Fatal(err)
return []StorageAPI{}, err
}
}
@ -201,14 +258,29 @@ func TestFormatXLHealFreshDisks(t *testing.T) {
xl.storageDisks[3], xl.storageDisks[10], xl.storageDisks[12], xl.storageDisks[9],
xl.storageDisks[5], xl.storageDisks[11]}
return permutedStorageDisks, nil
}
func TestFormatXLHealFreshDisks(t *testing.T) {
// Create an instance of xl backend.
obj, fsDirs, err := getXLObjectLayer()
if err != nil {
t.Error(err)
}
storageDisks, err := prepareFormatXLHealFreshDisks(obj)
if err != nil {
t.Fatal(err)
}
// Start healing disks
err = healFormatXLFreshDisks(permutedStorageDisks)
err = healFormatXLFreshDisks(storageDisks)
if err != nil {
t.Fatal("healing corrupted disk failed: ", err)
}
// Load again XL format.json to validate it
_, err = loadFormatXL(permutedStorageDisks)
_, err = loadFormatXL(storageDisks)
if err != nil {
t.Fatal("loading healed disk failed: ", err)
}
@ -217,6 +289,39 @@ func TestFormatXLHealFreshDisks(t *testing.T) {
removeRoots(fsDirs)
}
func TestFormatXLHealFreshDisksErrorExpected(t *testing.T) {
// Create an instance of xl backend.
obj, fsDirs, err := getXLObjectLayer()
if err != nil {
t.Error(err)
}
storageDisks, err := prepareFormatXLHealFreshDisks(obj)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 16; i++ {
d := storageDisks[i].(*posix)
storageDisks[i] = &naughtyDisk{disk: d, defaultErr: errDiskNotFound}
}
// Load again XL format.json to validate it
_, err = loadFormatXL(storageDisks)
if err == nil {
t.Fatal("loading format disk error")
}
storageDisks[3] = nil
err = healFormatXLFreshDisks(storageDisks)
if err != nil {
t.Fatal("didn't get nil when one disk is offline")
}
// Clean all
removeRoots(fsDirs)
}
// Simulate XL disks creation, delete some format.json and remove the content of
// a given disk to test healing a corrupted disk
func TestFormatXLHealCorruptedDisks(t *testing.T) {
@ -355,12 +460,18 @@ func TestFormatXLReorderByInspection(t *testing.T) {
// Wrapper for calling FormatXL tests - currently validates
// - valid format
// - unrecognized version number
// - unrecognized format tag
// - unrecognized xl version
// - wrong number of JBOD entries
// - invalid JBOD
// - invalid Disk uuid
func TestFormatXL(t *testing.T) {
formatInputCases := [][]*formatConfigV1{
genFormatXLValid(),
genFormatXLInvalidVersion(),
genFormatXLInvalidFormat(),
genFormatXLInvalidXLVersion(),
genFormatXLInvalidJBODCount(),
genFormatXLInvalidJBOD(),
genFormatXLInvalidDisks(),
genFormatXLInvalidDisksOrder(),
@ -389,6 +500,18 @@ func TestFormatXL(t *testing.T) {
formatConfigs: formatInputCases[4],
shouldPass: false,
},
{
formatConfigs: formatInputCases[5],
shouldPass: false,
},
{
formatConfigs: formatInputCases[6],
shouldPass: false,
},
{
formatConfigs: formatInputCases[7],
shouldPass: false,
},
}
for i, testCase := range testCases {
@ -443,3 +566,65 @@ func TestSavedUUIDOrder(t *testing.T) {
}
}
}
func TestInitFormatXLErrors(t *testing.T) {
// Create an instance of xl backend.
obj, fsDirs, err := getXLObjectLayer()
if err != nil {
t.Fatal(err)
}
xl := obj.(xlObjects)
testStorageDisks := make([]StorageAPI, 16)
for i := 0; i < 16; i++ {
d := xl.storageDisks[i].(*posix)
testStorageDisks[i] = &naughtyDisk{disk: d, defaultErr: errDiskNotFound}
}
if err := initFormatXL(testStorageDisks); err != errDiskNotFound {
t.Fatal("Got a different error: ", err)
}
for i := 0; i < 15; i++ {
d := xl.storageDisks[i].(*posix)
testStorageDisks[i] = &naughtyDisk{disk: d, defaultErr: errDiskNotFound, errors: map[int]error{0: nil, 1: nil, 2: nil}}
}
if err := initFormatXL(testStorageDisks); err != errDiskNotFound {
t.Fatal("Got a different error: ", err)
}
for i := 0; i < 15; i++ {
testStorageDisks[i] = nil
}
if err := initFormatXL(testStorageDisks); err != errDiskNotFound {
t.Fatal("Got a different error: ", err)
}
removeRoots(fsDirs)
}
func TestReduceFormatErrs(t *testing.T) {
// No error founds
if err := reduceFormatErrs([]error{nil, nil, nil, nil}, 4); err != nil {
t.Fatal("Err should be nil, found: ", err)
}
// Expect corrupted format error
if err := reduceFormatErrs([]error{nil, nil, errCorruptedFormat, nil}, 4); err != errCorruptedFormat {
t.Fatal("Got a differnt error: ", err)
}
// Expect unformatted disk
if err := reduceFormatErrs([]error{errUnformattedDisk, errUnformattedDisk, errUnformattedDisk, errUnformattedDisk}, 4); err != errUnformattedDisk {
t.Fatal("Got a differnt error: ", err)
}
// Expect some disks unformatted
if err := reduceFormatErrs([]error{nil, nil, errUnformattedDisk, errUnformattedDisk}, 4); err != errSomeDiskUnformatted {
t.Fatal("Got a differnt error: ", err)
}
// Expect some disks offline
if err := reduceFormatErrs([]error{nil, nil, errDiskNotFound, errUnformattedDisk}, 4); err != errSomeDiskOffline {
t.Fatal("Got a differnt error: ", err)
}
}

123
cmd/naughty-disk_test.go Normal file
View File

@ -0,0 +1,123 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
// naughtyDisk wraps a POSIX disk and returns programmed errors
// specified by the developer. The purpose is to simulate errors
// that are hard to simulate in practise like DiskNotFound.
// Programmed errors are stored in errors field.
type naughtyDisk struct {
// The real disk
disk *posix
// Programmed errors: API call number => error to return
errors map[int]error
// The error to return when no error value is programmed
defaultErr error
// The current API call number
callNR int
}
func newNaughtyDisk(d *posix, errs map[int]error, defaultErr error) *naughtyDisk {
return &naughtyDisk{disk: d, errors: errs, defaultErr: defaultErr}
}
func (d *naughtyDisk) calcError() (err error) {
d.callNR++
if err, ok := d.errors[d.callNR]; ok {
return err
}
if d.defaultErr != nil {
return d.defaultErr
}
return nil
}
func (d *naughtyDisk) MakeVol(volume string) (err error) {
if err := d.calcError(); err != nil {
return err
}
return d.disk.MakeVol(volume)
}
func (d *naughtyDisk) ListVols() (vols []VolInfo, err error) {
if err := d.calcError(); err != nil {
return nil, err
}
return d.disk.ListVols()
}
func (d *naughtyDisk) StatVol(volume string) (volInfo VolInfo, err error) {
if err := d.calcError(); err != nil {
return VolInfo{}, err
}
return d.disk.StatVol(volume)
}
func (d *naughtyDisk) DeleteVol(volume string) (err error) {
if err := d.calcError(); err != nil {
return err
}
return d.disk.DeleteVol(volume)
}
func (d *naughtyDisk) ListDir(volume, path string) (entries []string, err error) {
if err := d.calcError(); err != nil {
return []string{}, err
}
return d.disk.ListDir(volume, path)
}
func (d *naughtyDisk) ReadFile(volume string, path string, offset int64, buf []byte) (n int64, err error) {
if err := d.calcError(); err != nil {
return 0, err
}
return d.disk.ReadFile(volume, path, offset, buf)
}
func (d *naughtyDisk) AppendFile(volume, path string, buf []byte) error {
if err := d.calcError(); err != nil {
return err
}
return d.disk.AppendFile(volume, path, buf)
}
func (d *naughtyDisk) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
if err := d.calcError(); err != nil {
return err
}
return d.disk.RenameFile(srcVolume, srcPath, dstVolume, dstPath)
}
func (d *naughtyDisk) StatFile(volume string, path string) (file FileInfo, err error) {
if err := d.calcError(); err != nil {
return FileInfo{}, err
}
return d.disk.StatFile(volume, path)
}
func (d *naughtyDisk) DeleteFile(volume string, path string) (err error) {
if err := d.calcError(); err != nil {
return err
}
return d.disk.DeleteFile(volume, path)
}
func (d *naughtyDisk) ReadAll(volume string, path string) (buf []byte, err error) {
if err := d.calcError(); err != nil {
return nil, err
}
return d.disk.ReadAll(volume, path)
}

View File

@ -59,7 +59,6 @@ func TestRepeatPutObjectPart(t *testing.T) {
if err != nil {
t.Fatal(err)
}
}
func TestXLDeleteObjectBasic(t *testing.T) {
@ -130,7 +129,7 @@ func TestXLDeleteObjectDiskNotFound(t *testing.T) {
// for a 16 disk setup, quorum is 9. To simulate disks not found yet
// quorum is available, we remove disks leaving quorum disks behind.
for i := range xl.storageDisks[:7] {
xl.storageDisks[i] = newFaultyDisk(xl.storageDisks[i].(*posix), 0)
xl.storageDisks[i] = newNaughtyDisk(xl.storageDisks[i].(*posix), nil, errFaultyDisk)
}
err = obj.DeleteObject(bucket, object)
if err != nil {
@ -144,8 +143,8 @@ func TestXLDeleteObjectDiskNotFound(t *testing.T) {
}
// Remove one more disk to 'lose' quorum, by setting it to nil.
xl.storageDisks[7] = &faultyDisk{}
xl.storageDisks[8] = &faultyDisk{}
xl.storageDisks[7] = nil
xl.storageDisks[8] = nil
err = obj.DeleteObject(bucket, object)
if err != toObjectErr(errXLWriteQuorum, bucket, object) {
t.Errorf("Expected deleteObject to fail with %v, but failed with %v", toObjectErr(errXLWriteQuorum, bucket, object), err)
@ -180,15 +179,19 @@ func TestGetObjectNoQuorum(t *testing.T) {
xl.objCacheEnabled = false
// Make 9 disks offline, which leaves less than quorum number of disks
// in a 16 disk XL setup. The original disks are 'replaced' with
// faultyDisks that fail after 'f' successful StorageAPI method
// naughtyDisks that fail after 'f' successful StorageAPI method
// invocations, where f - [0,2)
for f := 0; f < 2; f++ {
diskErrors := make(map[int]error)
for i := 0; i <= f; i++ {
diskErrors[i] = nil
}
for i := range xl.storageDisks[:9] {
switch diskType := xl.storageDisks[i].(type) {
case *posix:
xl.storageDisks[i] = newFaultyDisk(diskType, f)
case *faultyDisk:
xl.storageDisks[i] = newFaultyDisk(diskType.disk, f)
xl.storageDisks[i] = newNaughtyDisk(diskType, diskErrors, errFaultyDisk)
case *naughtyDisk:
xl.storageDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk)
}
}
// Fetch object from store.
@ -226,15 +229,19 @@ func TestPutObjectNoQuorum(t *testing.T) {
// Make 9 disks offline, which leaves less than quorum number of disks
// in a 16 disk XL setup. The original disks are 'replaced' with
// faultyDisks that fail after 'f' successful StorageAPI method
// naughtyDisks that fail after 'f' successful StorageAPI method
// invocations, where f - [0,3)
for f := 0; f < 3; f++ {
diskErrors := make(map[int]error)
for i := 0; i <= f; i++ {
diskErrors[i] = nil
}
for i := range xl.storageDisks[:9] {
switch diskType := xl.storageDisks[i].(type) {
case *posix:
xl.storageDisks[i] = newFaultyDisk(diskType, f)
case *faultyDisk:
xl.storageDisks[i] = newFaultyDisk(diskType.disk, f)
xl.storageDisks[i] = newNaughtyDisk(diskType, diskErrors, errFaultyDisk)
case *naughtyDisk:
xl.storageDisks[i] = newNaughtyDisk(diskType.disk, diskErrors, errFaultyDisk)
}
}
// Upload new content to same object "object"