2021-04-18 15:41:13 -04:00
|
|
|
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
|
|
//
|
|
|
|
// This file is part of MinIO Object Storage stack
|
|
|
|
//
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
//
|
|
|
|
// This program is distributed in the hope that it will be useful
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU Affero General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
2020-10-09 23:36:00 -04:00
|
|
|
|
|
|
|
package bandwidth
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"io"
|
2021-06-24 21:29:30 -04:00
|
|
|
"math"
|
2020-10-09 23:36:00 -04:00
|
|
|
)
|
|
|
|
|
2021-06-24 21:29:30 -04:00
|
|
|
// MonitoredReader represents a throttled reader subject to bandwidth monitoring
|
2020-10-09 23:36:00 -04:00
|
|
|
type MonitoredReader struct {
|
2021-06-24 21:29:30 -04:00
|
|
|
r io.Reader
|
|
|
|
throttle *throttle
|
|
|
|
ctx context.Context // request context
|
|
|
|
lastErr error // last error reported, if this non-nil all reads will fail.
|
|
|
|
m *Monitor
|
|
|
|
opts *MonitorReaderOptions
|
2020-10-09 23:36:00 -04:00
|
|
|
}
|
|
|
|
|
2021-04-05 19:07:53 -04:00
|
|
|
// MonitorReaderOptions provides configurable options for the monitored reader
// implementation.
type MonitorReaderOptions struct {
	// Bucket names the bucket whose throttle and measurements the reader uses.
	Bucket string
	// HeaderSize is the number of header bytes that are charged against the
	// throttle before any payload bytes. MonitoredReader.Read decrements it as
	// the header is accounted for.
	HeaderSize int
}
|
|
|
|
|
2021-06-24 21:29:30 -04:00
|
|
|
// Read implements a throttled read
|
|
|
|
func (r *MonitoredReader) Read(buf []byte) (n int, err error) {
|
2021-06-27 13:11:13 -04:00
|
|
|
if r.throttle == nil {
|
|
|
|
return r.r.Read(buf)
|
|
|
|
}
|
2021-06-24 21:29:30 -04:00
|
|
|
if r.lastErr != nil {
|
|
|
|
err = r.lastErr
|
2020-10-09 23:36:00 -04:00
|
|
|
return
|
|
|
|
}
|
2021-06-24 21:29:30 -04:00
|
|
|
b := r.throttle.Burst() // maximum available tokens
|
|
|
|
need := len(buf) // number of bytes requested by caller
|
|
|
|
hdr := r.opts.HeaderSize // remaining header bytes
|
|
|
|
var tokens int // number of tokens to request
|
2020-10-09 23:36:00 -04:00
|
|
|
|
2021-06-24 21:29:30 -04:00
|
|
|
if hdr > 0 { // available tokens go towards header first
|
|
|
|
if hdr < b { // all of header can be accommodated
|
|
|
|
r.opts.HeaderSize = 0
|
|
|
|
need = int(math.Min(float64(b-hdr), float64(need))) // use remaining tokens towards payload
|
|
|
|
tokens = need + hdr
|
2021-04-15 19:20:45 -04:00
|
|
|
|
2021-06-24 21:29:30 -04:00
|
|
|
} else { // part of header can be accommodated
|
|
|
|
r.opts.HeaderSize -= b - 1
|
|
|
|
need = 1 // to ensure we read at least one byte for every Read
|
|
|
|
tokens = b
|
|
|
|
}
|
2020-10-09 23:36:00 -04:00
|
|
|
|
2021-06-24 21:29:30 -04:00
|
|
|
} else { // all tokens go towards payload
|
|
|
|
need = int(math.Min(float64(b), float64(need)))
|
|
|
|
tokens = need
|
|
|
|
}
|
2020-10-09 23:36:00 -04:00
|
|
|
|
2021-06-24 21:29:30 -04:00
|
|
|
err = r.throttle.WaitN(r.ctx, tokens)
|
|
|
|
if err != nil {
|
|
|
|
return
|
2020-10-09 23:36:00 -04:00
|
|
|
}
|
2021-04-15 19:20:45 -04:00
|
|
|
|
2021-06-24 21:29:30 -04:00
|
|
|
n, err = r.r.Read(buf[:need])
|
|
|
|
if err != nil {
|
|
|
|
r.lastErr = err
|
|
|
|
return
|
|
|
|
}
|
|
|
|
r.m.updateMeasurement(r.opts.Bucket, uint64(tokens))
|
2021-04-05 19:07:53 -04:00
|
|
|
return
|
2020-10-09 23:36:00 -04:00
|
|
|
}
|
2021-06-24 21:29:30 -04:00
|
|
|
|
|
|
|
// NewMonitoredReader returns reference to a monitored reader that throttles reads to configured bandwidth for the
|
|
|
|
// bucket.
|
|
|
|
func NewMonitoredReader(ctx context.Context, m *Monitor, r io.Reader, opts *MonitorReaderOptions) *MonitoredReader {
|
|
|
|
reader := MonitoredReader{
|
|
|
|
r: r,
|
|
|
|
throttle: m.throttle(opts.Bucket),
|
|
|
|
m: m,
|
|
|
|
opts: opts,
|
|
|
|
ctx: ctx,
|
|
|
|
}
|
|
|
|
reader.m.track(opts.Bucket)
|
|
|
|
return &reader
|
|
|
|
}
|