From 2dc8ac1e62706e60207a7253ebe80447d4c9c426 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 17 May 2022 19:58:47 -0700 Subject: [PATCH] allow IAM cache load to be granular and capture missed state (#14930) anything that is stuck on the disk today can cause latency spikes for all incoming S3 I/O, we need to have this de-coupled so that we can make sure that latency in loading credentials are not reflected back to the S3 API calls. The approach this PR takes is by checking if the calls were updated just in case when the IAM load was in progress, so that we can use merge instead of "replacement" to avoid missing state. --- cmd/admin-handlers-users-race_test.go | 5 ++ cmd/admin-handlers-users_test.go | 6 +- cmd/iam-store.go | 87 ++++++++++++++++++++++++--- cmd/iam.go | 26 +++++--- 4 files changed, 102 insertions(+), 22 deletions(-) diff --git a/cmd/admin-handlers-users-race_test.go b/cmd/admin-handlers-users-race_test.go index 3e937bf33..7b10d438e 100644 --- a/cmd/admin-handlers-users-race_test.go +++ b/cmd/admin-handlers-users-race_test.go @@ -26,6 +26,7 @@ package cmd import ( "context" "fmt" + "runtime" "sync" "testing" "time" @@ -41,6 +42,10 @@ func runAllIAMConcurrencyTests(suite *TestSuiteIAM, c *check) { } func TestIAMInternalIDPConcurrencyServerSuite(t *testing.T) { + if runtime.GOOS == globalWindowsOSName { + t.Skip("windows is clunky") + } + baseTestCases := []TestSuiteCommon{ // Init and run test on FS backend with signature v4. {serverType: "FS", signer: signerV4}, diff --git a/cmd/admin-handlers-users_test.go b/cmd/admin-handlers-users_test.go index 438bdae40..9a9ec0391 100644 --- a/cmd/admin-handlers-users_test.go +++ b/cmd/admin-handlers-users_test.go @@ -238,6 +238,7 @@ func (s *TestSuiteIAM) TestUserCreate(c *check) { if err != nil { c.Fatalf("unable to set policy: %v", err) } + client := s.getUserClient(c, accessKey, secretKey, "") err = client.MakeBucket(ctx, getRandomBucketName(), minio.MakeBucketOptions{}) if err != nil { @@ -1188,7 +1189,7 @@ func (c *check) mustNotListObjects(ctx context.Context, client *minio.Client, bu res := client.ListObjects(ctx, bucket, minio.ListObjectsOptions{}) v, ok := <-res if !ok || v.Err == nil { - c.Fatalf("user was able to list unexpectedly!") + c.Fatalf("user was able to list unexpectedly! on %s", bucket) } } @@ -1196,8 +1197,7 @@ func (c *check) mustListObjects(ctx context.Context, client *minio.Client, bucke res := client.ListObjects(ctx, bucket, minio.ListObjectsOptions{}) v, ok := <-res if ok && v.Err != nil { - msg := fmt.Sprintf("user was unable to list: %v", v.Err) - c.Fatalf(msg) + c.Fatalf("user was unable to list: %v", v.Err) } } diff --git a/cmd/iam-store.go b/cmd/iam-store.go index 4deeba2c8..40eaf0629 100644 --- a/cmd/iam-store.go +++ b/cmd/iam-store.go @@ -244,6 +244,8 @@ type iamWatchEvent struct { // iamCache contains in-memory cache of IAM data. type iamCache struct { + updatedAt time.Time + // map of policy names to policy definitions iamPolicyDocsMap map[string]PolicyDoc // map of usernames to credentials @@ -432,8 +434,7 @@ func setDefaultCannedPolicies(policies map[string]PolicyDoc) { func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error { newCache := newIamCache() - cache := store.lock() - defer store.unlock() + loadedAt := time.Now() if iamOS, ok := store.IAMStorageAPI.(*IAMObjectStore); ok { err := iamOS.loadAllFromObjStore(ctx, newCache) @@ -486,12 +487,28 @@ func (store *IAMStoreSys) LoadIAMCache(ctx context.Context) error { newCache.buildUserGroupMemberships() } - cache.iamGroupPolicyMap = newCache.iamGroupPolicyMap - cache.iamGroupsMap = newCache.iamGroupsMap - cache.iamPolicyDocsMap = newCache.iamPolicyDocsMap - cache.iamUserGroupMemberships = newCache.iamUserGroupMemberships - cache.iamUserPolicyMap = newCache.iamUserPolicyMap - cache.iamUsersMap = newCache.iamUsersMap + cache := store.lock() + defer store.unlock() + + // We should only update the in-memory cache if there were no changes + // to the in-memory cache since the disk loading began. If there + // were changes to the in-memory cache we should wait for the next + // cycle until we can safely update the in-memory cache. + // + // An in-memory cache must be replaced only if we know for sure that + // the values loaded from disk are not stale. They might be stale + // if the cached.updatedAt is recent than the refresh cycle began. + if cache.updatedAt.Before(loadedAt) { + // No one has updated anything since the config was loaded, + // so we just replace whatever is on the disk into memory. + cache.iamGroupPolicyMap = newCache.iamGroupPolicyMap + cache.iamGroupsMap = newCache.iamGroupsMap + cache.iamPolicyDocsMap = newCache.iamPolicyDocsMap + cache.iamUserGroupMemberships = newCache.iamUserGroupMemberships + cache.iamUserPolicyMap = newCache.iamUserPolicyMap + cache.iamUsersMap = newCache.iamUsersMap + cache.updatedAt = time.Now() + } return nil } @@ -548,6 +565,8 @@ func (store *IAMStoreSys) GroupNotificationHandler(ctx context.Context, group st cache.removeGroupFromMembershipsMap(group) delete(cache.iamGroupsMap, group) delete(cache.iamGroupPolicyMap, group) + + cache.updatedAt = time.Now() return nil } @@ -562,6 +581,7 @@ func (store *IAMStoreSys) GroupNotificationHandler(ctx context.Context, group st // removed, the cache stays current. cache.removeGroupFromMembershipsMap(group) cache.updateGroupMembershipsMap(group, &gi) + cache.updatedAt = time.Now() return nil } @@ -639,6 +659,8 @@ func (store *IAMStoreSys) AddUsersToGroup(ctx context.Context, group string, mem cache.iamUserGroupMemberships[member] = gset } + cache.updatedAt = time.Now() + return nil } @@ -672,6 +694,7 @@ func removeMembersFromGroup(ctx context.Context, store *IAMStoreSys, cache *iamC cache.iamUserGroupMemberships[member] = gset } + cache.updatedAt = time.Now() return nil } @@ -720,6 +743,7 @@ func (store *IAMStoreSys) RemoveUsersFromGroup(ctx context.Context, group string // Delete from server memory delete(cache.iamGroupsMap, group) delete(cache.iamGroupPolicyMap, group) + cache.updatedAt = time.Now() return nil } @@ -749,7 +773,10 @@ func (store *IAMStoreSys) SetGroupStatus(ctx context.Context, group string, enab if err := store.saveGroupInfo(ctx, group, gi); err != nil { return err } + cache.iamGroupsMap[group] = gi + cache.updatedAt = time.Now() + return nil } @@ -800,7 +827,7 @@ func (store *IAMStoreSys) ListGroups(ctx context.Context) (res []string, err err return } cache.iamGroupsMap = m - + cache.updatedAt = time.Now() for k := range cache.iamGroupsMap { res = append(res, k) } @@ -813,6 +840,7 @@ func (store *IAMStoreSys) ListGroups(ctx context.Context) (res []string, err err return } cache.iamGroupPolicyMap = m + cache.updatedAt = time.Now() for k := range cache.iamGroupPolicyMap { res = append(res, k) } @@ -861,6 +889,8 @@ func (store *IAMStoreSys) PolicyDBSet(ctx context.Context, name, policy string, } else { delete(cache.iamGroupPolicyMap, name) } + cache.updatedAt = time.Now() + return nil } @@ -881,6 +911,7 @@ func (store *IAMStoreSys) PolicyDBSet(ctx context.Context, name, policy string, } else { cache.iamGroupPolicyMap[name] = mp } + cache.updatedAt = time.Now() return nil } @@ -930,6 +961,7 @@ func (store *IAMStoreSys) PolicyNotificationHandler(ctx context.Context, policy cache.iamGroupPolicyMap[g] = newMappedPolicy(strings.Join(pset.ToSlice(), ",")) } + cache.updatedAt = time.Now() return nil } return err @@ -987,6 +1019,8 @@ func (store *IAMStoreSys) DeletePolicy(ctx context.Context, policy string) error } delete(cache.iamPolicyDocsMap, policy) + cache.updatedAt = time.Now() + return nil } @@ -1057,6 +1091,8 @@ func (store *IAMStoreSys) SetPolicy(ctx context.Context, name string, policy iam } cache.iamPolicyDocsMap[name] = d + cache.updatedAt = time.Now() + return nil } @@ -1076,6 +1112,7 @@ func (store *IAMStoreSys) ListPolicies(ctx context.Context, bucketName string) ( setDefaultCannedPolicies(m) cache.iamPolicyDocsMap = m + cache.updatedAt = time.Now() ret := map[string]iampolicy.Policy{} for k, v := range m { @@ -1103,6 +1140,7 @@ func (store *IAMStoreSys) ListPolicyDocs(ctx context.Context, bucketName string) setDefaultCannedPolicies(m) cache.iamPolicyDocsMap = m + cache.updatedAt = time.Now() ret := map[string]PolicyDoc{} for k, v := range m { @@ -1299,6 +1337,8 @@ func (store *IAMStoreSys) PolicyMappingNotificationHandler(ctx context.Context, // This means that the policy mapping was deleted, so we update // the cache. delete(m, userOrGroup) + cache.updatedAt = time.Now() + err = nil } return err @@ -1347,11 +1387,15 @@ func (store *IAMStoreSys) UserNotificationHandler(ctx context.Context, accessKey // 3. Delete any mapped policy delete(cache.iamUserPolicyMap, accessKey) + + cache.updatedAt = time.Now() return nil } + if err != nil { return err } + if userType != svcUser { err = store.loadMappedPolicy(ctx, accessKey, userType, false, cache.iamUserPolicyMap) // Ignore policy not mapped error @@ -1373,6 +1417,7 @@ func (store *IAMStoreSys) UserNotificationHandler(ctx context.Context, accessKey if cred.IsTemp() && cred.ParentUser != "" && cred.ParentUser != globalActiveCred.AccessKey { if _, ok := cache.iamUserPolicyMap[cred.ParentUser]; !ok { cache.iamUserPolicyMap[cred.ParentUser] = cache.iamUserPolicyMap[accessKey] + cache.updatedAt = time.Now() } } } @@ -1430,6 +1475,8 @@ func (store *IAMStoreSys) DeleteUser(ctx context.Context, accessKey string, user } delete(cache.iamUsersMap, accessKey) + cache.updatedAt = time.Now() + return err } @@ -1469,6 +1516,9 @@ func (store *IAMStoreSys) SetTempUser(ctx context.Context, accessKey string, cre } cache.iamUsersMap[accessKey] = cred + + cache.updatedAt = time.Now() + return nil } @@ -1479,6 +1529,7 @@ func (store *IAMStoreSys) DeleteUsers(ctx context.Context, users []string) error cache := store.lock() defer store.unlock() + var deleted bool usersToDelete := set.CreateStringSet(users...) for user, cred := range cache.iamUsersMap { userType := regUser @@ -1497,9 +1548,15 @@ func (store *IAMStoreSys) DeleteUsers(ctx context.Context, users []string) error err := store.deleteUserIdentity(ctx, user, userType) logger.LogIf(GlobalContext, err) delete(cache.iamUsersMap, user) + + deleted = true } } + if deleted { + cache.updatedAt = time.Now() + } + return nil } @@ -1610,6 +1667,8 @@ func (store *IAMStoreSys) SetUserStatus(ctx context.Context, accessKey string, s } cache.iamUsersMap[accessKey] = uinfo.Credentials + cache.updatedAt = time.Now() + return nil } @@ -1643,6 +1702,7 @@ func (store *IAMStoreSys) AddServiceAccount(ctx context.Context, cred auth.Crede } cache.iamUsersMap[u.Credentials.AccessKey] = u.Credentials + cache.updatedAt = time.Now() return nil } @@ -1725,6 +1785,7 @@ func (store *IAMStoreSys) UpdateServiceAccount(ctx context.Context, accessKey st } cache.iamUsersMap[u.Credentials.AccessKey] = u.Credentials + cache.updatedAt = time.Now() return nil } @@ -1770,6 +1831,8 @@ func (store *IAMStoreSys) AddUser(ctx context.Context, accessKey string, ureq ma cache := store.lock() defer store.unlock() + cache.updatedAt = time.Now() + cr, ok := cache.iamUsersMap[accessKey] // It is not possible to update an STS account. @@ -1803,6 +1866,8 @@ func (store *IAMStoreSys) UpdateUserSecretKey(ctx context.Context, accessKey, se cache := store.lock() defer store.unlock() + cache.updatedAt = time.Now() + cred, ok := cache.iamUsersMap[accessKey] if !ok { return errNoSuchUser @@ -1837,6 +1902,8 @@ func (store *IAMStoreSys) UpdateUserIdentity(ctx context.Context, cred auth.Cred cache := store.lock() defer store.unlock() + cache.updatedAt = time.Now() + userType := regUser if cred.IsServiceAccount() { userType = svcUser @@ -1858,6 +1925,8 @@ func (store *IAMStoreSys) LoadUser(ctx context.Context, accessKey string) { cache := store.lock() defer store.unlock() + cache.updatedAt = time.Now() + _, found := cache.iamUsersMap[accessKey] if !found { store.loadUser(ctx, accessKey, regUser, cache.iamUsersMap) diff --git a/cmd/iam.go b/cmd/iam.go index 4fbd139c7..db054c96d 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -312,12 +312,14 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc switch { case globalOpenIDConfig.ProviderEnabled(): go func() { - ticker := time.NewTicker(sys.iamRefreshInterval) - defer ticker.Stop() + timer := time.NewTimer(sys.iamRefreshInterval) + defer timer.Stop() for { select { - case <-ticker.C: + case <-timer.C: sys.purgeExpiredCredentialsForExternalSSO(ctx) + + timer.Reset(sys.iamRefreshInterval) case <-ctx.Done(): return } @@ -325,13 +327,16 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etc }() case globalLDAPConfig.Enabled: go func() { - ticker := time.NewTicker(sys.iamRefreshInterval) - defer ticker.Stop() + timer := time.NewTimer(sys.iamRefreshInterval) + defer timer.Stop() + for { select { - case <-ticker.C: + case <-timer.C: sys.purgeExpiredCredentialsForLDAP(ctx) sys.updateGroupMembershipsForLDAP(ctx) + + timer.Reset(sys.iamRefreshInterval) case <-ctx.Done(): return } @@ -403,12 +408,12 @@ func (sys *IAMSys) watch(ctx context.Context) { var maxRefreshDurationSecondsForLog float64 = 10 - // Fall back to loading all items periodically - ticker := time.NewTicker(sys.iamRefreshInterval) - defer ticker.Stop() + // Load all items periodically + timer := time.NewTimer(sys.iamRefreshInterval) + defer timer.Stop() for { select { - case <-ticker.C: + case <-timer.C: refreshStart := time.Now() if err := sys.Load(ctx); err != nil { logger.LogIf(ctx, fmt.Errorf("Failure in periodic refresh for IAM (took %.2fs): %v", time.Since(refreshStart).Seconds(), err)) @@ -420,6 +425,7 @@ func (sys *IAMSys) watch(ctx context.Context) { } } + timer.Reset(sys.iamRefreshInterval) case <-ctx.Done(): return }