From 48da4aeee0a03fdbf4ac06ee5d9524592d9f75de Mon Sep 17 00:00:00 2001 From: Poorna Date: Fri, 21 Jan 2022 08:48:21 -0800 Subject: [PATCH] Add API for removing site(s) from site replication (#14022) --- cmd/admin-handlers-site-replication.go | 55 +++++++++++ cmd/admin-router.go | 2 + cmd/site-replication.go | 121 +++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- 5 files changed, 181 insertions(+), 3 deletions(-) diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index e21d3d5ca..7e07588de 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -373,3 +373,58 @@ func (a adminAPIHandlers) SiteReplicationMetaInfo(w http.ResponseWriter, r *http return } } + +// SiteReplicationEdit - PUT /minio/admin/v3/site-replication/edit +func (a adminAPIHandlers) SiteReplicationEdit(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SiteReplicationEdit") + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, cred := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationAddAction) + if objectAPI == nil { + return + } + var site madmin.PeerInfo + err := parseJSONBody(ctx, r.Body, &site, cred.SecretKey) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + status, err := globalSiteReplicationSys.EditPeerCluster(ctx, site) + if err != nil { + logger.LogIf(ctx, err) + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + body, err := json.Marshal(status) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + writeSuccessResponseJSON(w, body) +} + +// SRPeerEdit - PUT /minio/admin/v3/site-replication/peer/edit +// +// used internally to tell current cluster to update endpoint for peer +func (a adminAPIHandlers) SRPeerEdit(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SRPeerEdit") + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationAddAction) + if objectAPI == nil { + return + } + + var pi madmin.PeerInfo + if err := parseJSONBody(ctx, r.Body, &pi, ""); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + if err := globalSiteReplicationSys.PeerEditReq(ctx, pi); err != nil { + logger.LogIf(ctx, err) + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } +} diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 6abdb5e4c..503fc3013 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -207,6 +207,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/iam-item").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerReplicateIAMItem))) adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/bucket-meta").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerReplicateBucketItem))) adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/peer/idp-settings").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerGetIDPSettings))) + adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationEdit))) + adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerEdit))) } if globalIsDistErasure { diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 75f0ac228..b4c4f0377 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -1335,6 +1335,20 @@ func (c *SiteReplicationSys) getAdminClient(ctx context.Context, deploymentID st return getAdminClient(peer.Endpoint, creds.AccessKey, creds.SecretKey) } +// getAdminClientWithEndpoint - NOTE: ensure to take at least a read lock on SiteReplicationSys +// before calling this. +func (c *SiteReplicationSys) getAdminClientWithEndpoint(ctx context.Context, deploymentID, endpoint string) (*madmin.AdminClient, error) { + creds, err := c.getPeerCreds() + if err != nil { + return nil, err + } + + if _, ok := c.state.Peers[deploymentID]; !ok { + return nil, errSRPeerNotFound + } + return getAdminClient(endpoint, creds.AccessKey, creds.SecretKey) +} + func (c *SiteReplicationSys) getPeerCreds() (*auth.Credentials, error) { creds, ok := globalIAMSys.store.GetUser(c.state.ServiceAccountAccessKey) if !ok { @@ -2474,3 +2488,110 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI } return info, nil } + +// EditPeerCluster - edits replication configuration and updates peer endpoint. +func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.PeerInfo) (madmin.ReplicateEditStatus, error) { + sites, err := c.GetClusterInfo(ctx) + if err != nil { + return madmin.ReplicateEditStatus{}, errSRBackendIssue(err) + } + if !sites.Enabled { + return madmin.ReplicateEditStatus{}, errSRNotEnabled + } + + var ( + found bool + admClient *madmin.AdminClient + ) + + for _, v := range sites.Sites { + if peer.DeploymentID == v.DeploymentID { + found = true + if peer.Endpoint == v.Endpoint { + return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s entered for deployment id %s already configured in site replication", v.Endpoint, v.DeploymentID)) + } + admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint) + if err != nil { + return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) + } + // check if endpoint is reachable + if _, err = admClient.ServerInfo(ctx); err != nil { + return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("Endpoint %s not reachable: %w", peer.Endpoint, err)) + } + } + } + + if !found { + return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("%s not found in existing replicated sites", peer.DeploymentID)) + } + + errs := make(map[string]error, len(c.state.Peers)) + var wg sync.WaitGroup + + pi := c.state.Peers[peer.DeploymentID] + pi.Endpoint = peer.Endpoint + + for i, v := range sites.Sites { + if v.DeploymentID == globalDeploymentID { + c.state.Peers[peer.DeploymentID] = pi + continue + } + wg.Add(1) + go func(pi madmin.PeerInfo, i int) { + defer wg.Done() + v := sites.Sites[i] + admClient, err := c.getAdminClient(ctx, v.DeploymentID) + if v.DeploymentID == peer.DeploymentID { + admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint) + } + if err != nil { + errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) + return + } + if err = admClient.SRPeerEdit(ctx, pi); err != nil { + errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", v.Name, err)) + return + } + }(pi, i) + } + + wg.Wait() + for dID, err := range errs { + if err != nil { + return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err)) + } + } + // we can now save the cluster replication configuration state. + if err = c.saveToDisk(ctx, c.state); err != nil { + return madmin.ReplicateEditStatus{ + Status: madmin.ReplicateAddStatusPartial, + ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err), + }, nil + } + + result := madmin.ReplicateEditStatus{ + Success: true, + Status: fmt.Sprintf("Cluster replication configuration updated with endpoint %s for peer %s successfully", peer.Endpoint, peer.Name), + } + return result, nil +} + +// PeerEditReq - internal API handler to respond to a peer cluster's request +// to edit endpoint. +func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInfo) error { + ourName := "" + for i := range c.state.Peers { + p := c.state.Peers[i] + if p.DeploymentID == arg.DeploymentID { + p.Endpoint = arg.Endpoint + c.state.Peers[arg.DeploymentID] = p + } + if p.DeploymentID == globalDeploymentID { + ourName = p.Name + } + } + if err := c.saveToDisk(ctx, c.state); err != nil { + return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", ourName, err)) + } + return nil +} diff --git a/go.mod b/go.mod index 204da23cc..a3c5be0c1 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/minio/csvparser v1.0.0 github.com/minio/highwayhash v1.0.2 github.com/minio/kes v0.14.0 - github.com/minio/madmin-go v1.2.6 + github.com/minio/madmin-go v1.2.7 github.com/minio/minio-go/v7 v7.0.20 github.com/minio/parquet-go v1.1.0 github.com/minio/pkg v1.1.14 diff --git a/go.sum b/go.sum index a0c218f61..75fb659e0 100644 --- a/go.sum +++ b/go.sum @@ -1092,8 +1092,8 @@ github.com/minio/kes v0.14.0/go.mod h1:OUensXz2BpgMfiogslKxv7Anyx/wj+6bFC6qA7BQc github.com/minio/madmin-go v1.0.12/go.mod h1:BK+z4XRx7Y1v8SFWXsuLNqQqnq5BO/axJ8IDJfgyvfs= github.com/minio/madmin-go v1.1.15/go.mod h1:Iu0OnrMWNBYx1lqJTW+BFjBMx0Hi0wjw8VmqhiOs2Jo= github.com/minio/madmin-go v1.1.23/go.mod h1:wv8zCroSCnpjjQdmgsdJEkFH2oD4w9J40OZqbhxjiJ4= -github.com/minio/madmin-go v1.2.6 h1:k4q5I+6nV/r7QBFZtvlPWMJh8uOo5AbEMx/39VXP7kg= -github.com/minio/madmin-go v1.2.6/go.mod h1:/rOfQv4ohkXJ+7EaSnhg9IJEX7cobX08zkSLfh8G3Ks= +github.com/minio/madmin-go v1.2.7 h1:zxXxflVKf3J1TUCnWoT2rNbDLOJQMbsNbrbNsqXD3Ew= +github.com/minio/madmin-go v1.2.7/go.mod h1:/rOfQv4ohkXJ+7EaSnhg9IJEX7cobX08zkSLfh8G3Ks= github.com/minio/mc v0.0.0-20211207230606-23a05f5a17f2 h1:xocb1RGyrDJ8PxkNn0NSbaBlfdU6J/Ag9QK62pb7nR8= github.com/minio/mc v0.0.0-20211207230606-23a05f5a17f2/go.mod h1:siI9jWTzj1KsNXgz6NOL/S7OTaAUM0OMi+zEkF08gnA= github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=