simplify storage class validation (#5470)

Check if the storage class is set in an
non XL setup instead of relying on `globalEndpoints`
value. Also converge the checks for both SS
and RRS parity configuration.

This PR also removes redundant `tt.name` in all
test cases, since each testcase doesn't need to
be numbered explicitly they are numbered implicitly.
This commit is contained in:
Harshavardhana 2018-01-31 23:30:07 -08:00 committed by Nitish Tiwari
parent 033cfb5cef
commit 3316dbc037
4 changed files with 133 additions and 200 deletions

View File

@ -137,13 +137,13 @@ func handleCommonEnvVars() {
// Validation is done after parsing both the storage classes. This is needed because we need one
// storage class value to deduce the correct value of the other storage class.
if globalRRStorageClass.Scheme != "" {
err := validateRRSParity(globalRRStorageClass.Parity, globalStandardStorageClass.Parity)
err = validateParity(globalStandardStorageClass.Parity, globalRRStorageClass.Parity)
fatalIf(err, "Invalid value set in environment variable %s.", reducedRedundancyStorageClassEnv)
globalIsStorageClass = true
}
if globalStandardStorageClass.Scheme != "" {
err := validateSSParity(globalStandardStorageClass.Parity, globalRRStorageClass.Parity)
err = validateParity(globalStandardStorageClass.Parity, globalRRStorageClass.Parity)
fatalIf(err, "Invalid value set in environment variable %s.", standardStorageClassEnv)
globalIsStorageClass = true
}

View File

@ -91,27 +91,9 @@ func (s *serverConfig) SetStorageClass(standardClass, rrsClass storageClass) {
s.StorageClass.RRS = rrsClass
}
// GetStorageClass reads storage class fields from current config, parses and validates it.
// GetStorageClass reads storage class fields from current config.
// It returns the standard and reduced redundancy storage class struct
func (s *serverConfig) GetStorageClass() (storageClass, storageClass) {
var err error
// Storage Class from config.json is already parsed and stored in s.StorageClass
// Now validate the storage class fields
ssc := s.StorageClass.Standard
rrsc := s.StorageClass.RRS
if rrsc.Scheme != "" {
err = validateRRSParity(rrsc.Parity, ssc.Parity)
fatalIf(err, "Invalid value %s:%d set in config.json", rrsc.Scheme, rrsc.Parity)
globalIsStorageClass = true
}
if ssc.Scheme != "" {
err = validateSSParity(ssc.Parity, rrsc.Parity)
fatalIf(err, "Invalid value %s:%d set in config.json", ssc.Scheme, ssc.Parity)
globalIsStorageClass = true
}
return s.StorageClass.Standard, s.StorageClass.RRS
}
@ -172,12 +154,15 @@ func (s *serverConfig) ConfigDiff(t *serverConfig) string {
func newServerConfig() *serverConfig {
srvCfg := &serverConfig{
Version: serverConfigVersion,
Credential: auth.MustGetNewCredentials(),
Region: globalMinioDefaultRegion,
Browser: true,
StorageClass: storageClassConfig{},
Notify: notifier{},
Version: serverConfigVersion,
Credential: auth.MustGetNewCredentials(),
Region: globalMinioDefaultRegion,
Browser: true,
StorageClass: storageClassConfig{
Standard: storageClass{},
RRS: storageClass{},
},
Notify: notifier{},
}
// Make sure to initialize notification configs.

View File

@ -17,6 +17,7 @@
package cmd
import (
"encoding/json"
"errors"
"fmt"
"strconv"
@ -54,6 +55,20 @@ type storageClassConfig struct {
RRS storageClass `json:"rrs"`
}
// Validate SS and RRS parity when unmarshalling JSON.
func (sCfg *storageClassConfig) UnmarshalJSON(data []byte) error {
type Alias storageClassConfig
aux := &struct {
*Alias
}{
Alias: (*Alias)(sCfg),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
return validateParity(aux.Standard.Parity, aux.RRS.Parity)
}
// Validate if storage class in metadata
// Only Standard and RRS Storage classes are supported
func isValidStorageClassMeta(sc string) bool {
@ -62,17 +77,15 @@ func isValidStorageClassMeta(sc string) bool {
func (sc *storageClass) UnmarshalText(b []byte) error {
scStr := string(b)
if scStr != "" {
s, err := parseStorageClass(scStr)
if err != nil {
return err
}
sc.Parity = s.Parity
sc.Scheme = s.Scheme
} else {
sc = &storageClass{}
if scStr == "" {
return nil
}
s, err := parseStorageClass(scStr)
if err != nil {
return err
}
sc.Parity = s.Parity
sc.Scheme = s.Scheme
return nil
}
@ -115,68 +128,39 @@ func parseStorageClass(storageClassEnv string) (sc storageClass, err error) {
return sc, nil
}
// Validates the parity disks for Reduced Redundancy storage class
func validateRRSParity(rrsParity, ssParity int) (err error) {
disks := len(globalEndpoints)
// disks < 4 means this is not a erasure coded setup and so storage class is not supported
if disks < 4 {
// Validates the parity disks.
func validateParity(ssParity, rrsParity int) (err error) {
if ssParity == 0 && rrsParity == 0 {
return nil
}
if !globalIsXL {
return fmt.Errorf("Setting storage class only allowed for erasure coding mode")
}
// Reduced redundancy storage class is not supported for 4 disks erasure coded setup.
if disks == 4 && rrsParity != 0 {
return fmt.Errorf("Reduced redundancy storage class not supported for " + strconv.Itoa(disks) + " disk setup")
// SS parity disks should be greater than or equal to minimumParityDisks. Parity below minimumParityDisks is not recommended.
if ssParity > 0 && ssParity < minimumParityDisks {
return fmt.Errorf("Standard storage class parity %d should be greater than or equal to %d", ssParity, minimumParityDisks)
}
// RRS parity disks should be greater than or equal to minimumParityDisks. Parity below minimumParityDisks is not recommended.
if rrsParity < minimumParityDisks {
return fmt.Errorf("Reduced redundancy storage class parity should be greater than or equal to " + strconv.Itoa(minimumParityDisks))
if rrsParity > 0 && rrsParity < minimumParityDisks {
return fmt.Errorf("Reduced redundancy storage class parity %d should be greater than or equal to %d", rrsParity, minimumParityDisks)
}
// Reduced redundancy implies lesser parity than standard storage class. So, RRS parity disks should be
// - less than N/2, if StorageClass parity is not set.
// - less than StorageClass Parity, if Storage class parity is set.
switch ssParity {
case 0:
if rrsParity >= disks/2 {
return fmt.Errorf("Reduced redundancy storage class parity disks should be less than " + strconv.Itoa(disks/2))
}
default:
if rrsParity >= ssParity {
return fmt.Errorf("Reduced redundancy storage class parity disks should be less than " + strconv.Itoa(ssParity))
if ssParity > len(globalEndpoints)/2 {
return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, len(globalEndpoints)/2)
}
if rrsParity > len(globalEndpoints)/2 {
return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, len(globalEndpoints)/2)
}
if ssParity > 0 && rrsParity > 0 {
if ssParity < rrsParity {
return fmt.Errorf("Standard storage class parity disks %d should be greater than or equal to Reduced redundancy storage class parity disks %d", ssParity, rrsParity)
}
}
return nil
}
// Validates the parity disks for Standard storage class
func validateSSParity(ssParity, rrsParity int) (err error) {
disks := len(globalEndpoints)
// disks < 4 means this is not a erasure coded setup and so storage class is not supported
if disks < 4 {
return fmt.Errorf("Setting storage class only allowed for erasure coding mode")
}
// Standard storage class implies more parity than Reduced redundancy storage class. So, Standard storage parity disks should be
// - greater than or equal to 2, if RRS parity is not set.
// - greater than RRS Parity, if RRS parity is set.
switch rrsParity {
case 0:
if ssParity < minimumParityDisks {
return fmt.Errorf("Standard storage class parity disks should be greater than or equal to " + strconv.Itoa(minimumParityDisks))
}
default:
if ssParity <= rrsParity {
return fmt.Errorf("Standard storage class parity disks should be greater than " + strconv.Itoa(rrsParity))
}
}
// Standard storage class parity should be less than or equal to N/2
if ssParity > disks/2 {
return fmt.Errorf("Standard storage class parity disks should be less than or equal to " + strconv.Itoa(disks/2))
}
return nil
}

View File

@ -29,125 +29,93 @@ func TestParseStorageClass(t *testing.T) {
func testParseStorageClass(obj ObjectLayer, instanceType string, t TestErrHandler) {
tests := []struct {
name int
storageClassEnv string
wantSc storageClass
expectedError error
}{
{1, "EC:3", storageClass{
{"EC:3", storageClass{
Scheme: "EC",
Parity: 3},
nil},
{2, "EC:4", storageClass{
{"EC:4", storageClass{
Scheme: "EC",
Parity: 4},
nil},
{3, "AB:4", storageClass{
{"AB:4", storageClass{
Scheme: "EC",
Parity: 4},
errors.New("Unsupported scheme AB. Supported scheme is EC")},
{4, "EC:4:5", storageClass{
{"EC:4:5", storageClass{
Scheme: "EC",
Parity: 4},
errors.New("Too many sections in EC:4:5")},
{5, "AB", storageClass{
{"AB", storageClass{
Scheme: "EC",
Parity: 4},
errors.New("Too few sections in AB")},
}
for _, tt := range tests {
for i, tt := range tests {
gotSc, err := parseStorageClass(tt.storageClassEnv)
if err != nil && tt.expectedError == nil {
t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err)
t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err)
return
}
if err == nil && tt.expectedError != nil {
t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err)
t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err)
return
}
if tt.expectedError == nil && !reflect.DeepEqual(gotSc, tt.wantSc) {
t.Errorf("Test %d, Expected %v, got %v", tt.name, tt.wantSc, gotSc)
t.Errorf("Test %d, Expected %v, got %v", i+1, tt.wantSc, gotSc)
return
}
if tt.expectedError != nil && !reflect.DeepEqual(err, tt.expectedError) {
t.Errorf("Test %d, Expected %v, got %v", tt.name, tt.expectedError, err)
t.Errorf("Test %d, Expected %v, got %v", i+1, tt.expectedError, err)
}
}
}
func TestValidateRRSParity(t *testing.T) {
ExecObjectLayerTestWithDirs(t, testValidateRRSParity)
func TestValidateParity(t *testing.T) {
ExecObjectLayerTestWithDirs(t, testValidateParity)
}
func testValidateRRSParity(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) {
func testValidateParity(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) {
// Reset global storage class flags
resetGlobalStorageEnvs()
// Set globalEndpoints for a single node XL setup.
endpoints := globalEndpoints
defer func() {
globalEndpoints = endpoints
}()
isXL := globalIsXL
defer func() {
globalIsXL = isXL
}()
globalIsXL = true
globalEndpoints = mustGetNewEndpointList(dirs...)
tests := []struct {
name int
rrsParity int
ssParity int
expectedError error
rrsParity int
ssParity int
success bool
}{
{1, 2, 4, nil},
{2, 1, 4, errors.New("Reduced redundancy storage class parity should be greater than or equal to 2")},
{3, 7, 6, errors.New("Reduced redundancy storage class parity disks should be less than 6")},
{4, 9, 0, errors.New("Reduced redundancy storage class parity disks should be less than 8")},
{5, 3, 3, errors.New("Reduced redundancy storage class parity disks should be less than 3")},
{2, 4, true},
{3, 3, true},
{1, 4, false},
{7, 6, false},
{9, 0, false},
{9, 9, false},
{2, 9, false},
}
for _, tt := range tests {
err := validateRRSParity(tt.rrsParity, tt.ssParity)
if err != nil && tt.expectedError == nil {
t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err)
return
for i, tt := range tests {
err := validateParity(tt.ssParity, tt.rrsParity)
if err != nil && tt.success {
t.Errorf("Test %d, Expected success, got %s", i+1, err)
}
if err == nil && tt.expectedError != nil {
t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err)
return
}
if tt.expectedError != nil && !reflect.DeepEqual(err, tt.expectedError) {
t.Errorf("Test %d, Expected %v, got %v", tt.name, tt.expectedError, err)
}
}
}
func TestValidateSSParity(t *testing.T) {
ExecObjectLayerTestWithDirs(t, testValidateSSParity)
}
func testValidateSSParity(obj ObjectLayer, instanceType string, dirs []string, t TestErrHandler) {
// Reset global storage class flags
resetGlobalStorageEnvs()
// Set globalEndpoints for a single node XL setup.
globalEndpoints = mustGetNewEndpointList(dirs...)
tests := []struct {
name int
ssParity int
rrsParity int
expectedError error
}{
{1, 4, 2, nil},
{2, 6, 5, nil},
{3, 1, 0, errors.New("Standard storage class parity disks should be greater than or equal to 2")},
{4, 4, 6, errors.New("Standard storage class parity disks should be greater than 6")},
{5, 9, 0, errors.New("Standard storage class parity disks should be less than or equal to 8")},
{6, 3, 3, errors.New("Standard storage class parity disks should be greater than 3")},
}
for _, tt := range tests {
err := validateSSParity(tt.ssParity, tt.rrsParity)
if err != nil && tt.expectedError == nil {
t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err)
return
}
if err == nil && tt.expectedError != nil {
t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err)
return
}
if tt.expectedError != nil && !reflect.DeepEqual(err, tt.expectedError) {
t.Errorf("Test %d, Expected %v, got %v", tt.name, tt.expectedError, err)
if err == nil && !tt.success {
t.Errorf("Test %d, Expected failure, got success", i+1)
}
}
}
@ -162,39 +130,38 @@ func testGetRedundancyCount(obj ObjectLayer, instanceType string, dirs []string,
xl := obj.(*xlObjects)
tests := []struct {
name int
sc string
disks []StorageAPI
expectedData int
expectedParity int
}{
{1, reducedRedundancyStorageClass, xl.storageDisks, 14, 2},
{2, standardStorageClass, xl.storageDisks, 8, 8},
{3, "", xl.storageDisks, 8, 8},
{4, reducedRedundancyStorageClass, xl.storageDisks, 9, 7},
{5, standardStorageClass, xl.storageDisks, 10, 6},
{6, "", xl.storageDisks, 9, 7},
{reducedRedundancyStorageClass, xl.storageDisks, 14, 2},
{standardStorageClass, xl.storageDisks, 8, 8},
{"", xl.storageDisks, 8, 8},
{reducedRedundancyStorageClass, xl.storageDisks, 9, 7},
{standardStorageClass, xl.storageDisks, 10, 6},
{"", xl.storageDisks, 9, 7},
}
for _, tt := range tests {
for i, tt := range tests {
// Set env var for test case 4
if tt.name == 4 {
if i+1 == 4 {
globalRRStorageClass.Parity = 7
}
// Set env var for test case 5
if tt.name == 5 {
if i+1 == 5 {
globalStandardStorageClass.Parity = 6
}
// Set env var for test case 6
if tt.name == 6 {
if i+1 == 6 {
globalStandardStorageClass.Parity = 7
}
data, parity := getRedundancyCount(tt.sc, len(tt.disks))
if data != tt.expectedData {
t.Errorf("Test %d, Expected data disks %d, got %d", tt.name, tt.expectedData, data)
t.Errorf("Test %d, Expected data disks %d, got %d", i+1, tt.expectedData, data)
return
}
if parity != tt.expectedParity {
t.Errorf("Test %d, Expected parity disks %d, got %d", tt.name, tt.expectedParity, parity)
t.Errorf("Test %d, Expected parity disks %d, got %d", i+1, tt.expectedParity, parity)
return
}
}
@ -322,38 +289,36 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
parts7, errs7 := readAllXLMetadata(xlDisks, bucket, object7)
tests := []struct {
name int
xl xlObjects
parts []xlMetaV1
errs []error
expectedReadQuorum int
expectedWriteQuorum int
expectedError error
}{
{1, *xl, parts1, errs1, 8, 9, nil},
{2, *xl, parts2, errs2, 14, 15, nil},
{3, *xl, parts3, errs3, 8, 9, nil},
{4, *xl, parts4, errs4, 10, 11, nil},
{5, *xl, parts5, errs5, 14, 15, nil},
{6, *xl, parts6, errs6, 8, 9, nil},
{7, *xl, parts7, errs7, 14, 15, nil},
{parts1, errs1, 8, 9, nil},
{parts2, errs2, 14, 15, nil},
{parts3, errs3, 8, 9, nil},
{parts4, errs4, 10, 11, nil},
{parts5, errs5, 14, 15, nil},
{parts6, errs6, 8, 9, nil},
{parts7, errs7, 14, 15, nil},
}
for _, tt := range tests {
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(tt.xl, tt.parts, tt.errs)
for i, tt := range tests {
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(*xl, tt.parts, tt.errs)
if tt.expectedError != nil && err == nil {
t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err)
t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err)
return
}
if tt.expectedError == nil && err != nil {
t.Errorf("Test %d, Expected %s, got %s", tt.name, tt.expectedError, err)
t.Errorf("Test %d, Expected %s, got %s", i+1, tt.expectedError, err)
return
}
if tt.expectedReadQuorum != actualReadQuorum {
t.Errorf("Test %d, Expected Read Quorum %d, got %d", tt.name, tt.expectedReadQuorum, actualReadQuorum)
t.Errorf("Test %d, Expected Read Quorum %d, got %d", i+1, tt.expectedReadQuorum, actualReadQuorum)
return
}
if tt.expectedWriteQuorum != actualWriteQuorum {
t.Errorf("Test %d, Expected Write Quorum %d, got %d", tt.name, tt.expectedWriteQuorum, actualWriteQuorum)
t.Errorf("Test %d, Expected Write Quorum %d, got %d", i+1, tt.expectedWriteQuorum, actualWriteQuorum)
return
}
}
@ -362,21 +327,20 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
// Test isValidStorageClassMeta method with valid and invalid inputs
func TestIsValidStorageClassMeta(t *testing.T) {
tests := []struct {
name int
sc string
want bool
}{
{1, "STANDARD", true},
{2, "REDUCED_REDUNDANCY", true},
{3, "", false},
{4, "INVALID", false},
{5, "123", false},
{6, "MINIO_STORAGE_CLASS_RRS", false},
{7, "MINIO_STORAGE_CLASS_STANDARD", false},
{"STANDARD", true},
{"REDUCED_REDUNDANCY", true},
{"", false},
{"INVALID", false},
{"123", false},
{"MINIO_STORAGE_CLASS_RRS", false},
{"MINIO_STORAGE_CLASS_STANDARD", false},
}
for _, tt := range tests {
for i, tt := range tests {
if got := isValidStorageClassMeta(tt.sc); got != tt.want {
t.Errorf("Test %d, Expected Storage Class to be %t, got %t", tt.name, tt.want, got)
t.Errorf("Test %d, Expected Storage Class to be %t, got %t", i+1, tt.want, got)
}
}
}