site replication: allow setting bandwidth default for bucket (#18062)

The default bandwidth limit can still be overridden at the bucket level.
This commit is contained in:
Poorna
2023-09-25 15:50:52 -07:00
committed by GitHub
parent 6dec60b6e6
commit 50a8f13e85
3 changed files with 63 additions and 41 deletions

View File

@@ -934,12 +934,13 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
targets = targetsPtr.Targets
}
targetARN := ""
var updateTgt bool
var updateTgt, updateBW bool
var targetToUpdate madmin.BucketTarget
for _, target := range targets {
if target.Arn == ruleARN {
targetARN = ruleARN
if target.URL().String() != peer.Endpoint {
updateBW = peer.DefaultBandwidth.Limit != 0 && target.BandwidthLimit == 0
if (target.URL().String() != peer.Endpoint) || updateBW {
updateTgt = true
targetToUpdate = target
}
@@ -957,6 +958,9 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
if !peer.SyncState.Empty() {
targetToUpdate.ReplicationSync = (peer.SyncState == madmin.SyncEnabled)
}
if updateBW {
targetToUpdate.BandwidthLimit = int64(peer.DefaultBandwidth.Limit)
}
err := globalBucketTargetSys.SetTarget(ctx, bucket, &targetToUpdate, true)
if err != nil {
return c.annotatePeerErr(peer.Name, "Bucket target update error", err)
@@ -990,6 +994,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
Region: "",
ReplicationSync: peer.SyncState == madmin.SyncEnabled,
DeploymentID: d,
BandwidthLimit: int64(peer.DefaultBandwidth.Limit),
}
var exists bool // true if ARN already exists
bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget, peer.DeploymentID)
@@ -3606,17 +3611,17 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
admClient *madmin.AdminClient
)
if globalDeploymentID == peer.DeploymentID && !peer.SyncState.Empty() {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("A peer cluster, rather than the local cluster (endpoint=%s, deployment-id=%s) needs to be specified while setting a 'sync' replication mode", peer.Endpoint, peer.DeploymentID))
if globalDeploymentID == peer.DeploymentID && !peer.SyncState.Empty() && !peer.DefaultBandwidth.IsSet {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("a peer cluster, rather than the local cluster (endpoint=%s, deployment-id=%s) needs to be specified while setting a 'sync' replication mode", peer.Endpoint, peer.DeploymentID))
}
for _, v := range sites.Sites {
if peer.DeploymentID == v.DeploymentID {
found = true
if !peer.SyncState.Empty() && peer.Endpoint == "" { // peer.Endpoint may be "" if only sync state is being updated
if (!peer.SyncState.Empty() || peer.DefaultBandwidth.IsSet) && peer.Endpoint == "" { // peer.Endpoint may be "" if only sync state/bandwidth is being updated
break
}
if peer.Endpoint == v.Endpoint && peer.SyncState.Empty() {
if peer.Endpoint == v.Endpoint && peer.SyncState.Empty() && !peer.DefaultBandwidth.IsSet {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s entered for deployment id %s already configured in site replication", v.Endpoint, v.DeploymentID))
}
admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
@@ -3637,59 +3642,65 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
if !found {
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("%s not found in existing replicated sites", peer.DeploymentID))
}
var state srState
c.RLock()
pi := c.state.Peers[peer.DeploymentID]
state = c.state
c.RUnlock()
prevPeerInfo := pi
if !peer.SyncState.Empty() { // update replication to peer to be sync/async
pi.SyncState = peer.SyncState
c.state.Peers[peer.DeploymentID] = pi
}
if peer.Endpoint != "" { // `admin replicate update` requested an endpoint change
pi.Endpoint = peer.Endpoint
}
if admClient != nil {
errs := make(map[string]error, len(c.state.Peers))
var wg sync.WaitGroup
if peer.DefaultBandwidth.IsSet {
pi.DefaultBandwidth = peer.DefaultBandwidth
pi.DefaultBandwidth.UpdatedAt = UTCNow()
}
state.Peers[peer.DeploymentID] = pi
for i, v := range sites.Sites {
if v.DeploymentID == globalDeploymentID {
c.state.Peers[peer.DeploymentID] = pi
continue
}
wg.Add(1)
go func(pi madmin.PeerInfo, i int) {
defer wg.Done()
v := sites.Sites[i]
admClient, err := c.getAdminClient(ctx, v.DeploymentID)
if v.DeploymentID == peer.DeploymentID {
admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint)
}
if err != nil {
errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err))
return
}
if err = admClient.SRPeerEdit(ctx, pi); err != nil {
errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", v.Name, err))
return
}
}(pi, i)
errs := make(map[string]error, len(state.Peers))
var wg sync.WaitGroup
for dID, v := range state.Peers {
if v.DeploymentID == globalDeploymentID {
continue
}
wg.Wait()
for dID, err := range errs {
if err != nil {
return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err))
wg.Add(1)
go func(pi madmin.PeerInfo, dID string) {
defer wg.Done()
admClient, err := c.getAdminClient(ctx, dID)
if dID == peer.DeploymentID {
admClient, err = c.getAdminClientWithEndpoint(ctx, dID, pi.Endpoint)
}
if err != nil {
errs[dID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", pi.Name, err))
return
}
if err = admClient.SRPeerEdit(ctx, pi); err != nil {
errs[dID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", pi.Name, err))
return
}
}(pi, dID)
}
wg.Wait()
for dID, err := range errs {
if err != nil {
return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", state.Peers[dID].Name, err))
}
}
// we can now save the cluster replication configuration state.
if err = c.saveToDisk(ctx, c.state); err != nil {
if err = c.saveToDisk(ctx, state); err != nil {
return madmin.ReplicateEditStatus{
Status: madmin.ReplicateAddStatusPartial,
ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
}, nil
}
if err = c.updateTargetEndpoints(ctx, prevPeerInfo, pi); err != nil {
return madmin.ReplicateEditStatus{
Status: madmin.ReplicateAddStatusPartial,
@@ -3732,6 +3743,9 @@ func (c *SiteReplicationSys) updateTargetEndpoints(ctx context.Context, prevInfo
bucketTarget := target
bucketTarget.Secure = ep.Scheme == "https"
bucketTarget.Endpoint = ep.Host
if peer.DefaultBandwidth.IsSet && target.BandwidthLimit == 0 {
bucketTarget.BandwidthLimit = int64(peer.DefaultBandwidth.Limit)
}
if !peer.SyncState.Empty() {
bucketTarget.ReplicationSync = (peer.SyncState == madmin.SyncEnabled)
}
@@ -3768,6 +3782,14 @@ func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInf
p := c.state.Peers[i]
if p.DeploymentID == arg.DeploymentID {
p.Endpoint = arg.Endpoint
if arg.DefaultBandwidth.IsSet {
if arg.DefaultBandwidth.UpdatedAt.After(p.DefaultBandwidth.UpdatedAt) {
p.DefaultBandwidth = arg.DefaultBandwidth
}
}
if !arg.SyncState.Empty() {
p.SyncState = arg.SyncState
}
c.state.Peers[arg.DeploymentID] = p
}
if p.DeploymentID == globalDeploymentID {