Listen for PolicyDB events from etcd and fix etcd watch handling (#7992)

This commit is contained in:
Aditya Manthramurthy 2019-07-30 18:50:49 -07:00 committed by Harshavardhana
parent b83413b167
commit c71895f225

View File

@ -378,6 +378,8 @@ func (sys *IAMSys) reloadFromEvent(event *etcd.Event) {
usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix) usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix)
stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix) stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix)
policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix) policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix)
policyDBUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix)
policyDBSTSUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix)
ctx, cancel := context.WithTimeout(context.Background(), ctx, cancel := context.WithTimeout(context.Background(),
defaultContextTimeout) defaultContextTimeout)
@ -398,6 +400,16 @@ func (sys *IAMSys) reloadFromEvent(event *etcd.Event) {
policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key), policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigPoliciesPrefix)) iamConfigPoliciesPrefix))
loadEtcdPolicy(ctx, policyName, sys.iamPolicyDocsMap) loadEtcdPolicy(ctx, policyName, sys.iamPolicyDocsMap)
case policyDBUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
loadEtcdMappedPolicy(ctx, user, false, sys.iamUserPolicyMap)
case policyDBSTSUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBSTSUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
loadEtcdMappedPolicy(ctx, user, true, sys.iamUserPolicyMap)
} }
case eventDelete: case eventDelete:
switch { switch {
@ -413,6 +425,16 @@ func (sys *IAMSys) reloadFromEvent(event *etcd.Event) {
policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key), policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigPoliciesPrefix)) iamConfigPoliciesPrefix))
delete(sys.iamPolicyDocsMap, policyName) delete(sys.iamPolicyDocsMap, policyName)
case policyDBUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamUserPolicyMap, user)
case policyDBSTSUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBSTSUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamUserPolicyMap, user)
} }
} }
} }
@ -421,26 +443,27 @@ func (sys *IAMSys) reloadFromEvent(event *etcd.Event) {
func (sys *IAMSys) watchIAMEtcd() { func (sys *IAMSys) watchIAMEtcd() {
watchEtcd := func() { watchEtcd := func() {
// Refresh IAMSys with etcd watch. // Refresh IAMSys with etcd watch.
mainLoop:
for { for {
watchCh := globalEtcdClient.Watch(context.Background(), watchCh := globalEtcdClient.Watch(context.Background(),
iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly()) iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
select { for {
case <-GlobalServiceDoneCh: select {
return case <-GlobalServiceDoneCh:
case watchResp, ok := <-watchCh: return
if !ok { case watchResp, ok := <-watchCh:
time.Sleep(1 * time.Second) if !ok {
continue goto mainLoop
} }
if err := watchResp.Err(); err != nil { if err := watchResp.Err(); err != nil {
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
// log and retry. // log and retry.
time.Sleep(1 * time.Second) continue
continue }
}
for _, event := range watchResp.Events {
sys.Lock() sys.Lock()
sys.reloadFromEvent(event) for _, event := range watchResp.Events {
sys.reloadFromEvent(event)
}
sys.Unlock() sys.Unlock()
} }
} }