mirror of
https://github.com/minio/minio.git
synced 2025-11-11 22:40:14 -05:00
Performance improvements by re-using record buffer (#6622)
Avoid unnecessary pointer allocations
when they are not needed, for example:
- *SelectFuncs{}
- *Row{}
This commit is contained in:
committed by
Nitish Tiwari
parent
36990aeafd
commit
f162d7bd97
@@ -17,14 +17,12 @@
|
||||
package csv
|
||||
|
||||
import (
|
||||
"compress/bzip2"
|
||||
"encoding/csv"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
gzip "github.com/klauspost/pgzip"
|
||||
"github.com/minio/minio/pkg/ioutil"
|
||||
"github.com/minio/minio/pkg/s3select/format"
|
||||
)
|
||||
@@ -89,28 +87,13 @@ type cinput struct {
|
||||
// Otherwise, the returned reader can be reliably consumed with Read()
|
||||
// until Read() returns an error.
|
||||
func New(opts *Options) (format.Select, error) {
|
||||
myReader := opts.ReadFrom
|
||||
var tempBytesScanned int64
|
||||
tempBytesScanned = 0
|
||||
switch opts.Compressed {
|
||||
case "GZIP":
|
||||
tempBytesScanned = opts.StreamSize
|
||||
var err error
|
||||
if myReader, err = gzip.NewReader(opts.ReadFrom); err != nil {
|
||||
return nil, format.ErrTruncatedInput
|
||||
}
|
||||
case "BZIP2":
|
||||
tempBytesScanned = opts.StreamSize
|
||||
myReader = bzip2.NewReader(opts.ReadFrom)
|
||||
}
|
||||
|
||||
// DelimitedReader treats custom record delimiters such as `\r\n`, `\r`, `ab`, etc. and replaces them with `\n`.
|
||||
normalizedReader := ioutil.NewDelimitedReader(myReader, []rune(opts.RecordDelimiter))
|
||||
normalizedReader := ioutil.NewDelimitedReader(opts.ReadFrom, []rune(opts.RecordDelimiter))
|
||||
reader := &cinput{
|
||||
options: opts,
|
||||
reader: csv.NewReader(normalizedReader),
|
||||
}
|
||||
reader.stats.BytesScanned = tempBytesScanned
|
||||
reader.stats.BytesScanned = opts.StreamSize
|
||||
reader.stats.BytesProcessed = 0
|
||||
reader.stats.BytesReturned = 0
|
||||
|
||||
|
||||
@@ -17,13 +17,11 @@
|
||||
package json
|
||||
|
||||
import (
|
||||
"compress/bzip2"
|
||||
"encoding/json"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
gzip "github.com/klauspost/pgzip"
|
||||
"github.com/minio/minio/pkg/s3select/format"
|
||||
)
|
||||
|
||||
@@ -75,26 +73,11 @@ type jinput struct {
|
||||
// Otherwise, the returned reader can be reliably consumed with jsonRead()
|
||||
// until jsonRead() returns nil.
|
||||
func New(opts *Options) (format.Select, error) {
|
||||
myReader := opts.ReadFrom
|
||||
var tempBytesScanned int64
|
||||
tempBytesScanned = 0
|
||||
switch opts.Compressed {
|
||||
case "GZIP":
|
||||
tempBytesScanned = opts.StreamSize
|
||||
var err error
|
||||
if myReader, err = gzip.NewReader(opts.ReadFrom); err != nil {
|
||||
return nil, format.ErrTruncatedInput
|
||||
}
|
||||
case "BZIP2":
|
||||
tempBytesScanned = opts.StreamSize
|
||||
myReader = bzip2.NewReader(opts.ReadFrom)
|
||||
}
|
||||
|
||||
reader := &jinput{
|
||||
options: opts,
|
||||
reader: jsoniter.NewDecoder(myReader),
|
||||
reader: jsoniter.NewDecoder(opts.ReadFrom),
|
||||
}
|
||||
reader.stats.BytesScanned = tempBytesScanned
|
||||
reader.stats.BytesScanned = opts.StreamSize
|
||||
reader.stats.BytesProcessed = 0
|
||||
reader.stats.BytesReturned = 0
|
||||
|
||||
|
||||
@@ -490,17 +490,6 @@ func likeConvert(pattern string, record string) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// trimQuotes strips one pair of surrounding double quotes, so that a query such
|
||||
// as select "name" can reference our map of columnNames.
|
||||
func trimQuotes(s string) string {
|
||||
if len(s) >= 2 {
|
||||
if c := s[len(s)-1]; s[0] == c && (c == '"') {
|
||||
return s[1 : len(s)-1]
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// cleanCol cleans a column name from the parser so that the name is returned to
|
||||
// its original form.
|
||||
func cleanCol(myCol string, alias string) string {
|
||||
@@ -641,7 +630,7 @@ func evaluateParserType(col *sqlparser.SQLVal) (interface{}, error) {
|
||||
|
||||
// parseErrs is the function which handles all the errors that could occur
|
||||
// through use of function arguments such as column names in NULLIF
|
||||
func parseErrs(columnNames []string, whereClause interface{}, alias string, myFuncs *SelectFuncs, f format.Select) error {
|
||||
func parseErrs(columnNames []string, whereClause interface{}, alias string, myFuncs SelectFuncs, f format.Select) error {
|
||||
// Below code cleans up column names.
|
||||
processColumnNames(columnNames, alias, f)
|
||||
if columnNames[0] != "*" {
|
||||
|
||||
@@ -18,6 +18,8 @@ package s3select
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/bzip2"
|
||||
"compress/gzip"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
@@ -26,6 +28,8 @@ import (
|
||||
"github.com/minio/minio/pkg/s3select/format"
|
||||
"github.com/minio/minio/pkg/s3select/format/csv"
|
||||
"github.com/minio/minio/pkg/s3select/format/json"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -61,7 +65,16 @@ func cleanExpr(expr string) string {
|
||||
}
|
||||
|
||||
// New - initialize new select format
|
||||
func New(gr io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select, err error) {
|
||||
func New(reader io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select, err error) {
|
||||
switch req.InputSerialization.CompressionType {
|
||||
case SelectCompressionGZIP:
|
||||
if reader, err = gzip.NewReader(reader); err != nil {
|
||||
return nil, format.ErrTruncatedInput
|
||||
}
|
||||
case SelectCompressionBZIP:
|
||||
reader = bzip2.NewReader(reader)
|
||||
}
|
||||
|
||||
// Initializing options for CSV
|
||||
if req.InputSerialization.CSV != nil {
|
||||
if req.OutputSerialization.CSV.FieldDelimiter == "" {
|
||||
@@ -79,7 +92,7 @@ func New(gr io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select,
|
||||
FieldDelimiter: req.InputSerialization.CSV.FieldDelimiter,
|
||||
Comments: req.InputSerialization.CSV.Comments,
|
||||
Name: "S3Object", // Default table name for all objects
|
||||
ReadFrom: gr,
|
||||
ReadFrom: reader,
|
||||
Compressed: string(req.InputSerialization.CompressionType),
|
||||
Expression: cleanExpr(req.Expression),
|
||||
OutputFieldDelimiter: req.OutputSerialization.CSV.FieldDelimiter,
|
||||
@@ -91,7 +104,7 @@ func New(gr io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select,
|
||||
// Initializing options for JSON
|
||||
s3s, err = json.New(&json.Options{
|
||||
Name: "S3Object", // Default table name for all objects
|
||||
ReadFrom: gr,
|
||||
ReadFrom: reader,
|
||||
Compressed: string(req.InputSerialization.CompressionType),
|
||||
Expression: cleanExpr(req.Expression),
|
||||
StreamSize: size,
|
||||
@@ -106,8 +119,8 @@ func New(gr io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select,
|
||||
// response writer in a streaming fashion so that the client can actively use
|
||||
// the results before the query is finally finished executing. The
|
||||
func Execute(writer io.Writer, f format.Select) error {
|
||||
myRow := make(chan *Row)
|
||||
curBuf := bytes.NewBuffer(make([]byte, 1000000))
|
||||
myRow := make(chan Row, 1000)
|
||||
curBuf := bytes.NewBuffer(make([]byte, humanize.MiByte))
|
||||
curBuf.Reset()
|
||||
progressTicker := time.NewTicker(progressTime)
|
||||
continuationTimer := time.NewTimer(continuationTime)
|
||||
@@ -115,13 +128,11 @@ func Execute(writer io.Writer, f format.Select) error {
|
||||
defer continuationTimer.Stop()
|
||||
|
||||
go runSelectParser(f, myRow)
|
||||
|
||||
for {
|
||||
select {
|
||||
case row, ok := <-myRow:
|
||||
if ok && row.err != nil {
|
||||
errorMessage := writeErrorMessage(row.err, curBuf)
|
||||
_, err := errorMessage.WriteTo(writer)
|
||||
_, err := writeErrorMessage(row.err, curBuf).WriteTo(writer)
|
||||
flusher, okFlush := writer.(http.Flusher)
|
||||
if okFlush {
|
||||
flusher.Flush()
|
||||
@@ -133,8 +144,7 @@ func Execute(writer io.Writer, f format.Select) error {
|
||||
close(myRow)
|
||||
return nil
|
||||
} else if ok {
|
||||
message := writeRecordMessage(row.record, curBuf)
|
||||
_, err := message.WriteTo(writer)
|
||||
_, err := writeRecordMessage(row.record, curBuf).WriteTo(writer)
|
||||
flusher, okFlush := writer.(http.Flusher)
|
||||
if okFlush {
|
||||
flusher.Flush()
|
||||
@@ -153,8 +163,7 @@ func Execute(writer io.Writer, f format.Select) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
statMessage := writeStatMessage(statPayload, curBuf)
|
||||
_, err = statMessage.WriteTo(writer)
|
||||
_, err = writeStatMessage(statPayload, curBuf).WriteTo(writer)
|
||||
flusher, ok := writer.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
@@ -163,8 +172,7 @@ func Execute(writer io.Writer, f format.Select) error {
|
||||
return err
|
||||
}
|
||||
curBuf.Reset()
|
||||
message := writeEndMessage(curBuf)
|
||||
_, err = message.WriteTo(writer)
|
||||
_, err = writeEndMessage(curBuf).WriteTo(writer)
|
||||
flusher, ok = writer.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
@@ -182,8 +190,7 @@ func Execute(writer io.Writer, f format.Select) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
progressMessage := writeProgressMessage(progressPayload, curBuf)
|
||||
_, err = progressMessage.WriteTo(writer)
|
||||
_, err = writeProgressMessage(progressPayload, curBuf).WriteTo(writer)
|
||||
flusher, ok := writer.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
@@ -194,8 +201,7 @@ func Execute(writer io.Writer, f format.Select) error {
|
||||
curBuf.Reset()
|
||||
}
|
||||
case <-continuationTimer.C:
|
||||
message := writeContinuationMessage(curBuf)
|
||||
_, err := message.WriteTo(writer)
|
||||
_, err := writeContinuationMessage(curBuf).WriteTo(writer)
|
||||
flusher, ok := writer.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
|
||||
@@ -36,13 +36,12 @@ type SelectFuncs struct {
|
||||
|
||||
// runSelectParser allows us to easily bundle all the functions from above and run
|
||||
// them in the appropriate order.
|
||||
func runSelectParser(f format.Select, myRow chan *Row) {
|
||||
func runSelectParser(f format.Select, myRow chan Row) {
|
||||
reqCols, alias, myLimit, whereClause, aggFunctionNames, myFuncs, myErr := ParseSelect(f)
|
||||
if myErr != nil {
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
err: myErr,
|
||||
}
|
||||
myRow <- rowStruct
|
||||
return
|
||||
}
|
||||
processSelectReq(reqCols, alias, whereClause, myLimit, aggFunctionNames, myRow, myFuncs, f)
|
||||
@@ -52,19 +51,18 @@ func runSelectParser(f format.Select, myRow chan *Row) {
|
||||
// ParseSelect parses the SELECT expression, and effectively tokenizes it into
|
||||
// its separate parts. It returns the requested column names, alias, limit of
|
||||
// records, and the where clause.
|
||||
func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []string, *SelectFuncs, error) {
|
||||
// return columnNames, alias, limitOfRecords, whereclause,coalStore, nil
|
||||
|
||||
stmt, err := sqlparser.Parse(cleanExpr(f.Expression()))
|
||||
// TODO Maybe can parse their errors a bit to return some more of the s3 errors
|
||||
if err != nil {
|
||||
return nil, "", 0, nil, nil, nil, ErrLexerInvalidChar
|
||||
}
|
||||
|
||||
func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []string, SelectFuncs, error) {
|
||||
var sFuncs = SelectFuncs{}
|
||||
var whereClause interface{}
|
||||
var alias string
|
||||
var limit int64
|
||||
myFuncs := &SelectFuncs{}
|
||||
|
||||
stmt, err := sqlparser.Parse(f.Expression())
|
||||
// TODO Maybe can parse their errors a bit to return some more of the s3 errors
|
||||
if err != nil {
|
||||
return nil, "", 0, nil, nil, sFuncs, ErrLexerInvalidChar
|
||||
}
|
||||
|
||||
switch stmt := stmt.(type) {
|
||||
case *sqlparser.Select:
|
||||
// evaluates the where clause
|
||||
@@ -95,26 +93,26 @@ func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []strin
|
||||
case *sqlparser.StarExpr:
|
||||
columnNames[0] = "*"
|
||||
if smallerexpr.Name.CompliantName() != "count" {
|
||||
return nil, "", 0, nil, nil, nil, ErrParseUnsupportedCallWithStar
|
||||
return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedCallWithStar
|
||||
}
|
||||
case *sqlparser.AliasedExpr:
|
||||
switch col := tempagg.Expr.(type) {
|
||||
case *sqlparser.BinaryExpr:
|
||||
return nil, "", 0, nil, nil, nil, ErrParseNonUnaryAgregateFunctionCall
|
||||
return nil, "", 0, nil, nil, sFuncs, ErrParseNonUnaryAgregateFunctionCall
|
||||
case *sqlparser.ColName:
|
||||
columnNames[i] = col.Name.CompliantName()
|
||||
}
|
||||
}
|
||||
// Case to deal with if COALESCE was used..
|
||||
} else if supportedFunc(smallerexpr.Name.CompliantName()) {
|
||||
if myFuncs.funcExpr == nil {
|
||||
myFuncs.funcExpr = make([]*sqlparser.FuncExpr, len(stmt.SelectExprs))
|
||||
myFuncs.index = make([]int, len(stmt.SelectExprs))
|
||||
if sFuncs.funcExpr == nil {
|
||||
sFuncs.funcExpr = make([]*sqlparser.FuncExpr, len(stmt.SelectExprs))
|
||||
sFuncs.index = make([]int, len(stmt.SelectExprs))
|
||||
}
|
||||
myFuncs.funcExpr[i] = smallerexpr
|
||||
myFuncs.index[i] = i
|
||||
sFuncs.funcExpr[i] = smallerexpr
|
||||
sFuncs.index[i] = i
|
||||
} else {
|
||||
return nil, "", 0, nil, nil, nil, ErrUnsupportedSQLOperation
|
||||
return nil, "", 0, nil, nil, sFuncs, ErrUnsupportedSQLOperation
|
||||
}
|
||||
case *sqlparser.ColName:
|
||||
columnNames[i] = smallerexpr.Name.CompliantName()
|
||||
@@ -129,7 +127,7 @@ func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []strin
|
||||
for i := 0; i < len(stmt.From); i++ {
|
||||
switch smallerexpr := stmt.From[i].(type) {
|
||||
case *sqlparser.JoinTableExpr:
|
||||
return nil, "", 0, nil, nil, nil, ErrParseMalformedJoin
|
||||
return nil, "", 0, nil, nil, sFuncs, ErrParseMalformedJoin
|
||||
case *sqlparser.AliasedTableExpr:
|
||||
alias = smallerexpr.As.CompliantName()
|
||||
if alias == "" {
|
||||
@@ -147,23 +145,23 @@ func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []strin
|
||||
}
|
||||
}
|
||||
if stmt.GroupBy != nil {
|
||||
return nil, "", 0, nil, nil, nil, ErrParseUnsupportedLiteralsGroupBy
|
||||
return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedLiteralsGroupBy
|
||||
}
|
||||
if stmt.OrderBy != nil {
|
||||
return nil, "", 0, nil, nil, nil, ErrParseUnsupportedToken
|
||||
return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedToken
|
||||
}
|
||||
if err := parseErrs(columnNames, whereClause, alias, myFuncs, f); err != nil {
|
||||
return nil, "", 0, nil, nil, nil, err
|
||||
if err := parseErrs(columnNames, whereClause, alias, sFuncs, f); err != nil {
|
||||
return nil, "", 0, nil, nil, sFuncs, err
|
||||
}
|
||||
return columnNames, alias, limit, whereClause, functionNames, myFuncs, nil
|
||||
return columnNames, alias, limit, whereClause, functionNames, sFuncs, nil
|
||||
}
|
||||
return nil, "", 0, nil, nil, nil, nil
|
||||
return nil, "", 0, nil, nil, sFuncs, nil
|
||||
}
|
||||
|
||||
// This is the main function. It goes row by row, and for records which validate
|
||||
// the where clause it currently prints the appropriate row given the requested
|
||||
// columns.
|
||||
func processSelectReq(reqColNames []string, alias string, whereClause interface{}, limitOfRecords int64, functionNames []string, myRow chan *Row, myFunc *SelectFuncs, f format.Select) {
|
||||
func processSelectReq(reqColNames []string, alias string, whereClause interface{}, limitOfRecords int64, functionNames []string, myRow chan Row, myFunc SelectFuncs, f format.Select) {
|
||||
counter := -1
|
||||
var columns []string
|
||||
filtrCount := 0
|
||||
@@ -183,18 +181,16 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{
|
||||
for {
|
||||
record, err := f.Read()
|
||||
if err != nil {
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
err: err,
|
||||
}
|
||||
myRow <- rowStruct
|
||||
return
|
||||
}
|
||||
if record == nil {
|
||||
if functionFlag {
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
record: aggFuncToStr(myAggVals, f) + "\n",
|
||||
}
|
||||
myRow <- rowStruct
|
||||
}
|
||||
close(myRow)
|
||||
return
|
||||
@@ -210,10 +206,9 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{
|
||||
myErr = ErrMissingHeaders
|
||||
}
|
||||
if myErr != nil {
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
err: myErr,
|
||||
}
|
||||
myRow <- rowStruct
|
||||
return
|
||||
}
|
||||
} else if counter == -1 && len(f.Header()) > 0 {
|
||||
@@ -232,28 +227,26 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{
|
||||
// The call to the where clause function ensures that the rows we print match our where clause.
|
||||
condition, myErr := matchesMyWhereClause(record, alias, whereClause)
|
||||
if myErr != nil {
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
err: myErr,
|
||||
}
|
||||
myRow <- rowStruct
|
||||
return
|
||||
}
|
||||
if condition {
|
||||
// If it's an asterisk we just print everything in the row
|
||||
if reqColNames[0] == "*" && functionNames[0] == "" {
|
||||
var row *Row
|
||||
var row Row
|
||||
switch f.Type() {
|
||||
case format.CSV:
|
||||
row = &Row{
|
||||
row = Row{
|
||||
record: strings.Join(convertToSlice(columnsMap, record, string(out)), f.OutputFieldDelimiter()) + "\n",
|
||||
}
|
||||
case format.JSON:
|
||||
row = &Row{
|
||||
row = Row{
|
||||
record: string(out) + "\n",
|
||||
}
|
||||
}
|
||||
myRow <- row
|
||||
|
||||
} else if alias != "" {
|
||||
// This is for dealing with the case of if we have to deal with a
|
||||
// request for a column with an index e.g A_1.
|
||||
@@ -269,16 +262,14 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{
|
||||
// retrieve the correct part of the row.
|
||||
myQueryRow, myErr := processColNameIndex(string(out), reqColNames, columns, f)
|
||||
if myErr != nil {
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
err: myErr,
|
||||
}
|
||||
myRow <- rowStruct
|
||||
return
|
||||
}
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
record: myQueryRow + "\n",
|
||||
}
|
||||
myRow <- rowStruct
|
||||
}
|
||||
} else {
|
||||
// This code does aggregation if we were provided column names in the
|
||||
@@ -292,16 +283,14 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{
|
||||
// names rather than indices.
|
||||
myQueryRow, myErr := processColNameLiteral(string(out), reqColNames, myFunc, f)
|
||||
if myErr != nil {
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
err: myErr,
|
||||
}
|
||||
myRow <- rowStruct
|
||||
return
|
||||
}
|
||||
rowStruct := &Row{
|
||||
myRow <- Row{
|
||||
record: myQueryRow + "\n",
|
||||
}
|
||||
myRow <- rowStruct
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -357,7 +346,7 @@ func processColNameIndex(record string, reqColNames []string, columns []string,
|
||||
|
||||
// processColNameLiteral is the function which creates the row for a name-based
|
||||
// query.
|
||||
func processColNameLiteral(record string, reqColNames []string, myFunc *SelectFuncs, f format.Select) (string, error) {
|
||||
func processColNameLiteral(record string, reqColNames []string, myFunc SelectFuncs, f format.Select) (string, error) {
|
||||
row := make([]string, len(reqColNames))
|
||||
for i := 0; i < len(reqColNames); i++ {
|
||||
// this is the case to deal with COALESCE.
|
||||
|
||||
Reference in New Issue
Block a user