mirror of
https://github.com/minio/minio.git
synced 2025-11-10 14:09:48 -05:00
Use rate.Limiter for bandwidth monitoring (#12506)
Bonus: fixes a hang when bandwidth caps are enabled for synchronous replication
This commit is contained in:
committed by
GitHub
parent
8d1bc65757
commit
d00783c923
@@ -205,6 +205,12 @@ func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.
|
||||
}
|
||||
target = tgt
|
||||
}
|
||||
|
||||
// enforce minimum bandwidth limit as 100MBps
|
||||
if target.BandwidthLimit < 100*1000*1000 {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationBandwidthLimitError, err), r.URL)
|
||||
return
|
||||
}
|
||||
if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, update); err != nil {
|
||||
switch err.(type) {
|
||||
case BucketRemoteConnectionErr:
|
||||
|
||||
@@ -115,6 +115,7 @@ const (
|
||||
ErrReplicationDestinationMissingLock
|
||||
ErrRemoteTargetNotFoundError
|
||||
ErrReplicationRemoteConnectionError
|
||||
ErrReplicationBandwidthLimitError
|
||||
ErrBucketRemoteIdenticalToSource
|
||||
ErrBucketRemoteAlreadyExists
|
||||
ErrBucketRemoteLabelInUse
|
||||
@@ -860,6 +861,11 @@ var errorCodes = errorCodeMap{
|
||||
Description: "Remote service connection error - please check remote service credentials and target bucket",
|
||||
HTTPStatusCode: http.StatusNotFound,
|
||||
},
|
||||
ErrReplicationBandwidthLimitError: {
|
||||
Code: "XMinioAdminReplicationBandwidthLimitError",
|
||||
Description: "Bandwidth limit for remote target must be atleast 100MBps",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrReplicationNoMatchingRuleError: {
|
||||
Code: "XMinioReplicationNoMatchingRule",
|
||||
Description: "No matching replication rule found for this object prefix",
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -733,18 +733,6 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to replicate metadata for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
}
|
||||
} else {
|
||||
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err))
|
||||
sendEvent(eventArgs{
|
||||
EventName: event.ObjectReplicationNotTracked,
|
||||
BucketName: bucket,
|
||||
Object: objInfo,
|
||||
Host: "Internal: [Replication]",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
putOpts, err := putReplicationOpts(ctx, dest, objInfo)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%w", bucket, cfg.RoleArn, err))
|
||||
@@ -756,28 +744,18 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Setup bandwidth throttling
|
||||
peers, _ := globalEndpoints.peers()
|
||||
totalNodesCount := len(peers)
|
||||
if totalNodesCount == 0 {
|
||||
totalNodesCount = 1 // For standalone erasure coding
|
||||
}
|
||||
|
||||
var headerSize int
|
||||
for k, v := range putOpts.Header() {
|
||||
headerSize += len(k) + len(v)
|
||||
}
|
||||
|
||||
opts := &bandwidth.MonitorReaderOptions{
|
||||
Bucket: objInfo.Bucket,
|
||||
Object: objInfo.Name,
|
||||
HeaderSize: headerSize,
|
||||
BandwidthBytesPerSec: target.BandwidthLimit / int64(totalNodesCount),
|
||||
ClusterBandwidth: target.BandwidthLimit,
|
||||
Bucket: objInfo.Bucket,
|
||||
HeaderSize: headerSize,
|
||||
}
|
||||
|
||||
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, gr, opts)
|
||||
newCtx, cancel := context.WithTimeout(ctx, globalOperationTimeout.Timeout())
|
||||
defer cancel()
|
||||
r := bandwidth.NewMonitoredReader(newCtx, globalBucketMonitor, gr, opts)
|
||||
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
|
||||
replicationStatus = replication.Failed
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %s", bucket, objInfo.Name, objInfo.VersionID, err))
|
||||
|
||||
@@ -131,9 +131,6 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
|
||||
if vcfg.Status != string(versioning.Enabled) {
|
||||
return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket}
|
||||
}
|
||||
if tgt.ReplicationSync && tgt.BandwidthLimit > 0 {
|
||||
return NotImplemented{Message: "Synchronous replication does not support bandwidth limits"}
|
||||
}
|
||||
}
|
||||
sys.Lock()
|
||||
defer sys.Unlock()
|
||||
@@ -159,9 +156,23 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
|
||||
|
||||
sys.targetsMap[bucket] = newtgts
|
||||
sys.arnRemotesMap[tgt.Arn] = clnt
|
||||
sys.updateBandwidthLimit(bucket, tgt.BandwidthLimit)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sys *BucketTargetSys) updateBandwidthLimit(bucket string, limit int64) {
|
||||
if globalIsGateway {
|
||||
return
|
||||
}
|
||||
if limit == 0 {
|
||||
globalBucketMonitor.DeleteBucket(bucket)
|
||||
return
|
||||
}
|
||||
// Setup bandwidth throttling
|
||||
|
||||
globalBucketMonitor.SetBandwidthLimit(bucket, limit)
|
||||
}
|
||||
|
||||
// RemoveTarget - removes a remote bucket target for this source bucket.
|
||||
func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr string) error {
|
||||
if globalIsGateway {
|
||||
@@ -214,6 +225,7 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
|
||||
}
|
||||
sys.targetsMap[bucket] = targets
|
||||
delete(sys.arnRemotesMap, arnStr)
|
||||
sys.updateBandwidthLimit(bucket, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -278,6 +290,7 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT
|
||||
}
|
||||
}
|
||||
delete(sys.targetsMap, bucket)
|
||||
sys.updateBandwidthLimit(bucket, 0)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -290,6 +303,7 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT
|
||||
continue
|
||||
}
|
||||
sys.arnRemotesMap[tgt.Arn] = tgtClient
|
||||
sys.updateBandwidthLimit(bucket, tgt.BandwidthLimit)
|
||||
}
|
||||
sys.targetsMap[bucket] = tgts.Targets
|
||||
}
|
||||
@@ -315,6 +329,7 @@ func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objA
|
||||
continue
|
||||
}
|
||||
sys.arnRemotesMap[tgt.Arn] = tgtClient
|
||||
sys.updateBandwidthLimit(bucket.Name, tgt.BandwidthLimit)
|
||||
}
|
||||
sys.targetsMap[bucket.Name] = cfg.Targets
|
||||
}
|
||||
|
||||
@@ -214,7 +214,7 @@ func newAllSubsystems() {
|
||||
}
|
||||
|
||||
// Create the bucket bandwidth monitor
|
||||
globalBucketMonitor = bandwidth.NewMonitor(GlobalServiceDoneCh)
|
||||
globalBucketMonitor = bandwidth.NewMonitor(GlobalContext, totalNodeCount())
|
||||
|
||||
// Create a new config system.
|
||||
globalConfigSys = NewConfigSys()
|
||||
|
||||
10
cmd/utils.go
10
cmd/utils.go
@@ -930,3 +930,13 @@ func loadAndResetRPCNetworkErrsCounter() uint64 {
|
||||
defer rest.ResetNetworkErrsCounter()
|
||||
return rest.GetNetworkErrsCounter()
|
||||
}
|
||||
|
||||
// Helper method to return total number of nodes in cluster
|
||||
func totalNodeCount() uint64 {
|
||||
peers, _ := globalEndpoints.peers()
|
||||
totalNodesCount := uint64(len(peers))
|
||||
if totalNodesCount == 0 {
|
||||
totalNodesCount = 1 // For standalone erasure coding
|
||||
}
|
||||
return totalNodesCount
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user