From abb55bd49e11c2b2c6f329cc12df90d6390e95de Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 5 Apr 2021 16:07:53 -0700 Subject: [PATCH] fix: properly close leaking bandwidth monitor channel (#11967) This PR fixes - close leaking bandwidth report channel leakage - remove the closer requirement for bandwidth monitor instead if Read() fails remember the error and return error for all subsequent reads. - use locking for usage-cache.bin updates, with inline data we cannot afford to have concurrent writes to usage-cache.bin corrupting xl.meta --- cmd/admin-handlers.go | 22 ++++++++------ cmd/bucket-replication.go | 14 ++++++--- cmd/data-usage-cache.go | 2 +- pkg/bucket/bandwidth/reader.go | 53 +++++++++++++++++----------------- 4 files changed, 50 insertions(+), 41 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 4a585554c..04b343b72 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "io" + "math/rand" "net/http" "net/url" "os" @@ -1526,30 +1527,33 @@ func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http return } + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + setEventStreamHeaders(w) - reportCh := make(chan bandwidth.Report, 1) + reportCh := make(chan bandwidth.Report) keepAliveTicker := time.NewTicker(500 * time.Millisecond) defer keepAliveTicker.Stop() bucketsRequestedString := r.URL.Query().Get("buckets") bucketsRequested := strings.Split(bucketsRequestedString, ",") go func() { + defer close(reportCh) for { - reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...) select { case <-ctx.Done(): return - default: - time.Sleep(2 * time.Second) + case reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...): + time.Sleep(time.Duration(rnd.Float64() * float64(2*time.Second))) } } }() for { select { - case report := <-reportCh: - enc := json.NewEncoder(w) - err := enc.Encode(report) - if err != nil { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL) + case report, ok := <-reportCh: + if !ok { + return + } + if err := json.NewEncoder(w).Encode(report); err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) return } w.(http.Flusher).Flush() diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 2ae4fb119..4b5a006bf 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -701,19 +701,25 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa if totalNodesCount == 0 { totalNodesCount = 1 // For standalone erasure coding } - b := target.BandwidthLimit / int64(totalNodesCount) + var headerSize int for k, v := range putOpts.Header() { headerSize += len(k) + len(v) } - // r takes over closing gr. - r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit) + opts := &bandwidth.MonitorReaderOptions{ + Bucket: objInfo.Bucket, + Object: objInfo.Name, + HeaderSize: headerSize, + BandwidthBytesPerSec: target.BandwidthLimit / int64(totalNodesCount), + ClusterBandwidth: target.BandwidthLimit, + } + + r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, gr, opts) if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil { replicationStatus = replication.Failed logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) } - defer r.Close() } prevReplStatus := objInfo.ReplicationStatus diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index 23276396c..2cfa4c79f 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -565,7 +565,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string) dataUsageBucket, name, NewPutObjReader(r), - ObjectOptions{NoLock: true}) + ObjectOptions{}) if isErrBucketNotFound(err) { return nil } diff --git a/pkg/bucket/bandwidth/reader.go b/pkg/bucket/bandwidth/reader.go index 468ba26da..3252d5b53 100644 --- a/pkg/bucket/bandwidth/reader.go +++ b/pkg/bucket/bandwidth/reader.go @@ -25,62 +25,61 @@ import ( // MonitoredReader monitors the bandwidth type MonitoredReader struct { - bucket string // Token to track bucket + opts *MonitorReaderOptions bucketMeasurement *bucketMeasurement // bucket measurement object - object string // Token to track object - reader io.ReadCloser // Reader to wrap + reader io.Reader // Reader to wrap lastStop time.Time // Last timestamp for a measurement - headerSize int // Size of the header not captured by reader throttle *throttle // throttle the rate at which replication occur monitor *Monitor // Monitor reference - closed bool // Reader is closed + lastErr error // last error reported, if this non-nil all reads will fail. } -// NewMonitoredReader returns a io.ReadCloser that reports bandwidth details. -// The supplied reader will be closed. -func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.ReadCloser, headerSize int, bandwidthBytesPerSecond int64, clusterBandwidth int64) *MonitoredReader { +// MonitorReaderOptions provides configurable options for monitor reader implementation. +type MonitorReaderOptions struct { + Bucket string + Object string + HeaderSize int + BandwidthBytesPerSec int64 + ClusterBandwidth int64 +} + +// NewMonitoredReader returns a io.Reader that reports bandwidth details. +func NewMonitoredReader(ctx context.Context, monitor *Monitor, reader io.Reader, opts *MonitorReaderOptions) *MonitoredReader { timeNow := time.Now() - b := monitor.track(bucket, object, timeNow) + b := monitor.track(opts.Bucket, opts.Object, timeNow) return &MonitoredReader{ - bucket: bucket, - object: object, + opts: opts, bucketMeasurement: b, reader: reader, lastStop: timeNow, - headerSize: headerSize, - throttle: monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond, clusterBandwidth), + throttle: monitor.throttleBandwidth(ctx, opts.Bucket, opts.BandwidthBytesPerSec, opts.ClusterBandwidth), monitor: monitor, } } // Read wraps the read reader func (m *MonitoredReader) Read(p []byte) (n int, err error) { - if m.closed { - err = io.ErrClosedPipe + if m.lastErr != nil { + err = m.lastErr return } + p = p[:m.throttle.GetLimitForBytes(int64(len(p)))] n, err = m.reader.Read(p) stop := time.Now() - update := uint64(n + m.headerSize) + update := uint64(n + m.opts.HeaderSize) m.bucketMeasurement.incrementBytes(update) m.lastStop = stop - unused := len(p) - (n + m.headerSize) - m.headerSize = 0 // Set to 0 post first read + unused := len(p) - (n + m.opts.HeaderSize) + m.opts.HeaderSize = 0 // Set to 0 post first read if unused > 0 { m.throttle.ReleaseUnusedBandwidth(int64(unused)) } + if err != nil { + m.lastErr = err + } return } - -// Close stops tracking the io -func (m *MonitoredReader) Close() error { - if m.closed { - return nil - } - m.closed = true - return m.reader.Close() -}