load credential for in-flights requests as singleflight (#19920)

avoid concurrent callers for LoadUser() to even initiate
object read() requests, if an on-going operation is in progress.

this avoids many callers hitting the drives causing I/O
spikes, also allows for loading credentials faster.
This commit is contained in:
Harshavardhana 2024-06-12 13:47:56 -07:00 committed by GitHub
parent 7ce28c3b1d
commit d06b63d056
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 73 additions and 55 deletions

View File

@ -33,8 +33,10 @@ import (
"github.com/minio/minio/internal/auth" "github.com/minio/minio/internal/auth"
"github.com/minio/minio/internal/config/identity/openid" "github.com/minio/minio/internal/config/identity/openid"
"github.com/minio/minio/internal/jwt" "github.com/minio/minio/internal/jwt"
"github.com/minio/pkg/v3/console"
"github.com/minio/pkg/v3/policy" "github.com/minio/pkg/v3/policy"
"github.com/puzpuzpuz/xsync/v3" "github.com/puzpuzpuz/xsync/v3"
"golang.org/x/sync/singleflight"
) )
const ( const (
@ -633,6 +635,8 @@ func (store *IAMStoreSys) LoadIAMCache(ctx context.Context, firstTime bool) erro
// layer. // layer.
type IAMStoreSys struct { type IAMStoreSys struct {
IAMStorageAPI IAMStorageAPI
group *singleflight.Group
} }
// HasWatcher - returns if the storage system has a watcher. // HasWatcher - returns if the storage system has a watcher.
@ -1800,6 +1804,7 @@ func (store *IAMStoreSys) DeleteUser(ctx context.Context, accessKey string, user
err = nil err = nil
} }
delete(cache.iamUsersMap, accessKey) delete(cache.iamUsersMap, accessKey)
store.group.Forget(accessKey)
cache.updatedAt = time.Now() cache.updatedAt = time.Now()
@ -1875,6 +1880,7 @@ func (store *IAMStoreSys) DeleteUsers(ctx context.Context, users []string) error
err := store.deleteUserIdentity(ctx, user, userType) err := store.deleteUserIdentity(ctx, user, userType)
iamLogIf(GlobalContext, err) iamLogIf(GlobalContext, err)
delete(cache.iamUsersMap, user) delete(cache.iamUsersMap, user)
store.group.Forget(user)
deleted = true deleted = true
} }
@ -2563,67 +2569,78 @@ func (store *IAMStoreSys) UpdateUserIdentity(ctx context.Context, cred auth.Cred
// LoadUser - attempts to load user info from storage and updates cache. // LoadUser - attempts to load user info from storage and updates cache.
func (store *IAMStoreSys) LoadUser(ctx context.Context, accessKey string) { func (store *IAMStoreSys) LoadUser(ctx context.Context, accessKey string) {
cache := store.lock() // We use singleflight to de-duplicate requests when server
defer store.unlock() // is coming up and loading accessKey and its associated assets
val, err, shared := store.group.Do(accessKey, func() (interface{}, error) {
cache := store.lock()
defer func() {
cache.updatedAt = time.Now()
store.unlock()
}()
cache.updatedAt = time.Now() _, found := cache.iamUsersMap[accessKey]
_, found := cache.iamUsersMap[accessKey] // Check for regular user access key
if !found {
// Check for regular user access key store.loadUser(ctx, accessKey, regUser, cache.iamUsersMap)
if !found { if _, found = cache.iamUsersMap[accessKey]; found {
store.loadUser(ctx, accessKey, regUser, cache.iamUsersMap) // load mapped policies
if _, found = cache.iamUsersMap[accessKey]; found { store.loadMappedPolicyWithRetry(ctx, accessKey, regUser, false, cache.iamUserPolicyMap, 3)
// load mapped policies
store.loadMappedPolicyWithRetry(ctx, accessKey, regUser, false, cache.iamUserPolicyMap, 3)
}
}
// Check for service account
if !found {
store.loadUser(ctx, accessKey, svcUser, cache.iamUsersMap)
var svc UserIdentity
svc, found = cache.iamUsersMap[accessKey]
if found {
// Load parent user and mapped policies.
if store.getUsersSysType() == MinIOUsersSysType {
store.loadUser(ctx, svc.Credentials.ParentUser, regUser, cache.iamUsersMap)
store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, regUser, false, cache.iamUserPolicyMap, 3)
} else {
// In case of LDAP the parent user's policy mapping needs to be
// loaded into sts map
store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3)
} }
} }
}
// Check for STS account // Check for service account
stsAccountFound := false if !found {
var stsUserCred UserIdentity store.loadUser(ctx, accessKey, svcUser, cache.iamUsersMap)
if !found { var svc UserIdentity
store.loadUser(ctx, accessKey, stsUser, cache.iamSTSAccountsMap) svc, found = cache.iamUsersMap[accessKey]
if stsUserCred, found = cache.iamSTSAccountsMap[accessKey]; found { if found {
// Load mapped policy // Load parent user and mapped policies.
store.loadMappedPolicyWithRetry(ctx, stsUserCred.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3) if store.getUsersSysType() == MinIOUsersSysType {
stsAccountFound = true store.loadUser(ctx, svc.Credentials.ParentUser, regUser, cache.iamUsersMap)
} store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, regUser, false, cache.iamUserPolicyMap, 3)
} } else {
// In case of LDAP the parent user's policy mapping needs to be
// Load any associated policy definitions // loaded into sts map
if !stsAccountFound { store.loadMappedPolicyWithRetry(ctx, svc.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3)
pols, _ := cache.iamUserPolicyMap.Load(accessKey) }
for _, policy := range pols.toSlice() {
if _, found = cache.iamPolicyDocsMap[policy]; !found {
store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3)
} }
} }
} else {
pols, _ := cache.iamSTSPolicyMap.Load(stsUserCred.Credentials.AccessKey) // Check for STS account
for _, policy := range pols.toSlice() { stsAccountFound := false
if _, found = cache.iamPolicyDocsMap[policy]; !found { var stsUserCred UserIdentity
store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3) if !found {
store.loadUser(ctx, accessKey, stsUser, cache.iamSTSAccountsMap)
if stsUserCred, found = cache.iamSTSAccountsMap[accessKey]; found {
// Load mapped policy
store.loadMappedPolicyWithRetry(ctx, stsUserCred.Credentials.ParentUser, stsUser, false, cache.iamSTSPolicyMap, 3)
stsAccountFound = true
} }
} }
// Load any associated policy definitions
if !stsAccountFound {
pols, _ := cache.iamUserPolicyMap.Load(accessKey)
for _, policy := range pols.toSlice() {
if _, found = cache.iamPolicyDocsMap[policy]; !found {
store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3)
}
}
} else {
pols, _ := cache.iamSTSPolicyMap.Load(stsUserCred.Credentials.AccessKey)
for _, policy := range pols.toSlice() {
if _, found = cache.iamPolicyDocsMap[policy]; !found {
store.loadPolicyDocWithRetry(ctx, policy, cache.iamPolicyDocsMap, 3)
}
}
}
return "done", nil
})
if serverDebugLog {
console.Debugln("loadUser: loading shared", val, err, shared)
} }
} }

View File

@ -52,6 +52,7 @@ import (
"github.com/minio/pkg/v3/ldap" "github.com/minio/pkg/v3/ldap"
"github.com/minio/pkg/v3/policy" "github.com/minio/pkg/v3/policy"
etcd "go.etcd.io/etcd/client/v3" etcd "go.etcd.io/etcd/client/v3"
"golang.org/x/sync/singleflight"
) )
// UsersSysType - defines the type of users and groups system that is // UsersSysType - defines the type of users and groups system that is
@ -177,9 +178,9 @@ func (sys *IAMSys) initStore(objAPI ObjectLayer, etcdClient *etcd.Client) {
} }
if etcdClient == nil { if etcdClient == nil {
sys.store = &IAMStoreSys{newIAMObjectStore(objAPI, sys.usersSysType)} sys.store = &IAMStoreSys{newIAMObjectStore(objAPI, sys.usersSysType), &singleflight.Group{}}
} else { } else {
sys.store = &IAMStoreSys{newIAMEtcdStore(etcdClient, sys.usersSysType)} sys.store = &IAMStoreSys{newIAMEtcdStore(etcdClient, sys.usersSysType), &singleflight.Group{}}
} }
} }

2
go.mod
View File

@ -95,6 +95,7 @@ require (
golang.org/x/crypto v0.24.0 golang.org/x/crypto v0.24.0
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc
golang.org/x/oauth2 v0.21.0 golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.21.0 golang.org/x/sys v0.21.0
golang.org/x/term v0.21.0 golang.org/x/term v0.21.0
golang.org/x/time v0.5.0 golang.org/x/time v0.5.0
@ -250,7 +251,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.18.0 // indirect golang.org/x/mod v0.18.0 // indirect
golang.org/x/net v0.26.0 // indirect golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/text v0.16.0 // indirect golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/genproto v0.0.0-20240528184218-531527333157 // indirect google.golang.org/genproto v0.0.0-20240528184218-531527333157 // indirect