mirror of https://github.com/minio/minio.git
Refactor s3select to support parquet. (#7023)
Also handle pretty formatted JSON documents.
@@ -1,5 +1,5 @@
 /*
- * Minio Cloud Storage, (C) 2018 Minio, Inc.
+ * Minio Cloud Storage, (C) 2019 Minio, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,436 +17,377 @@
 package s3select

 import (
-	"math"
-	"sort"
-	"strconv"
+	"encoding/xml"
+	"fmt"
+	"io"
+	"net/http"
 	"strings"

-	"github.com/minio/minio/pkg/s3select/format"
-	"github.com/tidwall/gjson"
-	"github.com/xwb1989/sqlparser"
+	"github.com/minio/minio/pkg/s3select/csv"
+	"github.com/minio/minio/pkg/s3select/json"
+	"github.com/minio/minio/pkg/s3select/parquet"
+	"github.com/minio/minio/pkg/s3select/sql"
 )

-// SelectFuncs contains the relevant values from the parser for S3 Select
-// Functions
-type SelectFuncs struct {
-	funcExpr []*sqlparser.FuncExpr
-	index    []int
-}
+type recordReader interface {
+	Read() (sql.Record, error)
+	Close() error
+}

-// RunSqlParser allows us to easily bundle all the functions from above and run
-// them in the appropriate order.
-func runSelectParser(f format.Select, rowCh chan Row) {
-	reqCols, alias, limit, wc, aggFunctionNames, fns, err := ParseSelect(f)
-	if err != nil {
-		rowCh <- Row{
-			err: err,
-		}
-		return
-	}
-	processSelectReq(reqCols, alias, wc, limit, aggFunctionNames, rowCh, fns, f)
-}
+const (
+	csvFormat     = "csv"
+	jsonFormat    = "json"
+	parquetFormat = "parquet"
+)

+// CompressionType - represents value inside <CompressionType/> in request XML.
+type CompressionType string
+
+const (
+	noneType  CompressionType = "none"
+	gzipType  CompressionType = "gzip"
+	bzip2Type CompressionType = "bzip2"
+)
+
+// UnmarshalXML - decodes XML data.
+func (c *CompressionType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
+	var s string
+	if err := d.DecodeElement(&s, &start); err != nil {
+		return errMalformedXML(err)
+	}
+
+	parsedType := CompressionType(strings.ToLower(s))
+	if s == "" {
+		parsedType = noneType
+	}
+
+	switch parsedType {
+	case noneType, gzipType, bzip2Type:
+	default:
+		return errInvalidCompressionFormat(fmt.Errorf("invalid compression format '%v'", s))
+	}
+
+	*c = parsedType
+	return nil
+}
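
Note: the decoder above normalizes before it validates — lower-case the value, default an empty element to "none", then whitelist. For reference, a minimal standalone sketch of the same pattern (plain errors stand in for the package-internal errMalformedXML/errInvalidCompressionFormat helpers):

package main

import (
	"encoding/xml"
	"fmt"
	"strings"
)

type compressionType string

// UnmarshalXML mirrors the normalize-then-validate flow shown in the diff.
func (c *compressionType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
	var s string
	if err := d.DecodeElement(&s, &start); err != nil {
		return err
	}
	parsed := compressionType(strings.ToLower(s))
	if s == "" {
		parsed = "none" // absent/empty element defaults to no compression
	}
	switch parsed {
	case "none", "gzip", "bzip2":
	default:
		return fmt.Errorf("invalid compression format '%v'", s)
	}
	*c = parsed
	return nil
}

func main() {
	var c compressionType
	err := xml.Unmarshal([]byte("<CompressionType>GZIP</CompressionType>"), &c)
	fmt.Println(c, err) // prints: gzip <nil>
}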

+// InputSerialization - represents elements inside <InputSerialization/> in request XML.
+type InputSerialization struct {
+	CompressionType CompressionType    `xml:"CompressionType"`
+	CSVArgs         csv.ReaderArgs     `xml:"CSV"`
+	JSONArgs        json.ReaderArgs    `xml:"JSON"`
+	ParquetArgs     parquet.ReaderArgs `xml:"Parquet"`
+	unmarshaled     bool
+	format          string
+}
+
+// IsEmpty - returns whether input serialization is empty or not.
+func (input *InputSerialization) IsEmpty() bool {
+	return !input.unmarshaled
+}
+
+// UnmarshalXML - decodes XML data.
+func (input *InputSerialization) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
+	// Make subtype to avoid recursive UnmarshalXML().
+	type subInputSerialization InputSerialization
+	parsedInput := subInputSerialization{}
+	if err := d.DecodeElement(&parsedInput, &start); err != nil {
+		return errMalformedXML(err)
+	}
+
+	found := 0
+	if !parsedInput.CSVArgs.IsEmpty() {
+		parsedInput.format = csvFormat
+		found++
+	}
+	if !parsedInput.JSONArgs.IsEmpty() {
+		parsedInput.format = jsonFormat
+		found++
+	}
+	if !parsedInput.ParquetArgs.IsEmpty() {
+		if parsedInput.CompressionType != noneType {
+			return errInvalidRequestParameter(fmt.Errorf("CompressionType must be NONE for Parquet format"))
+		}
+
+		parsedInput.format = parquetFormat
+		found++
+	}
+
+	if found != 1 {
+		return errInvalidDataSource(nil)
+	}
+
+	*input = InputSerialization(parsedInput)
+	input.unmarshaled = true
+	return nil
+}
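
Note: the found counter enforces that exactly one of CSV, JSON or Parquet is present, and the Parquet branch additionally rejects any compression type other than NONE. Hypothetical request fragments for illustration only (the CSV argument names are assumed from csv.ReaderArgs, not taken from this commit):

// Accepted: exactly one format; compression applies to the CSV stream.
const inputOK = `<InputSerialization>
    <CompressionType>GZIP</CompressionType>
    <CSV><FileHeaderInfo>USE</FileHeaderInfo></CSV>
</InputSerialization>`

// Rejected via errInvalidRequestParameter: Parquet is compressed inside
// the container, so CompressionType must be NONE (or absent).
const inputBad = `<InputSerialization>
    <CompressionType>GZIP</CompressionType>
    <Parquet></Parquet>
</InputSerialization>`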

+// OutputSerialization - represents elements inside <OutputSerialization/> in request XML.
+type OutputSerialization struct {
+	CSVArgs     csv.WriterArgs  `xml:"CSV"`
+	JSONArgs    json.WriterArgs `xml:"JSON"`
+	unmarshaled bool
+	format      string
+}
+
+// IsEmpty - returns whether output serialization is empty or not.
+func (output *OutputSerialization) IsEmpty() bool {
+	return !output.unmarshaled
+}
+
+// UnmarshalXML - decodes XML data.
+func (output *OutputSerialization) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
+	// Make subtype to avoid recursive UnmarshalXML().
+	type subOutputSerialization OutputSerialization
+	parsedOutput := subOutputSerialization{}
+	if err := d.DecodeElement(&parsedOutput, &start); err != nil {
+		return errMalformedXML(err)
+	}
+
+	found := 0
+	if !parsedOutput.CSVArgs.IsEmpty() {
+		parsedOutput.format = csvFormat
+		found++
+	}
+	if !parsedOutput.JSONArgs.IsEmpty() {
+		parsedOutput.format = jsonFormat
+		found++
+	}
+	if found != 1 {
+		return errObjectSerializationConflict(fmt.Errorf("either CSV or JSON should be present in OutputSerialization"))
+	}
+
+	*output = OutputSerialization(parsedOutput)
+	output.unmarshaled = true
+	return nil
+}
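
Note: the same exactly-one rule applies on the output side, restricted to CSV and JSON. A hypothetical fragment (FieldDelimiter assumed from csv.WriterArgs, which marshal() below also reads):

// Exactly one of CSV or JSON must appear, mirroring the found != 1 check.
const outputOK = `<OutputSerialization>
    <CSV><FieldDelimiter>,</FieldDelimiter></CSV>
</OutputSerialization>`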

+// RequestProgress - represents elements inside <RequestProgress/> in request XML.
+type RequestProgress struct {
+	Enabled bool `xml:"Enabled"`
+}

+// S3Select - filters the contents on a simple structured query language (SQL) statement. It
+// represents elements inside <SelectObjectContentRequest/> in request XML specified in detail at
+// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html.
+type S3Select struct {
+	XMLName        xml.Name            `xml:"SelectObjectContentRequest"`
+	Expression     string              `xml:"Expression"`
+	ExpressionType string              `xml:"ExpressionType"`
+	Input          InputSerialization  `xml:"InputSerialization"`
+	Output         OutputSerialization `xml:"OutputSerialization"`
+	Progress       RequestProgress     `xml:"RequestProgress"`
+
+	statement      *sql.Select
+	progressReader *progressReader
+	recordReader   recordReader
+}
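
Note: assembled from the XML tags declared above, a complete hypothetical request body that this struct decodes (values are illustrative only):

const selectRequest = `<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest>
    <Expression>SELECT * FROM S3Object</Expression>
    <ExpressionType>SQL</ExpressionType>
    <InputSerialization>
        <CompressionType>NONE</CompressionType>
        <CSV><FileHeaderInfo>USE</FileHeaderInfo></CSV>
    </InputSerialization>
    <OutputSerialization>
        <CSV><FieldDelimiter>,</FieldDelimiter></CSV>
    </OutputSerialization>
    <RequestProgress>
        <Enabled>FALSE</Enabled>
    </RequestProgress>
</SelectObjectContentRequest>`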

+// UnmarshalXML - decodes XML data.
+func (s3Select *S3Select) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
+	// Make subtype to avoid recursive UnmarshalXML().
+	type subS3Select S3Select
+	parsedS3Select := subS3Select{}
+	if err := d.DecodeElement(&parsedS3Select, &start); err != nil {
+		if _, ok := err.(*s3Error); ok {
+			return err
+		}
+
+		return errMalformedXML(err)
+	}
+
+	parsedS3Select.ExpressionType = strings.ToLower(parsedS3Select.ExpressionType)
+	if parsedS3Select.ExpressionType != "sql" {
+		return errInvalidExpressionType(fmt.Errorf("invalid expression type '%v'", parsedS3Select.ExpressionType))
+	}
+
+	if parsedS3Select.Input.IsEmpty() {
+		return errMissingRequiredParameter(fmt.Errorf("InputSerialization must be provided"))
+	}
+
+	if parsedS3Select.Output.IsEmpty() {
+		return errMissingRequiredParameter(fmt.Errorf("OutputSerialization must be provided"))
+	}
+
+	statement, err := sql.NewSelect(parsedS3Select.Expression)
+	if err != nil {
+		return err
+	}
+	parsedS3Select.statement = statement
+
+	*s3Select = S3Select(parsedS3Select)
+	return nil
+}

-// ParseSelect parses the SELECT expression, and effectively tokenizes it into
-// its separate parts. It returns the requested column names, alias, limit of
-// records, and the where clause.
-func ParseSelect(f format.Select) ([]string, string, int64, sqlparser.Expr, []string, SelectFuncs, error) {
-	var sFuncs = SelectFuncs{}
-	var whereClause sqlparser.Expr
-	var alias string
-	var limit int64
-
-	stmt, err := sqlparser.Parse(f.Expression())
-	// TODO: Maybe can parse their errors a bit to return some more of the s3 errors
-	if err != nil {
-		return nil, "", 0, nil, nil, sFuncs, ErrLexerInvalidChar
-	}
-
-	switch stmt := stmt.(type) {
-	case *sqlparser.Select:
-		// evaluates the where clause
-		fnNames := make([]string, len(stmt.SelectExprs))
-		columnNames := make([]string, len(stmt.SelectExprs))
-
-		if stmt.Where != nil {
-			whereClause = stmt.Where.Expr
-		}
-		for i, sexpr := range stmt.SelectExprs {
-			switch expr := sexpr.(type) {
-			case *sqlparser.StarExpr:
-				columnNames[0] = "*"
-			case *sqlparser.AliasedExpr:
-				switch smallerexpr := expr.Expr.(type) {
-				case *sqlparser.FuncExpr:
-					if smallerexpr.IsAggregate() {
-						fnNames[i] = smallerexpr.Name.CompliantName()
-						// Will return function name
-						// Case to deal with if we have functions and not an asterisk
-						switch tempagg := smallerexpr.Exprs[0].(type) {
-						case *sqlparser.StarExpr:
-							columnNames[0] = "*"
-							if smallerexpr.Name.CompliantName() != "count" {
-								return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedCallWithStar
-							}
-						case *sqlparser.AliasedExpr:
-							switch col := tempagg.Expr.(type) {
-							case *sqlparser.BinaryExpr:
-								return nil, "", 0, nil, nil, sFuncs, ErrParseNonUnaryAgregateFunctionCall
-							case *sqlparser.ColName:
-								columnNames[i] = col.Name.CompliantName()
-							}
-						}
-						// Case to deal with if COALESCE was used.
-					} else if supportedFunc(smallerexpr.Name.CompliantName()) {
-						if sFuncs.funcExpr == nil {
-							sFuncs.funcExpr = make([]*sqlparser.FuncExpr, len(stmt.SelectExprs))
-							sFuncs.index = make([]int, len(stmt.SelectExprs))
-						}
-						sFuncs.funcExpr[i] = smallerexpr
-						sFuncs.index[i] = i
-					} else {
-						return nil, "", 0, nil, nil, sFuncs, ErrUnsupportedSQLOperation
-					}
-				case *sqlparser.ColName:
-					columnNames[i] = smallerexpr.Name.CompliantName()
-				}
-			}
-		}
-
-		// This code retrieves the alias and makes sure it is set to the correct
-		// value, if not it sets it to the tablename
-		for _, fexpr := range stmt.From {
-			switch smallerexpr := fexpr.(type) {
-			case *sqlparser.JoinTableExpr:
-				return nil, "", 0, nil, nil, sFuncs, ErrParseMalformedJoin
-			case *sqlparser.AliasedTableExpr:
-				alias = smallerexpr.As.CompliantName()
-				if alias == "" {
-					alias = sqlparser.GetTableName(smallerexpr.Expr).CompliantName()
-				}
-			}
-		}
-		if stmt.Limit != nil {
-			switch expr := stmt.Limit.Rowcount.(type) {
-			case *sqlparser.SQLVal:
-				// The value of how many rows we're going to limit by
-				parsedLimit, _ := strconv.Atoi(string(expr.Val[:]))
-				limit = int64(parsedLimit)
-			}
-		}
-		if stmt.GroupBy != nil {
-			return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedLiteralsGroupBy
-		}
-		if stmt.OrderBy != nil {
-			return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedToken
-		}
-		if err := parseErrs(columnNames, whereClause, alias, sFuncs, f); err != nil {
-			return nil, "", 0, nil, nil, sFuncs, err
-		}
-		return columnNames, alias, limit, whereClause, fnNames, sFuncs, nil
-	}
-	return nil, "", 0, nil, nil, sFuncs, nil
-}
+
+func (s3Select *S3Select) outputRecord() sql.Record {
+	switch s3Select.Output.format {
+	case csvFormat:
+		return csv.NewRecord()
+	case jsonFormat:
+		return json.NewRecord()
+	}
+
+	panic(fmt.Errorf("unknown output format '%v'", s3Select.Output.format))
+}

-type columnKv struct {
-	Key   string
-	Value int
-}
+func (s3Select *S3Select) getProgress() (bytesScanned, bytesProcessed int64) {
+	if s3Select.progressReader != nil {
+		return s3Select.progressReader.Stats()
+	}
+
+	return -1, -1
+}

-func columnsIndex(reqColNames []string, f format.Select) ([]columnKv, error) {
-	var (
-		columnsKv  []columnKv
-		columnsMap = make(map[string]int)
-		columns    = f.Header()
-	)
-	if f.HasHeader() {
-		err := checkForDuplicates(columns, columnsMap)
-		if format.IsInt(reqColNames[0]) {
-			err = ErrMissingHeaders
-		}
-		if err != nil {
-			return nil, err
-		}
-		for k, v := range columnsMap {
-			columnsKv = append(columnsKv, columnKv{
-				Key:   k,
-				Value: v,
-			})
-		}
-	} else {
-		for i := range columns {
-			columnsKv = append(columnsKv, columnKv{
-				Key:   "_" + strconv.Itoa(i),
-				Value: i,
-			})
-		}
-	}
-	sort.Slice(columnsKv, func(i, j int) bool {
-		return columnsKv[i].Value < columnsKv[j].Value
-	})
-	return columnsKv, nil
-}
+// Open - opens S3 object by using callback for SQL selection query.
+// Currently CSV, JSON and Apache Parquet formats are supported.
+func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadCloser, error)) error {
+	switch s3Select.Input.format {
+	case csvFormat:
+		rc, err := getReader(0, -1)
+		if err != nil {
+			return err
+		}
+
+		s3Select.progressReader, err = newProgressReader(rc, s3Select.Input.CompressionType)
+		if err != nil {
+			return err
+		}
+
+		s3Select.recordReader, err = csv.NewReader(s3Select.progressReader, &s3Select.Input.CSVArgs)
+		if err != nil {
+			return err
+		}
+
+		return nil
+	case jsonFormat:
+		rc, err := getReader(0, -1)
+		if err != nil {
+			return err
+		}
+
+		s3Select.progressReader, err = newProgressReader(rc, s3Select.Input.CompressionType)
+		if err != nil {
+			return err
+		}
+
+		s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
+		return nil
+	case parquetFormat:
+		var err error
+		s3Select.recordReader, err = parquet.NewReader(getReader, &s3Select.Input.ParquetArgs)
+		return err
+	}
+
+	panic(fmt.Errorf("unknown input format '%v'", s3Select.Input.format))
+}
+
+func (s3Select *S3Select) marshal(record sql.Record) ([]byte, error) {
+	switch s3Select.Output.format {
+	case csvFormat:
+		data, err := record.MarshalCSV([]rune(s3Select.Output.CSVArgs.FieldDelimiter)[0])
+		if err != nil {
+			return nil, err
+		}
+
+		return append(data, []byte(s3Select.Output.CSVArgs.RecordDelimiter)...), nil
+	case jsonFormat:
+		data, err := record.MarshalJSON()
+		if err != nil {
+			return nil, err
+		}
+
+		return append(data, []byte(s3Select.Output.JSONArgs.RecordDelimiter)...), nil
+	}
+
+	panic(fmt.Errorf("unknown output format '%v'", s3Select.Output.format))
+}
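
Note: Open takes a range-reader callback instead of a plain stream because Parquet needs random access, while the CSV and JSON paths above fetch the whole object once via getReader(0, -1). A hypothetical file-backed adapter sketching the callback contract, assuming non-negative offsets and length < 0 meaning "read to EOF":

package main

import (
	"io"
	"os"
)

// fileGetReader adapts a local file to S3Select.Open's callback; the
// name and semantics here are illustrative, not part of the commit.
func fileGetReader(path string) func(offset, length int64) (io.ReadCloser, error) {
	return func(offset, length int64) (io.ReadCloser, error) {
		f, err := os.Open(path)
		if err != nil {
			return nil, err
		}
		if _, err = f.Seek(offset, io.SeekStart); err != nil {
			f.Close()
			return nil, err
		}
		if length < 0 {
			return f, nil // full scan, as in the CSV/JSON paths above
		}
		// Limit the readable window but still close the underlying file.
		return struct {
			io.Reader
			io.Closer
		}{io.LimitReader(f, length), f}, nil
	}
}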

-// This is the main function. It goes row by row and for records which validate
-// the where clause it currently prints the appropriate row given the requested
-// columns.
-func processSelectReq(reqColNames []string, alias string, wc sqlparser.Expr, lrecords int64, fnNames []string, rowCh chan Row, fn SelectFuncs, f format.Select) {
-	counter := -1
-	filtrCount := 0
-	functionFlag := false
-
-	// Values used to store our aggregation values.
-	aggVals := make([]float64, len(reqColNames))
-	if lrecords == 0 {
-		lrecords = math.MaxInt64
-	}
-
-	var results []string
-	var columnsKv []columnKv
-	if f.Type() == format.CSV {
-		var err error
-		columnsKv, err = columnsIndex(reqColNames, f)
-		if err != nil {
-			rowCh <- Row{
-				err: err,
-			}
-			return
-		}
-		results = make([]string, len(columnsKv))
-	}
-
-	for {
-		record, err := f.Read()
-		if err != nil {
-			rowCh <- Row{
-				err: err,
-			}
-			return
-		}
-		if record == nil {
-			if functionFlag {
-				rowCh <- Row{
-					record: aggFuncToStr(aggVals, f) + "\n",
-				}
-			}
-			close(rowCh)
-			return
-		}
-
-		// For JSON multi-line input type, columns need
-		// to be handled for each record.
-		if f.Type() == format.JSON {
-			columnsKv, err = columnsIndex(reqColNames, f)
-			if err != nil {
-				rowCh <- Row{
-					err: err,
-				}
-				return
-			}
-			results = make([]string, len(columnsKv))
-		}
-
-		f.UpdateBytesProcessed(int64(len(record)))
-
-		// Return in case the number of records reaches the LIMIT
-		// defined in select query
-		if int64(filtrCount) == lrecords {
-			close(rowCh)
-			return
-		}
-
-		// The call to the where function clause, ensures that
-		// the rows we print match our where clause.
-		condition, err := matchesMyWhereClause(record, alias, wc)
-		if err != nil {
-			rowCh <- Row{
-				err: err,
-			}
-			return
-		}
-
-		if condition {
-			// if it's an asterisk we just print everything in the row
-			if reqColNames[0] == "*" && fnNames[0] == "" {
-				switch f.OutputType() {
-				case format.CSV:
-					for i, kv := range columnsKv {
-						results[i] = gjson.GetBytes(record, kv.Key).String()
-					}
-					rowCh <- Row{
-						record: strings.Join(results, f.OutputFieldDelimiter()) + f.OutputRecordDelimiter(),
-					}
-				case format.JSON:
-					rowCh <- Row{
-						record: string(record) + f.OutputRecordDelimiter(),
-					}
-				}
-			} else if alias != "" {
-				// This is for dealing with the case of if we have to deal with a
-				// request for a column with an index e.g A_1.
-				if format.IsInt(reqColNames[0]) {
-					// This checks whether any aggregation function was called as now we
-					// no longer will go through printing each row, and only print at the end
-					if len(fnNames) > 0 && fnNames[0] != "" {
-						functionFlag = true
-						aggregationFns(counter, filtrCount, aggVals, reqColNames, fnNames, record)
-					} else {
-						// The code below finds the appropriate columns of the row given the
-						// indices provided in the SQL request.
-						var rowStr string
-						rowStr, err = processColNameIndex(record, reqColNames, f)
-						if err != nil {
-							rowCh <- Row{
-								err: err,
-							}
-							return
-						}
-						rowCh <- Row{
-							record: rowStr + "\n",
-						}
-					}
-				} else {
-					// This code does aggregation if we were provided column names in the
-					// form of actual names rather than indices.
-					if len(fnNames) > 0 && fnNames[0] != "" {
-						functionFlag = true
-						aggregationFns(counter, filtrCount, aggVals, reqColNames, fnNames, record)
-					} else {
-						// This code prints the appropriate part of the row given the filter
-						// and select request, if the select request was based on column
-						// names rather than indices.
-						var rowStr string
-						rowStr, err = processColNameLiteral(record, reqColNames, fn, f)
-						if err != nil {
-							rowCh <- Row{
-								err: err,
-							}
-							return
-						}
-						rowCh <- Row{
-							record: rowStr + "\n",
-						}
-					}
-				}
-			}
-			filtrCount++
-		}
-		counter++
-	}
-}
+// Evaluate - filters and sends records read from opened reader as per select statement to http response writer.
+func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
+	getProgressFunc := s3Select.getProgress
+	if !s3Select.Progress.Enabled {
+		getProgressFunc = nil
+	}
+	writer := newMessageWriter(w, getProgressFunc)
+
+	var inputRecord sql.Record
+	var outputRecord sql.Record
+	var err error
+	var data []byte
+	sendRecord := func() bool {
+		if outputRecord == nil {
+			return true
+		}
+
+		if data, err = s3Select.marshal(outputRecord); err != nil {
+			return false
+		}
+
+		if err = writer.SendRecords(data); err != nil {
+			// FIXME: log this error.
+			err = nil
+			return false
+		}
+
+		return true
+	}
+
+	for {
+		if inputRecord, err = s3Select.recordReader.Read(); err != nil {
+			if err != io.EOF {
+				break
+			}
+
+			if s3Select.statement.IsAggregated() {
+				outputRecord = s3Select.outputRecord()
+				if err = s3Select.statement.AggregateResult(outputRecord); err != nil {
+					break
+				}
+
+				if !sendRecord() {
+					break
+				}
+			}
+
+			if err = writer.SendStats(s3Select.getProgress()); err != nil {
+				// FIXME: log this error.
+				err = nil
+			}
+
+			break
+		}
+
+		outputRecord = s3Select.outputRecord()
+		if outputRecord, err = s3Select.statement.Eval(inputRecord, outputRecord); err != nil {
+			break
+		}
+
+		if !s3Select.statement.IsAggregated() {
+			if !sendRecord() {
+				break
+			}
+		}
+	}
+
+	if err != nil {
+		if serr := writer.SendError("InternalError", err.Error()); serr != nil {
+			// FIXME: log errors.
+		}
+	}
+}
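
Note: the loop above has two emission modes — non-aggregated statements send one output record per input record that Eval passes, while aggregated statements accumulate inside the sql.Select state and emit a single record at EOF via AggregateResult. Since Evaluate only needs an http.ResponseWriter, tests can capture the stream with a recorder; a hypothetical harness:

package s3select_test

import (
	"net/http/httptest"

	"github.com/minio/minio/pkg/s3select"
)

// captureSelect runs an opened query and returns the raw event-stream
// bytes that Evaluate writes through its message writer.
func captureSelect(s *s3select.S3Select) []byte {
	rec := httptest.NewRecorder()
	s.Evaluate(rec)
	return rec.Body.Bytes()
}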

-// processColumnNames is a function which allows for cleaning of column names.
-func processColumnNames(reqColNames []string, alias string, f format.Select) error {
-	switch f.Type() {
-	case format.CSV:
-		for i := range reqColNames {
-			// The code below basically cleans the column name of its alias and other
-			// syntax, so that we can extract its pure name.
-			reqColNames[i] = cleanCol(reqColNames[i], alias)
-		}
-	case format.JSON:
-		// JSON doesn't have columns so no cleaning required
-	}
-
-	return nil
-}
+// Close - closes opened S3 object.
+func (s3Select *S3Select) Close() error {
+	return s3Select.recordReader.Close()
+}

-// processColNameIndex is the function which creates the row for an index based query.
-func processColNameIndex(record []byte, reqColNames []string, f format.Select) (string, error) {
-	var row []string
-	for _, colName := range reqColNames {
-		// COALESCE AND NULLIF do not support index based access.
-		if reqColNames[0] == "0" {
-			return "", format.ErrInvalidColumnIndex
-		}
-		cindex, err := strconv.Atoi(colName)
-		if err != nil {
-			return "", ErrMissingHeaders
-		}
-		if cindex > len(f.Header()) {
-			return "", format.ErrInvalidColumnIndex
-		}
-
-		// Subtract 1 because SELECT indexing is not 0 based, it
-		// starts at 1 generating the key like "_1".
-		row = append(row, gjson.GetBytes(record, string("_"+strconv.Itoa(cindex-1))).String())
-	}
-	rowStr := strings.Join(row, f.OutputFieldDelimiter())
-	if len(rowStr) > MaxCharsPerRecord {
-		return "", ErrOverMaxRecordSize
-	}
-	return rowStr, nil
-}
-
-// processColNameLiteral is the function which creates the row for a name based query.
-func processColNameLiteral(record []byte, reqColNames []string, fn SelectFuncs, f format.Select) (string, error) {
-	row := make([]string, len(reqColNames))
-	for i, colName := range reqColNames {
-		// this is the case to deal with COALESCE.
-		if colName == "" && isValidFunc(fn.index, i) {
-			row[i] = evaluateFuncExpr(fn.funcExpr[i], "", record)
-			continue
-		}
-		row[i] = gjson.GetBytes(record, colName).String()
-	}
-	rowStr := strings.Join(row, f.OutputFieldDelimiter())
-	if len(rowStr) > MaxCharsPerRecord {
-		return "", ErrOverMaxRecordSize
-	}
-	return rowStr, nil
-}
-
-// aggregationFns is a function which performs the actual aggregation
-// methods on the given row; it uses an array defined in the main parsing
-// function to keep track of values.
-func aggregationFns(counter int, filtrCount int, aggVals []float64, storeReqCols []string, storeFns []string, record []byte) error {
-	for i, storeFn := range storeFns {
-		switch storeFn {
-		case "":
-			continue
-		case "count":
-			aggVals[i]++
-		default:
-			// If column names are provided as an index, it'll use
-			// this if statement instead.
-			var convAggFloat float64
-			if format.IsInt(storeReqCols[i]) {
-				index, _ := strconv.Atoi(storeReqCols[i])
-				convAggFloat = gjson.GetBytes(record, "_"+strconv.Itoa(index)).Float()
-			} else {
-				// Named columns rather than indices.
-				convAggFloat = gjson.GetBytes(record, storeReqCols[i]).Float()
-			}
-			switch storeFn {
-			case "min":
-				if counter == -1 {
-					aggVals[i] = math.MaxFloat64
-				}
-				if convAggFloat < aggVals[i] {
-					aggVals[i] = convAggFloat
-				}
-			case "max":
-				// Calculate the max.
-				if counter == -1 {
-					aggVals[i] = math.SmallestNonzeroFloat64
-				}
-				if convAggFloat > aggVals[i] {
-					aggVals[i] = convAggFloat
-				}
-			case "sum":
-				// Calculate the sum.
-				aggVals[i] += convAggFloat
-			case "avg":
-				// Calculating the average.
-				if filtrCount == 0 {
-					aggVals[i] = convAggFloat
-				} else {
-					aggVals[i] = (convAggFloat + (aggVals[i] * float64(filtrCount))) / float64((filtrCount + 1))
-				}
-			default:
-				return ErrParseNonUnaryAgregateFunctionCall
-			}
-		}
-	}
-	return nil
-}
+// NewS3Select - creates new S3Select by given request XML reader.
+func NewS3Select(r io.Reader) (*S3Select, error) {
+	s3Select := &S3Select{}
+	if err := xml.NewDecoder(r).Decode(s3Select); err != nil {
+		return nil, err
+	}
+
+	return s3Select, nil
+}
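
Note: end to end, the new API is decode, open, stream, close. A sketch reusing the hypothetical selectRequest and fileGetReader helpers from the notes above (paths and query are illustrative only):

package main

import (
	"log"
	"net/http/httptest"
	"strings"

	"github.com/minio/minio/pkg/s3select"
)

func main() {
	s3sel, err := s3select.NewS3Select(strings.NewReader(selectRequest))
	if err != nil {
		log.Fatal(err)
	}

	if err = s3sel.Open(fileGetReader("testdata/objects.csv")); err != nil {
		log.Fatal(err)
	}
	defer s3sel.Close()

	rec := httptest.NewRecorder()
	s3sel.Evaluate(rec) // query errors are reported in-stream via SendError
	log.Printf("wrote %d bytes of event stream", rec.Body.Len())
}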