From d0cea7adeac78d6d8e823864551e987578551b84 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 6 Feb 2020 20:13:55 -0800 Subject: [PATCH] Fix stream read IO count (#8961) Streams are returning a readcloser and returning would decrement io count instantly, fix it. change maxActiveIOCount to 3, meaning it will pause crawling if 3 operations are running. --- cmd/posix.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/cmd/posix.go b/cmd/posix.go index 5a5c37942..98e1b7354 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -219,11 +219,11 @@ func newPosix(path string) (*posix, error) { }, stopUsageCh: make(chan struct{}), diskMount: mountinfo.IsLikelyMountPoint(path), - // Allow disk usage crawler to run upto 10 concurrent + // Allow disk usage crawler to run with up to 2 concurrent // I/O ops, if and when activeIOCount reaches this // value disk usage routine suspends the crawler // and waits until activeIOCount reaches below this threshold. - maxActiveIOCount: 10, + maxActiveIOCount: 3, } // Success. @@ -986,11 +986,6 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re return nil, errInvalidArgument } - atomic.AddInt32(&s.activeIOCount, 1) - defer func() { - atomic.AddInt32(&s.activeIOCount, -1) - }() - volumeDir, err := s.getVolDir(volume) if err != nil { return nil, err @@ -1046,12 +1041,23 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re return nil, err } + atomic.AddInt32(&s.activeIOCount, 1) r := struct { io.Reader io.Closer - }{Reader: io.LimitReader(file, length), Closer: file} + }{Reader: io.LimitReader(file, length), Closer: closeWrapper(func() error { + atomic.AddInt32(&s.activeIOCount, -1) + return file.Close() + })} + + // Add readahead to big reads if length >= readAheadSize { - return readahead.NewReadCloserSize(r, readAheadBuffers, readAheadBufSize) + rc, err := readahead.NewReadCloserSize(r, readAheadBuffers, readAheadBufSize) + if err != nil { + r.Close() + return nil, err + } + return rc, nil } // Just add a small 64k buffer. @@ -1059,6 +1065,14 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re return r, nil } +// closeWrapper converts a function to an io.Closer +type closeWrapper func() error + +// Close calls the wrapped function. +func (c closeWrapper) Close() error { + return c() +} + // CreateFile - creates the file. func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (err error) { if fileSize < -1 {