mirror of https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
start watcher after all creds have been loaded (#9301)
Start the watcher only after all credentials have been loaded, to avoid conflicting locks that could deadlock during startup. Also deprecates the now-unused peer calls for LoadUsers() (see the sketch after the commit metadata below).
This commit is contained in:
parent 2054ca5c9a
commit ac07df2985
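For readers skimming the diff, here is a minimal, self-contained sketch of the initialization order the commit message describes: finish loading credentials before starting the background watcher. The types iamSys and iamStore and their fields are simplified stand-ins for illustration, not MinIO's actual API.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// iamSys and iamStore are hypothetical stand-ins, not MinIO's types.
type iamSys struct {
    mu    sync.Mutex
    creds map[string]string
}

type iamStore struct{}

// loadAll simulates loading every credential while holding the IAM lock.
func (s *iamStore) loadAll(ctx context.Context, sys *iamSys) error {
    sys.mu.Lock()
    defer sys.mu.Unlock()
    sys.creds["admin"] = "initial-secret"
    return nil
}

// watch simulates the periodic background refresh; it takes the same lock,
// which is why it must not run before the initial loadAll completes.
func (s *iamStore) watch(ctx context.Context, sys *iamSys) {
    t := time.NewTicker(100 * time.Millisecond)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            sys.mu.Lock()
            sys.creds["admin"] = "refreshed-secret"
            sys.mu.Unlock()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
    defer cancel()

    sys := &iamSys{creds: make(map[string]string)}
    store := &iamStore{}

    // New order from the commit: finish loadAll first, then start the
    // watcher in the background (previously the watcher started first).
    if err := store.loadAll(ctx, sys); err != nil {
        fmt.Println("load failed:", err)
        return
    }
    go store.watch(ctx, sys)

    <-ctx.Done()
    fmt.Println("initialized and watching")
}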
@@ -3,5 +3,5 @@
 set -e
 
 for d in $(go list ./... | grep -v browser); do
-    CGO_ENABLED=1 go test -v -race --timeout 20m "$d"
+    CGO_ENABLED=1 go test -v -race --timeout 50m "$d"
 done
@@ -559,42 +559,39 @@ func (ies *IAMEtcdStore) deleteGroupInfo(name string) error {
 }
 
 func (ies *IAMEtcdStore) watch(ctx context.Context, sys *IAMSys) {
-    watchEtcd := func() {
-        for {
-        outerLoop:
-            // Refresh IAMSys with etcd watch.
-            watchCh := ies.client.Watch(ctx,
-                iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
-
-            for {
-                select {
-                case <-ctx.Done():
-                    return
-                case watchResp, ok := <-watchCh:
-                    if !ok {
-                        time.Sleep(1 * time.Second)
-                        // Upon an error on watch channel
-                        // re-init the watch channel.
-                        goto outerLoop
-                    }
-                    if err := watchResp.Err(); err != nil {
-                        logger.LogIf(ctx, err)
-                        // log and retry.
-                        time.Sleep(1 * time.Second)
-                        // Upon an error on watch channel
-                        // re-init the watch channel.
-                        goto outerLoop
-                    }
-                    for _, event := range watchResp.Events {
-                        ies.lock()
-                        ies.reloadFromEvent(sys, event)
-                        ies.unlock()
-                    }
-                }
-            }
-        }
-    }
-    go watchEtcd()
+    for {
+    outerLoop:
+        // Refresh IAMSys with etcd watch.
+        watchCh := ies.client.Watch(ctx,
+            iamConfigPrefix, etcd.WithPrefix(), etcd.WithKeysOnly())
+
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case watchResp, ok := <-watchCh:
+                if !ok {
+                    time.Sleep(1 * time.Second)
+                    // Upon an error on watch channel
+                    // re-init the watch channel.
+                    goto outerLoop
+                }
+                if err := watchResp.Err(); err != nil {
+                    logger.LogIf(ctx, err)
+                    // log and retry.
+                    time.Sleep(1 * time.Second)
+                    // Upon an error on watch channel
+                    // re-init the watch channel.
+                    goto outerLoop
+                }
+                for _, event := range watchResp.Events {
+                    ies.lock()
+                    ies.reloadFromEvent(sys, event)
+                    ies.unlock()
+                }
+            }
+        }
+    }
 }
 
 // sys.RLock is held by caller.
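The hunk above keeps the etcd watch-and-retry loop but drops the inner goroutine, since the caller now starts watch() in the background after the initial load. For reference, a standalone sketch of the same re-init-on-error pattern; the etcd import path and endpoint below are assumptions for illustration, not taken from the commit.

package main

import (
    "context"
    "log"
    "time"

    // Assumed clientv3 import path of this era.
    etcd "go.etcd.io/etcd/clientv3"
)

// watchPrefix re-opens the watch channel whenever it closes or errors,
// mirroring the outerLoop/goto pattern kept by the hunk above.
func watchPrefix(ctx context.Context, client *etcd.Client, prefix string) {
    for {
    outerLoop:
        // (Re-)initialize the watch for every key under the prefix.
        watchCh := client.Watch(ctx, prefix, etcd.WithPrefix(), etcd.WithKeysOnly())
        for {
            select {
            case <-ctx.Done():
                return
            case resp, ok := <-watchCh:
                if !ok || resp.Err() != nil {
                    // Channel closed or errored: back off and re-init.
                    time.Sleep(1 * time.Second)
                    goto outerLoop
                }
                for _, ev := range resp.Events {
                    log.Printf("changed key: %s", ev.Kv.Key)
                }
            }
        }
    }
}

func main() {
    // Hypothetical endpoint and prefix, for illustration only.
    client, err := etcd.New(etcd.Config{Endpoints: []string{"localhost:2379"}})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    watchPrefix(context.Background(), client, "config/iam/")
}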
@@ -590,17 +590,13 @@ func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix stri
 }
 
 func (iamOS *IAMObjectStore) watch(ctx context.Context, sys *IAMSys) {
-    watchDisk := func() {
-        for {
-            select {
-            case <-ctx.Done():
-                return
-            case <-time.NewTimer(globalRefreshIAMInterval).C:
-                logger.LogIf(ctx, iamOS.loadAll(ctx, sys))
-            }
-        }
-    }
-
-    // Refresh IAMSys in background.
-    go watchDisk()
+    // Refresh IAMSys.
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-time.NewTimer(globalRefreshIAMInterval).C:
+            logger.LogIf(ctx, iamOS.loadAll(ctx, sys))
+        }
+    }
 }
@@ -353,13 +353,6 @@ func (sys *IAMSys) LoadUser(objAPI ObjectLayer, accessKey string, userType IAMUs
     return nil
 }
 
-// Load - loads iam subsystem
-func (sys *IAMSys) Load(ctx context.Context) error {
-    // Pass nil objectlayer here - it will be loaded internally
-    // from the IAMStorageAPI.
-    return sys.store.loadAll(ctx, sys)
-}
-
 // Perform IAM configuration migration.
 func (sys *IAMSys) doIAMConfigMigration(ctx context.Context) error {
     return sys.store.migrateBackendFormat(ctx)
@@ -386,12 +379,12 @@ func (sys *IAMSys) Init(ctx context.Context, objAPI ObjectLayer) error {
         return err
     }
 
-    sys.store.watch(ctx, sys)
     err := sys.store.loadAll(ctx, sys)
 
     // Invalidate the old cred after finishing IAM initialization
     globalOldCred = auth.Credentials{}
 
+    go sys.store.watch(ctx, sys)
     return err
 }
 
@@ -225,19 +225,6 @@ func (sys *NotificationSys) LoadUser(accessKey string, temp bool) []Notification
     return ng.Wait()
 }
 
-// LoadUsers - calls LoadUsers RPC call on all peers.
-func (sys *NotificationSys) LoadUsers() []NotificationPeerErr {
-    ng := WithNPeers(len(sys.peerClients))
-    for idx, client := range sys.peerClients {
-        if client == nil {
-            continue
-        }
-        client := client
-        ng.Go(context.Background(), client.LoadUsers, idx, *client.host)
-    }
-    return ng.Wait()
-}
-
 // LoadGroup - loads a specific group on all peers.
 func (sys *NotificationSys) LoadGroup(group string) []NotificationPeerErr {
     ng := WithNPeers(len(sys.peerClients))
@@ -874,16 +874,6 @@ func (client *peerRESTClient) LoadUser(accessKey string, temp bool) (err error)
     return nil
 }
 
-// LoadUsers - send load users command to peer nodes.
-func (client *peerRESTClient) LoadUsers() (err error) {
-    respBody, err := client.call(peerRESTMethodLoadUsers, nil, nil, -1)
-    if err != nil {
-        return
-    }
-    defer http.DrainBody(respBody)
-    return nil
-}
-
 // LoadGroup - send load group command to peers.
 func (client *peerRESTClient) LoadGroup(group string) error {
     values := make(url.Values)
@@ -339,33 +339,6 @@ func (s *peerRESTServer) LoadUserHandler(w http.ResponseWriter, r *http.Request)
     w.(http.Flusher).Flush()
 }
 
-// LoadUsersHandler - reloads all users and canned policies.
-func (s *peerRESTServer) LoadUsersHandler(w http.ResponseWriter, r *http.Request) {
-    if !s.IsValid(w, r) {
-        s.writeErrorResponse(w, errors.New("Invalid request"))
-        return
-    }
-
-    objAPI := newObjectLayerWithoutSafeModeFn()
-    if objAPI == nil {
-        s.writeErrorResponse(w, errServerNotInitialized)
-        return
-    }
-
-    if globalIAMSys == nil {
-        s.writeErrorResponse(w, errServerNotInitialized)
-        return
-    }
-
-    err := globalIAMSys.Load(GlobalContext)
-    if err != nil {
-        s.writeErrorResponse(w, err)
-        return
-    }
-
-    w.(http.Flusher).Flush()
-}
-
 // LoadGroupHandler - reloads group along with members list.
 func (s *peerRESTServer) LoadGroupHandler(w http.ResponseWriter, r *http.Request) {
     if !s.IsValid(w, r) {
@@ -1359,7 +1332,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
     subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadPolicyMapping).HandlerFunc(httpTraceAll(server.LoadPolicyMappingHandler)).Queries(restQueries(peerRESTUserOrGroup)...)
     subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDeleteUser).HandlerFunc(httpTraceAll(server.LoadUserHandler)).Queries(restQueries(peerRESTUser)...)
     subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadUser).HandlerFunc(httpTraceAll(server.LoadUserHandler)).Queries(restQueries(peerRESTUser, peerRESTUserTemp)...)
-    subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadUsers).HandlerFunc(httpTraceAll(server.LoadUsersHandler))
     subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadGroup).HandlerFunc(httpTraceAll(server.LoadGroupHandler)).Queries(restQueries(peerRESTGroup)...)
 
     subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStartProfiling).HandlerFunc(httpTraceAll(server.StartProfilingHandler)).Queries(restQueries(peerRESTProfiler)...)