fix: allow set drive count of proper divisible values (#9101)

Currently the code assumed some orthogonal requirements
which led situations where when we have a setup where
we have let's say for example 168 drives, the final
set_drive_count chosen was 14. Indeed 168 drives are
divisible by 12 but this wasn't allowed due to an
unexpected requirement to have 12 to be a perfect modulo
of 14 which is not possible. This assumption was incorrect.

This PR fixes this old assumption properly, also adds
few tests and some negative tests as well. Improvements
are seen in error messages as well.
This commit is contained in:
Harshavardhana 2020-03-08 13:30:25 -07:00 committed by GitHub
parent 792ee48d2c
commit 6a00eb10bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 11 deletions

View File

@ -1,5 +1,5 @@
/* /*
* MinIO Cloud Storage, (C) 2018 MinIO, Inc. * MinIO Cloud Storage, (C) 2018-2020 MinIO, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -108,12 +108,24 @@ func getSetIndexes(args []string, totalSizes []uint64, customSetDriveCount uint6
return ss return ss
} }
setCounts := possibleSetCounts(commonSize)
if len(setCounts) == 0 {
msg := fmt.Sprintf("Incorrect number of endpoints provided %s, number of disks %d is not divisible by any supported erasure set sizes %d", args, commonSize, setSizes)
return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(msg)
}
if customSetDriveCount > 0 { if customSetDriveCount > 0 {
msg := fmt.Sprintf("Invalid set drive count, leads to non-uniform distribution for the given number of disks. Possible values for custom set count are %d", possibleSetCounts(setSize)) msg := fmt.Sprintf("Invalid set drive count. Acceptable values for %d number drives are %d", commonSize, setCounts)
if customSetDriveCount > setSize { if customSetDriveCount > setSize {
return nil, config.ErrInvalidErasureSetSize(nil).Msg(msg) return nil, config.ErrInvalidErasureSetSize(nil).Msg(msg)
} }
if setSize%customSetDriveCount != 0 { var found bool
for _, ss := range setCounts {
if ss == customSetDriveCount {
found = true
}
}
if !found {
return nil, config.ErrInvalidErasureSetSize(nil).Msg(msg) return nil, config.ErrInvalidErasureSetSize(nil).Msg(msg)
} }
setSize = customSetDriveCount setSize = customSetDriveCount
@ -121,7 +133,7 @@ func getSetIndexes(args []string, totalSizes []uint64, customSetDriveCount uint6
// Check whether setSize is with the supported range. // Check whether setSize is with the supported range.
if !isValidSetSize(setSize) { if !isValidSetSize(setSize) {
msg := fmt.Sprintf("Incorrect number of endpoints provided %s", args) msg := fmt.Sprintf("Incorrect number of endpoints provided %s, number of disks %d is not divisible by any supported erasure set sizes %d", args, commonSize, setSizes)
return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(msg) return nil, config.ErrInvalidNumberOfErasureEndpoints(nil).Msg(msg)
} }
@ -261,9 +273,6 @@ func createServerEndpoints(serverAddr string, args ...string) (
if err != nil { if err != nil {
return nil, -1, -1, config.ErrInvalidErasureSetSize(err) return nil, -1, -1, config.ErrInvalidErasureSetSize(err)
} }
if !isValidSetSize(uint64(setDriveCount)) {
return nil, -1, -1, config.ErrInvalidErasureSetSize(nil)
}
} }
if !ellipses.HasEllipses(args...) { if !ellipses.HasEllipses(args...) {

View File

@ -104,6 +104,36 @@ func TestGetSetIndexesEnvOverride(t *testing.T) {
8, 8,
true, true,
}, },
{
[]string{"http://host{1...12}/data{1...12}"},
[]uint64{144},
[][]uint64{{16, 16, 16, 16, 16, 16, 16, 16, 16}},
16,
true,
},
{
[]string{"http://host{0...5}/data{1...28}"},
[]uint64{168},
[][]uint64{{12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12}},
12,
true,
},
// Incorrect custom set drive count.
{
[]string{"http://host{0...5}/data{1...28}"},
[]uint64{168},
nil,
10,
false,
},
// Failure not divisible number of disks.
{
[]string{"http://host{1...11}/data{1...11}"},
[]uint64{121},
nil,
11,
false,
},
{ {
[]string{"data{1...60}"}, []string{"data{1...60}"},
nil, nil,

View File

@ -426,7 +426,7 @@ func checkFormatXLValue(formatXL *formatXLV3) error {
} }
// Check all format values. // Check all format values.
func checkFormatXLValues(formats []*formatXLV3) error { func checkFormatXLValues(formats []*formatXLV3, drivesPerSet int) error {
for i, formatXL := range formats { for i, formatXL := range formats {
if formatXL == nil { if formatXL == nil {
continue continue
@ -438,6 +438,9 @@ func checkFormatXLValues(formats []*formatXLV3) error {
return fmt.Errorf("%s disk is already being used in another erasure deployment. (Number of disks specified: %d but the number of disks found in the %s disk's format.json: %d)", return fmt.Errorf("%s disk is already being used in another erasure deployment. (Number of disks specified: %d but the number of disks found in the %s disk's format.json: %d)",
humanize.Ordinal(i+1), len(formats), humanize.Ordinal(i+1), len(formatXL.XL.Sets)*len(formatXL.XL.Sets[0])) humanize.Ordinal(i+1), len(formats), humanize.Ordinal(i+1), len(formatXL.XL.Sets)*len(formatXL.XL.Sets[0]))
} }
if len(formatXL.XL.Sets[0]) != drivesPerSet {
return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatXL.XL.Sets[0]), drivesPerSet)
}
} }
return nil return nil
} }

View File

@ -255,7 +255,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
// most part unless one of the formats is not consistent // most part unless one of the formats is not consistent
// with expected XL format. For example if a user is // with expected XL format. For example if a user is
// trying to pool FS backend into an XL set. // trying to pool FS backend into an XL set.
if err := checkFormatXLValues(formatConfigs); err != nil { if err := checkFormatXLValues(formatConfigs, drivesPerSet); err != nil {
return nil, err return nil, err
} }

View File

@ -1293,7 +1293,7 @@ func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
}(storageDisks) }(storageDisks)
formats, sErrs := loadFormatXLAll(storageDisks) formats, sErrs := loadFormatXLAll(storageDisks)
if err = checkFormatXLValues(formats); err != nil { if err = checkFormatXLValues(formats, s.drivesPerSet); err != nil {
return err return err
} }
@ -1406,7 +1406,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
markRootDisksAsDown(storageDisks) markRootDisksAsDown(storageDisks)
formats, sErrs := loadFormatXLAll(storageDisks) formats, sErrs := loadFormatXLAll(storageDisks)
if err = checkFormatXLValues(formats); err != nil { if err = checkFormatXLValues(formats, s.drivesPerSet); err != nil {
return madmin.HealResultItem{}, err return madmin.HealResultItem{}, err
} }