mirror of
https://github.com/minio/minio.git
synced 2025-11-20 01:50:24 -05:00
Add new SQL parser to support S3 Select syntax (#7102)
- New parser written from scratch, allows easier and complete parsing of the full S3 Select SQL syntax. Parser definition is directly provided by the AST defined for the SQL grammar. - Bring support to parse and interpret SQL involving JSON path expressions; evaluation of JSON path expressions will be subsequently added. - Bring automatic type inference and conversion for untyped values (e.g. CSV data).
This commit is contained in:
committed by
Harshavardhana
parent
0a28c28a8c
commit
2786055df4
@@ -105,7 +105,7 @@ func (input *InputSerialization) UnmarshalXML(d *xml.Decoder, start xml.StartEle
|
||||
found++
|
||||
}
|
||||
if !parsedInput.ParquetArgs.IsEmpty() {
|
||||
if parsedInput.CompressionType != noneType {
|
||||
if parsedInput.CompressionType != "" && parsedInput.CompressionType != noneType {
|
||||
return errInvalidRequestParameter(fmt.Errorf("CompressionType must be NONE for Parquet format"))
|
||||
}
|
||||
|
||||
@@ -178,7 +178,7 @@ type S3Select struct {
|
||||
Output OutputSerialization `xml:"OutputSerialization"`
|
||||
Progress RequestProgress `xml:"RequestProgress"`
|
||||
|
||||
statement *sql.Select
|
||||
statement *sql.SelectStatement
|
||||
progressReader *progressReader
|
||||
recordReader recordReader
|
||||
}
|
||||
@@ -209,12 +209,12 @@ func (s3Select *S3Select) UnmarshalXML(d *xml.Decoder, start xml.StartElement) e
|
||||
return errMissingRequiredParameter(fmt.Errorf("OutputSerialization must be provided"))
|
||||
}
|
||||
|
||||
statement, err := sql.NewSelect(parsedS3Select.Expression)
|
||||
statement, err := sql.ParseSelectStatement(parsedS3Select.Expression)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
parsedS3Select.statement = statement
|
||||
parsedS3Select.statement = &statement
|
||||
|
||||
*s3Select = S3Select(parsedS3Select)
|
||||
return nil
|
||||
@@ -334,6 +334,14 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
|
||||
}
|
||||
|
||||
for {
|
||||
if s3Select.statement.LimitReached() {
|
||||
if err = writer.SendStats(s3Select.getProgress()); err != nil {
|
||||
// FIXME: log this error.
|
||||
err = nil
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if inputRecord, err = s3Select.recordReader.Read(); err != nil {
|
||||
if err != io.EOF {
|
||||
break
|
||||
@@ -358,19 +366,25 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
|
||||
break
|
||||
}
|
||||
|
||||
outputRecord = s3Select.outputRecord()
|
||||
if outputRecord, err = s3Select.statement.Eval(inputRecord, outputRecord); err != nil {
|
||||
break
|
||||
}
|
||||
if s3Select.statement.IsAggregated() {
|
||||
if err = s3Select.statement.AggregateRow(inputRecord); err != nil {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
outputRecord = s3Select.outputRecord()
|
||||
if outputRecord, err = s3Select.statement.Eval(inputRecord, outputRecord); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if !s3Select.statement.IsAggregated() {
|
||||
if !sendRecord() {
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("SQL Err: %#v\n", err)
|
||||
if serr := writer.SendError("InternalError", err.Error()); serr != nil {
|
||||
// FIXME: log errors.
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user