From 83ca1a8d6446fcb9dd7449443884604e7f955771 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 26 Apr 2019 06:18:50 -0700 Subject: [PATCH] Use etcd watch to reload IAM users (#7551) Currently we used to reload users every five minutes, regardless of etcd is configured or not. But with etcd configured we can do this more asynchronously to trigger a refresh by using the watch API Fixes #7515 --- cmd/config-common.go | 27 +++++++--- cmd/iam.go | 121 +++++++++++++++++++++++++++++++------------ 2 files changed, 108 insertions(+), 40 deletions(-) diff --git a/cmd/config-common.go b/cmd/config-common.go index c6f32bcba..fce820931 100644 --- a/cmd/config-common.go +++ b/cmd/config-common.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "time" etcd "github.com/coreos/etcd/clientv3" "github.com/minio/minio/cmd/logger" @@ -105,12 +106,26 @@ func readConfigEtcd(ctx context.Context, client *etcd.Client, configFile string) // watchConfigEtcd - watches for changes on `configFile` on etcd and loads them. func watchConfigEtcd(objAPI ObjectLayer, configFile string, loadCfgFn func(ObjectLayer) error) { - ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout) - defer cancel() - for watchResp := range globalEtcdClient.Watch(ctx, configFile) { - for _, event := range watchResp.Events { - if event.IsModify() || event.IsCreate() { - loadCfgFn(objAPI) + for { + watchCh := globalEtcdClient.Watch(context.Background(), iamConfigPrefix) + select { + case <-GlobalServiceDoneCh: + return + case watchResp, ok := <-watchCh: + if !ok { + time.Sleep(1 * time.Second) + continue + } + if err := watchResp.Err(); err != nil { + logger.LogIf(context.Background(), err) + // log and retry. + time.Sleep(1 * time.Second) + continue + } + for _, event := range watchResp.Events { + if event.IsModify() || event.IsCreate() { + loadCfgFn(objAPI) + } } } } diff --git a/cmd/iam.go b/cmd/iam.go index 393ab042a..062446d4f 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -62,6 +62,9 @@ type IAMSys struct { // Load - loads iam subsystem func (sys *IAMSys) Load(objAPI ObjectLayer) error { + if globalEtcdClient != nil { + return sys.refreshEtcd() + } return sys.refresh(objAPI) } @@ -71,21 +74,52 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error { return errInvalidArgument } - defer func() { - // Refresh IAMSys in background. - go func() { - ticker := time.NewTicker(globalRefreshIAMInterval) - defer ticker.Stop() - for { - select { - case <-GlobalServiceDoneCh: - return - case <-ticker.C: - sys.refresh(objAPI) + if globalEtcdClient != nil { + defer func() { + go func() { + // Refresh IAMSys with etcd watch. + for { + watchCh := globalEtcdClient.Watch(context.Background(), iamConfigPrefix) + select { + case <-GlobalServiceDoneCh: + return + case watchResp, ok := <-watchCh: + if !ok { + time.Sleep(1 * time.Second) + continue + } + if err := watchResp.Err(); err != nil { + logger.LogIf(context.Background(), err) + // log and retry. + time.Sleep(1 * time.Second) + continue + } + for _, event := range watchResp.Events { + if event.IsModify() || event.IsCreate() || event.Type == etcd.EventTypeDelete { + sys.refreshEtcd() + } + } + } } - } + }() }() - }() + } else { + defer func() { + // Refresh IAMSys in background. + go func() { + ticker := time.NewTicker(globalRefreshIAMInterval) + defer ticker.Stop() + for { + select { + case <-GlobalServiceDoneCh: + return + case <-ticker.C: + sys.refresh(objAPI) + } + } + }() + }() + } doneCh := make(chan struct{}) defer close(doneCh) @@ -95,6 +129,9 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error { // - Read quorum is lost just after the initialization // of the object layer. for range newRetryTimerSimple(doneCh) { + if globalEtcdClient != nil { + return sys.refreshEtcd() + } // Load IAMSys once during boot. if err := sys.refresh(objAPI); err != nil { if err == errDiskNotFound || @@ -697,35 +734,51 @@ func setDefaultCannedPolicies(policies map[string]iampolicy.Policy) { } } +func (sys *IAMSys) refreshEtcd() error { + iamUsersMap := make(map[string]auth.Credentials) + iamPolicyMap := make(map[string]string) + iamCannedPolicyMap := make(map[string]iampolicy.Policy) + + if err := reloadEtcdPolicies(iamConfigPoliciesPrefix, iamCannedPolicyMap); err != nil { + return err + } + if err := reloadEtcdUsers(iamConfigUsersPrefix, iamUsersMap, iamPolicyMap); err != nil { + return err + } + if err := reloadEtcdUsers(iamConfigSTSPrefix, iamUsersMap, iamPolicyMap); err != nil { + return err + } + + // Sets default canned policies, if none are set. + setDefaultCannedPolicies(iamCannedPolicyMap) + + sys.Lock() + defer sys.Unlock() + + sys.iamUsersMap = iamUsersMap + sys.iamPolicyMap = iamPolicyMap + sys.iamCannedPolicyMap = iamCannedPolicyMap + + return nil +} + // Refresh IAMSys. func (sys *IAMSys) refresh(objAPI ObjectLayer) error { iamUsersMap := make(map[string]auth.Credentials) iamPolicyMap := make(map[string]string) iamCannedPolicyMap := make(map[string]iampolicy.Policy) - if globalEtcdClient != nil { - if err := reloadEtcdPolicies(iamConfigPoliciesPrefix, iamCannedPolicyMap); err != nil { - return err - } - if err := reloadEtcdUsers(iamConfigUsersPrefix, iamUsersMap, iamPolicyMap); err != nil { - return err - } - if err := reloadEtcdUsers(iamConfigSTSPrefix, iamUsersMap, iamPolicyMap); err != nil { - return err - } - } else { - if err := reloadPolicies(objAPI, iamConfigPoliciesPrefix, iamCannedPolicyMap); err != nil { - return err - } - if err := reloadUsers(objAPI, iamConfigUsersPrefix, iamUsersMap, iamPolicyMap); err != nil { - return err - } - if err := reloadUsers(objAPI, iamConfigSTSPrefix, iamUsersMap, iamPolicyMap); err != nil { - return err - } + if err := reloadPolicies(objAPI, iamConfigPoliciesPrefix, iamCannedPolicyMap); err != nil { + return err + } + if err := reloadUsers(objAPI, iamConfigUsersPrefix, iamUsersMap, iamPolicyMap); err != nil { + return err + } + if err := reloadUsers(objAPI, iamConfigSTSPrefix, iamUsersMap, iamPolicyMap); err != nil { + return err } - // Sets default canned policies, if none set. + // Sets default canned policies, if none are set. setDefaultCannedPolicies(iamCannedPolicyMap) sys.Lock()