Handle read/quorum errors when initializing all subsystems (#6585)

- Only require len(disks)/2 to initialize the cluster
- Fix checking of read/write quorum in subsystems init
- Add retry mechanism in policy and notification to avoid aborting in case of read/write quorum errors
This commit is contained in:
Anis Elleuch 2018-10-08 23:47:13 +01:00 committed by kannappanr
parent d8a2975a68
commit cbc5d78a09
4 changed files with 83 additions and 44 deletions

View File

@ -26,6 +26,7 @@ import (
"os" "os"
"path" "path"
"runtime" "runtime"
"strings"
"time" "time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
@ -234,7 +235,8 @@ func (sys *ConfigSys) Init(objAPI ObjectLayer) error {
case _ = <-retryTimerCh: case _ = <-retryTimerCh:
err := initConfig(objAPI) err := initConfig(objAPI)
if err != nil { if err != nil {
if isInsufficientReadQuorum(err) || isInsufficientWriteQuorum(err) { if strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for configuration to be initialized..") logger.Info("Waiting for configuration to be initialized..")
continue continue
} }

View File

@ -241,10 +241,10 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye
// and configFile, take a transaction lock to avoid data race between readConfig() // and configFile, take a transaction lock to avoid data race between readConfig()
// and saveConfig(). // and saveConfig().
objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile) objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
if err := objLock.GetLock(globalOperationTimeout); err != nil { if err := objLock.GetRLock(globalOperationTimeout); err != nil {
return err return err
} }
defer objLock.Unlock() defer objLock.RUnlock()
reader, e := readConfig(ctx, objAPI, configFile) reader, e := readConfig(ctx, objAPI, configFile)
if e != nil && !IsErrIgnored(e, errDiskNotFound, errConfigNotFound) { if e != nil && !IsErrIgnored(e, errDiskNotFound, errConfigNotFound) {
@ -265,7 +265,6 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye
return nil return nil
} }
activeListenerList := []ListenBucketNotificationArgs{}
for _, args := range listenerList { for _, args := range listenerList {
found, err := isLocalHost(args.Addr.Name) found, err := isLocalHost(args.Addr.Name)
if err != nil { if err != nil {
@ -301,16 +300,31 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
return err return err
} }
activeListenerList = append(activeListenerList, args)
} }
data, err := json.Marshal(activeListenerList) return nil
}
func (sys *NotificationSys) refresh(objAPI ObjectLayer) error {
buckets, err := objAPI.ListBuckets(context.Background())
if err != nil { if err != nil {
logger.LogIf(ctx, err)
return err return err
} }
for _, bucket := range buckets {
return saveConfig(objAPI, configFile, data) ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name})
config, err := readNotificationConfig(ctx, objAPI, bucket.Name)
if err != nil && err != errNoSuchNotifications {
return err
}
if err == errNoSuchNotifications {
continue
}
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
if err = sys.initListeners(ctx, objAPI, bucket.Name); err != nil {
return err
}
}
return nil
} }
// Init - initializes notification system from notification.xml and listener.json of all buckets. // Init - initializes notification system from notification.xml and listener.json of all buckets.
@ -319,29 +333,30 @@ func (sys *NotificationSys) Init(objAPI ObjectLayer) error {
return errInvalidArgument return errInvalidArgument
} }
buckets, err := objAPI.ListBuckets(context.Background()) doneCh := make(chan struct{})
if err != nil { defer close(doneCh)
// Initializing notification needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
retryTimerCh := newRetryTimerSimple(doneCh)
for {
select {
case _ = <-retryTimerCh:
if err := sys.refresh(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for notification subsystem to be initialized..")
continue
}
return err return err
} }
for _, bucket := range buckets {
ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{BucketName: bucket.Name})
config, err := readNotificationConfig(ctx, objAPI, bucket.Name)
if err != nil {
if !IsErrIgnored(err, errDiskNotFound, errNoSuchNotifications) {
return err
}
} else {
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
}
if err = sys.initListeners(ctx, objAPI, bucket.Name); err != nil {
return err
}
}
return nil return nil
} }
}
}
// AddRulesMap - adds rules map for bucket name. // AddRulesMap - adds rules map for bucket name.
func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) { func (sys *NotificationSys) AddRulesMap(bucketName string, rulesMap event.RulesMap) {

View File

@ -21,6 +21,7 @@ import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"path" "path"
"strings"
"sync" "sync"
"time" "time"
@ -131,11 +132,7 @@ func (sys *PolicySys) Init(objAPI ObjectLayer) error {
return errInvalidArgument return errInvalidArgument
} }
// Load PolicySys once during boot. defer func() {
if err := sys.refresh(objAPI); err != nil {
return err
}
// Refresh PolicySys in background. // Refresh PolicySys in background.
go func() { go func() {
ticker := time.NewTicker(globalRefreshBucketPolicyInterval) ticker := time.NewTicker(globalRefreshBucketPolicyInterval)
@ -149,8 +146,33 @@ func (sys *PolicySys) Init(objAPI ObjectLayer) error {
} }
} }
}() }()
}()
doneCh := make(chan struct{})
defer close(doneCh)
// Initializing policy needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
retryTimerCh := newRetryTimerSimple(doneCh)
for {
select {
case _ = <-retryTimerCh:
// Load PolicySys once during boot.
if err := sys.refresh(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for policy subsystem to be initialized..")
continue
}
return err
}
return nil return nil
} }
}
}
// NewPolicySys - creates new policy system. // NewPolicySys - creates new policy system.
func NewPolicySys() *PolicySys { func NewPolicySys() *PolicySys {

View File

@ -174,7 +174,7 @@ func (s *xlSets) reInitDisks(refFormat *formatXLV3, storageDisks []StorageAPI, f
// any given sets. // any given sets.
func (s *xlSets) connectDisksWithQuorum() { func (s *xlSets) connectDisksWithQuorum() {
var onlineDisks int var onlineDisks int
for onlineDisks < (len(s.endpoints)/2)+1 { for onlineDisks < len(s.endpoints)/2 {
for _, endpoint := range s.endpoints { for _, endpoint := range s.endpoints {
if s.isConnected(endpoint) { if s.isConnected(endpoint) {
continue continue