fix: free up reader resources in S3Select properly (#14600)

This commit is contained in:
Aditya Manthramurthy 2022-03-23 20:58:53 -07:00 committed by GitHub
parent cf220be9b5
commit 79ba458051
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 40 deletions

View File

@ -18,7 +18,6 @@
package json package json
import ( import (
"errors"
"io" "io"
"sync" "sync"
@ -90,45 +89,31 @@ func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader {
} }
} }
// syncReadCloser will wrap a readcloser and make it safe to call Close // syncReadCloser will wrap a readcloser and make it safe to call Close while
// while reads are running. // reads are running.
// All read errors are also postponed until Close is called and
// io.EOF is returned instead.
type syncReadCloser struct { type syncReadCloser struct {
rc io.ReadCloser rc io.ReadCloser
errMu sync.Mutex mu sync.Mutex
err error
} }
func (pr *syncReadCloser) Read(p []byte) (n int, err error) { func (pr *syncReadCloser) Read(p []byte) (n int, err error) {
// This ensures that Close will block until Read has completed. // This ensures that Close will block until Read has completed.
// This allows another goroutine to close the reader. // This allows another goroutine to close the reader.
pr.errMu.Lock() pr.mu.Lock()
defer pr.errMu.Unlock() defer pr.mu.Unlock()
if pr.err != nil { if pr.rc == nil {
return 0, io.EOF return 0, io.EOF
} }
n, pr.err = pr.rc.Read(p) return pr.rc.Read(p)
if pr.err != nil {
// Translate any error into io.EOF, so we don't crash:
// https://github.com/bcicen/jstream/blob/master/scanner.go#L48
return n, io.EOF
} }
return n, nil
}
var errClosed = errors.New("read after close")
func (pr *syncReadCloser) Close() error { func (pr *syncReadCloser) Close() error {
pr.errMu.Lock() pr.mu.Lock()
defer pr.errMu.Unlock() defer pr.mu.Unlock()
if pr.err == errClosed { if pr.rc != nil {
err := pr.rc.Close()
pr.rc = nil
return err
}
return nil return nil
} }
if pr.err != nil {
return pr.err
}
pr.err = errClosed
return pr.rc.Close()
}

View File

@ -290,7 +290,6 @@ type S3Select struct {
statement *sql.SelectStatement statement *sql.SelectStatement
progressReader *progressReader progressReader *progressReader
recordReader recordReader recordReader recordReader
close func() error
} }
var legacyXMLName = "SelectObjectContentRequest" var legacyXMLName = "SelectObjectContentRequest"
@ -383,7 +382,9 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
s3Select.recordReader, err = csv.NewReader(s3Select.progressReader, &s3Select.Input.CSVArgs) s3Select.recordReader, err = csv.NewReader(s3Select.progressReader, &s3Select.Input.CSVArgs)
if err != nil { if err != nil {
rc.Close() // Close all reader resources opened so far.
s3Select.progressReader.Close()
var stErr bzip2.StructuralError var stErr bzip2.StructuralError
if errors.As(err, &stErr) { if errors.As(err, &stErr) {
return errInvalidCompression(err, s3Select.Input.CompressionType) return errInvalidCompression(err, s3Select.Input.CompressionType)
@ -402,7 +403,6 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
} }
return err return err
} }
s3Select.close = rc.Close
return nil return nil
case jsonFormat: case jsonFormat:
rc, err := getReader(offset, end) rc, err := getReader(offset, end)
@ -426,7 +426,6 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs) s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
} }
s3Select.close = rc.Close
return nil return nil
case parquetFormat: case parquetFormat:
if !strings.EqualFold(os.Getenv("MINIO_API_SELECT_PARQUET"), "on") { if !strings.EqualFold(os.Getenv("MINIO_API_SELECT_PARQUET"), "on") {
@ -494,12 +493,6 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
// Evaluate - filters and sends records read from opened reader as per select statement to http response writer. // Evaluate - filters and sends records read from opened reader as per select statement to http response writer.
func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
defer func() {
if s3Select.close != nil {
s3Select.close()
}
}()
getProgressFunc := s3Select.getProgress getProgressFunc := s3Select.getProgress
if !s3Select.Progress.Enabled { if !s3Select.Progress.Enabled {
getProgressFunc = nil getProgressFunc = nil