From 4f31a9a33b09a9a4af6f7d8dbee82442aade2398 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 18 Dec 2018 14:39:21 -0800 Subject: [PATCH] Reload users upon AddUser on peers (#6975) Also migrate ReloadFormat to notification subsystem, remove GetConfig() we do not use this API anymore --- cmd/admin-handlers.go | 40 ++++++++++++++++++ cmd/admin-heal-ops.go | 7 +++- cmd/admin-rpc-client.go | 39 ------------------ cmd/admin-rpc-server.go | 17 -------- cmd/admin-rpc_test.go | 75 ---------------------------------- cmd/iam.go | 4 +- cmd/local-admin-client.go | 11 ----- cmd/local-admin-client_test.go | 8 ---- cmd/notification.go | 56 ++++++++++++++++++++++++- cmd/peer-rpc-client.go | 20 ++++++++- cmd/peer-rpc-server.go | 24 +++++++++++ 11 files changed, 146 insertions(+), 155 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index e0d2e037a..be882823d 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -867,6 +867,14 @@ func (a adminAPIHandlers) SetUserStatus(w http.ResponseWriter, r *http.Request) writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) return } + + // Notify all other Minio peers to reload users + for host, err := range globalNotificationSys.LoadUsers() { + if err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) + logger.LogIf(ctx, err) + } + } } // AddUser - PUT /minio/admin/v1/add-user?accessKey= @@ -927,6 +935,14 @@ func (a adminAPIHandlers) AddUser(w http.ResponseWriter, r *http.Request) { writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) return } + + // Notify all other Minio peers to reload users + for host, err := range globalNotificationSys.LoadUsers() { + if err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) + logger.LogIf(ctx, err) + } + } } // ListCannedPolicies - GET /minio/admin/v1/list-canned-policies @@ -992,6 +1008,14 @@ func (a adminAPIHandlers) RemoveCannedPolicy(w http.ResponseWriter, r *http.Requ writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) return } + + // Notify all other Minio peers to reload users + for host, err := range globalNotificationSys.LoadUsers() { + if err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) + logger.LogIf(ctx, err) + } + } } // AddCannedPolicy - PUT /minio/admin/v1/add-canned-policy?name= @@ -1049,6 +1073,14 @@ func (a adminAPIHandlers) AddCannedPolicy(w http.ResponseWriter, r *http.Request writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) return } + + // Notify all other Minio peers to reload users + for host, err := range globalNotificationSys.LoadUsers() { + if err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) + logger.LogIf(ctx, err) + } + } } // SetUserPolicy - PUT /minio/admin/v1/set-user-policy?accessKey=&name= @@ -1088,6 +1120,14 @@ func (a adminAPIHandlers) SetUserPolicy(w http.ResponseWriter, r *http.Request) if err := globalIAMSys.SetUserPolicy(accessKey, policyName); err != nil { writeErrorResponseJSON(w, toAdminAPIErrCode(ctx, err), r.URL) } + + // Notify all other Minio peers to reload users + for host, err := range globalNotificationSys.LoadUsers() { + if err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", host.String()) + logger.LogIf(ctx, err) + } + } } // SetConfigHandler - PUT /minio/admin/v1/config diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index b4344a3e2..274ff603b 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -641,7 +641,12 @@ func (h *healSequence) healDiskFormat() error { // Healing succeeded notify the peers to reload format and re-initialize disks. // We will not notify peers only if healing succeeded. if err == nil { - peersReInitFormat(globalAdminPeers, h.settings.DryRun) + for host, rerr := range globalNotificationSys.ReloadFormat(h.settings.DryRun) { + if rerr != nil { + logger.GetReqInfo(h.ctx).SetTags("peerAddress", host.String()) + logger.LogIf(h.ctx, rerr) + } + } } // Push format heal result diff --git a/cmd/admin-rpc-client.go b/cmd/admin-rpc-client.go index 9f5a4b9f5..874dc2f6f 100644 --- a/cmd/admin-rpc-client.go +++ b/cmd/admin-rpc-client.go @@ -45,29 +45,12 @@ func (rpcClient *AdminRPCClient) SignalService(signal serviceSignal) (err error) return rpcClient.Call(adminServiceName+".SignalService", &args, &reply) } -// ReInitFormat - re-initialize disk format, remotely. -func (rpcClient *AdminRPCClient) ReInitFormat(dryRun bool) error { - args := ReInitFormatArgs{DryRun: dryRun} - reply := VoidReply{} - - return rpcClient.Call(adminServiceName+".ReInitFormat", &args, &reply) -} - // ServerInfo - returns the server info of the server to which the RPC call is made. func (rpcClient *AdminRPCClient) ServerInfo() (sid ServerInfoData, err error) { err = rpcClient.Call(adminServiceName+".ServerInfo", &AuthArgs{}, &sid) return sid, err } -// GetConfig - returns config.json of the remote server. -func (rpcClient *AdminRPCClient) GetConfig() ([]byte, error) { - args := AuthArgs{} - var reply []byte - - err := rpcClient.Call(adminServiceName+".GetConfig", &args, &reply) - return reply, err -} - // StartProfiling - starts profiling in the remote server. func (rpcClient *AdminRPCClient) StartProfiling(profiler string) error { args := StartProfilingArgs{Profiler: profiler} @@ -125,9 +108,7 @@ func NewAdminRPCClient(host *xnet.Host) (*AdminRPCClient, error) { // commands like service stop and service restart. type adminCmdRunner interface { SignalService(s serviceSignal) error - ReInitFormat(dryRun bool) error ServerInfo() (ServerInfoData, error) - GetConfig() ([]byte, error) StartProfiling(string) error DownloadProfilingData() ([]byte, error) } @@ -174,26 +155,6 @@ func makeAdminPeers(endpoints EndpointList) (adminPeerList adminPeers) { return adminPeerList } -// peersReInitFormat - reinitialize remote object layers to new format. -func peersReInitFormat(peers adminPeers, dryRun bool) error { - errs := make([]error, len(peers)) - - // Send ReInitFormat RPC call to all nodes. - // for local adminPeer this is a no-op. - wg := sync.WaitGroup{} - for i, peer := range peers { - wg.Add(1) - go func(idx int, peer adminPeer) { - defer wg.Done() - if !peer.isLocal { - errs[idx] = peer.cmdRunner.ReInitFormat(dryRun) - } - }(i, peer) - } - wg.Wait() - return nil -} - // Initialize global adminPeer collection. func initGlobalAdminPeers(endpoints EndpointList) { globalAdminPeers = makeAdminPeers(endpoints) diff --git a/cmd/admin-rpc-server.go b/cmd/admin-rpc-server.go index a71f51ea6..91afa2e2d 100644 --- a/cmd/admin-rpc-server.go +++ b/cmd/admin-rpc-server.go @@ -68,23 +68,6 @@ func (receiver *adminRPCReceiver) DownloadProfilingData(args *AuthArgs, reply *[ return } -// GetConfig - returns the config.json of this server. -func (receiver *adminRPCReceiver) GetConfig(args *AuthArgs, reply *[]byte) (err error) { - *reply, err = receiver.local.GetConfig() - return err -} - -// ReInitFormatArgs - provides dry-run information to re-initialize format.json -type ReInitFormatArgs struct { - AuthArgs - DryRun bool -} - -// ReInitFormat - re-init 'format.json' -func (receiver *adminRPCReceiver) ReInitFormat(args *ReInitFormatArgs, reply *VoidReply) error { - return receiver.local.ReInitFormat(args.DryRun) -} - // NewAdminRPCServer - returns new admin RPC server. func NewAdminRPCServer() (*xrpc.Server, error) { rpcServer := xrpc.NewServer() diff --git a/cmd/admin-rpc_test.go b/cmd/admin-rpc_test.go index 1b86a0256..f92176e1e 100644 --- a/cmd/admin-rpc_test.go +++ b/cmd/admin-rpc_test.go @@ -63,34 +63,6 @@ func testAdminCmdRunnerSignalService(t *testing.T, client adminCmdRunner) { } } -func testAdminCmdRunnerReInitFormat(t *testing.T, client adminCmdRunner) { - tmpGlobalObjectAPI := globalObjectAPI - defer func() { - globalObjectAPI = tmpGlobalObjectAPI - }() - - testCases := []struct { - objectAPI ObjectLayer - dryRun bool - expectErr bool - }{ - {&DummyObjectLayer{}, true, false}, - {&DummyObjectLayer{}, false, false}, - {nil, true, true}, - {nil, false, true}, - } - - for i, testCase := range testCases { - globalObjectAPI = testCase.objectAPI - err := client.ReInitFormat(testCase.dryRun) - expectErr := (err != nil) - - if expectErr != testCase.expectErr { - t.Fatalf("case %v: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) - } - } -} - func testAdminCmdRunnerServerInfo(t *testing.T, client adminCmdRunner) { tmpGlobalBootTime := globalBootTime tmpGlobalObjectAPI := globalObjectAPI @@ -137,33 +109,6 @@ func testAdminCmdRunnerServerInfo(t *testing.T, client adminCmdRunner) { } } -func testAdminCmdRunnerGetConfig(t *testing.T, client adminCmdRunner) { - tmpGlobalServerConfig := globalServerConfig - defer func() { - globalServerConfig = tmpGlobalServerConfig - }() - - config := newServerConfig() - - testCases := []struct { - config *serverConfig - expectErr bool - }{ - {globalServerConfig, false}, - {config, false}, - } - - for i, testCase := range testCases { - globalServerConfig = testCase.config - _, err := client.GetConfig() - expectErr := (err != nil) - - if expectErr != testCase.expectErr { - t.Fatalf("case %v: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) - } - } -} - func newAdminRPCHTTPServerClient(t *testing.T) (*httptest.Server, *AdminRPCClient, *serverConfig) { rpcServer, err := NewAdminRPCServer() if err != nil { @@ -205,16 +150,6 @@ func TestAdminRPCClientSignalService(t *testing.T) { testAdminCmdRunnerSignalService(t, rpcClient) } -func TestAdminRPCClientReInitFormat(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t) - defer httpServer.Close() - defer func() { - globalServerConfig = prevGlobalServerConfig - }() - - testAdminCmdRunnerReInitFormat(t, rpcClient) -} - func TestAdminRPCClientServerInfo(t *testing.T) { httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t) defer httpServer.Close() @@ -224,13 +159,3 @@ func TestAdminRPCClientServerInfo(t *testing.T) { testAdminCmdRunnerServerInfo(t, rpcClient) } - -func TestAdminRPCClientGetConfig(t *testing.T) { - httpServer, rpcClient, prevGlobalServerConfig := newAdminRPCHTTPServerClient(t) - defer httpServer.Close() - defer func() { - globalServerConfig = prevGlobalServerConfig - }() - - testAdminCmdRunnerGetConfig(t, rpcClient) -} diff --git a/cmd/iam.go b/cmd/iam.go index 772800013..7d1838338 100644 --- a/cmd/iam.go +++ b/cmd/iam.go @@ -60,9 +60,9 @@ type IAMSys struct { iamCannedPolicyMap map[string]iampolicy.Policy } -// Load - load iam.json +// Load - loads iam subsystem func (sys *IAMSys) Load(objAPI ObjectLayer) error { - return sys.Init(objAPI) + return sys.refresh(objAPI) } // Init - initializes config system from iam.json diff --git a/cmd/local-admin-client.go b/cmd/local-admin-client.go index c6a5f715d..72411c4bb 100644 --- a/cmd/local-admin-client.go +++ b/cmd/local-admin-client.go @@ -18,9 +18,7 @@ package cmd import ( "context" - "encoding/json" "errors" - "fmt" "os" "io/ioutil" @@ -76,15 +74,6 @@ func (lc localAdminClient) ServerInfo() (sid ServerInfoData, e error) { }, nil } -// GetConfig - returns config.json of the local server. -func (lc localAdminClient) GetConfig() ([]byte, error) { - if globalServerConfig == nil { - return nil, fmt.Errorf("config not present") - } - - return json.Marshal(globalServerConfig) -} - // StartProfiling - starts profiling on the local server. func (lc localAdminClient) StartProfiling(profiler string) error { if globalProfiler != nil { diff --git a/cmd/local-admin-client_test.go b/cmd/local-admin-client_test.go index fcbe6fedc..c6a2355ff 100644 --- a/cmd/local-admin-client_test.go +++ b/cmd/local-admin-client_test.go @@ -24,14 +24,6 @@ func TestLocalAdminClientSignalService(t *testing.T) { testAdminCmdRunnerSignalService(t, &localAdminClient{}) } -func TestLocalAdminClientReInitFormat(t *testing.T) { - testAdminCmdRunnerReInitFormat(t, &localAdminClient{}) -} - func TestLocalAdminClientServerInfo(t *testing.T) { testAdminCmdRunnerServerInfo(t, &localAdminClient{}) } - -func TestLocalAdminClientGetConfig(t *testing.T) { - testAdminCmdRunnerGetConfig(t, &localAdminClient{}) -} diff --git a/cmd/notification.go b/cmd/notification.go index a04106642..ab7cd50bc 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -90,6 +90,60 @@ func (sys *NotificationSys) DeleteBucket(ctx context.Context, bucketName string) }() } +// ReloadFormat - calls ReloadFormat RPC call on all peers. +func (sys *NotificationSys) ReloadFormat(dryRun bool) map[xnet.Host]error { + errors := make(map[xnet.Host]error) + var wg sync.WaitGroup + for addr, client := range sys.peerRPCClientMap { + wg.Add(1) + go func(addr xnet.Host, client *PeerRPCClient) { + defer wg.Done() + // Try to load format in three attempts, before giving up. + for i := 0; i < 3; i++ { + err := client.ReloadFormat(dryRun) + if err == nil { + break + } + errors[addr] = err + // Wait for one second and no need wait after last attempt. + if i < 2 { + time.Sleep(1 * time.Second) + } + } + }(addr, client) + } + wg.Wait() + + return errors +} + +// LoadUsers - calls LoadUsers RPC call on all peers. +func (sys *NotificationSys) LoadUsers() map[xnet.Host]error { + errors := make(map[xnet.Host]error) + var wg sync.WaitGroup + for addr, client := range sys.peerRPCClientMap { + wg.Add(1) + go func(addr xnet.Host, client *PeerRPCClient) { + defer wg.Done() + // Try to load users in three attempts. + for i := 0; i < 3; i++ { + err := client.LoadUsers() + if err == nil { + break + } + errors[addr] = err + // Wait for one second and no need wait after last attempt. + if i < 2 { + time.Sleep(1 * time.Second) + } + } + }(addr, client) + } + wg.Wait() + + return errors +} + // LoadCredentials - calls LoadCredentials RPC call on all peers. func (sys *NotificationSys) LoadCredentials() map[xnet.Host]error { errors := make(map[xnet.Host]error) @@ -98,7 +152,7 @@ func (sys *NotificationSys) LoadCredentials() map[xnet.Host]error { wg.Add(1) go func(addr xnet.Host, client *PeerRPCClient) { defer wg.Done() - // Try to set credentials in three attempts. + // Try to load credentials in three attempts. for i := 0; i < 3; i++ { err := client.LoadCredentials() if err == nil { diff --git a/cmd/peer-rpc-client.go b/cmd/peer-rpc-client.go index 9893c773f..8e73b24f2 100644 --- a/cmd/peer-rpc-client.go +++ b/cmd/peer-rpc-client.go @@ -115,12 +115,30 @@ func (rpcClient *PeerRPCClient) SendEvent(bucketName string, targetID, remoteTar return err } +// ReloadFormat - calls reload format RPC. +func (rpcClient *PeerRPCClient) ReloadFormat(dryRun bool) error { + args := ReloadFormatArgs{ + DryRun: dryRun, + } + reply := VoidReply{} + + return rpcClient.Call(peerServiceName+".ReloadFormat", &args, &reply) +} + +// LoadUsers - calls load users RPC. +func (rpcClient *PeerRPCClient) LoadUsers() error { + args := AuthArgs{} + reply := VoidReply{} + + return rpcClient.Call(peerServiceName+".LoadUsers", &args, &reply) +} + // LoadCredentials - calls load credentials RPC. func (rpcClient *PeerRPCClient) LoadCredentials() error { args := AuthArgs{} reply := VoidReply{} - return rpcClient.Call(peerServiceName+".SetCredentials", &args, &reply) + return rpcClient.Call(peerServiceName+".LoadCredentials", &args, &reply) } // NewPeerRPCClient - returns new peer RPC client. diff --git a/cmd/peer-rpc-server.go b/cmd/peer-rpc-server.go index ebabcd275..c243996c4 100644 --- a/cmd/peer-rpc-server.go +++ b/cmd/peer-rpc-server.go @@ -192,6 +192,30 @@ func (receiver *peerRPCReceiver) SendEvent(args *SendEventArgs, reply *bool) err return nil } +// ReloadFormatArgs - send event RPC arguments. +type ReloadFormatArgs struct { + AuthArgs + DryRun bool +} + +// ReloadFormat - handles reload format RPC call, reloads latest `format.json` +func (receiver *peerRPCReceiver) ReloadFormat(args *ReloadFormatArgs, reply *VoidReply) error { + objAPI := newObjectLayerFn() + if objAPI == nil { + return errServerNotInitialized + } + return objAPI.ReloadFormat(context.Background(), args.DryRun) +} + +// LoadUsers - handles load users RPC call. +func (receiver *peerRPCReceiver) LoadUsers(args *AuthArgs, reply *VoidReply) error { + objAPI := newObjectLayerFn() + if objAPI == nil { + return errServerNotInitialized + } + return globalIAMSys.Load(objAPI) +} + // LoadCredentials - handles load credentials RPC call. func (receiver *peerRPCReceiver) LoadCredentials(args *AuthArgs, reply *VoidReply) error { objAPI := newObjectLayerFn()