From 5b114b43f7a657c6ab964cadd318505cba7f25c1 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 5 Sep 2023 20:21:59 -0700 Subject: [PATCH] refactor bandwidth throttling for replication target (#17980) This refactor is to allow using the bandwidth throttling for other purposes. --- cmd/bucket-quota.go | 6 +- cmd/bucket-replication-handlers.go | 12 +- cmd/bucket-replication.go | 12 +- cmd/bucket-targets.go | 35 +++--- cmd/notification.go | 29 ++--- internal/bucket/bandwidth/monitor.go | 140 +++++++++------------- internal/bucket/bandwidth/monitor_test.go | 81 +++++++------ internal/bucket/bandwidth/reader.go | 17 ++- 8 files changed, 153 insertions(+), 179 deletions(-) diff --git a/cmd/bucket-quota.go b/cmd/bucket-quota.go index 05e1596a1..db2b968f2 100644 --- a/cmd/bucket-quota.go +++ b/cmd/bucket-quota.go @@ -107,8 +107,8 @@ func (sys *BucketQuotaSys) enforceQuotaHard(ctx context.Context, bucket string, return err } - if q != nil && q.Type == madmin.HardQuota && q.Quota > 0 { - if uint64(size) >= q.Quota { // check if file size already exceeds the quota + if q != nil && q.Type == madmin.HardQuota && q.Size > 0 { + if uint64(size) >= q.Size { // check if file size already exceeds the quota return BucketQuotaExceeded{Bucket: bucket} } @@ -117,7 +117,7 @@ func (sys *BucketQuotaSys) enforceQuotaHard(ctx context.Context, bucket string, return err } - if bui.Size > 0 && ((bui.Size + uint64(size)) >= q.Quota) { + if bui.Size > 0 && ((bui.Size + uint64(size)) >= q.Size) { return BucketQuotaExceeded{Bucket: bucket} } } diff --git a/cmd/bucket-replication-handlers.go b/cmd/bucket-replication-handlers.go index 84a91a270..4481dc6da 100644 --- a/cmd/bucket-replication-handlers.go +++ b/cmd/bucket-replication-handlers.go @@ -232,10 +232,10 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW enc := json.NewEncoder(w) stats := globalReplicationStats.getLatestReplicationStats(bucket) bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket) - bwMap := bwRpt.BucketStats[bucket] + bwMap := bwRpt.BucketStats for arn, st := range stats.ReplicationStats.Stats { - if bwMap != nil { - if bw, ok := bwMap[arn]; ok { + for opts, bw := range bwMap { + if opts.ReplicationARN != "" && opts.ReplicationARN == arn { st.BandWidthLimitInBytesPerSecond = bw.LimitInBytesPerSecond st.CurrentBandwidthInBytesPerSecond = bw.CurrentBandwidthInBytesPerSecond stats.ReplicationStats.Stats[arn] = st @@ -288,10 +288,10 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsV2Handler(w http.Respons enc := json.NewEncoder(w) stats := globalReplicationStats.getLatestReplicationStats(bucket) bwRpt := globalNotificationSys.GetBandwidthReports(ctx, bucket) - bwMap := bwRpt.BucketStats[bucket] + bwMap := bwRpt.BucketStats for arn, st := range stats.ReplicationStats.Stats { - if bwMap != nil { - if bw, ok := bwMap[arn]; ok { + for opts, bw := range bwMap { + if opts.ReplicationARN != "" && opts.ReplicationARN == arn { st.BandWidthLimitInBytesPerSecond = bw.LimitInBytesPerSecond st.CurrentBandwidthInBytesPerSecond = bw.CurrentBandwidthInBytesPerSecond stats.ReplicationStats.Stats[arn] = st diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 62430e681..52b24ca11 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1217,8 +1217,10 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj } opts := &bandwidth.MonitorReaderOptions{ - Bucket: objInfo.Bucket, - TargetARN: tgt.ARN, + BucketOptions: bandwidth.BucketOptions{ + Name: objInfo.Bucket, + ReplicationARN: tgt.ARN, + }, HeaderSize: headerSize, } newCtx := ctx @@ -1456,8 +1458,10 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } opts := &bandwidth.MonitorReaderOptions{ - Bucket: objInfo.Bucket, - TargetARN: tgt.ARN, + BucketOptions: bandwidth.BucketOptions{ + Name: objInfo.Bucket, + ReplicationARN: tgt.ARN, + }, HeaderSize: headerSize, } newCtx := ctx diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index acf04f2cf..53aa5878d 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -489,31 +489,27 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT defer sys.Unlock() // Remove existingtarget and arn association - if tgts, ok := sys.targetsMap[bucket]; ok { - for _, t := range tgts { + if stgts, ok := sys.targetsMap[bucket]; ok { + for _, t := range stgts { delete(sys.arnRemotesMap, t.Arn) } delete(sys.targetsMap, bucket) } - // No need for more if not adding anything - if tgts == nil || tgts.Empty() { - globalBucketMonitor.DeleteBucket(bucket) - return - } - - if len(tgts.Targets) > 0 { - sys.targetsMap[bucket] = tgts.Targets - } - for _, tgt := range tgts.Targets { - tgtClient, err := sys.getRemoteTargetClient(&tgt) - if err != nil { - continue + if tgts != nil { + for _, tgt := range tgts.Targets { + tgtClient, err := sys.getRemoteTargetClient(&tgt) + if err != nil { + continue + } + sys.arnRemotesMap[tgt.Arn] = tgtClient + sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit) + } + + if !tgts.Empty() { + sys.targetsMap[bucket] = tgts.Targets } - sys.arnRemotesMap[tgt.Arn] = tgtClient - sys.updateBandwidthLimit(bucket, tgt.Arn, tgt.BandwidthLimit) } - sys.targetsMap[bucket] = tgts.Targets } // create minio-go clients for buckets having remote targets @@ -524,9 +520,6 @@ func (sys *BucketTargetSys) set(bucket BucketInfo, meta BucketMetadata) { } sys.Lock() defer sys.Unlock() - if len(cfg.Targets) > 0 { - sys.targetsMap[bucket.Name] = cfg.Targets - } for _, tgt := range cfg.Targets { tgtClient, err := sys.getRemoteTargetClient(&tgt) if err != nil { diff --git a/cmd/notification.go b/cmd/notification.go index 00e5f52b5..94ca9409e 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -1099,35 +1099,24 @@ func (sys *NotificationSys) GetBandwidthReports(ctx context.Context, buckets ... } reports = append(reports, globalBucketMonitor.GetReport(bandwidth.SelectBuckets(buckets...))) consolidatedReport := bandwidth.BucketBandwidthReport{ - BucketStats: make(map[string]map[string]bandwidth.Details), + BucketStats: make(map[bandwidth.BucketOptions]bandwidth.Details), } for _, report := range reports { if report == nil || report.BucketStats == nil { continue } - for bucket := range report.BucketStats { - d, ok := consolidatedReport.BucketStats[bucket] + for opts := range report.BucketStats { + d, ok := consolidatedReport.BucketStats[opts] if !ok { - consolidatedReport.BucketStats[bucket] = make(map[string]bandwidth.Details) - d = consolidatedReport.BucketStats[bucket] - for arn := range d { - d[arn] = bandwidth.Details{ - LimitInBytesPerSecond: report.BucketStats[bucket][arn].LimitInBytesPerSecond, - } + d = bandwidth.Details{ + LimitInBytesPerSecond: report.BucketStats[opts].LimitInBytesPerSecond, } } - for arn, st := range report.BucketStats[bucket] { - bwDet := bandwidth.Details{} - if bw, ok := d[arn]; ok { - bwDet = bw - } - if bwDet.LimitInBytesPerSecond < st.LimitInBytesPerSecond { - bwDet.LimitInBytesPerSecond = st.LimitInBytesPerSecond - } - bwDet.CurrentBandwidthInBytesPerSecond += st.CurrentBandwidthInBytesPerSecond - d[arn] = bwDet - consolidatedReport.BucketStats[bucket] = d + dt, ok := report.BucketStats[opts] + if ok { + d.CurrentBandwidthInBytesPerSecond += dt.CurrentBandwidthInBytesPerSecond } + consolidatedReport.BucketStats[opts] = d } } return consolidatedReport diff --git a/internal/bucket/bandwidth/monitor.go b/internal/bucket/bandwidth/monitor.go index abed6bcfa..34e885f24 100644 --- a/internal/bucket/bandwidth/monitor.go +++ b/internal/bucket/bandwidth/monitor.go @@ -25,27 +25,29 @@ import ( "golang.org/x/time/rate" ) -type throttle struct { +type bucketThrottle struct { *rate.Limiter NodeBandwidthPerSec int64 } // Monitor holds the state of the global bucket monitor type Monitor struct { - tlock sync.RWMutex // mutex for bucketThrottle - bucketThrottle map[string]map[string]*throttle - mlock sync.RWMutex // mutex for activeBuckets map - activeBuckets map[string]map[string]*bucketMeasurement // Buckets with objects in flight - bucketMovingAvgTicker *time.Ticker // Ticker for calculating moving averages - ctx context.Context // Context for generate + tlock sync.RWMutex // mutex for bucket throttling + mlock sync.RWMutex // mutex for bucket measurement + + bucketsThrottle map[BucketOptions]*bucketThrottle + bucketsMeasurement map[BucketOptions]*bucketMeasurement // Buckets with objects in flight + + bucketMovingAvgTicker *time.Ticker // Ticker for calculating moving averages + ctx context.Context // Context for generate NodeCount uint64 } // NewMonitor returns a monitor with defaults. func NewMonitor(ctx context.Context, numNodes uint64) *Monitor { m := &Monitor{ - activeBuckets: make(map[string]map[string]*bucketMeasurement), - bucketThrottle: make(map[string]map[string]*throttle), + bucketsMeasurement: make(map[BucketOptions]*bucketMeasurement), + bucketsThrottle: make(map[BucketOptions]*bucketThrottle), bucketMovingAvgTicker: time.NewTicker(2 * time.Second), ctx: ctx, NodeCount: numNodes, @@ -54,19 +56,16 @@ func NewMonitor(ctx context.Context, numNodes uint64) *Monitor { return m } -func (m *Monitor) updateMeasurement(bucket, arn string, bytes uint64) { +func (m *Monitor) updateMeasurement(opts BucketOptions, bytes uint64) { m.mlock.Lock() defer m.mlock.Unlock() - tm, ok := m.activeBuckets[bucket] + + tm, ok := m.bucketsMeasurement[opts] if !ok { - tm = make(map[string]*bucketMeasurement) + tm = &bucketMeasurement{} } - measurement, ok := tm[arn] - if !ok { - measurement = &bucketMeasurement{} - } - measurement.incrementBytes(bytes) - m.activeBuckets[bucket][arn] = measurement + tm.incrementBytes(bytes) + m.bucketsMeasurement[opts] = tm } // SelectionFunction for buckets @@ -80,8 +79,8 @@ func SelectBuckets(buckets ...string) SelectionFunction { } } return func(bucket string) bool { - for _, b := range buckets { - if b == "" || b == bucket { + for _, bkt := range buckets { + if bkt == bucket { return true } } @@ -97,7 +96,7 @@ type Details struct { // BucketBandwidthReport captures the details for all buckets. type BucketBandwidthReport struct { - BucketStats map[string]map[string]Details `json:"bucketStats,omitempty"` + BucketStats map[BucketOptions]Details `json:"bucketStats,omitempty"` } // GetReport gets the report for all bucket bandwidth details. @@ -109,24 +108,18 @@ func (m *Monitor) GetReport(selectBucket SelectionFunction) *BucketBandwidthRepo func (m *Monitor) getReport(selectBucket SelectionFunction) *BucketBandwidthReport { report := &BucketBandwidthReport{ - BucketStats: make(map[string]map[string]Details), + BucketStats: make(map[BucketOptions]Details), } - for bucket, bucketMeasurementMap := range m.activeBuckets { - if !selectBucket(bucket) { + for bucketOpts, bucketMeasurement := range m.bucketsMeasurement { + if !selectBucket(bucketOpts.Name) { continue } m.tlock.RLock() - report.BucketStats[bucket] = make(map[string]Details) - if tgtThrottle, ok := m.bucketThrottle[bucket]; ok { - for arn, throttle := range tgtThrottle { - var currBw float64 - if bucketMeasurement, ok := bucketMeasurementMap[arn]; ok { - currBw = bucketMeasurement.getExpMovingAvgBytesPerSecond() - } - report.BucketStats[bucket][arn] = Details{ - LimitInBytesPerSecond: throttle.NodeBandwidthPerSec * int64(m.NodeCount), - CurrentBandwidthInBytesPerSecond: currBw, - } + if tgtThrottle, ok := m.bucketsThrottle[bucketOpts]; ok { + currBw := bucketMeasurement.getExpMovingAvgBytesPerSecond() + report.BucketStats[bucketOpts] = Details{ + LimitInBytesPerSecond: tgtThrottle.NodeBandwidthPerSec * int64(m.NodeCount), + CurrentBandwidthInBytesPerSecond: currBw, } } m.tlock.RUnlock() @@ -149,92 +142,75 @@ func (m *Monitor) trackEWMA() { func (m *Monitor) updateMovingAvg() { m.mlock.Lock() defer m.mlock.Unlock() - for _, bucketMeasurement := range m.activeBuckets { - for _, measurement := range bucketMeasurement { - measurement.updateExponentialMovingAverage(time.Now()) - } + for _, bucketMeasurement := range m.bucketsMeasurement { + bucketMeasurement.updateExponentialMovingAverage(time.Now()) } } -func (m *Monitor) getBucketMeasurement(bucket, arn string, initTime time.Time) map[string]*bucketMeasurement { - bucketTracker, ok := m.activeBuckets[bucket] - if !ok { - bucketTracker = make(map[string]*bucketMeasurement) - bucketTracker[arn] = newBucketMeasurement(initTime) - m.activeBuckets[bucket] = bucketTracker - } - return bucketTracker -} - -// track returns the measurement object for bucket -func (m *Monitor) track(bucket, arn string) { +func (m *Monitor) init(opts BucketOptions) { m.mlock.Lock() defer m.mlock.Unlock() - m.getBucketMeasurement(bucket, arn, time.Now()) + + _, ok := m.bucketsMeasurement[opts] + if !ok { + m.bucketsMeasurement[opts] = newBucketMeasurement(time.Now()) + } } // DeleteBucket deletes monitoring the 'bucket' func (m *Monitor) DeleteBucket(bucket string) { m.tlock.Lock() - delete(m.bucketThrottle, bucket) + for opts := range m.bucketsThrottle { + if opts.Name == bucket { + delete(m.bucketsThrottle, opts) + } + } m.tlock.Unlock() + m.mlock.Lock() - delete(m.activeBuckets, bucket) + for opts := range m.bucketsMeasurement { + if opts.Name == bucket { + delete(m.bucketsMeasurement, opts) + } + } m.mlock.Unlock() } // DeleteBucketThrottle deletes monitoring for a bucket's target func (m *Monitor) DeleteBucketThrottle(bucket, arn string) { m.tlock.Lock() - if _, ok := m.bucketThrottle[bucket]; ok { - delete(m.bucketThrottle[bucket], arn) - } + delete(m.bucketsThrottle, BucketOptions{Name: bucket, ReplicationARN: arn}) m.tlock.Unlock() m.mlock.Lock() - if _, ok := m.activeBuckets[bucket]; ok { - delete(m.activeBuckets[bucket], arn) - } + delete(m.bucketsMeasurement, BucketOptions{Name: bucket, ReplicationARN: arn}) m.mlock.Unlock() } // throttle returns currently configured throttle for this bucket -func (m *Monitor) throttle(bucket, arn string) *throttle { +func (m *Monitor) throttle(opts BucketOptions) *bucketThrottle { m.tlock.RLock() defer m.tlock.RUnlock() - return m.bucketThrottle[bucket][arn] + return m.bucketsThrottle[opts] } // SetBandwidthLimit sets the bandwidth limit for a bucket func (m *Monitor) SetBandwidthLimit(bucket, arn string, limit int64) { m.tlock.Lock() defer m.tlock.Unlock() - bw := limit / int64(m.NodeCount) - tgtMap, ok := m.bucketThrottle[bucket] + limitBytes := limit / int64(m.NodeCount) + throttle, ok := m.bucketsThrottle[BucketOptions{Name: bucket, ReplicationARN: arn}] if !ok { - tgtMap = make(map[string]*throttle) - tgtMap[arn] = &throttle{ - NodeBandwidthPerSec: bw, - } + throttle = &bucketThrottle{} } - th, ok := tgtMap[arn] - if !ok { - th = &throttle{} - } - th.NodeBandwidthPerSec = bw - tgtMap[arn] = th - newlimit := rate.Every(time.Second / time.Duration(tgtMap[arn].NodeBandwidthPerSec)) - tgtMap[arn].Limiter = rate.NewLimiter(newlimit, int(tgtMap[arn].NodeBandwidthPerSec)) - m.bucketThrottle[bucket] = tgtMap + throttle.NodeBandwidthPerSec = limitBytes + throttle.Limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes)) + m.bucketsThrottle[BucketOptions{Name: bucket, ReplicationARN: arn}] = throttle } // IsThrottled returns true if a bucket has bandwidth throttling enabled. func (m *Monitor) IsThrottled(bucket, arn string) bool { m.tlock.RLock() defer m.tlock.RUnlock() - th, ok := m.bucketThrottle[bucket] - if !ok { - return ok - } - _, ok = th[arn] + _, ok := m.bucketsThrottle[BucketOptions{Name: bucket, ReplicationARN: arn}] return ok } diff --git a/internal/bucket/bandwidth/monitor_test.go b/internal/bucket/bandwidth/monitor_test.go index d25405acf..fbeac14e7 100644 --- a/internal/bucket/bandwidth/monitor_test.go +++ b/internal/bucket/bandwidth/monitor_test.go @@ -29,7 +29,7 @@ const ( func TestMonitor_GetReport(t *testing.T) { type fields struct { - activeBuckets map[string]map[string]*bucketMeasurement + activeBuckets map[BucketOptions]*bucketMeasurement endTime time.Time update2 uint64 endTime2 time.Time @@ -39,6 +39,28 @@ func TestMonitor_GetReport(t *testing.T) { m0.incrementBytes(0) m1MiBPS := newBucketMeasurement(start) m1MiBPS.incrementBytes(oneMiB) + + test1Want := make(map[BucketOptions]Details) + test1Want[BucketOptions{Name: "bucket", ReplicationARN: "arn"}] = Details{LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: 0} + test1Want2 := make(map[BucketOptions]Details) + test1Want2[BucketOptions{Name: "bucket", ReplicationARN: "arn"}] = Details{ + LimitInBytesPerSecond: 1024 * 1024, + CurrentBandwidthInBytesPerSecond: (1024 * 1024) / start.Add(2*time.Second).Sub(start.Add(1*time.Second)).Seconds(), + } + + test2Want := make(map[BucketOptions]Details) + test2Want[BucketOptions{Name: "bucket", ReplicationARN: "arn"}] = Details{LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: float64(oneMiB)} + test2Want2 := make(map[BucketOptions]Details) + test2Want2[BucketOptions{Name: "bucket", ReplicationARN: "arn"}] = Details{ + LimitInBytesPerSecond: 1024 * 1024, + CurrentBandwidthInBytesPerSecond: exponentialMovingAverage(betaBucket, float64(oneMiB), 2*float64(oneMiB)), + } + + test1ActiveBuckets := make(map[BucketOptions]*bucketMeasurement) + test1ActiveBuckets[BucketOptions{Name: "bucket", ReplicationARN: "arn"}] = m0 + test1ActiveBuckets2 := make(map[BucketOptions]*bucketMeasurement) + test1ActiveBuckets2[BucketOptions{Name: "bucket", ReplicationARN: "arn"}] = m1MiBPS + tests := []struct { name string fields fields @@ -48,46 +70,31 @@ func TestMonitor_GetReport(t *testing.T) { { name: "ZeroToOne", fields: fields{ - activeBuckets: map[string]map[string]*bucketMeasurement{ - "bucket": { - "arn": m0, - }, - }, - endTime: start.Add(1 * time.Second), - update2: oneMiB, - endTime2: start.Add(2 * time.Second), + activeBuckets: test1ActiveBuckets, + endTime: start.Add(1 * time.Second), + update2: oneMiB, + endTime2: start.Add(2 * time.Second), }, want: &BucketBandwidthReport{ - BucketStats: map[string]map[string]Details{ - "bucket": { - "arn": Details{LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: 0}, - }, - }, + BucketStats: test1Want, }, want2: &BucketBandwidthReport{ - BucketStats: map[string]map[string]Details{"bucket": {"arn": Details{LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: (1024 * 1024) / start.Add(2*time.Second).Sub(start.Add(1*time.Second)).Seconds()}}}, + BucketStats: test1Want2, }, }, { name: "OneToTwo", fields: fields{ - activeBuckets: map[string]map[string]*bucketMeasurement{ - "bucket": { - "arn": m1MiBPS, - }, - }, - endTime: start.Add(1 * time.Second), - update2: 2 * oneMiB, - endTime2: start.Add(2 * time.Second), + activeBuckets: test1ActiveBuckets2, + endTime: start.Add(1 * time.Second), + update2: 2 * oneMiB, + endTime2: start.Add(2 * time.Second), }, want: &BucketBandwidthReport{ - BucketStats: map[string]map[string]Details{"bucket": {"arn": Details{LimitInBytesPerSecond: 1024 * 1024, CurrentBandwidthInBytesPerSecond: float64(oneMiB)}}}, + BucketStats: test2Want, }, want2: &BucketBandwidthReport{ - BucketStats: map[string]map[string]Details{"bucket": {"arn": Details{ - LimitInBytesPerSecond: 1024 * 1024, - CurrentBandwidthInBytesPerSecond: exponentialMovingAverage(betaBucket, float64(oneMiB), 2*float64(oneMiB)), - }}}, + BucketStats: test2Want2, }, }, } @@ -95,23 +102,23 @@ func TestMonitor_GetReport(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - thr := throttle{ + thr := bucketThrottle{ NodeBandwidthPerSec: 1024 * 1024, } - th := make(map[string]map[string]*throttle) - th["bucket"] = map[string]*throttle{"arn": &thr} + th := make(map[BucketOptions]*bucketThrottle) + th[BucketOptions{Name: "bucket", ReplicationARN: "arn"}] = &thr m := &Monitor{ - activeBuckets: tt.fields.activeBuckets, - bucketThrottle: th, - NodeCount: 1, + bucketsMeasurement: tt.fields.activeBuckets, + bucketsThrottle: th, + NodeCount: 1, } - m.activeBuckets["bucket"]["arn"].updateExponentialMovingAverage(tt.fields.endTime) + m.bucketsMeasurement[BucketOptions{Name: "bucket", ReplicationARN: "arn"}].updateExponentialMovingAverage(tt.fields.endTime) got := m.GetReport(SelectBuckets()) if !reflect.DeepEqual(got, tt.want) { t.Errorf("GetReport() = %v, want %v", got, tt.want) } - m.activeBuckets["bucket"]["arn"].incrementBytes(tt.fields.update2) - m.activeBuckets["bucket"]["arn"].updateExponentialMovingAverage(tt.fields.endTime2) + m.bucketsMeasurement[BucketOptions{Name: "bucket", ReplicationARN: "arn"}].incrementBytes(tt.fields.update2) + m.bucketsMeasurement[BucketOptions{Name: "bucket", ReplicationARN: "arn"}].updateExponentialMovingAverage(tt.fields.endTime2) got = m.GetReport(SelectBuckets()) if !reflect.DeepEqual(got.BucketStats, tt.want2.BucketStats) { t.Errorf("GetReport() = %v, want %v", got.BucketStats, tt.want2.BucketStats) diff --git a/internal/bucket/bandwidth/reader.go b/internal/bucket/bandwidth/reader.go index 147eea277..3ec765321 100644 --- a/internal/bucket/bandwidth/reader.go +++ b/internal/bucket/bandwidth/reader.go @@ -26,17 +26,22 @@ import ( // MonitoredReader represents a throttled reader subject to bandwidth monitoring type MonitoredReader struct { r io.Reader - throttle *throttle + throttle *bucketThrottle ctx context.Context // request context lastErr error // last error reported, if this non-nil all reads will fail. m *Monitor opts *MonitorReaderOptions } +// BucketOptions represents the bucket and optionally its replication target pair. +type BucketOptions struct { + Name string + ReplicationARN string // This is optional, and not mandatory. +} + // MonitorReaderOptions provides configurable options for monitor reader implementation. type MonitorReaderOptions struct { - Bucket string - TargetARN string + BucketOptions HeaderSize int } @@ -80,7 +85,7 @@ func (r *MonitoredReader) Read(buf []byte) (n int, err error) { r.lastErr = err return } - r.m.updateMeasurement(r.opts.Bucket, r.opts.TargetARN, uint64(tokens)) + r.m.updateMeasurement(r.opts.BucketOptions, uint64(tokens)) return } @@ -89,11 +94,11 @@ func (r *MonitoredReader) Read(buf []byte) (n int, err error) { func NewMonitoredReader(ctx context.Context, m *Monitor, r io.Reader, opts *MonitorReaderOptions) *MonitoredReader { reader := MonitoredReader{ r: r, - throttle: m.throttle(opts.Bucket, opts.TargetARN), + throttle: m.throttle(opts.BucketOptions), m: m, opts: opts, ctx: ctx, } - reader.m.track(opts.Bucket, opts.TargetARN) + reader.m.init(opts.BucketOptions) return &reader }