Mirror of https://github.com/minio/minio.git (synced 2025-11-07 04:42:56 -05:00)
Revert "Support variable server sets (#10314)"
This reverts commit aabf053d2f.
@@ -220,7 +220,7 @@ var (
 	globalServerConfigMu sync.RWMutex
 )

-func validateConfig(s config.Config, minSetDriveCount int) error {
+func validateConfig(s config.Config, setDriveCount int) error {
 	// Disable merging env values with config for validation.
 	env.SetEnvOff()

@@ -240,7 +240,7 @@ func validateConfig(s config.Config, minSetDriveCount int) error {
 	}

 	if globalIsErasure {
-		if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], minSetDriveCount, false); err != nil {
+		if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount); err != nil {
 			return err
 		}
 	}
@@ -320,7 +320,7 @@ func validateConfig(s config.Config, minSetDriveCount int) error {
 	return notify.TestNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs())
 }

-func lookupConfigs(s config.Config, minSetDriveCount int, freshConfig bool) {
+func lookupConfigs(s config.Config, setDriveCount int) {
 	ctx := GlobalContext

 	var err error
@@ -407,7 +407,7 @@ func lookupConfigs(s config.Config, minSetDriveCount int, freshConfig bool) {
 		logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
 	}

-	globalAPIConfig.init(apiConfig, minSetDriveCount)
+	globalAPIConfig.init(apiConfig, setDriveCount)

 	// Initialize remote instance transport once.
 	getRemoteInstanceTransportOnce.Do(func() {
@@ -415,7 +415,7 @@ func lookupConfigs(s config.Config, minSetDriveCount int, freshConfig bool) {
 	})

 	if globalIsErasure {
-		globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], minSetDriveCount, freshConfig)
+		globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount)
 		if err != nil {
 			logger.LogIf(ctx, fmt.Errorf("Unable to initialize storage class config: %w", err))
 		}
@@ -641,19 +641,14 @@ func getValidConfig(objAPI ObjectLayer) (config.Config, error) {

 // loadConfig - loads a new config from disk, overrides params
 // from env if found and valid
-func loadConfig(objAPI ObjectLayer, freshConfig bool) (err error) {
-	var srvCfg config.Config
-	if !freshConfig {
-		srvCfg, err = getValidConfig(objAPI)
-		if err != nil {
-			return err
-		}
-	} else {
-		srvCfg = globalServerConfig
+func loadConfig(objAPI ObjectLayer) error {
+	srvCfg, err := getValidConfig(objAPI)
+	if err != nil {
+		return err
 	}

 	// Override any values from ENVs.
-	lookupConfigs(srvCfg, objAPI.SetDriveCount(), freshConfig)
+	lookupConfigs(srvCfg, objAPI.SetDriveCount())

 	// hold the mutex lock before a new config is assigned.
 	globalServerConfigMu.Lock()
@@ -54,7 +54,7 @@ func TestServerConfig(t *testing.T) {
 	}

 	// Initialize server config.
-	if err := loadConfig(objLayer, false); err != nil {
+	if err := loadConfig(objLayer); err != nil {
 		t.Fatalf("Unable to initialize from updated config file %s", err)
 	}
 }
@@ -2423,7 +2423,7 @@ func migrateV27ToV28() error {

 // Migrates ${HOME}/.minio/config.json to '<export_path>/.minio.sys/config/config.json'
 // if etcd is configured then migrates /config/config.json to '<export_path>/.minio.sys/config/config.json'
-func migrateConfigToMinioSys(objAPI ObjectLayer) (freshConfig bool, err error) {
+func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
 	// Construct path to config.json for the given bucket.
 	configFile := path.Join(minioConfigPrefix, minioConfigFile)

@@ -2441,7 +2441,7 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (freshConfig bool, err error) {

 	// Verify if backend already has the file (after holding lock)
 	if err = checkConfig(GlobalContext, objAPI, configFile); err != errConfigNotFound {
-		return false, err
+		return err
 	} // if errConfigNotFound proceed to migrate..

 	var configFiles = []string{
@@ -2453,7 +2453,7 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (freshConfig bool, err error) {
 	for _, cfgFile := range configFiles {
 		if _, err = Load(cfgFile, config); err != nil {
 			if !osIsNotExist(err) && !osIsPermission(err) {
-				return false, err
+				return err
 			}
 			continue
 		}
@@ -2464,9 +2464,9 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (freshConfig bool, err error) {
 	}
 	if osIsNotExist(err) || osIsPermission(err) {
 		// Initialize the server config, if no config exists.
-		return true, newSrvConfig(objAPI)
+		return newSrvConfig(objAPI)
 	}
-	return false, saveServerConfig(GlobalContext, objAPI, config)
+	return saveServerConfig(GlobalContext, objAPI, config)
 }

 // Migrates '.minio.sys/config.json' to v33.
@@ -65,7 +65,7 @@ func TestServerConfigMigrateV1(t *testing.T) {
 	}

 	// Initialize server config and check again if everything is fine
-	if err := loadConfig(objLayer, true); err != nil {
+	if err := loadConfig(objLayer); err != nil {
 		t.Fatalf("Unable to initialize from updated config file %s", err)
 	}
 }
@@ -202,8 +202,7 @@ func TestServerConfigMigrateV2toV33(t *testing.T) {
 		t.Fatal("Unexpected error: ", err)
 	}

-	freshConfig, err := migrateConfigToMinioSys(objLayer)
-	if err != nil {
+	if err := migrateConfigToMinioSys(objLayer); err != nil {
 		t.Fatal("Unexpected error: ", err)
 	}

@@ -216,7 +215,7 @@ func TestServerConfigMigrateV2toV33(t *testing.T) {
 	}

 	// Initialize server config and check again if everything is fine
-	if err := loadConfig(objLayer, freshConfig); err != nil {
+	if err := loadConfig(objLayer); err != nil {
 		t.Fatalf("Unable to initialize from updated config file %s", err)
 	}

@@ -215,23 +215,20 @@ func initConfig(objAPI ObjectLayer) error {
 	// ignore if the file doesn't exist.
 	// If etcd is set then migrates /config/config.json
 	// to '<export_path>/.minio.sys/config/config.json'
-	freshConfig, err := migrateConfigToMinioSys(objAPI)
-	if err != nil {
+	if err := migrateConfigToMinioSys(objAPI); err != nil {
 		return err
 	}

-	if !freshConfig {
-		// Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version.
-		if err := migrateMinioSysConfig(objAPI); err != nil {
-			return err
-		}
-
-		// Migrates backend '<export_path>/.minio.sys/config/config.json' to
-		// latest config format.
-		if err := migrateMinioSysConfigToKV(objAPI); err != nil {
-			return err
-		}
+	// Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version.
+	if err := migrateMinioSysConfig(objAPI); err != nil {
+		return err
 	}

-	return loadConfig(objAPI, freshConfig)
+	// Migrates backend '<export_path>/.minio.sys/config/config.json' to
+	// latest config format.
+	if err := migrateMinioSysConfigToKV(objAPI); err != nil {
+		return err
+	}
+
+	return loadConfig(objAPI)
 }
@@ -88,13 +88,14 @@ var (
 // StorageClass - holds storage class information
 type StorageClass struct {
 	Parity int
+	DMA    string
 }

 // Config storage class configuration
 type Config struct {
 	Standard StorageClass `json:"standard"`
 	RRS      StorageClass `json:"rrs"`
-	DMA      string       `json:"dma"`
+	DMA      StorageClass `json:"dma"`
 }

 // UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
@@ -111,7 +112,7 @@ func (sCfg *Config) UnmarshalJSON(data []byte) error {
 // IsValid - returns true if input string is a valid
 // storage class kind supported.
 func IsValid(sc string) bool {
-	return sc == RRS || sc == STANDARD
+	return sc == RRS || sc == STANDARD || sc == DMA
 }

 // UnmarshalText unmarshals storage class from its textual form into
@@ -121,6 +122,14 @@ func (sc *StorageClass) UnmarshalText(b []byte) error {
 	if scStr == "" {
 		return nil
 	}
+	if scStr == DMAWrite {
+		sc.DMA = DMAWrite
+		return nil
+	}
+	if scStr == DMAReadWrite {
+		sc.DMA = DMAReadWrite
+		return nil
+	}
 	s, err := parseStorageClass(scStr)
 	if err != nil {
 		return err
@@ -134,14 +143,14 @@ func (sc *StorageClass) MarshalText() ([]byte, error) {
 	if sc.Parity != 0 {
 		return []byte(fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)), nil
 	}
-	return []byte{}, nil
+	return []byte(sc.DMA), nil
 }

 func (sc *StorageClass) String() string {
 	if sc.Parity != 0 {
 		return fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)
 	}
-	return ""
+	return sc.DMA
 }

 // Parses given storageClassEnv and returns a storageClass structure.
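Aside: the restored DMA handling is easiest to see in isolation. The following standalone sketch is not part of the commit; it re-declares local copies of the schemePrefix and DMA constants and mirrors how a StorageClass value now round-trips either an "EC:<parity>" string or a DMA mode through UnmarshalText/MarshalText:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Local stand-ins for the package-level constants referenced in the diff.
const (
	schemePrefix = "EC"
	dmaWrite     = "write"
	dmaReadWrite = "read-write"
)

// StorageClass mirrors the restored struct: a parity count or a DMA mode.
type StorageClass struct {
	Parity int
	DMA    string
}

// UnmarshalText accepts "", a DMA mode, or "EC:<parity>".
func (sc *StorageClass) UnmarshalText(b []byte) error {
	s := string(b)
	switch {
	case s == "":
		return nil
	case s == dmaWrite || s == dmaReadWrite:
		sc.DMA = s
		return nil
	}
	parts := strings.SplitN(s, ":", 2)
	if len(parts) != 2 || parts[0] != schemePrefix {
		return fmt.Errorf("unsupported storage class format %q", s)
	}
	parity, err := strconv.Atoi(parts[1])
	if err != nil {
		return err
	}
	sc.Parity = parity
	return nil
}

// MarshalText emits "EC:<parity>" when parity is set, otherwise the DMA mode.
func (sc *StorageClass) MarshalText() ([]byte, error) {
	if sc.Parity != 0 {
		return []byte(fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)), nil
	}
	return []byte(sc.DMA), nil
}

func main() {
	var ec, dma StorageClass
	_ = ec.UnmarshalText([]byte("EC:4"))
	_ = dma.UnmarshalText([]byte(dmaReadWrite))
	b1, _ := ec.MarshalText()
	b2, _ := dma.MarshalText()
	fmt.Println(string(b1), string(b2)) // EC:4 read-write
}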
@@ -213,10 +222,8 @@ func validateParity(ssParity, rrsParity, setDriveCount int) (err error) {
 // or config.json fields
 // -- corresponding values are returned
 // If storage class is not set during startup, default values are returned
-// -- Default for Reduced Redundancy Storage class is, parity = 2
-// -- Default for Standard Storage class is, parity = 2 - disks 4, 5
-// -- Default for Standard Storage class is, parity = 3 - disks 6, 7
-// -- Default for Standard Storage class is, parity = 4 - disks 8 to 16
+// -- Default for Reduced Redundancy Storage class is, parity = 2 and data = N-Parity
+// -- Default for Standard Storage class is, parity = N/2, data = N/2
 // If storage class is empty
 // -- standard storage class is assumed and corresponding data and parity is returned
 func (sCfg Config) GetParityForSC(sc string) (parity int) {
@@ -234,7 +241,7 @@ func (sCfg Config) GetParityForSC(sc string) (parity int) {

 // GetDMA - returns DMA configuration.
 func (sCfg Config) GetDMA() string {
-	return sCfg.DMA
+	return sCfg.DMA.DMA
 }

 // Enabled returns if etcd is enabled.
@@ -245,23 +252,9 @@ func Enabled(kvs config.KVS) bool {
 }

 // LookupConfig - lookup storage class config and override with valid environment settings if any.
-func LookupConfig(kvs config.KVS, setDriveCount int, freshConfig bool) (cfg Config, err error) {
+func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
 	cfg = Config{}
-	var defaultStdParity int
-	if freshConfig {
-		switch setDriveCount {
-		case 4, 5:
-			defaultStdParity = 2
-		case 6, 7:
-			defaultStdParity = 3
-		default:
-			defaultStdParity = 4
-		}
-	} else {
-		defaultStdParity = setDriveCount / 2
-	}
-
-	cfg.Standard.Parity = defaultStdParity
+	cfg.Standard.Parity = setDriveCount / 2
 	cfg.RRS.Parity = defaultRRSParity

 	if err = config.CheckValidKeys(config.StorageClassSubSys, kvs, DefaultKVS); err != nil {
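To make the behavioural change concrete, here is a small standalone comparison (an illustrative sketch, not code from the commit): the removed branch chose a tiered default parity only when bootstrapping a fresh config, while the restored code always defaults the STANDARD parity to half the drives in an erasure set.

package main

import "fmt"

// tieredDefaultParity reproduces the removed fresh-config defaults.
func tieredDefaultParity(setDriveCount int) int {
	switch setDriveCount {
	case 4, 5:
		return 2
	case 6, 7:
		return 3
	default:
		return 4
	}
}

// halfDriveParity reproduces the restored default: parity is N/2.
func halfDriveParity(setDriveCount int) int {
	return setDriveCount / 2
}

func main() {
	for _, n := range []int{4, 6, 8, 16} {
		fmt.Printf("drives=%2d removed-default=%d restored-default=%d\n",
			n, tieredDefaultParity(n), halfDriveParity(n))
	}
}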
@@ -279,7 +272,7 @@ func LookupConfig(kvs config.KVS, setDriveCount int, freshConfig bool) (cfg Conf
 		}
 	}
 	if cfg.Standard.Parity == 0 {
-		cfg.Standard.Parity = defaultStdParity
+		cfg.Standard.Parity = setDriveCount / 2
 	}

 	if rrsc != "" {
@@ -298,7 +291,7 @@ func LookupConfig(kvs config.KVS, setDriveCount int, freshConfig bool) (cfg Conf
 	if dma != DMAReadWrite && dma != DMAWrite {
 		return Config{}, errors.New(`valid dma values are "read-write" and "write"`)
 	}
-	cfg.DMA = dma
+	cfg.DMA.DMA = dma

 	// Validation is done after parsing both the storage classes. This is needed because we need one
 	// storage class value to deduce the correct value of the other storage class.
@@ -276,16 +276,7 @@ func parseEndpointSet(customSetDriveCount uint64, args ...string) (ep endpointSe
 // specific set size.
 // For example: {1...64} is divided into 4 sets each of size 16.
 // This applies to even distributed setup syntax as well.
-func GetAllSets(args ...string) ([][]string, error) {
-	var customSetDriveCount uint64
-	if v := env.Get(EnvErasureSetDriveCount, ""); v != "" {
-		driveCount, err := strconv.Atoi(v)
-		if err != nil {
-			return nil, config.ErrInvalidErasureSetSize(err)
-		}
-		customSetDriveCount = uint64(driveCount)
-	}
-
+func GetAllSets(customSetDriveCount uint64, args ...string) ([][]string, error) {
 	var setArgs [][]string
 	if !ellipses.HasEllipses(args...) {
 		var setIndexes [][]uint64
@@ -344,8 +335,16 @@ func createServerEndpoints(serverAddr string, args ...string) (
 		return nil, -1, errInvalidArgument
 	}

+	var setDriveCount int
+	if v := env.Get(EnvErasureSetDriveCount, ""); v != "" {
+		setDriveCount, err = strconv.Atoi(v)
+		if err != nil {
+			return nil, -1, config.ErrInvalidErasureSetSize(err)
+		}
+	}
+
 	if !ellipses.HasEllipses(args...) {
-		setArgs, err := GetAllSets(args...)
+		setArgs, err := GetAllSets(uint64(setDriveCount), args...)
 		if err != nil {
 			return nil, -1, err
 		}
@@ -364,7 +363,7 @@ func createServerEndpoints(serverAddr string, args ...string) (
 
 	var foundPrevLocal bool
 	for _, arg := range args {
-		setArgs, err := GetAllSets(arg)
+		setArgs, err := GetAllSets(uint64(setDriveCount), arg)
 		if err != nil {
 			return nil, -1, err
 		}
@@ -372,6 +371,9 @@ func createServerEndpoints(serverAddr string, args ...string) (
 		if err != nil {
 			return nil, -1, err
 		}
+		if setDriveCount != 0 && setDriveCount != len(setArgs[0]) {
+			return nil, -1, fmt.Errorf("All serverSets should have same drive per set ratio - expected %d, got %d", setDriveCount, len(setArgs[0]))
+		}
 		if err = endpointServerSets.Add(ZoneEndpoints{
 			SetCount:     len(setArgs),
 			DrivesPerSet: len(setArgs[0]),
@@ -380,6 +382,9 @@ func createServerEndpoints(serverAddr string, args ...string) (
 			return nil, -1, err
 		}
 		foundPrevLocal = endpointList.atleastOneEndpointLocal()
+		if setDriveCount == 0 {
+			setDriveCount = len(setArgs[0])
+		}
 		if setupType == UnknownSetupType {
 			setupType = gotSetupType
 		}
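A condensed sketch of the flow restored above (standalone example; the helper names are local to this sketch and MINIO_ERASURE_SET_DRIVE_COUNT stands for the EnvErasureSetDriveCount variable): the caller parses the drive-count override once, hands it to GetAllSets, and rejects server sets whose drives-per-set ratio differs from the first one.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// parseSetDriveCount reads the drive-count override the way the restored
// createServerEndpoints does: once, in the caller, instead of inside GetAllSets.
func parseSetDriveCount() (int, error) {
	v := os.Getenv("MINIO_ERASURE_SET_DRIVE_COUNT")
	if v == "" {
		return 0, nil // 0 means "derive it from the first endpoint layout"
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, fmt.Errorf("invalid erasure set drive count %q: %w", v, err)
	}
	return n, nil
}

// checkDrivesPerSet mirrors the added validation: every server set must keep
// the same drives-per-set ratio as the reference value.
func checkDrivesPerSet(setDriveCount, drivesPerSet int) error {
	if setDriveCount != 0 && setDriveCount != drivesPerSet {
		return fmt.Errorf("all serverSets should have same drive per set ratio - expected %d, got %d",
			setDriveCount, drivesPerSet)
	}
	return nil
}

func main() {
	setDriveCount, err := parseSetDriveCount()
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, drivesPerSet := range []int{16, 16} {
		if err := checkDrivesPerSet(setDriveCount, drivesPerSet); err != nil {
			fmt.Println(err)
			return
		}
		if setDriveCount == 0 {
			setDriveCount = drivesPerSet // first set becomes the reference
		}
	}
	fmt.Println("drives per set:", setDriveCount)
}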
@@ -274,8 +274,8 @@ func TestHealObjectCorrupted(t *testing.T) {

 	// Test 4: checks if HealObject returns an error when xl.meta is not found
 	// in more than read quorum number of disks, to create a corrupted situation.
-	for i := 0; i <= nfi.Erasure.DataBlocks; i++ {
-		erasureDisks[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
+	for i := 0; i <= len(er.getDisks())/2; i++ {
+		er.getDisks()[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
 	}

 	// Try healing now, expect to receive errFileNotFound.
@@ -334,7 +334,7 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
 // Returns per object readQuorum and writeQuorum
 // readQuorum is the min required disks to read data.
 // writeQuorum is the min required disks to write data.
-func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) {
+func objectQuorumFromMeta(ctx context.Context, er erasureObjects, partsMetaData []FileInfo, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) {
 	// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
 	latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
 	if err != nil {
@@ -344,12 +344,12 @@ func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []
 	dataBlocks := latestFileInfo.Erasure.DataBlocks
 	parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
 	if parityBlocks == 0 {
-		parityBlocks = getDefaultParityBlocks(len(partsMetaData))
+		parityBlocks = dataBlocks
 	}

 	writeQuorum := dataBlocks
 	if dataBlocks == parityBlocks {
-		writeQuorum++
+		writeQuorum = dataBlocks + 1
 	}

 	// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
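For reference, an isolated sketch (not the actual function) of the quorum arithmetic restored here: when no storage-class parity is configured, parity falls back to the data block count, read quorum equals the data blocks, and write quorum takes one extra disk only when data and parity are equal, which is why the test expectations below become 8/9 on a 16-disk set.

package main

import "fmt"

// quorums mirrors the restored objectQuorumFromMeta arithmetic in isolation.
func quorums(dataBlocks, configuredParity int) (readQuorum, writeQuorum int) {
	parityBlocks := configuredParity
	if parityBlocks == 0 {
		parityBlocks = dataBlocks // restored fallback: parity defaults to the data block count
	}
	readQuorum = dataBlocks
	writeQuorum = dataBlocks
	if dataBlocks == parityBlocks {
		writeQuorum = dataBlocks + 1
	}
	return readQuorum, writeQuorum
}

func main() {
	r, w := quorums(8, 0) // 16 drives, default N/2 data + N/2 parity
	fmt.Println(r, w)     // 8 9
}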
@@ -48,7 +48,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
 	// Read metadata associated with the object from all disks.
 	metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "")

-	readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs)
+	readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
 	if err != nil {
 		return err
 	}
@@ -258,7 +258,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
 	onlineDisks := er.getDisks()
 	parityBlocks := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
 	if parityBlocks == 0 {
-		parityBlocks = getDefaultParityBlocks(len(onlineDisks))
+		parityBlocks = len(onlineDisks) / 2
 	}
 	dataBlocks := len(onlineDisks) - parityBlocks

@@ -387,7 +387,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
 		uploadIDPath, "")

 	// get Quorum for this object
-	_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
+	_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
 	if err != nil {
 		return pi, toObjectErr(err, bucket, object)
 	}
@@ -568,7 +568,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
 	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID)

 	// get Quorum for this object
-	readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
+	readQuorum, _, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
 	if err != nil {
 		return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
 	}
@@ -616,7 +616,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
 	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "")

 	// get Quorum for this object
-	_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
+	_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
 	if err != nil {
 		return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
 	}
@@ -720,7 +720,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "")

 	// get Quorum for this object
-	_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
+	_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
 	if err != nil {
 		return oi, toObjectErr(err, bucket, object)
 	}
@@ -905,7 +905,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
 	partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "")

 	// get Quorum for this object
-	_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
+	_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
 	if err != nil {
 		return toObjectErr(err, bucket, object, uploadID)
 	}
@@ -61,7 +61,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
 	metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID)

 	// get Quorum for this object
-	readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs)
+	readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
 	if err != nil {
 		return oi, toObjectErr(err, srcBucket, srcObject)
 	}
@@ -370,7 +370,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
 	// Read metadata associated with the object from all disks.
 	metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)

-	readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs)
+	readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
 	if err != nil {
 		return fi, nil, nil, err
 	}
@@ -586,7 +586,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	// writeQuorum is dataBlocks + 1
 	writeQuorum := dataDrives
 	if dataDrives == parityDrives {
-		writeQuorum++
+		writeQuorum = dataDrives + 1
 	}

 	// Delete temporary object in the event of failure.
@@ -1074,7 +1074,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
 	// Read metadata associated with the object from all disks.
 	metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)

-	readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs)
+	readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
 	if err != nil {
 		return toObjectErr(err, bucket, object)
 	}
@@ -1135,7 +1135,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
 	// Read metadata associated with the object from all disks.
 	metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)

-	readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs)
+	readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
 	if err != nil {
 		return toObjectErr(err, bucket, object)
 	}
@@ -194,13 +194,6 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) {
 }

 func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
-	restoreGlobalStorageClass := globalStorageClass
-	defer func() {
-		globalStorageClass = restoreGlobalStorageClass
-	}()
-
-	globalStorageClass = storageclass.Config{}
-
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

@@ -235,7 +228,7 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
 	erasureDisks := xl.getDisks()
 	z.serverSets[0].erasureDisksMu.Lock()
 	xl.getDisks = func() []StorageAPI {
-		for i := range erasureDisks[:4] {
+		for i := range erasureDisks[:7] {
 			erasureDisks[i] = newNaughtyDisk(erasureDisks[i], nil, errFaultyDisk)
 		}
 		return erasureDisks
@@ -405,8 +398,6 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
 		globalStorageClass = restoreGlobalStorageClass
 	}()

-	globalStorageClass = storageclass.Config{}
-
 	bucket := getRandomBucketName()

 	var opts ObjectOptions
@@ -434,6 +425,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
 	}

 	parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "")
+
 	parts1SC := globalStorageClass

 	// Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class
@@ -531,7 +523,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
 	// Reset global storage class flags
 	object7 := "object7"
 	metadata7 := make(map[string]string)
-	metadata7["x-amz-storage-class"] = storageclass.STANDARD
+	metadata7["x-amz-storage-class"] = storageclass.RRS
 	globalStorageClass = storageclass.Config{
 		Standard: storageclass.StorageClass{
 			Parity: 5,
@@ -558,19 +550,19 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
 		storageClassCfg   storageclass.Config
 		expectedError     error
 	}{
-		{parts1, errs1, 12, 12, parts1SC, nil},
+		{parts1, errs1, 8, 9, parts1SC, nil},
 		{parts2, errs2, 14, 14, parts2SC, nil},
-		{parts3, errs3, 12, 12, parts3SC, nil},
+		{parts3, errs3, 8, 9, parts3SC, nil},
 		{parts4, errs4, 10, 10, parts4SC, nil},
 		{parts5, errs5, 14, 14, parts5SC, nil},
-		{parts6, errs6, 12, 12, parts6SC, nil},
-		{parts7, errs7, 11, 11, parts7SC, nil},
+		{parts6, errs6, 8, 9, parts6SC, nil},
+		{parts7, errs7, 14, 14, parts7SC, nil},
 	}
 	for _, tt := range tests {
 		tt := tt
 		t.(*testing.T).Run("", func(t *testing.T) {
 			globalStorageClass = tt.storageClassCfg
-			actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(ctx, tt.parts, tt.errs)
+			actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(ctx, *xl, tt.parts, tt.errs)
 			if tt.expectedError != nil && err == nil {
 				t.Errorf("Expected %s, got %s", tt.expectedError, err)
 			}
@@ -76,7 +76,6 @@ func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServer
 			return nil, err
 		}
 		if deploymentID == "" {
-			// all zones should have same deployment ID
 			deploymentID = formats[i].ID
 		}
 		z.serverSets[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i])
@@ -98,13 +97,7 @@ func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker {
 }

 func (z *erasureServerSets) SetDriveCount() int {
-	minSetDriveCount := z.serverSets[0].SetDriveCount()
-	for _, serverSet := range z.serverSets {
-		if minSetDriveCount > serverSet.setDriveCount {
-			minSetDriveCount = serverSet.setDriveCount
-		}
-	}
-	return minSetDriveCount
+	return z.serverSets[0].SetDriveCount()
 }

 type serverSetsAvailableSpace []zoneAvailableSpace
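Illustrative contrast (a standalone sketch, not from the commit): the removed method reported the minimum drives-per-set across all server sets, while the restored one simply reports the first set's value, relying on the equal-ratio check added in createServerEndpoints.

package main

import "fmt"

// minDriveCount sketches the removed behaviour: the smallest drives-per-set wins.
func minDriveCount(setDriveCounts []int) int {
	min := setDriveCounts[0]
	for _, c := range setDriveCounts[1:] {
		if c < min {
			min = c
		}
	}
	return min
}

// firstDriveCount sketches the restored behaviour: all sets are required to
// match, so the first set's drives-per-set is authoritative.
func firstDriveCount(setDriveCounts []int) int {
	return setDriveCounts[0]
}

func main() {
	sets := []int{16, 16}
	fmt.Println(minDriveCount(sets), firstDriveCount(sets)) // 16 16
}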
@@ -279,7 +272,7 @@ func (z *erasureServerSets) StorageInfo(ctx context.Context, local bool) (Storag

 	scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
 	if scParity == 0 {
-		scParity = getDefaultParityBlocks(z.SetDriveCount())
+		scParity = z.SetDriveCount() / 2
 	}

 	storageInfo.Backend.StandardSCData = z.SetDriveCount() - scParity
@@ -1408,6 +1401,7 @@ func (z *erasureServerSets) Health(ctx context.Context, opts HealthOptions) Heal

 	parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
+	diskCount := z.SetDriveCount()

 	if parityDrives == 0 {
 		parityDrives = getDefaultParityBlocks(diskCount)
 	}
@@ -471,8 +471,9 @@ func checkFormatErasureValues(formats []*formatErasureV3, setDriveCount int) err
 			return fmt.Errorf("%s disk is already being used in another erasure deployment. (Number of disks specified: %d but the number of disks found in the %s disk's format.json: %d)",
 				humanize.Ordinal(i+1), len(formats), humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets)*len(formatErasure.Erasure.Sets[0]))
 		}
-		// Only if custom erasure drive count is set, verify if the set_drive_count was manually
-		// changed - we need to honor what present on the drives.
+		// Only if custom erasure drive count is set,
+		// we should fail here other proceed to honor what
+		// is present on the disk.
 		if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != setDriveCount {
 			return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), setDriveCount)
 		}
@@ -894,7 +895,13 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount,
 func ecDrivesNoConfig(setDriveCount int) int {
 	ecDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
 	if ecDrives == 0 {
-		ecDrives = getDefaultParityBlocks(setDriveCount)
+		cfg, err := storageclass.LookupConfig(nil, setDriveCount)
+		if err == nil {
+			ecDrives = cfg.Standard.Parity
+		}
+		if ecDrives == 0 {
+			ecDrives = setDriveCount / 2
+		}
 	}
 	return ecDrives
 }
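The fallback chain restored in ecDrivesNoConfig, shown as a standalone sketch (the global storage-class and storageclass.LookupConfig values are stubbed out as plain parameters here): use the configured STANDARD parity if any, otherwise the storage-class default, and as a last resort half the drives per set.

package main

import "fmt"

// ecDrivesNoConfig sketches the restored fallback chain for the parity used
// while formatting disks before any server config has been loaded.
func ecDrivesNoConfig(setDriveCount, configuredParity, lookupParity int) int {
	ecDrives := configuredParity // stands in for globalStorageClass.GetParityForSC(STANDARD)
	if ecDrives == 0 {
		ecDrives = lookupParity // stands in for storageclass.LookupConfig(nil, setDriveCount)
		if ecDrives == 0 {
			ecDrives = setDriveCount / 2
		}
	}
	return ecDrives
}

func main() {
	fmt.Println(ecDrivesNoConfig(16, 0, 8)) // 8
}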
@@ -233,7 +233,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
 	srvCfg := newServerConfig()

 	// Override any values from ENVs.
-	lookupConfigs(srvCfg, 0, false)
+	lookupConfigs(srvCfg, 0)

 	// hold the mutex lock before a new config is assigned.
 	globalServerConfigMu.Lock()
@@ -340,8 +340,8 @@ func testGetObjectDiskNotFound(obj ObjectLayer, instanceType string, disks []str
 		}
 	}

-	// Take 4 disks down before GetObject is called, one more we loose quorum on 16 disk node.
-	for _, disk := range disks[:4] {
+	// Take 8 disks down before GetObject is called, one more we loose quorum on 16 disk node.
+	for _, disk := range disks[:8] {
 		os.RemoveAll(disk)
 	}

@@ -225,8 +225,8 @@ func testObjectAPIPutObjectDiskNotFound(obj ObjectLayer, instanceType string, di
 		t.Fatalf("%s : %s", instanceType, err.Error())
 	}

-	// Take 4 disks down, one more we loose quorum on 16 disk node.
-	for _, disk := range disks[:4] {
+	// Take 8 disks down, one more we loose quorum on 16 disk node.
+	for _, disk := range disks[:7] {
 		os.RemoveAll(disk)
 	}

@@ -452,7 +452,7 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES

 	prevGlobalServerConfig := globalServerConfig
 	globalServerConfig = newServerConfig()
-	lookupConfigs(globalServerConfig, 0, true)
+	lookupConfigs(globalServerConfig, 0)

 	restClient := newStorageRESTClient(endpoint, false)

@@ -93,14 +93,7 @@ func path2BucketObject(s string) (bucket, prefix string) {
 }

 func getDefaultParityBlocks(drive int) int {
-	switch drive {
-	case 4, 5:
-		return 2
-	case 6, 7:
-		return 3
-	default:
-		return 4
-	}
+	return drive / 2
 }

 func getDefaultDataBlocks(drive int) int {
@@ -1067,7 +1067,7 @@ func TestXLStorageReadFile(t *testing.T) {
 	for l := 0; l < 2; l++ {
 		// 1st loop tests with dma=write, 2nd loop tests with dma=read-write.
 		if l == 1 {
-			globalStorageClass.DMA = storageclass.DMAReadWrite
+			globalStorageClass.DMA.DMA = storageclass.DMAReadWrite
 		}
 		// Following block validates all ReadFile test cases.
 		for i, testCase := range testCases {
@@ -1127,7 +1127,7 @@ func TestXLStorageReadFile(t *testing.T) {
 	}

 	// Reset the flag.
-	globalStorageClass.DMA = storageclass.DMAWrite
+	globalStorageClass.DMA.DMA = storageclass.DMAWrite

 	// TestXLStorage for permission denied.
 	if runtime.GOOS != globalWindowsOSName {