SQL select query for CSV/JSON (#6648)

select * , select column names have been implemented for CSV.
select * is implemented for JSON.
This commit is contained in:
Ashish Kumar Sinha
2018-10-23 00:42:22 +05:30
committed by kannappanr
parent acf46cc3b5
commit c0b4bf0a3e
111 changed files with 12888 additions and 1398 deletions

View File

@@ -17,10 +17,13 @@
package s3select
import (
"encoding/json"
"math"
"sort"
"strconv"
"strings"
"github.com/minio/minio/pkg/s3select/format"
"github.com/xwb1989/sqlparser"
)
@@ -33,8 +36,8 @@ type SelectFuncs struct {
// RunSqlParser allows us to easily bundle all the functions from above and run
// them in the appropriate order.
func (reader *Input) runSelectParser(selectExpression string, myRow chan *Row) {
reqCols, alias, myLimit, whereClause, aggFunctionNames, myFuncs, myErr := reader.ParseSelect(selectExpression)
func runSelectParser(f format.Select, myRow chan *Row) {
reqCols, alias, myLimit, whereClause, aggFunctionNames, myFuncs, myErr := ParseSelect(f)
if myErr != nil {
rowStruct := &Row{
err: myErr,
@@ -42,23 +45,26 @@ func (reader *Input) runSelectParser(selectExpression string, myRow chan *Row) {
myRow <- rowStruct
return
}
reader.processSelectReq(reqCols, alias, whereClause, myLimit, aggFunctionNames, myRow, myFuncs)
processSelectReq(reqCols, alias, whereClause, myLimit, aggFunctionNames, myRow, myFuncs, f)
}
// ParseSelect parses the SELECT expression, and effectively tokenizes it into
// its separate parts. It returns the requested column names,alias,limit of
// records, and the where clause.
func (reader *Input) ParseSelect(sqlInput string) ([]string, string, int64, interface{}, []string, *SelectFuncs, error) {
func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []string, *SelectFuncs, error) {
// return columnNames, alias, limitOfRecords, whereclause,coalStore, nil
stmt, err := sqlparser.Parse(sqlInput)
var whereClause interface{}
var alias string
var limit int64
myFuncs := &SelectFuncs{}
stmt, err := sqlparser.Parse(cleanExpr(f.Expression()))
// TODO Maybe can parse their errors a bit to return some more of the s3 errors
if err != nil {
return nil, "", 0, nil, nil, nil, ErrLexerInvalidChar
}
var whereClause interface{}
var alias string
var limit int64
myFuncs := &SelectFuncs{}
switch stmt := stmt.(type) {
case *sqlparser.Select:
// evaluates the where clause
@@ -146,7 +152,7 @@ func (reader *Input) ParseSelect(sqlInput string) ([]string, string, int64, inte
if stmt.OrderBy != nil {
return nil, "", 0, nil, nil, nil, ErrParseUnsupportedToken
}
if err := reader.parseErrs(columnNames, whereClause, alias, myFuncs); err != nil {
if err := parseErrs(columnNames, whereClause, alias, myFuncs, f); err != nil {
return nil, "", 0, nil, nil, nil, err
}
return columnNames, alias, limit, whereClause, functionNames, myFuncs, nil
@@ -157,13 +163,13 @@ func (reader *Input) ParseSelect(sqlInput string) ([]string, string, int64, inte
// This is the main function, It goes row by row and for records which validate
// the where clause it currently prints the appropriate row given the requested
// columns.
func (reader *Input) processSelectReq(reqColNames []string, alias string, whereClause interface{}, limitOfRecords int64, functionNames []string, myRow chan *Row, myFunc *SelectFuncs) {
func processSelectReq(reqColNames []string, alias string, whereClause interface{}, limitOfRecords int64, functionNames []string, myRow chan *Row, myFunc *SelectFuncs, f format.Select) {
counter := -1
var columns []string
filtrCount := 0
functionFlag := false
// My values is used to store our aggregation values if we need to store them.
myAggVals := make([]float64, len(reqColNames))
var columns []string
// LowercasecolumnsMap is used in accordance with hasDuplicates so that we can
// raise the error "Ambigious" if a case insensitive column is provided and we
// have multiple matches.
@@ -174,23 +180,35 @@ func (reader *Input) processSelectReq(reqColNames []string, alias string, whereC
if limitOfRecords == 0 {
limitOfRecords = math.MaxInt64
}
for {
record := reader.ReadRecord()
reader.stats.BytesProcessed += processSize(record)
record, err := f.Read()
if err != nil {
rowStruct := &Row{
err: err,
}
myRow <- rowStruct
return
}
if record == nil {
if functionFlag {
rowStruct := &Row{
record: reader.aggFuncToStr(myAggVals) + "\n",
record: aggFuncToStr(myAggVals, f) + "\n",
}
myRow <- rowStruct
}
close(myRow)
return
}
if counter == -1 && reader.options.HeaderOpt && len(reader.header) > 0 {
columns = reader.Header()
out, _ := json.Marshal(record)
f.UpdateBytesProcessed(record)
if counter == -1 && f.HasHeader() && len(f.Header()) > 0 {
columns = f.Header()
myErr := checkForDuplicates(columns, columnsMap, hasDuplicates, lowercaseColumnsMap)
if format.IsInt(reqColNames[0]) {
myErr = ErrMissingHeaders
}
if myErr != nil {
rowStruct := &Row{
err: myErr,
@@ -198,17 +216,21 @@ func (reader *Input) processSelectReq(reqColNames []string, alias string, whereC
myRow <- rowStruct
return
}
} else if counter == -1 && len(reader.header) > 0 {
columns = reader.Header()
} else if counter == -1 && len(f.Header()) > 0 {
columns = f.Header()
for i := 0; i < len(columns); i++ {
columnsMap["_"+strconv.Itoa(i)] = i
}
}
// When we have reached our limit, on what the user specified as the number
// of rows they wanted, we terminate our interpreter.
// Return in case the number of record reaches the LIMIT defined in select query
if int64(filtrCount) == limitOfRecords && limitOfRecords != 0 {
close(myRow)
return
}
// The call to the where function clause,ensures that the rows we print match our where clause.
condition, myErr := matchesMyWhereClause(record, columnsMap, alias, whereClause)
condition, myErr := matchesMyWhereClause(record, alias, whereClause)
if myErr != nil {
rowStruct := &Row{
err: myErr,
@@ -219,25 +241,33 @@ func (reader *Input) processSelectReq(reqColNames []string, alias string, whereC
if condition {
// if its an asterix we just print everything in the row
if reqColNames[0] == "*" && functionNames[0] == "" {
rowStruct := &Row{
record: reader.printAsterix(record) + "\n",
var row *Row
switch f.Type() {
case format.CSV:
row = &Row{
record: strings.Join(convertToSlice(columnsMap, record, string(out)), f.OutputFieldDelimiter()) + "\n",
}
case format.JSON:
row = &Row{
record: string(out) + "\n",
}
}
myRow <- rowStruct
myRow <- row
} else if alias != "" {
// This is for dealing with the case of if we have to deal with a
// request for a column with an index e.g A_1.
if representsInt(reqColNames[0]) {
if format.IsInt(reqColNames[0]) {
// This checks whether any aggregation function was called as now we
// no longer will go through printing each row, and only print at the
// end
// no longer will go through printing each row, and only print at the end
if len(functionNames) > 0 && functionNames[0] != "" {
functionFlag = true
aggregationFunctions(counter, filtrCount, myAggVals, columnsMap, reqColNames, functionNames, record)
aggregationFunctions(counter, filtrCount, myAggVals, reqColNames, functionNames, string(out))
} else {
// The code below finds the appropriate columns of the row given the
// indicies provided in the SQL request and utilizes the map to
// retrieve the correct part of the row.
myQueryRow, myErr := reader.processColNameIndex(record, reqColNames, columns)
myQueryRow, myErr := processColNameIndex(string(out), reqColNames, columns, f)
if myErr != nil {
rowStruct := &Row{
err: myErr,
@@ -255,12 +285,12 @@ func (reader *Input) processSelectReq(reqColNames []string, alias string, whereC
// form of acutal names rather an indices.
if len(functionNames) > 0 && functionNames[0] != "" {
functionFlag = true
aggregationFunctions(counter, filtrCount, myAggVals, columnsMap, reqColNames, functionNames, record)
aggregationFunctions(counter, filtrCount, myAggVals, reqColNames, functionNames, string(out))
} else {
// This code prints the appropriate part of the row given the filter
// and select request, if the select request was based on column
// names rather than indices.
myQueryRow, myErr := reader.processColNameLiteral(record, reqColNames, columns, columnsMap, myFunc)
myQueryRow, myErr := processColNameLiteral(string(out), reqColNames, myFunc, f)
if myErr != nil {
rowStruct := &Row{
err: myErr,
@@ -281,75 +311,73 @@ func (reader *Input) processSelectReq(reqColNames []string, alias string, whereC
}
}
// printAsterix helps to print out the entire row if an asterix is used.
func (reader *Input) printAsterix(record []string) string {
return strings.Join(record, reader.options.OutputFieldDelimiter)
}
// processColumnNames is a function which allows for cleaning of column names.
func (reader *Input) processColumnNames(reqColNames []string, alias string) error {
for i := 0; i < len(reqColNames); i++ {
// The code below basically cleans the column name of its alias and other
// syntax, so that we can extract its pure name.
reqColNames[i] = cleanCol(reqColNames[i], alias)
func processColumnNames(reqColNames []string, alias string, f format.Select) error {
switch f.Type() {
case format.CSV:
for i := 0; i < len(reqColNames); i++ {
// The code below basically cleans the column name of its alias and other
// syntax, so that we can extract its pure name.
reqColNames[i] = cleanCol(reqColNames[i], alias)
}
case format.JSON:
// JSON doesnt have columns so no cleaning required
}
return nil
}
// processColNameIndex is the function which creates the row for an index based
// query.
func (reader *Input) processColNameIndex(record []string, reqColNames []string, columns []string) (string, error) {
func processColNameIndex(record string, reqColNames []string, columns []string, f format.Select) (string, error) {
row := make([]string, len(reqColNames))
for i := 0; i < len(reqColNames); i++ {
// COALESCE AND NULLIF do not support index based access.
if reqColNames[0] == "0" {
return "", ErrInvalidColumnIndex
return "", format.ErrInvalidColumnIndex
}
// Subtract 1 because AWS Indexing is not 0 based, it starts at 1.
mytempindex, err := strconv.Atoi(reqColNames[i])
if mytempindex > len(columns) {
return "", format.ErrInvalidColumnIndex
}
if err != nil {
return "", ErrMissingHeaders
}
mytempindex = mytempindex - 1
if mytempindex > len(columns) {
return "", ErrInvalidColumnIndex
}
row[i] = record[mytempindex]
// Subtract 1 because AWS Indexing is not 0 based, it starts at 1 generating the key like "_1".
row[i] = jsonValue(string("_"+strconv.Itoa(mytempindex-1)), record)
}
rowStr := strings.Join(row, reader.options.OutputFieldDelimiter)
if len(rowStr) > 1000000 {
rowStr := strings.Join(row, f.OutputFieldDelimiter())
if len(rowStr) > MaxCharsPerRecord {
return "", ErrOverMaxRecordSize
}
return rowStr, nil
}
// processColNameLiteral is the function which creates the row for an name based
// query.
func (reader *Input) processColNameLiteral(record []string, reqColNames []string, columns []string, columnsMap map[string]int, myFunc *SelectFuncs) (string, error) {
func processColNameLiteral(record string, reqColNames []string, myFunc *SelectFuncs, f format.Select) (string, error) {
row := make([]string, len(reqColNames))
for i := 0; i < len(reqColNames); i++ {
// this is the case to deal with COALESCE.
if reqColNames[i] == "" && isValidFunc(myFunc.index, i) {
row[i] = evaluateFuncExpr(myFunc.funcExpr[i], "", record, columnsMap)
row[i] = evaluateFuncExpr(myFunc.funcExpr[i], "", record)
continue
}
myTempIndex, notFound := columnsMap[trimQuotes(reqColNames[i])]
if !notFound {
return "", ErrMissingHeaders
}
row[i] = record[myTempIndex]
row[i] = jsonValue(reqColNames[i], record)
}
rowStr := strings.Join(row, reader.options.OutputFieldDelimiter)
if len(rowStr) > 1000000 {
rowStr := strings.Join(row, f.OutputFieldDelimiter())
if len(rowStr) > MaxCharsPerRecord {
return "", ErrOverMaxRecordSize
}
return rowStr, nil
}
// aggregationFunctions performs the actual aggregation methods on the
// given row, it uses an array defined for the main parsing function
// to keep track of values.
func aggregationFunctions(counter int, filtrCount int, myAggVals []float64, columnsMap map[string]int, storeReqCols []string, storeFunctions []string, record []string) error {
// aggregationFunctions is a function which performs the actual aggregation
// methods on the given row, it uses an array defined in the main parsing
// function to keep track of values.
func aggregationFunctions(counter int, filtrCount int, myAggVals []float64, storeReqCols []string, storeFunctions []string, record string) error {
for i := 0; i < len(storeFunctions); i++ {
if storeFunctions[i] == "" {
i++
@@ -358,15 +386,13 @@ func aggregationFunctions(counter int, filtrCount int, myAggVals []float64, colu
} else {
// If column names are provided as an index it'll use this if statement instead of the else/
var convAggFloat float64
if representsInt(storeReqCols[i]) {
colIndex, _ := strconv.Atoi(storeReqCols[i])
// colIndex is 1-based
convAggFloat, _ = strconv.ParseFloat(record[colIndex-1], 64)
if format.IsInt(storeReqCols[i]) {
myIndex, _ := strconv.Atoi(storeReqCols[i])
convAggFloat, _ = strconv.ParseFloat(jsonValue(string("_"+strconv.Itoa(myIndex)), record), 64)
} else {
// case that the columns are in the form of named columns rather than indices.
convAggFloat, _ = strconv.ParseFloat(record[columnsMap[trimQuotes(storeReqCols[i])]], 64)
convAggFloat, _ = strconv.ParseFloat(jsonValue(storeReqCols[i], record), 64)
}
// This if statement is for calculating the min.
if storeFunctions[i] == "min" {
@@ -404,3 +430,25 @@ func aggregationFunctions(counter int, filtrCount int, myAggVals []float64, colu
}
return nil
}
// convertToSlice takes the map[string]interface{} and convert it to []string
func convertToSlice(columnsMap map[string]int, record map[string]interface{}, marshalledRecord string) []string {
var result []string
type kv struct {
Key string
Value int
}
var ss []kv
for k, v := range columnsMap {
ss = append(ss, kv{k, v})
}
sort.Slice(ss, func(i, j int) bool {
return ss[i].Value < ss[j].Value
})
for _, kv := range ss {
if _, ok := record[kv.Key]; ok {
result = append(result, jsonValue(kv.Key, marshalledRecord))
}
}
return result
}