diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go
index 95e3d6177..74d58cdbd 100644
--- a/cmd/admin-handlers-site-replication.go
+++ b/cmd/admin-handlers-site-replication.go
@@ -87,7 +87,7 @@ func (a adminAPIHandlers) SRPeerJoin(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	if err := globalSiteReplicationSys.InternalJoinReq(ctx, joinArg); err != nil {
+	if err := globalSiteReplicationSys.PeerJoinReq(ctx, joinArg); err != nil {
 		logger.LogIf(ctx, err)
 		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
 		return
@@ -310,3 +310,49 @@ func parseJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptio
 
 	return json.Unmarshal(data, v)
 }
+
+// SiteReplicationStatus - GET /minio/admin/v3/site-replication/status
+func (a adminAPIHandlers) SiteReplicationStatus(w http.ResponseWriter, r *http.Request) {
+	ctx := newContext(r, w, "SiteReplicationStatus")
+
+	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+	objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationInfoAction)
+	if objectAPI == nil {
+		return
+	}
+
+	info, err := globalSiteReplicationSys.SiteReplicationStatus(ctx, objectAPI)
+	if err != nil {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+
+	if err = json.NewEncoder(w).Encode(info); err != nil {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+}
+
+// SiteReplicationMetaInfo - GET /minio/admin/v3/site-replication/metainfo
+func (a adminAPIHandlers) SiteReplicationMetaInfo(w http.ResponseWriter, r *http.Request) {
+	ctx := newContext(r, w, "SiteReplicationMetaInfo")
+
+	defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+	objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationInfoAction)
+	if objectAPI == nil {
+		return
+	}
+
+	info, err := globalSiteReplicationSys.SiteReplicationMetaInfo(ctx, objectAPI)
+	if err != nil {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+
+	if err = json.NewEncoder(w).Encode(info); err != nil {
+		writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+		return
+	}
+}
diff --git a/cmd/admin-router.go b/cmd/admin-router.go
index d8b32b67a..44b99221f 100644
--- a/cmd/admin-router.go
+++ b/cmd/admin-router.go
@@ -193,6 +193,9 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
 		adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/add").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationAdd)))
 		adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/disable").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationDisable)))
 		adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/info").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationInfo)))
+		adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/metainfo").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationMetaInfo)))
+		adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/status").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationStatus)))
+
 		adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/join").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerJoin)))
 		adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/peer/bucket-ops").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerBucketOps))).Queries("bucket", "{bucket:.*}").Queries("operation", "{operation:.*}")
 		adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/iam-item").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerReplicateIAMItem)))
"/site-replication/peer/iam-item").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerReplicateIAMItem))) diff --git a/cmd/site-replication.go b/cmd/site-replication.go index be5a37c3d..e9a89b704 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -41,7 +41,9 @@ import ( "github.com/minio/minio/internal/auth" sreplication "github.com/minio/minio/internal/bucket/replication" "github.com/minio/minio/internal/logger" + "github.com/minio/minio/internal/sync/errgroup" "github.com/minio/pkg/bucket/policy" + bktpolicy "github.com/minio/pkg/bucket/policy" iampolicy "github.com/minio/pkg/iam/policy" ) @@ -510,9 +512,9 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi return result, nil } -// InternalJoinReq - internal API handler to respond to a peer cluster's request +// PeerJoinReq - internal API handler to respond to a peer cluster's request // to join. -func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRPeerJoinReq) error { +func (c *SiteReplicationSys) PeerJoinReq(ctx context.Context, arg madmin.SRPeerJoinReq) error { var ourName string for d, p := range arg.Peers { if d == globalDeploymentID { @@ -1689,3 +1691,582 @@ func getMissingSiteNames(oldDeps, newDeps set.StringSet, currSites []madmin.Peer } return diffSlc } + +type srBucketMetaInfo struct { + madmin.SRBucketInfo + DeploymentID string +} + +type srPolicy struct { + policy json.RawMessage + DeploymentID string +} + +type srUserPolicyMapping struct { + madmin.SRPolicyMapping + DeploymentID string +} + +type srGroupPolicyMapping struct { + madmin.SRPolicyMapping + DeploymentID string +} + +// SiteReplicationStatus returns the site replication status across clusters participating in site replication. +func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI ObjectLayer) (info madmin.SRStatusInfo, err error) { + c.RLock() + defer c.RUnlock() + if !c.enabled { + return info, err + } + + sris := make([]madmin.SRInfo, len(c.state.Peers)) + sriErrs := make([]error, len(c.state.Peers)) + g := errgroup.WithNErrs(len(c.state.Peers)) + var depIDs []string + for d := range c.state.Peers { + depIDs = append(depIDs, d) + } + for index := range depIDs { + index := index + if depIDs[index] == globalDeploymentID { + g.Go(func() error { + sris[index], sriErrs[index] = c.SiteReplicationMetaInfo(ctx, objAPI) + return nil + }, index) + continue + } + g.Go(func() error { + admClient, err := c.getAdminClient(ctx, depIDs[index]) + if err != nil { + return err + } + sris[index], sriErrs[index] = admClient.SRMetaInfo(ctx) + return nil + }, index) + } + // Wait for the go routines. 
+	g.Wait()
+
+	for _, serr := range sriErrs {
+		if serr != nil {
+			return info, errSRBackendIssue(serr)
+		}
+	}
+	info.Enabled = true
+	info.Sites = make(map[string]madmin.PeerInfo, len(c.state.Peers))
+	for d, peer := range c.state.Peers {
+		info.Sites[d] = peer
+	}
+
+	var maxBuckets int
+	depIdxes := make(map[string]int)
+	for i, sri := range sris {
+		depIdxes[sri.DeploymentID] = i
+		if len(sri.Buckets) > maxBuckets {
+			maxBuckets = len(sri.Buckets)
+		}
+	}
+	// mapping b/w entity and entity config across sites
+	bucketStats := make(map[string][]srBucketMetaInfo)
+	policyStats := make(map[string][]srPolicy)
+	userPolicyStats := make(map[string][]srUserPolicyMapping)
+	groupPolicyStats := make(map[string][]srGroupPolicyMapping)
+
+	numSites := len(sris)
+	for _, sri := range sris {
+		for b, si := range sri.Buckets {
+			if _, ok := bucketStats[b]; !ok {
+				bucketStats[b] = make([]srBucketMetaInfo, 0, numSites)
+			}
+			bucketStats[b] = append(bucketStats[b], srBucketMetaInfo{SRBucketInfo: si, DeploymentID: sri.DeploymentID})
+		}
+		for pname, policy := range sri.Policies {
+			if _, ok := policyStats[pname]; !ok {
+				policyStats[pname] = make([]srPolicy, 0, numSites)
+			}
+			policyStats[pname] = append(policyStats[pname], srPolicy{policy: policy, DeploymentID: sri.DeploymentID})
+		}
+		for user, policy := range sri.UserPolicies {
+			if _, ok := userPolicyStats[user]; !ok {
+				userPolicyStats[user] = make([]srUserPolicyMapping, 0, numSites)
+			}
+			userPolicyStats[user] = append(userPolicyStats[user], srUserPolicyMapping{SRPolicyMapping: policy, DeploymentID: sri.DeploymentID})
+		}
+		for group, policy := range sri.GroupPolicies {
+			if _, ok := groupPolicyStats[group]; !ok {
+				groupPolicyStats[group] = make([]srGroupPolicyMapping, 0, numSites)
+			}
+			groupPolicyStats[group] = append(groupPolicyStats[group], srGroupPolicyMapping{SRPolicyMapping: policy, DeploymentID: sri.DeploymentID})
+		}
+	}
+	info.StatsSummary = make(map[string]madmin.SRSiteSummary, len(c.state.Peers))
+	info.BucketMismatches = make(map[string]map[string]madmin.SRBucketStatsSummary)
+	info.PolicyMismatches = make(map[string]map[string]madmin.SRPolicyStatsSummary)
+	info.UserMismatches = make(map[string]map[string]madmin.SRUserStatsSummary)
+	info.GroupMismatches = make(map[string]map[string]madmin.SRGroupStatsSummary)
+	// collect user policy mapping replication status across sites
+	for u, pslc := range userPolicyStats {
+		policySet := set.NewStringSet()
+		uPolicyCount := 0
+		for _, ps := range pslc {
+			policyBytes, err := json.Marshal(ps.SRPolicyMapping)
+			if err != nil {
+				continue
+			}
+			uPolicyCount++
+			if policyStr := string(policyBytes); !policySet.Contains(policyStr) {
+				policySet.Add(policyStr)
+			}
+		}
+		policyMismatch := !isReplicated(uPolicyCount, numSites, policySet)
+		if policyMismatch {
+			info.UserMismatches[u] = make(map[string]madmin.SRUserStatsSummary, numSites)
+			for _, ps := range pslc {
+				dID := depIdxes[ps.DeploymentID]
+				_, hasUser := sris[dID].UserPolicies[u]
+
+				info.UserMismatches[u][ps.DeploymentID] = madmin.SRUserStatsSummary{
+					PolicyMismatch: policyMismatch,
+					UserMissing:    !hasUser,
+				}
+			}
+		}
+	}
+
+	// collect group policy mapping replication status across sites
+
+	for g, pslc := range groupPolicyStats {
+		policySet := set.NewStringSet()
+		gPolicyCount := 0
+		for _, ps := range pslc {
+			policyBytes, err := json.Marshal(ps.SRPolicyMapping)
+			if err != nil {
+				continue
+			}
+			gPolicyCount++
+			if policyStr := string(policyBytes); !policySet.Contains(policyStr) {
+				policySet.Add(policyStr)
+			}
+		}
+		policyMismatch := !isReplicated(gPolicyCount, numSites, policySet)
+		if policyMismatch {
+			info.GroupMismatches[g] = make(map[string]madmin.SRGroupStatsSummary, numSites)
+			for _, ps := range pslc {
+				dID := depIdxes[ps.DeploymentID]
+				_, hasGroup := sris[dID].GroupPolicies[g]
+
+				info.GroupMismatches[g][ps.DeploymentID] = madmin.SRGroupStatsSummary{
+					PolicyMismatch: policyMismatch,
+					GroupMissing:   !hasGroup,
+				}
+			}
+		}
+	}
+	// collect IAM policy replication status across sites
+
+	for p, pslc := range policyStats {
+		var policies []*iampolicy.Policy
+		uPolicyCount := 0
+		for _, ps := range pslc {
+			plcy, err := iampolicy.ParseConfig(bytes.NewReader(ps.policy))
+			if err != nil {
+				continue
+			}
+			policies = append(policies, plcy)
+			uPolicyCount++
+			sum := info.StatsSummary[ps.DeploymentID]
+			sum.TotalIAMPoliciesCount++
+			info.StatsSummary[ps.DeploymentID] = sum
+		}
+		policyMismatch := !isIAMPolicyReplicated(uPolicyCount, numSites, policies)
+		switch {
+		case policyMismatch:
+			for _, ps := range pslc {
+				dID := depIdxes[ps.DeploymentID]
+				_, hasPolicy := sris[dID].Policies[p]
+				if len(info.PolicyMismatches[p]) == 0 {
+					info.PolicyMismatches[p] = make(map[string]madmin.SRPolicyStatsSummary)
+				}
+				info.PolicyMismatches[p][ps.DeploymentID] = madmin.SRPolicyStatsSummary{
+					PolicyMismatch: policyMismatch,
+					PolicyMissing:  !hasPolicy,
+				}
+			}
+		default:
+			// no mismatch
+			for _, s := range pslc {
+				sum := info.StatsSummary[s.DeploymentID]
+				if !policyMismatch {
+					sum.ReplicatedIAMPolicies++
+				}
+				info.StatsSummary[s.DeploymentID] = sum
+			}
+
+		}
+	}
+	// collect bucket metadata replication stats across sites
+	for b, slc := range bucketStats {
+		tagSet := set.NewStringSet()
+		olockConfigSet := set.NewStringSet()
+		var policies []*bktpolicy.Policy
+		var replCfgs []*sreplication.Config
+		sseCfgSet := set.NewStringSet()
+		var tagCount, olockCfgCount, sseCfgCount int
+		for _, s := range slc {
+			if s.ReplicationConfig != nil {
+				cfgBytes, err := base64.StdEncoding.DecodeString(*s.ReplicationConfig)
+				if err != nil {
+					continue
+				}
+				cfg, err := sreplication.ParseConfig(bytes.NewReader(cfgBytes))
+				if err != nil {
+					continue
+				}
+				replCfgs = append(replCfgs, cfg)
+			}
+			if s.Tags != nil {
+				tagBytes, err := base64.StdEncoding.DecodeString(*s.Tags)
+				if err != nil {
+					continue
+				}
+				tagCount++
+				if !tagSet.Contains(string(tagBytes)) {
+					tagSet.Add(string(tagBytes))
+				}
+			}
+			if len(s.Policy) > 0 {
+				plcy, err := bktpolicy.ParseConfig(bytes.NewReader(s.Policy), b)
+				if err != nil {
+					continue
+				}
+				policies = append(policies, plcy)
+			}
+			if s.ObjectLockConfig != nil {
+				olockCfgCount++
+				if !olockConfigSet.Contains(*s.ObjectLockConfig) {
+					olockConfigSet.Add(*s.ObjectLockConfig)
+				}
+			}
+			if s.SSEConfig != nil {
+				if !sseCfgSet.Contains(*s.SSEConfig) {
+					sseCfgSet.Add(*s.SSEConfig)
+				}
+				sseCfgCount++
+			}
+			ss, ok := info.StatsSummary[s.DeploymentID]
+			if !ok {
+				ss = madmin.SRSiteSummary{}
+			}
+			// increment total number of replicated buckets
+			if len(slc) == numSites {
+				ss.ReplicatedBuckets++
+			}
+			ss.TotalBucketsCount++
+			if tagCount > 0 {
+				ss.TotalTagsCount++
+			}
+			if olockCfgCount > 0 {
+				ss.TotalLockConfigCount++
+			}
+			if sseCfgCount > 0 {
+				ss.TotalSSEConfigCount++
+			}
+			if len(policies) > 0 {
+				ss.TotalBucketPoliciesCount++
+			}
+			info.StatsSummary[s.DeploymentID] = ss
+		}
+		tagMismatch := !isReplicated(tagCount, numSites, tagSet)
+		olockCfgMismatch := !isReplicated(olockCfgCount, numSites, olockConfigSet)
+		sseCfgMismatch := !isReplicated(sseCfgCount, numSites, sseCfgSet)
+		policyMismatch := !isBktPolicyReplicated(numSites, policies)
+		replCfgMismatch := !isBktReplCfgReplicated(numSites, replCfgs)
+		switch {
+		case tagMismatch, olockCfgMismatch, sseCfgMismatch, policyMismatch,
+			replCfgMismatch:
+			info.BucketMismatches[b] = make(map[string]madmin.SRBucketStatsSummary, numSites)
+			for _, s := range slc {
+				dID := depIdxes[s.DeploymentID]
+				_, hasBucket := sris[dID].Buckets[s.Bucket]
+				info.BucketMismatches[b][s.DeploymentID] = madmin.SRBucketStatsSummary{
+					DeploymentID:           s.DeploymentID,
+					HasBucket:              hasBucket,
+					TagMismatch:            tagMismatch,
+					OLockConfigMismatch:    olockCfgMismatch,
+					SSEConfigMismatch:      sseCfgMismatch,
+					PolicyMismatch:         policyMismatch,
+					ReplicationCfgMismatch: replCfgMismatch,
+					HasReplicationCfg:      len(replCfgs) > 0,
+				}
+			}
+			fallthrough
+		default:
+			// count up the settings that match across every site (also reached via fallthrough above)
+			for _, s := range slc {
+				sum := info.StatsSummary[s.DeploymentID]
+				if !olockCfgMismatch && olockCfgCount == numSites {
+					sum.ReplicatedLockConfig++
+				}
+				if !sseCfgMismatch && sseCfgCount == numSites {
+					sum.ReplicatedSSEConfig++
+				}
+				if !policyMismatch && len(policies) == numSites {
+					sum.ReplicatedBucketPolicies++
+				}
+				if !tagMismatch && tagCount == numSites {
+					sum.ReplicatedTags++
+				}
+				info.StatsSummary[s.DeploymentID] = sum
+			}
+		}
+	}
+	// maximum number of buckets, users etc. seen across sites
+	info.MaxBuckets = len(bucketStats)
+	info.MaxUsers = len(userPolicyStats)
+	info.MaxGroups = len(groupPolicyStats)
+	info.MaxPolicies = len(policyStats)
+	return
+}
+
+// isReplicated returns true if the count of replicated entries matches the
+// number of sites and there is at most one unique entry in the set.
+func isReplicated(cntReplicated, total int, valSet set.StringSet) bool {
+	if cntReplicated > 0 && cntReplicated < total {
+		return false
+	}
+	if len(valSet) > 1 {
+		// mismatch - one or more sites have differing tags/policy
+		return false
+	}
+	return true
+}
+
+// isIAMPolicyReplicated returns true if count of replicated IAM policies matches total
+// number of sites and IAM policies are identical.
+func isIAMPolicyReplicated(cntReplicated, total int, policies []*iampolicy.Policy) bool {
+	if cntReplicated > 0 && cntReplicated != total {
+		return false
+	}
+	// check if policies match between sites
+	var prev *iampolicy.Policy
+	for i, p := range policies {
+		if i == 0 {
+			prev = p
+			continue
+		}
+		if !prev.Equals(*p) {
+			return false
+		}
+	}
+	return true
+}
+
+// isBktPolicyReplicated returns true if count of replicated bucket policies matches total
+// number of sites and bucket policies are identical.
+func isBktPolicyReplicated(total int, policies []*bktpolicy.Policy) bool {
+	if len(policies) > 0 && len(policies) != total {
+		return false
+	}
+	// check if policies match between sites
+	var prev *bktpolicy.Policy
+	for i, p := range policies {
+		if i == 0 {
+			prev = p
+			continue
+		}
+		if !prev.Equals(*p) {
+			return false
+		}
+	}
+	return true
+}
+
+// isBktReplCfgReplicated returns true if all sites have the same number
+// of replication rules with all replication features enabled.
+func isBktReplCfgReplicated(total int, cfgs []*sreplication.Config) bool {
+	cntReplicated := len(cfgs)
+	if cntReplicated > 0 && cntReplicated != total {
+		return false
+	}
+	// check if replication config matches between sites
+	var prev *sreplication.Config
+	for i, c := range cfgs {
+		if i == 0 {
+			prev = c
+			continue
+		}
+		if len(prev.Rules) != len(c.Rules) {
+			return false
+		}
+		if len(c.Rules) != total-1 {
+			return false
+		}
+		for _, r := range c.Rules {
+			if !strings.HasPrefix(r.ID, "site-repl-") {
+				return false
+			}
+			if r.DeleteMarkerReplication.Status == sreplication.Disabled ||
+				r.DeleteReplication.Status == sreplication.Disabled ||
+				r.ExistingObjectReplication.Status == sreplication.Disabled ||
+				r.SourceSelectionCriteria.ReplicaModifications.Status == sreplication.Disabled {
+				return false
+			}
+		}
+	}
+	return true
+}
+
+// SiteReplicationMetaInfo returns the metadata info on buckets, policies etc. for the replicated site
+func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI ObjectLayer) (info madmin.SRInfo, err error) {
+	if objAPI == nil {
+		return info, errSRObjectLayerNotReady
+	}
+
+	c.RLock()
+	defer c.RUnlock()
+	if !c.enabled {
+		return info, nil
+	}
+	buckets, err := objAPI.ListBuckets(ctx)
+	if err != nil {
+		return info, errSRBackendIssue(err)
+	}
+	info.DeploymentID = globalDeploymentID
+
+	info.Buckets = make(map[string]madmin.SRBucketInfo, len(buckets))
+	for _, bucketInfo := range buckets {
+		bucket := bucketInfo.Name
+		bms := madmin.SRBucketInfo{Bucket: bucket}
+		// Get bucket policy if present.
+		policy, err := globalPolicySys.Get(bucket)
+		found := true
+		if _, ok := err.(BucketPolicyNotFound); ok {
+			found = false
+		} else if err != nil {
+			return info, errSRBackendIssue(err)
+		}
+		if found {
+			policyJSON, err := json.Marshal(policy)
+			if err != nil {
+				return info, wrapSRErr(err)
+			}
+			bms.Policy = policyJSON
+		}
+
+		// Get bucket tags if present.
+		tags, err := globalBucketMetadataSys.GetTaggingConfig(bucket)
+		found = true
+		if _, ok := err.(BucketTaggingNotFound); ok {
+			found = false
+		} else if err != nil {
+			return info, errSRBackendIssue(err)
+		}
+		if found {
+			tagBytes, err := xml.Marshal(tags)
+			if err != nil {
+				return info, wrapSRErr(err)
+			}
+			tagCfgStr := base64.StdEncoding.EncodeToString(tagBytes)
+			bms.Tags = &tagCfgStr
+		}
+
+		// Get object-lock config if present.
+		objLockCfg, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
+		found = true
+		if _, ok := err.(BucketObjectLockConfigNotFound); ok {
+			found = false
+		} else if err != nil {
+			return info, errSRBackendIssue(err)
+		}
+		if found {
+			objLockCfgData, err := xml.Marshal(objLockCfg)
+			if err != nil {
+				return info, wrapSRErr(err)
+			}
+			objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData)
+			bms.ObjectLockConfig = &objLockStr
+		}
+
+		// Get existing bucket encryption settings
+		sseConfig, err := globalBucketMetadataSys.GetSSEConfig(bucket)
+		found = true
+		if _, ok := err.(BucketSSEConfigNotFound); ok {
+			found = false
+		} else if err != nil {
+			return info, errSRBackendIssue(err)
+		}
+		if found {
+			sseConfigData, err := xml.Marshal(sseConfig)
+			if err != nil {
+				return info, wrapSRErr(err)
+			}
+			sseConfigStr := base64.StdEncoding.EncodeToString(sseConfigData)
+			bms.SSEConfig = &sseConfigStr
+		}
+		// Get replication config if present
+		rcfg, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
+		found = true
+		if _, ok := err.(BucketReplicationConfigNotFound); ok {
+			found = false
+		} else if err != nil {
+			return info, errSRBackendIssue(err)
+		}
+		if found {
+			rcfgXML, err := xml.Marshal(rcfg)
+			if err != nil {
+				return info, wrapSRErr(err)
+			}
+			rcfgXMLStr := base64.StdEncoding.EncodeToString(rcfgXML)
+			bms.ReplicationConfig = &rcfgXMLStr
+		}
+		info.Buckets[bucket] = bms
+	}
+
+	{
+		// Collect IAM policies on this site for comparison across peers.
+		allPolicies, err := globalIAMSys.ListPolicies(ctx, "")
+		if err != nil {
+			return info, errSRBackendIssue(err)
+		}
+		info.Policies = make(map[string]json.RawMessage, len(allPolicies))
+		for pname, policy := range allPolicies {
+			policyJSON, err := json.Marshal(policy)
+			if err != nil {
+				return info, wrapSRErr(err)
+			}
+			info.Policies[pname] = json.RawMessage(policyJSON)
+		}
+	}
+
+	{
+		// Collect user and group policy mappings on this site for comparison across peers.
+		userPolicyMap := make(map[string]MappedPolicy)
+		groupPolicyMap := make(map[string]MappedPolicy)
+		globalIAMSys.store.rlock()
+		errU := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, userPolicyMap)
+		errG := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, true, groupPolicyMap)
+		globalIAMSys.store.runlock()
+		if errU != nil {
+			return info, errSRBackendIssue(errU)
+		}
+		if errG != nil {
+			return info, errSRBackendIssue(errG)
+		}
+		info.UserPolicies = make(map[string]madmin.SRPolicyMapping, len(userPolicyMap))
+		info.GroupPolicies = make(map[string]madmin.SRPolicyMapping, len(groupPolicyMap))
+		for user, mp := range userPolicyMap {
+			info.UserPolicies[user] = madmin.SRPolicyMapping{
+				IsGroup:     false,
+				UserOrGroup: user,
+				Policy:      mp.Policies,
+			}
+		}
+
+		for group, mp := range groupPolicyMap {
+			info.GroupPolicies[group] = madmin.SRPolicyMapping{
+				IsGroup:     true,
+				UserOrGroup: group,
+				Policy:      mp.Policies,
+			}
+		}
+	}
+	return info, nil
+}
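For reviewers, a minimal sketch (not part of the patch) of how the new read-only endpoints could be exercised from a Go client. It assumes a madmin-go build that exposes the SRMetaInfo call used by SiteReplicationStatus above; the endpoint, credentials, and printed fields are illustrative placeholders, and the client wrapper for the /site-replication/status route is not shown in this diff:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/minio/madmin-go"
)

func main() {
	// Hypothetical peer endpoint and credentials; the caller must be permitted
	// iampolicy.SiteReplicationInfoAction, as enforced by validateAdminReq above.
	adm, err := madmin.New("site-a.example.com:9000", "ACCESS_KEY", "SECRET_KEY", true)
	if err != nil {
		log.Fatalln(err)
	}

	// SRMetaInfo is the per-peer call the status aggregator uses; it hits
	// GET /minio/admin/v3/site-replication/metainfo registered in this patch and
	// returns that site's buckets, IAM policies and policy mappings.
	sri, err := adm.SRMetaInfo(context.Background())
	if err != nil {
		log.Fatalln(err)
	}
	fmt.Printf("deployment %s: %d buckets, %d policies\n",
		sri.DeploymentID, len(sri.Buckets), len(sri.Policies))
}
```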