minio/pkg/s3select/input.go
Harshavardhana 7e1661f4fa Performance improvements to SELECT API on certain query operations (#6752)
This improves the performance of certain queries dramatically,
such as 'count(*)' etc.

Without this PR
```
~ time mc select --query "select count(*) from S3Object" myminio/sjm-airlines/star2000.csv.gz
2173762

real	0m42.464s
user	0m0.071s
sys	0m0.010s
```

With this PR
```
~ time mc select --query "select count(*) from S3Object" myminio/sjm-airlines/star2000.csv.gz
2173762

real	0m17.603s
user	0m0.093s
sys	0m0.008s
```

Almost a 250% improvement in performance. This PR avoids a lot of type
conversions and instead relies on raw sequences of data and interprets
them lazily.

```
benchcmp old new
benchmark                        old ns/op       new ns/op       delta
BenchmarkSQLAggregate_100K-4     551213          259782          -52.87%
BenchmarkSQLAggregate_1M-4       6981901985      2432413729      -65.16%
BenchmarkSQLAggregate_2M-4       13511978488     4536903552      -66.42%
BenchmarkSQLAggregate_10M-4      68427084908     23266283336     -66.00%

benchmark                        old allocs     new allocs     delta
BenchmarkSQLAggregate_100K-4     2366           485            -79.50%
BenchmarkSQLAggregate_1M-4       47455492       21462860       -54.77%
BenchmarkSQLAggregate_2M-4       95163637       43110771       -54.70%
BenchmarkSQLAggregate_10M-4      476959550      216906510      -54.52%

benchmark                        old bytes       new bytes      delta
BenchmarkSQLAggregate_100K-4     1233079         1086024        -11.93%
BenchmarkSQLAggregate_1M-4       2607984120      557038536      -78.64%
BenchmarkSQLAggregate_2M-4       5254103616      1128149168     -78.53%
BenchmarkSQLAggregate_10M-4      26443524872     5722715992     -78.36%
```
2018-11-14 15:55:10 -08:00

205 lines
5.9 KiB
Go

/*
* Minio Cloud Storage, (C) 2018 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package s3select
import (
"bytes"
"compress/bzip2"
"io"
"net/http"
"strings"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/klauspost/pgzip"
"github.com/minio/minio/pkg/s3select/format"
"github.com/minio/minio/pkg/s3select/format/csv"
"github.com/minio/minio/pkg/s3select/format/json"
)
const (
// progressTime is the time interval for which a progress message is sent.
progressTime time.Duration = 60 * time.Second
// continuationTime is the time interval for which a continuation message is
// sent.
continuationTime time.Duration = 5 * time.Second
)
// Row is a Struct for keeping track of key aspects of a row.
type Row struct {
record string
err error
}
// This function replaces "",'' with `` for the select parser
func cleanExpr(expr string) string {
r := strings.NewReplacer("\"", "`")
return r.Replace(expr)
}
// New - initialize new select format
func New(reader io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select, err error) {
switch req.InputSerialization.CompressionType {
case SelectCompressionGZIP:
if reader, err = pgzip.NewReader(reader); err != nil {
return nil, format.ErrTruncatedInput
}
case SelectCompressionBZIP:
reader = bzip2.NewReader(reader)
}
// Initializating options for CSV
if req.InputSerialization.CSV != nil {
if req.OutputSerialization.CSV.FieldDelimiter == "" {
req.OutputSerialization.CSV.FieldDelimiter = ","
}
if req.InputSerialization.CSV.FileHeaderInfo == "" {
req.InputSerialization.CSV.FileHeaderInfo = CSVFileHeaderInfoNone
}
if req.InputSerialization.CSV.RecordDelimiter == "" {
req.InputSerialization.CSV.RecordDelimiter = "\n"
}
s3s, err = csv.New(&csv.Options{
HasHeader: req.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoNone,
RecordDelimiter: req.InputSerialization.CSV.RecordDelimiter,
FieldDelimiter: req.InputSerialization.CSV.FieldDelimiter,
Comments: req.InputSerialization.CSV.Comments,
Name: "S3Object", // Default table name for all objects
ReadFrom: reader,
Compressed: string(req.InputSerialization.CompressionType),
Expression: cleanExpr(req.Expression),
OutputFieldDelimiter: req.OutputSerialization.CSV.FieldDelimiter,
StreamSize: size,
HeaderOpt: req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
Progress: req.RequestProgress.Enabled,
})
} else if req.InputSerialization.JSON != nil {
// Initializating options for JSON
s3s, err = json.New(&json.Options{
Name: "S3Object", // Default table name for all objects
ReadFrom: reader,
Compressed: string(req.InputSerialization.CompressionType),
Expression: cleanExpr(req.Expression),
StreamSize: size,
Type: req.InputSerialization.JSON.Type == JSONTypeDocument,
Progress: req.RequestProgress.Enabled,
})
}
return s3s, err
}
// Execute is the function where all the blocking occurs, It writes to the HTTP
// response writer in a streaming fashion so that the client can actively use
// the results before the query is finally finished executing. The
func Execute(writer io.Writer, f format.Select) error {
rowCh := make(chan Row)
curBuf := bytes.NewBuffer(make([]byte, humanize.MiByte))
curBuf.Reset()
progressTicker := time.NewTicker(progressTime)
continuationTimer := time.NewTimer(continuationTime)
defer progressTicker.Stop()
defer continuationTimer.Stop()
go runSelectParser(f, rowCh)
for {
select {
case row, ok := <-rowCh:
if ok && row.err != nil {
_, err := writeErrorMessage(row.err, curBuf).WriteTo(writer)
flusher, okFlush := writer.(http.Flusher)
if okFlush {
flusher.Flush()
}
if err != nil {
return err
}
curBuf.Reset()
close(rowCh)
return nil
} else if ok {
_, err := writeRecordMessage(row.record, curBuf).WriteTo(writer)
flusher, okFlush := writer.(http.Flusher)
if okFlush {
flusher.Flush()
}
if err != nil {
return err
}
curBuf.Reset()
f.UpdateBytesReturned(int64(len(row.record)))
if !continuationTimer.Stop() {
<-continuationTimer.C
}
continuationTimer.Reset(continuationTime)
} else if !ok {
statPayload, err := f.CreateStatXML()
if err != nil {
return err
}
_, err = writeStatMessage(statPayload, curBuf).WriteTo(writer)
flusher, ok := writer.(http.Flusher)
if ok {
flusher.Flush()
}
if err != nil {
return err
}
curBuf.Reset()
_, err = writeEndMessage(curBuf).WriteTo(writer)
flusher, ok = writer.(http.Flusher)
if ok {
flusher.Flush()
}
if err != nil {
return err
}
return nil
}
case <-progressTicker.C:
// Send progress messages only if requested by client.
if f.Progress() {
progressPayload, err := f.CreateProgressXML()
if err != nil {
return err
}
_, err = writeProgressMessage(progressPayload, curBuf).WriteTo(writer)
flusher, ok := writer.(http.Flusher)
if ok {
flusher.Flush()
}
if err != nil {
return err
}
curBuf.Reset()
}
case <-continuationTimer.C:
_, err := writeContinuationMessage(curBuf).WriteTo(writer)
flusher, ok := writer.(http.Flusher)
if ok {
flusher.Flush()
}
if err != nil {
return err
}
curBuf.Reset()
continuationTimer.Reset(continuationTime)
}
}
}