For IAM with etcd backend, avoid sending notifications (#13472)

As we use etcd's watch interface, we do not need the 
network notifications as they are no-ops anyway.

Bonus: Remove globalEtcdClient global usage in IAM
This commit is contained in:
Aditya Manthramurthy 2021-10-20 03:22:35 -07:00 committed by GitHub
parent c57ff2640e
commit 5f1af8a69d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 183 additions and 160 deletions

View File

@ -242,6 +242,7 @@ func (a adminAPIHandlers) UpdateGroupMembers(w http.ResponseWriter, r *http.Requ
}
// Notify all other MinIO peers to load group.
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadGroup(updReq.Group) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
@ -249,6 +250,7 @@ func (a adminAPIHandlers) UpdateGroupMembers(w http.ResponseWriter, r *http.Requ
}
}
}
}
// GetGroup - /minio/admin/v3/group?group=mygroup1
func (a adminAPIHandlers) GetGroup(w http.ResponseWriter, r *http.Request) {
@ -334,6 +336,7 @@ func (a adminAPIHandlers) SetGroupStatus(w http.ResponseWriter, r *http.Request)
}
// Notify all other MinIO peers to reload user.
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadGroup(group) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
@ -341,6 +344,7 @@ func (a adminAPIHandlers) SetGroupStatus(w http.ResponseWriter, r *http.Request)
}
}
}
}
// SetUserStatus - PUT /minio/admin/v3/set-user-status?accessKey=<access_key>&status=[enabled|disabled]
func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request) {
@ -369,6 +373,7 @@ func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request)
}
// Notify all other MinIO peers to reload user.
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadUser(accessKey, false) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
@ -376,6 +381,7 @@ func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request)
}
}
}
}
// AddUser - PUT /minio/admin/v3/add-user?accessKey=<access_key>
func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) {
@ -477,6 +483,7 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) {
}
// Notify all other Minio peers to reload user
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadUser(accessKey, false) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
@ -484,6 +491,7 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) {
}
}
}
}
// AddServiceAccount - PUT /minio/admin/v3/add-service-account
func (a adminAPIHandlers) AddServiceAccount(w http.ResponseWriter, r *http.Request) {
@ -623,12 +631,14 @@ func (a adminAPIHandlers) AddServiceAccount(w http.ResponseWriter, r *http.Reque
}
// Notify all other Minio peers to reload user the service account
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadServiceAccount(newCred.AccessKey) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
// Call hook for cluster-replication.
//
@ -762,12 +772,14 @@ func (a adminAPIHandlers) UpdateServiceAccount(w http.ResponseWriter, r *http.Re
}
// Notify all other Minio peers to reload user the service account
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadServiceAccount(accessKey) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
// Call site replication hook. Only LDAP accounts are supported for
// replication operations.
@ -1422,12 +1434,14 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request
}
// Notify all other MinIO peers to reload policy
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadPolicy(policyName) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
// Call cluster-replication policy creation hook to replicate policy to
// other minio clusters.
@ -1475,12 +1489,14 @@ func (a adminAPIHandlers) SetPolicyForUserOrGroup(w http.ResponseWriter, r *http
}
// Notify all other MinIO peers to reload policy
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadPolicyMapping(entityName, isGroup) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{
Type: madmin.SRIAMItemPolicyMapping,

View File

@ -73,7 +73,7 @@ func prepareAdminErasureTestBed(ctx context.Context) (*adminErasureTestBed, erro
initAllSubsystems(ctx, objLayer)
globalIAMSys.InitStore(objLayer)
globalIAMSys.InitStore(objLayer, globalEtcdClient)
// Setup admin mgmt REST API handlers.
adminRouter := mux.NewRouter()

View File

@ -366,7 +366,7 @@ func TestIsReqAuthenticated(t *testing.T) {
initAllSubsystems(context.Background(), objLayer)
globalIAMSys.InitStore(objLayer)
globalIAMSys.InitStore(objLayer, globalEtcdClient)
creds, err := auth.CreateCredentials("myuser", "mypassword")
if err != nil {
@ -457,7 +457,7 @@ func TestValidateAdminSignature(t *testing.T) {
initAllSubsystems(context.Background(), objLayer)
globalIAMSys.InitStore(objLayer)
globalIAMSys.InitStore(objLayer, globalEtcdClient)
creds, err := auth.CreateCredentials("admin", "mypassword")
if err != nil {

View File

@ -306,7 +306,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
logger.FatalIf(globalNotificationSys.Init(GlobalContext, buckets, newObject), "Unable to initialize notification system")
}
go globalIAMSys.Init(GlobalContext, newObject)
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient)
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.

View File

@ -65,8 +65,8 @@ type IAMEtcdStore struct {
client *etcd.Client
}
func newIAMEtcdStore() *IAMEtcdStore {
return &IAMEtcdStore{client: globalEtcdClient}
func newIAMEtcdStore(client *etcd.Client) *IAMEtcdStore {
return &IAMEtcdStore{client: client}
}
func (ies *IAMEtcdStore) lock() {

View File

@ -36,6 +36,7 @@ import (
"github.com/minio/minio/internal/auth"
"github.com/minio/minio/internal/logger"
iampolicy "github.com/minio/pkg/iam/policy"
etcd "go.etcd.io/etcd/client/v3"
)
// UsersSysType - defines the type of users and groups system that is
@ -299,11 +300,6 @@ func (sys *IAMSys) LoadGroup(objAPI ObjectLayer, group string) error {
return errServerNotInitialized
}
if globalEtcdClient != nil {
// Watch APIs cover this case, so nothing to do.
return nil
}
sys.store.lock()
defer sys.store.unlock()
@ -343,14 +339,9 @@ func (sys *IAMSys) LoadPolicy(objAPI ObjectLayer, policyName string) error {
sys.store.lock()
defer sys.store.unlock()
if globalEtcdClient == nil {
return sys.store.loadPolicyDoc(context.Background(), policyName, sys.iamPolicyDocsMap)
}
// When etcd is set, we use watch APIs so this code is not needed.
return nil
}
// LoadPolicyMapping - loads the mapped policy for a user or group
// from storage into server memory.
func (sys *IAMSys) LoadPolicyMapping(objAPI ObjectLayer, userOrGroup string, isGroup bool) error {
@ -361,7 +352,6 @@ func (sys *IAMSys) LoadPolicyMapping(objAPI ObjectLayer, userOrGroup string, isG
sys.store.lock()
defer sys.store.unlock()
if globalEtcdClient == nil {
var err error
userType := regUser
if sys.usersSysType == LDAPUsersSysType {
@ -382,13 +372,11 @@ func (sys *IAMSys) LoadPolicyMapping(objAPI ObjectLayer, userOrGroup string, isG
}
}
// Ignore policy not mapped error
if err != nil && err != errNoSuchPolicy {
if err == errNoSuchPolicy {
err = nil
}
return err
}
}
// When etcd is set, we use watch APIs so this code is not needed.
return nil
}
// LoadUser - reloads a specific user from backend disks or etcd.
func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, userType IAMUserType) error {
@ -399,14 +387,16 @@ func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, userType IAMUs
sys.store.lock()
defer sys.store.unlock()
if globalEtcdClient == nil {
err := sys.store.loadUser(context.Background(), accessKey, userType, sys.iamUsersMap)
if err != nil {
return err
}
err = sys.store.loadMappedPolicy(context.Background(), accessKey, userType, false, sys.iamUserPolicyMap)
// Ignore policy not mapped error
if err != nil && err != errNoSuchPolicy {
if err == errNoSuchPolicy {
err = nil
}
if err != nil {
return err
}
// We are on purpose not persisting the policy map for parent
@ -425,8 +415,6 @@ func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, userType IAMUs
}
}
}
}
// When etcd is set, we use watch APIs so this code is not needed.
return nil
}
@ -439,14 +427,7 @@ func (sys *IAMSys) LoadServiceAccount(accessKey string) error {
sys.store.lock()
defer sys.store.unlock()
if globalEtcdClient == nil {
err := sys.store.loadUser(context.Background(), accessKey, svcUser, sys.iamUsersMap)
if err != nil {
return err
}
}
// When etcd is set, we use watch APIs so this code is not needed.
return nil
return sys.store.loadUser(context.Background(), accessKey, svcUser, sys.iamUsersMap)
}
// Perform IAM configuration migration.
@ -455,18 +436,18 @@ func (sys *IAMSys) doIAMConfigMigration(ctx context.Context) error {
}
// InitStore initializes IAM stores
func (sys *IAMSys) InitStore(objAPI ObjectLayer) {
func (sys *IAMSys) InitStore(objAPI ObjectLayer, etcdClient *etcd.Client) {
sys.Lock()
defer sys.Unlock()
if globalEtcdClient == nil {
if etcdClient == nil {
if globalIsGateway {
sys.store = &iamDummyStore{}
} else {
sys.store = newIAMObjectStore(objAPI)
}
} else {
sys.store = newIAMEtcdStore()
sys.store = newIAMEtcdStore(etcdClient)
}
if globalLDAPConfig.Enabled {
@ -584,9 +565,9 @@ func (sys *IAMSys) Load(ctx context.Context, store IAMStorageAPI) error {
}
// Init - initializes config system by reading entries from config/iam
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer, etcdClient *etcd.Client) {
// Initialize IAM store
sys.InitStore(objAPI)
sys.InitStore(objAPI, etcdClient)
retryCtx, cancel := context.WithCancel(ctx)
@ -611,11 +592,11 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
continue
}
if globalEtcdClient != nil {
if etcdClient != nil {
// **** WARNING ****
// Migrating to encrypted backend on etcd should happen before initialization of
// IAM sub-system, make sure that we do not move the above codeblock elsewhere.
if err := migrateIAMConfigsEtcdToEncrypted(retryCtx, globalEtcdClient); err != nil {
if err := migrateIAMConfigsEtcdToEncrypted(retryCtx, etcdClient); err != nil {
txnLk.Unlock(lkctx.Cancel)
if errors.Is(err, errEtcdUnreachable) {
logger.Info("Connection to etcd timed out. Retrying..")
@ -685,6 +666,13 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) {
go sys.watch(ctx)
}
// HasWatcher - returns if the IAM system has a watcher to be notified of
// changes.
func (sys *IAMSys) HasWatcher() bool {
_, ok := sys.store.(iamStorageWatcher)
return ok
}
func (sys *IAMSys) watch(ctx context.Context) {
watcher, ok := sys.store.(iamStorageWatcher)
if ok {

View File

@ -570,7 +570,7 @@ func serverMain(ctx *cli.Context) {
}
// Initialize users credentials and policies in background right after config has initialized.
go globalIAMSys.Init(GlobalContext, newObject)
go globalIAMSys.Init(GlobalContext, newObject, globalEtcdClient)
initDataScanner(GlobalContext, newObject)

View File

@ -42,7 +42,7 @@ func TestCheckValid(t *testing.T) {
initAllSubsystems(context.Background(), objLayer)
globalIAMSys.InitStore(objLayer)
globalIAMSys.InitStore(objLayer, globalEtcdClient)
req, err := newTestRequest(http.MethodGet, "http://example.com:9000/bucket/object", 0, nil)
if err != nil {

View File

@ -380,12 +380,14 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, sites []madmin
}
// Notify all other Minio peers to reload user the service account
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadServiceAccount(svcCred.AccessKey) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
joinReq := madmin.SRInternalJoinReq{
SvcAcctAccessKey: svcCred.AccessKey,
@ -489,12 +491,14 @@ func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRI
}
// Notify all other Minio peers to reload the service account
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadServiceAccount(svcCred.AccessKey) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
state := srState{
Name: ourName,
@ -961,6 +965,7 @@ func (c *SiteReplicationSys) PeerAddPolicyHandler(ctx context.Context, policyNam
}
if p != nil {
if !globalIAMSys.HasWatcher() {
// Notify all other MinIO peers to reload policy
for _, nerr := range globalNotificationSys.LoadPolicy(policyName) {
if nerr.Err != nil {
@ -968,6 +973,7 @@ func (c *SiteReplicationSys) PeerAddPolicyHandler(ctx context.Context, policyNam
logger.LogIf(ctx, nerr.Err)
}
}
}
return nil
}
@ -1006,12 +1012,14 @@ func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change
}
// Notify all other Minio peers to reload the service account
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadServiceAccount(newCred.AccessKey) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
case change.Update != nil:
var sp *iampolicy.Policy
var err error
@ -1033,12 +1041,14 @@ func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change
}
// Notify all other Minio peers to reload the service account
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadServiceAccount(change.Update.AccessKey) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
case change.Delete != nil:
err := globalIAMSys.DeleteServiceAccount(ctx, change.Delete.AccessKey)
@ -1066,13 +1076,14 @@ func (c *SiteReplicationSys) PeerPolicyMappingHandler(ctx context.Context, mappi
}
// Notify all other MinIO peers to reload policy
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadPolicyMapping(mapping.UserOrGroup, mapping.IsGroup) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
return nil
}
@ -1119,12 +1130,14 @@ func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred madm
}
// Notify in-cluster peers to reload temp users.
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadUser(cred.AccessKey, true) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
return nil
}

View File

@ -277,12 +277,14 @@ func (sts *stsAPIHandlers) AssumeRole(w http.ResponseWriter, r *http.Request) {
}
// Notify all other MinIO peers to reload temp users
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadUser(cred.AccessKey, true) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
assumeRoleResponse := &AssumeRoleResponse{
Result: AssumeRoleResult{
@ -481,12 +483,14 @@ func (sts *stsAPIHandlers) AssumeRoleWithSSO(w http.ResponseWriter, r *http.Requ
}
// Notify all other MinIO peers to reload temp users
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadUser(cred.AccessKey, true) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
var encodedSuccessResponse []byte
switch action {
@ -645,12 +649,14 @@ func (sts *stsAPIHandlers) AssumeRoleWithLDAPIdentity(w http.ResponseWriter, r *
}
// Notify all other MinIO peers to reload temp users
if !globalIAMSys.HasWatcher() {
for _, nerr := range globalNotificationSys.LoadUser(cred.AccessKey, true) {
if nerr.Err != nil {
logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String())
logger.LogIf(ctx, nerr.Err)
}
}
}
// Call hook for cluster-replication.
if err := globalSiteReplicationSys.IAMChangeHook(ctx, madmin.SRIAMItem{

View File

@ -350,7 +350,7 @@ func UnstartedTestServer(t TestErrHandler, instanceType string) TestServer {
initAllSubsystems(ctx, objLayer)
globalIAMSys.InitStore(objLayer)
globalIAMSys.InitStore(objLayer, globalEtcdClient)
return testServer
}
@ -1470,7 +1470,7 @@ func newTestObjectLayer(ctx context.Context, endpointServerPools EndpointServerP
initAllSubsystems(ctx, z)
globalIAMSys.InitStore(z)
globalIAMSys.InitStore(z, globalEtcdClient)
return z, nil
}
@ -1518,7 +1518,7 @@ func initAPIHandlerTest(obj ObjectLayer, endpoints []string) (string, http.Handl
initAllSubsystems(context.Background(), obj)
globalIAMSys.InitStore(obj)
globalIAMSys.InitStore(obj, globalEtcdClient)
// get random bucket name.
bucketName := getRandomBucketName()
@ -1808,7 +1808,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
initAllSubsystems(ctx, objLayer)
globalIAMSys.InitStore(objLayer)
globalIAMSys.InitStore(objLayer, globalEtcdClient)
// Executing the object layer tests for single node setup.
objTest(objLayer, FSTestStr, t)
@ -1829,7 +1829,7 @@ func ExecObjectLayerTest(t TestErrHandler, objTest objTestType) {
initAllSubsystems(ctx, objLayer)
globalIAMSys.InitStore(objLayer)
globalIAMSys.InitStore(objLayer, globalEtcdClient)
defer removeRoots(append(fsDirs, fsDir))
// Executing the object layer tests for Erasure.