Fix stream read IO count (#8961)

Streams are returning a readcloser and returning would 
decrement io count instantly, fix it.


change maxActiveIOCount to 3, meaning it will pause
crawling if 3 operations are running.
This commit is contained in:
Klaus Post 2020-02-06 20:13:55 -08:00 committed by GitHub
parent 2165d45d3f
commit d0cea7adea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -219,11 +219,11 @@ func newPosix(path string) (*posix, error) {
}, },
stopUsageCh: make(chan struct{}), stopUsageCh: make(chan struct{}),
diskMount: mountinfo.IsLikelyMountPoint(path), diskMount: mountinfo.IsLikelyMountPoint(path),
// Allow disk usage crawler to run upto 10 concurrent // Allow disk usage crawler to run with up to 2 concurrent
// I/O ops, if and when activeIOCount reaches this // I/O ops, if and when activeIOCount reaches this
// value disk usage routine suspends the crawler // value disk usage routine suspends the crawler
// and waits until activeIOCount reaches below this threshold. // and waits until activeIOCount reaches below this threshold.
maxActiveIOCount: 10, maxActiveIOCount: 3,
} }
// Success. // Success.
@ -986,11 +986,6 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re
return nil, errInvalidArgument return nil, errInvalidArgument
} }
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
}()
volumeDir, err := s.getVolDir(volume) volumeDir, err := s.getVolDir(volume)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1046,12 +1041,23 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re
return nil, err return nil, err
} }
atomic.AddInt32(&s.activeIOCount, 1)
r := struct { r := struct {
io.Reader io.Reader
io.Closer io.Closer
}{Reader: io.LimitReader(file, length), Closer: file} }{Reader: io.LimitReader(file, length), Closer: closeWrapper(func() error {
atomic.AddInt32(&s.activeIOCount, -1)
return file.Close()
})}
// Add readahead to big reads
if length >= readAheadSize { if length >= readAheadSize {
return readahead.NewReadCloserSize(r, readAheadBuffers, readAheadBufSize) rc, err := readahead.NewReadCloserSize(r, readAheadBuffers, readAheadBufSize)
if err != nil {
r.Close()
return nil, err
}
return rc, nil
} }
// Just add a small 64k buffer. // Just add a small 64k buffer.
@ -1059,6 +1065,14 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re
return r, nil return r, nil
} }
// closeWrapper converts a function to an io.Closer
type closeWrapper func() error
// Close calls the wrapped function.
func (c closeWrapper) Close() error {
return c()
}
// CreateFile - creates the file. // CreateFile - creates the file.
func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (err error) { func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (err error) {
if fileSize < -1 { if fileSize < -1 {