Allow setting sync mode for site replication (#16876)

This commit is contained in:
Poorna
2023-03-24 14:41:23 -07:00
committed by GitHub
parent c259a8ea38
commit 74040b457b
3 changed files with 60 additions and 39 deletions

View File

@@ -896,6 +896,9 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
AccessKey: creds.AccessKey,
SecretKey: creds.SecretKey,
}
if !peer.SyncState.Empty() {
targetToUpdate.ReplicationSync = (peer.SyncState == madmin.SyncEnabled)
}
err := globalBucketTargetSys.SetTarget(ctx, bucket, &targetToUpdate, true)
if err != nil {
return c.annotatePeerErr(peer.Name, "Bucket target update error", err)
@@ -927,7 +930,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
API: "s3v4",
Type: madmin.ReplicationService,
Region: "",
ReplicationSync: false,
ReplicationSync: peer.SyncState == madmin.SyncEnabled,
}
var exists bool // true if ARN already exists
bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget, peer.DeploymentID)
@@ -3443,12 +3446,17 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
)
for _, v := range sites.Sites {
if peer.Endpoint == v.Endpoint {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s entered for deployment id %s already configured in site replication", v.Endpoint, v.DeploymentID))
}
if peer.DeploymentID == v.DeploymentID {
found = true
if peer.Endpoint == v.Endpoint {
if !peer.SyncState.Empty() {
if globalDeploymentID == peer.DeploymentID {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("invalid sync setting for endpoint %s (deployment id %s)", v.Endpoint, v.DeploymentID))
}
if peer.Endpoint == "" { // peer.Endpoint may be "" if only sync state is being updated
break
}
}
if peer.Endpoint == v.Endpoint && peer.SyncState.Empty() {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s entered for deployment id %s already configured in site replication", v.Endpoint, v.DeploymentID))
}
admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
@@ -3469,44 +3477,52 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
if !found {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("%s not found in existing replicated sites", peer.DeploymentID))
}
errs := make(map[string]error, len(c.state.Peers))
var wg sync.WaitGroup
pi := c.state.Peers[peer.DeploymentID]
prevPeerInfo := pi
pi.Endpoint = peer.Endpoint
if !peer.SyncState.Empty() { // update replication to peer to be sync/async
pi.SyncState = peer.SyncState
c.state.Peers[peer.DeploymentID] = pi
}
if peer.Endpoint != "" { // `admin replicate update` requested an endpoint change
pi.Endpoint = peer.Endpoint
}
for i, v := range sites.Sites {
if v.DeploymentID == globalDeploymentID {
c.state.Peers[peer.DeploymentID] = pi
continue
}
wg.Add(1)
go func(pi madmin.PeerInfo, i int) {
defer wg.Done()
v := sites.Sites[i]
admClient, err := c.getAdminClient(ctx, v.DeploymentID)
if v.DeploymentID == peer.DeploymentID {
admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
if admClient != nil {
errs := make(map[string]error, len(c.state.Peers))
var wg sync.WaitGroup
for i, v := range sites.Sites {
if v.DeploymentID == globalDeploymentID {
c.state.Peers[peer.DeploymentID] = pi
continue
}
wg.Add(1)
go func(pi madmin.PeerInfo, i int) {
defer wg.Done()
v := sites.Sites[i]
admClient, err := c.getAdminClient(ctx, v.DeploymentID)
if v.DeploymentID == peer.DeploymentID {
admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
}
if err != nil {
errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
return
}
if err = admClient.SRPeerEdit(ctx, pi); err != nil {
errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", v.Name, err))
return
}
}(pi, i)
}
wg.Wait()
for dID, err := range errs {
if err != nil {
errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
return
return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err))
}
if err = admClient.SRPeerEdit(ctx, pi); err != nil {
errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", v.Name, err))
return
}
}(pi, i)
}
wg.Wait()
for dID, err := range errs {
if err != nil {
return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err))
}
}
// we can now save the cluster replication configuration state.
if err = c.saveToDisk(ctx, c.state); err != nil {
return madmin.ReplicateEditStatus{
@@ -3514,7 +3530,7 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
}, nil
}
if err = c.updateTargetEndpoints(ctx, prevPeerInfo, peer); err != nil {
if err = c.updateTargetEndpoints(ctx, prevPeerInfo, pi); err != nil {
return madmin.ReplicateEditStatus{
Status: madmin.ReplicateAddStatusPartial,
ErrDetail: fmt.Sprintf("unable to update peer targets on local: %v", err),
@@ -3556,6 +3572,9 @@ func (c *SiteReplicationSys) updateTargetEndpoints(ctx context.Context, prevInfo
bucketTarget := target
bucketTarget.Secure = ep.Scheme == "https"
bucketTarget.Endpoint = ep.Host
if !peer.SyncState.Empty() {
bucketTarget.ReplicationSync = (peer.SyncState == madmin.SyncEnabled)
}
err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, true)
if err != nil {
logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Bucket target creation error", err))