minio/pkg/s3select/format/json/json.go
Harshavardhana 7e1661f4fa Performance improvements to SELECT API on certain query operations (#6752)
This improves the performance of certain queries dramatically,
such as 'count(*)' etc.

Without this PR
```
~ time mc select --query "select count(*) from S3Object" myminio/sjm-airlines/star2000.csv.gz
2173762

real	0m42.464s
user	0m0.071s
sys	0m0.010s
```

With this PR
```
~ time mc select --query "select count(*) from S3Object" myminio/sjm-airlines/star2000.csv.gz
2173762

real	0m17.603s
user	0m0.093s
sys	0m0.008s
```

Roughly a 2.4x speedup (wall-clock time drops from 42.5s to 17.6s, about
59% less). This PR avoids a lot of type conversions and instead relies on
raw sequences of data and interprets them lazily.

```
benchcmp old new
benchmark                        old ns/op       new ns/op       delta
BenchmarkSQLAggregate_100K-4     551213          259782          -52.87%
BenchmarkSQLAggregate_1M-4       6981901985      2432413729      -65.16%
BenchmarkSQLAggregate_2M-4       13511978488     4536903552      -66.42%
BenchmarkSQLAggregate_10M-4      68427084908     23266283336     -66.00%

benchmark                        old allocs     new allocs     delta
BenchmarkSQLAggregate_100K-4     2366           485            -79.50%
BenchmarkSQLAggregate_1M-4       47455492       21462860       -54.77%
BenchmarkSQLAggregate_2M-4       95163637       43110771       -54.70%
BenchmarkSQLAggregate_10M-4      476959550      216906510      -54.52%

benchmark                        old bytes       new bytes      delta
BenchmarkSQLAggregate_100K-4     1233079         1086024        -11.93%
BenchmarkSQLAggregate_1M-4       2607984120      557038536      -78.64%
BenchmarkSQLAggregate_2M-4       5254103616      1128149168     -78.53%
BenchmarkSQLAggregate_10M-4      26443524872     5722715992     -78.36%
```
2018-11-14 15:55:10 -08:00

178 lines
4.7 KiB
Go

/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package json
import (
"bufio"
"encoding/xml"
"io"
"github.com/minio/minio/pkg/s3select/format"
)
// Options carries the per-request settings for a JSON select input.
// New keeps the pointer it is given on the reader it returns rather than
// copying the struct.
type Options struct {
// Name of the table that is used for querying.
// NOTE(review): not read anywhere in this file; presumably consumed by
// the query layer - confirm.
Name string
// ReadFrom is where the data will be read from. New wraps it in a
// bufio.Reader.
ReadFrom io.Reader
// Compressed names the compression applied to the input. The stats and
// progress methods compare it against the literal "NONE" to detect
// uncompressed input; no other value is interpreted in this file.
Compressed string
// Expression is the SQL expression meant to be evaluated. It is returned
// unchanged by the reader's Expression method.
Expression string
// RecordDelimiter is what the output records will be delimited by.
// NOTE(review): not read in this file - confirm where it is applied.
RecordDelimiter string
// StreamSize is the size of the incoming object. New copies it into the
// BytesScanned counter, and CreateStatXML reports it for uncompressed input.
StreamSize int64
// Type is true if the JSON input type is DOCUMENTS.
// NOTE(review): not read in this file; meaning taken from the original
// comment - confirm against the caller.
Type bool
// Progress enables or disables progress messages. It is returned by the
// reader's Progress method.
Progress bool
}
// jinput is the JSON input source. It holds the request options, a buffered
// reader over Options.ReadFrom, and the byte counters that are marshaled into
// the stats and progress XML messages.
type jinput struct {
// options is the Options pointer that was passed to New.
options *Options
// reader buffers Options.ReadFrom; Read pulls '\n'-terminated chunks from it.
reader *bufio.Reader
// firstRow, header and minOutputLength are not referenced by any method
// visible here.
// NOTE(review): they look like leftovers from a header-based reader -
// confirm nothing else in the package uses them before removing.
firstRow []string
header []string
minOutputLength int
// stats holds the counters reported by CreateStatXML and CreateProgressXML.
stats struct {
// BytesScanned is seeded from Options.StreamSize in New.
BytesScanned int64
// BytesReturned is advanced by UpdateBytesReturned.
BytesReturned int64
// BytesProcessed is advanced by UpdateBytesProcessed.
BytesProcessed int64
}
}
// New builds a JSON select input over opts.ReadFrom.
//
// Nothing is read from the source here: it is only wrapped in a buffered
// reader, and data is fetched later through Read. The BytesScanned counter
// starts at opts.StreamSize and the other counters start at zero. The
// returned error is always nil.
func New(opts *Options) (format.Select, error) {
	input := &jinput{options: opts}
	input.reader = bufio.NewReader(opts.ReadFrom)
	input.stats.BytesScanned = opts.StreamSize
	input.stats.BytesProcessed, input.stats.BytesReturned = 0, 0
	return input, nil
}
// Progress reports whether progress messages were requested in the Options
// this input was created with.
func (j *jinput) Progress() bool {
	opts := j.options
	return opts.Progress
}
// UpdateBytesProcessed adds size to the running BytesProcessed counter.
func (j *jinput) UpdateBytesProcessed(size int64) {
	counters := &j.stats
	counters.BytesProcessed = counters.BytesProcessed + size
}
// Read returns the next chunk of input, up to and including the next '\n'.
//
// Reaching the end of the input (io.EOF) or a closed pipe (io.ErrClosedPipe)
// is not reported as an error: whatever bytes were read are returned with a
// nil error. Any other read failure is reported as format.ErrJSONParsingError,
// together with the bytes read before the failure.
func (j *jinput) Read() ([]byte, error) {
	line, err := j.reader.ReadBytes('\n')
	switch err {
	case nil, io.EOF, io.ErrClosedPipe:
		return line, nil
	default:
		return line, format.ErrJSONParsingError
	}
}
// OutputFieldDelimiter returns the output field delimiter, which for JSON
// input is always a comma.
func (j *jinput) OutputFieldDelimiter() string {
	const delimiter = ","
	return delimiter
}
// HasHeader always reports false for JSON input.
func (j *jinput) HasHeader() bool {
	const headerPresent = false
	return headerPresent
}
// Expression returns the select expression stored in the Options this input
// was created with.
func (j *jinput) Expression() string {
	opts := j.options
	return opts.Expression
}
// UpdateBytesReturned adds size to the running BytesReturned counter.
func (j *jinput) UpdateBytesReturned(size int64) {
	counters := &j.stats
	counters.BytesReturned = counters.BytesReturned + size
}
// Header always returns a nil slice for JSON input.
func (j *jinput) Header() []string {
	var none []string
	return none
}
// CreateStatXML marshals the current byte counters into a format.Stats XML
// document, prefixed with the standard XML header.
//
// When the input is uncompressed (Compressed == "NONE"), BytesProcessed and
// BytesScanned are first overwritten with the full stream size. A marshaling
// failure is returned with an empty string.
func (j *jinput) CreateStatXML() (string, error) {
	if j.options.Compressed == "NONE" {
		size := j.options.StreamSize
		j.stats.BytesProcessed = size
		j.stats.BytesScanned = size
	}

	stats := format.Stats{
		BytesScanned:   j.stats.BytesScanned,
		BytesProcessed: j.stats.BytesProcessed,
		BytesReturned:  j.stats.BytesReturned,
	}
	payload, err := xml.Marshal(&stats)
	if err != nil {
		return "", err
	}
	return xml.Header + string(payload), nil
}
// CreateProgressXML marshals the current byte counters into a format.Progress
// XML document, prefixed with the standard XML header.
//
// When the input is uncompressed (Compressed == "NONE"), BytesScanned is first
// set to the number of bytes processed so far. A marshaling failure is
// returned with an empty string.
func (j *jinput) CreateProgressXML() (string, error) {
	if j.options.Compressed == "NONE" {
		j.stats.BytesScanned = j.stats.BytesProcessed
	}

	progress := format.Progress{
		BytesScanned:   j.stats.BytesScanned,
		BytesProcessed: j.stats.BytesProcessed,
		BytesReturned:  j.stats.BytesReturned,
	}
	payload, err := xml.Marshal(&progress)
	if err != nil {
		return "", err
	}
	return xml.Header + string(payload), nil
}
// Type identifies this input's data format, which is always format.JSON.
func (j *jinput) Type() format.Type {
	const kind = format.JSON
	return kind
}
// ColNameErrs is a no-op for JSON input: columnNames is ignored and the
// result is always nil.
func (j *jinput) ColNameErrs(columnNames []string) error {
	_ = columnNames
	return nil
}