From 75ac4ea840279efb07cb3d6159d3275171eae7e6 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 15 Apr 2021 16:20:45 -0700 Subject: [PATCH] remove possible double locks in bandwidth monitor (#12067) additionally reject bandwidth limits with synchronous replication for now. --- cmd/api-errors.go | 18 ++++++++++++-- cmd/api-errors_test.go | 1 - cmd/bucket-targets.go | 7 ++++-- cmd/object-api-errors.go | 16 ++---------- pkg/bucket/bandwidth/monitor.go | 44 ++++++--------------------------- pkg/bucket/bandwidth/reader.go | 23 +++++++---------- 6 files changed, 39 insertions(+), 70 deletions(-) diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 4cc8466ab..17d7a88cc 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -1921,8 +1921,6 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) { apiErr = ErrSlowDown case InsufficientReadQuorum: apiErr = ErrSlowDown - case UnsupportedDelimiter: - apiErr = ErrNotImplemented case InvalidMarkerPrefixCombination: apiErr = ErrNotImplemented case InvalidUploadIDKeyCombination: @@ -2057,6 +2055,22 @@ func toAPIError(ctx context.Context, err error) APIError { apiErr = errorCodes.ToAPIErrWithErr(code, e) } + if apiErr.Code == "NotImplemented" { + switch e := err.(type) { + case NotImplemented: + desc := e.Error() + if desc == "" { + desc = apiErr.Description + } + apiErr = APIError{ + Code: apiErr.Code, + Description: desc, + HTTPStatusCode: apiErr.HTTPStatusCode, + } + return apiErr + } + } + if apiErr.Code == "InternalError" { // If we see an internal error try to interpret // any underlying errors if possible depending on diff --git a/cmd/api-errors_test.go b/cmd/api-errors_test.go index dab7a811a..e1134f922 100644 --- a/cmd/api-errors_test.go +++ b/cmd/api-errors_test.go @@ -43,7 +43,6 @@ var toAPIErrorTests = []struct { {err: InvalidPart{}, errCode: ErrInvalidPart}, {err: InsufficientReadQuorum{}, errCode: ErrSlowDown}, {err: InsufficientWriteQuorum{}, errCode: ErrSlowDown}, - {err: UnsupportedDelimiter{}, errCode: ErrNotImplemented}, {err: InvalidMarkerPrefixCombination{}, errCode: ErrNotImplemented}, {err: InvalidUploadIDKeyCombination{}, errCode: ErrNotImplemented}, {err: MalformedUploadID{}, errCode: ErrNoSuchUpload}, diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 4847f2d4d..4892d04d4 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -105,7 +105,7 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m } if tgt.Type == madmin.ReplicationService { if !globalIsErasure { - return NotImplemented{} + return NotImplemented{Message: "Replication is not implemented in " + getMinioMode()} } if !globalBucketVersioningSys.Enabled(bucket) { return BucketReplicationSourceNotVersioned{Bucket: bucket} @@ -117,6 +117,9 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m if vcfg.Status != string(versioning.Enabled) { return BucketRemoteTargetNotVersioned{Bucket: tgt.TargetBucket} } + if tgt.ReplicationSync && tgt.BandwidthLimit > 0 { + return NotImplemented{Message: "Synchronous replication does not support bandwidth limits"} + } } if tgt.Type == madmin.ILMService { if globalBucketVersioningSys.Enabled(bucket) { @@ -180,7 +183,7 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str } if arn.Type == madmin.ReplicationService { if !globalIsErasure { - return NotImplemented{} + return NotImplemented{Message: "Replication is not implemented in " + getMinioMode()} } // reject removal of remote target if replication configuration is present rcfg, err := getReplicationConfig(ctx, bucket) diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index 02478891a..6818a2a0f 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -332,15 +332,6 @@ func (e BucketExists) Error() string { return "Bucket exists: " + e.Bucket } -// UnsupportedDelimiter - unsupported delimiter. -type UnsupportedDelimiter struct { - Delimiter string -} - -func (e UnsupportedDelimiter) Error() string { - return fmt.Sprintf("delimiter '%s' is not supported. Only '/' is supported", e.Delimiter) -} - // InvalidUploadIDKeyCombination - invalid upload id and key marker combination. type InvalidUploadIDKeyCombination struct { UploadIDMarker, KeyMarker string @@ -638,14 +629,11 @@ func (e InvalidETag) Error() string { // NotImplemented If a feature is not implemented type NotImplemented struct { - API string + Message string } func (e NotImplemented) Error() string { - if e.API != "" { - return e.API + " is Not Implemented" - } - return "Not Implemented" + return e.Message } // UnsupportedMetadata - unsupported metadata diff --git a/pkg/bucket/bandwidth/monitor.go b/pkg/bucket/bandwidth/monitor.go index 949c5a44f..968ffa373 100644 --- a/pkg/bucket/bandwidth/monitor.go +++ b/pkg/bucket/bandwidth/monitor.go @@ -22,7 +22,6 @@ import ( "time" "github.com/minio/minio/pkg/bandwidth" - "github.com/minio/minio/pkg/pubsub" ) // throttleBandwidth gets the throttle for bucket with the configured value @@ -39,26 +38,6 @@ func (m *Monitor) throttleBandwidth(ctx context.Context, bucket string, bandwidt return throttle } -// SubscribeToBuckets subscribes to buckets. Empty array for monitoring all buckets. -func (m *Monitor) SubscribeToBuckets(subCh chan interface{}, doneCh <-chan struct{}, buckets []string) { - m.pubsub.Subscribe(subCh, doneCh, func(f interface{}) bool { - if buckets != nil || len(buckets) == 0 { - return true - } - report, ok := f.(*bandwidth.Report) - if !ok { - return false - } - for _, b := range buckets { - _, ok := report.BucketStats[b] - if ok { - return true - } - } - return false - }) -} - // Monitor implements the monitoring for bandwidth measurements. type Monitor struct { lock sync.Mutex // lock for all updates @@ -67,12 +46,8 @@ type Monitor struct { bucketMovingAvgTicker *time.Ticker // Ticker for calculating moving averages - pubsub *pubsub.PubSub // PubSub for reporting bandwidths. - bucketThrottle map[string]*throttle - startProcessing sync.Once - doneCh <-chan struct{} } @@ -81,10 +56,10 @@ func NewMonitor(doneCh <-chan struct{}) *Monitor { m := &Monitor{ activeBuckets: make(map[string]*bucketMeasurement), bucketMovingAvgTicker: time.NewTicker(2 * time.Second), - pubsub: pubsub.New(), bucketThrottle: make(map[string]*throttle), doneCh: doneCh, } + go m.trackEWMA() return m } @@ -135,12 +110,12 @@ func (m *Monitor) getReport(selectBucket SelectionFunction) *bandwidth.Report { return report } -func (m *Monitor) process(doneCh <-chan struct{}) { +func (m *Monitor) trackEWMA() { for { select { case <-m.bucketMovingAvgTicker.C: - m.processAvg() - case <-doneCh: + m.updateMovingAvg() + case <-m.doneCh: return } } @@ -155,24 +130,19 @@ func (m *Monitor) getBucketMeasurement(bucket string, initTime time.Time) *bucke return bucketTracker } -func (m *Monitor) processAvg() { +func (m *Monitor) updateMovingAvg() { m.lock.Lock() defer m.lock.Unlock() for _, bucketMeasurement := range m.activeBuckets { bucketMeasurement.updateExponentialMovingAverage(time.Now()) } - m.pubsub.Publish(m.getReport(SelectBuckets())) } // track returns the measurement object for bucket and object -func (m *Monitor) track(bucket string, object string, timeNow time.Time) *bucketMeasurement { +func (m *Monitor) track(bucket string, object string) *bucketMeasurement { m.lock.Lock() defer m.lock.Unlock() - m.startProcessing.Do(func() { - go m.process(m.doneCh) - }) - b := m.getBucketMeasurement(bucket, timeNow) - return b + return m.getBucketMeasurement(bucket, time.Now()) } // DeleteBucket deletes monitoring the 'bucket' diff --git a/pkg/bucket/bandwidth/reader.go b/pkg/bucket/bandwidth/reader.go index 3252d5b53..42885c25b 100644 --- a/pkg/bucket/bandwidth/reader.go +++ b/pkg/bucket/bandwidth/reader.go @@ -20,7 +20,6 @@ package bandwidth import ( "context" "io" - "time" ) // MonitoredReader monitors the bandwidth @@ -28,7 +27,6 @@ type MonitoredReader struct { opts *MonitorReaderOptions bucketMeasurement *bucketMeasurement // bucket measurement object reader io.Reader // Reader to wrap - lastStop time.Time // Last timestamp for a measurement throttle *throttle // throttle the rate at which replication occur monitor *Monitor // Monitor reference lastErr error // last error reported, if this non-nil all reads will fail. @@ -45,13 +43,10 @@ type MonitorReaderOptions struct { // NewMonitoredReader returns a io.Reader that reports bandwidth details. func NewMonitoredReader(ctx context.Context, monitor *Monitor, reader io.Reader, opts *MonitorReaderOptions) *MonitoredReader { - timeNow := time.Now() - b := monitor.track(opts.Bucket, opts.Object, timeNow) return &MonitoredReader{ opts: opts, - bucketMeasurement: b, + bucketMeasurement: monitor.track(opts.Bucket, opts.Object), reader: reader, - lastStop: timeNow, throttle: monitor.throttleBandwidth(ctx, opts.Bucket, opts.BandwidthBytesPerSec, opts.ClusterBandwidth), monitor: monitor, } @@ -67,19 +62,19 @@ func (m *MonitoredReader) Read(p []byte) (n int, err error) { p = p[:m.throttle.GetLimitForBytes(int64(len(p)))] n, err = m.reader.Read(p) - stop := time.Now() - update := uint64(n + m.opts.HeaderSize) + if err != nil { + m.lastErr = err + } - m.bucketMeasurement.incrementBytes(update) - m.lastStop = stop - unused := len(p) - (n + m.opts.HeaderSize) + update := n + m.opts.HeaderSize + unused := len(p) - update + + m.bucketMeasurement.incrementBytes(uint64(update)) m.opts.HeaderSize = 0 // Set to 0 post first read if unused > 0 { m.throttle.ReleaseUnusedBandwidth(int64(unused)) } - if err != nil { - m.lastErr = err - } + return }