refactor bandwidth throttling for replication target (#17980)

This refactor is to allow using the bandwidth throttling
for other purposes.
This commit is contained in:
Harshavardhana
2023-09-05 20:21:59 -07:00
committed by GitHub
parent 812f5a02d7
commit 5b114b43f7
8 changed files with 153 additions and 179 deletions

View File

@@ -107,8 +107,8 @@ func (sys *BucketQuotaSys) enforceQuotaHard(ctx context.Context, bucket string,
return err
}
if q != nil && q.Type == madmin.HardQuota && q.Quota > 0 {
if uint64(size) >= q.Quota { // check if file size already exceeds the quota
if q != nil && q.Type == madmin.HardQuota && q.Size > 0 {
if uint64(size) >= q.Size { // check if file size already exceeds the quota
return BucketQuotaExceeded{Bucket: bucket}
}
@@ -117,7 +117,7 @@ func (sys *BucketQuotaSys) enforceQuotaHard(ctx context.Context, bucket string,
return err
}
if bui.Size > 0 && ((bui.Size + uint64(size)) >= q.Quota) {
if bui.Size > 0 && ((bui.Size + uint64(size)) >= q.Size) {
return BucketQuotaExceeded{Bucket: bucket}
}
}

View File

@@ -232,10 +232,10 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW
enc := json.NewEncoder(w)
stats := globalReplicationStats.getLatestReplicationStats(bucket)
bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket)
bwMap := bwRpt.BucketStats[bucket]
bwMap := bwRpt.BucketStats
for arn, st := range stats.ReplicationStats.Stats {
if bwMap != nil {
if bw, ok := bwMap[arn]; ok {
for opts, bw := range bwMap {
if opts.ReplicationARN != "" && opts.ReplicationARN == arn {
st.BandWidthLimitInBytesPerSecond = bw.LimitInBytesPerSecond
st.CurrentBandwidthInBytesPerSecond = bw.CurrentBandwidthInBytesPerSecond
stats.ReplicationStats.Stats[arn] = st
@@ -288,10 +288,10 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsV2Handler(w http.Respons
enc := json.NewEncoder(w)
stats := globalReplicationStats.getLatestReplicationStats(bucket)
bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket)
bwMap := bwRpt.BucketStats[bucket]
bwMap := bwRpt.BucketStats
for arn, st := range stats.ReplicationStats.Stats {
if bwMap != nil {
if bw, ok := bwMap[arn]; ok {
for opts, bw := range bwMap {
if opts.ReplicationARN != "" && opts.ReplicationARN == arn {
st.BandWidthLimitInBytesPerSecond = bw.LimitInBytesPerSecond
st.CurrentBandwidthInBytesPerSecond = bw.CurrentBandwidthInBytesPerSecond
stats.ReplicationStats.Stats[arn] = st

View File

@@ -1217,8 +1217,10 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
}
opts := &bandwidth.MonitorReaderOptions{
Bucket: objInfo.Bucket,
TargetARN: tgt.ARN,
BucketOptions: bandwidth.BucketOptions{
Name: objInfo.Bucket,
ReplicationARN: tgt.ARN,
},
HeaderSize: headerSize,
}
newCtx := ctx
@@ -1456,8 +1458,10 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
}
opts := &bandwidth.MonitorReaderOptions{
Bucket: objInfo.Bucket,
TargetARN: tgt.ARN,
BucketOptions: bandwidth.BucketOptions{
Name: objInfo.Bucket,
ReplicationARN: tgt.ARN,
},
HeaderSize: headerSize,
}
newCtx := ctx

View File

@@ -489,31 +489,27 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT
defer sys.Unlock()
// Remove existingtarget and arn association
if tgts, ok := sys.targetsMap[bucket]; ok {
for _, t := range tgts {
if stgts, ok := sys.targetsMap[bucket]; ok {
for _, t := range stgts {
delete(sys.arnRemotesMap, t.Arn)
}
delete(sys.targetsMap, bucket)
}
// No need for more if not adding anything
if tgts == nil || tgts.Empty() {
globalBucketMonitor.DeleteBucket(bucket)
return
}
if len(tgts.Targets) > 0 {
sys.targetsMap[bucket] = tgts.Targets
}
for _, tgt := range tgts.Targets {
tgtClient, err := sys.getRemoteTargetClient(&tgt)
if err != nil {
continue
if tgts != nil {
for _, tgt := range tgts.Targets {
tgtClient, err := sys.getRemoteTargetClient(&tgt)
if err != nil {
continue
}
sys.arnRemotesMap[tgt.Arn] = tgtClient
sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit)
}
if !tgts.Empty() {
sys.targetsMap[bucket] = tgts.Targets
}
sys.arnRemotesMap[tgt.Arn] = tgtClient
sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit)
}
sys.targetsMap[bucket] = tgts.Targets
}
// create minio-go clients for buckets having remote targets
@@ -524,9 +520,6 @@ func (sys *BucketTargetSys) set(bucket BucketInfo, meta BucketMetadata) {
}
sys.Lock()
defer sys.Unlock()
if len(cfg.Targets) > 0 {
sys.targetsMap[bucket.Name] = cfg.Targets
}
for _, tgt := range cfg.Targets {
tgtClient, err := sys.getRemoteTargetClient(&tgt)
if err != nil {

View File

@@ -1099,35 +1099,24 @@ func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ...
}
reports = append(reports, globalBucketMonitor.GetReport(bandwidth.SelectBuckets(buckets...)))
consolidatedReport := bandwidth.BucketBandwidthReport{
BucketStats: make(map[string]map[string]bandwidth.Details),
BucketStats: make(map[bandwidth.BucketOptions]bandwidth.Details),
}
for _, report := range reports {
if report == nil || report.BucketStats == nil {
continue
}
for bucket := range report.BucketStats {
d, ok := consolidatedReport.BucketStats[bucket]
for opts := range report.BucketStats {
d, ok := consolidatedReport.BucketStats[opts]
if !ok {
consolidatedReport.BucketStats[bucket] = make(map[string]bandwidth.Details)
d = consolidatedReport.BucketStats[bucket]
for arn := range d {
d[arn] = bandwidth.Details{
LimitInBytesPerSecond: report.BucketStats[bucket][arn].LimitInBytesPerSecond,
}
d = bandwidth.Details{
LimitInBytesPerSecond: report.BucketStats[opts].LimitInBytesPerSecond,
}
}
for arn, st := range report.BucketStats[bucket] {
bwDet := bandwidth.Details{}
if bw, ok := d[arn]; ok {
bwDet = bw
}
if bwDet.LimitInBytesPerSecond < st.LimitInBytesPerSecond {
bwDet.LimitInBytesPerSecond = st.LimitInBytesPerSecond
}
bwDet.CurrentBandwidthInBytesPerSecond += st.CurrentBandwidthInBytesPerSecond
d[arn] = bwDet
consolidatedReport.BucketStats[bucket] = d
dt, ok := report.BucketStats[opts]
if ok {
d.CurrentBandwidthInBytesPerSecond += dt.CurrentBandwidthInBytesPerSecond
}
consolidatedReport.BucketStats[opts] = d
}
}
return consolidatedReport