XL: Handle object layer initialization properly.

Initialization when disk was down the network disk
reported an incorrect error rather than errDiskNotFound.

This resulted in incorrect error handling during
prepInitStorage() stage.

Fixes #2577
This commit is contained in:
Harshavardhana 2016-09-01 02:49:06 -07:00
parent d936ed90ae
commit ae64b7fac8
4 changed files with 22 additions and 10 deletions

View File

@ -183,7 +183,7 @@ func xlHouseKeeping(storageDisks []StorageAPI) error {
// Cleanup all temp entries upon start.
err := cleanupDir(disk, minioMetaBucket, tmpMetaPrefix)
if err != nil {
if err != nil && err != errDiskNotFound {
errs[index] = err
}
}(index, disk)

View File

@ -133,9 +133,9 @@ func prepForInit(disks []string, sErrs []error, diskCount int) InitActions {
// Already formatted, proceed to initialization of object layer.
if disksFormatted == diskCount {
return InitObjectLayer
} else if disksFormatted > quorum && disksFormatted+disksOffline == diskCount {
} else if disksFormatted >= quorum && disksFormatted+disksOffline == diskCount {
return InitObjectLayer
} else if disksFormatted > quorum {
} else if disksFormatted >= quorum {
// TODO: Print minioctl heal command
return InitObjectLayer
}
@ -163,7 +163,6 @@ func retryFormattingDisks(disks []string, storageDisks []StorageAPI) ([]StorageA
case <-time.After(nextBackoff * time.Second):
// Attempt to load all `format.json`.
_, sErrs := loadAllFormats(storageDisks)
switch prepForInit(disks, sErrs, len(storageDisks)) {
case Abort:
err = errCorruptedFormat

View File

@ -297,12 +297,15 @@ func isDistributedSetup(disks []string) (isDist bool) {
// Format disks before initialization object layer.
func formatDisks(disks, ignoredDisks []string) error {
storageDisks, err := waitForFormattingDisks(disks, ignoredDisks)
for i := range storageDisks {
switch storage := storageDisks[i].(type) {
for _, storage := range storageDisks {
if storage == nil {
continue
}
switch store := storage.(type) {
// Closing associated TCP connections since
// []StorageAPI is garbage collected eventually.
case networkStorage:
storage.rpcClient.Close()
store.rpcClient.Close()
}
}
if err != nil {
@ -310,13 +313,13 @@ func formatDisks(disks, ignoredDisks []string) error {
}
if isLocalStorage(disks[0]) {
// notify every one else that they can try init again.
for i := range storageDisks {
switch storage := storageDisks[i].(type) {
for _, storage := range storageDisks {
switch store := storage.(type) {
// Closing associated TCP connections since
// []StorageAPI is garage collected eventually.
case networkStorage:
var reply GenericReply
_ = storage.rpcClient.Call("Storage.TryInitHandler", &GenericArgs{}, &reply)
_ = store.rpcClient.Call("Storage.TryInitHandler", &GenericArgs{}, &reply)
}
}
}

View File

@ -18,6 +18,8 @@ package cmd
import (
"io"
"net"
"net/rpc"
"path"
"strconv"
"strings"
@ -42,11 +44,19 @@ func toStorageErr(err error) error {
if err == nil {
return nil
}
switch err.(type) {
case *net.OpError:
return errDiskNotFound
}
switch err.Error() {
case io.EOF.Error():
return io.EOF
case io.ErrUnexpectedEOF.Error():
return io.ErrUnexpectedEOF
case rpc.ErrShutdown.Error():
return errDiskNotFound
case errUnexpected.Error():
return errUnexpected
case errDiskFull.Error():