diff --git a/cmd/admin-bucket-handlers.go b/cmd/admin-bucket-handlers.go index a8cc177e5..51201b8c8 100644 --- a/cmd/admin-bucket-handlers.go +++ b/cmd/admin-bucket-handlers.go @@ -199,7 +199,7 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http. ops = madmin.GetTargetUpdateOps(r.Form) } else { var exists bool // true if arn exists - target.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &target) + target.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &target, "") if exists && target.Arn != "" { // return pre-existing ARN data, err := json.Marshal(target.Arn) if err != nil { diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 82d323b88..6e151a9e2 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -475,7 +475,7 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T } // getRemoteARN gets existing ARN for an endpoint or generates a new one. -func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTarget) (arn string, exists bool) { +func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTarget, deplID string) (arn string, exists bool) { if target == nil { return } @@ -491,7 +491,7 @@ func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTar if !target.Type.IsValid() { return } - return generateARN(target), false + return generateARN(target, deplID), false } // getRemoteARNForPeer returns the remote target for a peer site in site replication @@ -511,10 +511,14 @@ func (sys *BucketTargetSys) getRemoteARNForPeer(bucket string, peer madmin.PeerI } // generate ARN that is unique to this target type -func generateARN(t *madmin.BucketTarget) string { +func generateARN(t *madmin.BucketTarget, deplID string) string { + uuid := deplID + if uuid == "" { + uuid = mustGetUUID() + } arn := madmin.ARN{ Type: t.Type, - ID: mustGetUUID(), + ID: uuid, Region: t.Region, Bucket: t.TargetBucket, } diff --git a/cmd/site-replication.go b/cmd/site-replication.go index fe28ffaea..444ca63b4 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -827,56 +827,6 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, // The following function, creates a bucket remote and sets up a bucket // replication rule for the given peer. configurePeerFn := func(d string, peer madmin.PeerInfo) error { - ep, _ := url.Parse(peer.Endpoint) - targets := globalBucketTargetSys.ListTargets(ctx, bucket, string(madmin.ReplicationService)) - targetARN := "" - for _, target := range targets { - if target.SourceBucket == bucket && - target.TargetBucket == bucket && - target.Endpoint == ep.Host && - target.Secure == (ep.Scheme == "https") && - target.Type == madmin.ReplicationService { - targetARN = target.Arn - break - } - } - if targetARN == "" { - bucketTarget := madmin.BucketTarget{ - SourceBucket: bucket, - Endpoint: ep.Host, - Credentials: &madmin.Credentials{ - AccessKey: creds.AccessKey, - SecretKey: creds.SecretKey, - }, - TargetBucket: bucket, - Secure: ep.Scheme == "https", - API: "s3v4", - Type: madmin.ReplicationService, - Region: "", - ReplicationSync: false, - } - var exists bool // true if ARN already exists - bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget) - if !exists { // persist newly generated ARN to targets and metadata on disk - err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false) - if err != nil { - return c.annotatePeerErr(peer.Name, "Bucket target creation error", err) - } - targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) - if err != nil { - return err - } - tgtBytes, err := json.Marshal(&targets) - if err != nil { - return err - } - if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { - return err - } - } - targetARN = bucketTarget.Arn - } - // Create bucket replication rule to this peer. // To add the bucket replication rule, we fetch the current @@ -911,33 +861,114 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, var ( ruleID = fmt.Sprintf("site-repl-%s", d) hasRule bool - opts = replication.Options{ - // Set the ID so we can identify the rule as being - // created for site-replication and include the - // destination cluster's deployment ID. - ID: ruleID, - - // Use a helper to generate unique priority numbers. - Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)), - - Op: replication.AddOption, - RuleStatus: "enable", - DestBucket: targetARN, - - // Replicate everything! - ReplicateDeletes: "enable", - ReplicateDeleteMarkers: "enable", - ReplicaSync: "enable", - ExistingObjectReplicate: "enable", - } ) - ruleARN := targetARN + var ruleARN string for _, r := range replicationConfig.Rules { if r.ID == ruleID { hasRule = true ruleARN = r.Destination.Bucket } } + + ep, _ := url.Parse(peer.Endpoint) + var targets []madmin.BucketTarget + if targetsPtr, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket); targetsPtr != nil { + targets = targetsPtr.Targets + } + targetARN := "" + var updateTgt bool + var targetToUpdate madmin.BucketTarget + for _, target := range targets { + if target.Arn == ruleARN { + targetARN = ruleARN + if target.URL().String() != peer.Endpoint { + updateTgt = true + targetToUpdate = target + } + break + } + } + // replication config had a stale target ARN - update the endpoint + if updateTgt { + targetToUpdate.Endpoint = ep.Host + targetToUpdate.Secure = ep.Scheme == "https" + targetToUpdate.Credentials = &madmin.Credentials{ + AccessKey: creds.AccessKey, + SecretKey: creds.SecretKey, + } + err := globalBucketTargetSys.SetTarget(ctx, bucket, &targetToUpdate, true) + if err != nil { + return c.annotatePeerErr(peer.Name, "Bucket target update error", err) + } + targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + if err != nil { + return wrapSRErr(err) + } + tgtBytes, err := json.Marshal(&targets) + if err != nil { + return wrapSRErr(err) + } + if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { + return wrapSRErr(err) + } + + } + // no replication rule for this peer or target ARN missing in bucket targets + if targetARN == "" { + bucketTarget := madmin.BucketTarget{ + SourceBucket: bucket, + Endpoint: ep.Host, + Credentials: &madmin.Credentials{ + AccessKey: creds.AccessKey, + SecretKey: creds.SecretKey, + }, + TargetBucket: bucket, + Secure: ep.Scheme == "https", + API: "s3v4", + Type: madmin.ReplicationService, + Region: "", + ReplicationSync: false, + } + var exists bool // true if ARN already exists + bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget, peer.DeploymentID) + if !exists { // persist newly generated ARN to targets and metadata on disk + err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false) + if err != nil { + return c.annotatePeerErr(peer.Name, "Bucket target creation error", err) + } + targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + if err != nil { + return err + } + tgtBytes, err := json.Marshal(&targets) + if err != nil { + return err + } + if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { + return err + } + } + targetARN = bucketTarget.Arn + } + opts := replication.Options{ + // Set the ID so we can identify the rule as being + // created for site-replication and include the + // destination cluster's deployment ID. + ID: ruleID, + + // Use a helper to generate unique priority numbers. + Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)), + + Op: replication.AddOption, + RuleStatus: "enable", + DestBucket: targetARN, + // Replicate everything! + ReplicateDeletes: "enable", + ReplicateDeleteMarkers: "enable", + ReplicaSync: "enable", + ExistingObjectReplicate: "enable", + } + switch { case hasRule: if ruleARN != opts.DestBucket { @@ -3441,6 +3472,7 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe var wg sync.WaitGroup pi := c.state.Peers[peer.DeploymentID] + prevPeerInfo := pi pi.Endpoint = peer.Endpoint for i, v := range sites.Sites { @@ -3480,6 +3512,12 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err), }, nil } + if err = c.updateTargetEndpoints(ctx, prevPeerInfo, peer); err != nil { + return madmin.ReplicateEditStatus{ + Status: madmin.ReplicateAddStatusPartial, + ErrDetail: fmt.Sprintf("unable to update peer targets on local: %v", err), + }, nil + } result := madmin.ReplicateEditStatus{ Success: true, @@ -3488,6 +3526,59 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe return result, nil } +func (c *SiteReplicationSys) updateTargetEndpoints(ctx context.Context, prevInfo, peer madmin.PeerInfo) error { + objAPI := newObjectLayerFn() + if objAPI == nil { + return errSRObjectLayerNotReady + } + + buckets, err := objAPI.ListBuckets(ctx, BucketOptions{}) + if err != nil { + return err + } + + for _, bucketInfo := range buckets { + bucket := bucketInfo.Name + ep, _ := url.Parse(peer.Endpoint) + prevEp, _ := url.Parse(prevInfo.Endpoint) + targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + if err != nil { + continue // site healing will take care of configuring new targets + } + for _, target := range targets.Targets { + if target.SourceBucket == bucket && + target.TargetBucket == bucket && + target.Endpoint == prevEp.Host && + target.Secure == (prevEp.Scheme == "https") && + target.Type == madmin.ReplicationService { + bucketTarget := target + bucketTarget.Secure = ep.Scheme == "https" + bucketTarget.Endpoint = ep.Host + err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, true) + if err != nil { + logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Bucket target creation error", err)) + continue + } + targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + if err != nil { + logger.LogIf(ctx, err) + continue + } + tgtBytes, err := json.Marshal(&targets) + if err != nil { + logger.LogIf(ctx, err) + continue + } + if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { + logger.LogIf(ctx, err) + continue + } + } + } + } + return nil +} + // PeerEditReq - internal API handler to respond to a peer cluster's request // to edit endpoint. func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInfo) error { @@ -4282,7 +4373,34 @@ func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, ob replMismatch = true } + var ( + epDeplIDMap = make(map[string]string) + arnTgtMap = make(map[string]madmin.BucketTarget) + ) + if targetsPtr, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket); targetsPtr != nil { + for _, t := range targetsPtr.Targets { + arnTgtMap[t.Arn] = t + } + } + for _, p := range c.state.Peers { + epDeplIDMap[p.Endpoint] = p.DeploymentID + } + // fix stale ARN's in replication config and endpoint mismatch between site config and + // targets associated to this config. if rcfg != nil { + for _, rule := range rcfg.Rules { + if rule.Status != sreplication.Status(replication.Disabled) { + tgt, isValidARN := arnTgtMap[rule.Destination.ARN] // detect stale ARN in replication config + _, epFound := epDeplIDMap[tgt.URL().String()] // detect end point change at site level + if !isValidARN || !epFound { + replMismatch = true + break + } + } + } + } + + if rcfg != nil && !replMismatch { // validate remote targets on current cluster for this bucket _, apiErr := validateReplicationDestination(ctx, bucket, rcfg, false) if apiErr != noError {