From a4e1de93a730d09266dc667fca114c670bca4eda Mon Sep 17 00:00:00 2001
From: Poorna
Date: Tue, 1 Feb 2022 17:26:09 -0800
Subject: [PATCH] Add API for removing site(s) from site replication (#14104)

---
 cmd/admin-handlers-site-replication.go |  75 +++++++--
 cmd/admin-router.go                    |   3 +-
 cmd/site-replication.go                | 224 ++++++++++++++++++++++++-
 3 files changed, 286 insertions(+), 16 deletions(-)

diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go
index 5a3fbc5a6..d161703c5 100644
--- a/cmd/admin-handlers-site-replication.go
+++ b/cmd/admin-handlers-site-replication.go
@@ -253,18 +253,6 @@ func (a adminAPIHandlers) SRPeerReplicateBucketItem(w http.ResponseWriter, r *ht
 	}
 }
 
-// SiteReplicationDisable - PUT /minio/admin/v3/site-replication/disable
-func (a adminAPIHandlers) SiteReplicationDisable(w http.ResponseWriter, r *http.Request) {
-	ctx := newContext(r, w, "SiteReplicationDisable")
-
-	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
-
-	objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationDisableAction)
-	if objectAPI == nil {
-		return
-	}
-}
-
 // SiteReplicationInfo - GET /minio/admin/v3/site-replication/info
 func (a adminAPIHandlers) SiteReplicationInfo(w http.ResponseWriter, r *http.Request) {
 	ctx := newContext(r, w, "SiteReplicationInfo")
@@ -313,7 +301,6 @@ func parseJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptio
 			Code:  ErrSiteReplicationInvalidRequest,
 		}
 	}
-
 	if encryptionKey != "" {
 		data, err = madmin.DecryptData(encryptionKey, bytes.NewReader(data))
 		if err != nil {
@@ -324,7 +311,6 @@ func parseJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptio
 			}
 		}
 	}
-
 	return json.Unmarshal(data, v)
 }
 
@@ -448,3 +434,64 @@ func getSRStatusOptions(r *http.Request) (opts madmin.SRStatusOptions) {
 	opts.EntityValue = q.Get("entityvalue")
 	return
 }
+
+// SiteReplicationRemove - PUT /minio/admin/v3/site-replication/remove
+//
+// removes the sites named in the JSON request body (or all sites) from the site
+// replication configuration and responds with the JSON-encoded removal status.
+func (a adminAPIHandlers) SiteReplicationRemove(w http.ResponseWriter, r *http.Request) {
+	ctx := newContext(r, w, "SiteReplicationRemove")
+
+	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+	objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationRemoveAction)
+	if objectAPI == nil {
+		return
+	}
+	var rreq madmin.SRRemoveReq
+	// an empty encryption key makes parseJSONBody skip decryption of the body
+	err := parseJSONBody(ctx, r.Body, &rreq, "")
+	if err != nil {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+	status, err := globalSiteReplicationSys.RemovePeerCluster(ctx, objectAPI, rreq)
+	if err != nil {
+		logger.LogIf(ctx, err)
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+
+	body, err := json.Marshal(status)
+	if err != nil {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+	writeSuccessResponseJSON(w, body)
+}
+
+// SRPeerRemove - PUT /minio/admin/v3/site-replication/peer/remove
+//
+// used internally to tell current cluster to remove peer(s) from its site replication configuration
+func (a adminAPIHandlers) SRPeerRemove(w http.ResponseWriter, r *http.Request) {
+	ctx := newContext(r, w, "SRPeerRemove")
+	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+	objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationRemoveAction)
+	if objectAPI == nil {
+		return
+	}
+
+	var req madmin.SRRemoveReq
+	// an empty encryption key makes parseJSONBody skip decryption of the body
+	if err := parseJSONBody(ctx, r.Body, &req, ""); err != nil {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+
+	if err := globalSiteReplicationSys.InternalRemoveReq(ctx, objectAPI, req); err != nil {
+		logger.LogIf(ctx, err)
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+}
diff --git a/cmd/admin-router.go b/cmd/admin-router.go
index 503fc3013..67a442246 100644
--- a/cmd/admin-router.go
+++ b/cmd/admin-router.go
@@ -197,7 +197,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
 
 			// Cluster Replication APIs
 			adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/add").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationAdd)))
-			adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/disable").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationDisable)))
+			adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/remove").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationRemove)))
 			adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/info").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationInfo)))
 			adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/metainfo").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationMetaInfo)))
 			adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/status").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationStatus)))
@@ -209,6 +209,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
 			adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/peer/idp-settings").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerGetIDPSettings)))
 			adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationEdit)))
 			adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerEdit)))
+			adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/remove").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerRemove)))
 		}
 
 		if globalIsDistErasure {
diff --git a/cmd/site-replication.go b/cmd/site-replication.go
index dcdcd2a18..d11fcb678 100644
--- a/cmd/site-replication.go
+++ b/cmd/site-replication.go
@@ -255,7 +255,7 @@ func (c *SiteReplicationSys) saveToDisk(ctx context.Context, state srState) erro
 	c.Lock()
 	defer c.Unlock()
 	c.state = state
-	c.enabled = true
+	c.enabled = len(c.state.Peers) != 0
 	return nil
 }
 
@@ -1848,6 +1848,228 @@ func (c *SiteReplicationSys) isEnabled() bool {
 	return c.enabled
 }
 
+// RemovePeerCluster - removes one or more clusters from site replication configuration.
+//
+// The sites to remove are rreq.SiteNames, plus every configured site when
+// rreq.RemoveAll is set. The removal is applied locally and sent to every
+// other configured site concurrently; any failure yields a partial status.
+// On success the updated state is saved, or an empty state when fewer than
+// two peers would remain.
+func (c *SiteReplicationSys) RemovePeerCluster(ctx context.Context, objectAPI ObjectLayer, rreq madmin.SRRemoveReq) (st madmin.ReplicateRemoveStatus, err error) {
+	if !c.isEnabled() {
+		return st, errSRNotEnabled
+	}
+	info, err := c.GetClusterInfo(ctx)
+	if err != nil {
+		return st, errSRBackendIssue(err)
+	}
+	peerMap := make(map[string]madmin.PeerInfo)
+	var rmvEndpoints []string
+	siteNames := rreq.SiteNames
+	updatedPeers := make(map[string]madmin.PeerInfo)
+
+	for _, pi := range info.Sites {
+		updatedPeers[pi.DeploymentID] = pi
+		peerMap[pi.Name] = pi
+		if rreq.RemoveAll {
+			siteNames = append(siteNames, pi.Name)
+		}
+	}
+	for _, s := range siteNames {
+		info, ok := peerMap[s]
+		if !ok {
+			return st, errSRInvalidRequest(fmt.Errorf("Site %s not found in site replication configuration", s))
+		}
+		rmvEndpoints = append(rmvEndpoints, info.Endpoint)
+		delete(updatedPeers, info.DeploymentID)
+	}
+	var wg sync.WaitGroup
+	// errs is written from several goroutines; mu serializes those writes
+	// because concurrent map writes are not safe in Go.
+	var mu sync.Mutex
+	errs := make(map[string]error, len(c.state.Peers))
+	setErr := func(dID string, err error) {
+		mu.Lock()
+		defer mu.Unlock()
+		errs[dID] = err
+	}
+
+	for _, v := range info.Sites {
+		wg.Add(1)
+		if v.DeploymentID == globalDeploymentID {
+			go func() {
+				defer wg.Done()
+				setErr(globalDeploymentID, c.RemoveRemoteTargetsForEndpoint(ctx, objectAPI, rmvEndpoints, false))
+			}()
+			continue
+		}
+		go func(pi madmin.PeerInfo) {
+			defer wg.Done()
+			admClient, err := c.getAdminClient(ctx, pi.DeploymentID)
+			if err != nil {
+				setErr(pi.DeploymentID, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", pi.Name, err)))
+				return
+			}
+			if _, err = admClient.SRPeerRemove(ctx, rreq); err != nil {
+				setErr(pi.DeploymentID, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", pi.Name, err)))
+				return
+			}
+		}(v)
+	}
+	wg.Wait()
+
+	for dID, err := range errs {
+		if err != nil {
+			return madmin.ReplicateRemoveStatus{
+				ErrDetail: err.Error(),
+				Status:    madmin.ReplicateRemoveStatusPartial,
+			}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err))
+		}
+	}
+	// Update cluster state
+	var state srState
+	if len(updatedPeers) > 1 {
+		state = srState{
+			Name:                    info.Name,
+			Peers:                   updatedPeers,
+			ServiceAccountAccessKey: info.ServiceAccountAccessKey,
+		}
+	}
+	if err = c.saveToDisk(ctx, state); err != nil {
+		return madmin.ReplicateRemoveStatus{
+			Status:    madmin.ReplicateRemoveStatusPartial,
+			ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
+		}, nil
+	}
+
+	return madmin.ReplicateRemoveStatus{
+		Status: madmin.ReplicateRemoveStatusSuccess,
+	}, nil
+}
+
+// InternalRemoveReq - handles a removal request received from a peer cluster by removing
+// one or more sites from the local site replication configuration. When this site is
+// itself among those removed, all listed replication targets are removed and an empty
+// state is saved.
+func (c *SiteReplicationSys) InternalRemoveReq(ctx context.Context, objectAPI ObjectLayer, rreq madmin.SRRemoveReq) error {
+	ourName := ""
+	peerMap := make(map[string]madmin.PeerInfo)
+	updatedPeers := make(map[string]madmin.PeerInfo)
+	siteNames := rreq.SiteNames
+
+	for _, p := range c.state.Peers {
+		peerMap[p.Name] = p
+		if p.DeploymentID == globalDeploymentID {
+			ourName = p.Name
+		}
+		updatedPeers[p.DeploymentID] = p
+		if rreq.RemoveAll {
+			siteNames = append(siteNames, p.Name)
+		}
+	}
+	var rmvEndpoints []string
+	var unlinkSelf bool
+
+	for _, s := range siteNames {
+		info, ok := peerMap[s]
+		if !ok {
+			return fmt.Errorf("Site %s not found in site replication configuration", s)
+		}
+		if info.DeploymentID == globalDeploymentID {
+			unlinkSelf = true
+			continue
+		}
+		delete(updatedPeers, info.DeploymentID)
+		rmvEndpoints = append(rmvEndpoints, info.Endpoint)
+	}
+	if err := c.RemoveRemoteTargetsForEndpoint(ctx, objectAPI, rmvEndpoints, unlinkSelf); err != nil {
+		return err
+	}
+	var state srState
+	if !unlinkSelf {
+		state = srState{
+			Name:                    c.state.Name,
+			Peers:                   updatedPeers,
+			ServiceAccountAccessKey: c.state.ServiceAccountAccessKey,
+		}
+	}
+
+	if err := c.saveToDisk(ctx, state); err != nil {
+		return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", ourName, err))
+	}
+	return nil
+}
+
+// RemoveRemoteTargetsForEndpoint removes the replication targets that point at any of
+// the given endpoints (or all listed replication targets when unlinkSelf is set) and
+// drops the bucket replication rules whose destination is one of those targets.
+func (c *SiteReplicationSys) RemoveRemoteTargetsForEndpoint(ctx context.Context, objectAPI ObjectLayer, endpoints []string, unlinkSelf bool) (err error) {
+	// Parse the endpoints once up front so that a malformed endpoint is reported
+	// as an error instead of causing a nil dereference while matching targets.
+	parsedEps := make([]*url.URL, 0, len(endpoints))
+	for _, endpoint := range endpoints {
+		ep, perr := url.Parse(endpoint)
+		if perr != nil {
+			return fmt.Errorf("invalid endpoint %q: %w", endpoint, perr)
+		}
+		parsedEps = append(parsedEps, ep)
+	}
+	targets := globalBucketTargetSys.ListTargets(ctx, "", string(madmin.ReplicationService))
+	m := make(map[string]madmin.BucketTarget)
+	for _, t := range targets {
+		for _, ep := range parsedEps {
+			if t.Endpoint == ep.Host &&
+				t.Secure == (ep.Scheme == "https") &&
+				t.Type == madmin.ReplicationService {
+				m[t.Arn] = t
+			}
+		}
+		// all remote targets from self are to be delinked
+		if unlinkSelf {
+			m[t.Arn] = t
+		}
+	}
+	buckets, err := objectAPI.ListBuckets(ctx)
+	if err != nil {
+		// bail out before any replication rule or target has been modified
+		return err
+	}
+	for _, b := range buckets {
+		config, err := globalBucketMetadataSys.GetReplicationConfig(ctx, b.Name)
+		if err != nil {
+			return err
+		}
+		// keep only the rules whose destination is not a target being removed (m is keyed by target ARN)
+		var nRules []sreplication.Rule
+		for _, r := range config.Rules {
+			if _, ok := m[r.Destination.Bucket]; !ok {
+				nRules = append(nRules, r)
+			}
+		}
+		if len(nRules) > 0 {
+			config.Rules = nRules
+			configData, err := xml.Marshal(config)
+			if err != nil {
+				return err
+			}
+			if err = globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, configData); err != nil {
+				return err
+			}
+		} else {
+			// no rules remain; presumably nil config data clears the bucket's replication config - confirm
+			if err := globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, nil); err != nil {
+				return err
+			}
+		}
+	}
+	for arn, t := range m {
+		if err := globalBucketTargetSys.RemoveTarget(ctx, t.SourceBucket, arn); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // Other helpers
 
 // newRemoteClusterHTTPTransport returns a new http configuration