Performance improvements to SELECT API on certain query operations (#6752)

This improves the performance of certain queries dramatically,
such as 'count(*)' etc.

Without this PR
```
~ time mc select --query "select count(*) from S3Object" myminio/sjm-airlines/star2000.csv.gz
2173762

real	0m42.464s
user	0m0.071s
sys	0m0.010s
```

With this PR
```
~ time mc select --query "select count(*) from S3Object" myminio/sjm-airlines/star2000.csv.gz
2173762

real	0m17.603s
user	0m0.093s
sys	0m0.008s
```

Almost a 250% improvement in performance. This PR avoids a lot of type
conversions and instead relies on raw sequences of data and interprets
them lazily.

```
benchcmp old new
benchmark                        old ns/op       new ns/op       delta
BenchmarkSQLAggregate_100K-4     551213          259782          -52.87%
BenchmarkSQLAggregate_1M-4       6981901985      2432413729      -65.16%
BenchmarkSQLAggregate_2M-4       13511978488     4536903552      -66.42%
BenchmarkSQLAggregate_10M-4      68427084908     23266283336     -66.00%

benchmark                        old allocs     new allocs     delta
BenchmarkSQLAggregate_100K-4     2366           485            -79.50%
BenchmarkSQLAggregate_1M-4       47455492       21462860       -54.77%
BenchmarkSQLAggregate_2M-4       95163637       43110771       -54.70%
BenchmarkSQLAggregate_10M-4      476959550      216906510      -54.52%

benchmark                        old bytes       new bytes      delta
BenchmarkSQLAggregate_100K-4     1233079         1086024        -11.93%
BenchmarkSQLAggregate_1M-4       2607984120      557038536      -78.64%
BenchmarkSQLAggregate_2M-4       5254103616      1128149168     -78.53%
BenchmarkSQLAggregate_10M-4      26443524872     5722715992     -78.36%
```
This commit is contained in:
Harshavardhana
2018-11-14 15:55:10 -08:00
committed by kannappanr
parent f9779b24ad
commit 7e1661f4fa
108 changed files with 640 additions and 12237 deletions

View File

@@ -17,13 +17,13 @@
package s3select
import (
"encoding/json"
"math"
"sort"
"strconv"
"strings"
"github.com/minio/minio/pkg/s3select/format"
"github.com/tidwall/gjson"
"github.com/xwb1989/sqlparser"
)
@@ -36,29 +36,28 @@ type SelectFuncs struct {
// RunSqlParser allows us to easily bundle all the functions from above and run
// them in the appropriate order.
func runSelectParser(f format.Select, myRow chan Row) {
reqCols, alias, myLimit, whereClause, aggFunctionNames, myFuncs, myErr := ParseSelect(f)
if myErr != nil {
myRow <- Row{
err: myErr,
func runSelectParser(f format.Select, rowCh chan Row) {
reqCols, alias, limit, wc, aggFunctionNames, fns, err := ParseSelect(f)
if err != nil {
rowCh <- Row{
err: err,
}
return
}
processSelectReq(reqCols, alias, whereClause, myLimit, aggFunctionNames, myRow, myFuncs, f)
processSelectReq(reqCols, alias, wc, limit, aggFunctionNames, rowCh, fns, f)
}
// ParseSelect parses the SELECT expression, and effectively tokenizes it into
// its separate parts. It returns the requested column names,alias,limit of
// records, and the where clause.
func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []string, SelectFuncs, error) {
func ParseSelect(f format.Select) ([]string, string, int64, sqlparser.Expr, []string, SelectFuncs, error) {
var sFuncs = SelectFuncs{}
var whereClause interface{}
var whereClause sqlparser.Expr
var alias string
var limit int64
stmt, err := sqlparser.Parse(f.Expression())
// TODO Maybe can parse their errors a bit to return some more of the s3 errors
// TODO: Maybe can parse their errors a bit to return some more of the s3 errors
if err != nil {
return nil, "", 0, nil, nil, sFuncs, ErrLexerInvalidChar
}
@@ -66,73 +65,64 @@ func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []strin
switch stmt := stmt.(type) {
case *sqlparser.Select:
// evaluates the where clause
functionNames := make([]string, len(stmt.SelectExprs))
fnNames := make([]string, len(stmt.SelectExprs))
columnNames := make([]string, len(stmt.SelectExprs))
if stmt.Where != nil {
switch expr := stmt.Where.Expr.(type) {
default:
whereClause = expr
case *sqlparser.ComparisonExpr:
whereClause = expr
}
whereClause = stmt.Where.Expr
}
if stmt.SelectExprs != nil {
for i := 0; i < len(stmt.SelectExprs); i++ {
switch expr := stmt.SelectExprs[i].(type) {
case *sqlparser.StarExpr:
columnNames[0] = "*"
case *sqlparser.AliasedExpr:
switch smallerexpr := expr.Expr.(type) {
case *sqlparser.FuncExpr:
if smallerexpr.IsAggregate() {
functionNames[i] = smallerexpr.Name.CompliantName()
// Will return function name
// Case to deal with if we have functions and not an asterix
switch tempagg := smallerexpr.Exprs[0].(type) {
case *sqlparser.StarExpr:
columnNames[0] = "*"
if smallerexpr.Name.CompliantName() != "count" {
return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedCallWithStar
}
case *sqlparser.AliasedExpr:
switch col := tempagg.Expr.(type) {
case *sqlparser.BinaryExpr:
return nil, "", 0, nil, nil, sFuncs, ErrParseNonUnaryAgregateFunctionCall
case *sqlparser.ColName:
columnNames[i] = col.Name.CompliantName()
}
for i, sexpr := range stmt.SelectExprs {
switch expr := sexpr.(type) {
case *sqlparser.StarExpr:
columnNames[0] = "*"
case *sqlparser.AliasedExpr:
switch smallerexpr := expr.Expr.(type) {
case *sqlparser.FuncExpr:
if smallerexpr.IsAggregate() {
fnNames[i] = smallerexpr.Name.CompliantName()
// Will return function name
// Case to deal with if we have functions and not an asterix
switch tempagg := smallerexpr.Exprs[0].(type) {
case *sqlparser.StarExpr:
columnNames[0] = "*"
if smallerexpr.Name.CompliantName() != "count" {
return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedCallWithStar
}
// Case to deal with if COALESCE was used..
} else if supportedFunc(smallerexpr.Name.CompliantName()) {
if sFuncs.funcExpr == nil {
sFuncs.funcExpr = make([]*sqlparser.FuncExpr, len(stmt.SelectExprs))
sFuncs.index = make([]int, len(stmt.SelectExprs))
case *sqlparser.AliasedExpr:
switch col := tempagg.Expr.(type) {
case *sqlparser.BinaryExpr:
return nil, "", 0, nil, nil, sFuncs, ErrParseNonUnaryAgregateFunctionCall
case *sqlparser.ColName:
columnNames[i] = col.Name.CompliantName()
}
sFuncs.funcExpr[i] = smallerexpr
sFuncs.index[i] = i
} else {
return nil, "", 0, nil, nil, sFuncs, ErrUnsupportedSQLOperation
}
case *sqlparser.ColName:
columnNames[i] = smallerexpr.Name.CompliantName()
// Case to deal with if COALESCE was used..
} else if supportedFunc(smallerexpr.Name.CompliantName()) {
if sFuncs.funcExpr == nil {
sFuncs.funcExpr = make([]*sqlparser.FuncExpr, len(stmt.SelectExprs))
sFuncs.index = make([]int, len(stmt.SelectExprs))
}
sFuncs.funcExpr[i] = smallerexpr
sFuncs.index[i] = i
} else {
return nil, "", 0, nil, nil, sFuncs, ErrUnsupportedSQLOperation
}
case *sqlparser.ColName:
columnNames[i] = smallerexpr.Name.CompliantName()
}
}
}
// This code retrieves the alias and makes sure it is set to the correct
// value, if not it sets it to the tablename
if (stmt.From) != nil {
for i := 0; i < len(stmt.From); i++ {
switch smallerexpr := stmt.From[i].(type) {
case *sqlparser.JoinTableExpr:
return nil, "", 0, nil, nil, sFuncs, ErrParseMalformedJoin
case *sqlparser.AliasedTableExpr:
alias = smallerexpr.As.CompliantName()
if alias == "" {
alias = sqlparser.GetTableName(smallerexpr.Expr).CompliantName()
}
for _, fexpr := range stmt.From {
switch smallerexpr := fexpr.(type) {
case *sqlparser.JoinTableExpr:
return nil, "", 0, nil, nil, sFuncs, ErrParseMalformedJoin
case *sqlparser.AliasedTableExpr:
alias = smallerexpr.As.CompliantName()
if alias == "" {
alias = sqlparser.GetTableName(smallerexpr.Expr).CompliantName()
}
}
}
@@ -153,143 +143,170 @@ func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []strin
if err := parseErrs(columnNames, whereClause, alias, sFuncs, f); err != nil {
return nil, "", 0, nil, nil, sFuncs, err
}
return columnNames, alias, limit, whereClause, functionNames, sFuncs, nil
return columnNames, alias, limit, whereClause, fnNames, sFuncs, nil
}
return nil, "", 0, nil, nil, sFuncs, nil
}
type columnKv struct {
Key string
Value int
}
func columnsIndex(reqColNames []string, f format.Select) ([]columnKv, error) {
var (
columnsKv []columnKv
columnsMap = make(map[string]int)
columns = f.Header()
)
if f.HasHeader() {
err := checkForDuplicates(columns, columnsMap)
if format.IsInt(reqColNames[0]) {
err = ErrMissingHeaders
}
if err != nil {
return nil, err
}
for k, v := range columnsMap {
columnsKv = append(columnsKv, columnKv{
Key: k,
Value: v,
})
}
} else {
for i := range columns {
columnsKv = append(columnsKv, columnKv{
Key: "_" + strconv.Itoa(i),
Value: i,
})
}
}
sort.Slice(columnsKv, func(i, j int) bool {
return columnsKv[i].Value < columnsKv[j].Value
})
return columnsKv, nil
}
// This is the main function, It goes row by row and for records which validate
// the where clause it currently prints the appropriate row given the requested
// columns.
func processSelectReq(reqColNames []string, alias string, whereClause interface{}, limitOfRecords int64, functionNames []string, myRow chan Row, myFunc SelectFuncs, f format.Select) {
func processSelectReq(reqColNames []string, alias string, wc sqlparser.Expr, lrecords int64, fnNames []string, rowCh chan Row, fn SelectFuncs, f format.Select) {
counter := -1
var columns []string
filtrCount := 0
functionFlag := false
// My values is used to store our aggregation values if we need to store them.
myAggVals := make([]float64, len(reqColNames))
// LowercasecolumnsMap is used in accordance with hasDuplicates so that we can
// raise the error "Ambigious" if a case insensitive column is provided and we
// have multiple matches.
lowercaseColumnsMap := make(map[string]int)
hasDuplicates := make(map[string]bool)
// ColumnsMap stores our columns and their index.
columnsMap := make(map[string]int)
if limitOfRecords == 0 {
limitOfRecords = math.MaxInt64
// Values used to store our aggregation values.
aggVals := make([]float64, len(reqColNames))
if lrecords == 0 {
lrecords = math.MaxInt64
}
columnsKv, err := columnsIndex(reqColNames, f)
if err != nil {
rowCh <- Row{
err: err,
}
return
}
var results = make([]string, len(columnsKv))
for {
record, err := f.Read()
if err != nil {
myRow <- Row{
rowCh <- Row{
err: err,
}
return
}
if record == nil {
if functionFlag {
myRow <- Row{
record: aggFuncToStr(myAggVals, f) + "\n",
rowCh <- Row{
record: aggFuncToStr(aggVals, f) + "\n",
}
}
close(myRow)
close(rowCh)
return
}
out, _ := json.Marshal(record)
f.UpdateBytesProcessed(record)
f.UpdateBytesProcessed(int64(len(record)))
if counter == -1 && f.HasHeader() && len(f.Header()) > 0 {
columns = f.Header()
myErr := checkForDuplicates(columns, columnsMap, hasDuplicates, lowercaseColumnsMap)
if format.IsInt(reqColNames[0]) {
myErr = ErrMissingHeaders
}
if myErr != nil {
myRow <- Row{
err: myErr,
}
return
}
} else if counter == -1 && len(f.Header()) > 0 {
columns = f.Header()
for i := 0; i < len(columns); i++ {
columnsMap["_"+strconv.Itoa(i)] = i
}
}
// Return in case the number of record reaches the LIMIT defined in select query
if int64(filtrCount) == limitOfRecords && limitOfRecords != 0 {
close(myRow)
// Return in case the number of record reaches the LIMIT
// defined in select query
if int64(filtrCount) == lrecords {
close(rowCh)
return
}
// The call to the where function clause,ensures that the rows we print match our where clause.
condition, myErr := matchesMyWhereClause(record, alias, whereClause)
if myErr != nil {
myRow <- Row{
err: myErr,
// The call to the where function clause, ensures that
// the rows we print match our where clause.
condition, err := matchesMyWhereClause(record, alias, wc)
if err != nil {
rowCh <- Row{
err: err,
}
return
}
if condition {
// if its an asterix we just print everything in the row
if reqColNames[0] == "*" && functionNames[0] == "" {
var row Row
if reqColNames[0] == "*" && fnNames[0] == "" {
switch f.Type() {
case format.CSV:
row = Row{
record: strings.Join(convertToSlice(columnsMap, record, string(out)), f.OutputFieldDelimiter()) + "\n",
for i, kv := range columnsKv {
results[i] = gjson.GetBytes(record, kv.Key).String()
}
rowCh <- Row{
record: strings.Join(results, f.OutputFieldDelimiter()) + "\n",
}
case format.JSON:
row = Row{
record: string(out) + "\n",
rowCh <- Row{
record: string(record) + "\n",
}
}
myRow <- row
} else if alias != "" {
// This is for dealing with the case of if we have to deal with a
// request for a column with an index e.g A_1.
if format.IsInt(reqColNames[0]) {
// This checks whether any aggregation function was called as now we
// no longer will go through printing each row, and only print at the end
if len(functionNames) > 0 && functionNames[0] != "" {
if len(fnNames) > 0 && fnNames[0] != "" {
functionFlag = true
aggregationFunctions(counter, filtrCount, myAggVals, reqColNames, functionNames, string(out))
aggregationFns(counter, filtrCount, aggVals, reqColNames, fnNames, record)
} else {
// The code below finds the appropriate columns of the row given the
// indicies provided in the SQL request and utilizes the map to
// retrieve the correct part of the row.
myQueryRow, myErr := processColNameIndex(string(out), reqColNames, columns, f)
if myErr != nil {
myRow <- Row{
err: myErr,
// indicies provided in the SQL request.
var rowStr string
rowStr, err = processColNameIndex(record, reqColNames, f)
if err != nil {
rowCh <- Row{
err: err,
}
return
}
myRow <- Row{
record: myQueryRow + "\n",
rowCh <- Row{
record: rowStr + "\n",
}
}
} else {
// This code does aggregation if we were provided column names in the
// form of acutal names rather an indices.
if len(functionNames) > 0 && functionNames[0] != "" {
// form of actual names rather an indices.
if len(fnNames) > 0 && fnNames[0] != "" {
functionFlag = true
aggregationFunctions(counter, filtrCount, myAggVals, reqColNames, functionNames, string(out))
aggregationFns(counter, filtrCount, aggVals, reqColNames, fnNames, record)
} else {
// This code prints the appropriate part of the row given the filter
// and select request, if the select request was based on column
// names rather than indices.
myQueryRow, myErr := processColNameLiteral(string(out), reqColNames, myFunc, f)
if myErr != nil {
myRow <- Row{
err: myErr,
var rowStr string
rowStr, err = processColNameLiteral(record, reqColNames, fn, f)
if err != nil {
rowCh <- Row{
err: err,
}
return
}
myRow <- Row{
record: myQueryRow + "\n",
rowCh <- Row{
record: rowStr + "\n",
}
}
}
@@ -304,7 +321,7 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{
func processColumnNames(reqColNames []string, alias string, f format.Select) error {
switch f.Type() {
case format.CSV:
for i := 0; i < len(reqColNames); i++ {
for i := range reqColNames {
// The code below basically cleans the column name of its alias and other
// syntax, so that we can extract its pure name.
reqColNames[i] = cleanCol(reqColNames[i], alias)
@@ -316,45 +333,43 @@ func processColumnNames(reqColNames []string, alias string, f format.Select) err
return nil
}
// processColNameIndex is the function which creates the row for an index based
// query.
func processColNameIndex(record string, reqColNames []string, columns []string, f format.Select) (string, error) {
row := make([]string, len(reqColNames))
for i := 0; i < len(reqColNames); i++ {
// processColNameIndex is the function which creates the row for an index based query.
func processColNameIndex(record []byte, reqColNames []string, f format.Select) (string, error) {
var row []string
for _, colName := range reqColNames {
// COALESCE AND NULLIF do not support index based access.
if reqColNames[0] == "0" {
return "", format.ErrInvalidColumnIndex
}
mytempindex, err := strconv.Atoi(reqColNames[i])
if mytempindex > len(columns) {
return "", format.ErrInvalidColumnIndex
}
cindex, err := strconv.Atoi(colName)
if err != nil {
return "", ErrMissingHeaders
}
// Subtract 1 because AWS Indexing is not 0 based, it starts at 1 generating the key like "_1".
row[i] = jsonValue(string("_"+strconv.Itoa(mytempindex-1)), record)
if cindex > len(f.Header()) {
return "", format.ErrInvalidColumnIndex
}
// Subtract 1 because SELECT indexing is not 0 based, it
// starts at 1 generating the key like "_1".
row = append(row, gjson.GetBytes(record, string("_"+strconv.Itoa(cindex-1))).String())
}
rowStr := strings.Join(row, f.OutputFieldDelimiter())
if len(rowStr) > MaxCharsPerRecord {
return "", ErrOverMaxRecordSize
}
return rowStr, nil
}
// processColNameLiteral is the function which creates the row for an name based
// query.
func processColNameLiteral(record string, reqColNames []string, myFunc SelectFuncs, f format.Select) (string, error) {
// processColNameLiteral is the function which creates the row for an name based query.
func processColNameLiteral(record []byte, reqColNames []string, fn SelectFuncs, f format.Select) (string, error) {
row := make([]string, len(reqColNames))
for i := 0; i < len(reqColNames); i++ {
for i, colName := range reqColNames {
// this is the case to deal with COALESCE.
if reqColNames[i] == "" && isValidFunc(myFunc.index, i) {
row[i] = evaluateFuncExpr(myFunc.funcExpr[i], "", record)
if colName == "" && isValidFunc(fn.index, i) {
row[i] = evaluateFuncExpr(fn.funcExpr[i], "", record)
continue
}
row[i] = jsonValue(reqColNames[i], record)
row[i] = gjson.GetBytes(record, colName).String()
}
rowStr := strings.Join(row, f.OutputFieldDelimiter())
if len(rowStr) > MaxCharsPerRecord {
@@ -363,81 +378,57 @@ func processColNameLiteral(record string, reqColNames []string, myFunc SelectFun
return rowStr, nil
}
// aggregationFunctions is a function which performs the actual aggregation
// aggregationFns is a function which performs the actual aggregation
// methods on the given row, it uses an array defined in the main parsing
// function to keep track of values.
func aggregationFunctions(counter int, filtrCount int, myAggVals []float64, storeReqCols []string, storeFunctions []string, record string) error {
for i := 0; i < len(storeFunctions); i++ {
if storeFunctions[i] == "" {
i++
} else if storeFunctions[i] == "count" {
myAggVals[i]++
} else {
// If column names are provided as an index it'll use this if statement instead of the else/
func aggregationFns(counter int, filtrCount int, aggVals []float64, storeReqCols []string, storeFns []string, record []byte) error {
for i, storeFn := range storeFns {
switch storeFn {
case "":
continue
case "count":
aggVals[i]++
default:
// Column names are provided as an index it'll use
// this if statement instead.
var convAggFloat float64
if format.IsInt(storeReqCols[i]) {
myIndex, _ := strconv.Atoi(storeReqCols[i])
convAggFloat, _ = strconv.ParseFloat(jsonValue(string("_"+strconv.Itoa(myIndex)), record), 64)
index, _ := strconv.Atoi(storeReqCols[i])
convAggFloat = gjson.GetBytes(record, "_"+strconv.Itoa(index)).Float()
} else {
// case that the columns are in the form of named columns rather than indices.
convAggFloat, _ = strconv.ParseFloat(jsonValue(storeReqCols[i], record), 64)
// Named columns rather than indices.
convAggFloat = gjson.GetBytes(record, storeReqCols[i]).Float()
}
// This if statement is for calculating the min.
if storeFunctions[i] == "min" {
switch storeFn {
case "min":
if counter == -1 {
myAggVals[i] = math.MaxFloat64
aggVals[i] = math.MaxFloat64
}
if convAggFloat < myAggVals[i] {
myAggVals[i] = convAggFloat
if convAggFloat < aggVals[i] {
aggVals[i] = convAggFloat
}
} else if storeFunctions[i] == "max" {
// This if statement is for calculating the max.
case "max":
// Calculate the max.
if counter == -1 {
myAggVals[i] = math.SmallestNonzeroFloat64
aggVals[i] = math.SmallestNonzeroFloat64
}
if convAggFloat > myAggVals[i] {
myAggVals[i] = convAggFloat
if convAggFloat > aggVals[i] {
aggVals[i] = convAggFloat
}
} else if storeFunctions[i] == "sum" {
// This if statement is for calculating the sum.
myAggVals[i] += convAggFloat
} else if storeFunctions[i] == "avg" {
// This if statement is for calculating the average.
case "sum":
// Calculate the sum.
aggVals[i] += convAggFloat
case "avg":
// Calculating the average.
if filtrCount == 0 {
myAggVals[i] = convAggFloat
aggVals[i] = convAggFloat
} else {
myAggVals[i] = (convAggFloat + (myAggVals[i] * float64(filtrCount))) / float64((filtrCount + 1))
aggVals[i] = (convAggFloat + (aggVals[i] * float64(filtrCount))) / float64((filtrCount + 1))
}
} else {
default:
return ErrParseNonUnaryAgregateFunctionCall
}
}
}
return nil
}
// convertToSlice takes the map[string]interface{} and convert it to []string
func convertToSlice(columnsMap map[string]int, record map[string]interface{}, marshalledRecord string) []string {
var result []string
type kv struct {
Key string
Value int
}
var ss []kv
for k, v := range columnsMap {
ss = append(ss, kv{k, v})
}
sort.Slice(ss, func(i, j int) bool {
return ss[i].Value < ss[j].Value
})
for _, kv := range ss {
if _, ok := record[kv.Key]; ok {
result = append(result, jsonValue(kv.Key, marshalledRecord))
}
}
return result
}