fix: properly close leaking bandwidth monitor channel (#11967)

This PR fixes

- close the leaking bandwidth report channel
- remove the closer requirement for the bandwidth monitor;
  instead, if Read() fails, remember the error and return
  that error for all subsequent reads.
- use locking for usage-cache.bin updates; with inline
  data we cannot afford concurrent writes to
  usage-cache.bin corrupting xl.meta
This commit is contained in:
Harshavardhana 2021-04-05 16:07:53 -07:00 committed by GitHub
parent bb6561fe55
commit abb55bd49e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 41 deletions

View File

@ -24,6 +24,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math/rand"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -1526,30 +1527,33 @@ func (a adminAPIHandlers) BandwidthMonitorHandler(w http.ResponseWriter, r *http
return return
} }
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
setEventStreamHeaders(w) setEventStreamHeaders(w)
reportCh := make(chan bandwidth.Report, 1) reportCh := make(chan bandwidth.Report)
keepAliveTicker := time.NewTicker(500 * time.Millisecond) keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop() defer keepAliveTicker.Stop()
bucketsRequestedString := r.URL.Query().Get("buckets") bucketsRequestedString := r.URL.Query().Get("buckets")
bucketsRequested := strings.Split(bucketsRequestedString, ",") bucketsRequested := strings.Split(bucketsRequestedString, ",")
go func() { go func() {
defer close(reportCh)
for { for {
reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case reportCh <- globalNotificationSys.GetBandwidthReports(ctx, bucketsRequested...):
time.Sleep(2 * time.Second) time.Sleep(time.Duration(rnd.Float64() * float64(2*time.Second)))
} }
} }
}() }()
for { for {
select { select {
case report := <-reportCh: case report, ok := <-reportCh:
enc := json.NewEncoder(w) if !ok {
err := enc.Encode(report) return
if err != nil { }
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL) if err := json.NewEncoder(w).Encode(report); err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
return return
} }
w.(http.Flusher).Flush() w.(http.Flusher).Flush()

View File

@ -701,19 +701,25 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
if totalNodesCount == 0 { if totalNodesCount == 0 {
totalNodesCount = 1 // For standalone erasure coding totalNodesCount = 1 // For standalone erasure coding
} }
b := target.BandwidthLimit / int64(totalNodesCount)
var headerSize int var headerSize int
for k, v := range putOpts.Header() { for k, v := range putOpts.Header() {
headerSize += len(k) + len(v) headerSize += len(k) + len(v)
} }
// r takes over closing gr. opts := &bandwidth.MonitorReaderOptions{
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit) Bucket: objInfo.Bucket,
Object: objInfo.Name,
HeaderSize: headerSize,
BandwidthBytesPerSec: target.BandwidthLimit / int64(totalNodesCount),
ClusterBandwidth: target.BandwidthLimit,
}
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, gr, opts)
if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil { if _, err = c.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts); err != nil {
replicationStatus = replication.Failed replicationStatus = replication.Failed
logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err)) logger.LogIf(ctx, fmt.Errorf("Unable to replicate for object %s/%s(%s): %w", bucket, objInfo.Name, objInfo.VersionID, err))
} }
defer r.Close()
} }
prevReplStatus := objInfo.ReplicationStatus prevReplStatus := objInfo.ReplicationStatus

View File

@ -565,7 +565,7 @@ func (d *dataUsageCache) save(ctx context.Context, store objectIO, name string)
dataUsageBucket, dataUsageBucket,
name, name,
NewPutObjReader(r), NewPutObjReader(r),
ObjectOptions{NoLock: true}) ObjectOptions{})
if isErrBucketNotFound(err) { if isErrBucketNotFound(err) {
return nil return nil
} }

View File

@ -25,62 +25,61 @@ import (
// MonitoredReader monitors the bandwidth // MonitoredReader monitors the bandwidth
type MonitoredReader struct { type MonitoredReader struct {
bucket string // Token to track bucket opts *MonitorReaderOptions
bucketMeasurement *bucketMeasurement // bucket measurement object bucketMeasurement *bucketMeasurement // bucket measurement object
object string // Token to track object reader io.Reader // Reader to wrap
reader io.ReadCloser // Reader to wrap
lastStop time.Time // Last timestamp for a measurement lastStop time.Time // Last timestamp for a measurement
headerSize int // Size of the header not captured by reader
throttle *throttle // throttle the rate at which replication occur throttle *throttle // throttle the rate at which replication occur
monitor *Monitor // Monitor reference monitor *Monitor // Monitor reference
closed bool // Reader is closed lastErr error // last error reported, if this non-nil all reads will fail.
} }
// NewMonitoredReader returns a io.ReadCloser that reports bandwidth details. // MonitorReaderOptions provides configurable options for monitor reader implementation.
// The supplied reader will be closed. type MonitorReaderOptions struct {
func NewMonitoredReader(ctx context.Context, monitor *Monitor, bucket string, object string, reader io.ReadCloser, headerSize int, bandwidthBytesPerSecond int64, clusterBandwidth int64) *MonitoredReader { Bucket string
Object string
HeaderSize int
BandwidthBytesPerSec int64
ClusterBandwidth int64
}
// NewMonitoredReader returns a io.Reader that reports bandwidth details.
func NewMonitoredReader(ctx context.Context, monitor *Monitor, reader io.Reader, opts *MonitorReaderOptions) *MonitoredReader {
timeNow := time.Now() timeNow := time.Now()
b := monitor.track(bucket, object, timeNow) b := monitor.track(opts.Bucket, opts.Object, timeNow)
return &MonitoredReader{ return &MonitoredReader{
bucket: bucket, opts: opts,
object: object,
bucketMeasurement: b, bucketMeasurement: b,
reader: reader, reader: reader,
lastStop: timeNow, lastStop: timeNow,
headerSize: headerSize, throttle: monitor.throttleBandwidth(ctx, opts.Bucket, opts.BandwidthBytesPerSec, opts.ClusterBandwidth),
throttle: monitor.throttleBandwidth(ctx, bucket, bandwidthBytesPerSecond, clusterBandwidth),
monitor: monitor, monitor: monitor,
} }
} }
// Read wraps the read reader // Read wraps the read reader
func (m *MonitoredReader) Read(p []byte) (n int, err error) { func (m *MonitoredReader) Read(p []byte) (n int, err error) {
if m.closed { if m.lastErr != nil {
err = io.ErrClosedPipe err = m.lastErr
return return
} }
p = p[:m.throttle.GetLimitForBytes(int64(len(p)))] p = p[:m.throttle.GetLimitForBytes(int64(len(p)))]
n, err = m.reader.Read(p) n, err = m.reader.Read(p)
stop := time.Now() stop := time.Now()
update := uint64(n + m.headerSize) update := uint64(n + m.opts.HeaderSize)
m.bucketMeasurement.incrementBytes(update) m.bucketMeasurement.incrementBytes(update)
m.lastStop = stop m.lastStop = stop
unused := len(p) - (n + m.headerSize) unused := len(p) - (n + m.opts.HeaderSize)
m.headerSize = 0 // Set to 0 post first read m.opts.HeaderSize = 0 // Set to 0 post first read
if unused > 0 { if unused > 0 {
m.throttle.ReleaseUnusedBandwidth(int64(unused)) m.throttle.ReleaseUnusedBandwidth(int64(unused))
} }
if err != nil {
m.lastErr = err
}
return return
} }
// Close stops tracking the io
func (m *MonitoredReader) Close() error {
if m.closed {
return nil
}
m.closed = true
return m.reader.Close()
}