SIMDJSON S3 select input (#8401)

This commit is contained in:
Klaus Post
2020-02-13 14:03:52 -08:00
committed by GitHub
parent d1144c2c7e
commit e4020fb41f
16 changed files with 1116 additions and 73 deletions

View File

@@ -20,9 +20,11 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strings"
"github.com/bcicen/jstream"
"github.com/minio/simdjson-go"
)
var (
@@ -370,11 +372,9 @@ func (e *JSONPath) evalNode(r Record) (*Value, error) {
keypath = ps[1]
}
}
objFmt, rawVal := r.Raw()
switch objFmt {
case SelectFmtJSON, SelectFmtParquet:
rowVal := rawVal.(jstream.KVS)
_, rawVal := r.Raw()
switch rowVal := rawVal.(type) {
case jstream.KVS, simdjson.Object:
pathExpr := e.PathExpr
if len(pathExpr) == 0 {
pathExpr = []*JSONPathElement{{Key: &ObjectKey{ID: e.BaseKey}}}
@@ -400,6 +400,11 @@ func jsonToValue(result interface{}) (*Value, error) {
return FromFloat(rval), nil
case int64:
return FromInt(rval), nil
case uint64:
if rval <= math.MaxInt64 {
return FromInt(int64(rval)), nil
}
return FromFloat(float64(rval)), nil
case bool:
return FromBool(rval), nil
case jstream.KVS:
@@ -418,6 +423,17 @@ func jsonToValue(result interface{}) (*Value, error) {
dst[i] = *v
}
return FromArray(dst), nil
case simdjson.Object:
o := rval
elems, err := o.Parse(nil)
if err != nil {
return nil, err
}
bs, err := elems.MarshalJSON()
if err != nil {
return nil, err
}
return FromBytes(bs), nil
case []Value:
return FromArray(rval), nil
case nil:

View File

@@ -20,6 +20,7 @@ import (
"errors"
"github.com/bcicen/jstream"
"github.com/minio/simdjson-go"
)
var (
@@ -42,17 +43,29 @@ func jsonpathEval(p []*JSONPathElement, v interface{}) (r interface{}, flat bool
case p[0].Key != nil:
key := p[0].Key.keyString()
kvs, ok := v.(jstream.KVS)
if !ok {
switch kvs := v.(type) {
case jstream.KVS:
for _, kv := range kvs {
if kv.Key == key {
return jsonpathEval(p[1:], kv.Value)
}
}
// Key not found - return nil result
return nil, false, nil
case simdjson.Object:
elem := kvs.FindKey(key, nil)
if elem == nil {
// Key not found - return nil result
return nil, false, nil
}
val, err := IterToValue(elem.Iter)
if err != nil {
return nil, false, err
}
return jsonpathEval(p[1:], val)
default:
return nil, false, errKeyLookup
}
for _, kv := range kvs {
if kv.Key == key {
return jsonpathEval(p[1:], kv.Value)
}
}
// Key not found - return nil result
return nil, false, nil
case p[0].Index != nil:
idx := *p[0].Index
@@ -68,17 +81,23 @@ func jsonpathEval(p []*JSONPathElement, v interface{}) (r interface{}, flat bool
return jsonpathEval(p[1:], arr[idx])
case p[0].ObjectWildcard:
kvs, ok := v.(jstream.KVS)
if !ok {
switch kvs := v.(type) {
case jstream.KVS:
if len(p[1:]) > 0 {
return nil, false, errWilcardObjectUsageInvalid
}
return kvs, false, nil
case simdjson.Object:
if len(p[1:]) > 0 {
return nil, false, errWilcardObjectUsageInvalid
}
return kvs, false, nil
default:
return nil, false, errWildcardObjectLookup
}
if len(p[1:]) > 0 {
return nil, false, errWilcardObjectUsageInvalid
}
return kvs, false, nil
case p[0].ArrayWildcard:
arr, ok := v.([]interface{})
if !ok {

View File

@@ -17,9 +17,10 @@
package sql
import (
"fmt"
"io"
"github.com/bcicen/jstream"
"github.com/minio/simdjson-go"
)
// SelectObjectFormat specifies the format of the underlying data
@@ -32,6 +33,8 @@ const (
SelectFmtCSV
// SelectFmtJSON - JSON format
SelectFmtJSON
// SelectFmtSIMDJSON - SIMD JSON format
SelectFmtSIMDJSON
// SelectFmtParquet - Parquet format
SelectFmtParquet
)
@@ -39,7 +42,10 @@ const (
// Record - is a type containing columns and their values.
type Record interface {
Get(name string) (*Value, error)
Set(name string, value *Value) error
// Set a value.
// Can return a different record type.
Set(name string, value *Value) (Record, error)
WriteCSV(writer io.Writer, fieldDelimiter rune) error
WriteJSON(writer io.Writer) error
@@ -51,5 +57,77 @@ type Record interface {
Raw() (SelectObjectFormat, interface{})
// Replaces the underlying data
Replace(k jstream.KVS) error
Replace(k interface{}) error
}
// IterToValue converts a simdjson Iter to its underlying value.
// Objects are returned as simdjson.Object
// Arrays are returned as []interface{} with parsed values.
func IterToValue(iter simdjson.Iter) (interface{}, error) {
switch iter.Type() {
case simdjson.TypeString:
v, err := iter.String()
if err != nil {
return nil, err
}
return v, nil
case simdjson.TypeFloat:
v, err := iter.Float()
if err != nil {
return nil, err
}
return v, nil
case simdjson.TypeInt:
v, err := iter.Int()
if err != nil {
return nil, err
}
return v, nil
case simdjson.TypeUint:
v, err := iter.Int()
if err != nil {
// Can't fit into int, convert to float.
v, err := iter.Float()
return v, err
}
return v, nil
case simdjson.TypeBool:
v, err := iter.Bool()
if err != nil {
return nil, err
}
return v, nil
case simdjson.TypeObject:
obj, err := iter.Object(nil)
if err != nil {
return nil, err
}
return *obj, err
case simdjson.TypeArray:
arr, err := iter.Array(nil)
if err != nil {
return nil, err
}
iter := arr.Iter()
var dst []interface{}
var next simdjson.Iter
for {
typ, err := iter.AdvanceIter(&next)
if err != nil {
return nil, err
}
if typ == simdjson.TypeNone {
break
}
v, err := IterToValue(next)
if err != nil {
return nil, err
}
dst = append(dst, v)
}
return dst, err
case simdjson.TypeNull:
return nil, nil
}
return nil, fmt.Errorf("IterToValue: unknown JSON type: %s", iter.Type().String())
}

View File

@@ -22,6 +22,7 @@ import (
"strings"
"github.com/bcicen/jstream"
"github.com/minio/simdjson-go"
)
var (
@@ -140,36 +141,56 @@ func parseLimit(v *LitValue) (int64, error) {
// EvalFrom evaluates the From clause on the input record. It only
// applies to JSON input data format (currently).
func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) {
if e.selectAST.From.HasKeypath() {
if format == "json" {
objFmt, rawVal := input.Raw()
if objFmt != SelectFmtJSON {
return nil, errDataSource(errors.New("unexpected non JSON input"))
}
if !e.selectAST.From.HasKeypath() {
return input, nil
}
_, rawVal := input.Raw()
jsonRec := rawVal.(jstream.KVS)
txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], jsonRec)
if format != "json" {
return nil, errDataSource(errors.New("path not supported"))
}
switch rec := rawVal.(type) {
case jstream.KVS:
txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], rec)
if err != nil {
return nil, err
}
var kvs jstream.KVS
switch v := txedRec.(type) {
case jstream.KVS:
kvs = v
default:
kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}}
}
if err = input.Replace(kvs); err != nil {
return nil, err
}
return input, nil
case simdjson.Object:
txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], rec)
if err != nil {
return nil, err
}
switch v := txedRec.(type) {
case simdjson.Object:
err := input.Replace(v)
if err != nil {
return nil, err
}
var kvs jstream.KVS
switch v := txedRec.(type) {
case jstream.KVS:
kvs = v
default:
kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}}
}
if err = input.Replace(kvs); err != nil {
default:
input.Reset()
input, err = input.Set("_1", &Value{value: v})
if err != nil {
return nil, err
}
return input, nil
}
return nil, errDataSource(errors.New("path not supported"))
return input, nil
}
return input, nil
return nil, errDataSource(errors.New("unexpected non JSON input"))
}
// IsAggregated returns if the statement involves SQL aggregation
@@ -186,9 +207,12 @@ func (e *SelectStatement) AggregateResult(output Record) error {
return err
}
if expr.As != "" {
output.Set(expr.As, v)
output, err = output.Set(expr.As, v)
} else {
output.Set(fmt.Sprintf("_%d", i+1), v)
output, err = output.Set(fmt.Sprintf("_%d", i+1), v)
}
if err != nil {
return err
}
}
return nil
@@ -250,8 +274,7 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) {
if e.limitValue > -1 {
e.outputCount++
}
output = input.Clone(output)
return output, nil
return input.Clone(output), nil
}
for i, expr := range e.selectAST.Expression.Expressions {
@@ -262,11 +285,14 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) {
// Pick output column names
if expr.As != "" {
output.Set(expr.As, v)
output, err = output.Set(expr.As, v)
} else if comp, ok := getLastKeypathComponent(expr.Expression); ok {
output.Set(comp, v)
output, err = output.Set(comp, v)
} else {
output.Set(fmt.Sprintf("_%d", i+1), v)
output, err = output.Set(fmt.Sprintf("_%d", i+1), v)
}
if err != nil {
return nil, err
}
}