diff --git a/cmd/site-replication.go b/cmd/site-replication.go index ef7040c6e..9d7cc7436 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -934,12 +934,13 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, targets = targetsPtr.Targets } targetARN := "" - var updateTgt bool + var updateTgt, updateBW bool var targetToUpdate madmin.BucketTarget for _, target := range targets { if target.Arn == ruleARN { targetARN = ruleARN - if target.URL().String() != peer.Endpoint { + updateBW = peer.DefaultBandwidth.Limit != 0 && target.BandwidthLimit == 0 + if (target.URL().String() != peer.Endpoint) || updateBW { updateTgt = true targetToUpdate = target } @@ -957,6 +958,9 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, if !peer.SyncState.Empty() { targetToUpdate.ReplicationSync = (peer.SyncState == madmin.SyncEnabled) } + if updateBW { + targetToUpdate.BandwidthLimit = int64(peer.DefaultBandwidth.Limit) + } err := globalBucketTargetSys.SetTarget(ctx, bucket, &targetToUpdate, true) if err != nil { return c.annotatePeerErr(peer.Name, "Bucket target update error", err) @@ -990,6 +994,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, Region: "", ReplicationSync: peer.SyncState == madmin.SyncEnabled, DeploymentID: d, + BandwidthLimit: int64(peer.DefaultBandwidth.Limit), } var exists bool // true if ARN already exists bucketTarget.Arn, exists = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget, peer.DeploymentID) @@ -3606,17 +3611,17 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe admClient *madmin.AdminClient ) - if globalDeploymentID == peer.DeploymentID && !peer.SyncState.Empty() { - return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("A peer cluster, rather than the local cluster (endpoint=%s, deployment-id=%s) needs to be specified while setting a 'sync' replication mode", peer.Endpoint, peer.DeploymentID)) + if globalDeploymentID == peer.DeploymentID && !peer.SyncState.Empty() && !peer.DefaultBandwidth.IsSet { + return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("a peer cluster, rather than the local cluster (endpoint=%s, deployment-id=%s) needs to be specified while setting a 'sync' replication mode", peer.Endpoint, peer.DeploymentID)) } for _, v := range sites.Sites { if peer.DeploymentID == v.DeploymentID { found = true - if !peer.SyncState.Empty() && peer.Endpoint == "" { // peer.Endpoint may be "" if only sync state is being updated + if (!peer.SyncState.Empty() || peer.DefaultBandwidth.IsSet) && peer.Endpoint == "" { // peer.Endpoint may be "" if only sync state/bandwidth is being updated break } - if peer.Endpoint == v.Endpoint && peer.SyncState.Empty() { + if peer.Endpoint == v.Endpoint && peer.SyncState.Empty() && !peer.DefaultBandwidth.IsSet { return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("Endpoint %s entered for deployment id %s already configured in site replication", v.Endpoint, v.DeploymentID)) } admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint) @@ -3637,59 +3642,65 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe if !found { return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("%s not found in existing replicated sites", peer.DeploymentID)) } + var state srState + c.RLock() pi := c.state.Peers[peer.DeploymentID] + state = c.state + c.RUnlock() prevPeerInfo := pi if !peer.SyncState.Empty() { // update replication to peer to be sync/async pi.SyncState = peer.SyncState - c.state.Peers[peer.DeploymentID] = pi } if peer.Endpoint != "" { // `admin replicate update` requested an endpoint change pi.Endpoint = peer.Endpoint } - if admClient != nil { - errs := make(map[string]error, len(c.state.Peers)) - var wg sync.WaitGroup + if peer.DefaultBandwidth.IsSet { + pi.DefaultBandwidth = peer.DefaultBandwidth + pi.DefaultBandwidth.UpdatedAt = UTCNow() + } + state.Peers[peer.DeploymentID] = pi - for i, v := range sites.Sites { - if v.DeploymentID == globalDeploymentID { - c.state.Peers[peer.DeploymentID] = pi - continue - } - wg.Add(1) - go func(pi madmin.PeerInfo, i int) { - defer wg.Done() - v := sites.Sites[i] - admClient, err := c.getAdminClient(ctx, v.DeploymentID) - if v.DeploymentID == peer.DeploymentID { - admClient, err = c.getAdminClientWithEndpoint(ctx, v.DeploymentID, peer.Endpoint) - } - if err != nil { - errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", v.Name, err)) - return - } - if err = admClient.SRPeerEdit(ctx, pi); err != nil { - errs[v.DeploymentID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", v.Name, err)) - return - } - }(pi, i) + errs := make(map[string]error, len(state.Peers)) + var wg sync.WaitGroup + + for dID, v := range state.Peers { + if v.DeploymentID == globalDeploymentID { + continue } - - wg.Wait() - for dID, err := range errs { - if err != nil { - return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", c.state.Peers[dID].Name, err)) + wg.Add(1) + go func(pi madmin.PeerInfo, dID string) { + defer wg.Done() + admClient, err := c.getAdminClient(ctx, dID) + if dID == peer.DeploymentID { + admClient, err = c.getAdminClientWithEndpoint(ctx, dID, pi.Endpoint) } + if err != nil { + errs[dID] = errSRPeerResp(fmt.Errorf("unable to create admin client for %s: %w", pi.Name, err)) + return + } + if err = admClient.SRPeerEdit(ctx, pi); err != nil { + errs[dID] = errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", pi.Name, err)) + return + } + }(pi, dID) + } + + wg.Wait() + for dID, err := range errs { + if err != nil { + return madmin.ReplicateEditStatus{}, errSRPeerResp(fmt.Errorf("unable to update peer %s: %w", state.Peers[dID].Name, err)) } } // we can now save the cluster replication configuration state. - if err = c.saveToDisk(ctx, c.state); err != nil { + if err = c.saveToDisk(ctx, state); err != nil { return madmin.ReplicateEditStatus{ Status: madmin.ReplicateAddStatusPartial, ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err), }, nil } + if err = c.updateTargetEndpoints(ctx, prevPeerInfo, pi); err != nil { return madmin.ReplicateEditStatus{ Status: madmin.ReplicateAddStatusPartial, @@ -3732,6 +3743,9 @@ func (c *SiteReplicationSys) updateTargetEndpoints(ctx context.Context, prevInfo bucketTarget := target bucketTarget.Secure = ep.Scheme == "https" bucketTarget.Endpoint = ep.Host + if peer.DefaultBandwidth.IsSet && target.BandwidthLimit == 0 { + bucketTarget.BandwidthLimit = int64(peer.DefaultBandwidth.Limit) + } if !peer.SyncState.Empty() { bucketTarget.ReplicationSync = (peer.SyncState == madmin.SyncEnabled) } @@ -3768,6 +3782,14 @@ func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInf p := c.state.Peers[i] if p.DeploymentID == arg.DeploymentID { p.Endpoint = arg.Endpoint + if arg.DefaultBandwidth.IsSet { + if arg.DefaultBandwidth.UpdatedAt.After(p.DefaultBandwidth.UpdatedAt) { + p.DefaultBandwidth = arg.DefaultBandwidth + } + } + if !arg.SyncState.Empty() { + p.SyncState = arg.SyncState + } c.state.Peers[arg.DeploymentID] = p } if p.DeploymentID == globalDeploymentID { diff --git a/go.mod b/go.mod index 1f581ba38..186d0b9ec 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/minio/dperf v0.5.0 github.com/minio/highwayhash v1.0.2 github.com/minio/kes-go v0.2.0 - github.com/minio/madmin-go/v3 v3.0.19 + github.com/minio/madmin-go/v3 v3.0.20 github.com/minio/minio-go/v7 v7.0.63 github.com/minio/mux v1.9.0 github.com/minio/pkg/v2 v2.0.2 diff --git a/go.sum b/go.sum index 3016bcce7..65138c1ef 100644 --- a/go.sum +++ b/go.sum @@ -474,10 +474,10 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/kes-go v0.2.0 h1:HA33arq9s3MErbsj3PAXFVfFo4U4yw7lTKQ5kWFrpCA= github.com/minio/kes-go v0.2.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo= -github.com/minio/madmin-go/v3 v3.0.19 h1:iisPDalKxt0PWkIaKrcrTU5NhSrl5nxJEQzMXRFATls= -github.com/minio/madmin-go/v3 v3.0.19/go.mod h1:B2EgtEGrfWx+AkXv+OAcS6IHwoIJcd1p75QfDPSPd6Q= github.com/minio/mc v0.0.0-20230922050746-ae05d451739b h1:oZ5XAtyKMFbMbxqShGA51/YD/YaW+B2BiBQI7rN2uFc= github.com/minio/mc v0.0.0-20230922050746-ae05d451739b/go.mod h1:+4OzbgOsVsKvdmLLjN47DKKyHiLJuT05m5qF9fAQWe4= +github.com/minio/madmin-go/v3 v3.0.20 h1:Op+MzisvVUdTdWU2eH1PdaRMY8ia/hNmcWHHFQnTFoA= +github.com/minio/madmin-go/v3 v3.0.20/go.mod h1:B2EgtEGrfWx+AkXv+OAcS6IHwoIJcd1p75QfDPSPd6Q= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v6 v6.0.46/go.mod h1:qD0lajrGW49lKZLtXKtCB4X/qkMf0a5tBvN2PaZg7Gg=