mirror of
https://github.com/minio/minio.git
synced 2025-01-15 00:35:02 -05:00
7e1661f4fa
This improves the performance of certain queries dramatically, such as 'count(*)' etc.

Without this PR
```
~ time mc select --query "select count(*) from S3Object" myminio/sjm-airlines/star2000.csv.gz
2173762

real 0m42.464s
user 0m0.071s
sys 0m0.010s
```

With this PR
```
~ time mc select --query "select count(*) from S3Object" myminio/sjm-airlines/star2000.csv.gz
2173762

real 0m17.603s
user 0m0.093s
sys 0m0.008s
```

Almost a 2.5x speedup (42.5s down to 17.6s). This PR avoids a lot of type conversions and instead relies on raw sequences of data and interprets them lazily.

```
benchcmp old new
benchmark                        old ns/op       new ns/op       delta
BenchmarkSQLAggregate_100K-4     551213          259782          -52.87%
BenchmarkSQLAggregate_1M-4       6981901985      2432413729      -65.16%
BenchmarkSQLAggregate_2M-4       13511978488     4536903552      -66.42%
BenchmarkSQLAggregate_10M-4      68427084908     23266283336     -66.00%

benchmark                        old allocs      new allocs      delta
BenchmarkSQLAggregate_100K-4     2366            485             -79.50%
BenchmarkSQLAggregate_1M-4       47455492        21462860        -54.77%
BenchmarkSQLAggregate_2M-4       95163637        43110771        -54.70%
BenchmarkSQLAggregate_10M-4      476959550       216906510       -54.52%

benchmark                        old bytes       new bytes       delta
BenchmarkSQLAggregate_100K-4     1233079         1086024         -11.93%
BenchmarkSQLAggregate_1M-4       2607984120      557038536       -78.64%
BenchmarkSQLAggregate_2M-4       5254103616      1128149168      -78.53%
BenchmarkSQLAggregate_10M-4      26443524872     5722715992      -78.36%
```
178 lines
4.7 KiB
Go
178 lines
4.7 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2018 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package json
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/xml"
|
|
"io"
|
|
|
|
"github.com/minio/minio/pkg/s3select/format"
|
|
)
|
|
|
|
// Options carries the settings used to construct a JSON select reader
// with New.
type Options struct {
	// Name is the name of the table that the query refers to.
	Name string

	// ReadFrom is the source the JSON data is read from.
	ReadFrom io.Reader

	// Compressed names the compression scheme of the input. The value
	// "NONE" marks uncompressed input; other values presumably identify
	// gzip or bzip2 streams (verify against the caller).
	Compressed string

	// Expression is the SQL expression to be evaluated.
	Expression string

	// RecordDelimiter is the string that separates records in the output.
	RecordDelimiter string

	// StreamSize is the size of the incoming object.
	StreamSize int64

	// Type is true when the JSON input type is DOCUMENTS.
	Type bool

	// Progress enables or disables progress messages.
	Progress bool
}
|
|
|
|
// jinput represents a record producing input from a formatted file or pipe.
|
|
type jinput struct {
|
|
options *Options
|
|
reader *bufio.Reader
|
|
firstRow []string
|
|
header []string
|
|
minOutputLength int
|
|
stats struct {
|
|
BytesScanned int64
|
|
BytesReturned int64
|
|
BytesProcessed int64
|
|
}
|
|
}
|
|
|
|
// New sets up a new, the first Json is read when this is run.
|
|
// If there is a problem with reading the first Json, the error is returned.
|
|
// Otherwise, the returned reader can be reliably consumed with jsonRead()
|
|
// until jsonRead() returns nil.
|
|
func New(opts *Options) (format.Select, error) {
|
|
reader := &jinput{
|
|
options: opts,
|
|
reader: bufio.NewReader(opts.ReadFrom),
|
|
}
|
|
reader.stats.BytesScanned = opts.StreamSize
|
|
reader.stats.BytesProcessed = 0
|
|
reader.stats.BytesReturned = 0
|
|
|
|
return reader, nil
|
|
}
|
|
|
|
// Progress - return true if progress was requested.
|
|
func (reader *jinput) Progress() bool {
|
|
return reader.options.Progress
|
|
}
|
|
|
|
// UpdateBytesProcessed - populates the bytes Processed
|
|
func (reader *jinput) UpdateBytesProcessed(size int64) {
|
|
reader.stats.BytesProcessed += size
|
|
}
|
|
|
|
// Read the file and returns
|
|
func (reader *jinput) Read() ([]byte, error) {
|
|
data, err := reader.reader.ReadBytes('\n')
|
|
if err != nil {
|
|
if err == io.EOF || err == io.ErrClosedPipe {
|
|
err = nil
|
|
} else {
|
|
err = format.ErrJSONParsingError
|
|
}
|
|
}
|
|
return data, err
|
|
}
|
|
|
|
// OutputFieldDelimiter - returns the delimiter specified in input request
|
|
func (reader *jinput) OutputFieldDelimiter() string {
|
|
return ","
|
|
}
|
|
|
|
// HasHeader - returns true or false depending upon the header.
|
|
func (reader *jinput) HasHeader() bool {
|
|
return false
|
|
}
|
|
|
|
// Expression - return the Select Expression for
|
|
func (reader *jinput) Expression() string {
|
|
return reader.options.Expression
|
|
}
|
|
|
|
// UpdateBytesReturned - updates the Bytes returned for
|
|
func (reader *jinput) UpdateBytesReturned(size int64) {
|
|
reader.stats.BytesReturned += size
|
|
}
|
|
|
|
// Header returns a nil in case of
|
|
func (reader *jinput) Header() []string {
|
|
return nil
|
|
}
|
|
|
|
// CreateStatXML is the function which does the marshaling from the stat
|
|
// structs into XML so that the progress and stat message can be sent
|
|
func (reader *jinput) CreateStatXML() (string, error) {
|
|
if reader.options.Compressed == "NONE" {
|
|
reader.stats.BytesProcessed = reader.options.StreamSize
|
|
reader.stats.BytesScanned = reader.stats.BytesProcessed
|
|
}
|
|
out, err := xml.Marshal(&format.Stats{
|
|
BytesScanned: reader.stats.BytesScanned,
|
|
BytesProcessed: reader.stats.BytesProcessed,
|
|
BytesReturned: reader.stats.BytesReturned,
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return xml.Header + string(out), nil
|
|
}
|
|
|
|
// CreateProgressXML is the function which does the marshaling from the progress
|
|
// structs into XML so that the progress and stat message can be sent
|
|
func (reader *jinput) CreateProgressXML() (string, error) {
|
|
if !(reader.options.Compressed != "NONE") {
|
|
reader.stats.BytesScanned = reader.stats.BytesProcessed
|
|
}
|
|
out, err := xml.Marshal(&format.Progress{
|
|
BytesScanned: reader.stats.BytesScanned,
|
|
BytesProcessed: reader.stats.BytesProcessed,
|
|
BytesReturned: reader.stats.BytesReturned,
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return xml.Header + string(out), nil
|
|
}
|
|
|
|
// Type - return the data format type {
|
|
func (reader *jinput) Type() format.Type {
|
|
return format.JSON
|
|
}
|
|
|
|
// ColNameErrs - this is a dummy function for JSON input type.
|
|
func (reader *jinput) ColNameErrs(columnNames []string) error {
|
|
return nil
|
|
}
|