select: Fix leak on compressed files (#11302)

Properly close gzip reader when done reading

fixes #11300
This commit is contained in:
Klaus Post 2021-01-19 17:51:46 -08:00 committed by GitHub
parent a5e23a40ff
commit 19fb1086b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 11 deletions

View File

@ -243,6 +243,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
} }
return return
} }
defer s3Select.Close()
if err = s3Select.Open(getObject); err != nil { if err = s3Select.Open(getObject); err != nil {
if serr, ok := err.(s3select.SelectError); ok { if serr, ok := err.(s3select.SelectError); ok {
@ -281,7 +282,6 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
} }
s3Select.Evaluate(w) s3Select.Evaluate(w)
s3Select.Close()
// Notify object accessed via a GET request. // Notify object accessed via a GET request.
sendEvent(eventArgs{ sendEvent(eventArgs{

View File

@ -57,6 +57,7 @@ type progressReader struct {
processedReader *countUpReader processedReader *countUpReader
closedMu sync.Mutex closedMu sync.Mutex
gzr *gzip.Reader
closed bool closed bool
} }
@ -72,15 +73,15 @@ func (pr *progressReader) Read(p []byte) (n int, err error) {
} }
func (pr *progressReader) Close() error { func (pr *progressReader) Close() error {
if pr.rc == nil {
return nil
}
pr.closedMu.Lock() pr.closedMu.Lock()
defer pr.closedMu.Unlock() defer pr.closedMu.Unlock()
if pr.closed { if pr.closed {
return nil return nil
} }
pr.closed = true pr.closed = true
if pr.gzr != nil {
pr.gzr.Close()
}
return pr.rc.Close() return pr.rc.Close()
} }
@ -92,30 +93,35 @@ func (pr *progressReader) Stats() (bytesScanned, bytesProcessed int64) {
} }
func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressReader, error) { func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressReader, error) {
if rc == nil {
return nil, errors.New("newProgressReader: nil reader provided")
}
scannedReader := newCountUpReader(rc) scannedReader := newCountUpReader(rc)
var r io.Reader pr := progressReader{
rc: rc,
scannedReader: scannedReader,
}
var err error var err error
var r io.Reader
switch compType { switch compType {
case noneType: case noneType:
r = scannedReader r = scannedReader
case gzipType: case gzipType:
r, err = gzip.NewReader(scannedReader) pr.gzr, err = gzip.NewReader(scannedReader)
if err != nil { if err != nil {
if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) { if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) {
return nil, errInvalidGZIPCompressionFormat(err) return nil, errInvalidGZIPCompressionFormat(err)
} }
return nil, errTruncatedInput(err) return nil, errTruncatedInput(err)
} }
r = pr.gzr
case bzip2Type: case bzip2Type:
r = bzip2.NewReader(scannedReader) r = bzip2.NewReader(scannedReader)
default: default:
return nil, errInvalidCompressionFormat(fmt.Errorf("unknown compression type '%v'", compType)) return nil, errInvalidCompressionFormat(fmt.Errorf("unknown compression type '%v'", compType))
} }
pr.processedReader = newCountUpReader(r)
return &progressReader{ return &pr, nil
rc: rc,
scannedReader: scannedReader,
processedReader: newCountUpReader(r),
}, nil
} }