// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/binary"
	"encoding/json"
	"encoding/xml"
	"errors"
	"fmt"
	"net/url"
	"reflect"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/minio/madmin-go/v2"
	minioClient "github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/replication"
	"github.com/minio/minio-go/v7/pkg/set"
	"github.com/minio/minio/internal/auth"
	sreplication "github.com/minio/minio/internal/bucket/replication"
	"github.com/minio/minio/internal/logger"
	bktpolicy "github.com/minio/pkg/bucket/policy"
	iampolicy "github.com/minio/pkg/iam/policy"
)

// Location of the persisted site replication state: the state file lives
// under srStatePrefix (itself below minioConfigPrefix) and is named
// srStateFile. See getSRStateFilePath for the joined path.
const (
	srStatePrefix = minioConfigPrefix + "/site-replication"
	srStateFile   = "state.json"
)

// Version numbers of the on-disk state format.
const (
	srStateFormatVersion1 = 1
)

// Sentinel site replication errors. Every one of them carries the
// ErrSiteReplicationInvalidRequest API error code.
var (
	errSRCannotJoin = SRError{
		Cause: errors.New("this site is already configured for site-replication"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRDuplicateSites = SRError{
		Cause: errors.New("duplicate sites provided for site-replication"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRSelfNotFound = SRError{
		Cause: errors.New("none of the given sites correspond to the current one"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRPeerNotFound = SRError{
		Cause: errors.New("peer not found"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRRequestorNotFound = SRError{
		Cause: errors.New("requesting site not found in site replication config"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRNotEnabled = SRError{
		Cause: errors.New("site replication is not enabled"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRResyncStarted = SRError{
		Cause: errors.New("site replication resync is already in progress"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRResyncCanceled = SRError{
		Cause: errors.New("site replication resync is already canceled"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRNoResync = SRError{
		Cause: errors.New("no resync in progress"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
	errSRResyncToSelf = SRError{
		Cause: errors.New("invalid peer specified - cannot resync to self"),
		Code:  ErrSiteReplicationInvalidRequest,
	}
)

// errSRInvalidRequest wraps err in an SRError with the
// ErrSiteReplicationInvalidRequest code.
func errSRInvalidRequest(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationInvalidRequest,
	}
}

// errSRPeerResp wraps err in an SRError with the ErrSiteReplicationPeerResp
// code.
func errSRPeerResp(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationPeerResp,
	}
}

// errSRBackendIssue wraps err in an SRError with the
// ErrSiteReplicationBackendIssue code.
func errSRBackendIssue(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationBackendIssue,
	}
}

// errSRServiceAccount wraps err in an SRError with the
// ErrSiteReplicationServiceAccountError code.
func errSRServiceAccount(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationServiceAccountError,
	}
}

// errSRBucketConfigError wraps err in an SRError with the
// ErrSiteReplicationBucketConfigError code.
func errSRBucketConfigError(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationBucketConfigError,
	}
}

// errSRBucketMetaError wraps err in an SRError with the
// ErrSiteReplicationBucketMetaError code.
func errSRBucketMetaError(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationBucketMetaError,
	}
}

// errSRIAMError wraps err in an SRError with the ErrSiteReplicationIAMError
// code.
func errSRIAMError(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationIAMError,
	}
}

// errSRConfigMissingError wraps err in an SRError with the
// ErrSiteReplicationConfigMissing code.
func errSRConfigMissingError(err error) SRError {
	return SRError{
		Cause: err,
		Code:  ErrSiteReplicationConfigMissing,
	}
}

// errSRObjectLayerNotReady is the SRError (code ErrServerNotInitialized)
// describing an object layer that is not ready.
var errSRObjectLayerNotReady = SRError{
	Cause: fmt.Errorf("object layer not ready"),
	Code:  ErrServerNotInitialized,
}

// getSRStateFilePath returns srStatePrefix and srStateFile joined with
// SlashSeparator.
func getSRStateFilePath() string {
	return srStatePrefix + SlashSeparator + srStateFile
}

// SRError -
wrapped error for site replication. type SRError struct { Cause error Code APIErrorCode } func (c SRError) Error() string { if c.Cause != nil { return c.Cause.Error() } return "" } func (c SRError) Unwrap() error { return c.Cause } func wrapSRErr(err error) SRError { return SRError{Cause: err, Code: ErrInternalError} } // SiteReplicationSys - manages cluster-level replication. type SiteReplicationSys struct { sync.RWMutex enabled bool // In-memory and persisted multi-site replication state. state srState iamMetaCache srIAMCache } type srState srStateV1 // srStateV1 represents version 1 of the site replication state persistence // format. type srStateV1 struct { Name string `json:"name"` // Peers maps peers by their deploymentID Peers map[string]madmin.PeerInfo `json:"peers"` ServiceAccountAccessKey string `json:"serviceAccountAccessKey"` } // srStateData represents the format of the current `srStateFile`. type srStateData struct { Version int `json:"version"` SRState srStateV1 `json:"srState"` } // Init - initialize the site replication manager. func (c *SiteReplicationSys) Init(ctx context.Context, objAPI ObjectLayer) error { go c.startHealRoutine(ctx, objAPI) err := c.loadFromDisk(ctx, objAPI) if err == errConfigNotFound { return nil } c.RLock() defer c.RUnlock() if c.enabled { logger.Info("Cluster replication initialized") } return err } func (c *SiteReplicationSys) loadFromDisk(ctx context.Context, objAPI ObjectLayer) error { buf, err := readConfig(ctx, objAPI, getSRStateFilePath()) if err != nil { if errors.Is(err, errConfigNotFound) { c.Lock() defer c.Unlock() c.state = srState{} c.enabled = false } return err } // attempt to read just the version key in the state file to ensure we // are reading a compatible version. 
var ver struct { Version int `json:"version"` } err = json.Unmarshal(buf, &ver) if err != nil { return err } if ver.Version != srStateFormatVersion1 { return fmt.Errorf("Unexpected ClusterRepl state version: %d", ver.Version) } var sdata srStateData err = json.Unmarshal(buf, &sdata) if err != nil { return err } c.Lock() defer c.Unlock() c.state = srState(sdata.SRState) c.enabled = len(c.state.Peers) != 0 return nil } func (c *SiteReplicationSys) saveToDisk(ctx context.Context, state srState) error { sdata := srStateData{ Version: srStateFormatVersion1, SRState: srStateV1(state), } buf, err := json.Marshal(sdata) if err != nil { return err } objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } if err = saveConfig(ctx, objAPI, getSRStateFilePath(), buf); err != nil { return err } for _, err := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) { logger.LogIf(ctx, err) } c.Lock() defer c.Unlock() c.state = state c.enabled = len(c.state.Peers) != 0 return nil } func (c *SiteReplicationSys) removeFromDisk(ctx context.Context) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } if err := deleteConfig(ctx, objAPI, getSRStateFilePath()); err != nil { return err } for _, err := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) { logger.LogIf(ctx, err) } c.Lock() defer c.Unlock() c.state = srState{} c.enabled = false return nil } const ( // Access key of service account used for perform cluster-replication // operations. 
siteReplicatorSvcAcc = "site-replicator-0" ) // PeerSiteInfo is a wrapper struct around madmin.PeerSite with extra info on site status type PeerSiteInfo struct { madmin.PeerSite self bool DeploymentID string Replicated bool // true if already participating in site replication Empty bool // true if cluster has no buckets } // getSiteStatuses gathers more info on the sites being added func (c *SiteReplicationSys) getSiteStatuses(ctx context.Context, sites ...madmin.PeerSite) (psi []PeerSiteInfo, err error) { psi = make([]PeerSiteInfo, 0, len(sites)) for _, v := range sites { admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey) if err != nil { return psi, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) } info, err := admClient.ServerInfo(ctx) if err != nil { return psi, errSRPeerResp(fmt.Errorf("unable to fetch server info for %s: %w", v.Name, err)) } s3Client, err := getS3Client(v) if err != nil { return psi, errSRPeerResp(fmt.Errorf("unable to create s3 client for %s: %w", v.Name, err)) } buckets, err := s3Client.ListBuckets(ctx) if err != nil { return psi, errSRPeerResp(fmt.Errorf("unable to list buckets for %s: %v", v.Name, err)) } psi = append(psi, PeerSiteInfo{ PeerSite: v, DeploymentID: info.DeploymentID, Empty: len(buckets) == 0, self: info.DeploymentID == globalDeploymentID, }) } return } // AddPeerClusters - add cluster sites for replication configuration. func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmin.PeerSite) (madmin.ReplicateAddStatus, error) { sites, serr := c.getSiteStatuses(ctx, psites...) 
if serr != nil { return madmin.ReplicateAddStatus{}, serr } var ( currSites madmin.SiteReplicationInfo currDeploymentIDsSet = set.NewStringSet() err error ) currSites, err = c.GetClusterInfo(ctx) if err != nil { return madmin.ReplicateAddStatus{}, errSRBackendIssue(err) } for _, v := range currSites.Sites { currDeploymentIDsSet.Add(v.DeploymentID) } deploymentIDsSet := set.NewStringSet() localHasBuckets := false nonLocalPeerWithBuckets := "" selfIdx := -1 for i, v := range sites { // deploymentIDs must be unique if deploymentIDsSet.Contains(v.DeploymentID) { return madmin.ReplicateAddStatus{}, errSRDuplicateSites } deploymentIDsSet.Add(v.DeploymentID) if v.self { selfIdx = i localHasBuckets = !v.Empty continue } if !v.Empty && !currDeploymentIDsSet.Contains(v.DeploymentID) { nonLocalPeerWithBuckets = v.Name } } if selfIdx == -1 { return madmin.ReplicateAddStatus{}, errSRBackendIssue(fmt.Errorf("global deployment ID %s mismatch, expected one of %s", globalDeploymentID, deploymentIDsSet)) } if !currDeploymentIDsSet.IsEmpty() { // If current cluster is already SR enabled and no new site being added ,fail. if currDeploymentIDsSet.Equals(deploymentIDsSet) { return madmin.ReplicateAddStatus{}, errSRCannotJoin } if len(currDeploymentIDsSet.Intersection(deploymentIDsSet)) != len(currDeploymentIDsSet) { diffSlc := getMissingSiteNames(currDeploymentIDsSet, deploymentIDsSet, currSites.Sites) return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("all existing replicated sites must be specified - missing %s", strings.Join(diffSlc, " "))) } } // validate that all clusters are using the same IDP settings. pass, err := c.validateIDPSettings(ctx, sites) if err != nil { return madmin.ReplicateAddStatus{}, err } if !pass { return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("all cluster sites must have the same IAM/IDP settings")) } // For this `add` API, either all clusters must be empty or the local // cluster must be the only one having some buckets. 
if localHasBuckets && nonLocalPeerWithBuckets != "" { return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("only one cluster may have data when configuring site replication")) } if !localHasBuckets && nonLocalPeerWithBuckets != "" { return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("please send your request to the cluster containing data/buckets: %s", nonLocalPeerWithBuckets)) } // FIXME: Ideally, we also need to check if there are any global IAM // policies and any (LDAP user created) service accounts on the other // peer clusters, and if so, reject the cluster replicate add request. // This is not yet implemented. // VALIDATIONS COMPLETE. // Create a common service account for all clusters, with root // permissions. // Create a local service account. // Generate a secret key for the service account if not created already. var secretKey string var svcCred auth.Credentials sa, _, err := globalIAMSys.getServiceAccount(ctx, siteReplicatorSvcAcc) switch { case err == errNoSuchServiceAccount: _, secretKey, err = auth.GenerateCredentials() if err != nil { return madmin.ReplicateAddStatus{}, errSRServiceAccount(fmt.Errorf("unable to create local service account: %w", err)) } svcCred, _, err = globalIAMSys.NewServiceAccount(ctx, sites[selfIdx].AccessKey, nil, newServiceAccountOpts{ accessKey: siteReplicatorSvcAcc, secretKey: secretKey, allowSiteReplicatorAccount: true, }) if err != nil { return madmin.ReplicateAddStatus{}, errSRServiceAccount(fmt.Errorf("unable to create local service account: %w", err)) } case err == nil: svcCred = sa.Credentials secretKey = svcCred.SecretKey default: return madmin.ReplicateAddStatus{}, errSRBackendIssue(err) } joinReq := madmin.SRPeerJoinReq{ SvcAcctAccessKey: svcCred.AccessKey, SvcAcctSecretKey: secretKey, Peers: make(map[string]madmin.PeerInfo), } for _, v := range sites { joinReq.Peers[v.DeploymentID] = madmin.PeerInfo{ Endpoint: v.Endpoint, Name: v.Name, DeploymentID: v.DeploymentID, } } addedCount := 
0 var ( peerAddErr error admClient *madmin.AdminClient ) for _, v := range sites { if v.self { continue } switch { case currDeploymentIDsSet.Contains(v.DeploymentID): admClient, err = c.getAdminClient(ctx, v.DeploymentID) default: admClient, err = getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey) } if err != nil { peerAddErr = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) break } joinReq.SvcAcctParent = v.AccessKey err = admClient.SRPeerJoin(ctx, joinReq) if err != nil { peerAddErr = errSRPeerResp(fmt.Errorf("unable to link with peer %s: %w", v.Name, err)) break } addedCount++ } if peerAddErr != nil { if addedCount == 0 { return madmin.ReplicateAddStatus{}, peerAddErr } // In this case, it means at least one cluster was added // successfully, we need to send a response to the client with // some details - FIXME: the disks on this cluster would need to // be cleaned to recover. partial := madmin.ReplicateAddStatus{ Status: madmin.ReplicateAddStatusPartial, ErrDetail: peerAddErr.Error(), } return partial, nil } // Other than handling existing buckets, we can now save the cluster // replication configuration state. state := srState{ Name: sites[selfIdx].Name, Peers: joinReq.Peers, ServiceAccountAccessKey: svcCred.AccessKey, } if err = c.saveToDisk(ctx, state); err != nil { return madmin.ReplicateAddStatus{ Status: madmin.ReplicateAddStatusPartial, ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err), }, nil } result := madmin.ReplicateAddStatus{ Success: true, Status: madmin.ReplicateAddStatusSuccess, } if err := c.syncToAllPeers(ctx); err != nil { result.InitialSyncErrorMessage = err.Error() } return result, nil } // PeerJoinReq - internal API handler to respond to a peer cluster's request to join. 
func (c *SiteReplicationSys) PeerJoinReq(ctx context.Context, arg madmin.SRPeerJoinReq) error { var ourName string for d, p := range arg.Peers { if d == globalDeploymentID { ourName = p.Name break } } if ourName == "" { return errSRSelfNotFound } _, _, err := globalIAMSys.GetServiceAccount(ctx, arg.SvcAcctAccessKey) if err == errNoSuchServiceAccount { _, _, err = globalIAMSys.NewServiceAccount(ctx, arg.SvcAcctParent, nil, newServiceAccountOpts{ accessKey: arg.SvcAcctAccessKey, secretKey: arg.SvcAcctSecretKey, allowSiteReplicatorAccount: arg.SvcAcctAccessKey == siteReplicatorSvcAcc, }) } if err != nil { return errSRServiceAccount(fmt.Errorf("unable to create service account on %s: %v", ourName, err)) } state := srState{ Name: ourName, Peers: arg.Peers, ServiceAccountAccessKey: arg.SvcAcctAccessKey, } if err = c.saveToDisk(ctx, state); err != nil { return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to drive on %s: %v", ourName, err)) } return nil } // GetIDPSettings returns info about the configured identity provider. It is // used to validate that all peers have the same IDP. 
func (c *SiteReplicationSys) GetIDPSettings(ctx context.Context) madmin.IDPSettings { s := madmin.IDPSettings{} s.LDAP = madmin.LDAPSettings{ IsLDAPEnabled: globalIAMSys.LDAPConfig.Enabled(), LDAPUserDNSearchBase: globalIAMSys.LDAPConfig.LDAP.UserDNSearchBaseDistName, LDAPUserDNSearchFilter: globalIAMSys.LDAPConfig.LDAP.UserDNSearchFilter, LDAPGroupSearchBase: globalIAMSys.LDAPConfig.LDAP.GroupSearchBaseDistName, LDAPGroupSearchFilter: globalIAMSys.LDAPConfig.LDAP.GroupSearchFilter, } s.OpenID = globalIAMSys.OpenIDConfig.GetSettings() if s.OpenID.Enabled { s.OpenID.Region = globalSite.Region } return s } func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []PeerSiteInfo) (bool, error) { s := make([]madmin.IDPSettings, 0, len(peers)) for _, v := range peers { if v.self { s = append(s, c.GetIDPSettings(ctx)) continue } admClient, err := getAdminClient(v.Endpoint, v.AccessKey, v.SecretKey) if err != nil { return false, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) } is, err := admClient.SRPeerGetIDPSettings(ctx) if err != nil { return false, errSRPeerResp(fmt.Errorf("unable to fetch IDP settings from %s: %v", v.Name, err)) } s = append(s, is) } for i := 1; i < len(s); i++ { if !reflect.DeepEqual(s[i], s[0]) { return false, nil } } return true, nil } // GetClusterInfo - returns site replication information. 
func (c *SiteReplicationSys) GetClusterInfo(ctx context.Context) (info madmin.SiteReplicationInfo, err error) { c.RLock() defer c.RUnlock() if !c.enabled { return info, nil } info.Enabled = true info.Name = c.state.Name info.Sites = make([]madmin.PeerInfo, 0, len(c.state.Peers)) for _, peer := range c.state.Peers { info.Sites = append(info.Sites, peer) } sort.Slice(info.Sites, func(i, j int) bool { return info.Sites[i].Name < info.Sites[j].Name }) info.ServiceAccountAccessKey = c.state.ServiceAccountAccessKey return info, nil } const ( makeBucketWithVersion = "MakeBucketWithVersioning" configureReplication = "ConfigureReplication" deleteBucket = "DeleteBucket" replicateIAMItem = "SRPeerReplicateIAMItem" replicateBucketMetadata = "SRPeerReplicateBucketMeta" ) // MakeBucketHook - called during a regular make bucket call when cluster // replication is enabled. It is responsible for the creation of the same bucket // on remote clusters, and creating replication rules on local and peer // clusters. func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string, opts MakeBucketOptions) error { // At this point, the local bucket is created. c.RLock() defer c.RUnlock() if !c.enabled { return nil } optsMap := make(map[string]string) if opts.LockEnabled { optsMap["lockEnabled"] = "true" optsMap["versioningEnabled"] = "true" } if opts.VersioningEnabled { optsMap["versioningEnabled"] = "true" } if opts.ForceCreate { optsMap["forceCreate"] = "true" } createdAt, _ := globalBucketMetadataSys.CreatedAt(bucket) optsMap["createdAt"] = createdAt.UTC().Format(time.RFC3339Nano) opts.CreatedAt = createdAt // Create bucket and enable versioning on all peers. 
makeBucketConcErr := c.concDo( func() error { return c.annotateErr(makeBucketWithVersion, c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)) }, func(deploymentID string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, deploymentID) if err != nil { return err } return c.annotatePeerErr(p.Name, makeBucketWithVersion, admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap)) }, makeBucketWithVersion, ) // Create bucket remotes and add replication rules for the bucket on self and peers. makeRemotesConcErr := c.concDo( func() error { return c.annotateErr(configureReplication, c.PeerBucketConfigureReplHandler(ctx, bucket)) }, func(deploymentID string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, deploymentID) if err != nil { return err } return c.annotatePeerErr(p.Name, configureReplication, admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil)) }, configureReplication, ) if err := errors.Unwrap(makeBucketConcErr); err != nil { return err } if err := errors.Unwrap(makeRemotesConcErr); err != nil { return err } return nil } // DeleteBucketHook - called during a regular delete bucket call when cluster // replication is enabled. It is responsible for the deletion of the same bucket // on remote clusters. func (c *SiteReplicationSys) DeleteBucketHook(ctx context.Context, bucket string, forceDelete bool) error { // At this point, the local bucket is deleted. c.RLock() defer c.RUnlock() if !c.enabled { return nil } op := madmin.DeleteBucketBktOp if forceDelete { op = madmin.ForceDeleteBucketBktOp } // Send bucket delete to other clusters. 
cerr := c.concDo(nil, func(deploymentID string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, deploymentID) if err != nil { return wrapSRErr(err) } return c.annotatePeerErr(p.Name, deleteBucket, admClient.SRPeerBucketOps(ctx, bucket, op, nil)) }, deleteBucket, ) return errors.Unwrap(cerr) } // PeerBucketMakeWithVersioningHandler - creates bucket and enables versioning. func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Context, bucket string, opts MakeBucketOptions) error { objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } err := objAPI.MakeBucket(ctx, bucket, opts) if err != nil { // Check if this is a bucket exists error. _, ok1 := err.(BucketExists) _, ok2 := err.(BucketAlreadyExists) if !ok1 && !ok2 { return wrapSRErr(c.annotateErr(makeBucketWithVersion, err)) } } else { // Load updated bucket metadata into memory as new // bucket was created. globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket) } meta, err := globalBucketMetadataSys.Get(bucket) if err != nil { return wrapSRErr(c.annotateErr(makeBucketWithVersion, err)) } meta.SetCreatedAt(opts.CreatedAt) meta.VersioningConfigXML = enabledBucketVersioningConfig if opts.LockEnabled { meta.ObjectLockConfigXML = enabledBucketObjectLockConfig } if err := meta.Save(context.Background(), objAPI); err != nil { return wrapSRErr(err) } globalBucketMetadataSys.Set(bucket, meta) // Load updated bucket metadata into memory as new metadata updated. globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket) return nil } // PeerBucketConfigureReplHandler - configures replication remote and // replication rules to all other peers for the local bucket. func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, bucket string) error { creds, err := c.getPeerCreds() if err != nil { return wrapSRErr(err) } // The following function, creates a bucket remote and sets up a bucket // replication rule for the given peer. 
configurePeerFn := func(d string, peer madmin.PeerInfo) error { // Create bucket replication rule to this peer. // To add the bucket replication rule, we fetch the current // server configuration, and convert it to minio-go's // replication configuration type (by converting to xml and // parsing it back), use minio-go's add rule function, and // finally convert it back to the server type (again via xml). // This is needed as there is no add-rule function in the server // yet. // Though we do not check if the rule already exists, this is // not a problem as we are always using the same replication // rule ID - if the rule already exists, it is just replaced. replicationConfigS, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket) if err != nil { _, ok := err.(BucketReplicationConfigNotFound) if !ok { return err } } var replicationConfig replication.Config if replicationConfigS != nil { replCfgSBytes, err := xml.Marshal(replicationConfigS) if err != nil { return err } err = xml.Unmarshal(replCfgSBytes, &replicationConfig) if err != nil { return err } } var ( ruleID = fmt.Sprintf("site-repl-%s", d) hasRule bool ) var ruleARN string for _, r := range replicationConfig.Rules { if r.ID == ruleID { hasRule = true ruleARN = r.Destination.Bucket } } ep, _ := url.Parse(peer.Endpoint) var targets []madmin.BucketTarget if targetsPtr, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket); targetsPtr != nil { targets = targetsPtr.Targets } targetARN := "" var updateTgt bool var targetToUpdate madmin.BucketTarget for _, target := range targets { if target.Arn == ruleARN { targetARN = ruleARN if target.URL().String() != peer.Endpoint { updateTgt = true targetToUpdate = target } break } } // replication config had a stale target ARN - update the endpoint if updateTgt { targetToUpdate.Endpoint = ep.Host targetToUpdate.Secure = ep.Scheme == "https" targetToUpdate.Credentials = &madmin.Credentials{ AccessKey: creds.AccessKey, SecretKey: creds.SecretKey, } if 
!peer.SyncState.Empty() { targetToUpdate.ReplicationSync = (peer.SyncState == madmin.SyncEnabled) } err := globalBucketTargetSys.SetTarget(ctx, bucket, &targetToUpdate, true) if err != nil { return c.annotatePeerErr(peer.Name, "Bucket target update error", err) } targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) if err != nil { return wrapSRErr(err) } tgtBytes, err := json.Marshal(&targets) if err != nil { return wrapSRErr(err) } if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { return wrapSRErr(err) } } // no replication rule for this peer or target ARN missing in bucket targets if targetARN == "" { bucketTarget := madmin.BucketTarget{ SourceBucket: bucket, Endpoint: ep.Host, Credentials: &madmin.Credentials{ AccessKey: creds.AccessKey, SecretKey: creds.SecretKey, }, TargetBucket: bucket, Secure: ep.Scheme == "https", API: "s3v4", Type: madmin.ReplicationService, Region: "", ReplicationSync: peer.SyncState == madmin.SyncEnabled, } var exists bool // true if ARN already exists bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget, peer.DeploymentID) if !exists { // persist newly generated ARN to targets and metadata on disk err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false) if err != nil { return c.annotatePeerErr(peer.Name, "Bucket target creation error", err) } targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) if err != nil { return err } tgtBytes, err := json.Marshal(&targets) if err != nil { return err } if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { return err } } targetARN = bucketTarget.Arn } opts := replication.Options{ // Set the ID so we can identify the rule as being // created for site-replication and include the // destination cluster's deployment ID. ID: ruleID, // Use a helper to generate unique priority numbers. 
Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)), Op: replication.AddOption, RuleStatus: "enable", DestBucket: targetARN, // Replicate everything! ReplicateDeletes: "enable", ReplicateDeleteMarkers: "enable", ReplicaSync: "enable", ExistingObjectReplicate: "enable", } switch { case hasRule: if ruleARN != opts.DestBucket { // remove stale replication rule and replace rule with correct target ARN if len(replicationConfig.Rules) > 1 { err = replicationConfig.RemoveRule(opts) } else { replicationConfig = replication.Config{} } if err == nil { err = replicationConfig.AddRule(opts) } } else { err = replicationConfig.EditRule(opts) } default: err = replicationConfig.AddRule(opts) } if err != nil { return c.annotatePeerErr(peer.Name, "Error adding bucket replication rule", err) } // Now convert the configuration back to server's type so we can // do some validation. newReplCfgBytes, err := xml.Marshal(replicationConfig) if err != nil { return err } newReplicationConfig, err := sreplication.ParseConfig(bytes.NewReader(newReplCfgBytes)) if err != nil { return err } sameTarget, apiErr := validateReplicationDestination(ctx, bucket, newReplicationConfig, true) if apiErr != noError { return fmt.Errorf("bucket replication config validation error: %#v", apiErr) } err = newReplicationConfig.Validate(bucket, sameTarget) if err != nil { return err } // Config looks good, so we save it. 
replCfgData, err := xml.Marshal(newReplicationConfig) if err != nil { return err } _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, replCfgData) return c.annotatePeerErr(peer.Name, "Error updating replication configuration", err) } c.RLock() defer c.RUnlock() errMap := make(map[string]error, len(c.state.Peers)) for d, peer := range c.state.Peers { if d == globalDeploymentID { continue } errMap[d] = configurePeerFn(d, peer) } return c.toErrorFromErrMap(errMap, configureReplication) } // PeerBucketDeleteHandler - deletes bucket on local in response to a delete // bucket request from a peer. func (c *SiteReplicationSys) PeerBucketDeleteHandler(ctx context.Context, bucket string, opts DeleteBucketOptions) error { c.RLock() defer c.RUnlock() if !c.enabled { return errSRNotEnabled } objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } if globalDNSConfig != nil { if err := globalDNSConfig.Delete(bucket); err != nil { return err } } err := objAPI.DeleteBucket(ctx, bucket, opts) if err != nil { if globalDNSConfig != nil { if err2 := globalDNSConfig.Put(bucket); err2 != nil { logger.LogIf(ctx, fmt.Errorf("Unable to restore bucket DNS entry %w, please fix it manually", err2)) } } return err } globalNotificationSys.DeleteBucketMetadata(ctx, bucket) return nil } // IAMChangeHook - called when IAM items need to be replicated to peer clusters. // This includes named policy creation, policy mapping changes and service // account changes. // // All policies are replicated. // // Policy mappings are only replicated when they are for LDAP users or groups // (as an external IDP is always assumed when SR is used). In the case of // OpenID, such mappings are provided from the IDP directly and so are not // applicable here. // // Service accounts are replicated as long as they are not meant for the root // user. 
// // STS accounts are replicated, but only if the session token is verifiable // using the local cluster's root credential. func (c *SiteReplicationSys) IAMChangeHook(ctx context.Context, item madmin.SRIAMItem) error { // The IAM item has already been applied to the local cluster at this // point, and only needs to be updated on all remote peer clusters. c.RLock() defer c.RUnlock() if !c.enabled { return nil } cerr := c.concDo(nil, func(d string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, d) if err != nil { return wrapSRErr(err) } return c.annotatePeerErr(p.Name, replicateIAMItem, admClient.SRPeerReplicateIAMItem(ctx, item)) }, replicateIAMItem, ) return errors.Unwrap(cerr) } // PeerAddPolicyHandler - copies IAM policy to local. A nil policy argument, // causes the named policy to be deleted. func (c *SiteReplicationSys) PeerAddPolicyHandler(ctx context.Context, policyName string, p *iampolicy.Policy, updatedAt time.Time) error { var err error // skip overwrite of local update if peer sent stale info if !updatedAt.IsZero() { if p, err := globalIAMSys.store.GetPolicyDoc(policyName); err == nil && p.UpdateDate.After(updatedAt) { return nil } } if p == nil { err = globalIAMSys.DeletePolicy(ctx, policyName, true) } else { _, err = globalIAMSys.SetPolicy(ctx, policyName, *p) } if err != nil { return wrapSRErr(err) } return nil } // PeerIAMUserChangeHandler - copies IAM user to local. 
func (c *SiteReplicationSys) PeerIAMUserChangeHandler(ctx context.Context, change *madmin.SRIAMUser, updatedAt time.Time) error { if change == nil { return errSRInvalidRequest(errInvalidArgument) } // skip overwrite of local update if peer sent stale info if !updatedAt.IsZero() { if ui, err := globalIAMSys.GetUserInfo(ctx, change.AccessKey); err == nil && ui.UpdatedAt.After(updatedAt) { return nil } } var err error if change.IsDeleteReq { err = globalIAMSys.DeleteUser(ctx, change.AccessKey, true) } else { if change.UserReq == nil { return errSRInvalidRequest(errInvalidArgument) } userReq := *change.UserReq if userReq.Status != "" && userReq.SecretKey == "" { // Status is set without secretKey updates means we are // only changing the account status. _, err = globalIAMSys.SetUserStatus(ctx, change.AccessKey, userReq.Status) } else { _, err = globalIAMSys.CreateUser(ctx, change.AccessKey, userReq) } } if err != nil { return wrapSRErr(err) } return nil } // PeerGroupInfoChangeHandler - copies group changes to local. 
func (c *SiteReplicationSys) PeerGroupInfoChangeHandler(ctx context.Context, change *madmin.SRGroupInfo, updatedAt time.Time) error { if change == nil { return errSRInvalidRequest(errInvalidArgument) } updReq := change.UpdateReq var err error // skip overwrite of local update if peer sent stale info if !updatedAt.IsZero() { if gd, err := globalIAMSys.GetGroupDescription(updReq.Group); err == nil && gd.UpdatedAt.After(updatedAt) { return nil } } if updReq.IsRemove { _, err = globalIAMSys.RemoveUsersFromGroup(ctx, updReq.Group, updReq.Members) } else { if updReq.Status != "" && len(updReq.Members) == 0 { _, err = globalIAMSys.SetGroupStatus(ctx, updReq.Group, updReq.Status == madmin.GroupEnabled) } else { _, err = globalIAMSys.AddUsersToGroup(ctx, updReq.Group, updReq.Members) if err == nil && updReq.Status != madmin.GroupEnabled { _, err = globalIAMSys.SetGroupStatus(ctx, updReq.Group, updReq.Status == madmin.GroupEnabled) } } } if err != nil { return wrapSRErr(err) } return nil } // PeerSvcAccChangeHandler - copies service-account change to local. 
func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change *madmin.SRSvcAccChange, updatedAt time.Time) error { if change == nil { return errSRInvalidRequest(errInvalidArgument) } switch { case change.Create != nil: var sp *iampolicy.Policy var err error if len(change.Create.SessionPolicy) > 0 { sp, err = iampolicy.ParseConfig(bytes.NewReader(change.Create.SessionPolicy)) if err != nil { return wrapSRErr(err) } } // skip overwrite of local update if peer sent stale info if !updatedAt.IsZero() && change.Create.AccessKey != "" { if sa, _, err := globalIAMSys.getServiceAccount(ctx, change.Create.AccessKey); err == nil && sa.UpdatedAt.After(updatedAt) { return nil } } opts := newServiceAccountOpts{ accessKey: change.Create.AccessKey, secretKey: change.Create.SecretKey, sessionPolicy: sp, claims: change.Create.Claims, name: change.Create.Name, description: change.Create.Description, expiration: change.Create.Expiration, } _, _, err = globalIAMSys.NewServiceAccount(ctx, change.Create.Parent, change.Create.Groups, opts) if err != nil { return wrapSRErr(err) } case change.Update != nil: var sp *iampolicy.Policy var err error if len(change.Update.SessionPolicy) > 0 { sp, err = iampolicy.ParseConfig(bytes.NewReader(change.Update.SessionPolicy)) if err != nil { return wrapSRErr(err) } } // skip overwrite of local update if peer sent stale info if !updatedAt.IsZero() { if sa, _, err := globalIAMSys.getServiceAccount(ctx, change.Update.AccessKey); err == nil && sa.UpdatedAt.After(updatedAt) { return nil } } opts := updateServiceAccountOpts{ secretKey: change.Update.SecretKey, status: change.Update.Status, name: change.Update.Name, description: change.Update.Description, sessionPolicy: sp, expiration: change.Update.Expiration, } _, err = globalIAMSys.UpdateServiceAccount(ctx, change.Update.AccessKey, opts) if err != nil { return wrapSRErr(err) } case change.Delete != nil: // skip overwrite of local update if peer sent stale info if !updatedAt.IsZero() { if 
sa, _, err := globalIAMSys.getServiceAccount(ctx, change.Delete.AccessKey); err == nil && sa.UpdatedAt.After(updatedAt) { return nil } } if err := globalIAMSys.DeleteServiceAccount(ctx, change.Delete.AccessKey, true); err != nil { return wrapSRErr(err) } } return nil } // PeerPolicyMappingHandler - copies policy mapping to local. func (c *SiteReplicationSys) PeerPolicyMappingHandler(ctx context.Context, mapping *madmin.SRPolicyMapping, updatedAt time.Time) error { if mapping == nil { return errSRInvalidRequest(errInvalidArgument) } // skip overwrite of local update if peer sent stale info if !updatedAt.IsZero() { mp, ok := globalIAMSys.store.GetMappedPolicy(mapping.Policy, mapping.IsGroup) if ok && mp.UpdatedAt.After(updatedAt) { return nil } } _, err := globalIAMSys.PolicyDBSet(ctx, mapping.UserOrGroup, mapping.Policy, IAMUserType(mapping.UserType), mapping.IsGroup) if err != nil { return wrapSRErr(err) } return nil } // PeerSTSAccHandler - replicates STS credential locally. func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *madmin.SRSTSCredential, updatedAt time.Time) error { if stsCred == nil { return errSRInvalidRequest(errInvalidArgument) } // skip overwrite of local update if peer sent stale info if !updatedAt.IsZero() { if u, _, err := globalIAMSys.getTempAccount(ctx, stsCred.AccessKey); err == nil { if u.UpdatedAt.After(updatedAt) { return nil } } } // Verify the session token of the stsCred claims, err := auth.ExtractClaims(stsCred.SessionToken, globalActiveCred.SecretKey) if err != nil { return fmt.Errorf("STS credential could not be verified: %w", err) } mapClaims := claims.Map() expiry, err := auth.ExpToInt64(mapClaims["exp"]) if err != nil { return fmt.Errorf("Expiry claim was not found: %v: %w", mapClaims, err) } cred := auth.Credentials{ AccessKey: stsCred.AccessKey, SecretKey: stsCred.SecretKey, Expiration: time.Unix(expiry, 0).UTC(), SessionToken: stsCred.SessionToken, ParentUser: stsCred.ParentUser, Status: 
auth.AccountOn, } // Extract the username and lookup DN and groups in LDAP. ldapUser, isLDAPSTS := claims.Lookup(ldapUserN) if isLDAPSTS { // Need to lookup the groups from LDAP. _, ldapGroups, err := globalIAMSys.LDAPConfig.LookupUserDN(ldapUser) if err != nil { return fmt.Errorf("unable to query LDAP server for %s: %w", ldapUser, err) } cred.Groups = ldapGroups } // Set these credentials to IAM. if _, err := globalIAMSys.SetTempUser(ctx, cred.AccessKey, cred, stsCred.ParentPolicyMapping); err != nil { return fmt.Errorf("unable to save STS credential and/or parent policy mapping: %w", err) } return nil } // BucketMetaHook - called when bucket meta changes happen and need to be // replicated to peer clusters. func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRBucketMeta) error { // The change has already been applied to the local cluster at this // point, and only needs to be updated on all remote peer clusters. c.RLock() defer c.RUnlock() if !c.enabled { return nil } cerr := c.concDo(nil, func(d string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, d) if err != nil { return wrapSRErr(err) } return c.annotatePeerErr(p.Name, replicateBucketMetadata, admClient.SRPeerReplicateBucketMeta(ctx, item)) }, replicateBucketMetadata, ) return errors.Unwrap(cerr) } // PeerBucketVersioningHandler - updates versioning config to local cluster. func (c *SiteReplicationSys) PeerBucketVersioningHandler(ctx context.Context, bucket string, versioning *string, updatedAt time.Time) error { if versioning != nil { // skip overwrite if local update is newer than peer update. 
if !updatedAt.IsZero() { if _, updateTm, err := globalBucketMetadataSys.GetVersioningConfig(bucket); err == nil && updateTm.After(updatedAt) { return nil } } configData, err := base64.StdEncoding.DecodeString(*versioning) if err != nil { return wrapSRErr(err) } _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, configData) if err != nil { return wrapSRErr(err) } return nil } return nil } // PeerBucketPolicyHandler - copies/deletes policy to local cluster. func (c *SiteReplicationSys) PeerBucketPolicyHandler(ctx context.Context, bucket string, policy *bktpolicy.Policy, updatedAt time.Time) error { // skip overwrite if local update is newer than peer update. if !updatedAt.IsZero() { if _, updateTm, err := globalBucketMetadataSys.GetPolicyConfig(bucket); err == nil && updateTm.After(updatedAt) { return nil } } if policy != nil { configData, err := json.Marshal(policy) if err != nil { return wrapSRErr(err) } _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, configData) if err != nil { return wrapSRErr(err) } return nil } // Delete the bucket policy _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketPolicyConfig) if err != nil { return wrapSRErr(err) } return nil } // PeerBucketTaggingHandler - copies/deletes tags to local cluster. func (c *SiteReplicationSys) PeerBucketTaggingHandler(ctx context.Context, bucket string, tags *string, updatedAt time.Time) error { // skip overwrite if local update is newer than peer update. 
if !updatedAt.IsZero() { if _, updateTm, err := globalBucketMetadataSys.GetTaggingConfig(bucket); err == nil && updateTm.After(updatedAt) { return nil } } if tags != nil { configData, err := base64.StdEncoding.DecodeString(*tags) if err != nil { return wrapSRErr(err) } _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, configData) if err != nil { return wrapSRErr(err) } return nil } // Delete the tags _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketTaggingConfig) if err != nil { return wrapSRErr(err) } return nil } // PeerBucketObjectLockConfigHandler - sets object lock on local bucket. func (c *SiteReplicationSys) PeerBucketObjectLockConfigHandler(ctx context.Context, bucket string, objectLockData *string, updatedAt time.Time) error { if objectLockData != nil { // skip overwrite if local update is newer than peer update. if !updatedAt.IsZero() { if _, updateTm, err := globalBucketMetadataSys.GetObjectLockConfig(bucket); err == nil && updateTm.After(updatedAt) { return nil } } configData, err := base64.StdEncoding.DecodeString(*objectLockData) if err != nil { return wrapSRErr(err) } _, err = globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, configData) if err != nil { return wrapSRErr(err) } return nil } return nil } // PeerBucketSSEConfigHandler - copies/deletes SSE config to local cluster. func (c *SiteReplicationSys) PeerBucketSSEConfigHandler(ctx context.Context, bucket string, sseConfig *string, updatedAt time.Time) error { // skip overwrite if local update is newer than peer update. 
if !updatedAt.IsZero() { if _, updateTm, err := globalBucketMetadataSys.GetSSEConfig(bucket); err == nil && updateTm.After(updatedAt) { return nil } } if sseConfig != nil { configData, err := base64.StdEncoding.DecodeString(*sseConfig) if err != nil { return wrapSRErr(err) } _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, configData) if err != nil { return wrapSRErr(err) } return nil } // Delete sse config _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketSSEConfig) if err != nil { return wrapSRErr(err) } return nil } // PeerBucketQuotaConfigHandler - copies/deletes policy to local cluster. func (c *SiteReplicationSys) PeerBucketQuotaConfigHandler(ctx context.Context, bucket string, quota *madmin.BucketQuota, updatedAt time.Time) error { // skip overwrite if local update is newer than peer update. if !updatedAt.IsZero() { if _, updateTm, err := globalBucketMetadataSys.GetQuotaConfig(ctx, bucket); err == nil && updateTm.After(updatedAt) { return nil } } if quota != nil { quotaData, err := json.Marshal(quota) if err != nil { return wrapSRErr(err) } if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, quotaData); err != nil { return wrapSRErr(err) } return nil } // Delete the bucket policy _, err := globalBucketMetadataSys.Delete(ctx, bucket, bucketQuotaConfigFile) if err != nil { return wrapSRErr(err) } return nil } // getAdminClient - NOTE: ensure to take at least a read lock on SiteReplicationSys // before calling this. func (c *SiteReplicationSys) getAdminClient(ctx context.Context, deploymentID string) (*madmin.AdminClient, error) { creds, err := c.getPeerCreds() if err != nil { return nil, err } peer, ok := c.state.Peers[deploymentID] if !ok { return nil, errSRPeerNotFound } return getAdminClient(peer.Endpoint, creds.AccessKey, creds.SecretKey) } // getAdminClientWithEndpoint - NOTE: ensure to take at least a read lock on SiteReplicationSys // before calling this. 
func (c *SiteReplicationSys) getAdminClientWithEndpoint(ctx context.Context, deploymentID, endpoint string) (*madmin.AdminClient, error) { creds, err := c.getPeerCreds() if err != nil { return nil, err } if _, ok := c.state.Peers[deploymentID]; !ok { return nil, errSRPeerNotFound } return getAdminClient(endpoint, creds.AccessKey, creds.SecretKey) } func (c *SiteReplicationSys) getPeerCreds() (*auth.Credentials, error) { u, ok := globalIAMSys.store.GetUser(c.state.ServiceAccountAccessKey) if !ok { return nil, errors.New("site replication service account not found") } return &u.Credentials, nil } // listBuckets returns a consistent common view of latest unique buckets across // sites, this is used for replication. func (c *SiteReplicationSys) listBuckets(ctx context.Context) ([]BucketInfo, error) { // If local has buckets, enable versioning on them, create them on peers // and setup replication rules. objAPI := newObjectLayerFn() if objAPI == nil { return nil, errSRObjectLayerNotReady } return objAPI.ListBuckets(ctx, BucketOptions{Deleted: true}) } // syncToAllPeers is used for syncing local data to all remote peers, it is // called once during initial "AddPeerClusters" request. func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error { objAPI := newObjectLayerFn() if objAPI == nil { return errSRObjectLayerNotReady } buckets, err := objAPI.ListBuckets(ctx, BucketOptions{}) if err != nil { return err } for _, bucketInfo := range buckets { bucket := bucketInfo.Name meta, err := globalBucketMetadataSys.GetConfigFromDisk(ctx, bucket) if err != nil && !errors.Is(err, errConfigNotFound) { return errSRBackendIssue(err) } opts := MakeBucketOptions{ LockEnabled: meta.ObjectLocking(), CreatedAt: bucketInfo.Created.UTC(), } // Now call the MakeBucketHook on existing bucket - this will // create buckets and replication rules on peer clusters. 
if err = c.MakeBucketHook(ctx, bucket, opts); err != nil { return errSRBucketConfigError(err) } // Replicate bucket policy if present. policyJSON, tm := meta.PolicyConfigJSON, meta.PolicyConfigUpdatedAt if len(policyJSON) > 0 { err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypePolicy, Bucket: bucket, Policy: policyJSON, UpdatedAt: tm, }) if err != nil { return errSRBucketMetaError(err) } } // Replicate bucket tags if present. tagCfg, tm := meta.TaggingConfigXML, meta.TaggingConfigUpdatedAt if len(tagCfg) > 0 { tagCfgStr := base64.StdEncoding.EncodeToString(tagCfg) err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeTags, Bucket: bucket, Tags: &tagCfgStr, UpdatedAt: tm, }) if err != nil { return errSRBucketMetaError(err) } } // Replicate object-lock config if present. objLockCfgData, tm := meta.ObjectLockConfigXML, meta.ObjectLockConfigUpdatedAt if len(objLockCfgData) > 0 { objLockStr := base64.StdEncoding.EncodeToString(objLockCfgData) err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeObjectLockConfig, Bucket: bucket, Tags: &objLockStr, UpdatedAt: tm, }) if err != nil { return errSRBucketMetaError(err) } } // Replicate existing bucket bucket encryption settings sseConfigData, tm := meta.EncryptionConfigXML, meta.EncryptionConfigUpdatedAt if len(sseConfigData) > 0 { sseConfigStr := base64.StdEncoding.EncodeToString(sseConfigData) err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeSSEConfig, Bucket: bucket, SSEConfig: &sseConfigStr, UpdatedAt: tm, }) if err != nil { return errSRBucketMetaError(err) } } quotaConfigJSON, tm := meta.QuotaConfigJSON, meta.QuotaConfigUpdatedAt if len(quotaConfigJSON) > 0 { err = c.BucketMetaHook(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeQuotaConfig, Bucket: bucket, Quota: quotaConfigJSON, UpdatedAt: tm, }) if err != nil { return errSRBucketMetaError(err) } } } // Order matters from now on how the information is // 
synced to remote sites. // Policies should be synced first. { // Replicate IAM policies on local to all peers. allPolicyDocs, err := globalIAMSys.ListPolicyDocs(ctx, "") if err != nil { return errSRBackendIssue(err) } for pname, pdoc := range allPolicyDocs { policyJSON, err := json.Marshal(pdoc.Policy) if err != nil { return wrapSRErr(err) } err = c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicy, Name: pname, Policy: policyJSON, UpdatedAt: pdoc.UpdateDate, }) if err != nil { return errSRIAMError(err) } } } // Next should be userAccounts those are local users, OIDC and LDAP will not // may not have any local users. { userAccounts := make(map[string]UserIdentity) err := globalIAMSys.store.loadUsers(ctx, regUser, userAccounts) if err != nil { return errSRBackendIssue(err) } for _, acc := range userAccounts { if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemIAMUser, IAMUser: &madmin.SRIAMUser{ AccessKey: acc.Credentials.AccessKey, IsDeleteReq: false, UserReq: &madmin.AddOrUpdateUserReq{ SecretKey: acc.Credentials.SecretKey, Status: madmin.AccountStatus(acc.Credentials.Status), }, }, UpdatedAt: acc.UpdatedAt, }); err != nil { return errSRIAMError(err) } } } // Next should be Groups for some of these users, LDAP might have some Group // DNs here { groups := make(map[string]GroupInfo) err := globalIAMSys.store.loadGroups(ctx, groups) if err != nil { return errSRBackendIssue(err) } for gname, group := range groups { if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemGroupInfo, GroupInfo: &madmin.SRGroupInfo{ UpdateReq: madmin.GroupAddRemove{ Group: gname, Members: group.Members, Status: madmin.GroupStatus(group.Status), IsRemove: false, }, }, UpdatedAt: group.UpdatedAt, }); err != nil { return errSRIAMError(err) } } } // Followed by group policy mapping { // Replicate policy mappings on local to all peers. 
groupPolicyMap := make(map[string]MappedPolicy) errG := globalIAMSys.store.loadMappedPolicies(ctx, unknownIAMUserType, true, groupPolicyMap) if errG != nil { return errSRBackendIssue(errG) } for group, mp := range groupPolicyMap { err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicyMapping, PolicyMapping: &madmin.SRPolicyMapping{ UserOrGroup: group, UserType: -1, IsGroup: true, Policy: mp.Policies, }, UpdatedAt: mp.UpdatedAt, }) if err != nil { return errSRIAMError(err) } } } // Service accounts are the static accounts that should be synced with // valid claims. { serviceAccounts := make(map[string]UserIdentity) err := globalIAMSys.store.loadUsers(ctx, svcUser, serviceAccounts) if err != nil { return errSRBackendIssue(err) } for user, acc := range serviceAccounts { if user == siteReplicatorSvcAcc { // skip the site replicate svc account as it is // already replicated. continue } claims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, acc.Credentials.AccessKey) if err != nil { return errSRBackendIssue(err) } _, policy, err := globalIAMSys.GetServiceAccount(ctx, acc.Credentials.AccessKey) if err != nil { return errSRBackendIssue(err) } var policyJSON []byte if policy != nil { policyJSON, err = json.Marshal(policy) if err != nil { return wrapSRErr(err) } } err = c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemSvcAcc, SvcAccChange: &madmin.SRSvcAccChange{ Create: &madmin.SRSvcAccCreate{ Parent: acc.Credentials.ParentUser, AccessKey: user, SecretKey: acc.Credentials.SecretKey, Groups: acc.Credentials.Groups, Claims: claims, SessionPolicy: json.RawMessage(policyJSON), Status: acc.Credentials.Status, Name: acc.Credentials.Name, Description: acc.Credentials.Description, Expiration: &acc.Credentials.Expiration, }, }, UpdatedAt: acc.UpdatedAt, }) if err != nil { return errSRIAMError(err) } } } // Followed by policy mapping for the userAccounts we previously synced. { // Replicate policy mappings on local to all peers. 
userPolicyMap := make(map[string]MappedPolicy) errU := globalIAMSys.store.loadMappedPolicies(ctx, regUser, false, userPolicyMap) if errU != nil { return errSRBackendIssue(errU) } for user, mp := range userPolicyMap { err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicyMapping, PolicyMapping: &madmin.SRPolicyMapping{ UserOrGroup: user, UserType: int(regUser), IsGroup: false, Policy: mp.Policies, }, UpdatedAt: mp.UpdatedAt, }) if err != nil { return errSRIAMError(err) } } } // and finally followed by policy mappings for for STS users. { // Replicate policy mappings on local to all peers. stsPolicyMap := make(map[string]MappedPolicy) errU := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, stsPolicyMap) if errU != nil { return errSRBackendIssue(errU) } for user, mp := range stsPolicyMap { err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicyMapping, PolicyMapping: &madmin.SRPolicyMapping{ UserOrGroup: user, UserType: int(stsUser), IsGroup: false, Policy: mp.Policies, }, UpdatedAt: mp.UpdatedAt, }) if err != nil { return errSRIAMError(err) } } } return nil } // Concurrency helpers type concErr struct { errMap map[string]error summaryErr error } func (c concErr) Error() string { if c.summaryErr != nil { return c.summaryErr.Error() } return "" } func (c concErr) Unwrap() error { return c.summaryErr } func (c *SiteReplicationSys) toErrorFromErrMap(errMap map[string]error, actionName string) error { if len(errMap) == 0 { return nil } var success int msgs := []string{} for d, err := range errMap { name := c.state.Peers[d].Name if err == nil { msgs = append(msgs, fmt.Sprintf("'%s' on site %s (%s): succeeded", actionName, name, d)) success++ } else { msgs = append(msgs, fmt.Sprintf("'%s' on site %s (%s): failed(%v)", actionName, name, d, err)) } } if success == len(errMap) { return nil } return fmt.Errorf("Site replication error(s): \n%s", strings.Join(msgs, "\n")) } func (c *SiteReplicationSys) newConcErr(errMap 
map[string]error, actionName string) error { return concErr{ errMap: errMap, summaryErr: c.toErrorFromErrMap(errMap, actionName), } } // concDo calls actions concurrently. selfActionFn is run for the current // cluster and peerActionFn is run for each peer replication cluster. func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func(deploymentID string, p madmin.PeerInfo) error, actionName string) error { depIDs := make([]string, 0, len(c.state.Peers)) for d := range c.state.Peers { depIDs = append(depIDs, d) } errs := make([]error, len(c.state.Peers)) var wg sync.WaitGroup wg.Add(len(depIDs)) for i := range depIDs { go func(i int) { defer wg.Done() if depIDs[i] == globalDeploymentID { if selfActionFn != nil { errs[i] = selfActionFn() } } else { errs[i] = peerActionFn(depIDs[i], c.state.Peers[depIDs[i]]) } }(i) } wg.Wait() errMap := make(map[string]error, len(c.state.Peers)) for i, depID := range depIDs { errMap[depID] = errs[i] } return c.newConcErr(errMap, actionName) } func (c *SiteReplicationSys) annotateErr(annotation string, err error) error { if err == nil { return nil } return fmt.Errorf("%s: %s: %w", c.state.Name, annotation, err) } func (c *SiteReplicationSys) annotatePeerErr(dstPeer string, annotation string, err error) error { if err == nil { return nil } return fmt.Errorf("%s->%s: %s: %w", c.state.Name, dstPeer, annotation, err) } // isEnabled returns true if site replication is enabled func (c *SiteReplicationSys) isEnabled() bool { c.RLock() defer c.RUnlock() return c.enabled } var errMissingSRConfig = fmt.Errorf("unable to find site replication configuration") // RemovePeerCluster - removes one or more clusters from site replication configuration. 
func (c *SiteReplicationSys) RemovePeerCluster(ctx context.Context, objectAPI ObjectLayer, rreq madmin.SRRemoveReq) (st madmin.ReplicateRemoveStatus, err error) { if !c.isEnabled() { return st, errSRNotEnabled } info, err := c.GetClusterInfo(ctx) if err != nil { return st, errSRBackendIssue(err) } peerMap := make(map[string]madmin.PeerInfo) var rmvEndpoints []string siteNames := rreq.SiteNames updatedPeers := make(map[string]madmin.PeerInfo) for _, pi := range info.Sites { updatedPeers[pi.DeploymentID] = pi peerMap[pi.Name] = pi if rreq.RemoveAll { siteNames = append(siteNames, pi.Name) } } for _, s := range siteNames { pinfo, ok := peerMap[s] if !ok { return st, errSRConfigMissingError(errMissingSRConfig) } rmvEndpoints = append(rmvEndpoints, pinfo.Endpoint) delete(updatedPeers, pinfo.DeploymentID) } var wg sync.WaitGroup errs := make(map[string]error, len(c.state.Peers)) for _, v := range info.Sites { wg.Add(1) if v.DeploymentID == globalDeploymentID { go func() { defer wg.Done() err := c.RemoveRemoteTargetsForEndpoint(ctx, objectAPI, rmvEndpoints, false) errs[globalDeploymentID] = err }() continue } go func(pi madmin.PeerInfo) { defer wg.Done() admClient, err := c.getAdminClient(ctx, pi.DeploymentID) if err != nil { errs[pi.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", pi.Name, err)) return } // set the requesting site's deploymentID for verification of peer request rreq.RequestingDepID = globalDeploymentID if _, err = admClient.SRPeerRemove(ctx, rreq); err != nil { if errors.Is(err, errMissingSRConfig) { // ignore if peer is already removed. 
return } errs[pi.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", pi.Name, err)) return } }(v) } wg.Wait() errdID := "" selfTgtsDeleted := errs[globalDeploymentID] == nil // true if all remote targets and replication config cleared successfully on local cluster for dID, err := range errs { if err != nil { if !rreq.RemoveAll && !selfTgtsDeleted { return madmin.ReplicateRemoveStatus{ ErrDetail: err.Error(), Status: madmin.ReplicateRemoveStatusPartial, }, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err)) } errdID = dID } } // force local config to be cleared even if peers failed since the remote targets are deleted // by now from the replication config and user intended to forcibly clear all site replication if rreq.RemoveAll { if err = c.removeFromDisk(ctx); err != nil { return madmin.ReplicateRemoveStatus{ Status: madmin.ReplicateRemoveStatusPartial, ErrDetail: fmt.Sprintf("unable to remove cluster-replication state on local: %v", err), }, nil } if errdID != "" { err := errs[errdID] return madmin.ReplicateRemoveStatus{ Status: madmin.ReplicateRemoveStatusPartial, ErrDetail: err.Error(), }, nil } return madmin.ReplicateRemoveStatus{ Status: madmin.ReplicateRemoveStatusSuccess, }, nil } // Update cluster state var state srState if len(updatedPeers) > 1 { state = srState{ Name: info.Name, Peers: updatedPeers, ServiceAccountAccessKey: info.ServiceAccountAccessKey, } } if err = c.saveToDisk(ctx, state); err != nil { return madmin.ReplicateRemoveStatus{ Status: madmin.ReplicateRemoveStatusPartial, ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err), }, err } st = madmin.ReplicateRemoveStatus{ Status: madmin.ReplicateRemoveStatusSuccess, } if errs[errdID] != nil { st.Status = madmin.ReplicateRemoveStatusPartial st.ErrDetail = errs[errdID].Error() } return st, nil } // InternalRemoveReq - sends an unlink request to peer cluster to remove one or more sites // from the site 
replication configuration. func (c *SiteReplicationSys) InternalRemoveReq(ctx context.Context, objectAPI ObjectLayer, rreq madmin.SRRemoveReq) error { if !c.isEnabled() { return errSRNotEnabled } if rreq.RequestingDepID != "" { // validate if requesting site is still part of site replication var foundRequestor bool for _, p := range c.state.Peers { if p.DeploymentID == rreq.RequestingDepID { foundRequestor = true break } } if !foundRequestor { return errSRRequestorNotFound } } ourName := "" peerMap := make(map[string]madmin.PeerInfo) updatedPeers := make(map[string]madmin.PeerInfo) siteNames := rreq.SiteNames for _, p := range c.state.Peers { peerMap[p.Name] = p if p.DeploymentID == globalDeploymentID { ourName = p.Name } updatedPeers[p.DeploymentID] = p if rreq.RemoveAll { siteNames = append(siteNames, p.Name) } } var rmvEndpoints []string var unlinkSelf bool for _, s := range siteNames { info, ok := peerMap[s] if !ok { return errMissingSRConfig } if info.DeploymentID == globalDeploymentID { unlinkSelf = true continue } delete(updatedPeers, info.DeploymentID) rmvEndpoints = append(rmvEndpoints, info.Endpoint) } if err := c.RemoveRemoteTargetsForEndpoint(ctx, objectAPI, rmvEndpoints, unlinkSelf); err != nil { return err } var state srState if !unlinkSelf { state = srState{ Name: c.state.Name, Peers: updatedPeers, ServiceAccountAccessKey: c.state.ServiceAccountAccessKey, } } if err := c.saveToDisk(ctx, state); err != nil { return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to drive on %s: %v", ourName, err)) } return nil } // RemoveRemoteTargetsForEndpoint removes replication targets corresponding to endpoint func (c *SiteReplicationSys) RemoveRemoteTargetsForEndpoint(ctx context.Context, objectAPI ObjectLayer, endpoints []string, unlinkSelf bool) (err error) { targets := globalBucketTargetSys.ListTargets(ctx, "", string(madmin.ReplicationService)) m := make(map[string]madmin.BucketTarget) for _, t := range targets { for _, endpoint := 
range endpoints { ep, _ := url.Parse(endpoint) if t.Endpoint == ep.Host && t.Secure == (ep.Scheme == "https") && t.Type == madmin.ReplicationService { m[t.Arn] = t } } // all remote targets from self are to be delinked if unlinkSelf { m[t.Arn] = t } } buckets, err := objectAPI.ListBuckets(ctx, BucketOptions{}) if err != nil { return errSRBackendIssue(err) } for _, b := range buckets { config, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, b.Name) if err != nil { if errors.Is(err, BucketReplicationConfigNotFound{Bucket: b.Name}) { continue } return err } var nRules []sreplication.Rule for _, r := range config.Rules { if _, ok := m[r.Destination.Bucket]; !ok { nRules = append(nRules, r) } } if len(nRules) > 0 { config.Rules = nRules configData, err := xml.Marshal(config) if err != nil { return err } if _, err = globalBucketMetadataSys.Update(ctx, b.Name, bucketReplicationConfig, configData); err != nil { return err } } else { if _, err := globalBucketMetadataSys.Delete(ctx, b.Name, bucketReplicationConfig); err != nil { return err } } } for arn, t := range m { if err := globalBucketTargetSys.RemoveTarget(ctx, t.SourceBucket, arn); err != nil { if errors.Is(err, BucketRemoteTargetNotFound{Bucket: t.SourceBucket}) { continue } return err } targets, terr := globalBucketTargetSys.ListBucketTargets(ctx, t.SourceBucket) if terr != nil { return err } tgtBytes, terr := json.Marshal(&targets) if terr != nil { return err } if _, err = globalBucketMetadataSys.Update(ctx, t.SourceBucket, bucketTargetsFile, tgtBytes); err != nil { return err } } return } // Other helpers func getAdminClient(endpoint, accessKey, secretKey string) (*madmin.AdminClient, error) { epURL, err := url.Parse(endpoint) if err != nil { return nil, err } if globalBucketTargetSys.isOffline(epURL) { return nil, RemoteTargetConnectionErr{Endpoint: epURL.String(), Err: fmt.Errorf("remote target is offline for endpoint %s", epURL.String())} } client, err := madmin.New(epURL.Host, accessKey, 
secretKey, epURL.Scheme == "https") if err != nil { return nil, err } client.SetCustomTransport(globalRemoteTargetTransport) return client, nil } func getS3Client(pc madmin.PeerSite) (*minioClient.Client, error) { ep, err := url.Parse(pc.Endpoint) if err != nil { return nil, err } if globalBucketTargetSys.isOffline(ep) { return nil, RemoteTargetConnectionErr{Endpoint: ep.String(), Err: fmt.Errorf("remote target is offline for endpoint %s", ep.String())} } return minioClient.New(ep.Host, &minioClient.Options{ Creds: credentials.NewStaticV4(pc.AccessKey, pc.SecretKey, ""), Secure: ep.Scheme == "https", Transport: globalRemoteTargetTransport, }) } func getPriorityHelper(replicationConfig replication.Config) int { maxPrio := 0 for _, rule := range replicationConfig.Rules { if rule.Priority > maxPrio { maxPrio = rule.Priority } } // leave some gaps in priority numbers for flexibility return maxPrio + 10 } // returns a slice with site names participating in site replciation but unspecified while adding // a new site. func getMissingSiteNames(oldDeps, newDeps set.StringSet, currSites []madmin.PeerInfo) []string { diff := oldDeps.Difference(newDeps) var diffSlc []string for _, v := range currSites { if diff.Contains(v.DeploymentID) { diffSlc = append(diffSlc, v.Name) } } return diffSlc } type srBucketMetaInfo struct { madmin.SRBucketInfo DeploymentID string } type srPolicy struct { madmin.SRIAMPolicy DeploymentID string } type srPolicyMapping struct { madmin.SRPolicyMapping DeploymentID string } type srUserInfo struct { madmin.UserInfo DeploymentID string } type srGroupDesc struct { madmin.GroupDesc DeploymentID string } // SiteReplicationStatus returns the site replication status across clusters participating in site replication. 
func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info madmin.SRStatusInfo, err error) { sinfo, err := c.siteReplicationStatus(ctx, objAPI, opts) if err != nil { return info, err } info = madmin.SRStatusInfo{ Enabled: sinfo.Enabled, MaxBuckets: sinfo.MaxBuckets, MaxUsers: sinfo.MaxUsers, MaxGroups: sinfo.MaxGroups, MaxPolicies: sinfo.MaxPolicies, Sites: sinfo.Sites, StatsSummary: sinfo.StatsSummary, } info.BucketStats = make(map[string]map[string]madmin.SRBucketStatsSummary, len(sinfo.Sites)) info.PolicyStats = make(map[string]map[string]madmin.SRPolicyStatsSummary) info.UserStats = make(map[string]map[string]madmin.SRUserStatsSummary) info.GroupStats = make(map[string]map[string]madmin.SRGroupStatsSummary) numSites := len(info.Sites) for b, stat := range sinfo.BucketStats { for dID, st := range stat { if st.TagMismatch || st.VersioningConfigMismatch || st.OLockConfigMismatch || st.SSEConfigMismatch || st.PolicyMismatch || st.ReplicationCfgMismatch || st.QuotaCfgMismatch || opts.Entity == madmin.SRBucketEntity { if _, ok := info.BucketStats[b]; !ok { info.BucketStats[b] = make(map[string]madmin.SRBucketStatsSummary, numSites) } info.BucketStats[b][dID] = st.SRBucketStatsSummary } } } for u, stat := range sinfo.UserStats { for dID, st := range stat { if st.PolicyMismatch || st.UserInfoMismatch || opts.Entity == madmin.SRUserEntity { if _, ok := info.UserStats[u]; !ok { info.UserStats[u] = make(map[string]madmin.SRUserStatsSummary, numSites) } info.UserStats[u][dID] = st.SRUserStatsSummary } } } for g, stat := range sinfo.GroupStats { for dID, st := range stat { if st.PolicyMismatch || st.GroupDescMismatch || opts.Entity == madmin.SRGroupEntity { if _, ok := info.GroupStats[g]; !ok { info.GroupStats[g] = make(map[string]madmin.SRGroupStatsSummary, numSites) } info.GroupStats[g][dID] = st.SRGroupStatsSummary } } } for p, stat := range sinfo.PolicyStats { for dID, st := range stat { if 
st.PolicyMismatch || opts.Entity == madmin.SRPolicyEntity { if _, ok := info.PolicyStats[p]; !ok { info.PolicyStats[p] = make(map[string]madmin.SRPolicyStatsSummary, numSites) } info.PolicyStats[p][dID] = st.SRPolicyStatsSummary } } } return } const ( replicationStatus = "ReplicationStatus" ) // siteReplicationStatus returns the site replication status across clusters participating in site replication. func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info srStatusInfo, err error) { c.RLock() defer c.RUnlock() if !c.enabled { return info, err } sris := make([]madmin.SRInfo, len(c.state.Peers)) depIdx := make(map[string]int, len(c.state.Peers)) i := 0 for d := range c.state.Peers { depIdx[d] = i i++ } metaInfoConcErr := c.concDo( func() error { srInfo, err := c.SiteReplicationMetaInfo(ctx, objAPI, opts) if err != nil { return err } sris[depIdx[globalDeploymentID]] = srInfo return nil }, func(deploymentID string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, deploymentID) if err != nil { return err } srInfo, err := admClient.SRMetaInfo(ctx, opts) if err != nil { return err } sris[depIdx[deploymentID]] = srInfo return nil }, replicationStatus, ) if err := errors.Unwrap(metaInfoConcErr); err != nil { return info, errSRBackendIssue(err) } info.Enabled = true info.Sites = make(map[string]madmin.PeerInfo, len(c.state.Peers)) for d, peer := range c.state.Peers { info.Sites[d] = peer } var maxBuckets int for _, sri := range sris { if len(sri.Buckets) > maxBuckets { maxBuckets = len(sri.Buckets) } } // mapping b/w entity and entity config across sites bucketStats := make(map[string][]srBucketMetaInfo) policyStats := make(map[string][]srPolicy) userPolicyStats := make(map[string][]srPolicyMapping) groupPolicyStats := make(map[string][]srPolicyMapping) userInfoStats := make(map[string][]srUserInfo) groupDescStats := make(map[string][]srGroupDesc) numSites := len(sris) allBuckets := 
set.NewStringSet() // across sites allUsers := set.NewStringSet() allUserWPolicies := set.NewStringSet() allGroups := set.NewStringSet() allGroupWPolicies := set.NewStringSet() allPolicies := set.NewStringSet() for _, sri := range sris { for b := range sri.Buckets { allBuckets.Add(b) } for u := range sri.UserInfoMap { allUsers.Add(u) } for g := range sri.GroupDescMap { allGroups.Add(g) } for p := range sri.Policies { allPolicies.Add(p) } for u := range sri.UserPolicies { allUserWPolicies.Add(u) } for g := range sri.GroupPolicies { allGroupWPolicies.Add(g) } } for i, sri := range sris { for b := range allBuckets { if _, ok := bucketStats[b]; !ok { bucketStats[b] = make([]srBucketMetaInfo, numSites) } si, ok := sri.Buckets[b] if !ok { si = madmin.SRBucketInfo{Bucket: b} } bucketStats[b][i] = srBucketMetaInfo{SRBucketInfo: si, DeploymentID: sri.DeploymentID} } for pname := range allPolicies { if _, ok := policyStats[pname]; !ok { policyStats[pname] = make([]srPolicy, numSites) } // if pname is not present in the map, the zero value // will be returned. 
pi := sri.Policies[pname] policyStats[pname][i] = srPolicy{SRIAMPolicy: pi, DeploymentID: sri.DeploymentID} } for user := range allUserWPolicies { if _, ok := userPolicyStats[user]; !ok { userPolicyStats[user] = make([]srPolicyMapping, numSites) } up := sri.UserPolicies[user] userPolicyStats[user][i] = srPolicyMapping{SRPolicyMapping: up, DeploymentID: sri.DeploymentID} } for group := range allGroupWPolicies { if _, ok := groupPolicyStats[group]; !ok { groupPolicyStats[group] = make([]srPolicyMapping, numSites) } up := sri.GroupPolicies[group] groupPolicyStats[group][i] = srPolicyMapping{SRPolicyMapping: up, DeploymentID: sri.DeploymentID} } for u := range allUsers { if _, ok := userInfoStats[u]; !ok { userInfoStats[u] = make([]srUserInfo, numSites) } ui := sri.UserInfoMap[u] userInfoStats[u][i] = srUserInfo{UserInfo: ui, DeploymentID: sri.DeploymentID} } for g := range allGroups { if _, ok := groupDescStats[g]; !ok { groupDescStats[g] = make([]srGroupDesc, numSites) } gd := sri.GroupDescMap[g] groupDescStats[g][i] = srGroupDesc{GroupDesc: gd, DeploymentID: sri.DeploymentID} } } info.StatsSummary = make(map[string]madmin.SRSiteSummary, len(c.state.Peers)) info.BucketStats = make(map[string]map[string]srBucketStatsSummary) info.PolicyStats = make(map[string]map[string]srPolicyStatsSummary) info.UserStats = make(map[string]map[string]srUserStatsSummary) info.GroupStats = make(map[string]map[string]srGroupStatsSummary) // collect user policy mapping replication status across sites if opts.Users || opts.Entity == madmin.SRUserEntity { for u, pslc := range userPolicyStats { if len(info.UserStats[u]) == 0 { info.UserStats[u] = make(map[string]srUserStatsSummary) } var policyMappings []madmin.SRPolicyMapping uPolicyCount := 0 for _, ps := range pslc { policyMappings = append(policyMappings, ps.SRPolicyMapping) uPolicyCount++ sum := info.StatsSummary[ps.DeploymentID] sum.TotalUserPolicyMappingCount++ info.StatsSummary[ps.DeploymentID] = sum } userPolicyMismatch := 
!isPolicyMappingReplicated(uPolicyCount, numSites, policyMappings) for _, ps := range pslc { dID := depIdx[ps.DeploymentID] _, hasUser := sris[dID].UserPolicies[u] info.UserStats[u][ps.DeploymentID] = srUserStatsSummary{ SRUserStatsSummary: madmin.SRUserStatsSummary{ PolicyMismatch: userPolicyMismatch, HasUser: hasUser, HasPolicyMapping: ps.Policy != "", }, userPolicy: ps, } if !userPolicyMismatch || opts.Entity != madmin.SRUserEntity { sum := info.StatsSummary[ps.DeploymentID] if !ps.IsGroup { sum.ReplicatedUserPolicyMappings++ } info.StatsSummary[ps.DeploymentID] = sum } } } // collect user info replication status across sites for u, pslc := range userInfoStats { var uiSlc []madmin.UserInfo userCount := 0 for _, ps := range pslc { uiSlc = append(uiSlc, ps.UserInfo) userCount++ sum := info.StatsSummary[ps.DeploymentID] sum.TotalUsersCount++ info.StatsSummary[ps.DeploymentID] = sum } userInfoMismatch := !isUserInfoReplicated(userCount, numSites, uiSlc) for _, ps := range pslc { dID := depIdx[ps.DeploymentID] _, hasUser := sris[dID].UserInfoMap[u] if len(info.UserStats[u]) == 0 { info.UserStats[u] = make(map[string]srUserStatsSummary) } umis, ok := info.UserStats[u][ps.DeploymentID] if !ok { umis = srUserStatsSummary{ SRUserStatsSummary: madmin.SRUserStatsSummary{ HasUser: hasUser, }, } } umis.UserInfoMismatch = userInfoMismatch umis.userInfo = ps info.UserStats[u][ps.DeploymentID] = umis if !userInfoMismatch || opts.Entity != madmin.SRUserEntity { sum := info.StatsSummary[ps.DeploymentID] sum.ReplicatedUsers++ info.StatsSummary[ps.DeploymentID] = sum } } } } if opts.Groups || opts.Entity == madmin.SRGroupEntity { // collect group policy mapping replication status across sites for g, pslc := range groupPolicyStats { var policyMappings []madmin.SRPolicyMapping gPolicyCount := 0 for _, ps := range pslc { policyMappings = append(policyMappings, ps.SRPolicyMapping) gPolicyCount++ sum := info.StatsSummary[ps.DeploymentID] sum.TotalGroupPolicyMappingCount++ 
info.StatsSummary[ps.DeploymentID] = sum } groupPolicyMismatch := !isPolicyMappingReplicated(gPolicyCount, numSites, policyMappings) if len(info.GroupStats[g]) == 0 { info.GroupStats[g] = make(map[string]srGroupStatsSummary) } for _, ps := range pslc { dID := depIdx[ps.DeploymentID] _, hasGroup := sris[dID].GroupPolicies[g] info.GroupStats[g][ps.DeploymentID] = srGroupStatsSummary{ SRGroupStatsSummary: madmin.SRGroupStatsSummary{ PolicyMismatch: groupPolicyMismatch, HasGroup: hasGroup, HasPolicyMapping: ps.Policy != "", DeploymentID: ps.DeploymentID, }, groupPolicy: ps, } if !groupPolicyMismatch && opts.Entity != madmin.SRGroupEntity { sum := info.StatsSummary[ps.DeploymentID] sum.ReplicatedGroupPolicyMappings++ info.StatsSummary[ps.DeploymentID] = sum } } } // collect group desc replication status across sites for g, pslc := range groupDescStats { var gds []madmin.GroupDesc groupCount := 0 for _, ps := range pslc { groupCount++ sum := info.StatsSummary[ps.DeploymentID] sum.TotalGroupsCount++ info.StatsSummary[ps.DeploymentID] = sum gds = append(gds, ps.GroupDesc) } gdMismatch := !isGroupDescReplicated(groupCount, numSites, gds) for _, ps := range pslc { dID := depIdx[ps.DeploymentID] _, hasGroup := sris[dID].GroupDescMap[g] if len(info.GroupStats[g]) == 0 { info.GroupStats[g] = make(map[string]srGroupStatsSummary) } gmis, ok := info.GroupStats[g][ps.DeploymentID] if !ok { gmis = srGroupStatsSummary{ SRGroupStatsSummary: madmin.SRGroupStatsSummary{ HasGroup: hasGroup, }, } } gmis.GroupDescMismatch = gdMismatch gmis.groupDesc = ps info.GroupStats[g][ps.DeploymentID] = gmis if !gdMismatch && opts.Entity != madmin.SRGroupEntity { sum := info.StatsSummary[ps.DeploymentID] sum.ReplicatedGroups++ info.StatsSummary[ps.DeploymentID] = sum } } } } if opts.Policies || opts.Entity == madmin.SRPolicyEntity { // collect IAM policy replication status across sites for p, pslc := range policyStats { var policies []*iampolicy.Policy uPolicyCount := 0 for _, ps := range pslc { plcy, 
err := iampolicy.ParseConfig(bytes.NewReader([]byte(ps.SRIAMPolicy.Policy))) if err != nil { continue } policies = append(policies, plcy) uPolicyCount++ sum := info.StatsSummary[ps.DeploymentID] sum.TotalIAMPoliciesCount++ info.StatsSummary[ps.DeploymentID] = sum } if len(info.PolicyStats[p]) == 0 { info.PolicyStats[p] = make(map[string]srPolicyStatsSummary) } policyMismatch := !isIAMPolicyReplicated(uPolicyCount, numSites, policies) for _, ps := range pslc { dID := depIdx[ps.DeploymentID] _, hasPolicy := sris[dID].Policies[p] info.PolicyStats[p][ps.DeploymentID] = srPolicyStatsSummary{ SRPolicyStatsSummary: madmin.SRPolicyStatsSummary{ PolicyMismatch: policyMismatch, HasPolicy: hasPolicy, }, policy: ps, } switch { case policyMismatch, opts.Entity == madmin.SRPolicyEntity: default: sum := info.StatsSummary[ps.DeploymentID] if !policyMismatch { sum.ReplicatedIAMPolicies++ } info.StatsSummary[ps.DeploymentID] = sum } } } } if opts.Buckets || opts.Entity == madmin.SRBucketEntity { // collect bucket metadata replication stats across sites for b, slc := range bucketStats { tagSet := set.NewStringSet() olockConfigSet := set.NewStringSet() policies := make([]*bktpolicy.Policy, numSites) replCfgs := make([]*sreplication.Config, numSites) quotaCfgs := make([]*madmin.BucketQuota, numSites) sseCfgSet := set.NewStringSet() versionCfgSet := set.NewStringSet() var tagCount, olockCfgCount, sseCfgCount, versionCfgCount int for i, s := range slc { if s.ReplicationConfig != nil { cfgBytes, err := base64.StdEncoding.DecodeString(*s.ReplicationConfig) if err != nil { continue } cfg, err := sreplication.ParseConfig(bytes.NewReader(cfgBytes)) if err != nil { continue } replCfgs[i] = cfg } if s.Versioning != nil { configData, err := base64.StdEncoding.DecodeString(*s.Versioning) if err != nil { continue } versionCfgCount++ if !versionCfgSet.Contains(string(configData)) { versionCfgSet.Add(string(configData)) } } if s.QuotaConfig != nil { cfgBytes, err := 
base64.StdEncoding.DecodeString(*s.QuotaConfig) if err != nil { continue } cfg, err := parseBucketQuota(b, cfgBytes) if err != nil { continue } quotaCfgs[i] = cfg } if s.Tags != nil { tagBytes, err := base64.StdEncoding.DecodeString(*s.Tags) if err != nil { continue } tagCount++ if !tagSet.Contains(string(tagBytes)) { tagSet.Add(string(tagBytes)) } } if len(s.Policy) > 0 { plcy, err := bktpolicy.ParseConfig(bytes.NewReader(s.Policy), b) if err != nil { continue } policies[i] = plcy } if s.ObjectLockConfig != nil { configData, err := base64.StdEncoding.DecodeString(*s.ObjectLockConfig) if err != nil { continue } olockCfgCount++ if !olockConfigSet.Contains(string(configData)) { olockConfigSet.Add(string(configData)) } } if s.SSEConfig != nil { configData, err := base64.StdEncoding.DecodeString(*s.SSEConfig) if err != nil { continue } sseCfgCount++ if !sseCfgSet.Contains(string(configData)) { sseCfgSet.Add(string(configData)) } } ss, ok := info.StatsSummary[s.DeploymentID] if !ok { ss = madmin.SRSiteSummary{} } // increment total number of replicated buckets if len(slc) == numSites { ss.ReplicatedBuckets++ } ss.TotalBucketsCount++ if tagCount > 0 { ss.TotalTagsCount++ } if olockCfgCount > 0 { ss.TotalLockConfigCount++ } if sseCfgCount > 0 { ss.TotalSSEConfigCount++ } if versionCfgCount > 0 { ss.TotalVersioningConfigCount++ } if len(policies) > 0 { ss.TotalBucketPoliciesCount++ } info.StatsSummary[s.DeploymentID] = ss } tagMismatch := !isReplicated(tagCount, numSites, tagSet) olockCfgMismatch := !isReplicated(olockCfgCount, numSites, olockConfigSet) sseCfgMismatch := !isReplicated(sseCfgCount, numSites, sseCfgSet) versionCfgMismatch := !isReplicated(versionCfgCount, numSites, versionCfgSet) policyMismatch := !isBktPolicyReplicated(numSites, policies) replCfgMismatch := !isBktReplCfgReplicated(numSites, replCfgs) quotaCfgMismatch := !isBktQuotaCfgReplicated(numSites, quotaCfgs) info.BucketStats[b] = make(map[string]srBucketStatsSummary, numSites) for i, s := range slc { 
dIdx := depIdx[s.DeploymentID] var hasBucket, isBucketMarkedDeleted bool bi, ok := sris[dIdx].Buckets[s.Bucket] if ok { isBucketMarkedDeleted = !bi.DeletedAt.IsZero() && (bi.CreatedAt.IsZero() || bi.DeletedAt.After(bi.CreatedAt)) hasBucket = !bi.CreatedAt.IsZero() } quotaCfgSet := hasBucket && quotaCfgs[i] != nil && *quotaCfgs[i] != madmin.BucketQuota{} ss := madmin.SRBucketStatsSummary{ DeploymentID: s.DeploymentID, HasBucket: hasBucket, BucketMarkedDeleted: isBucketMarkedDeleted, TagMismatch: tagMismatch, OLockConfigMismatch: olockCfgMismatch, SSEConfigMismatch: sseCfgMismatch, VersioningConfigMismatch: versionCfgMismatch, PolicyMismatch: policyMismatch, ReplicationCfgMismatch: replCfgMismatch, QuotaCfgMismatch: quotaCfgMismatch, HasReplicationCfg: s.ReplicationConfig != nil, HasTagsSet: s.Tags != nil, HasOLockConfigSet: s.ObjectLockConfig != nil, HasPolicySet: s.Policy != nil, HasQuotaCfgSet: quotaCfgSet, HasSSECfgSet: s.SSEConfig != nil, } var m srBucketMetaInfo if len(bucketStats[s.Bucket]) > dIdx { m = bucketStats[s.Bucket][dIdx] } info.BucketStats[b][s.DeploymentID] = srBucketStatsSummary{ SRBucketStatsSummary: ss, meta: m, } } // no mismatch for _, s := range slc { sum := info.StatsSummary[s.DeploymentID] if !olockCfgMismatch && olockCfgCount == numSites { sum.ReplicatedLockConfig++ } if !versionCfgMismatch && versionCfgCount == numSites { sum.ReplicatedVersioningConfig++ } if !sseCfgMismatch && sseCfgCount == numSites { sum.ReplicatedSSEConfig++ } if !policyMismatch && len(policies) == numSites { sum.ReplicatedBucketPolicies++ } if !tagMismatch && tagCount == numSites { sum.ReplicatedTags++ } info.StatsSummary[s.DeploymentID] = sum } } } // maximum buckets users etc seen across sites info.MaxBuckets = len(bucketStats) info.MaxUsers = len(userInfoStats) info.MaxGroups = len(groupDescStats) info.MaxPolicies = len(policyStats) return } // isReplicated returns true if count of replicated matches the number of // sites and there is atmost one unique entry in 
the set. func isReplicated(cntReplicated, total int, valSet set.StringSet) bool { if cntReplicated > 0 && cntReplicated < total { return false } if len(valSet) > 1 { // mismatch - one or more sites has differing tags/policy return false } return true } // isIAMPolicyReplicated returns true if count of replicated IAM policies matches total // number of sites and IAM policies are identical. func isIAMPolicyReplicated(cntReplicated, total int, policies []*iampolicy.Policy) bool { if cntReplicated > 0 && cntReplicated != total { return false } // check if policies match between sites var prev *iampolicy.Policy for i, p := range policies { if i == 0 { prev = p continue } if !prev.Equals(*p) { return false } } return true } // isPolicyMappingReplicated returns true if count of replicated IAM policy mappings matches total // number of sites and IAM policy mappings are identical. func isPolicyMappingReplicated(cntReplicated, total int, policies []madmin.SRPolicyMapping) bool { if cntReplicated > 0 && cntReplicated != total { return false } // check if policies match between sites var prev madmin.SRPolicyMapping for i, p := range policies { if i == 0 { prev = p continue } if prev.IsGroup != p.IsGroup || prev.Policy != p.Policy || prev.UserOrGroup != p.UserOrGroup { return false } } return true } func isUserInfoReplicated(cntReplicated, total int, uis []madmin.UserInfo) bool { if cntReplicated > 0 && cntReplicated != total { return false } // check if policies match between sites var prev madmin.UserInfo for i, ui := range uis { if i == 0 { prev = ui continue } if !isUserInfoEqual(prev, ui) { return false } } return true } func isGroupDescReplicated(cntReplicated, total int, gds []madmin.GroupDesc) bool { if cntReplicated > 0 && cntReplicated != total { return false } // check if policies match between sites var prev madmin.GroupDesc for i, gd := range gds { if i == 0 { prev = gd continue } if !isGroupDescEqual(prev, gd) { return false } } return true } func 
isBktQuotaCfgReplicated(total int, quotaCfgs []*madmin.BucketQuota) bool { numquotaCfgs := 0 for _, q := range quotaCfgs { if q == nil { continue } numquotaCfgs++ } if numquotaCfgs == 0 { return true } if numquotaCfgs > 0 && numquotaCfgs != total { return false } var prev *madmin.BucketQuota for i, q := range quotaCfgs { if q == nil { return false } if i == 0 { prev = q continue } if prev.Quota != q.Quota || prev.Type != q.Type { return false } } return true } // isBktPolicyReplicated returns true if count of replicated bucket policies matches total // number of sites and bucket policies are identical. func isBktPolicyReplicated(total int, policies []*bktpolicy.Policy) bool { numPolicies := 0 for _, p := range policies { if p == nil { continue } numPolicies++ } if numPolicies > 0 && numPolicies != total { return false } // check if policies match between sites var prev *bktpolicy.Policy for i, p := range policies { if p == nil { continue } if i == 0 { prev = p continue } if !prev.Equals(*p) { return false } } return true } // isBktReplCfgReplicated returns true if all the sites have same number // of replication rules with all replication features enabled. 
func isBktReplCfgReplicated(total int, cfgs []*sreplication.Config) bool { cntReplicated := 0 for _, c := range cfgs { if c == nil { continue } cntReplicated++ } if cntReplicated > 0 && cntReplicated != total { return false } // check if policies match between sites var prev *sreplication.Config for i, c := range cfgs { if c == nil { continue } if i == 0 { prev = c continue } if len(prev.Rules) != len(c.Rules) { return false } if len(c.Rules) != total-1 { return false } for _, r := range c.Rules { if !strings.HasPrefix(r.ID, "site-repl-") { return false } if r.DeleteMarkerReplication.Status == sreplication.Disabled || r.DeleteReplication.Status == sreplication.Disabled || r.ExistingObjectReplication.Status == sreplication.Disabled || r.SourceSelectionCriteria.ReplicaModifications.Status == sreplication.Disabled { return false } } } return true } // cache of IAM info fetched in last SiteReplicationMetaInfo call type srIAMCache struct { sync.RWMutex lastUpdate time.Time srIAMInfo madmin.SRInfo // caches IAM info } func (c *SiteReplicationSys) getSRCachedIAMInfo() (info madmin.SRInfo, ok bool) { c.iamMetaCache.RLock() defer c.iamMetaCache.RUnlock() if c.iamMetaCache.lastUpdate.IsZero() { return info, false } if time.Since(c.iamMetaCache.lastUpdate) < siteHealTimeInterval { return c.iamMetaCache.srIAMInfo, true } return info, false } func (c *SiteReplicationSys) srCacheIAMInfo(info madmin.SRInfo) { c.iamMetaCache.Lock() defer c.iamMetaCache.Unlock() c.iamMetaCache.srIAMInfo = info c.iamMetaCache.lastUpdate = time.Now() } // SiteReplicationMetaInfo returns the metadata info on buckets, policies etc for the replicated site func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info madmin.SRInfo, err error) { if objAPI == nil { return info, errSRObjectLayerNotReady } c.RLock() defer c.RUnlock() if !c.enabled { return info, nil } info.DeploymentID = globalDeploymentID if opts.Buckets || opts.Entity == 
madmin.SRBucketEntity { var ( buckets []BucketInfo err error ) if opts.Entity == madmin.SRBucketEntity { bi, err := objAPI.GetBucketInfo(ctx, opts.EntityValue, BucketOptions{Deleted: opts.ShowDeleted}) if err != nil { if isErrBucketNotFound(err) { return info, nil } return info, errSRBackendIssue(err) } buckets = append(buckets, bi) } else { buckets, err = objAPI.ListBuckets(ctx, BucketOptions{Deleted: opts.ShowDeleted}) if err != nil { return info, errSRBackendIssue(err) } } info.Buckets = make(map[string]madmin.SRBucketInfo, len(buckets)) for _, bucketInfo := range buckets { bucket := bucketInfo.Name bucketExists := bucketInfo.Deleted.IsZero() || (!bucketInfo.Created.IsZero() && bucketInfo.Created.After(bucketInfo.Deleted)) bms := madmin.SRBucketInfo{ Bucket: bucket, CreatedAt: bucketInfo.Created.UTC(), DeletedAt: bucketInfo.Deleted.UTC(), } if !bucketExists { info.Buckets[bucket] = bms continue } meta, err := globalBucketMetadataSys.GetConfigFromDisk(ctx, bucket) if err != nil && !errors.Is(err, errConfigNotFound) { return info, errSRBackendIssue(err) } bms.Policy = meta.PolicyConfigJSON bms.PolicyUpdatedAt = meta.PolicyConfigUpdatedAt if len(meta.TaggingConfigXML) > 0 { tagCfgStr := base64.StdEncoding.EncodeToString(meta.TaggingConfigXML) bms.Tags = &tagCfgStr bms.TagConfigUpdatedAt = meta.TaggingConfigUpdatedAt } if len(meta.VersioningConfigXML) > 0 { versioningCfgStr := base64.StdEncoding.EncodeToString(meta.VersioningConfigXML) bms.Versioning = &versioningCfgStr bms.VersioningConfigUpdatedAt = meta.VersioningConfigUpdatedAt } if len(meta.ObjectLockConfigXML) > 0 { objLockStr := base64.StdEncoding.EncodeToString(meta.ObjectLockConfigXML) bms.ObjectLockConfig = &objLockStr bms.ObjectLockConfigUpdatedAt = meta.ObjectLockConfigUpdatedAt } if len(meta.QuotaConfigJSON) > 0 { quotaConfigStr := base64.StdEncoding.EncodeToString(meta.QuotaConfigJSON) bms.QuotaConfig = "aConfigStr bms.QuotaConfigUpdatedAt = meta.QuotaConfigUpdatedAt } if len(meta.EncryptionConfigXML) 
> 0 { sseConfigStr := base64.StdEncoding.EncodeToString(meta.EncryptionConfigXML) bms.SSEConfig = &sseConfigStr bms.SSEConfigUpdatedAt = meta.EncryptionConfigUpdatedAt } if len(meta.ReplicationConfigXML) > 0 { rcfgXMLStr := base64.StdEncoding.EncodeToString(meta.ReplicationConfigXML) bms.ReplicationConfig = &rcfgXMLStr bms.ReplicationConfigUpdatedAt = meta.ReplicationConfigUpdatedAt } info.Buckets[bucket] = bms } } if opts.Users && opts.Groups && opts.Policies && !opts.Buckets { // serialize SiteReplicationMetaInfo calls - if data in cache is within // healing interval, avoid fetching IAM data again from disk. if metaInfo, ok := c.getSRCachedIAMInfo(); ok { return metaInfo, nil } } if opts.Policies || opts.Entity == madmin.SRPolicyEntity { var allPolicies map[string]PolicyDoc if opts.Entity == madmin.SRPolicyEntity { if p, err := globalIAMSys.store.GetPolicyDoc(opts.EntityValue); err == nil { allPolicies = map[string]PolicyDoc{opts.EntityValue: p} } } else { // Replicate IAM policies on local to all peers. allPolicies, err = globalIAMSys.store.listPolicyDocs(ctx, "") if err != nil { return info, errSRBackendIssue(err) } } info.Policies = make(map[string]madmin.SRIAMPolicy, len(allPolicies)) for pname, policyDoc := range allPolicies { policyJSON, err := json.Marshal(policyDoc.Policy) if err != nil { return info, wrapSRErr(err) } info.Policies[pname] = madmin.SRIAMPolicy{Policy: json.RawMessage(policyJSON), UpdatedAt: policyDoc.UpdateDate} } } if opts.Users || opts.Entity == madmin.SRUserEntity { // Replicate policy mappings on local to all peers. 
userPolicyMap := make(map[string]MappedPolicy) if opts.Entity == madmin.SRUserEntity { if mp, ok := globalIAMSys.store.GetMappedPolicy(opts.EntityValue, false); ok { userPolicyMap[opts.EntityValue] = mp } } else { stsErr := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, false, userPolicyMap) if stsErr != nil { return info, errSRBackendIssue(stsErr) } usrErr := globalIAMSys.store.loadMappedPolicies(ctx, regUser, false, userPolicyMap) if usrErr != nil { return info, errSRBackendIssue(usrErr) } svcErr := globalIAMSys.store.loadMappedPolicies(ctx, svcUser, false, userPolicyMap) if svcErr != nil { return info, errSRBackendIssue(svcErr) } } info.UserPolicies = make(map[string]madmin.SRPolicyMapping, len(userPolicyMap)) for user, mp := range userPolicyMap { info.UserPolicies[user] = madmin.SRPolicyMapping{ IsGroup: false, UserOrGroup: user, Policy: mp.Policies, UpdatedAt: mp.UpdatedAt, } } info.UserInfoMap = make(map[string]madmin.UserInfo) if opts.Entity == madmin.SRUserEntity { if ui, err := globalIAMSys.GetUserInfo(ctx, opts.EntityValue); err == nil { info.UserInfoMap[opts.EntityValue] = ui } } else { userAccounts := make(map[string]UserIdentity) uerr := globalIAMSys.store.loadUsers(ctx, regUser, userAccounts) if uerr != nil { return info, errSRBackendIssue(uerr) } serr := globalIAMSys.store.loadUsers(ctx, svcUser, userAccounts) if serr != nil { return info, errSRBackendIssue(serr) } terr := globalIAMSys.store.loadUsers(ctx, stsUser, userAccounts) if terr != nil { return info, errSRBackendIssue(terr) } for k, v := range userAccounts { if k == siteReplicatorSvcAcc { // skip the site replicate svc account as it is // already replicated. continue } if v.Credentials.ParentUser != "" && v.Credentials.ParentUser == globalActiveCred.AccessKey { // skip all root user service accounts. 
continue } info.UserInfoMap[k] = madmin.UserInfo{ Status: madmin.AccountStatus(v.Credentials.Status), } } } } if opts.Groups || opts.Entity == madmin.SRGroupEntity { // Replicate policy mappings on local to all peers. groupPolicyMap := make(map[string]MappedPolicy) if opts.Entity == madmin.SRGroupEntity { if mp, ok := globalIAMSys.store.GetMappedPolicy(opts.EntityValue, true); ok { groupPolicyMap[opts.EntityValue] = mp } } else { stsErr := globalIAMSys.store.loadMappedPolicies(ctx, stsUser, true, groupPolicyMap) if stsErr != nil { return info, errSRBackendIssue(stsErr) } userErr := globalIAMSys.store.loadMappedPolicies(ctx, regUser, true, groupPolicyMap) if userErr != nil { return info, errSRBackendIssue(userErr) } } info.GroupPolicies = make(map[string]madmin.SRPolicyMapping, len(c.state.Peers)) for group, mp := range groupPolicyMap { info.GroupPolicies[group] = madmin.SRPolicyMapping{ IsGroup: true, UserOrGroup: group, Policy: mp.Policies, UpdatedAt: mp.UpdatedAt, } } info.GroupDescMap = make(map[string]madmin.GroupDesc) if opts.Entity == madmin.SRGroupEntity { if gd, err := globalIAMSys.GetGroupDescription(opts.EntityValue); err == nil { info.GroupDescMap[opts.EntityValue] = gd } } else { // get users/group info on local. groups, errG := globalIAMSys.store.listGroups(ctx) if errG != nil { return info, errSRBackendIssue(errG) } groupDescMap := make(map[string]madmin.GroupDesc, len(groups)) for _, g := range groups { groupDescMap[g], errG = globalIAMSys.GetGroupDescription(g) if errG != nil { return info, errSRBackendIssue(errG) } } for group, d := range groupDescMap { info.GroupDescMap[group] = d } } } // cache SR metadata info for IAM if opts.Users && opts.Groups && opts.Policies && !opts.Buckets { c.srCacheIAMInfo(info) } return info, nil } // EditPeerCluster - edits replication configuration and updates peer endpoint. 
func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.PeerInfo) (madmin.ReplicateEditStatus, error) { sites, err := c.GetClusterInfo(ctx) if err != nil { return madmin.ReplicateEditStatus{}, errSRBackendIssue(err) } if !sites.Enabled { return madmin.ReplicateEditStatus{}, errSRNotEnabled } var ( found bool admClient *madmin.AdminClient ) if globalDeploymentID == peer.DeploymentID && !peer.SyncState.Empty() { return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("A peer cluster, rather than the local cluster (endpoint=%s, deployment-id=%s) needs to be specified while setting a 'sync' replication mode", peer.Endpoint, peer.DeploymentID)) } for _, v := range sites.Sites { if peer.DeploymentID == v.DeploymentID { found = true if !peer.SyncState.Empty() && peer.Endpoint == "" { // peer.Endpoint may be "" if only sync state is being updated break } if peer.Endpoint == v.Endpoint && peer.SyncState.Empty() { return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s entered for deployment id %s already configured in site replication", v.Endpoint, v.DeploymentID)) } admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint) if err != nil { return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) } // check if endpoint is reachable info, err := admClient.ServerInfo(ctx) if err != nil { return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s not reachable: %w", peer.Endpoint, err)) } if info.DeploymentID != v.DeploymentID { return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s does not belong to deployment expected: %s (found %s) ", v.Endpoint, v.DeploymentID, info.DeploymentID)) } } } if !found { return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("%s not found in existing replicated sites", peer.DeploymentID)) } pi := c.state.Peers[peer.DeploymentID] prevPeerInfo 
:= pi if !peer.SyncState.Empty() { // update replication to peer to be sync/async pi.SyncState = peer.SyncState c.state.Peers[peer.DeploymentID] = pi } if peer.Endpoint != "" { // `admin replicate update` requested an endpoint change pi.Endpoint = peer.Endpoint } if admClient != nil { errs := make(map[string]error, len(c.state.Peers)) var wg sync.WaitGroup for i, v := range sites.Sites { if v.DeploymentID == globalDeploymentID { c.state.Peers[peer.DeploymentID] = pi continue } wg.Add(1) go func(pi madmin.PeerInfo, i int) { defer wg.Done() v := sites.Sites[i] admClient, err := c.getAdminClient(ctx, v.DeploymentID) if v.DeploymentID == peer.DeploymentID { admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint) } if err != nil { errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) return } if err = admClient.SRPeerEdit(ctx, pi); err != nil { errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", v.Name, err)) return } }(pi, i) } wg.Wait() for dID, err := range errs { if err != nil { return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err)) } } } // we can now save the cluster replication configuration state. 
if err = c.saveToDisk(ctx, c.state); err != nil { return madmin.ReplicateEditStatus{ Status: madmin.ReplicateAddStatusPartial, ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err), }, nil } if err = c.updateTargetEndpoints(ctx, prevPeerInfo, pi); err != nil { return madmin.ReplicateEditStatus{ Status: madmin.ReplicateAddStatusPartial, ErrDetail: fmt.Sprintf("unable to update peer targets on local: %v", err), }, nil } result := madmin.ReplicateEditStatus{ Success: true, Status: fmt.Sprintf("Cluster replication configuration updated with endpoint %s for peer %s successfully", peer.Endpoint, peer.Name), } return result, nil } func (c *SiteReplicationSys) updateTargetEndpoints(ctx context.Context, prevInfo, peer madmin.PeerInfo) error { objAPI := newObjectLayerFn() if objAPI == nil { return errSRObjectLayerNotReady } buckets, err := objAPI.ListBuckets(ctx, BucketOptions{}) if err != nil { return err } for _, bucketInfo := range buckets { bucket := bucketInfo.Name ep, _ := url.Parse(peer.Endpoint) prevEp, _ := url.Parse(prevInfo.Endpoint) targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) if err != nil { continue // site healing will take care of configuring new targets } for _, target := range targets.Targets { if target.SourceBucket == bucket && target.TargetBucket == bucket && target.Endpoint == prevEp.Host && target.Secure == (prevEp.Scheme == "https") && target.Type == madmin.ReplicationService { bucketTarget := target bucketTarget.Secure = ep.Scheme == "https" bucketTarget.Endpoint = ep.Host if !peer.SyncState.Empty() { bucketTarget.ReplicationSync = (peer.SyncState == madmin.SyncEnabled) } err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, true) if err != nil { logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Bucket target creation error", err)) continue } targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) if err != nil { logger.LogIf(ctx, err) continue } tgtBytes, err := 
json.Marshal(&targets) if err != nil { logger.LogIf(ctx, err) continue } if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { logger.LogIf(ctx, err) continue } } } } return nil } // PeerEditReq - internal API handler to respond to a peer cluster's request // to edit endpoint. func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInfo) error { ourName := "" for i := range c.state.Peers { p := c.state.Peers[i] if p.DeploymentID == arg.DeploymentID { p.Endpoint = arg.Endpoint c.state.Peers[arg.DeploymentID] = p } if p.DeploymentID == globalDeploymentID { ourName = p.Name } } if err := c.saveToDisk(ctx, c.state); err != nil { return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to drive on %s: %v", ourName, err)) } return nil } const siteHealTimeInterval = 30 * time.Second func (c *SiteReplicationSys) startHealRoutine(ctx context.Context, objAPI ObjectLayer) { ctx, cancel := globalLeaderLock.GetLock(ctx) defer cancel() healTimer := time.NewTimer(siteHealTimeInterval) defer healTimer.Stop() var maxRefreshDurationSecondsForLog float64 = 10 // 10 seconds.. for { select { case <-healTimer.C: c.RLock() enabled := c.enabled c.RUnlock() if enabled { refreshStart := time.Now() c.healIAMSystem(ctx, objAPI) // heal IAM system first c.healBuckets(ctx, objAPI) // heal buckets subsequently took := time.Since(refreshStart).Seconds() if took > maxRefreshDurationSecondsForLog { // Log if we took a lot of time. 
logger.Info("Site replication healing refresh took %.2fs", took) } // wait for 200 millisecond, if we are experience lot of I/O waitForLowIO(runtime.GOMAXPROCS(0), 200*time.Millisecond, currentHTTPIO) } healTimer.Reset(siteHealTimeInterval) case <-ctx.Done(): return } } } type srBucketStatsSummary struct { madmin.SRBucketStatsSummary meta srBucketMetaInfo } type srPolicyStatsSummary struct { madmin.SRPolicyStatsSummary policy srPolicy } type srUserStatsSummary struct { madmin.SRUserStatsSummary userInfo srUserInfo userPolicy srPolicyMapping } type srGroupStatsSummary struct { madmin.SRGroupStatsSummary groupDesc srGroupDesc groupPolicy srPolicyMapping } type srStatusInfo struct { // SRStatusInfo returns detailed status on site replication status Enabled bool MaxBuckets int // maximum buckets seen across sites MaxUsers int // maximum users seen across sites MaxGroups int // maximum groups seen across sites MaxPolicies int // maximum policies across sites Sites map[string]madmin.PeerInfo // deployment->sitename StatsSummary map[string]madmin.SRSiteSummary // map of deployment id -> site stat // BucketStats map of bucket to slice of deployment IDs with stats. This is populated only if there are // mismatches or if a specific bucket's stats are requested BucketStats map[string]map[string]srBucketStatsSummary // PolicyStats map of policy to slice of deployment IDs with stats. This is populated only if there are // mismatches or if a specific bucket's stats are requested PolicyStats map[string]map[string]srPolicyStatsSummary // UserStats map of user to slice of deployment IDs with stats. This is populated only if there are // mismatches or if a specific bucket's stats are requested UserStats map[string]map[string]srUserStatsSummary // GroupStats map of group to slice of deployment IDs with stats. 
This is populated only if there are // mismatches or if a specific bucket's stats are requested GroupStats map[string]map[string]srGroupStatsSummary } // SRBucketDeleteOp - type of delete op type SRBucketDeleteOp string const ( // MarkDelete creates .minio.sys/buckets/.deleted/ vol entry to hold onto deleted bucket's state // until peers are synced in site replication setup. MarkDelete SRBucketDeleteOp = "MarkDelete" // Purge deletes the .minio.sys/buckets/.deleted/ vol entry Purge SRBucketDeleteOp = "Purge" // NoOp no action needed NoOp SRBucketDeleteOp = "NoOp" ) // Empty returns true if this Op is not set func (s SRBucketDeleteOp) Empty() bool { return string(s) == "" || string(s) == string(NoOp) } func getSRBucketDeleteOp(isSiteReplicated bool) SRBucketDeleteOp { if !isSiteReplicated { return NoOp } return MarkDelete } func (c *SiteReplicationSys) healBuckets(ctx context.Context, objAPI ObjectLayer) error { buckets, err := c.listBuckets(ctx) if err != nil { return err } for _, bi := range buckets { bucket := bi.Name info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{ Entity: madmin.SRBucketEntity, EntityValue: bucket, ShowDeleted: true, }) if err != nil { logger.LogIf(ctx, err) continue } c.healBucket(ctx, objAPI, bucket, info) if bi.Deleted.IsZero() || (!bi.Created.IsZero() && bi.Deleted.Before(bi.Created)) { c.healVersioningMetadata(ctx, objAPI, bucket, info) c.healOLockConfigMetadata(ctx, objAPI, bucket, info) c.healSSEMetadata(ctx, objAPI, bucket, info) c.healBucketReplicationConfig(ctx, objAPI, bucket, info) c.healBucketPolicies(ctx, objAPI, bucket, info) c.healTagMetadata(ctx, objAPI, bucket, info) c.healBucketQuotaConfig(ctx, objAPI, bucket, info) } // Notification and ILM are site specific settings. 
} return nil } func (c *SiteReplicationSys) healTagMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { bs := info.BucketStats[bucket] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time latestTaggingConfig *string ) for dID, ss := range bs { if lastUpdate.IsZero() { lastUpdate = ss.meta.TagConfigUpdatedAt latestID = dID latestTaggingConfig = ss.meta.Tags } // avoid considering just created buckets as latest. Perhaps this site // just joined cluster replication and yet to be sync'd if ss.meta.CreatedAt.Equal(ss.meta.TagConfigUpdatedAt) { continue } if ss.meta.TagConfigUpdatedAt.After(lastUpdate) { lastUpdate = ss.meta.TagConfigUpdatedAt latestID = dID latestTaggingConfig = ss.meta.Tags } } latestPeerName = info.Sites[latestID].Name var latestTaggingConfigBytes []byte var err error if latestTaggingConfig != nil { latestTaggingConfigBytes, err = base64.StdEncoding.DecodeString(*latestTaggingConfig) if err != nil { return err } } for dID, bStatus := range bs { if !bStatus.TagMismatch { continue } if isBucketMetadataEqual(latestTaggingConfig, bStatus.meta.Tags) { continue } if dID == globalDeploymentID { if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, latestTaggingConfigBytes); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal tagging metadata from peer site %s : %w", latestPeerName, err)) } continue } admClient, err := c.getAdminClient(ctx, dID) if err != nil { return wrapSRErr(err) } peerName := info.Sites[dID].Name err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeTags, Bucket: bucket, Tags: latestTaggingConfig, }) if err != nil { logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, fmt.Errorf("Unable to heal tagging metadata for peer %s from peer %s : %w", peerName, latestPeerName, err))) } } return nil } func (c *SiteReplicationSys) healBucketPolicies(ctx 
context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { bs := info.BucketStats[bucket] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time latestIAMPolicy json.RawMessage ) for dID, ss := range bs { if lastUpdate.IsZero() { lastUpdate = ss.meta.PolicyUpdatedAt latestID = dID latestIAMPolicy = ss.meta.Policy } // avoid considering just created buckets as latest. Perhaps this site // just joined cluster replication and yet to be sync'd if ss.meta.CreatedAt.Equal(ss.meta.PolicyUpdatedAt) { continue } if ss.meta.PolicyUpdatedAt.After(lastUpdate) { lastUpdate = ss.meta.PolicyUpdatedAt latestID = dID latestIAMPolicy = ss.meta.Policy } } latestPeerName = info.Sites[latestID].Name for dID, bStatus := range bs { if !bStatus.PolicyMismatch { continue } if strings.EqualFold(string(latestIAMPolicy), string(bStatus.meta.Policy)) { continue } if dID == globalDeploymentID { if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, latestIAMPolicy); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal bucket policy metadata from peer site %s : %w", latestPeerName, err)) } continue } admClient, err := c.getAdminClient(ctx, dID) if err != nil { return wrapSRErr(err) } peerName := info.Sites[dID].Name if err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypePolicy, Bucket: bucket, Policy: latestIAMPolicy, UpdatedAt: lastUpdate, }); err != nil { logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, fmt.Errorf("Unable to heal bucket policy metadata for peer %s from peer %s : %w", peerName, latestPeerName, err))) } } return nil } func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { bs := info.BucketStats[bucket] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time 
latestQuotaConfig *string latestQuotaConfigBytes []byte ) for dID, ss := range bs { if lastUpdate.IsZero() { lastUpdate = ss.meta.QuotaConfigUpdatedAt latestID = dID latestQuotaConfig = ss.meta.QuotaConfig } // avoid considering just created buckets as latest. Perhaps this site // just joined cluster replication and yet to be sync'd if ss.meta.CreatedAt.Equal(ss.meta.QuotaConfigUpdatedAt) { continue } if ss.meta.QuotaConfigUpdatedAt.After(lastUpdate) { lastUpdate = ss.meta.QuotaConfigUpdatedAt latestID = dID latestQuotaConfig = ss.meta.QuotaConfig } } var err error if latestQuotaConfig != nil { latestQuotaConfigBytes, err = base64.StdEncoding.DecodeString(*latestQuotaConfig) if err != nil { return err } } latestPeerName = info.Sites[latestID].Name for dID, bStatus := range bs { if !bStatus.QuotaCfgMismatch { continue } if isBucketMetadataEqual(latestQuotaConfig, bStatus.meta.QuotaConfig) { continue } if dID == globalDeploymentID { if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, latestQuotaConfigBytes); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal quota metadata from peer site %s : %w", latestPeerName, err)) } continue } admClient, err := c.getAdminClient(ctx, dID) if err != nil { return wrapSRErr(err) } peerName := info.Sites[dID].Name if err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeQuotaConfig, Bucket: bucket, Quota: latestQuotaConfigBytes, UpdatedAt: lastUpdate, }); err != nil { logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, fmt.Errorf("Unable to heal quota config metadata for peer %s from peer %s : %w", peerName, latestPeerName, err))) } } return nil } func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time latestVersioningConfig *string ) bs := 
info.BucketStats[bucket] for dID, ss := range bs { if lastUpdate.IsZero() { lastUpdate = ss.meta.VersioningConfigUpdatedAt latestID = dID latestVersioningConfig = ss.meta.Versioning } // avoid considering just created buckets as latest. Perhaps this site // just joined cluster replication and yet to be sync'd if ss.meta.CreatedAt.Equal(ss.meta.VersioningConfigUpdatedAt) { continue } if ss.meta.VersioningConfigUpdatedAt.After(lastUpdate) { lastUpdate = ss.meta.VersioningConfigUpdatedAt latestID = dID latestVersioningConfig = ss.meta.Versioning } } latestPeerName = info.Sites[latestID].Name var latestVersioningConfigBytes []byte var err error if latestVersioningConfig != nil { latestVersioningConfigBytes, err = base64.StdEncoding.DecodeString(*latestVersioningConfig) if err != nil { return err } } for dID, bStatus := range bs { if !bStatus.VersioningConfigMismatch { continue } if isBucketMetadataEqual(latestVersioningConfig, bStatus.meta.Versioning) { continue } if dID == globalDeploymentID { if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, latestVersioningConfigBytes); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal versioning metadata from peer site %s : %w", latestPeerName, err)) } continue } admClient, err := c.getAdminClient(ctx, dID) if err != nil { return wrapSRErr(err) } peerName := info.Sites[dID].Name err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeVersionConfig, Bucket: bucket, Versioning: latestVersioningConfig, UpdatedAt: lastUpdate, }) if err != nil { logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, fmt.Errorf("Unable to heal versioning config metadata for peer %s from peer %s : %w", peerName, latestPeerName, err))) } } return nil } func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, 
latestPeerName string lastUpdate time.Time latestSSEConfig *string ) bs := info.BucketStats[bucket] for dID, ss := range bs { if lastUpdate.IsZero() { lastUpdate = ss.meta.SSEConfigUpdatedAt latestID = dID latestSSEConfig = ss.meta.SSEConfig } // avoid considering just created buckets as latest. Perhaps this site // just joined cluster replication and yet to be sync'd if ss.meta.CreatedAt.Equal(ss.meta.SSEConfigUpdatedAt) { continue } if ss.meta.SSEConfigUpdatedAt.After(lastUpdate) { lastUpdate = ss.meta.SSEConfigUpdatedAt latestID = dID latestSSEConfig = ss.meta.SSEConfig } } latestPeerName = info.Sites[latestID].Name var latestSSEConfigBytes []byte var err error if latestSSEConfig != nil { latestSSEConfigBytes, err = base64.StdEncoding.DecodeString(*latestSSEConfig) if err != nil { return err } } for dID, bStatus := range bs { if !bStatus.SSEConfigMismatch { continue } if isBucketMetadataEqual(latestSSEConfig, bStatus.meta.SSEConfig) { continue } if dID == globalDeploymentID { if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, latestSSEConfigBytes); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal sse metadata from peer site %s : %w", latestPeerName, err)) } continue } admClient, err := c.getAdminClient(ctx, dID) if err != nil { return wrapSRErr(err) } peerName := info.Sites[dID].Name err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeSSEConfig, Bucket: bucket, SSEConfig: latestSSEConfig, UpdatedAt: lastUpdate, }) if err != nil { logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, fmt.Errorf("Unable to heal SSE config metadata for peer %s from peer %s : %w", peerName, latestPeerName, err))) } } return nil } func (c *SiteReplicationSys) healOLockConfigMetadata(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { bs := info.BucketStats[bucket] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string 
lastUpdate time.Time latestObjLockConfig *string ) for dID, ss := range bs { if lastUpdate.IsZero() { lastUpdate = ss.meta.ObjectLockConfigUpdatedAt latestID = dID latestObjLockConfig = ss.meta.ObjectLockConfig } // avoid considering just created buckets as latest. Perhaps this site // just joined cluster replication and yet to be sync'd if ss.meta.CreatedAt.Equal(ss.meta.ObjectLockConfigUpdatedAt) { continue } if ss.meta.ObjectLockConfig != nil && ss.meta.ObjectLockConfigUpdatedAt.After(lastUpdate) { lastUpdate = ss.meta.ObjectLockConfigUpdatedAt latestID = dID latestObjLockConfig = ss.meta.ObjectLockConfig } } latestPeerName = info.Sites[latestID].Name var latestObjLockConfigBytes []byte var err error if latestObjLockConfig != nil { latestObjLockConfigBytes, err = base64.StdEncoding.DecodeString(*latestObjLockConfig) if err != nil { return err } } for dID, bStatus := range bs { if !bStatus.OLockConfigMismatch { continue } if isBucketMetadataEqual(latestObjLockConfig, bStatus.meta.ObjectLockConfig) { continue } if dID == globalDeploymentID { if _, err := globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, latestObjLockConfigBytes); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal objectlock config metadata from peer site %s : %w", latestPeerName, err)) } continue } admClient, err := c.getAdminClient(ctx, dID) if err != nil { return wrapSRErr(err) } peerName := info.Sites[dID].Name err = admClient.SRPeerReplicateBucketMeta(ctx, madmin.SRBucketMeta{ Type: madmin.SRBucketMetaTypeObjectLockConfig, Bucket: bucket, Tags: latestObjLockConfig, UpdatedAt: lastUpdate, }) if err != nil { logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, fmt.Errorf("Unable to heal object lock config metadata for peer %s from peer %s : %w", peerName, latestPeerName, err))) } } return nil } func (c *SiteReplicationSys) purgeDeletedBucket(ctx context.Context, objAPI ObjectLayer, bucket string) { z, ok := objAPI.(*erasureServerPools) if !ok { return } 
z.s3Peer.DeleteBucket(context.Background(), pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucket), DeleteBucketOptions{}) } // healBucket creates/deletes the bucket according to latest state across clusters participating in site replication. func (c *SiteReplicationSys) healBucket(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { bs := info.BucketStats[bucket] c.RLock() defer c.RUnlock() if !c.enabled { return nil } numSites := len(c.state.Peers) mostRecent := func(d1, d2 time.Time) time.Time { if d1.IsZero() { return d2 } if d2.IsZero() { return d1 } if d1.After(d2) { return d1 } return d2 } var ( latestID string lastUpdate time.Time withB []string missingB []string deletedCnt int ) for dID, ss := range bs { if lastUpdate.IsZero() { lastUpdate = mostRecent(ss.meta.CreatedAt, ss.meta.DeletedAt) latestID = dID } recentUpdt := mostRecent(ss.meta.CreatedAt, ss.meta.DeletedAt) if recentUpdt.After(lastUpdate) { lastUpdate = recentUpdt latestID = dID } if ss.BucketMarkedDeleted { deletedCnt++ } if ss.HasBucket { withB = append(withB, dID) } else { missingB = append(missingB, dID) } } latestPeerName := info.Sites[latestID].Name bStatus := info.BucketStats[bucket][latestID].meta isMakeBucket := len(missingB) > 0 deleteOp := NoOp if latestID != globalDeploymentID { return nil } if lastUpdate.Equal(bStatus.DeletedAt) { isMakeBucket = false switch { case len(withB) == numSites && deletedCnt == numSites: deleteOp = NoOp case len(withB) == 0 && len(missingB) == numSites: deleteOp = Purge default: deleteOp = MarkDelete } } if isMakeBucket { var opts MakeBucketOptions optsMap := make(map[string]string) optsMap["versioningEnabled"] = "true" opts.VersioningEnabled = true opts.CreatedAt = bStatus.CreatedAt optsMap["createdAt"] = bStatus.CreatedAt.UTC().Format(time.RFC3339Nano) if bStatus.ObjectLockConfig != nil { config, err := base64.StdEncoding.DecodeString(*bStatus.ObjectLockConfig) if err != nil { return err } if 
bytes.Equal([]byte(string(config)), enabledBucketObjectLockConfig) { optsMap["lockEnabled"] = "true" opts.LockEnabled = true } } for _, dID := range missingB { peerName := info.Sites[dID].Name if dID == globalDeploymentID { err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts) if err != nil { return c.annotateErr(makeBucketWithVersion, fmt.Errorf("error healing bucket for site replication %w from %s -> %s", err, latestPeerName, peerName)) } } else { admClient, err := c.getAdminClient(ctx, dID) if err != nil { return c.annotateErr(configureReplication, fmt.Errorf("unable to use admin client for %s: %w", dID, err)) } if err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap); err != nil { return c.annotatePeerErr(peerName, makeBucketWithVersion, err) } if err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil); err != nil { return c.annotatePeerErr(peerName, configureReplication, err) } } } if len(missingB) > 0 { // configure replication from current cluster to other clusters err := c.PeerBucketConfigureReplHandler(ctx, bucket) if err != nil { return c.annotateErr(configureReplication, err) } } return nil } // all buckets are marked deleted across sites at this point. 
It should be safe to purge the .minio.sys/buckets/.deleted/ entry // from disk if deleteOp == Purge { for _, dID := range missingB { peerName := info.Sites[dID].Name if dID == globalDeploymentID { c.purgeDeletedBucket(ctx, objAPI, bucket) } else { admClient, err := c.getAdminClient(ctx, dID) if err != nil { return c.annotateErr(configureReplication, fmt.Errorf("unable to use admin client for %s: %w", dID, err)) } if err = admClient.SRPeerBucketOps(ctx, bucket, madmin.PurgeDeletedBucketOp, nil); err != nil { return c.annotatePeerErr(peerName, deleteBucket, err) } } } } // Mark buckets deleted on remaining peers if deleteOp == MarkDelete { for _, dID := range withB { peerName := info.Sites[dID].Name if dID == globalDeploymentID { err := c.PeerBucketDeleteHandler(ctx, bucket, DeleteBucketOptions{ Force: true, }) if err != nil { return c.annotateErr(deleteBucket, fmt.Errorf("error healing bucket for site replication %w from %s -> %s", err, latestPeerName, peerName)) } } else { admClient, err := c.getAdminClient(ctx, dID) if err != nil { return c.annotateErr(configureReplication, fmt.Errorf("unable to use admin client for %s: %w", dID, err)) } if err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ForceDeleteBucketBktOp, nil); err != nil { return c.annotatePeerErr(peerName, deleteBucket, err) } } } } return nil } func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, objAPI ObjectLayer, bucket string, info srStatusInfo) error { bs := info.BucketStats[bucket] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var replMismatch bool for _, ss := range bs { if ss.ReplicationCfgMismatch { replMismatch = true break } } rcfg, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket) if err != nil { _, ok := err.(BucketReplicationConfigNotFound) if !ok { return err } replMismatch = true } var ( epDeplIDMap = make(map[string]string) arnTgtMap = make(map[string]madmin.BucketTarget) ) if targetsPtr, _ := 
globalBucketTargetSys.ListBucketTargets(ctx, bucket); targetsPtr != nil { for _, t := range targetsPtr.Targets { arnTgtMap[t.Arn] = t } } for _, p := range c.state.Peers { epDeplIDMap[p.Endpoint] = p.DeploymentID } // fix stale ARN's in replication config and endpoint mismatch between site config and // targets associated to this config. if rcfg != nil { for _, rule := range rcfg.Rules { if rule.Status != sreplication.Status(replication.Disabled) { tgt, isValidARN := arnTgtMap[rule.Destination.ARN] // detect stale ARN in replication config _, epFound := epDeplIDMap[tgt.URL().String()] // detect end point change at site level if !isValidARN || !epFound { replMismatch = true break } } } } if rcfg != nil && !replMismatch { // validate remote targets on current cluster for this bucket _, apiErr := validateReplicationDestination(ctx, bucket, rcfg, false) if apiErr != noError { replMismatch = true } } if replMismatch { logger.LogIf(ctx, c.annotateErr(configureReplication, c.PeerBucketConfigureReplHandler(ctx, bucket))) } return nil } func isBucketMetadataEqual(one, two *string) bool { switch { case one == nil && two == nil: return true case one == nil || two == nil: return false default: return strings.EqualFold(*one, *two) } } func (c *SiteReplicationSys) healIAMSystem(ctx context.Context, objAPI ObjectLayer) error { info, err := c.siteReplicationStatus(ctx, objAPI, madmin.SRStatusOptions{ Users: true, Policies: true, Groups: true, }) if err != nil { return err } for policy := range info.PolicyStats { c.healPolicies(ctx, objAPI, policy, info) } for user := range info.UserStats { c.healUsers(ctx, objAPI, user, info) } for group := range info.GroupStats { c.healGroups(ctx, objAPI, group, info) } for user := range info.UserStats { c.healUserPolicies(ctx, objAPI, user, info) } for group := range info.GroupStats { c.healGroupPolicies(ctx, objAPI, group, info) } return nil } // heal iam policies present on this site to peers, provided current cluster has the most recent 
update. func (c *SiteReplicationSys) healPolicies(ctx context.Context, objAPI ObjectLayer, policy string, info srStatusInfo) error { // create IAM policy on peer cluster if missing ps := info.PolicyStats[policy] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time latestPolicyStat srPolicyStatsSummary ) for dID, ss := range ps { if lastUpdate.IsZero() { lastUpdate = ss.policy.UpdatedAt latestID = dID latestPolicyStat = ss } if !ss.policy.UpdatedAt.IsZero() && ss.policy.UpdatedAt.After(lastUpdate) { lastUpdate = ss.policy.UpdatedAt latestID = dID latestPolicyStat = ss } } if latestID != globalDeploymentID { // heal only from the site with latest info. return nil } latestPeerName = info.Sites[latestID].Name // heal policy of peers if peer does not have it. for dID, pStatus := range ps { if dID == globalDeploymentID { continue } if !pStatus.PolicyMismatch && pStatus.HasPolicy { continue } peerName := info.Sites[dID].Name err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicy, Name: policy, Policy: latestPolicyStat.policy.Policy, UpdatedAt: lastUpdate, }) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal IAM policy %s from peer site %s -> site %s : %w", policy, latestPeerName, peerName, err)) } } return nil } // heal user policy mappings present on this site to peers, provided current cluster has the most recent update. 
func (c *SiteReplicationSys) healUserPolicies(ctx context.Context, objAPI ObjectLayer, user string, info srStatusInfo) error { // create user policy mapping on peer cluster if missing us := info.UserStats[user] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time latestUserStat srUserStatsSummary ) for dID, ss := range us { if lastUpdate.IsZero() { lastUpdate = ss.userPolicy.UpdatedAt latestID = dID latestUserStat = ss } if !ss.userPolicy.UpdatedAt.IsZero() && ss.userPolicy.UpdatedAt.After(lastUpdate) { lastUpdate = ss.userPolicy.UpdatedAt latestID = dID latestUserStat = ss } } if latestID != globalDeploymentID { // heal only from the site with latest info. return nil } latestPeerName = info.Sites[latestID].Name // heal policy of peers if peer does not have it. for dID, pStatus := range us { if dID == globalDeploymentID { continue } if !pStatus.PolicyMismatch && pStatus.HasPolicyMapping { continue } if isPolicyMappingEqual(pStatus.userPolicy, latestUserStat.userPolicy) { continue } peerName := info.Sites[dID].Name err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicyMapping, PolicyMapping: &madmin.SRPolicyMapping{ UserOrGroup: user, IsGroup: false, Policy: latestUserStat.userPolicy.Policy, }, UpdatedAt: lastUpdate, }) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal IAM user policy mapping for %s from peer site %s -> site %s : %w", user, latestPeerName, peerName, err)) } } return nil } // heal group policy mappings present on this site to peers, provided current cluster has the most recent update. 
func (c *SiteReplicationSys) healGroupPolicies(ctx context.Context, objAPI ObjectLayer, group string, info srStatusInfo) error { // create group policy mapping on peer cluster if missing gs := info.GroupStats[group] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time latestGroupStat srGroupStatsSummary ) for dID, ss := range gs { if lastUpdate.IsZero() { lastUpdate = ss.groupPolicy.UpdatedAt latestID = dID latestGroupStat = ss } if !ss.groupPolicy.UpdatedAt.IsZero() && ss.groupPolicy.UpdatedAt.After(lastUpdate) { lastUpdate = ss.groupPolicy.UpdatedAt latestID = dID latestGroupStat = ss } } if latestID != globalDeploymentID { // heal only from the site with latest info. return nil } latestPeerName = info.Sites[latestID].Name // heal policy of peers if peer does not have it. for dID, pStatus := range gs { if dID == globalDeploymentID { continue } if !pStatus.PolicyMismatch && pStatus.HasPolicyMapping { continue } if isPolicyMappingEqual(pStatus.groupPolicy, latestGroupStat.groupPolicy) { continue } peerName := info.Sites[dID].Name err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemPolicyMapping, PolicyMapping: &madmin.SRPolicyMapping{ UserOrGroup: group, IsGroup: true, Policy: latestGroupStat.groupPolicy.Policy, }, UpdatedAt: lastUpdate, }) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal IAM group policy mapping for %s from peer site %s -> site %s : %w", group, latestPeerName, peerName, err)) } } return nil } // heal all users and their service accounts that are present on this site, // provided current cluster has the most recent update. 
func (c *SiteReplicationSys) healUsers(ctx context.Context, objAPI ObjectLayer, user string, info srStatusInfo) error { // create user if missing; fix user policy mapping if missing us := info.UserStats[user] c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time latestUserStat srUserStatsSummary ) for dID, ss := range us { if lastUpdate.IsZero() { lastUpdate = ss.userInfo.UserInfo.UpdatedAt latestID = dID latestUserStat = ss } if !ss.userInfo.UserInfo.UpdatedAt.IsZero() && ss.userInfo.UserInfo.UpdatedAt.After(lastUpdate) { lastUpdate = ss.userInfo.UserInfo.UpdatedAt latestID = dID latestUserStat = ss } } if latestID != globalDeploymentID { // heal only from the site with latest info. return nil } latestPeerName = info.Sites[latestID].Name for dID, uStatus := range us { if dID == globalDeploymentID { continue } if !uStatus.UserInfoMismatch { continue } if isUserInfoEqual(latestUserStat.userInfo.UserInfo, uStatus.userInfo.UserInfo) { continue } peerName := info.Sites[dID].Name u, ok := globalIAMSys.GetUser(ctx, user) if !ok { continue } creds := u.Credentials if creds.IsServiceAccount() { claims, err := globalIAMSys.GetClaimsForSvcAcc(ctx, creds.AccessKey) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal service account %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err)) continue } _, policy, err := globalIAMSys.GetServiceAccount(ctx, creds.AccessKey) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal service account %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err)) continue } var policyJSON []byte if policy != nil { policyJSON, err = json.Marshal(policy) if err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal service account %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err)) continue } } if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemSvcAcc, SvcAccChange: &madmin.SRSvcAccChange{ Create: 
&madmin.SRSvcAccCreate{ Parent: creds.ParentUser, AccessKey: creds.AccessKey, SecretKey: creds.SecretKey, Groups: creds.Groups, Claims: claims, SessionPolicy: json.RawMessage(policyJSON), Status: creds.Status, Name: creds.Name, Description: creds.Description, Expiration: &creds.Expiration, }, }, UpdatedAt: lastUpdate, }); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal service account %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err)) } continue } if creds.IsTemp() && !creds.IsExpired() { var parentPolicy string u, err := globalIAMSys.GetUserInfo(ctx, creds.ParentUser) if err != nil { // Parent may be "virtual" (for ldap, oidc, client tls auth, // custom auth plugin), so in such cases we apply no parent // policy. The session token will contain info about policy to // be applied. if !errors.Is(err, errNoSuchUser) { logger.LogIf(ctx, fmt.Errorf("Unable to heal temporary credentials %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err)) continue } } else { parentPolicy = u.PolicyName } // Call hook for site replication. 
if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemSTSAcc, STSCredential: &madmin.SRSTSCredential{ AccessKey: creds.AccessKey, SecretKey: creds.SecretKey, SessionToken: creds.SessionToken, ParentUser: creds.ParentUser, ParentPolicyMapping: parentPolicy, }, UpdatedAt: lastUpdate, }); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal temporary credentials %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err)) } continue } if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemIAMUser, IAMUser: &madmin.SRIAMUser{ AccessKey: user, IsDeleteReq: false, UserReq: &madmin.AddOrUpdateUserReq{ SecretKey: creds.SecretKey, Status: latestUserStat.userInfo.Status, }, }, UpdatedAt: lastUpdate, }); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal user %s from peer site %s -> %s : %w", user, latestPeerName, peerName, err)) } } return nil } func (c *SiteReplicationSys) healGroups(ctx context.Context, objAPI ObjectLayer, group string, info srStatusInfo) error { c.RLock() defer c.RUnlock() if !c.enabled { return nil } var ( latestID, latestPeerName string lastUpdate time.Time latestGroupStat srGroupStatsSummary ) // create group if missing; fix group policy mapping if missing gs, ok := info.GroupStats[group] if !ok { return nil } for dID, ss := range gs { if lastUpdate.IsZero() { lastUpdate = ss.groupDesc.UpdatedAt latestID = dID latestGroupStat = ss } if !ss.groupDesc.UpdatedAt.IsZero() && ss.groupDesc.UpdatedAt.After(lastUpdate) { lastUpdate = ss.groupDesc.UpdatedAt latestID = dID latestGroupStat = ss } } if latestID != globalDeploymentID { // heal only from the site with latest info. 
return nil } latestPeerName = info.Sites[latestID].Name for dID, gStatus := range gs { if dID == globalDeploymentID { continue } if !gStatus.GroupDescMismatch { continue } if isGroupDescEqual(latestGroupStat.groupDesc.GroupDesc, gStatus.groupDesc.GroupDesc) { continue } peerName := info.Sites[dID].Name if err := c.IAMChangeHook(ctx, madmin.SRIAMItem{ Type: madmin.SRIAMItemGroupInfo, GroupInfo: &madmin.SRGroupInfo{ UpdateReq: madmin.GroupAddRemove{ Group: group, Status: madmin.GroupStatus(latestGroupStat.groupDesc.Status), Members: latestGroupStat.groupDesc.Members, IsRemove: false, }, }, UpdatedAt: lastUpdate, }); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to heal group %s from peer site %s -> site %s : %w", group, latestPeerName, peerName, err)) } } return nil } func isGroupDescEqual(g1, g2 madmin.GroupDesc) bool { if g1.Name != g2.Name || g1.Status != g2.Status || g1.Policy != g2.Policy { return false } if len(g1.Members) != len(g2.Members) { return false } for _, v1 := range g1.Members { var found bool for _, v2 := range g2.Members { if v1 == v2 { found = true break } } if !found { return false } } return true } func isUserInfoEqual(u1, u2 madmin.UserInfo) bool { if u1.PolicyName != u2.PolicyName || u1.Status != u2.Status || u1.SecretKey != u2.SecretKey { return false } for len(u1.MemberOf) != len(u2.MemberOf) { return false } for _, v1 := range u1.MemberOf { var found bool for _, v2 := range u2.MemberOf { if v1 == v2 { found = true break } } if !found { return false } } return true } func isPolicyMappingEqual(p1, p2 srPolicyMapping) bool { return p1.Policy == p2.Policy && p1.IsGroup == p2.IsGroup && p1.UserOrGroup == p2.UserOrGroup } type srPeerInfo struct { madmin.PeerInfo EndpointURL *url.URL } // getPeerForUpload returns the site replication peer handling this upload. 
Defaults to local cluster otherwise func (c *SiteReplicationSys) getPeerForUpload(deplID string) (pi srPeerInfo, local bool) { ci, _ := c.GetClusterInfo(GlobalContext) if !ci.Enabled { return pi, true } for _, site := range ci.Sites { if deplID == site.DeploymentID { ep, _ := url.Parse(site.Endpoint) pi = srPeerInfo{ PeerInfo: site, EndpointURL: ep, } return pi, site.DeploymentID == globalDeploymentID } } return pi, true } // startResync initiates resync of data to peerSite specified. The overall site resync status // is maintained in .minio.sys/buckets/site-replication/resync/, while collecting // individual bucket resync status in .minio.sys/buckets//replication/resync.bin func (c *SiteReplicationSys) startResync(ctx context.Context, objAPI ObjectLayer, peer madmin.PeerInfo) (res madmin.SRResyncOpStatus, err error) { if !c.isEnabled() { return res, errSRNotEnabled } if objAPI == nil { return res, errSRObjectLayerNotReady } if peer.DeploymentID == globalDeploymentID { return res, errSRResyncToSelf } if _, ok := c.state.Peers[peer.DeploymentID]; !ok { return res, errSRPeerNotFound } rs, err := globalSiteResyncMetrics.siteStatus(ctx, objAPI, peer.DeploymentID) if err != nil { return res, err } if rs.Status == ResyncStarted { return res, errSRResyncStarted } var buckets []BucketInfo buckets, err = objAPI.ListBuckets(ctx, BucketOptions{}) if err != nil { return res, err } rs = newSiteResyncStatus(peer.DeploymentID, buckets) defer func() { if err != nil { rs.Status = ResyncFailed saveSiteResyncMetadata(ctx, rs, objAPI) globalSiteResyncMetrics.updateState(rs) } }() globalSiteResyncMetrics.updateState(rs) if err := saveSiteResyncMetadata(ctx, rs, objAPI); err != nil { return res, err } for _, bi := range buckets { bucket := bi.Name if _, err := getReplicationConfig(ctx, bucket); err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, Status: ResyncFailed.String(), }) continue } // mark remote target for this 
deployment with the new reset id tgtArn := globalBucketTargetSys.getRemoteARNForPeer(bucket, peer) if tgtArn == "" { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: fmt.Sprintf("no valid remote target found for this peer %s (%s)", peer.Name, peer.DeploymentID), Bucket: bucket, }) continue } target := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArn) target.ResetBeforeDate = UTCNow() target.ResetID = rs.ResyncID if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, true); err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) if err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } tgtBytes, err := json.Marshal(&targets) if err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } if err := globalReplicationPool.resyncer.start(ctx, objAPI, resyncOpts{ bucket: bucket, arn: tgtArn, resyncID: rs.ResyncID, }); err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } } res = madmin.SRResyncOpStatus{ Status: ResyncStarted.String(), OpType: "start", ResyncID: rs.ResyncID, } if len(res.Buckets) > 0 { res.ErrDetail = "partial failure in starting site resync" } return res, nil } // cancelResync stops an ongoing site level resync for the peer specified. 
func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLayer, peer madmin.PeerInfo) (res madmin.SRResyncOpStatus, err error) { if !c.isEnabled() { return res, errSRNotEnabled } if objAPI == nil { return res, errSRObjectLayerNotReady } if peer.DeploymentID == globalDeploymentID { return res, errSRResyncToSelf } if _, ok := c.state.Peers[peer.DeploymentID]; !ok { return res, errSRPeerNotFound } rs, err := globalSiteResyncMetrics.siteStatus(ctx, objAPI, peer.DeploymentID) if err != nil { return res, err } switch rs.Status { case ResyncCanceled: return res, errSRResyncCanceled case ResyncCompleted, NoResync: return res, errSRNoResync } res = madmin.SRResyncOpStatus{ Status: rs.Status.String(), OpType: "cancel", ResyncID: rs.ResyncID, } switch rs.Status { case ResyncCanceled: return res, errSRResyncCanceled case ResyncCompleted, NoResync: return res, errSRNoResync } targets := globalBucketTargetSys.ListTargets(ctx, "", string(madmin.ReplicationService)) // clear the remote target resetID set while initiating resync to stop replication for _, t := range targets { if t.ResetID == rs.ResyncID { // get tgt with credentials tgt := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, t.SourceBucket, t.Arn) tgt.ResetID = "" bucket := t.SourceBucket if err = globalBucketTargetSys.SetTarget(ctx, bucket, &tgt, true); err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) if err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } tgtBytes, err := json.Marshal(&targets) if err != nil { res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { res.Buckets = append(res.Buckets, 
madmin.ResyncBucketStatus{ ErrDetail: err.Error(), Bucket: bucket, }) continue } // update resync state for the bucket globalReplicationPool.resyncer.Lock() m, ok := globalReplicationPool.resyncer.statusMap[bucket] if !ok { m = newBucketResyncStatus(bucket) } if st, ok := m.TargetsMap[t.Arn]; ok { st.LastUpdate = UTCNow() st.ResyncStatus = ResyncCanceled m.TargetsMap[t.Arn] = st m.LastUpdate = UTCNow() } globalReplicationPool.resyncer.statusMap[bucket] = m globalReplicationPool.resyncer.Unlock() } } rs.Status = ResyncCanceled rs.LastUpdate = UTCNow() if err := saveSiteResyncMetadata(ctx, rs, objAPI); err != nil { return res, err } select { case globalReplicationPool.resyncer.resyncCancelCh <- struct{}{}: case <-ctx.Done(): } globalSiteResyncMetrics.updateState(rs) res.Status = rs.Status.String() return res, nil } const ( siteResyncMetaFormat = 1 siteResyncMetaVersionV1 = 1 siteResyncMetaVersion = siteResyncMetaVersionV1 siteResyncSaveInterval = 10 * time.Second ) func newSiteResyncStatus(dID string, buckets []BucketInfo) SiteResyncStatus { now := UTCNow() s := SiteResyncStatus{ Version: siteResyncMetaVersion, Status: ResyncStarted, DeplID: dID, TotBuckets: len(buckets), BucketStatuses: make(map[string]ResyncStatusType), } for _, bi := range buckets { s.BucketStatuses[bi.Name] = ResyncPending } s.ResyncID = mustGetUUID() s.StartTime = now s.LastUpdate = now return s } // load site resync metadata from disk func loadSiteResyncMetadata(ctx context.Context, objAPI ObjectLayer, dID string) (rs SiteResyncStatus, e error) { data, err := readConfig(GlobalContext, objAPI, getSRResyncFilePath(dID)) if err != nil { return rs, err } if len(data) == 0 { // Seems to be empty. 
return rs, nil } if len(data) <= 4 { return rs, fmt.Errorf("site resync: no data") } // Read resync meta header switch binary.LittleEndian.Uint16(data[0:2]) { case siteResyncMetaFormat: default: return rs, fmt.Errorf("resyncMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) } switch binary.LittleEndian.Uint16(data[2:4]) { case siteResyncMetaVersion: default: return rs, fmt.Errorf("resyncMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) } // OK, parse data. if _, err = rs.UnmarshalMsg(data[4:]); err != nil { return rs, err } switch rs.Version { case siteResyncMetaVersionV1: default: return rs, fmt.Errorf("unexpected resync meta version: %d", rs.Version) } return rs, nil } // save resync status of peer to resync/depl-id.meta func saveSiteResyncMetadata(ctx context.Context, ss SiteResyncStatus, objectAPI ObjectLayer) error { data := make([]byte, 4, ss.Msgsize()+4) // Initialize the resync meta header. binary.LittleEndian.PutUint16(data[0:2], siteResyncMetaFormat) binary.LittleEndian.PutUint16(data[2:4], siteResyncMetaVersion) buf, err := ss.MarshalMsg(data) if err != nil { return err } return saveConfig(ctx, objectAPI, getSRResyncFilePath(ss.DeplID), buf) } func getSRResyncFilePath(dID string) string { return pathJoin(siteResyncPrefix, dID+".meta") }