Add JSON Path expression evaluation support (#7315)

- Includes support for FROM clause JSON path
This commit is contained in:
Aditya Manthramurthy
2019-03-09 08:13:37 -08:00
committed by Harshavardhana
parent b296b3cf8b
commit e463386921
15 changed files with 488 additions and 59 deletions

View File

@@ -20,12 +20,18 @@ import (
"errors"
"fmt"
"strings"
"github.com/bcicen/jstream"
)
var (
errBadLimitSpecified = errors.New("Limit value must be a positive integer")
)
const (
baseTableName = "s3object"
)
// SelectStatement is the top level parsed and analyzed structure
type SelectStatement struct {
selectAST *Select
@@ -74,9 +80,8 @@ func ParseSelectStatement(s string) (stmt SelectStatement, err error) {
}
// Validate table name
tableString := strings.ToLower(selectAST.From.Table.String())
if !strings.HasPrefix(tableString, "s3object.") && tableString != "s3object" {
err = errBadTableName(errors.New("Table name must be s3object"))
err = validateTableName(selectAST.From)
if err != nil {
return
}
@@ -89,6 +94,19 @@ func ParseSelectStatement(s string) (stmt SelectStatement, err error) {
return
}
func validateTableName(from *TableExpression) error {
if strings.ToLower(from.Table.BaseKey.String()) != baseTableName {
return errBadTableName(errors.New("table name must be `s3object`"))
}
if len(from.Table.PathExpr) > 0 {
if !from.Table.PathExpr[0].ArrayWildcard {
return errBadTableName(errors.New("keypath table name is invalid - please check the service documentation"))
}
}
return nil
}
func parseLimit(v *LitValue) (int64, error) {
switch {
case v == nil:
@@ -104,6 +122,41 @@ func parseLimit(v *LitValue) (int64, error) {
}
}
// EvalFrom evaluates the From clause on the input record. It only
// applies to JSON input data format (currently).
func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) {
if e.selectAST.From.HasKeypath() {
if format == "json" {
objFmt, rawVal := input.Raw()
if objFmt != SelectFmtJSON {
return nil, errDataSource(errors.New("unexpected non JSON input"))
}
jsonRec := rawVal.(jstream.KVS)
txedRec, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], jsonRec)
if err != nil {
return nil, err
}
var kvs jstream.KVS
switch v := txedRec.(type) {
case jstream.KVS:
kvs = v
default:
kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}}
}
if err = input.Replace(kvs); err != nil {
return nil, err
}
return input, nil
}
return nil, errDataSource(errors.New("path not supported"))
}
return input, nil
}
// IsAggregated returns if the statement involves SQL aggregation
func (e *SelectStatement) IsAggregated() bool {
return e.selectQProp.isAggregation
@@ -164,12 +217,10 @@ func (e *SelectStatement) AggregateRow(input Record) error {
// applies only to non-aggregation queries.
func (e *SelectStatement) Eval(input, output Record) (Record, error) {
ok, err := e.isPassingWhereClause(input)
if err != nil {
if err != nil || !ok {
// Either error or row did not pass where clause
return nil, err
}
if !ok {
return nil, nil
}
if e.selectAST.Expression.All {
// Return the input record for `SELECT * FROM