mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
Support variable server sets (#10314)
This commit is contained in:
parent
f839bb5a0a
commit
aabf053d2f
@ -220,7 +220,7 @@ var (
|
|||||||
globalServerConfigMu sync.RWMutex
|
globalServerConfigMu sync.RWMutex
|
||||||
)
|
)
|
||||||
|
|
||||||
func validateConfig(s config.Config, setDriveCount int) error {
|
func validateConfig(s config.Config, minSetDriveCount int) error {
|
||||||
// Disable merging env values with config for validation.
|
// Disable merging env values with config for validation.
|
||||||
env.SetEnvOff()
|
env.SetEnvOff()
|
||||||
|
|
||||||
@ -240,7 +240,7 @@ func validateConfig(s config.Config, setDriveCount int) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if globalIsErasure {
|
if globalIsErasure {
|
||||||
if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount); err != nil {
|
if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], minSetDriveCount, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -320,7 +320,7 @@ func validateConfig(s config.Config, setDriveCount int) error {
|
|||||||
return notify.TestNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs())
|
return notify.TestNotificationTargets(GlobalContext, s, NewGatewayHTTPTransport(), globalNotificationSys.ConfiguredTargetIDs())
|
||||||
}
|
}
|
||||||
|
|
||||||
func lookupConfigs(s config.Config, setDriveCount int) {
|
func lookupConfigs(s config.Config, minSetDriveCount int, freshConfig bool) {
|
||||||
ctx := GlobalContext
|
ctx := GlobalContext
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
@ -407,7 +407,7 @@ func lookupConfigs(s config.Config, setDriveCount int) {
|
|||||||
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
globalAPIConfig.init(apiConfig, setDriveCount)
|
globalAPIConfig.init(apiConfig, minSetDriveCount)
|
||||||
|
|
||||||
// Initialize remote instance transport once.
|
// Initialize remote instance transport once.
|
||||||
getRemoteInstanceTransportOnce.Do(func() {
|
getRemoteInstanceTransportOnce.Do(func() {
|
||||||
@ -415,7 +415,7 @@ func lookupConfigs(s config.Config, setDriveCount int) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if globalIsErasure {
|
if globalIsErasure {
|
||||||
globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount)
|
globalStorageClass, err = storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], minSetDriveCount, freshConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize storage class config: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize storage class config: %w", err))
|
||||||
}
|
}
|
||||||
@ -641,14 +641,19 @@ func getValidConfig(objAPI ObjectLayer) (config.Config, error) {
|
|||||||
|
|
||||||
// loadConfig - loads a new config from disk, overrides params
|
// loadConfig - loads a new config from disk, overrides params
|
||||||
// from env if found and valid
|
// from env if found and valid
|
||||||
func loadConfig(objAPI ObjectLayer) error {
|
func loadConfig(objAPI ObjectLayer, freshConfig bool) (err error) {
|
||||||
srvCfg, err := getValidConfig(objAPI)
|
var srvCfg config.Config
|
||||||
if err != nil {
|
if !freshConfig {
|
||||||
return err
|
srvCfg, err = getValidConfig(objAPI)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
srvCfg = globalServerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// Override any values from ENVs.
|
// Override any values from ENVs.
|
||||||
lookupConfigs(srvCfg, objAPI.SetDriveCount())
|
lookupConfigs(srvCfg, objAPI.SetDriveCount(), freshConfig)
|
||||||
|
|
||||||
// hold the mutex lock before a new config is assigned.
|
// hold the mutex lock before a new config is assigned.
|
||||||
globalServerConfigMu.Lock()
|
globalServerConfigMu.Lock()
|
||||||
|
@ -54,7 +54,7 @@ func TestServerConfig(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize server config.
|
// Initialize server config.
|
||||||
if err := loadConfig(objLayer); err != nil {
|
if err := loadConfig(objLayer, false); err != nil {
|
||||||
t.Fatalf("Unable to initialize from updated config file %s", err)
|
t.Fatalf("Unable to initialize from updated config file %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2423,7 +2423,7 @@ func migrateV27ToV28() error {
|
|||||||
|
|
||||||
// Migrates ${HOME}/.minio/config.json to '<export_path>/.minio.sys/config/config.json'
|
// Migrates ${HOME}/.minio/config.json to '<export_path>/.minio.sys/config/config.json'
|
||||||
// if etcd is configured then migrates /config/config.json to '<export_path>/.minio.sys/config/config.json'
|
// if etcd is configured then migrates /config/config.json to '<export_path>/.minio.sys/config/config.json'
|
||||||
func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
|
func migrateConfigToMinioSys(objAPI ObjectLayer) (freshConfig bool, err error) {
|
||||||
// Construct path to config.json for the given bucket.
|
// Construct path to config.json for the given bucket.
|
||||||
configFile := path.Join(minioConfigPrefix, minioConfigFile)
|
configFile := path.Join(minioConfigPrefix, minioConfigFile)
|
||||||
|
|
||||||
@ -2441,7 +2441,7 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
|
|||||||
|
|
||||||
// Verify if backend already has the file (after holding lock)
|
// Verify if backend already has the file (after holding lock)
|
||||||
if err = checkConfig(GlobalContext, objAPI, configFile); err != errConfigNotFound {
|
if err = checkConfig(GlobalContext, objAPI, configFile); err != errConfigNotFound {
|
||||||
return err
|
return false, err
|
||||||
} // if errConfigNotFound proceed to migrate..
|
} // if errConfigNotFound proceed to migrate..
|
||||||
|
|
||||||
var configFiles = []string{
|
var configFiles = []string{
|
||||||
@ -2453,7 +2453,7 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
|
|||||||
for _, cfgFile := range configFiles {
|
for _, cfgFile := range configFiles {
|
||||||
if _, err = Load(cfgFile, config); err != nil {
|
if _, err = Load(cfgFile, config); err != nil {
|
||||||
if !osIsNotExist(err) && !osIsPermission(err) {
|
if !osIsNotExist(err) && !osIsPermission(err) {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -2464,9 +2464,9 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
|
|||||||
}
|
}
|
||||||
if osIsNotExist(err) || osIsPermission(err) {
|
if osIsNotExist(err) || osIsPermission(err) {
|
||||||
// Initialize the server config, if no config exists.
|
// Initialize the server config, if no config exists.
|
||||||
return newSrvConfig(objAPI)
|
return true, newSrvConfig(objAPI)
|
||||||
}
|
}
|
||||||
return saveServerConfig(GlobalContext, objAPI, config)
|
return false, saveServerConfig(GlobalContext, objAPI, config)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Migrates '.minio.sys/config.json' to v33.
|
// Migrates '.minio.sys/config.json' to v33.
|
||||||
|
@ -65,7 +65,7 @@ func TestServerConfigMigrateV1(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize server config and check again if everything is fine
|
// Initialize server config and check again if everything is fine
|
||||||
if err := loadConfig(objLayer); err != nil {
|
if err := loadConfig(objLayer, true); err != nil {
|
||||||
t.Fatalf("Unable to initialize from updated config file %s", err)
|
t.Fatalf("Unable to initialize from updated config file %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -202,7 +202,8 @@ func TestServerConfigMigrateV2toV33(t *testing.T) {
|
|||||||
t.Fatal("Unexpected error: ", err)
|
t.Fatal("Unexpected error: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := migrateConfigToMinioSys(objLayer); err != nil {
|
freshConfig, err := migrateConfigToMinioSys(objLayer)
|
||||||
|
if err != nil {
|
||||||
t.Fatal("Unexpected error: ", err)
|
t.Fatal("Unexpected error: ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,7 +216,7 @@ func TestServerConfigMigrateV2toV33(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize server config and check again if everything is fine
|
// Initialize server config and check again if everything is fine
|
||||||
if err := loadConfig(objLayer); err != nil {
|
if err := loadConfig(objLayer, freshConfig); err != nil {
|
||||||
t.Fatalf("Unable to initialize from updated config file %s", err)
|
t.Fatalf("Unable to initialize from updated config file %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,20 +215,23 @@ func initConfig(objAPI ObjectLayer) error {
|
|||||||
// ignore if the file doesn't exist.
|
// ignore if the file doesn't exist.
|
||||||
// If etcd is set then migrates /config/config.json
|
// If etcd is set then migrates /config/config.json
|
||||||
// to '<export_path>/.minio.sys/config/config.json'
|
// to '<export_path>/.minio.sys/config/config.json'
|
||||||
if err := migrateConfigToMinioSys(objAPI); err != nil {
|
freshConfig, err := migrateConfigToMinioSys(objAPI)
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version.
|
if !freshConfig {
|
||||||
if err := migrateMinioSysConfig(objAPI); err != nil {
|
// Migrates backend '<export_path>/.minio.sys/config/config.json' to latest version.
|
||||||
return err
|
if err := migrateMinioSysConfig(objAPI); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Migrates backend '<export_path>/.minio.sys/config/config.json' to
|
||||||
|
// latest config format.
|
||||||
|
if err := migrateMinioSysConfigToKV(objAPI); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Migrates backend '<export_path>/.minio.sys/config/config.json' to
|
return loadConfig(objAPI, freshConfig)
|
||||||
// latest config format.
|
|
||||||
if err := migrateMinioSysConfigToKV(objAPI); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return loadConfig(objAPI)
|
|
||||||
}
|
}
|
||||||
|
@ -88,14 +88,13 @@ var (
|
|||||||
// StorageClass - holds storage class information
|
// StorageClass - holds storage class information
|
||||||
type StorageClass struct {
|
type StorageClass struct {
|
||||||
Parity int
|
Parity int
|
||||||
DMA string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config storage class configuration
|
// Config storage class configuration
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Standard StorageClass `json:"standard"`
|
Standard StorageClass `json:"standard"`
|
||||||
RRS StorageClass `json:"rrs"`
|
RRS StorageClass `json:"rrs"`
|
||||||
DMA StorageClass `json:"dma"`
|
DMA string `json:"dma"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
||||||
@ -112,7 +111,7 @@ func (sCfg *Config) UnmarshalJSON(data []byte) error {
|
|||||||
// IsValid - returns true if input string is a valid
|
// IsValid - returns true if input string is a valid
|
||||||
// storage class kind supported.
|
// storage class kind supported.
|
||||||
func IsValid(sc string) bool {
|
func IsValid(sc string) bool {
|
||||||
return sc == RRS || sc == STANDARD || sc == DMA
|
return sc == RRS || sc == STANDARD
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalText unmarshals storage class from its textual form into
|
// UnmarshalText unmarshals storage class from its textual form into
|
||||||
@ -122,14 +121,6 @@ func (sc *StorageClass) UnmarshalText(b []byte) error {
|
|||||||
if scStr == "" {
|
if scStr == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if scStr == DMAWrite {
|
|
||||||
sc.DMA = DMAWrite
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if scStr == DMAReadWrite {
|
|
||||||
sc.DMA = DMAReadWrite
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
s, err := parseStorageClass(scStr)
|
s, err := parseStorageClass(scStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -143,14 +134,14 @@ func (sc *StorageClass) MarshalText() ([]byte, error) {
|
|||||||
if sc.Parity != 0 {
|
if sc.Parity != 0 {
|
||||||
return []byte(fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)), nil
|
return []byte(fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)), nil
|
||||||
}
|
}
|
||||||
return []byte(sc.DMA), nil
|
return []byte{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *StorageClass) String() string {
|
func (sc *StorageClass) String() string {
|
||||||
if sc.Parity != 0 {
|
if sc.Parity != 0 {
|
||||||
return fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)
|
return fmt.Sprintf("%s:%d", schemePrefix, sc.Parity)
|
||||||
}
|
}
|
||||||
return sc.DMA
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parses given storageClassEnv and returns a storageClass structure.
|
// Parses given storageClassEnv and returns a storageClass structure.
|
||||||
@ -222,8 +213,10 @@ func validateParity(ssParity, rrsParity, setDriveCount int) (err error) {
|
|||||||
// or config.json fields
|
// or config.json fields
|
||||||
// -- corresponding values are returned
|
// -- corresponding values are returned
|
||||||
// If storage class is not set during startup, default values are returned
|
// If storage class is not set during startup, default values are returned
|
||||||
// -- Default for Reduced Redundancy Storage class is, parity = 2 and data = N-Parity
|
// -- Default for Reduced Redundancy Storage class is, parity = 2
|
||||||
// -- Default for Standard Storage class is, parity = N/2, data = N/2
|
// -- Default for Standard Storage class is, parity = 2 - disks 4, 5
|
||||||
|
// -- Default for Standard Storage class is, parity = 3 - disks 6, 7
|
||||||
|
// -- Default for Standard Storage class is, parity = 4 - disks 8 to 16
|
||||||
// If storage class is empty
|
// If storage class is empty
|
||||||
// -- standard storage class is assumed and corresponding data and parity is returned
|
// -- standard storage class is assumed and corresponding data and parity is returned
|
||||||
func (sCfg Config) GetParityForSC(sc string) (parity int) {
|
func (sCfg Config) GetParityForSC(sc string) (parity int) {
|
||||||
@ -241,7 +234,7 @@ func (sCfg Config) GetParityForSC(sc string) (parity int) {
|
|||||||
|
|
||||||
// GetDMA - returns DMA configuration.
|
// GetDMA - returns DMA configuration.
|
||||||
func (sCfg Config) GetDMA() string {
|
func (sCfg Config) GetDMA() string {
|
||||||
return sCfg.DMA.DMA
|
return sCfg.DMA
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enabled returns if etcd is enabled.
|
// Enabled returns if etcd is enabled.
|
||||||
@ -252,9 +245,23 @@ func Enabled(kvs config.KVS) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// LookupConfig - lookup storage class config and override with valid environment settings if any.
|
// LookupConfig - lookup storage class config and override with valid environment settings if any.
|
||||||
func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
func LookupConfig(kvs config.KVS, setDriveCount int, freshConfig bool) (cfg Config, err error) {
|
||||||
cfg = Config{}
|
cfg = Config{}
|
||||||
cfg.Standard.Parity = setDriveCount / 2
|
var defaultStdParity int
|
||||||
|
if freshConfig {
|
||||||
|
switch setDriveCount {
|
||||||
|
case 4, 5:
|
||||||
|
defaultStdParity = 2
|
||||||
|
case 6, 7:
|
||||||
|
defaultStdParity = 3
|
||||||
|
default:
|
||||||
|
defaultStdParity = 4
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
defaultStdParity = setDriveCount / 2
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.Standard.Parity = defaultStdParity
|
||||||
cfg.RRS.Parity = defaultRRSParity
|
cfg.RRS.Parity = defaultRRSParity
|
||||||
|
|
||||||
if err = config.CheckValidKeys(config.StorageClassSubSys, kvs, DefaultKVS); err != nil {
|
if err = config.CheckValidKeys(config.StorageClassSubSys, kvs, DefaultKVS); err != nil {
|
||||||
@ -272,7 +279,7 @@ func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cfg.Standard.Parity == 0 {
|
if cfg.Standard.Parity == 0 {
|
||||||
cfg.Standard.Parity = setDriveCount / 2
|
cfg.Standard.Parity = defaultStdParity
|
||||||
}
|
}
|
||||||
|
|
||||||
if rrsc != "" {
|
if rrsc != "" {
|
||||||
@ -291,7 +298,7 @@ func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
|||||||
if dma != DMAReadWrite && dma != DMAWrite {
|
if dma != DMAReadWrite && dma != DMAWrite {
|
||||||
return Config{}, errors.New(`valid dma values are "read-write" and "write"`)
|
return Config{}, errors.New(`valid dma values are "read-write" and "write"`)
|
||||||
}
|
}
|
||||||
cfg.DMA.DMA = dma
|
cfg.DMA = dma
|
||||||
|
|
||||||
// Validation is done after parsing both the storage classes. This is needed because we need one
|
// Validation is done after parsing both the storage classes. This is needed because we need one
|
||||||
// storage class value to deduce the correct value of the other storage class.
|
// storage class value to deduce the correct value of the other storage class.
|
||||||
|
@ -276,7 +276,16 @@ func parseEndpointSet(customSetDriveCount uint64, args ...string) (ep endpointSe
|
|||||||
// specific set size.
|
// specific set size.
|
||||||
// For example: {1...64} is divided into 4 sets each of size 16.
|
// For example: {1...64} is divided into 4 sets each of size 16.
|
||||||
// This applies to even distributed setup syntax as well.
|
// This applies to even distributed setup syntax as well.
|
||||||
func GetAllSets(customSetDriveCount uint64, args ...string) ([][]string, error) {
|
func GetAllSets(args ...string) ([][]string, error) {
|
||||||
|
var customSetDriveCount uint64
|
||||||
|
if v := env.Get(EnvErasureSetDriveCount, ""); v != "" {
|
||||||
|
driveCount, err := strconv.Atoi(v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, config.ErrInvalidErasureSetSize(err)
|
||||||
|
}
|
||||||
|
customSetDriveCount = uint64(driveCount)
|
||||||
|
}
|
||||||
|
|
||||||
var setArgs [][]string
|
var setArgs [][]string
|
||||||
if !ellipses.HasEllipses(args...) {
|
if !ellipses.HasEllipses(args...) {
|
||||||
var setIndexes [][]uint64
|
var setIndexes [][]uint64
|
||||||
@ -335,16 +344,8 @@ func createServerEndpoints(serverAddr string, args ...string) (
|
|||||||
return nil, -1, errInvalidArgument
|
return nil, -1, errInvalidArgument
|
||||||
}
|
}
|
||||||
|
|
||||||
var setDriveCount int
|
|
||||||
if v := env.Get(EnvErasureSetDriveCount, ""); v != "" {
|
|
||||||
setDriveCount, err = strconv.Atoi(v)
|
|
||||||
if err != nil {
|
|
||||||
return nil, -1, config.ErrInvalidErasureSetSize(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !ellipses.HasEllipses(args...) {
|
if !ellipses.HasEllipses(args...) {
|
||||||
setArgs, err := GetAllSets(uint64(setDriveCount), args...)
|
setArgs, err := GetAllSets(args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, -1, err
|
return nil, -1, err
|
||||||
}
|
}
|
||||||
@ -363,7 +364,7 @@ func createServerEndpoints(serverAddr string, args ...string) (
|
|||||||
|
|
||||||
var foundPrevLocal bool
|
var foundPrevLocal bool
|
||||||
for _, arg := range args {
|
for _, arg := range args {
|
||||||
setArgs, err := GetAllSets(uint64(setDriveCount), arg)
|
setArgs, err := GetAllSets(arg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, -1, err
|
return nil, -1, err
|
||||||
}
|
}
|
||||||
@ -371,9 +372,6 @@ func createServerEndpoints(serverAddr string, args ...string) (
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, -1, err
|
return nil, -1, err
|
||||||
}
|
}
|
||||||
if setDriveCount != 0 && setDriveCount != len(setArgs[0]) {
|
|
||||||
return nil, -1, fmt.Errorf("All serverSets should have same drive per set ratio - expected %d, got %d", setDriveCount, len(setArgs[0]))
|
|
||||||
}
|
|
||||||
if err = endpointServerSets.Add(ZoneEndpoints{
|
if err = endpointServerSets.Add(ZoneEndpoints{
|
||||||
SetCount: len(setArgs),
|
SetCount: len(setArgs),
|
||||||
DrivesPerSet: len(setArgs[0]),
|
DrivesPerSet: len(setArgs[0]),
|
||||||
@ -382,9 +380,6 @@ func createServerEndpoints(serverAddr string, args ...string) (
|
|||||||
return nil, -1, err
|
return nil, -1, err
|
||||||
}
|
}
|
||||||
foundPrevLocal = endpointList.atleastOneEndpointLocal()
|
foundPrevLocal = endpointList.atleastOneEndpointLocal()
|
||||||
if setDriveCount == 0 {
|
|
||||||
setDriveCount = len(setArgs[0])
|
|
||||||
}
|
|
||||||
if setupType == UnknownSetupType {
|
if setupType == UnknownSetupType {
|
||||||
setupType = gotSetupType
|
setupType = gotSetupType
|
||||||
}
|
}
|
||||||
|
@ -274,8 +274,8 @@ func TestHealObjectCorrupted(t *testing.T) {
|
|||||||
|
|
||||||
// Test 4: checks if HealObject returns an error when xl.meta is not found
|
// Test 4: checks if HealObject returns an error when xl.meta is not found
|
||||||
// in more than read quorum number of disks, to create a corrupted situation.
|
// in more than read quorum number of disks, to create a corrupted situation.
|
||||||
for i := 0; i <= len(er.getDisks())/2; i++ {
|
for i := 0; i <= nfi.Erasure.DataBlocks; i++ {
|
||||||
er.getDisks()[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
|
erasureDisks[i].Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try healing now, expect to receive errFileNotFound.
|
// Try healing now, expect to receive errFileNotFound.
|
||||||
|
@ -334,7 +334,7 @@ func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix
|
|||||||
// Returns per object readQuorum and writeQuorum
|
// Returns per object readQuorum and writeQuorum
|
||||||
// readQuorum is the min required disks to read data.
|
// readQuorum is the min required disks to read data.
|
||||||
// writeQuorum is the min required disks to write data.
|
// writeQuorum is the min required disks to write data.
|
||||||
func objectQuorumFromMeta(ctx context.Context, er erasureObjects, partsMetaData []FileInfo, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) {
|
func objectQuorumFromMeta(ctx context.Context, partsMetaData []FileInfo, errs []error) (objectReadQuorum, objectWriteQuorum int, err error) {
|
||||||
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
|
// get the latest updated Metadata and a count of all the latest updated FileInfo(s)
|
||||||
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
|
latestFileInfo, err := getLatestFileInfo(ctx, partsMetaData, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -344,12 +344,12 @@ func objectQuorumFromMeta(ctx context.Context, er erasureObjects, partsMetaData
|
|||||||
dataBlocks := latestFileInfo.Erasure.DataBlocks
|
dataBlocks := latestFileInfo.Erasure.DataBlocks
|
||||||
parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
|
parityBlocks := globalStorageClass.GetParityForSC(latestFileInfo.Metadata[xhttp.AmzStorageClass])
|
||||||
if parityBlocks == 0 {
|
if parityBlocks == 0 {
|
||||||
parityBlocks = dataBlocks
|
parityBlocks = getDefaultParityBlocks(len(partsMetaData))
|
||||||
}
|
}
|
||||||
|
|
||||||
writeQuorum := dataBlocks
|
writeQuorum := dataBlocks
|
||||||
if dataBlocks == parityBlocks {
|
if dataBlocks == parityBlocks {
|
||||||
writeQuorum = dataBlocks + 1
|
writeQuorum++
|
||||||
}
|
}
|
||||||
|
|
||||||
// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
|
// Since all the valid erasure code meta updated at the same time are equivalent, pass dataBlocks
|
||||||
|
@ -48,7 +48,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
|
|||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "")
|
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "")
|
||||||
|
|
||||||
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -258,7 +258,7 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
|
|||||||
onlineDisks := er.getDisks()
|
onlineDisks := er.getDisks()
|
||||||
parityBlocks := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
|
parityBlocks := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
|
||||||
if parityBlocks == 0 {
|
if parityBlocks == 0 {
|
||||||
parityBlocks = len(onlineDisks) / 2
|
parityBlocks = getDefaultParityBlocks(len(onlineDisks))
|
||||||
}
|
}
|
||||||
dataBlocks := len(onlineDisks) - parityBlocks
|
dataBlocks := len(onlineDisks) - parityBlocks
|
||||||
|
|
||||||
@ -387,7 +387,7 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
|
|||||||
uploadIDPath, "")
|
uploadIDPath, "")
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
|
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pi, toObjectErr(err, bucket, object)
|
return pi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
@ -568,7 +568,7 @@ func (er erasureObjects) GetMultipartInfo(ctx context.Context, bucket, object, u
|
|||||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID)
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID)
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
readQuorum, _, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
|
readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||||
}
|
}
|
||||||
@ -616,7 +616,7 @@ func (er erasureObjects) ListObjectParts(ctx context.Context, bucket, object, up
|
|||||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "")
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "")
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
|
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
||||||
}
|
}
|
||||||
@ -720,7 +720,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "")
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "")
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
|
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, toObjectErr(err, bucket, object)
|
return oi, toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
@ -905,7 +905,7 @@ func (er erasureObjects) AbortMultipartUpload(ctx context.Context, bucket, objec
|
|||||||
partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "")
|
partsMetadata, errs := readAllFileInfo(ctx, er.getDisks(), minioMetaMultipartBucket, uploadIDPath, "")
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
_, writeQuorum, err := objectQuorumFromMeta(ctx, er, partsMetadata, errs)
|
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return toObjectErr(err, bucket, object, uploadID)
|
return toObjectErr(err, bucket, object, uploadID)
|
||||||
}
|
}
|
||||||
|
@ -62,7 +62,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
|||||||
metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID)
|
metaArr, errs := readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID)
|
||||||
|
|
||||||
// get Quorum for this object
|
// get Quorum for this object
|
||||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return oi, toObjectErr(err, srcBucket, srcObject)
|
return oi, toObjectErr(err, srcBucket, srcObject)
|
||||||
}
|
}
|
||||||
@ -371,7 +371,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
|||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
||||||
|
|
||||||
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fi, nil, nil, err
|
return fi, nil, nil, err
|
||||||
}
|
}
|
||||||
@ -587,7 +587,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||||||
// writeQuorum is dataBlocks + 1
|
// writeQuorum is dataBlocks + 1
|
||||||
writeQuorum := dataDrives
|
writeQuorum := dataDrives
|
||||||
if dataDrives == parityDrives {
|
if dataDrives == parityDrives {
|
||||||
writeQuorum = dataDrives + 1
|
writeQuorum++
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete temporary object in the event of failure.
|
// Delete temporary object in the event of failure.
|
||||||
@ -1073,7 +1073,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
|||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
||||||
|
|
||||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return toObjectErr(err, bucket, object)
|
return toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
@ -1134,7 +1134,7 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
|
|||||||
// Read metadata associated with the object from all disks.
|
// Read metadata associated with the object from all disks.
|
||||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
||||||
|
|
||||||
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return toObjectErr(err, bucket, object)
|
return toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
|
@ -194,6 +194,13 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
|
func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
|
||||||
|
restoreGlobalStorageClass := globalStorageClass
|
||||||
|
defer func() {
|
||||||
|
globalStorageClass = restoreGlobalStorageClass
|
||||||
|
}()
|
||||||
|
|
||||||
|
globalStorageClass = storageclass.Config{}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -228,7 +235,7 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
|
|||||||
erasureDisks := xl.getDisks()
|
erasureDisks := xl.getDisks()
|
||||||
z.serverSets[0].erasureDisksMu.Lock()
|
z.serverSets[0].erasureDisksMu.Lock()
|
||||||
xl.getDisks = func() []StorageAPI {
|
xl.getDisks = func() []StorageAPI {
|
||||||
for i := range erasureDisks[:7] {
|
for i := range erasureDisks[:4] {
|
||||||
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], nil, errFaultyDisk)
|
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], nil, errFaultyDisk)
|
||||||
}
|
}
|
||||||
return erasureDisks
|
return erasureDisks
|
||||||
@ -398,6 +405,8 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
|||||||
globalStorageClass = restoreGlobalStorageClass
|
globalStorageClass = restoreGlobalStorageClass
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
globalStorageClass = storageclass.Config{}
|
||||||
|
|
||||||
bucket := getRandomBucketName()
|
bucket := getRandomBucketName()
|
||||||
|
|
||||||
var opts ObjectOptions
|
var opts ObjectOptions
|
||||||
@ -425,7 +434,6 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "")
|
parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "")
|
||||||
|
|
||||||
parts1SC := globalStorageClass
|
parts1SC := globalStorageClass
|
||||||
|
|
||||||
// Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class
|
// Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class
|
||||||
@ -523,7 +531,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
|||||||
// Reset global storage class flags
|
// Reset global storage class flags
|
||||||
object7 := "object7"
|
object7 := "object7"
|
||||||
metadata7 := make(map[string]string)
|
metadata7 := make(map[string]string)
|
||||||
metadata7["x-amz-storage-class"] = storageclass.RRS
|
metadata7["x-amz-storage-class"] = storageclass.STANDARD
|
||||||
globalStorageClass = storageclass.Config{
|
globalStorageClass = storageclass.Config{
|
||||||
Standard: storageclass.StorageClass{
|
Standard: storageclass.StorageClass{
|
||||||
Parity: 5,
|
Parity: 5,
|
||||||
@ -550,19 +558,19 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
|||||||
storageClassCfg storageclass.Config
|
storageClassCfg storageclass.Config
|
||||||
expectedError error
|
expectedError error
|
||||||
}{
|
}{
|
||||||
{parts1, errs1, 8, 9, parts1SC, nil},
|
{parts1, errs1, 12, 12, parts1SC, nil},
|
||||||
{parts2, errs2, 14, 14, parts2SC, nil},
|
{parts2, errs2, 14, 14, parts2SC, nil},
|
||||||
{parts3, errs3, 8, 9, parts3SC, nil},
|
{parts3, errs3, 12, 12, parts3SC, nil},
|
||||||
{parts4, errs4, 10, 10, parts4SC, nil},
|
{parts4, errs4, 10, 10, parts4SC, nil},
|
||||||
{parts5, errs5, 14, 14, parts5SC, nil},
|
{parts5, errs5, 14, 14, parts5SC, nil},
|
||||||
{parts6, errs6, 8, 9, parts6SC, nil},
|
{parts6, errs6, 12, 12, parts6SC, nil},
|
||||||
{parts7, errs7, 14, 14, parts7SC, nil},
|
{parts7, errs7, 11, 11, parts7SC, nil},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
tt := tt
|
tt := tt
|
||||||
t.(*testing.T).Run("", func(t *testing.T) {
|
t.(*testing.T).Run("", func(t *testing.T) {
|
||||||
globalStorageClass = tt.storageClassCfg
|
globalStorageClass = tt.storageClassCfg
|
||||||
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(ctx, *xl, tt.parts, tt.errs)
|
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(ctx, tt.parts, tt.errs)
|
||||||
if tt.expectedError != nil && err == nil {
|
if tt.expectedError != nil && err == nil {
|
||||||
t.Errorf("Expected %s, got %s", tt.expectedError, err)
|
t.Errorf("Expected %s, got %s", tt.expectedError, err)
|
||||||
}
|
}
|
||||||
|
@ -76,6 +76,7 @@ func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServer
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if deploymentID == "" {
|
if deploymentID == "" {
|
||||||
|
// all zones should have same deployment ID
|
||||||
deploymentID = formats[i].ID
|
deploymentID = formats[i].ID
|
||||||
}
|
}
|
||||||
z.serverSets[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i])
|
z.serverSets[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i])
|
||||||
@ -97,7 +98,13 @@ func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) SetDriveCount() int {
|
func (z *erasureServerSets) SetDriveCount() int {
|
||||||
return z.serverSets[0].SetDriveCount()
|
minSetDriveCount := z.serverSets[0].SetDriveCount()
|
||||||
|
for _, serverSet := range z.serverSets {
|
||||||
|
if minSetDriveCount > serverSet.setDriveCount {
|
||||||
|
minSetDriveCount = serverSet.setDriveCount
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return minSetDriveCount
|
||||||
}
|
}
|
||||||
|
|
||||||
type serverSetsAvailableSpace []zoneAvailableSpace
|
type serverSetsAvailableSpace []zoneAvailableSpace
|
||||||
@ -272,7 +279,7 @@ func (z *erasureServerSets) StorageInfo(ctx context.Context, local bool) (Storag
|
|||||||
|
|
||||||
scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
|
scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
|
||||||
if scParity == 0 {
|
if scParity == 0 {
|
||||||
scParity = z.SetDriveCount() / 2
|
scParity = getDefaultParityBlocks(z.SetDriveCount())
|
||||||
}
|
}
|
||||||
|
|
||||||
storageInfo.Backend.StandardSCData = z.SetDriveCount() - scParity
|
storageInfo.Backend.StandardSCData = z.SetDriveCount() - scParity
|
||||||
@ -1401,7 +1408,6 @@ func (z *erasureServerSets) Health(ctx context.Context, opts HealthOptions) Heal
|
|||||||
|
|
||||||
parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
|
parityDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
|
||||||
diskCount := z.SetDriveCount()
|
diskCount := z.SetDriveCount()
|
||||||
|
|
||||||
if parityDrives == 0 {
|
if parityDrives == 0 {
|
||||||
parityDrives = getDefaultParityBlocks(diskCount)
|
parityDrives = getDefaultParityBlocks(diskCount)
|
||||||
}
|
}
|
||||||
|
@ -471,9 +471,8 @@ func checkFormatErasureValues(formats []*formatErasureV3, setDriveCount int) err
|
|||||||
return fmt.Errorf("%s disk is already being used in another erasure deployment. (Number of disks specified: %d but the number of disks found in the %s disk's format.json: %d)",
|
return fmt.Errorf("%s disk is already being used in another erasure deployment. (Number of disks specified: %d but the number of disks found in the %s disk's format.json: %d)",
|
||||||
humanize.Ordinal(i+1), len(formats), humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets)*len(formatErasure.Erasure.Sets[0]))
|
humanize.Ordinal(i+1), len(formats), humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets)*len(formatErasure.Erasure.Sets[0]))
|
||||||
}
|
}
|
||||||
// Only if custom erasure drive count is set,
|
// Only if custom erasure drive count is set, verify if the set_drive_count was manually
|
||||||
// we should fail here other proceed to honor what
|
// changed - we need to honor what present on the drives.
|
||||||
// is present on the disk.
|
|
||||||
if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != setDriveCount {
|
if globalCustomErasureDriveCount && len(formatErasure.Erasure.Sets[0]) != setDriveCount {
|
||||||
return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), setDriveCount)
|
return fmt.Errorf("%s disk is already formatted with %d drives per erasure set. This cannot be changed to %d, please revert your MINIO_ERASURE_SET_DRIVE_COUNT setting", humanize.Ordinal(i+1), len(formatErasure.Erasure.Sets[0]), setDriveCount)
|
||||||
}
|
}
|
||||||
@ -895,13 +894,7 @@ func initFormatErasure(ctx context.Context, storageDisks []StorageAPI, setCount,
|
|||||||
func ecDrivesNoConfig(setDriveCount int) int {
|
func ecDrivesNoConfig(setDriveCount int) int {
|
||||||
ecDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
|
ecDrives := globalStorageClass.GetParityForSC(storageclass.STANDARD)
|
||||||
if ecDrives == 0 {
|
if ecDrives == 0 {
|
||||||
cfg, err := storageclass.LookupConfig(nil, setDriveCount)
|
ecDrives = getDefaultParityBlocks(setDriveCount)
|
||||||
if err == nil {
|
|
||||||
ecDrives = cfg.Standard.Parity
|
|
||||||
}
|
|
||||||
if ecDrives == 0 {
|
|
||||||
ecDrives = setDriveCount / 2
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ecDrives
|
return ecDrives
|
||||||
}
|
}
|
||||||
|
@ -233,7 +233,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
|||||||
srvCfg := newServerConfig()
|
srvCfg := newServerConfig()
|
||||||
|
|
||||||
// Override any values from ENVs.
|
// Override any values from ENVs.
|
||||||
lookupConfigs(srvCfg, 0)
|
lookupConfigs(srvCfg, 0, false)
|
||||||
|
|
||||||
// hold the mutex lock before a new config is assigned.
|
// hold the mutex lock before a new config is assigned.
|
||||||
globalServerConfigMu.Lock()
|
globalServerConfigMu.Lock()
|
||||||
|
@ -340,8 +340,8 @@ func testGetObjectDiskNotFound(obj ObjectLayer, instanceType string, disks []str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Take 8 disks down before GetObject is called, one more we loose quorum on 16 disk node.
|
// Take 4 disks down before GetObject is called, one more we loose quorum on 16 disk node.
|
||||||
for _, disk := range disks[:8] {
|
for _, disk := range disks[:4] {
|
||||||
os.RemoveAll(disk)
|
os.RemoveAll(disk)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,8 +225,8 @@ func testObjectAPIPutObjectDiskNotFound(obj ObjectLayer, instanceType string, di
|
|||||||
t.Fatalf("%s : %s", instanceType, err.Error())
|
t.Fatalf("%s : %s", instanceType, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Take 8 disks down, one more we loose quorum on 16 disk node.
|
// Take 4 disks down, one more we loose quorum on 16 disk node.
|
||||||
for _, disk := range disks[:7] {
|
for _, disk := range disks[:4] {
|
||||||
os.RemoveAll(disk)
|
os.RemoveAll(disk)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,7 +452,7 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES
|
|||||||
|
|
||||||
prevGlobalServerConfig := globalServerConfig
|
prevGlobalServerConfig := globalServerConfig
|
||||||
globalServerConfig = newServerConfig()
|
globalServerConfig = newServerConfig()
|
||||||
lookupConfigs(globalServerConfig, 0)
|
lookupConfigs(globalServerConfig, 0, true)
|
||||||
|
|
||||||
restClient := newStorageRESTClient(endpoint, false)
|
restClient := newStorageRESTClient(endpoint, false)
|
||||||
|
|
||||||
|
@ -93,7 +93,14 @@ func path2BucketObject(s string) (bucket, prefix string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getDefaultParityBlocks(drive int) int {
|
func getDefaultParityBlocks(drive int) int {
|
||||||
return drive / 2
|
switch drive {
|
||||||
|
case 4, 5:
|
||||||
|
return 2
|
||||||
|
case 6, 7:
|
||||||
|
return 3
|
||||||
|
default:
|
||||||
|
return 4
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getDefaultDataBlocks(drive int) int {
|
func getDefaultDataBlocks(drive int) int {
|
||||||
|
@ -1067,7 +1067,7 @@ func TestXLStorageReadFile(t *testing.T) {
|
|||||||
for l := 0; l < 2; l++ {
|
for l := 0; l < 2; l++ {
|
||||||
// 1st loop tests with dma=write, 2nd loop tests with dma=read-write.
|
// 1st loop tests with dma=write, 2nd loop tests with dma=read-write.
|
||||||
if l == 1 {
|
if l == 1 {
|
||||||
globalStorageClass.DMA.DMA = storageclass.DMAReadWrite
|
globalStorageClass.DMA = storageclass.DMAReadWrite
|
||||||
}
|
}
|
||||||
// Following block validates all ReadFile test cases.
|
// Following block validates all ReadFile test cases.
|
||||||
for i, testCase := range testCases {
|
for i, testCase := range testCases {
|
||||||
@ -1127,7 +1127,7 @@ func TestXLStorageReadFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Reset the flag.
|
// Reset the flag.
|
||||||
globalStorageClass.DMA.DMA = storageclass.DMAWrite
|
globalStorageClass.DMA = storageclass.DMAWrite
|
||||||
|
|
||||||
// TestXLStorage for permission denied.
|
// TestXLStorage for permission denied.
|
||||||
if runtime.GOOS != globalWindowsOSName {
|
if runtime.GOOS != globalWindowsOSName {
|
||||||
|
@ -22,11 +22,11 @@ Now the delete marker becomes the current version of the object. GET requests by
|
|||||||
|
|
||||||
![get](https://raw.githubusercontent.com/minio/minio/master/docs/bucket/versioning/versioning_GET_versionEnabled.png)
|
![get](https://raw.githubusercontent.com/minio/minio/master/docs/bucket/versioning/versioning_GET_versionEnabled.png)
|
||||||
|
|
||||||
GET requests by specifying a verison ID as shown below, you can retrieve the specific object version `fae684da`.
|
GET requests by specifying a version ID as shown below, you can retrieve the specific object version `fae684da`.
|
||||||
|
|
||||||
![get_version_id](https://raw.githubusercontent.com/minio/minio/master/docs/bucket/versioning/versioning_GET_versionEnabled_id.png)
|
![get_version_id](https://raw.githubusercontent.com/minio/minio/master/docs/bucket/versioning/versioning_GET_versionEnabled_id.png)
|
||||||
|
|
||||||
To permanently delete an object you need to specify the version you want to delete, only the user with appropriate permissions can permanently delete a version. As shown below DELETE request called with a specific version id permenantly deletes an object from a bucket. Delete marker is not added for DELETE requests with version id.
|
To permanently delete an object you need to specify the version you want to delete, only the user with appropriate permissions can permanently delete a version. As shown below DELETE request called with a specific version id permanently deletes an object from a bucket. Delete marker is not added for DELETE requests with version id.
|
||||||
|
|
||||||
![delete_version_id](https://raw.githubusercontent.com/minio/minio/master/docs/bucket/versioning/versioning_DELETE_versionEnabled_id.png)
|
![delete_version_id](https://raw.githubusercontent.com/minio/minio/master/docs/bucket/versioning/versioning_DELETE_versionEnabled_id.png)
|
||||||
|
|
||||||
|
34
docs/distributed/SIZING.md
Normal file
34
docs/distributed/SIZING.md
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
## Erasure code sizing guide
|
||||||
|
|
||||||
|
### Toy Setups
|
||||||
|
|
||||||
|
Capacity constrained environments, MinIO will work but not recommended for production.
|
||||||
|
|
||||||
|
| servers | drives (per node) | stripe_size | parity chosen (default) | tolerance for reads (servers) | tolerance for writes (servers) |
|
||||||
|
|--------:|------------------:|------------:|------------------------:|------------------------------:|-------------------------------:|
|
||||||
|
| 1 | 1 | 1 | 0 | 0 | 0 |
|
||||||
|
| 1 | 4 | 4 | 2 | 0 | 0 |
|
||||||
|
| 4 | 1 | 4 | 2 | 2 | 3 |
|
||||||
|
| 5 | 1 | 5 | 2 | 2 | 2 |
|
||||||
|
| 6 | 1 | 6 | 3 | 3 | 4 |
|
||||||
|
| 7 | 1 | 7 | 3 | 3 | 3 |
|
||||||
|
|
||||||
|
### Minimum System Configuration for Production
|
||||||
|
|
||||||
|
| servers | drives (per node) | stripe_size | parity chosen (default) | tolerance for reads (servers) | tolerance for writes (servers) |
|
||||||
|
|--------:|------------------:|------------:|------------------------:|------------------------------:|-------------------------------:|
|
||||||
|
| 4 | 2 | 8 | 4 | 2 | 1 |
|
||||||
|
| 5 | 2 | 10 | 4 | 2 | 2 |
|
||||||
|
| 6 | 2 | 12 | 4 | 2 | 2 |
|
||||||
|
| 7 | 2 | 14 | 4 | 2 | 2 |
|
||||||
|
| 8 | 1 | 8 | 4 | 4 | 3 |
|
||||||
|
| 8 | 2 | 16 | 4 | 2 | 2 |
|
||||||
|
| 9 | 2 | 9 | 4 | 4 | 4 |
|
||||||
|
| 10 | 2 | 10 | 4 | 4 | 4 |
|
||||||
|
| 11 | 2 | 11 | 4 | 4 | 4 |
|
||||||
|
| 12 | 2 | 12 | 4 | 4 | 4 |
|
||||||
|
| 13 | 2 | 13 | 4 | 4 | 4 |
|
||||||
|
| 14 | 2 | 14 | 4 | 4 | 4 |
|
||||||
|
| 15 | 2 | 15 | 4 | 4 | 4 |
|
||||||
|
| 16 | 2 | 16 | 4 | 4 | 4 |
|
||||||
|
|
@ -87,7 +87,7 @@ Auto encryption 'sse-s3' is enabled
|
|||||||
```
|
```
|
||||||
|
|
||||||
### Using environment (deprecated)
|
### Using environment (deprecated)
|
||||||
> NOTE: Following ENV might be removed in future, you are advised to move to previous recommeneded approach using `mc encrypt`. S3 gateway supports encryption at gateway layer which may thus be dropped in favor of simplicity, it is advised that S3 gateway users migrate to MinIO server mode or enable encryption at REST at the backend.
|
> NOTE: Following ENV might be removed in future, you are advised to move to previous recommended approach using `mc encrypt`. S3 gateway supports encryption at gateway layer which may thus be dropped in favor of simplicity, it is advised that S3 gateway users migrate to MinIO server mode or enable encryption at REST at the backend.
|
||||||
|
|
||||||
MinIO automatically encrypts all objects on buckets if KMS is successfully configured and following ENV is enabled:
|
MinIO automatically encrypts all objects on buckets if KMS is successfully configured and following ENV is enabled:
|
||||||
```
|
```
|
||||||
|
Loading…
Reference in New Issue
Block a user