From d06b63d0560bc25d9951b78270a0bbe3c6599c4a Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 12 Jun 2024 13:47:56 -0700 Subject: [PATCH] load credential for in-flights requests as singleflight (#19920) avoid concurrent callers for LoadUser() to even initiate object read() requests, if an on-going operation is in progress. this avoids many callers hitting the drives causing I/O spikes, also allows for loading credentials faster. --- cmd/iam-store.go | 121 +++++++++++++++++++++++++++-------------------- cmd/iam.go | 5 +- go.mod | 2 +- 3 files changed, 73 insertions(+), 55 deletions(-) diff --git a/cmd/iam-store.go b/cmd/iam-store.go index ae5981500..b4bbecd7d 100644 --- a/cmd/iam-store.go +++ b/cmd/iam-store.go @@ -33,8 +33,10 @@ import ( "github.com/minio/minio/internal/auth" "github.com/minio/minio/internal/config/identity/openid" "github.com/minio/minio/internal/jwt" + "github.com/minio/pkg/v3/console" "github.com/minio/pkg/v3/policy" "github.com/puzpuzpuz/xsync/v3" + "golang.org/x/sync/singleflight" ) const ( @@ -633,6 +635,8 @@ func (store *IAMStoreSys) LoadIAMCache(ctx context.Context, firstTime bool) erro // layer. type IAMStoreSys struct { IAMStorageAPI + + group *singleflight.Group } // HasWatcher - returns if the storage system has a watcher. @@ -1800,6 +1804,7 @@ func (store *IAMStoreSys) DeleteUser(ctx context.Context, accessKey string, user err = nil } delete(cache.iamUsersMap, accessKey) + store.group.Forget(accessKey) cache.updatedAt = time.Now() @@ -1875,6 +1880,7 @@ func (store *IAMStoreSys) DeleteUsers(ctx context.Context, users []string) error err := store.deleteUserIdentity(ctx, user, userType) iamLogIf(GlobalContext, err) delete(cache.iamUsersMap, user) + store.group.Forget(user) deleted = true } @@ -2563,67 +2569,78 @@ func (store *IAMStoreSys) UpdateUserIdentity(ctx context.Context, cred auth.Cred // LoadUser - attempts to load user info from storage and updates cache. func (store *IAMStoreSys) LoadUser(ctx context.Context, accessKey string) { - cache := store.lock() - defer store.unlock() + // We use singleflight to de-duplicate requests when server + // is coming up and loading accessKey and its associated assets + val, err, shared := store.group.Do(accessKey, func() (interface{}, error) { + cache := store.lock() + defer func() { + cache.updatedAt = time.Now() + store.unlock() + }() - cache.updatedAt = time.Now() + _, found := cache.iamUsersMap[accessKey] - _, found := cache.iamUsersMap[accessKey] - - // Check for regular user access key - if !found { - store.loadUser(ctx, accessKey, regUser, cache.iamUsersMap) - if _, found = cache.iamUsersMap[accessKey]; found { - // load mapped policies - store.loadMappedPolicyWithRetry(ctx, accessKey, regUser, false, cache.iamUserPolicyMap, 3) - } - } - - // Check for service account - if !found { - store.loadUser(ctx, accessKey, svcUser, cache.iamUsersMap) - var svc UserIdentity - svc, found = cache.iamUsersMap[accessKey] - if found { - // Load parent user and mapped policies. - if store.getUsersSysType() == MinIOUsersSysType { - store.loadUser(ctx, svc.Credentials.ParentUser, regUser, cache.iamUsersMap) - store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, regUser, false, cache.iamUserPolicyMap, 3) - } else { - // In case of LDAP the parent user's policy mapping needs to be - // loaded into sts map - store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3) + // Check for regular user access key + if !found { + store.loadUser(ctx, accessKey, regUser, cache.iamUsersMap) + if _, found = cache.iamUsersMap[accessKey]; found { + // load mapped policies + store.loadMappedPolicyWithRetry(ctx, accessKey, regUser, false, cache.iamUserPolicyMap, 3) } } - } - // Check for STS account - stsAccountFound := false - var stsUserCred UserIdentity - if !found { - store.loadUser(ctx, accessKey, stsUser, cache.iamSTSAccountsMap) - if stsUserCred, found = cache.iamSTSAccountsMap[accessKey]; found { - // Load mapped policy - store.loadMappedPolicyWithRetry(ctx, stsUserCred.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3) - stsAccountFound = true - } - } - - // Load any associated policy definitions - if !stsAccountFound { - pols, _ := cache.iamUserPolicyMap.Load(accessKey) - for _, policy := range pols.toSlice() { - if _, found = cache.iamPolicyDocsMap[policy]; !found { - store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) + // Check for service account + if !found { + store.loadUser(ctx, accessKey, svcUser, cache.iamUsersMap) + var svc UserIdentity + svc, found = cache.iamUsersMap[accessKey] + if found { + // Load parent user and mapped policies. + if store.getUsersSysType() == MinIOUsersSysType { + store.loadUser(ctx, svc.Credentials.ParentUser, regUser, cache.iamUsersMap) + store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, regUser, false, cache.iamUserPolicyMap, 3) + } else { + // In case of LDAP the parent user's policy mapping needs to be + // loaded into sts map + store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3) + } } } - } else { - pols, _ := cache.iamSTSPolicyMap.Load(stsUserCred.Credentials.AccessKey) - for _, policy := range pols.toSlice() { - if _, found = cache.iamPolicyDocsMap[policy]; !found { - store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) + + // Check for STS account + stsAccountFound := false + var stsUserCred UserIdentity + if !found { + store.loadUser(ctx, accessKey, stsUser, cache.iamSTSAccountsMap) + if stsUserCred, found = cache.iamSTSAccountsMap[accessKey]; found { + // Load mapped policy + store.loadMappedPolicyWithRetry(ctx, stsUserCred.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3) + stsAccountFound = true } } + + // Load any associated policy definitions + if !stsAccountFound { + pols, _ := cache.iamUserPolicyMap.Load(accessKey) + for _, policy := range pols.toSlice() { + if _, found = cache.iamPolicyDocsMap[policy]; !found { + store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) + } + } + } else { + pols, _ := cache.iamSTSPolicyMap.Load(stsUserCred.Credentials.AccessKey) + for _, policy := range pols.toSlice() { + if _, found = cache.iamPolicyDocsMap[policy]; !found { + store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) + } + } + } + + return "done", nil + }) + + if serverDebugLog { + console.Debugln("loadUser: loading shared", val, err, shared) } } diff --git a/cmd/iam.go b/cmd/iam.go index 6a9afc45e..46873de12 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -52,6 +52,7 @@ import ( "github.com/minio/pkg/v3/ldap" "github.com/minio/pkg/v3/policy" etcd "go.etcd.io/etcd/client/v3" + "golang.org/x/sync/singleflight" ) // UsersSysType - defines the type of users and groups system that is @@ -177,9 +178,9 @@ func (sys *IAMSys) initStore(objAPI ObjectLayer, etcdClient *etcd.Client) { } if etcdClient == nil { - sys.store = &IAMStoreSys{newIAMObjectStore(objAPI, sys.usersSysType)} + sys.store = &IAMStoreSys{newIAMObjectStore(objAPI, sys.usersSysType), &singleflight.Group{}} } else { - sys.store = &IAMStoreSys{newIAMEtcdStore(etcdClient, sys.usersSysType)} + sys.store = &IAMStoreSys{newIAMEtcdStore(etcdClient, sys.usersSysType), &singleflight.Group{}} } } diff --git a/go.mod b/go.mod index 3d4a80a5e..aede08f08 100644 --- a/go.mod +++ b/go.mod @@ -95,6 +95,7 @@ require ( golang.org/x/crypto v0.24.0 golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc golang.org/x/oauth2 v0.21.0 + golang.org/x/sync v0.7.0 golang.org/x/sys v0.21.0 golang.org/x/term v0.21.0 golang.org/x/time v0.5.0 @@ -250,7 +251,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/net v0.26.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/genproto v0.0.0-20240528184218-531527333157 // indirect