mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: remove auto-close GetObjectReader (#12009)
locks can get relinquished when Read() sees io.EOF leading to prematurely closing of the readers concurrent writes on the same object can have undesired consequences here when these locks are relinquished.
This commit is contained in:
parent
48c5e7e5b6
commit
4223ebab8d
@ -530,7 +530,7 @@ type SelectParameters struct {
|
||||
|
||||
// IsEmpty returns true if no select parameters set
|
||||
func (sp *SelectParameters) IsEmpty() bool {
|
||||
return sp == nil || sp.S3Select == s3select.S3Select{}
|
||||
return sp == nil
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -817,13 +817,7 @@ func (g *GetObjectReader) Close() error {
|
||||
|
||||
// Read - to implement Reader interface.
|
||||
func (g *GetObjectReader) Read(p []byte) (n int, err error) {
|
||||
n, err = g.pReader.Read(p)
|
||||
if err != nil {
|
||||
// Calling code may not Close() in case of error, so
|
||||
// we ensure it.
|
||||
g.Close()
|
||||
}
|
||||
return
|
||||
return g.pReader.Read(p)
|
||||
}
|
||||
|
||||
//SealMD5CurrFn seals md5sum with object encryption key and returns sealed
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
||||
* MinIO Cloud Storage, (C) 2019-2021 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -216,6 +216,7 @@ type S3Select struct {
|
||||
statement *sql.SelectStatement
|
||||
progressReader *progressReader
|
||||
recordReader recordReader
|
||||
close func() error
|
||||
}
|
||||
|
||||
var (
|
||||
@ -311,6 +312,7 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
|
||||
}
|
||||
return err
|
||||
}
|
||||
s3Select.close = rc.Close
|
||||
return nil
|
||||
case jsonFormat:
|
||||
rc, err := getReader(0, -1)
|
||||
@ -333,6 +335,8 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
|
||||
} else {
|
||||
s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
|
||||
}
|
||||
|
||||
s3Select.close = rc.Close
|
||||
return nil
|
||||
case parquetFormat:
|
||||
if !strings.EqualFold(os.Getenv("MINIO_API_SELECT_PARQUET"), "on") {
|
||||
@ -396,6 +400,12 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
|
||||
|
||||
// Evaluate - filters and sends records read from opened reader as per select statement to http response writer.
|
||||
func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
|
||||
defer func() {
|
||||
if s3Select.close != nil {
|
||||
s3Select.close()
|
||||
}
|
||||
}()
|
||||
|
||||
getProgressFunc := s3Select.getProgress
|
||||
if !s3Select.Progress.Enabled {
|
||||
getProgressFunc = nil
|
||||
|
Loading…
Reference in New Issue
Block a user