mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
fix: leaking connections in JSON SQL with limited return (#17239)
This commit is contained in:
parent
b784e458cb
commit
b06d7bf834
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/minio/minio/internal/s3select/json"
|
||||
"github.com/minio/minio/internal/s3select/sql"
|
||||
@ -34,8 +35,9 @@ type Reader struct {
|
||||
decoded chan simdjson.Object
|
||||
|
||||
// err will only be returned after decoded has been closed.
|
||||
err *error
|
||||
readCloser io.ReadCloser
|
||||
err *error
|
||||
readCloser io.ReadCloser
|
||||
onReaderExit func()
|
||||
|
||||
exitReader chan struct{}
|
||||
readerWg sync.WaitGroup
|
||||
@ -79,9 +81,8 @@ func (r *Reader) Close() error {
|
||||
// If r.input is closed, it is assumed that no more input will come.
|
||||
// When this function returns r.readerWg will be decremented and r.decoded will be closed.
|
||||
// On errors, r.err will be set. This should only be accessed after r.decoded has been closed.
|
||||
func (r *Reader) startReader() {
|
||||
defer r.readerWg.Done()
|
||||
defer close(r.decoded)
|
||||
func (r *Reader) startReader(reuse chan<- *simdjson.ParsedJson) {
|
||||
defer r.onReaderExit()
|
||||
var tmpObj simdjson.Object
|
||||
for {
|
||||
var in simdjson.Stream
|
||||
@ -143,6 +144,11 @@ func (r *Reader) startReader() {
|
||||
return
|
||||
}
|
||||
}
|
||||
// Don't block if we cannot reuse.
|
||||
select {
|
||||
case reuse <- in.Value:
|
||||
default:
|
||||
}
|
||||
if in.Error == io.EOF {
|
||||
return
|
||||
}
|
||||
@ -153,14 +159,25 @@ func (r *Reader) startReader() {
|
||||
func NewReader(readCloser io.ReadCloser, args *json.ReaderArgs) *Reader {
|
||||
r := Reader{
|
||||
args: args,
|
||||
readCloser: readCloser,
|
||||
readCloser: &safeCloser{r: io.Reader(readCloser)},
|
||||
decoded: make(chan simdjson.Object, 1000),
|
||||
input: make(chan simdjson.Stream, 2),
|
||||
exitReader: make(chan struct{}),
|
||||
}
|
||||
simdjson.ParseNDStream(readCloser, r.input, nil)
|
||||
r.onReaderExit = func() {
|
||||
close(r.decoded)
|
||||
readCloser.Close()
|
||||
for range r.input {
|
||||
// Read until EOF trickles through.
|
||||
// Otherwise, we risk the decoder hanging.
|
||||
}
|
||||
r.readerWg.Done()
|
||||
}
|
||||
|
||||
reuse := make(chan *simdjson.ParsedJson, 1000)
|
||||
simdjson.ParseNDStream(readCloser, r.input, reuse)
|
||||
r.readerWg.Add(1)
|
||||
go r.startReader()
|
||||
go r.startReader(reuse)
|
||||
return &r
|
||||
}
|
||||
|
||||
@ -174,15 +191,25 @@ func NewElementReader(ch chan simdjson.Object, err *error, args *json.ReaderArgs
|
||||
}
|
||||
}
|
||||
|
||||
// NewTapeReaderChan will start a reader that will read input from the provided channel.
|
||||
func NewTapeReaderChan(pj chan simdjson.Stream, args *json.ReaderArgs) *Reader {
|
||||
r := Reader{
|
||||
args: args,
|
||||
decoded: make(chan simdjson.Object, 1000),
|
||||
input: pj,
|
||||
exitReader: make(chan struct{}),
|
||||
}
|
||||
r.readerWg.Add(1)
|
||||
go r.startReader()
|
||||
return &r
|
||||
// safeCloser will wrap a Reader as a ReadCloser.
|
||||
// It is safe to call Close while the reader is being used.
|
||||
type safeCloser struct {
|
||||
closed uint32
|
||||
r io.Reader
|
||||
}
|
||||
|
||||
func (s *safeCloser) Read(p []byte) (n int, err error) {
|
||||
if atomic.LoadUint32(&s.closed) == 1 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n, err = s.r.Read(p)
|
||||
if atomic.LoadUint32(&s.closed) == 1 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (s *safeCloser) Close() error {
|
||||
atomic.CompareAndSwapUint32(&s.closed, 0, 1)
|
||||
return nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user