mirror of
https://github.com/minio/minio.git
synced 2025-01-22 20:23:14 -05:00
allow IAM cache load to be granular and capture missed state (#14930)
Today, anything that is stuck on the disk can cause latency spikes for all incoming S3 I/O. We need to decouple this so that latency in loading credentials is not reflected back onto the S3 API calls. The approach this PR takes is to check whether any updates were made while the IAM load was in progress, so that we can use merge instead of "replacement" to avoid missing state.
This commit is contained in:
parent
e952e2a691
commit
2dc8ac1e62
@ -26,6 +26,7 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -41,6 +42,10 @@ func runAllIAMConcurrencyTests(suite *TestSuiteIAM, c *check) {
|
||||
}
|
||||
|
||||
func TestIAMInternalIDPConcurrencyServerSuite(t *testing.T) {
|
||||
if runtime.GOOS == globalWindowsOSName {
|
||||
t.Skip("windows is clunky")
|
||||
}
|
||||
|
||||
baseTestCases := []TestSuiteCommon{
|
||||
// Init and run test on FS backend with signature v4.
|
||||
{serverType: "FS", signer: signerV4},
|
||||
|
@ -238,6 +238,7 @@ func (s *TestSuiteIAM) TestUserCreate(c *check) {
|
||||
if err != nil {
|
||||
c.Fatalf("unable to set policy: %v", err)
|
||||
}
|
||||
|
||||
client := s.getUserClient(c, accessKey, secretKey, "")
|
||||
err = client.MakeBucket(ctx, getRandomBucketName(), minio.MakeBucketOptions{})
|
||||
if err != nil {
|
||||
@ -1188,7 +1189,7 @@ func (c *check) mustNotListObjects(ctx context.Context, client *minio.Client, bu
|
||||
res := client.ListObjects(ctx, bucket, minio.ListObjectsOptions{})
|
||||
v, ok := <-res
|
||||
if !ok || v.Err == nil {
|
||||
c.Fatalf("user was able to list unexpectedly!")
|
||||
c.Fatalf("user was able to list unexpectedly! on %s", bucket)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1196,8 +1197,7 @@ func (c *check) mustListObjects(ctx context.Context, client *minio.Client, bucke
|
||||
res := client.ListObjects(ctx, bucket, minio.ListObjectsOptions{})
|
||||
v, ok := <-res
|
||||
if ok && v.Err != nil {
|
||||
msg := fmt.Sprintf("user was unable to list: %v", v.Err)
|
||||
c.Fatalf(msg)
|
||||
c.Fatalf("user was unable to list: %v", v.Err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -244,6 +244,8 @@ type iamWatchEvent struct {
|
||||
|
||||
// iamCache contains in-memory cache of IAM data.
|
||||
type iamCache struct {
|
||||
updatedAt time.Time
|
||||
|
||||
// map of policy names to policy definitions
|
||||
iamPolicyDocsMap map[string]PolicyDoc
|
||||
// map of usernames to credentials
|
||||
@ -432,8 +434,7 @@ func setDefaultCannedPolicies(policies map[string]PolicyDoc) {
|
||||
func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error {
|
||||
newCache := newIamCache()
|
||||
|
||||
cache := store.lock()
|
||||
defer store.unlock()
|
||||
loadedAt := time.Now()
|
||||
|
||||
if iamOS, ok := store.IAMStorageAPI.(*IAMObjectStore); ok {
|
||||
err := iamOS.loadAllFromObjStore(ctx, newCache)
|
||||
@ -486,12 +487,28 @@ func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error {
|
||||
newCache.buildUserGroupMemberships()
|
||||
}
|
||||
|
||||
cache.iamGroupPolicyMap = newCache.iamGroupPolicyMap
|
||||
cache.iamGroupsMap = newCache.iamGroupsMap
|
||||
cache.iamPolicyDocsMap = newCache.iamPolicyDocsMap
|
||||
cache.iamUserGroupMemberships = newCache.iamUserGroupMemberships
|
||||
cache.iamUserPolicyMap = newCache.iamUserPolicyMap
|
||||
cache.iamUsersMap = newCache.iamUsersMap
|
||||
cache := store.lock()
|
||||
defer store.unlock()
|
||||
|
||||
// We should only update the in-memory cache if there were no changes
|
||||
// to the in-memory cache since the disk loading began. If there
|
||||
// were changes to the in-memory cache we should wait for the next
|
||||
// cycle until we can safely update the in-memory cache.
|
||||
//
|
||||
// An in-memory cache must be replaced only if we know for sure that
|
||||
// the values loaded from disk are not stale. They might be stale
|
||||
// if cache.updatedAt is more recent than the time the refresh cycle began.
|
||||
if cache.updatedAt.Before(loadedAt) {
|
||||
// No one has updated anything since the config was loaded,
|
||||
// so we just replace whatever is on the disk into memory.
|
||||
cache.iamGroupPolicyMap = newCache.iamGroupPolicyMap
|
||||
cache.iamGroupsMap = newCache.iamGroupsMap
|
||||
cache.iamPolicyDocsMap = newCache.iamPolicyDocsMap
|
||||
cache.iamUserGroupMemberships = newCache.iamUserGroupMemberships
|
||||
cache.iamUserPolicyMap = newCache.iamUserPolicyMap
|
||||
cache.iamUsersMap = newCache.iamUsersMap
|
||||
cache.updatedAt = time.Now()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -548,6 +565,8 @@ func (store *IAMStoreSys) GroupNotificationHandler(ctx context.Context, group st
|
||||
cache.removeGroupFromMembershipsMap(group)
|
||||
delete(cache.iamGroupsMap, group)
|
||||
delete(cache.iamGroupPolicyMap, group)
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -562,6 +581,7 @@ func (store *IAMStoreSys) GroupNotificationHandler(ctx context.Context, group st
|
||||
// removed, the cache stays current.
|
||||
cache.removeGroupFromMembershipsMap(group)
|
||||
cache.updateGroupMembershipsMap(group, &gi)
|
||||
cache.updatedAt = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -639,6 +659,8 @@ func (store *IAMStoreSys) AddUsersToGroup(ctx context.Context, group string, mem
|
||||
cache.iamUserGroupMemberships[member] = gset
|
||||
}
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -672,6 +694,7 @@ func removeMembersFromGroup(ctx context.Context, store *IAMStoreSys, cache *iamC
|
||||
cache.iamUserGroupMemberships[member] = gset
|
||||
}
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -720,6 +743,7 @@ func (store *IAMStoreSys) RemoveUsersFromGroup(ctx context.Context, group string
|
||||
// Delete from server memory
|
||||
delete(cache.iamGroupsMap, group)
|
||||
delete(cache.iamGroupPolicyMap, group)
|
||||
cache.updatedAt = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -749,7 +773,10 @@ func (store *IAMStoreSys) SetGroupStatus(ctx context.Context, group string, enab
|
||||
if err := store.saveGroupInfo(ctx, group, gi); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cache.iamGroupsMap[group] = gi
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -800,7 +827,7 @@ func (store *IAMStoreSys) ListGroups(ctx context.Context) (res []string, err err
|
||||
return
|
||||
}
|
||||
cache.iamGroupsMap = m
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
for k := range cache.iamGroupsMap {
|
||||
res = append(res, k)
|
||||
}
|
||||
@ -813,6 +840,7 @@ func (store *IAMStoreSys) ListGroups(ctx context.Context) (res []string, err err
|
||||
return
|
||||
}
|
||||
cache.iamGroupPolicyMap = m
|
||||
cache.updatedAt = time.Now()
|
||||
for k := range cache.iamGroupPolicyMap {
|
||||
res = append(res, k)
|
||||
}
|
||||
@ -861,6 +889,8 @@ func (store *IAMStoreSys) PolicyDBSet(ctx context.Context, name, policy string,
|
||||
} else {
|
||||
delete(cache.iamGroupPolicyMap, name)
|
||||
}
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -881,6 +911,7 @@ func (store *IAMStoreSys) PolicyDBSet(ctx context.Context, name, policy string,
|
||||
} else {
|
||||
cache.iamGroupPolicyMap[name] = mp
|
||||
}
|
||||
cache.updatedAt = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -930,6 +961,7 @@ func (store *IAMStoreSys) PolicyNotificationHandler(ctx context.Context, policy
|
||||
cache.iamGroupPolicyMap[g] = newMappedPolicy(strings.Join(pset.ToSlice(), ","))
|
||||
}
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
@ -987,6 +1019,8 @@ func (store *IAMStoreSys) DeletePolicy(ctx context.Context, policy string) error
|
||||
}
|
||||
|
||||
delete(cache.iamPolicyDocsMap, policy)
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1057,6 +1091,8 @@ func (store *IAMStoreSys) SetPolicy(ctx context.Context, name string, policy iam
|
||||
}
|
||||
|
||||
cache.iamPolicyDocsMap[name] = d
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1076,6 +1112,7 @@ func (store *IAMStoreSys) ListPolicies(ctx context.Context, bucketName string) (
|
||||
setDefaultCannedPolicies(m)
|
||||
|
||||
cache.iamPolicyDocsMap = m
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
ret := map[string]iampolicy.Policy{}
|
||||
for k, v := range m {
|
||||
@ -1103,6 +1140,7 @@ func (store *IAMStoreSys) ListPolicyDocs(ctx context.Context, bucketName string)
|
||||
setDefaultCannedPolicies(m)
|
||||
|
||||
cache.iamPolicyDocsMap = m
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
ret := map[string]PolicyDoc{}
|
||||
for k, v := range m {
|
||||
@ -1299,6 +1337,8 @@ func (store *IAMStoreSys) PolicyMappingNotificationHandler(ctx context.Context,
|
||||
// This means that the policy mapping was deleted, so we update
|
||||
// the cache.
|
||||
delete(m, userOrGroup)
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
@ -1347,11 +1387,15 @@ func (store *IAMStoreSys) UserNotificationHandler(ctx context.Context, accessKey
|
||||
|
||||
// 3. Delete any mapped policy
|
||||
delete(cache.iamUserPolicyMap, accessKey)
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if userType != svcUser {
|
||||
err = store.loadMappedPolicy(ctx, accessKey, userType, false, cache.iamUserPolicyMap)
|
||||
// Ignore policy not mapped error
|
||||
@ -1373,6 +1417,7 @@ func (store *IAMStoreSys) UserNotificationHandler(ctx context.Context, accessKey
|
||||
if cred.IsTemp() && cred.ParentUser != "" && cred.ParentUser != globalActiveCred.AccessKey {
|
||||
if _, ok := cache.iamUserPolicyMap[cred.ParentUser]; !ok {
|
||||
cache.iamUserPolicyMap[cred.ParentUser] = cache.iamUserPolicyMap[accessKey]
|
||||
cache.updatedAt = time.Now()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1430,6 +1475,8 @@ func (store *IAMStoreSys) DeleteUser(ctx context.Context, accessKey string, user
|
||||
}
|
||||
delete(cache.iamUsersMap, accessKey)
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1469,6 +1516,9 @@ func (store *IAMStoreSys) SetTempUser(ctx context.Context, accessKey string, cre
|
||||
}
|
||||
|
||||
cache.iamUsersMap[accessKey] = cred
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1479,6 +1529,7 @@ func (store *IAMStoreSys) DeleteUsers(ctx context.Context, users []string) error
|
||||
cache := store.lock()
|
||||
defer store.unlock()
|
||||
|
||||
var deleted bool
|
||||
usersToDelete := set.CreateStringSet(users...)
|
||||
for user, cred := range cache.iamUsersMap {
|
||||
userType := regUser
|
||||
@ -1497,9 +1548,15 @@ func (store *IAMStoreSys) DeleteUsers(ctx context.Context, users []string) error
|
||||
err := store.deleteUserIdentity(ctx, user, userType)
|
||||
logger.LogIf(GlobalContext, err)
|
||||
delete(cache.iamUsersMap, user)
|
||||
|
||||
deleted = true
|
||||
}
|
||||
}
|
||||
|
||||
if deleted {
|
||||
cache.updatedAt = time.Now()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1610,6 +1667,8 @@ func (store *IAMStoreSys) SetUserStatus(ctx context.Context, accessKey string, s
|
||||
}
|
||||
|
||||
cache.iamUsersMap[accessKey] = uinfo.Credentials
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1643,6 +1702,7 @@ func (store *IAMStoreSys) AddServiceAccount(ctx context.Context, cred auth.Crede
|
||||
}
|
||||
|
||||
cache.iamUsersMap[u.Credentials.AccessKey] = u.Credentials
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -1725,6 +1785,7 @@ func (store *IAMStoreSys) UpdateServiceAccount(ctx context.Context, accessKey st
|
||||
}
|
||||
|
||||
cache.iamUsersMap[u.Credentials.AccessKey] = u.Credentials
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -1770,6 +1831,8 @@ func (store *IAMStoreSys) AddUser(ctx context.Context, accessKey string, ureq ma
|
||||
cache := store.lock()
|
||||
defer store.unlock()
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
cr, ok := cache.iamUsersMap[accessKey]
|
||||
|
||||
// It is not possible to update an STS account.
|
||||
@ -1803,6 +1866,8 @@ func (store *IAMStoreSys) UpdateUserSecretKey(ctx context.Context, accessKey, se
|
||||
cache := store.lock()
|
||||
defer store.unlock()
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
cred, ok := cache.iamUsersMap[accessKey]
|
||||
if !ok {
|
||||
return errNoSuchUser
|
||||
@ -1837,6 +1902,8 @@ func (store *IAMStoreSys) UpdateUserIdentity(ctx context.Context, cred auth.Cred
|
||||
cache := store.lock()
|
||||
defer store.unlock()
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
userType := regUser
|
||||
if cred.IsServiceAccount() {
|
||||
userType = svcUser
|
||||
@ -1858,6 +1925,8 @@ func (store *IAMStoreSys) LoadUser(ctx context.Context, accessKey string) {
|
||||
cache := store.lock()
|
||||
defer store.unlock()
|
||||
|
||||
cache.updatedAt = time.Now()
|
||||
|
||||
_, found := cache.iamUsersMap[accessKey]
|
||||
if !found {
|
||||
store.loadUser(ctx, accessKey, regUser, cache.iamUsersMap)
|
||||
|
26
cmd/iam.go
26
cmd/iam.go
@ -312,12 +312,14 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc
|
||||
switch {
|
||||
case globalOpenIDConfig.ProviderEnabled():
|
||||
go func() {
|
||||
ticker := time.NewTicker(sys.iamRefreshInterval)
|
||||
defer ticker.Stop()
|
||||
timer := time.NewTimer(sys.iamRefreshInterval)
|
||||
defer timer.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-timer.C:
|
||||
sys.purgeExpiredCredentialsForExternalSSO(ctx)
|
||||
|
||||
timer.Reset(sys.iamRefreshInterval)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
@ -325,13 +327,16 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc
|
||||
}()
|
||||
case globalLDAPConfig.Enabled:
|
||||
go func() {
|
||||
ticker := time.NewTicker(sys.iamRefreshInterval)
|
||||
defer ticker.Stop()
|
||||
timer := time.NewTimer(sys.iamRefreshInterval)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-timer.C:
|
||||
sys.purgeExpiredCredentialsForLDAP(ctx)
|
||||
sys.updateGroupMembershipsForLDAP(ctx)
|
||||
|
||||
timer.Reset(sys.iamRefreshInterval)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
@ -403,12 +408,12 @@ func (sys *IAMSys) watch(ctx context.Context) {
|
||||
|
||||
var maxRefreshDurationSecondsForLog float64 = 10
|
||||
|
||||
// Fall back to loading all items periodically
|
||||
ticker := time.NewTicker(sys.iamRefreshInterval)
|
||||
defer ticker.Stop()
|
||||
// Load all items periodically
|
||||
timer := time.NewTimer(sys.iamRefreshInterval)
|
||||
defer timer.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-timer.C:
|
||||
refreshStart := time.Now()
|
||||
if err := sys.Load(ctx); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Failure in periodic refresh for IAM (took %.2fs): %v", time.Since(refreshStart).Seconds(), err))
|
||||
@ -420,6 +425,7 @@ func (sys *IAMSys) watch(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
timer.Reset(sys.iamRefreshInterval)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user