minio/pkg/s3select/json/reader.go
Klaus Post ddea0bdf11 Concurrent CSV parsing and reduce S3 select allocations (#8200)
```
CSV parsing, BEFORE:
BenchmarkReaderBasic-12         	    2842	    407533 ns/op	  397860 B/op	     957 allocs/op
BenchmarkReaderReplace-12       	    2718	    429914 ns/op	  397844 B/op	     957 allocs/op
BenchmarkReaderReplaceTwo-12    	    2718	    435556 ns/op	  397855 B/op	     957 allocs/op
BenchmarkAggregateCount_100K-12    	     171	   6798974 ns/op	16667102 B/op	  308077 allocs/op
BenchmarkAggregateCount_1M-12    	      19	  65657411 ns/op	168057743 B/op	 3146610 allocs/op
BenchmarkSelectAll_10M-12    	       1	20882119900 ns/op	2758799896 B/op	41978762 allocs/op

CSV parsing, AFTER:
BenchmarkReaderBasic-12         	    3721	    312549 ns/op	  101920 B/op	     338 allocs/op
BenchmarkReaderReplace-12       	    3776	    318810 ns/op	  101993 B/op	     340 allocs/op
BenchmarkReaderReplaceTwo-12    	    3610	    330967 ns/op	  102012 B/op	     341 allocs/op
BenchmarkAggregateCount_100K-12    	     295	   4149588 ns/op	 3553623 B/op	  103261 allocs/op
BenchmarkAggregateCount_1M-12    	      30	  37746503 ns/op	33827931 B/op	 1049435 allocs/op
BenchmarkSelectAll_10M-12    	       1	17608495800 ns/op	1416504040 B/op	21007082 allocs/op

~ benchcmp old.txt new.txt
benchmark                           old ns/op       new ns/op       delta
BenchmarkReaderBasic-12             407533          312549          -23.31%
BenchmarkReaderReplace-12           429914          318810          -25.84%
BenchmarkReaderReplaceTwo-12        435556          330967          -24.01%
BenchmarkAggregateCount_100K-12     6798974         4149588         -38.97%
BenchmarkAggregateCount_1M-12       65657411        37746503        -42.51%
BenchmarkSelectAll_10M-12           20882119900     17608495800     -15.68%

benchmark                           old allocs     new allocs     delta
BenchmarkReaderBasic-12             957            338            -64.68%
BenchmarkReaderReplace-12           957            340            -64.47%
BenchmarkReaderReplaceTwo-12        957            341            -64.37%
BenchmarkAggregateCount_100K-12     308077         103261         -66.48%
BenchmarkAggregateCount_1M-12       3146610        1049435        -66.65%
BenchmarkSelectAll_10M-12           41978762       21007082       -49.96%

benchmark                           old bytes      new bytes      delta
BenchmarkReaderBasic-12             397860         101920         -74.38%
BenchmarkReaderReplace-12           397844         101993         -74.36%
BenchmarkReaderReplaceTwo-12        397855         102012         -74.36%
BenchmarkAggregateCount_100K-12     16667102       3553623        -78.68%
BenchmarkAggregateCount_1M-12       168057743      33827931       -79.87%
BenchmarkSelectAll_10M-12           2758799896     1416504040     -48.66%
```

```
BenchmarkReaderHuge/97K-12         	    2200	    540840 ns/op	 184.32 MB/s	 1604450 B/op	     687 allocs/op
BenchmarkReaderHuge/194K-12        	    1522	    752257 ns/op	 265.04 MB/s	 2143135 B/op	    1335 allocs/op
BenchmarkReaderHuge/389K-12        	    1190	    947858 ns/op	 420.69 MB/s	 3221831 B/op	    2630 allocs/op
BenchmarkReaderHuge/778K-12        	     806	   1472486 ns/op	 541.61 MB/s	 5201856 B/op	    5187 allocs/op
BenchmarkReaderHuge/1557K-12       	     426	   2575269 ns/op	 619.36 MB/s	 9101330 B/op	   10233 allocs/op
BenchmarkReaderHuge/3115K-12       	     286	   4034656 ns/op	 790.66 MB/s	12397968 B/op	   16099 allocs/op
BenchmarkReaderHuge/6230K-12       	     172	   6830563 ns/op	 934.05 MB/s	16008416 B/op	   26844 allocs/op
BenchmarkReaderHuge/12461K-12      	     100	  11409467 ns/op	1118.39 MB/s	22655163 B/op	   48107 allocs/op
BenchmarkReaderHuge/24922K-12      	      66	  19780395 ns/op	1290.19 MB/s	35158559 B/op	   90216 allocs/op
BenchmarkReaderHuge/49844K-12      	      34	  37282559 ns/op	1369.03 MB/s	60528624 B/op	  174497 allocs/op
```
2019-09-13 14:18:35 -07:00

89 lines
2.3 KiB
Go

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package json
import (
"io"
"github.com/minio/minio/pkg/s3select/sql"
"github.com/bcicen/jstream"
)
// Reader - JSON record reader for S3Select.
type Reader struct {
args *ReaderArgs
decoder *jstream.Decoder
valueCh chan *jstream.MetaValue
readCloser io.ReadCloser
}
// Read - reads single record.
func (r *Reader) Read(dst sql.Record) (sql.Record, error) {
v, ok := <-r.valueCh
if !ok {
if err := r.decoder.Err(); err != nil {
return nil, errJSONParsingError(err)
}
return nil, io.EOF
}
var kvs jstream.KVS
if v.ValueType == jstream.Object {
// This is a JSON object type (that preserves key
// order)
kvs = v.Value.(jstream.KVS)
} else {
// To be AWS S3 compatible Select for JSON needs to
// output non-object JSON as single column value
// i.e. a map with `_1` as key and value as the
// non-object.
kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v.Value}}
}
dstRec, ok := dst.(*Record)
if !ok {
dstRec = &Record{}
}
dstRec.KVS = kvs
dstRec.SelectFormat = sql.SelectFmtJSON
return dstRec, nil
}
// Close - closes underlying reader.
func (r *Reader) Close() error {
// Close the input.
// Potentially racy if the stream decoder is still reading.
err := r.readCloser.Close()
for range r.valueCh {
// Drain values so we don't leak a goroutine.
// Since we have closed the input, it should fail rather quickly.
}
return err
}
// NewReader - creates new JSON reader using readCloser.
func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader {
d := jstream.NewDecoder(readCloser, 0).ObjectAsKVS()
return &Reader{
args: args,
decoder: d,
valueCh: d.Stream(),
readCloser: readCloser,
}
}