From b06d7bf834a0d0c6115d4e5b64464dee79cdf04c Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 18 May 2023 11:26:46 -0700 Subject: [PATCH] fix: leaking connections in JSON SQL with limited return (#17239) --- internal/s3select/simdj/reader.go | 65 ++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 19 deletions(-) diff --git a/internal/s3select/simdj/reader.go b/internal/s3select/simdj/reader.go index 7e1cc63c6..86a72e433 100644 --- a/internal/s3select/simdj/reader.go +++ b/internal/s3select/simdj/reader.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "github.com/minio/minio/internal/s3select/json" "github.com/minio/minio/internal/s3select/sql" @@ -34,8 +35,9 @@ type Reader struct { decoded chan simdjson.Object // err will only be returned after decoded has been closed. - err *error - readCloser io.ReadCloser + err *error + readCloser io.ReadCloser + onReaderExit func() exitReader chan struct{} readerWg sync.WaitGroup @@ -79,9 +81,8 @@ func (r *Reader) Close() error { // If r.input is closed, it is assumed that no more input will come. // When this function returns r.readerWg will be decremented and r.decoded will be closed. // On errors, r.err will be set. This should only be accessed after r.decoded has been closed. -func (r *Reader) startReader() { - defer r.readerWg.Done() - defer close(r.decoded) +func (r *Reader) startReader(reuse chan<- *simdjson.ParsedJson) { + defer r.onReaderExit() var tmpObj simdjson.Object for { var in simdjson.Stream @@ -143,6 +144,11 @@ func (r *Reader) startReader() { return } } + // Don't block if we cannot reuse. + select { + case reuse <- in.Value: + default: + } if in.Error == io.EOF { return } @@ -153,14 +159,25 @@ func (r *Reader) startReader() { func NewReader(readCloser io.ReadCloser, args *json.ReaderArgs) *Reader { r := Reader{ args: args, - readCloser: readCloser, + readCloser: &safeCloser{r: io.Reader(readCloser)}, decoded: make(chan simdjson.Object, 1000), input: make(chan simdjson.Stream, 2), exitReader: make(chan struct{}), } - simdjson.ParseNDStream(readCloser, r.input, nil) + r.onReaderExit = func() { + close(r.decoded) + readCloser.Close() + for range r.input { + // Read until EOF trickles through. + // Otherwise, we risk the decoder hanging. + } + r.readerWg.Done() + } + + reuse := make(chan *simdjson.ParsedJson, 1000) + simdjson.ParseNDStream(readCloser, r.input, reuse) r.readerWg.Add(1) - go r.startReader() + go r.startReader(reuse) return &r } @@ -174,15 +191,25 @@ func NewElementReader(ch chan simdjson.Object, err *error, args *json.ReaderArgs } } -// NewTapeReaderChan will start a reader that will read input from the provided channel. -func NewTapeReaderChan(pj chan simdjson.Stream, args *json.ReaderArgs) *Reader { - r := Reader{ - args: args, - decoded: make(chan simdjson.Object, 1000), - input: pj, - exitReader: make(chan struct{}), - } - r.readerWg.Add(1) - go r.startReader() - return &r +// safeCloser will wrap a Reader as a ReadCloser. +// It is safe to call Close while the reader is being used. +type safeCloser struct { + closed uint32 + r io.Reader +} + +func (s *safeCloser) Read(p []byte) (n int, err error) { + if atomic.LoadUint32(&s.closed) == 1 { + return 0, io.EOF + } + n, err = s.r.Read(p) + if atomic.LoadUint32(&s.closed) == 1 { + return 0, io.EOF + } + return n, err +} + +func (s *safeCloser) Close() error { + atomic.CompareAndSwapUint32(&s.closed, 0, 1) + return nil }