mirror of https://github.com/minio/minio.git
fix: generalize SC config and also skip healing sub-sys under SD (#15757)
This commit is contained in:
parent
048a46ec2a
commit
94dbb4a427
|
@ -246,6 +246,9 @@ outer:
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ReplicationStats) getAllCachedLatest() BucketStatsMap {
|
func (r *ReplicationStats) getAllCachedLatest() BucketStatsMap {
|
||||||
|
if r == nil {
|
||||||
|
return BucketStatsMap{}
|
||||||
|
}
|
||||||
r.dlock.RLock()
|
r.dlock.RLock()
|
||||||
defer r.dlock.RUnlock()
|
defer r.dlock.RUnlock()
|
||||||
return r.mostRecentStats
|
return r.mostRecentStats
|
||||||
|
|
|
@ -69,7 +69,6 @@ func initHelp() {
|
||||||
config.LoggerWebhookSubSys: logger.DefaultLoggerWebhookKVS,
|
config.LoggerWebhookSubSys: logger.DefaultLoggerWebhookKVS,
|
||||||
config.AuditWebhookSubSys: logger.DefaultAuditWebhookKVS,
|
config.AuditWebhookSubSys: logger.DefaultAuditWebhookKVS,
|
||||||
config.AuditKafkaSubSys: logger.DefaultAuditKafkaKVS,
|
config.AuditKafkaSubSys: logger.DefaultAuditKafkaKVS,
|
||||||
config.HealSubSys: heal.DefaultKVS,
|
|
||||||
config.ScannerSubSys: scanner.DefaultKVS,
|
config.ScannerSubSys: scanner.DefaultKVS,
|
||||||
config.SubnetSubSys: subnet.DefaultKVS,
|
config.SubnetSubSys: subnet.DefaultKVS,
|
||||||
config.CallhomeSubSys: callhome.DefaultKVS,
|
config.CallhomeSubSys: callhome.DefaultKVS,
|
||||||
|
@ -79,6 +78,7 @@ func initHelp() {
|
||||||
}
|
}
|
||||||
if globalIsErasure {
|
if globalIsErasure {
|
||||||
kvs[config.StorageClassSubSys] = storageclass.DefaultKVS
|
kvs[config.StorageClassSubSys] = storageclass.DefaultKVS
|
||||||
|
kvs[config.HealSubSys] = heal.DefaultKVS
|
||||||
}
|
}
|
||||||
config.RegisterDefaultKVS(kvs)
|
config.RegisterDefaultKVS(kvs)
|
||||||
|
|
||||||
|
@ -125,10 +125,6 @@ func initHelp() {
|
||||||
Key: config.APISubSys,
|
Key: config.APISubSys,
|
||||||
Description: "manage global HTTP API call specific features, such as throttling, authentication types, etc.",
|
Description: "manage global HTTP API call specific features, such as throttling, authentication types, etc.",
|
||||||
},
|
},
|
||||||
config.HelpKV{
|
|
||||||
Key: config.HealSubSys,
|
|
||||||
Description: "manage object healing frequency and bitrot verification checks",
|
|
||||||
},
|
|
||||||
config.HelpKV{
|
config.HelpKV{
|
||||||
Key: config.ScannerSubSys,
|
Key: config.ScannerSubSys,
|
||||||
Description: "manage namespace scanning for usage calculation, lifecycle, healing and more",
|
Description: "manage namespace scanning for usage calculation, lifecycle, healing and more",
|
||||||
|
@ -213,12 +209,13 @@ func initHelp() {
|
||||||
}
|
}
|
||||||
|
|
||||||
if globalIsErasure {
|
if globalIsErasure {
|
||||||
helpSubSys = append(helpSubSys, config.HelpKV{})
|
helpSubSys = append(helpSubSys, config.HelpKV{
|
||||||
copy(helpSubSys[2:], helpSubSys[1:])
|
|
||||||
helpSubSys[1] = config.HelpKV{
|
|
||||||
Key: config.StorageClassSubSys,
|
Key: config.StorageClassSubSys,
|
||||||
Description: "define object level redundancy",
|
Description: "define object level redundancy",
|
||||||
}
|
}, config.HelpKV{
|
||||||
|
Key: config.HealSubSys,
|
||||||
|
Description: "manage object healing frequency and bitrot verification checks",
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
helpMap := map[string]config.HelpKVS{
|
helpMap := map[string]config.HelpKVS{
|
||||||
|
@ -289,14 +286,12 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case config.StorageClassSubSys:
|
case config.StorageClassSubSys:
|
||||||
if globalIsErasure {
|
if objAPI == nil {
|
||||||
if objAPI == nil {
|
return errServerNotInitialized
|
||||||
return errServerNotInitialized
|
}
|
||||||
}
|
for _, setDriveCount := range objAPI.SetDriveCounts() {
|
||||||
for _, setDriveCount := range objAPI.SetDriveCounts() {
|
if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount); err != nil {
|
||||||
if _, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount); err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case config.CacheSubSys:
|
case config.CacheSubSys:
|
||||||
|
@ -573,16 +568,18 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s config.Config, subSys string) error {
|
func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s config.Config, subSys string) error {
|
||||||
|
if objAPI == nil {
|
||||||
|
return errServerNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
|
setDriveCounts := objAPI.SetDriveCounts()
|
||||||
switch subSys {
|
switch subSys {
|
||||||
case config.APISubSys:
|
case config.APISubSys:
|
||||||
apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default])
|
apiConfig, err := api.LookupConfig(s[config.APISubSys][config.Default])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Invalid api configuration: %w", err))
|
||||||
}
|
}
|
||||||
var setDriveCounts []int
|
|
||||||
if objAPI != nil {
|
|
||||||
setDriveCounts = objAPI.SetDriveCounts()
|
|
||||||
}
|
|
||||||
globalAPIConfig.init(apiConfig, setDriveCounts)
|
globalAPIConfig.init(apiConfig, setDriveCounts)
|
||||||
|
|
||||||
// Initialize remote instance transport once.
|
// Initialize remote instance transport once.
|
||||||
|
@ -595,10 +592,8 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
|
||||||
return fmt.Errorf("Unable to setup Compression: %w", err)
|
return fmt.Errorf("Unable to setup Compression: %w", err)
|
||||||
}
|
}
|
||||||
// Validate if the object layer supports compression.
|
// Validate if the object layer supports compression.
|
||||||
if objAPI != nil {
|
if cmpCfg.Enabled && !objAPI.IsCompressionSupported() {
|
||||||
if cmpCfg.Enabled && !objAPI.IsCompressionSupported() {
|
return fmt.Errorf("Backend does not support compression")
|
||||||
return fmt.Errorf("Backend does not support compression")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
globalCompressConfigMu.Lock()
|
globalCompressConfigMu.Lock()
|
||||||
globalCompressConfig = cmpCfg
|
globalCompressConfig = cmpCfg
|
||||||
|
@ -667,19 +662,16 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %w", err))
|
logger.LogIf(ctx, fmt.Errorf("Unable to update audit kafka targets: %w", err))
|
||||||
}
|
}
|
||||||
case config.StorageClassSubSys:
|
case config.StorageClassSubSys:
|
||||||
if globalIsErasure && objAPI != nil {
|
for i, setDriveCount := range setDriveCounts {
|
||||||
setDriveCounts := objAPI.SetDriveCounts()
|
sc, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount)
|
||||||
for i, setDriveCount := range setDriveCounts {
|
if err != nil {
|
||||||
sc, err := storageclass.LookupConfig(s[config.StorageClassSubSys][config.Default], setDriveCount)
|
logger.LogIf(ctx, fmt.Errorf("Unable to initialize storage class config: %w", err))
|
||||||
if err != nil {
|
break
|
||||||
logger.LogIf(ctx, fmt.Errorf("Unable to initialize storage class config: %w", err))
|
}
|
||||||
break
|
// if we validated all setDriveCounts and it was successful
|
||||||
}
|
// proceed to store the correct storage class globally.
|
||||||
// if we validated all setDriveCounts and it was successful
|
if i == len(setDriveCounts)-1 {
|
||||||
// proceed to store the correct storage class globally.
|
globalStorageClass.Update(sc)
|
||||||
if i == len(setDriveCounts)-1 {
|
|
||||||
globalStorageClass.Update(sc)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case config.CallhomeSubSys:
|
case config.CallhomeSubSys:
|
||||||
|
|
|
@ -230,6 +230,13 @@ func TestHealing(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer obj.Shutdown(context.Background())
|
defer obj.Shutdown(context.Background())
|
||||||
|
|
||||||
|
// initialize the server and obtain the credentials and root.
|
||||||
|
// credentials are necessary to sign the HTTP request.
|
||||||
|
if err = newTestConfig(globalMinioDefaultRegion, obj); err != nil {
|
||||||
|
t.Fatalf("Unable to initialize server config. %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
defer removeRoots(fsDirs)
|
defer removeRoots(fsDirs)
|
||||||
|
|
||||||
z := obj.(*erasureServerPools)
|
z := obj.(*erasureServerPools)
|
||||||
|
|
|
@ -220,17 +220,6 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
||||||
// Set when gateway is enabled
|
// Set when gateway is enabled
|
||||||
globalIsGateway = true
|
globalIsGateway = true
|
||||||
|
|
||||||
// Initialize server config.
|
|
||||||
srvCfg := newServerConfig()
|
|
||||||
|
|
||||||
// Override any values from ENVs.
|
|
||||||
lookupConfigs(srvCfg, nil)
|
|
||||||
|
|
||||||
// hold the mutex lock before a new config is assigned.
|
|
||||||
globalServerConfigMu.Lock()
|
|
||||||
globalServerConfig = srvCfg
|
|
||||||
globalServerConfigMu.Unlock()
|
|
||||||
|
|
||||||
// Initialize router. `SkipClean(true)` stops gorilla/mux from
|
// Initialize router. `SkipClean(true)` stops gorilla/mux from
|
||||||
// normalizing URL path minio/minio#3256
|
// normalizing URL path minio/minio#3256
|
||||||
// avoid URL path encoding minio/minio#8950
|
// avoid URL path encoding minio/minio#8950
|
||||||
|
@ -252,11 +241,6 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
||||||
// Add API router.
|
// Add API router.
|
||||||
registerAPIRouter(router)
|
registerAPIRouter(router)
|
||||||
|
|
||||||
// Enable bucket forwarding handler only if bucket federation is enabled.
|
|
||||||
if globalDNSConfig != nil && globalBucketFederation {
|
|
||||||
globalHandlers = append(globalHandlers, setBucketForwardingHandler)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use all the middlewares
|
// Use all the middlewares
|
||||||
router.Use(globalHandlers...)
|
router.Use(globalHandlers...)
|
||||||
|
|
||||||
|
@ -307,6 +291,17 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
|
||||||
globalObjectAPI = newObject
|
globalObjectAPI = newObject
|
||||||
globalObjLayerMutex.Unlock()
|
globalObjLayerMutex.Unlock()
|
||||||
|
|
||||||
|
// Initialize server config.
|
||||||
|
srvCfg := newServerConfig()
|
||||||
|
|
||||||
|
// Override any values from ENVs.
|
||||||
|
lookupConfigs(srvCfg, newObject)
|
||||||
|
|
||||||
|
// hold the mutex lock before a new config is assigned.
|
||||||
|
globalServerConfigMu.Lock()
|
||||||
|
globalServerConfig = srvCfg
|
||||||
|
globalServerConfigMu.Unlock()
|
||||||
|
|
||||||
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
|
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient, globalRefreshIAMInterval)
|
||||||
|
|
||||||
if gatewayName == NASBackendGateway {
|
if gatewayName == NASBackendGateway {
|
||||||
|
|
|
@ -78,11 +78,15 @@ func handleSignals() {
|
||||||
case <-globalHTTPServerErrorCh:
|
case <-globalHTTPServerErrorCh:
|
||||||
exit(stopProcess())
|
exit(stopProcess())
|
||||||
case osSignal := <-globalOSSignalCh:
|
case osSignal := <-globalOSSignalCh:
|
||||||
globalReplicationPool.SaveState(context.Background())
|
if !globalIsGateway {
|
||||||
|
globalReplicationPool.SaveState(context.Background())
|
||||||
|
}
|
||||||
logger.Info("Exiting on signal: %s", strings.ToUpper(osSignal.String()))
|
logger.Info("Exiting on signal: %s", strings.ToUpper(osSignal.String()))
|
||||||
exit(stopProcess())
|
exit(stopProcess())
|
||||||
case signal := <-globalServiceSignalCh:
|
case signal := <-globalServiceSignalCh:
|
||||||
globalReplicationPool.SaveState(context.Background())
|
if !globalIsGateway {
|
||||||
|
globalReplicationPool.SaveState(context.Background())
|
||||||
|
}
|
||||||
switch signal {
|
switch signal {
|
||||||
case serviceRestart:
|
case serviceRestart:
|
||||||
logger.Info("Restarting on service signal")
|
logger.Info("Restarting on service signal")
|
||||||
|
|
|
@ -215,6 +215,7 @@ func prepareErasure(ctx context.Context, nDisks int) (ObjectLayer, []string, err
|
||||||
removeRoots(fsDirs)
|
removeRoots(fsDirs)
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return obj, fsDirs, nil
|
return obj, fsDirs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -502,6 +503,8 @@ func newTestConfig(bucketLocation string, obj ObjectLayer) (err error) {
|
||||||
// Set a default region.
|
// Set a default region.
|
||||||
config.SetRegion(globalServerConfig, bucketLocation)
|
config.SetRegion(globalServerConfig, bucketLocation)
|
||||||
|
|
||||||
|
applyDynamicConfigForSubSys(context.Background(), obj, globalServerConfig, config.StorageClassSubSys)
|
||||||
|
|
||||||
// Save config.
|
// Save config.
|
||||||
return saveServerConfig(context.Background(), obj, globalServerConfig)
|
return saveServerConfig(context.Background(), obj, globalServerConfig)
|
||||||
}
|
}
|
||||||
|
@ -1742,6 +1745,10 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [
|
||||||
// Executing the object layer tests for single node setup.
|
// Executing the object layer tests for single node setup.
|
||||||
objAPITest(objLayer, ErasureSDStr, bucketFS, fsAPIRouter, credentials, t)
|
objAPITest(objLayer, ErasureSDStr, bucketFS, fsAPIRouter, credentials, t)
|
||||||
|
|
||||||
|
// reset globals.
|
||||||
|
// this is to make sure that the tests are not affected by modified value.
|
||||||
|
resetTestGlobals()
|
||||||
|
|
||||||
objLayer, erasureDisks, err := prepareErasure16(ctx)
|
objLayer, erasureDisks, err := prepareErasure16(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Initialization of object layer failed for Erasure setup: %s", err)
|
t.Fatalf("Initialization of object layer failed for Erasure setup: %s", err)
|
||||||
|
@ -1752,6 +1759,13 @@ func ExecObjectLayerAPITest(t *testing.T, objAPITest objAPITestType, endpoints [
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Initialzation of API handler tests failed: <ERROR> %s", err)
|
t.Fatalf("Initialzation of API handler tests failed: <ERROR> %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initialize the server and obtain the credentials and root.
|
||||||
|
// credentials are necessary to sign the HTTP request.
|
||||||
|
if err = newTestConfig(globalMinioDefaultRegion, objLayer); err != nil {
|
||||||
|
t.Fatalf("Unable to initialize server config. %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Executing the object layer tests for Erasure.
|
// Executing the object layer tests for Erasure.
|
||||||
objAPITest(objLayer, ErasureTestStr, bucketErasure, erAPIRouter, credentials, t)
|
objAPITest(objLayer, ErasureTestStr, bucketErasure, erAPIRouter, credentials, t)
|
||||||
|
|
||||||
|
|
|
@ -193,12 +193,14 @@ func validateParity(ssParity, rrsParity, setDriveCount int) (err error) {
|
||||||
return fmt.Errorf("Reduced redundancy storage class parity %d should be greater than or equal to %d", rrsParity, minParityDisks)
|
return fmt.Errorf("Reduced redundancy storage class parity %d should be greater than or equal to %d", rrsParity, minParityDisks)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ssParity > setDriveCount/2 {
|
if setDriveCount > 2 {
|
||||||
return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, setDriveCount/2)
|
if ssParity > setDriveCount/2 {
|
||||||
}
|
return fmt.Errorf("Standard storage class parity %d should be less than or equal to %d", ssParity, setDriveCount/2)
|
||||||
|
}
|
||||||
|
|
||||||
if rrsParity > setDriveCount/2 {
|
if rrsParity > setDriveCount/2 {
|
||||||
return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, setDriveCount/2)
|
return fmt.Errorf("Reduced redundancy storage class parity %d should be less than or equal to %d", rrsParity, setDriveCount/2)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ssParity > 0 && rrsParity > 0 {
|
if ssParity > 0 && rrsParity > 0 {
|
||||||
|
@ -283,6 +285,8 @@ func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Config{}, err
|
return Config{}, err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
cfg.Standard.Parity = DefaultParityBlocks(setDriveCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rrsc != "" {
|
if rrsc != "" {
|
||||||
|
@ -290,14 +294,11 @@ func LookupConfig(kvs config.KVS, setDriveCount int) (cfg Config, err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Config{}, err
|
return Config{}, err
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
|
|
||||||
if cfg.RRS.Parity == 0 && rrsc == "" {
|
|
||||||
cfg.RRS.Parity = defaultRRSParity
|
cfg.RRS.Parity = defaultRRSParity
|
||||||
}
|
if setDriveCount == 1 {
|
||||||
|
cfg.RRS.Parity = 0
|
||||||
if cfg.Standard.Parity == 0 && ssc == "" {
|
}
|
||||||
cfg.Standard.Parity = DefaultParityBlocks(setDriveCount)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validation is done after parsing both the storage classes. This is needed because we need one
|
// Validation is done after parsing both the storage classes. This is needed because we need one
|
||||||
|
|
Loading…
Reference in New Issue