fix: healing of replication config for endpoint changes (#16648)

This commit is contained in:
Poorna 2023-02-20 02:36:13 -08:00 committed by GitHub
parent 82dcfd4e10
commit 8a08861dd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 197 additions and 75 deletions

View File

@ -199,7 +199,7 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.
ops = madmin.GetTargetUpdateOps(r.Form) ops = madmin.GetTargetUpdateOps(r.Form)
} else { } else {
var exists bool // true if arn exists var exists bool // true if arn exists
target.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &target) target.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &target, "")
if exists && target.Arn != "" { // return pre-existing ARN if exists && target.Arn != "" { // return pre-existing ARN
data, err := json.Marshal(target.Arn) data, err := json.Marshal(target.Arn)
if err != nil { if err != nil {

View File

@ -475,7 +475,7 @@ func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*T
} }
// getRemoteARN gets existing ARN for an endpoint or generates a new one. // getRemoteARN gets existing ARN for an endpoint or generates a new one.
func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTarget) (arn string, exists bool) { func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTarget, deplID string) (arn string, exists bool) {
if target == nil { if target == nil {
return return
} }
@ -491,7 +491,7 @@ func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTar
if !target.Type.IsValid() { if !target.Type.IsValid() {
return return
} }
return generateARN(target), false return generateARN(target, deplID), false
} }
// getRemoteARNForPeer returns the remote target for a peer site in site replication // getRemoteARNForPeer returns the remote target for a peer site in site replication
@ -511,10 +511,14 @@ func (sys *BucketTargetSys) getRemoteARNForPeer(bucket string, peer madmin.PeerI
} }
// generate ARN that is unique to this target type // generate ARN that is unique to this target type
func generateARN(t *madmin.BucketTarget) string { func generateARN(t *madmin.BucketTarget, deplID string) string {
uuid := deplID
if uuid == "" {
uuid = mustGetUUID()
}
arn := madmin.ARN{ arn := madmin.ARN{
Type: t.Type, Type: t.Type,
ID: mustGetUUID(), ID: uuid,
Region: t.Region, Region: t.Region,
Bucket: t.TargetBucket, Bucket: t.TargetBucket,
} }

View File

@ -827,56 +827,6 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
// The following function, creates a bucket remote and sets up a bucket // The following function, creates a bucket remote and sets up a bucket
// replication rule for the given peer. // replication rule for the given peer.
configurePeerFn := func(d string, peer madmin.PeerInfo) error { configurePeerFn := func(d string, peer madmin.PeerInfo) error {
ep, _ := url.Parse(peer.Endpoint)
targets := globalBucketTargetSys.ListTargets(ctx, bucket, string(madmin.ReplicationService))
targetARN := ""
for _, target := range targets {
if target.SourceBucket == bucket &&
target.TargetBucket == bucket &&
target.Endpoint == ep.Host &&
target.Secure == (ep.Scheme == "https") &&
target.Type == madmin.ReplicationService {
targetARN = target.Arn
break
}
}
if targetARN == "" {
bucketTarget := madmin.BucketTarget{
SourceBucket: bucket,
Endpoint: ep.Host,
Credentials: &madmin.Credentials{
AccessKey: creds.AccessKey,
SecretKey: creds.SecretKey,
},
TargetBucket: bucket,
Secure: ep.Scheme == "https",
API: "s3v4",
Type: madmin.ReplicationService,
Region: "",
ReplicationSync: false,
}
var exists bool // true if ARN already exists
bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget)
if !exists { // persist newly generated ARN to targets and metadata on disk
err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false)
if err != nil {
return c.annotatePeerErr(peer.Name, "Bucket target creation error", err)
}
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
return err
}
tgtBytes, err := json.Marshal(&targets)
if err != nil {
return err
}
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
return err
}
}
targetARN = bucketTarget.Arn
}
// Create bucket replication rule to this peer. // Create bucket replication rule to this peer.
// To add the bucket replication rule, we fetch the current // To add the bucket replication rule, we fetch the current
@ -911,33 +861,114 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
var ( var (
ruleID = fmt.Sprintf("site-repl-%s", d) ruleID = fmt.Sprintf("site-repl-%s", d)
hasRule bool hasRule bool
opts = replication.Options{
// Set the ID so we can identify the rule as being
// created for site-replication and include the
// destination cluster's deployment ID.
ID: ruleID,
// Use a helper to generate unique priority numbers.
Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)),
Op: replication.AddOption,
RuleStatus: "enable",
DestBucket: targetARN,
// Replicate everything!
ReplicateDeletes: "enable",
ReplicateDeleteMarkers: "enable",
ReplicaSync: "enable",
ExistingObjectReplicate: "enable",
}
) )
ruleARN := targetARN var ruleARN string
for _, r := range replicationConfig.Rules { for _, r := range replicationConfig.Rules {
if r.ID == ruleID { if r.ID == ruleID {
hasRule = true hasRule = true
ruleARN = r.Destination.Bucket ruleARN = r.Destination.Bucket
} }
} }
ep, _ := url.Parse(peer.Endpoint)
var targets []madmin.BucketTarget
if targetsPtr, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket); targetsPtr != nil {
targets = targetsPtr.Targets
}
targetARN := ""
var updateTgt bool
var targetToUpdate madmin.BucketTarget
for _, target := range targets {
if target.Arn == ruleARN {
targetARN = ruleARN
if target.URL().String() != peer.Endpoint {
updateTgt = true
targetToUpdate = target
}
break
}
}
// replication config had a stale target ARN - update the endpoint
if updateTgt {
targetToUpdate.Endpoint = ep.Host
targetToUpdate.Secure = ep.Scheme == "https"
targetToUpdate.Credentials = &madmin.Credentials{
AccessKey: creds.AccessKey,
SecretKey: creds.SecretKey,
}
err := globalBucketTargetSys.SetTarget(ctx, bucket, &targetToUpdate, true)
if err != nil {
return c.annotatePeerErr(peer.Name, "Bucket target update error", err)
}
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
return wrapSRErr(err)
}
tgtBytes, err := json.Marshal(&targets)
if err != nil {
return wrapSRErr(err)
}
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
return wrapSRErr(err)
}
}
// no replication rule for this peer or target ARN missing in bucket targets
if targetARN == "" {
bucketTarget := madmin.BucketTarget{
SourceBucket: bucket,
Endpoint: ep.Host,
Credentials: &madmin.Credentials{
AccessKey: creds.AccessKey,
SecretKey: creds.SecretKey,
},
TargetBucket: bucket,
Secure: ep.Scheme == "https",
API: "s3v4",
Type: madmin.ReplicationService,
Region: "",
ReplicationSync: false,
}
var exists bool // true if ARN already exists
bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget, peer.DeploymentID)
if !exists { // persist newly generated ARN to targets and metadata on disk
err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false)
if err != nil {
return c.annotatePeerErr(peer.Name, "Bucket target creation error", err)
}
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
return err
}
tgtBytes, err := json.Marshal(&targets)
if err != nil {
return err
}
if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
return err
}
}
targetARN = bucketTarget.Arn
}
opts := replication.Options{
// Set the ID so we can identify the rule as being
// created for site-replication and include the
// destination cluster's deployment ID.
ID: ruleID,
// Use a helper to generate unique priority numbers.
Priority: fmt.Sprintf("%d", getPriorityHelper(replicationConfig)),
Op: replication.AddOption,
RuleStatus: "enable",
DestBucket: targetARN,
// Replicate everything!
ReplicateDeletes: "enable",
ReplicateDeleteMarkers: "enable",
ReplicaSync: "enable",
ExistingObjectReplicate: "enable",
}
switch { switch {
case hasRule: case hasRule:
if ruleARN != opts.DestBucket { if ruleARN != opts.DestBucket {
@ -3441,6 +3472,7 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
var wg sync.WaitGroup var wg sync.WaitGroup
pi := c.state.Peers[peer.DeploymentID] pi := c.state.Peers[peer.DeploymentID]
prevPeerInfo := pi
pi.Endpoint = peer.Endpoint pi.Endpoint = peer.Endpoint
for i, v := range sites.Sites { for i, v := range sites.Sites {
@ -3480,6 +3512,12 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err), ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
}, nil }, nil
} }
if err = c.updateTargetEndpoints(ctx, prevPeerInfo, peer); err != nil {
return madmin.ReplicateEditStatus{
Status: madmin.ReplicateAddStatusPartial,
ErrDetail: fmt.Sprintf("unable to update peer targets on local: %v", err),
}, nil
}
result := madmin.ReplicateEditStatus{ result := madmin.ReplicateEditStatus{
Success: true, Success: true,
@ -3488,6 +3526,59 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
return result, nil return result, nil
} }
// updateTargetEndpoints repoints the local site-replication bucket targets of
// a peer from its previous endpoint (prevInfo.Endpoint) to its new endpoint
// (peer.Endpoint).
//
// For every bucket returned by ListBuckets, each remote target of type
// madmin.ReplicationService whose source and target bucket are the bucket
// itself and whose host/scheme match the previous endpoint is updated in
// place via SetTarget, after which the bucket's refreshed target list is
// written back to bucket metadata.
//
// It returns an error when the object layer is not ready, when buckets
// cannot be listed, or when either endpoint cannot be parsed. Failures on an
// individual bucket or target are logged and skipped rather than returned.
func (c *SiteReplicationSys) updateTargetEndpoints(ctx context.Context, prevInfo, peer madmin.PeerInfo) error {
	objAPI := newObjectLayerFn()
	if objAPI == nil {
		return errSRObjectLayerNotReady
	}
	buckets, err := objAPI.ListBuckets(ctx, BucketOptions{})
	if err != nil {
		return err
	}

	// Both endpoints are loop-invariant: parse them once, and surface a parse
	// failure instead of dereferencing the nil *url.URL that url.Parse
	// returns alongside an error.
	ep, err := url.Parse(peer.Endpoint)
	if err != nil {
		return fmt.Errorf("parsing peer endpoint %q: %w", peer.Endpoint, err)
	}
	prevEp, err := url.Parse(prevInfo.Endpoint)
	if err != nil {
		return fmt.Errorf("parsing previous peer endpoint %q: %w", prevInfo.Endpoint, err)
	}
	secure := ep.Scheme == "https"
	prevSecure := prevEp.Scheme == "https"

	for _, bucketInfo := range buckets {
		bucket := bucketInfo.Name
		targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
		if err != nil || targets == nil {
			continue // site healing will take care of configuring new targets
		}
		for _, target := range targets.Targets {
			// Only touch site-replication targets that still point at the
			// peer's previous endpoint.
			if target.SourceBucket != bucket ||
				target.TargetBucket != bucket ||
				target.Endpoint != prevEp.Host ||
				target.Secure != prevSecure ||
				target.Type != madmin.ReplicationService {
				continue
			}

			bucketTarget := target
			bucketTarget.Secure = secure
			bucketTarget.Endpoint = ep.Host
			// The final argument is true here, as in the other call in this
			// file that updates an existing target.
			if err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, true); err != nil {
				logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Bucket target update error", err))
				continue
			}

			// Re-read the target list so the metadata written below includes
			// the change just applied.
			updated, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
			if err != nil {
				logger.LogIf(ctx, err)
				continue
			}
			tgtBytes, err := json.Marshal(updated)
			if err != nil {
				logger.LogIf(ctx, err)
				continue
			}
			if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil {
				logger.LogIf(ctx, err)
				continue
			}
		}
	}
	return nil
}
// PeerEditReq - internal API handler to respond to a peer cluster's request // PeerEditReq - internal API handler to respond to a peer cluster's request
// to edit endpoint. // to edit endpoint.
func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInfo) error { func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInfo) error {
@ -4282,7 +4373,34 @@ func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, ob
replMismatch = true replMismatch = true
} }
var (
epDeplIDMap = make(map[string]string)
arnTgtMap = make(map[string]madmin.BucketTarget)
)
if targetsPtr, _ := globalBucketTargetSys.ListBucketTargets(ctx, bucket); targetsPtr != nil {
for _, t := range targetsPtr.Targets {
arnTgtMap[t.Arn] = t
}
}
for _, p := range c.state.Peers {
epDeplIDMap[p.Endpoint] = p.DeploymentID
}
// fix stale ARNs in replication config and endpoint mismatches between the site config and
// the targets associated with this config.
if rcfg != nil { if rcfg != nil {
for _, rule := range rcfg.Rules {
if rule.Status != sreplication.Status(replication.Disabled) {
tgt, isValidARN := arnTgtMap[rule.Destination.ARN] // detect stale ARN in replication config
_, epFound := epDeplIDMap[tgt.URL().String()] // detect end point change at site level
if !isValidARN || !epFound {
replMismatch = true
break
}
}
}
}
if rcfg != nil && !replMismatch {
// validate remote targets on current cluster for this bucket // validate remote targets on current cluster for this bucket
_, apiErr := validateReplicationDestination(ctx, bucket, rcfg, false) _, apiErr := validateReplicationDestination(ctx, bucket, rcfg, false)
if apiErr != noError { if apiErr != noError {