disallow EC:0 if not set during server startup (#17141)

This commit is contained in:
Harshavardhana 2023-05-04 14:44:30 -07:00 committed by GitHub
parent 1d0211d395
commit 5569acd95c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 100 additions and 42 deletions

View File

@ -567,13 +567,13 @@ func TestHealingDanglingObject(t *testing.T) {
// Set globalStoragClass.STANDARD to EC:4 for this test
saveSC := globalStorageClass
defer func() {
globalStorageClass = saveSC
globalStorageClass.Update(saveSC)
}()
globalStorageClass = storageclass.Config{
globalStorageClass.Update(storageclass.Config{
Standard: storageclass.StorageClass{
Parity: 4,
},
}
})
nDisks := 16
fsDirs, err := getRandomDisks(nDisks)

View File

@ -895,14 +895,14 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
// Object for test case 1 - No StorageClass defined, no MetaData in PutObject
object1 := "object1"
globalStorageClass = storageclass.Config{
globalStorageClass.Update(storageclass.Config{
RRS: storageclass.StorageClass{
Parity: 2,
},
Standard: storageclass.StorageClass{
Parity: 4,
},
}
})
_, err = obj.PutObject(ctx, bucket, object1, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
if err != nil {
t.Fatalf("Failed to putObject %v", err)
@ -939,11 +939,11 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
object4 := "object4"
metadata4 := make(map[string]string)
metadata4["x-amz-storage-class"] = storageclass.STANDARD
globalStorageClass = storageclass.Config{
globalStorageClass.Update(storageclass.Config{
Standard: storageclass.StorageClass{
Parity: 6,
},
}
})
_, err = obj.PutObject(ctx, bucket, object4, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata4})
if err != nil {
@ -962,11 +962,11 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
object5 := "object5"
metadata5 := make(map[string]string)
metadata5["x-amz-storage-class"] = storageclass.RRS
globalStorageClass = storageclass.Config{
globalStorageClass.Update(storageclass.Config{
RRS: storageclass.StorageClass{
Parity: 2,
},
}
})
_, err = obj.PutObject(ctx, bucket, object5, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata5})
if err != nil {
@ -980,14 +980,14 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
object6 := "object6"
metadata6 := make(map[string]string)
metadata6["x-amz-storage-class"] = storageclass.STANDARD
globalStorageClass = storageclass.Config{
globalStorageClass.Update(storageclass.Config{
Standard: storageclass.StorageClass{
Parity: 4,
},
RRS: storageclass.StorageClass{
Parity: 2,
},
}
})
_, err = obj.PutObject(ctx, bucket, object6, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata6})
if err != nil {
@ -1006,11 +1006,11 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
object7 := "object7"
metadata7 := make(map[string]string)
metadata7["x-amz-storage-class"] = storageclass.STANDARD
globalStorageClass = storageclass.Config{
globalStorageClass.Update(storageclass.Config{
Standard: storageclass.StorageClass{
Parity: 5,
},
}
})
_, err = obj.PutObject(ctx, bucket, object7, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{UserDefined: metadata7})
if err != nil {
@ -1043,7 +1043,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
for _, tt := range tests {
tt := tt
t.(*testing.T).Run("", func(t *testing.T) {
globalStorageClass = tt.storageClassCfg
globalStorageClass.Update(tt.storageClassCfg)
actualReadQuorum, actualWriteQuorum, err := objectQuorumFromMeta(ctx, tt.parts, tt.errs, storageclass.DefaultParityBlocks(len(erasureDisks)))
if tt.expectedError != nil && err == nil {
t.Errorf("Expected %s, got %s", tt.expectedError, err)

View File

@ -2115,6 +2115,7 @@ type HealthResult struct {
HealingDrives int
PoolID, SetID int
WriteQuorum int
UsingDefaults bool
}
// ReadHealth returns if the cluster can serve read requests
@ -2209,6 +2210,11 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
}
}
var usingDefaults bool
if globalStorageClass.GetParityForSC(storageclass.STANDARD) < 0 {
usingDefaults = true
}
for poolIdx := range erasureSetUpCount {
for setIdx := range erasureSetUpCount[poolIdx] {
if erasureSetUpCount[poolIdx][setIdx] < poolWriteQuorums[poolIdx] {
@ -2221,6 +2227,7 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
PoolID: poolIdx,
SetID: setIdx,
WriteQuorum: poolWriteQuorums[poolIdx],
UsingDefaults: usingDefaults, // indicates if config was not initialized and we are using defaults on this node.
}
}
}
@ -2240,8 +2247,9 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
// to look at the healing side of the code.
if !opts.Maintenance {
return HealthResult{
Healthy: true,
WriteQuorum: maximumWriteQuorum,
Healthy: true,
WriteQuorum: maximumWriteQuorum,
UsingDefaults: usingDefaults, // indicates if config was not initialized and we are using defaults on this node.
}
}
@ -2249,6 +2257,7 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
Healthy: len(aggHealStateResult.HealDisks) == 0,
HealingDrives: len(aggHealStateResult.HealDisks),
WriteQuorum: maximumWriteQuorum,
UsingDefaults: usingDefaults, // indicates if config was not initialized and we are using defaults on this node.
}
}

View File

@ -55,6 +55,8 @@ func ClusterCheckHandler(w http.ResponseWriter, r *http.Request) {
if result.WriteQuorum > 0 {
w.Header().Set(xhttp.MinIOWriteQuorum, strconv.Itoa(result.WriteQuorum))
}
w.Header().Set(xhttp.MinIOStorageClassDefaults, strconv.FormatBool(result.UsingDefaults))
if !result.Healthy {
// return how many drives are being healed if any
if result.HealingDrives > 0 {

View File

@ -1,4 +1,4 @@
// Copyright (c) 2015-2021 MinIO, Inc.
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
@ -114,6 +114,7 @@ const (
capacityRawSubsystem MetricSubsystem = "capacity_raw"
capacityUsableSubsystem MetricSubsystem = "capacity_usable"
diskSubsystem MetricSubsystem = "disk"
storageClassSubsystem MetricSubsystem = "storage_class"
fileDescriptorSubsystem MetricSubsystem = "file_descriptor"
goRoutines MetricSubsystem = "go_routine"
ioSubsystem MetricSubsystem = "io"
@ -430,6 +431,26 @@ func getNodeDrivesTotalMD() MetricDescription {
}
}
func getNodeStandardParityMD() MetricDescription {
return MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: storageClassSubsystem,
Name: "standard_parity",
Help: "standard storage class parity",
Type: gaugeMetric,
}
}
func getNodeRRSParityMD() MetricDescription {
return MetricDescription{
Namespace: nodeMetricNamespace,
Subsystem: storageClassSubsystem,
Name: "rrs_parity",
Help: "reduced redundancy storage class parity",
Type: gaugeMetric,
}
}
func getNodeDrivesFreeInodes() MetricDescription {
return MetricDescription{
Namespace: nodeMetricNamespace,
@ -2187,23 +2208,33 @@ func getLocalStorageMetrics() *MetricsGroup {
Value: float64(disk.FreeInodes),
VariableLabels: map[string]string{"disk": disk.DrivePath},
})
metrics = append(metrics, Metric{
Description: getNodeDrivesOfflineTotalMD(),
Value: float64(offlineDrives.Sum()),
})
metrics = append(metrics, Metric{
Description: getNodeDrivesOnlineTotalMD(),
Value: float64(onlineDrives.Sum()),
})
metrics = append(metrics, Metric{
Description: getNodeDrivesTotalMD(),
Value: float64(totalDrives.Sum()),
})
}
metrics = append(metrics, Metric{
Description: getNodeDrivesOfflineTotalMD(),
Value: float64(offlineDrives.Sum()),
})
metrics = append(metrics, Metric{
Description: getNodeDrivesOnlineTotalMD(),
Value: float64(onlineDrives.Sum()),
})
metrics = append(metrics, Metric{
Description: getNodeDrivesTotalMD(),
Value: float64(totalDrives.Sum()),
})
metrics = append(metrics, Metric{
Description: getNodeStandardParityMD(),
Value: float64(storageInfo.Backend.StandardSCParity),
})
metrics = append(metrics, Metric{
Description: getNodeRRSParityMD(),
Value: float64(storageInfo.Backend.RRSCParity),
})
return
})
return mg

View File

@ -1197,14 +1197,14 @@ func testListObjectPartsDiskNotFound(obj ObjectLayer, instanceType string, disks
objectNames := []string{"minio-object-1.txt"}
uploadIDs := []string{}
globalStorageClass = storageclass.Config{
globalStorageClass.Update(storageclass.Config{
RRS: storageclass.StorageClass{
Parity: 2,
},
Standard: storageclass.StorageClass{
Parity: 4,
},
}
})
// bucketnames[0].
// objectNames[0].

View File

@ -748,7 +748,7 @@ func serverMain(ctx *cli.Context) {
printStartupMessage(getAPIEndpoints(), err)
// Print a warning at the end of the startup banner so it is more noticeable
if globalStorageClass.GetParityForSC("") == 0 {
if newObject.BackendInfo().StandardSCParity == 0 {
logger.Error("Warning: The standard parity is set to 0. This can lead to data loss.")
}
}()

View File

@ -80,8 +80,9 @@ var ConfigLock sync.RWMutex
// Config storage class configuration
type Config struct {
Standard StorageClass `json:"standard"`
RRS StorageClass `json:"rrs"`
Standard StorageClass `json:"standard"`
RRS StorageClass `json:"rrs"`
initialized bool
}
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
@ -217,11 +218,13 @@ func validateParity(ssParity, rrsParity, setDriveCount int) (err error) {
// returned.
//
// -- if input storage class is empty then standard is assumed
// -- if input is RRS but RRS is not configured default '2' parity
//
// for RRS is assumed
// -- if input is RRS but RRS is not configured/initialized '-1' parity
//
// -- if input is STANDARD but STANDARD is not configured '0' parity
// for RRS is assumed, the caller is expected to choose the right parity
// at that point.
//
// -- if input is STANDARD but STANDARD is not configured/initialized '-1' parity
//
// is returned, the caller is expected to choose the right parity
// at that point.
@ -230,8 +233,14 @@ func (sCfg Config) GetParityForSC(sc string) (parity int) {
defer ConfigLock.RUnlock()
switch strings.TrimSpace(sc) {
case RRS:
if !sCfg.initialized {
return -1
}
return sCfg.RRS.Parity
default:
if !sCfg.initialized {
return -1
}
return sCfg.Standard.Parity
}
}
@ -242,9 +251,10 @@ func (sCfg *Config) Update(newCfg Config) {
defer ConfigLock.Unlock()
sCfg.RRS = newCfg.RRS
sCfg.Standard = newCfg.Standard
sCfg.initialized = true
}
// Enabled returns if etcd is enabled.
// Enabled returns if storageClass is enabled is enabled.
func Enabled(kvs config.KVS) bool {
ssc := kvs.Get(ClassStandard)
rrsc := kvs.Get(ClassRRS)
@ -307,5 +317,6 @@ func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
return Config{}, err
}
cfg.initialized = true
return cfg, nil
}

View File

@ -143,6 +143,7 @@ func TestParityCount(t *testing.T) {
RRS: StorageClass{
Parity: 2,
},
initialized: true,
}
// Set env var for test case 4
if i+1 == 4 {

View File

@ -177,6 +177,10 @@ const (
// Writes expected write quorum
MinIOWriteQuorum = "x-minio-write-quorum"
// Indicates if we are using default storage class and there was problem loading config
// if this header is set to "true"
MinIOStorageClassDefaults = "x-minio-storage-class-defaults"
// Reports number of drives currently healing
MinIOHealingDrives = "x-minio-healing-drives"