diff --git a/internal/s3select/json/reader.go b/internal/s3select/json/reader.go index d6f13b093..4285c23fa 100644 --- a/internal/s3select/json/reader.go +++ b/internal/s3select/json/reader.go @@ -18,7 +18,6 @@ package json import ( - "errors" "io" "sync" @@ -90,45 +89,31 @@ func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader { } } -// syncReadCloser will wrap a readcloser and make it safe to call Close -// while reads are running. -// All read errors are also postponed until Close is called and -// io.EOF is returned instead. +// syncReadCloser will wrap a readcloser and make it safe to call Close while +// reads are running. type syncReadCloser struct { - rc io.ReadCloser - errMu sync.Mutex - err error + rc io.ReadCloser + mu sync.Mutex } func (pr *syncReadCloser) Read(p []byte) (n int, err error) { // This ensures that Close will block until Read has completed. // This allows another goroutine to close the reader. - pr.errMu.Lock() - defer pr.errMu.Unlock() - if pr.err != nil { + pr.mu.Lock() + defer pr.mu.Unlock() + if pr.rc == nil { return 0, io.EOF } - n, pr.err = pr.rc.Read(p) - if pr.err != nil { - // Translate any error into io.EOF, so we don't crash: - // https://github.com/bcicen/jstream/blob/master/scanner.go#L48 - return n, io.EOF - } - - return n, nil + return pr.rc.Read(p) } -var errClosed = errors.New("read after close") - func (pr *syncReadCloser) Close() error { - pr.errMu.Lock() - defer pr.errMu.Unlock() - if pr.err == errClosed { - return nil + pr.mu.Lock() + defer pr.mu.Unlock() + if pr.rc != nil { + err := pr.rc.Close() + pr.rc = nil + return err } - if pr.err != nil { - return pr.err - } - pr.err = errClosed - return pr.rc.Close() + return nil } diff --git a/internal/s3select/select.go b/internal/s3select/select.go index 34e78d00d..6026bd45f 100644 --- a/internal/s3select/select.go +++ b/internal/s3select/select.go @@ -290,7 +290,6 @@ type S3Select struct { statement *sql.SelectStatement progressReader *progressReader recordReader recordReader - close func() error } var legacyXMLName = "SelectObjectContentRequest" @@ -383,7 +382,9 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos s3Select.recordReader, err = csv.NewReader(s3Select.progressReader, &s3Select.Input.CSVArgs) if err != nil { - rc.Close() + // Close all reader resources opened so far. + s3Select.progressReader.Close() + var stErr bzip2.StructuralError if errors.As(err, &stErr) { return errInvalidCompression(err, s3Select.Input.CompressionType) @@ -402,7 +403,6 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos } return err } - s3Select.close = rc.Close return nil case jsonFormat: rc, err := getReader(offset, end) @@ -426,7 +426,6 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs) } - s3Select.close = rc.Close return nil case parquetFormat: if !strings.EqualFold(os.Getenv("MINIO_API_SELECT_PARQUET"), "on") { @@ -494,12 +493,6 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error { // Evaluate - filters and sends records read from opened reader as per select statement to http response writer. func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { - defer func() { - if s3Select.close != nil { - s3Select.close() - } - }() - getProgressFunc := s3Select.getProgress if !s3Select.Progress.Enabled { getProgressFunc = nil