server: Fix message when corrupted or unsupported format is found. (#4142)

Refer https://github.com/minio/minio/issues/4140

This is a fix to provide a little more elaborate message.
This commit is contained in:
Harshavardhana 2017-04-18 10:35:17 -07:00 committed by GitHub
parent 3032f0f505
commit f4dac979a2
4 changed files with 134 additions and 28 deletions

View File

@ -768,31 +768,39 @@ func loadFormatXL(bootstrapDisks []StorageAPI, readQuorum int) (disks []StorageA
return reorderDisks(bootstrapDisks, formatConfigs)
}
func checkFormatXLValues(formatConfigs []*formatConfigV1) error {
for _, formatXL := range formatConfigs {
if formatXL == nil {
continue
}
// Validate format version and format type.
if formatXL.Version != "1" {
return fmt.Errorf("Unsupported version of backend format [%s] found", formatXL.Version)
}
if formatXL.Format != "xl" {
return fmt.Errorf("Unsupported backend format [%s] found", formatXL.Format)
}
if formatXL.XL.Version != "1" {
return fmt.Errorf("Unsupported XL backend format found [%s]", formatXL.XL.Version)
}
if len(formatConfigs) != len(formatXL.XL.JBOD) {
return fmt.Errorf("Number of disks %d did not match the backend format %d", len(formatConfigs), len(formatXL.XL.JBOD))
}
func checkFormatXLValue(formatXL *formatConfigV1) error {
// Validate format version and format type.
if formatXL.Version != "1" {
return fmt.Errorf("Unsupported version of backend format [%s] found", formatXL.Version)
}
if formatXL.Format != "xl" {
return fmt.Errorf("Unsupported backend format [%s] found", formatXL.Format)
}
if formatXL.XL.Version != "1" {
return fmt.Errorf("Unsupported XL backend format found [%s]", formatXL.XL.Version)
}
return nil
}
func checkFormatXLValues(formatConfigs []*formatConfigV1) (int, error) {
for i, formatXL := range formatConfigs {
if formatXL == nil {
continue
}
if err := checkFormatXLValue(formatXL); err != nil {
return i, err
}
if len(formatConfigs) != len(formatXL.XL.JBOD) {
return i, fmt.Errorf("Number of disks %d did not match the backend format %d",
len(formatConfigs), len(formatXL.XL.JBOD))
}
}
return -1, nil
}
// checkFormatXL - verifies if format.json format is intact.
func checkFormatXL(formatConfigs []*formatConfigV1) error {
if err := checkFormatXLValues(formatConfigs); err != nil {
if _, err := checkFormatXLValues(formatConfigs); err != nil {
return err
}
if err := checkJBODConsistency(formatConfigs); err != nil {

View File

@ -188,6 +188,9 @@ func printRetryMsg(sErrs []error, storageDisks []StorageAPI) {
}
}
// Maximum retry attempts.
const maxRetryAttempts = 5
// Implements a jitter backoff loop for formatting all disks during
// initialization of the server.
func retryFormattingXLDisks(firstDisk bool, endpoints EndpointList, storageDisks []StorageAPI) error {
@ -219,17 +222,28 @@ func retryFormattingXLDisks(firstDisk bool, endpoints EndpointList, storageDisks
case retryCount := <-retryTimerCh:
// Attempt to load all `format.json` from all disks.
formatConfigs, sErrs := loadAllFormats(storageDisks)
if retryCount > 5 {
// After 5 retry attempts we start printing actual errors
// for disks not being available.
if retryCount > maxRetryAttempts {
// After max retry attempts we start printing
// actual errors for disks not being available.
printRetryMsg(sErrs, storageDisks)
}
// Pre-emptively check if one of the formatted disks
// is invalid. This function returns success for the
// most part unless one of the formats is not consistent
// with expected XL format. For example if a user is trying
// to pool FS backend.
if err := checkFormatXLValues(formatConfigs); err != nil {
// with expected XL format. For example if a user is
// trying to pool FS backend into an XL set.
if index, err := checkFormatXLValues(formatConfigs); err != nil {
// We will perhaps print and retry for the first 5 attempts
// because in XL initialization first server is the one which
// initializes the erasure set. This check ensures that the
// rest of the other servers do get a chance to see that the
// first server has a wrong format and exit gracefully.
// refer - https://github.com/minio/minio/issues/4140
if retryCount > maxRetryAttempts {
errorIf(err, "Detected disk (%s) in unexpected format",
storageDisks[index])
continue
}
return err
}
// Check if this is a XL or distributed XL, anything > 1 is considered XL backend.

View File

@ -113,11 +113,18 @@ func newStorageRPC(endpoint Endpoint) StorageAPI {
}
}
// Stringer interface compatible representation of network device.
// Stringer provides a canonicalized representation of network device.
func (n *networkStorage) String() string {
// Remove the storage RPC path prefix, internal paths are meaningless.
serviceEndpoint := strings.TrimPrefix(n.rpcClient.ServiceEndpoint(), storageRPCPath)
return n.rpcClient.ServerAddr() + ":" + serviceEndpoint
serviceEndpoint := strings.TrimPrefix(n.rpcClient.ServiceEndpoint(),
path.Join(minioReservedBucketPath, storageRPCPath))
// Check for the transport layer being used.
scheme := "http"
if n.rpcClient.config.secureConn {
scheme = "https"
}
// Finally construct the disk endpoint in http://<server>/<path> form.
return scheme + "://" + n.rpcClient.ServerAddr() + path.Join("/", serviceEndpoint)
}
// Init - attempts a login to reconnect.

View File

@ -27,6 +27,83 @@ import (
"testing"
)
// Tests the construction of canonical string by the
// Stringer method for StorageAPI.
func TestStorageCanonicalStrings(t *testing.T) {
testCases := []struct {
storageAPI StorageAPI
canonicalPath string
}{
// Canonicalized name as unix path.
{
storageAPI: &posix{
diskPath: "/tmp",
},
canonicalPath: "/tmp",
},
// Canonicalized name as windows path.
{
storageAPI: &posix{
diskPath: "C:/tmp",
},
canonicalPath: "C:/tmp",
},
// Canonicalized name as unix path.
{
storageAPI: &networkStorage{
rpcClient: newAuthRPCClient(authConfig{
accessKey: "",
secretKey: "",
serverAddr: "localhost:9000",
serviceEndpoint: "/tmp",
secureConn: false,
serviceName: "Storage",
disableReconnect: true,
}),
},
canonicalPath: "http://localhost:9000/tmp",
},
// Canonicalized name as non TLS.
{
storageAPI: &networkStorage{
rpcClient: newAuthRPCClient(authConfig{
accessKey: "",
secretKey: "",
serverAddr: "localhost:9000",
serviceEndpoint: "C:/tmp",
secureConn: false,
serviceName: "Storage",
disableReconnect: true,
}),
},
canonicalPath: "http://localhost:9000/C:/tmp",
},
// Canonicalized name as TLS.
{
storageAPI: &networkStorage{
rpcClient: newAuthRPCClient(authConfig{
accessKey: "",
secretKey: "",
serverAddr: "localhost:9000",
serviceEndpoint: "C:/tmp",
secureConn: true,
serviceName: "Storage",
disableReconnect: true,
}),
},
canonicalPath: "https://localhost:9000/C:/tmp",
},
}
// Validate all the test cases.
for i, testCase := range testCases {
p := testCase.storageAPI
if p.String() != testCase.canonicalPath {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.canonicalPath, p.String())
}
}
}
// Tests storage error transformation.
func TestStorageErr(t *testing.T) {
unknownErr := errors.New("Unknown error")