Remove IAMSys dependency from IAMStorageAPI (#13436)

IAMSys is a higher-level object, that should not be called by the lower-level
storage API interface for IAM. This is to prepare for further improvements in
IAM code.
This commit is contained in:
Aditya Manthramurthy 2021-10-18 11:21:57 -07:00 committed by GitHub
parent d86513cbba
commit 221ef78faa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 197 additions and 172 deletions

View File

@ -81,10 +81,6 @@ func (ids *iamDummyStore) loadMappedPolicies(ctx context.Context, userType IAMUs
return nil return nil
} }
func (ids *iamDummyStore) loadAll(ctx context.Context, sys *IAMSys) error {
return sys.Load(ctx, ids)
}
func (ids *iamDummyStore) saveIAMConfig(ctx context.Context, item interface{}, path string, opts ...options) error { func (ids *iamDummyStore) saveIAMConfig(ctx context.Context, item interface{}, path string, opts ...options) error {
return nil return nil
} }
@ -128,6 +124,3 @@ func (ids *iamDummyStore) deleteUserIdentity(ctx context.Context, name string, u
func (ids *iamDummyStore) deleteGroupInfo(ctx context.Context, name string) error { func (ids *iamDummyStore) deleteGroupInfo(ctx context.Context, name string) error {
return nil return nil
} }
func (ids *iamDummyStore) watch(context.Context, *IAMSys) {
}

View File

@ -457,10 +457,6 @@ func (ies *IAMEtcdStore) loadMappedPolicies(ctx context.Context, userType IAMUse
} }
func (ies *IAMEtcdStore) loadAll(ctx context.Context, sys *IAMSys) error {
return sys.Load(ctx, ies)
}
func (ies *IAMEtcdStore) savePolicyDoc(ctx context.Context, policyName string, p iampolicy.Policy) error { func (ies *IAMEtcdStore) savePolicyDoc(ctx context.Context, policyName string, p iampolicy.Policy) error {
return ies.saveIAMConfig(ctx, &p, getPolicyDocPath(policyName)) return ies.saveIAMConfig(ctx, &p, getPolicyDocPath(policyName))
} }
@ -509,153 +505,58 @@ func (ies *IAMEtcdStore) deleteGroupInfo(ctx context.Context, name string) error
return err return err
} }
func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) { func (ies *IAMEtcdStore) watch(ctx context.Context, keyPath string) <-chan iamWatchEvent {
for { ch := make(chan iamWatchEvent)
outerLoop:
// Refresh IAMSys with etcd watch.
watchCh := ies.client.Watch(ctx,
iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
// go routine to read events from the etcd watch channel and send them
// down `ch`
go func() {
for { for {
select { outerLoop:
case <-ctx.Done(): watchCh := ies.client.Watch(ctx,
return keyPath, etcd.WithPrefix(), etcd.WithKeysOnly())
case watchResp, ok := <-watchCh:
if !ok {
time.Sleep(1 * time.Second)
// Upon an error on watch channel
// re-init the watch channel.
goto outerLoop
}
if err := watchResp.Err(); err != nil {
logger.LogIf(ctx, err)
// log and retry.
time.Sleep(1 * time.Second)
// Upon an error on watch channel
// re-init the watch channel.
goto outerLoop
}
for _, event := range watchResp.Events {
ies.lock()
ies.reloadFromEvent(sys, event)
ies.unlock()
}
}
}
}
}
// sys.RLock is held by caller. for {
func (ies *IAMEtcdStore) reloadFromEvent(sys *IAMSys, event *etcd.Event) { select {
eventCreate := event.IsModify() || event.IsCreate() case <-ctx.Done():
eventDelete := event.Type == etcd.EventTypeDelete return
usersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigUsersPrefix) case watchResp, ok := <-watchCh:
groupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigGroupsPrefix) if !ok {
stsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigSTSPrefix) time.Sleep(1 * time.Second)
svcPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigServiceAccountsPrefix) // Upon an error on watch channel
policyPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPoliciesPrefix) // re-init the watch channel.
policyDBUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBUsersPrefix) goto outerLoop
policyDBSTSUsersPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBSTSUsersPrefix) }
policyDBGroupsPrefix := strings.HasPrefix(string(event.Kv.Key), iamConfigPolicyDBGroupsPrefix) if err := watchResp.Err(); err != nil {
logger.LogIf(ctx, err)
// log and retry.
time.Sleep(1 * time.Second)
// Upon an error on watch channel
// re-init the watch channel.
goto outerLoop
}
for _, event := range watchResp.Events {
isCreateEvent := event.IsModify() || event.IsCreate()
isDeleteEvent := event.Type == etcd.EventTypeDelete
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout) switch {
defer cancel() case isCreateEvent:
ch <- iamWatchEvent{
isCreated: true,
keyPath: string(event.Kv.Key),
}
case isDeleteEvent:
ch <- iamWatchEvent{
isCreated: false,
keyPath: string(event.Kv.Key),
}
}
switch {
case eventCreate:
switch {
case usersPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigUsersPrefix))
ies.loadUser(ctx, accessKey, regUser, sys.iamUsersMap)
case stsPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigSTSPrefix))
// Not using ies.loadUser due to the custom loading of an STS account
var u UserIdentity
if err := ies.loadIAMConfig(ctx, &u, getUserIdentityPath(accessKey, stsUser)); err == nil {
ies.addUser(ctx, accessKey, stsUser, u, sys.iamUsersMap)
// We are on purpose not persisting the policy map for parent
// user, although this is a hack, it is a good enough hack
// at this point in time - we need to overhaul our OIDC
// usage with service accounts with a more cleaner implementation
//
// This mapping is necessary to ensure that valid credentials
// have necessary ParentUser present - this is mainly for only
// webIdentity based STS tokens.
parentAccessKey := u.Credentials.ParentUser
if parentAccessKey != "" && parentAccessKey != globalActiveCred.AccessKey {
if _, ok := sys.iamUserPolicyMap[parentAccessKey]; !ok {
sys.iamUserPolicyMap[parentAccessKey] = sys.iamUserPolicyMap[accessKey]
} }
} }
} }
case svcPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigServiceAccountsPrefix))
ies.loadUser(ctx, accessKey, svcUser, sys.iamUsersMap)
case groupsPrefix:
group := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigGroupsPrefix))
ies.loadGroup(ctx, group, sys.iamGroupsMap)
gi := sys.iamGroupsMap[group]
sys.removeGroupFromMembershipsMap(group)
sys.updateGroupMembershipsMap(group, &gi)
case policyPrefix:
policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigPoliciesPrefix))
ies.loadPolicyDoc(ctx, policyName, sys.iamPolicyDocsMap)
case policyDBUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
ies.loadMappedPolicy(ctx, user, regUser, false, sys.iamUserPolicyMap)
case policyDBSTSUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBSTSUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
ies.loadMappedPolicy(ctx, user, stsUser, false, sys.iamUserPolicyMap)
case policyDBGroupsPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBGroupsPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
ies.loadMappedPolicy(ctx, user, regUser, true, sys.iamGroupPolicyMap)
} }
case eventDelete: }()
switch { return ch
case usersPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigUsersPrefix))
delete(sys.iamUsersMap, accessKey)
case stsPrefix:
accessKey := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigSTSPrefix))
delete(sys.iamUsersMap, accessKey)
case groupsPrefix:
group := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigGroupsPrefix))
sys.removeGroupFromMembershipsMap(group)
delete(sys.iamGroupsMap, group)
delete(sys.iamGroupPolicyMap, group)
case policyPrefix:
policyName := path.Dir(strings.TrimPrefix(string(event.Kv.Key),
iamConfigPoliciesPrefix))
delete(sys.iamPolicyDocsMap, policyName)
case policyDBUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamUserPolicyMap, user)
case policyDBSTSUsersPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBSTSUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamUserPolicyMap, user)
case policyDBGroupsPrefix:
policyMapFile := strings.TrimPrefix(string(event.Kv.Key),
iamConfigPolicyDBGroupsPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamGroupPolicyMap, user)
}
}
} }

View File

@ -22,7 +22,6 @@ import (
"path" "path"
"strings" "strings"
"sync" "sync"
"time"
"unicode/utf8" "unicode/utf8"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
@ -370,11 +369,6 @@ func (iamOS *IAMObjectStore) loadMappedPolicies(ctx context.Context, userType IA
return nil return nil
} }
// Refresh IAMSys. If an object layer is passed in use that, otherwise load from global.
func (iamOS *IAMObjectStore) loadAll(ctx context.Context, sys *IAMSys) error {
return sys.Load(ctx, iamOS)
}
func (iamOS *IAMObjectStore) savePolicyDoc(ctx context.Context, policyName string, p iampolicy.Policy) error { func (iamOS *IAMObjectStore) savePolicyDoc(ctx context.Context, policyName string, p iampolicy.Policy) error {
return iamOS.saveIAMConfig(ctx, &p, getPolicyDocPath(policyName)) return iamOS.saveIAMConfig(ctx, &p, getPolicyDocPath(policyName))
} }
@ -463,13 +457,3 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri
return ch return ch
} }
func (iamOS *IAMObjectStore) watch(ctx context.Context, sys *IAMSys) {
// Refresh IAMSys.
for {
time.Sleep(globalRefreshIAMInterval)
if err := iamOS.loadAll(ctx, sys); err != nil {
logger.LogIf(ctx, err)
}
}
}

View File

@ -25,6 +25,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"path"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -240,6 +241,11 @@ type options struct {
ttl int64 //expiry in seconds ttl int64 //expiry in seconds
} }
type iamWatchEvent struct {
isCreated bool // !isCreated implies a delete event.
keyPath string
}
// IAMStorageAPI defines an interface for the IAM persistence layer // IAMStorageAPI defines an interface for the IAM persistence layer
type IAMStorageAPI interface { type IAMStorageAPI interface {
lock() lock()
@ -262,8 +268,6 @@ type IAMStorageAPI interface {
loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error loadMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error
loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error loadMappedPolicies(ctx context.Context, userType IAMUserType, isGroup bool, m map[string]MappedPolicy) error
loadAll(context.Context, *IAMSys) error
saveIAMConfig(ctx context.Context, item interface{}, path string, opts ...options) error saveIAMConfig(ctx context.Context, item interface{}, path string, opts ...options) error
loadIAMConfig(ctx context.Context, item interface{}, path string) error loadIAMConfig(ctx context.Context, item interface{}, path string) error
deleteIAMConfig(ctx context.Context, path string) error deleteIAMConfig(ctx context.Context, path string) error
@ -277,8 +281,12 @@ type IAMStorageAPI interface {
deleteMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool) error deleteMappedPolicy(ctx context.Context, name string, userType IAMUserType, isGroup bool) error
deleteUserIdentity(ctx context.Context, name string, userType IAMUserType) error deleteUserIdentity(ctx context.Context, name string, userType IAMUserType) error
deleteGroupInfo(ctx context.Context, name string) error deleteGroupInfo(ctx context.Context, name string) error
}
watch(context.Context, *IAMSys) // iamStorageWatcher is implemented by `IAMStorageAPI` implementers that
// additionally support watching storage for changes.
type iamStorageWatcher interface {
watch(ctx context.Context, keyPath string) <-chan iamWatchEvent
} }
// LoadGroup - loads a specific group from storage, and updates the // LoadGroup - loads a specific group from storage, and updates the
@ -642,7 +650,7 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
} }
for { for {
if err := sys.store.loadAll(retryCtx, sys); err != nil { if err := sys.Load(retryCtx, sys.store); err != nil {
if configRetriableErrors(err) { if configRetriableErrors(err) {
logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err) logger.Info("Waiting for all MinIO IAM sub-system to be initialized.. possible cause (%v)", err)
time.Sleep(time.Duration(r.Float64() * float64(5*time.Second))) time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
@ -674,7 +682,146 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
}() }()
} }
go sys.store.watch(ctx, sys) go sys.watch(ctx)
}
func (sys *IAMSys) watch(ctx context.Context) {
watcher, ok := sys.store.(iamStorageWatcher)
if ok {
ch := watcher.watch(ctx, iamConfigPrefix)
for event := range ch {
// we simply log errors
err := sys.loadWatchedEvent(ctx, event)
logger.LogIf(ctx, err)
}
} else {
// Fall back to loading all items
for {
time.Sleep(globalRefreshIAMInterval)
if err := sys.Load(ctx, sys.store); err != nil {
logger.LogIf(ctx, err)
}
}
}
}
func (sys *IAMSys) loadWatchedEvent(outerCtx context.Context, event iamWatchEvent) (err error) {
usersPrefix := strings.HasPrefix(event.keyPath, iamConfigUsersPrefix)
groupsPrefix := strings.HasPrefix(event.keyPath, iamConfigGroupsPrefix)
stsPrefix := strings.HasPrefix(event.keyPath, iamConfigSTSPrefix)
svcPrefix := strings.HasPrefix(event.keyPath, iamConfigServiceAccountsPrefix)
policyPrefix := strings.HasPrefix(event.keyPath, iamConfigPoliciesPrefix)
policyDBUsersPrefix := strings.HasPrefix(event.keyPath, iamConfigPolicyDBUsersPrefix)
policyDBSTSUsersPrefix := strings.HasPrefix(event.keyPath, iamConfigPolicyDBSTSUsersPrefix)
policyDBGroupsPrefix := strings.HasPrefix(event.keyPath, iamConfigPolicyDBGroupsPrefix)
ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout)
defer cancel()
// We need to read from storage and write to in-memory map, so we need
// only a read lock on storage, however in some cases we modify storage
// too (e.g. when credentials from storage are expired, we delete them),
// so we take write locks for both.
sys.Lock()
defer sys.Unlock()
sys.store.lock()
defer sys.store.unlock()
if event.isCreated {
switch {
case usersPrefix:
accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigUsersPrefix))
err = sys.store.loadUser(ctx, accessKey, regUser, sys.iamUsersMap)
case stsPrefix:
accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigSTSPrefix))
err = sys.store.loadUser(ctx, accessKey, stsUser, sys.iamUsersMap)
if err == nil {
// We need to update the policy map for the
// parent below, so we retrieve the credentials
// just added.
creds, ok := sys.iamUsersMap[accessKey]
if !ok {
// This could happen, if the credential
// being loaded has expired.
break
}
// We are on purpose not persisting the policy map for parent
// user, although this is a hack, it is a good enough hack
// at this point in time - we need to overhaul our OIDC
// usage with service accounts with a more cleaner implementation
//
// This mapping is necessary to ensure that valid credentials
// have necessary ParentUser present - this is mainly for only
// webIdentity based STS tokens.
parentAccessKey := creds.ParentUser
if parentAccessKey != "" && parentAccessKey != globalActiveCred.AccessKey {
if _, ok := sys.iamUserPolicyMap[parentAccessKey]; !ok {
sys.iamUserPolicyMap[parentAccessKey] = sys.iamUserPolicyMap[accessKey]
}
}
}
case svcPrefix:
accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigServiceAccountsPrefix))
err = sys.store.loadUser(ctx, accessKey, svcUser, sys.iamUsersMap)
case groupsPrefix:
group := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigGroupsPrefix))
err = sys.store.loadGroup(ctx, group, sys.iamGroupsMap)
if err == nil {
gi := sys.iamGroupsMap[group]
sys.removeGroupFromMembershipsMap(group)
sys.updateGroupMembershipsMap(group, &gi)
}
case policyPrefix:
policyName := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigPoliciesPrefix))
err = sys.store.loadPolicyDoc(ctx, policyName, sys.iamPolicyDocsMap)
case policyDBUsersPrefix:
policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
err = sys.store.loadMappedPolicy(ctx, user, regUser, false, sys.iamUserPolicyMap)
case policyDBSTSUsersPrefix:
policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBSTSUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
err = sys.store.loadMappedPolicy(ctx, user, stsUser, false, sys.iamUserPolicyMap)
case policyDBGroupsPrefix:
policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBGroupsPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
err = sys.store.loadMappedPolicy(ctx, user, regUser, true, sys.iamGroupPolicyMap)
}
} else {
// delete event
switch {
case usersPrefix:
accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigUsersPrefix))
delete(sys.iamUsersMap, accessKey)
case stsPrefix:
accessKey := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigSTSPrefix))
delete(sys.iamUsersMap, accessKey)
case groupsPrefix:
group := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigGroupsPrefix))
sys.removeGroupFromMembershipsMap(group)
delete(sys.iamGroupsMap, group)
delete(sys.iamGroupPolicyMap, group)
case policyPrefix:
policyName := path.Dir(strings.TrimPrefix(event.keyPath, iamConfigPoliciesPrefix))
delete(sys.iamPolicyDocsMap, policyName)
case policyDBUsersPrefix:
policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamUserPolicyMap, user)
case policyDBSTSUsersPrefix:
policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBSTSUsersPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamUserPolicyMap, user)
case policyDBGroupsPrefix:
policyMapFile := strings.TrimPrefix(event.keyPath, iamConfigPolicyDBGroupsPrefix)
user := strings.TrimSuffix(policyMapFile, ".json")
delete(sys.iamGroupPolicyMap, user)
}
}
return err
} }
// DeletePolicy - deletes a canned policy from backend or etcd. // DeletePolicy - deletes a canned policy from backend or etcd.