From 974cbb3bb73f7f42fee2089144ee8a2ad3acb084 Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Mon, 23 Sep 2024 12:35:41 -0700 Subject: [PATCH] Limit jstream parse depth (#20474) Add https://github.com/bcicen/jstream/pull/15 by vendoring the package. Sets JSON depth limit to 100 entries in S3 Select. --- cmd/postpolicyform.go | 4 +- go.mod | 1 - go.sum | 2 - internal/s3select/csv/record.go | 2 +- internal/s3select/json/preader.go | 4 +- internal/s3select/json/reader.go | 5 +- internal/s3select/json/record.go | 2 +- internal/s3select/jstream/LICENSE | 22 + internal/s3select/jstream/README.md | 116 ++++ internal/s3select/jstream/decoder.go | 675 ++++++++++++++++++++++ internal/s3select/jstream/decoder_test.go | 276 +++++++++ internal/s3select/jstream/errors.go | 52 ++ internal/s3select/jstream/scanner.go | 114 ++++ internal/s3select/jstream/scanner_test.go | 170 ++++++ internal/s3select/jstream/scratch.go | 44 ++ internal/s3select/parquet/reader.go | 2 +- internal/s3select/simdj/record.go | 2 +- internal/s3select/sql/evaluate.go | 2 +- internal/s3select/sql/jsonpath.go | 2 +- internal/s3select/sql/jsonpath_test.go | 4 +- internal/s3select/sql/statement.go | 2 +- 21 files changed, 1484 insertions(+), 19 deletions(-) create mode 100644 internal/s3select/jstream/LICENSE create mode 100644 internal/s3select/jstream/README.md create mode 100644 internal/s3select/jstream/decoder.go create mode 100644 internal/s3select/jstream/decoder_test.go create mode 100644 internal/s3select/jstream/errors.go create mode 100644 internal/s3select/jstream/scanner.go create mode 100644 internal/s3select/jstream/scanner_test.go create mode 100644 internal/s3select/jstream/scratch.go diff --git a/cmd/postpolicyform.go b/cmd/postpolicyform.go index d4d1c214a..2ced4fbd0 100644 --- a/cmd/postpolicyform.go +++ b/cmd/postpolicyform.go @@ -29,10 +29,10 @@ import ( "strings" "time" - "github.com/bcicen/jstream" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/set" xhttp "github.com/minio/minio/internal/http" + "github.com/minio/minio/internal/s3select/jstream" ) // startWithConds - map which indicates if a given condition supports starts-with policy operator @@ -140,7 +140,7 @@ type PostPolicyForm struct { func sanitizePolicy(r io.Reader) (io.Reader, error) { var buf bytes.Buffer e := json.NewEncoder(&buf) - d := jstream.NewDecoder(r, 0).ObjectAsKVS() + d := jstream.NewDecoder(r, 0).ObjectAsKVS().MaxDepth(10) sset := set.NewStringSet() for mv := range d.Stream() { var kvs jstream.KVS diff --git a/go.mod b/go.mod index 7d655fc34..98914f144 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 github.com/IBM/sarama v1.43.3 github.com/alecthomas/participle v0.7.1 - github.com/bcicen/jstream v1.0.1 github.com/beevik/ntp v1.4.3 github.com/buger/jsonparser v1.1.1 github.com/cespare/xxhash/v2 v2.3.0 diff --git a/go.sum b/go.sum index 2c09537a0..c58b70129 100644 --- a/go.sum +++ b/go.sum @@ -62,8 +62,6 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/aymanbagabas/go-udiff v0.2.0 h1:TK0fH4MteXUDspT88n8CKzvK0X9O2xu9yQjWpi6yML8= github.com/aymanbagabas/go-udiff v0.2.0/go.mod h1:RE4Ex0qsGkTAJoQdQQCA0uG+nAzJO/pI/QwceO5fgrA= -github.com/bcicen/jstream v1.0.1 h1:BXY7Cu4rdmc0rhyTVyT3UkxAiX3bnLpKLas9btbH5ck= -github.com/bcicen/jstream v1.0.1/go.mod h1:9ielPxqFry7Y4Tg3j4BfjPocfJ3TbsRtXOAYXYmRuAQ= github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho= github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= diff --git a/internal/s3select/csv/record.go b/internal/s3select/csv/record.go index 18e467834..8c38f6767 100644 --- a/internal/s3select/csv/record.go +++ b/internal/s3select/csv/record.go @@ -25,8 +25,8 @@ import ( "strconv" "strings" - "github.com/bcicen/jstream" csv "github.com/minio/csvparser" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/minio/internal/s3select/sql" ) diff --git a/internal/s3select/json/preader.go b/internal/s3select/json/preader.go index fa2cf8481..d8d016f78 100644 --- a/internal/s3select/json/preader.go +++ b/internal/s3select/json/preader.go @@ -24,7 +24,7 @@ import ( "runtime" "sync" - "github.com/bcicen/jstream" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/minio/internal/s3select/sql" ) @@ -185,7 +185,7 @@ func (r *PReader) startReaders() { dst = make([]jstream.KVS, 0, 1000) } - d := jstream.NewDecoder(bytes.NewBuffer(in.input), 0).ObjectAsKVS() + d := jstream.NewDecoder(bytes.NewBuffer(in.input), 0).ObjectAsKVS().MaxDepth(100) stream := d.Stream() all := dst[:0] for mv := range stream { diff --git a/internal/s3select/json/reader.go b/internal/s3select/json/reader.go index 52eda1404..70a758d92 100644 --- a/internal/s3select/json/reader.go +++ b/internal/s3select/json/reader.go @@ -21,9 +21,8 @@ import ( "io" "sync" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/minio/internal/s3select/sql" - - "github.com/bcicen/jstream" ) // Limit single document size to 10MiB, 10x the AWS limit: @@ -84,7 +83,7 @@ func (r *Reader) Close() error { // NewReader - creates new JSON reader using readCloser. func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader { readCloser = &syncReadCloser{rc: readCloser} - d := jstream.NewDecoder(io.LimitReader(readCloser, maxDocumentSize), 0).ObjectAsKVS() + d := jstream.NewDecoder(io.LimitReader(readCloser, maxDocumentSize), 0).ObjectAsKVS().MaxDepth(100) return &Reader{ args: args, decoder: d, diff --git a/internal/s3select/json/record.go b/internal/s3select/json/record.go index 7b6ddad76..65462e863 100644 --- a/internal/s3select/json/record.go +++ b/internal/s3select/json/record.go @@ -26,8 +26,8 @@ import ( "strconv" "strings" - "github.com/bcicen/jstream" csv "github.com/minio/csvparser" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/minio/internal/s3select/sql" ) diff --git a/internal/s3select/jstream/LICENSE b/internal/s3select/jstream/LICENSE new file mode 100644 index 000000000..1c5d82df6 --- /dev/null +++ b/internal/s3select/jstream/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2018 Bradley Cicenas + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/internal/s3select/jstream/README.md b/internal/s3select/jstream/README.md new file mode 100644 index 000000000..2797b3ba6 --- /dev/null +++ b/internal/s3select/jstream/README.md @@ -0,0 +1,116 @@ +

jstream

+ +# + +[![GoDoc](https://godoc.org/github.com/bcicen/jstream?status.svg)](https://godoc.org/github.com/bcicen/jstream) + + +`jstream` is a streaming JSON parser and value extraction library for Go. + +Unlike most JSON parsers, `jstream` is document position- and depth-aware -- this enables the extraction of values at a specified depth, eliminating the overhead of allocating encompassing arrays or objects; e.g: + +Using the below example document: +jstream + +we can choose to extract and act only the objects within the top-level array: +```go +f, _ := os.Open("input.json") +decoder := jstream.NewDecoder(f, 1) // extract JSON values at a depth level of 1 +for mv := range decoder.Stream() { + fmt.Printf("%v\n ", mv.Value) +} +``` + +output: +``` +map[desc:RGB colors:[red green blue]] +map[desc:CMYK colors:[cyan magenta yellow black]] +``` + +likewise, increasing depth level to `3` yields: +``` +red +green +blue +cyan +magenta +yellow +black +``` + +optionally, kev:value pairs can be emitted as an individual struct: +```go +decoder := jstream.NewDecoder(f, 2).EmitKV() // enable KV streaming at a depth level of 2 +``` + +``` +jstream.KV{desc RGB} +jstream.KV{colors [red green blue]} +jstream.KV{desc CMYK} +jstream.KV{colors [cyan magenta yellow black]} +``` + +## Installing + +```bash +go get github.com/bcicen/jstream +``` + +## Commandline + +`jstream` comes with a cli tool for quick viewing of parsed values from JSON input: + +```bash +jstream -d 1 < input.json +``` + +```json +{"colors":["red","green","blue"],"desc":"RGB"} +{"colors":["cyan","magenta","yellow","black"],"desc":"CMYK"} +``` + +detailed output with `-v` option: +```bash +cat input.json | jstream -v -d -1 + +depth start end type | value +2 018 023 string | "RGB" +3 041 046 string | "red" +3 048 055 string | "green" +3 057 063 string | "blue" +2 039 065 array | ["red","green","blue"] +1 004 069 object | {"colors":["red","green","blue"],"desc":"RGB"} +2 087 093 string | "CMYK" +3 111 117 string | "cyan" +3 119 128 string | "magenta" +3 130 138 string | "yellow" +3 140 147 string | "black" +2 109 149 array | ["cyan","magenta","yellow","black"] +1 073 153 object | {"colors":["cyan","magenta","yellow","black"],"desc":"CMYK"} +0 000 155 array | [{"colors":["red","green","blue"],"desc":"RGB"},{"colors":["cyan","magenta","yellow","black"],"desc":"CMYK"}] +``` + +### Options + +Opt | Description +--- | --- +-d \ | emit values at depth n. if n < 0, all values will be emitted +-kv | output inner key value pairs as newly formed objects +-v | output depth and offset details for each value +-h | display help dialog + +## Benchmarks + +Obligatory benchmarks performed on files with arrays of objects, where the decoded objects are to be extracted. + +Two file sizes are used -- regular (1.6mb, 1000 objects) and large (128mb, 100000 objects) + +input size | lib | MB/s | Allocated +--- | --- | --- | --- +regular | standard | 97 | 3.6MB +regular | jstream | 175 | 2.1MB +large | standard | 92 | 305MB +large | jstream | 404 | 69MB + +In a real world scenario, including initialization and reader overhead from varying blob sizes, performance can be expected as below: +jstream diff --git a/internal/s3select/jstream/decoder.go b/internal/s3select/jstream/decoder.go new file mode 100644 index 000000000..abd2c5d51 --- /dev/null +++ b/internal/s3select/jstream/decoder.go @@ -0,0 +1,675 @@ +package jstream + +import ( + "bytes" + "encoding/json" + "io" + "strconv" + "sync/atomic" + "unicode/utf16" +) + +// ValueType - defines the type of each JSON value +type ValueType int + +// Different types of JSON value +const ( + Unknown ValueType = iota + Null + String + Number + Boolean + Array + Object +) + +// MetaValue wraps a decoded interface value with the document +// position and depth at which the value was parsed +type MetaValue struct { + Offset int + Length int + Depth int + Value interface{} + ValueType ValueType +} + +// KV contains a key and value pair parsed from a decoded object +type KV struct { + Key string `json:"key"` + Value interface{} `json:"value"` +} + +// KVS - represents key values in an JSON object +type KVS []KV + +// MarshalJSON - implements converting a KVS datastructure into a JSON +// object with multiple keys and values. +func (kvs KVS) MarshalJSON() ([]byte, error) { + b := new(bytes.Buffer) + b.Write([]byte("{")) + for i, kv := range kvs { + b.Write([]byte("\"" + kv.Key + "\"" + ":")) + valBuf, err := json.Marshal(kv.Value) + if err != nil { + return nil, err + } + b.Write(valBuf) + if i < len(kvs)-1 { + b.Write([]byte(",")) + } + } + b.Write([]byte("}")) + return b.Bytes(), nil +} + +// Decoder wraps an io.Reader to provide incremental decoding of +// JSON values +type Decoder struct { + *scanner + emitDepth int + maxDepth int + emitKV bool + emitRecursive bool + objectAsKVS bool + + depth int + scratch *scratch + metaCh chan *MetaValue + err error + + // follow line position to add context to errors + lineNo int + lineStart int64 +} + +// NewDecoder creates new Decoder to read JSON values at the provided +// emitDepth from the provider io.Reader. +// If emitDepth is < 0, values at every depth will be emitted. +func NewDecoder(r io.Reader, emitDepth int) *Decoder { + d := &Decoder{ + scanner: newScanner(r), + emitDepth: emitDepth, + scratch: &scratch{data: make([]byte, 1024)}, + metaCh: make(chan *MetaValue, 128), + } + if emitDepth < 0 { + d.emitDepth = 0 + d.emitRecursive = true + } + return d +} + +// ObjectAsKVS - by default JSON returns map[string]interface{} this +// is usually fine in most cases, but when you need to preserve the +// input order its not a right data structure. To preserve input +// order please use this option. +func (d *Decoder) ObjectAsKVS() *Decoder { + d.objectAsKVS = true + return d +} + +// EmitKV enables emitting a jstream.KV struct when the items(s) parsed +// at configured emit depth are within a JSON object. By default, only +// the object values are emitted. +func (d *Decoder) EmitKV() *Decoder { + d.emitKV = true + return d +} + +// Recursive enables emitting all values at a depth higher than the +// configured emit depth; e.g. if an array is found at emit depth, all +// values within the array are emitted to the stream, then the array +// containing those values is emitted. +func (d *Decoder) Recursive() *Decoder { + d.emitRecursive = true + return d +} + +// Stream begins decoding from the underlying reader and returns a +// streaming MetaValue channel for JSON values at the configured emitDepth. +func (d *Decoder) Stream() chan *MetaValue { + go d.decode() + return d.metaCh +} + +// Pos returns the number of bytes consumed from the underlying reader +func (d *Decoder) Pos() int { return int(d.pos) } + +// Err returns the most recent decoder error if any, or nil +func (d *Decoder) Err() error { return d.err } + +// MaxDepth will set the maximum recursion depth. +// If the maximum depth is exceeded, ErrMaxDepth is returned. +// Less than or 0 means no limit (default). +func (d *Decoder) MaxDepth(n int) *Decoder { + d.maxDepth = n + return d +} + +// Decode parses the JSON-encoded data and returns an interface value +func (d *Decoder) decode() { + defer close(d.metaCh) + d.skipSpaces() + for d.remaining() > 0 { + _, err := d.emitAny() + if err != nil { + d.err = err + break + } + d.skipSpaces() + } +} + +func (d *Decoder) emitAny() (interface{}, error) { + if d.pos >= atomic.LoadInt64(&d.end) { + return nil, d.mkError(ErrUnexpectedEOF) + } + offset := d.pos - 1 + i, t, err := d.any() + if d.willEmit() { + d.metaCh <- &MetaValue{ + Offset: int(offset), + Length: int(d.pos - offset), + Depth: d.depth, + Value: i, + ValueType: t, + } + } + return i, err +} + +// return whether, at the current depth, the value being decoded will +// be emitted to stream +func (d *Decoder) willEmit() bool { + if d.emitRecursive { + return d.depth >= d.emitDepth + } + return d.depth == d.emitDepth +} + +// any used to decode any valid JSON value, and returns an +// interface{} that holds the actual data +func (d *Decoder) any() (interface{}, ValueType, error) { + c := d.cur() + + switch c { + case '"': + i, err := d.string() + return i, String, err + case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9': + i, err := d.number() + return i, Number, err + case '-': + if c = d.next(); c < '0' || c > '9' { + return nil, Unknown, d.mkError(ErrSyntax, "in negative numeric literal") + } + n, err := d.number() + if err != nil { + return nil, Unknown, err + } + return -n, Number, nil + case 'f': + if d.remaining() < 4 { + return nil, Unknown, d.mkError(ErrUnexpectedEOF) + } + //nolint:gocritic + if d.next() == 'a' && d.next() == 'l' && d.next() == 's' && d.next() == 'e' { + return false, Boolean, nil + } + return nil, Unknown, d.mkError(ErrSyntax, "in literal false") + case 't': + if d.remaining() < 3 { + return nil, Unknown, d.mkError(ErrUnexpectedEOF) + } + //nolint:gocritic + if d.next() == 'r' && d.next() == 'u' && d.next() == 'e' { + return true, Boolean, nil + } + return nil, Unknown, d.mkError(ErrSyntax, "in literal true") + case 'n': + if d.remaining() < 3 { + return nil, Unknown, d.mkError(ErrUnexpectedEOF) + } + //nolint:gocritic + if d.next() == 'u' && d.next() == 'l' && d.next() == 'l' { + return nil, Null, nil + } + return nil, Unknown, d.mkError(ErrSyntax, "in literal null") + case '[': + i, err := d.array() + return i, Array, err + case '{': + var i interface{} + var err error + if d.objectAsKVS { + i, err = d.objectOrdered() + } else { + i, err = d.object() + } + return i, Object, err + default: + return nil, Unknown, d.mkError(ErrSyntax, "looking for beginning of value") + } +} + +// string called by `any` or `object`(for map keys) after reading `"` +func (d *Decoder) string() (string, error) { + d.scratch.reset() + c := d.next() + +scan: + for { + switch { + case c == '"': + return string(d.scratch.bytes()), nil + case c == '\\': + c = d.next() + goto scan_esc + case c < 0x20: + return "", d.mkError(ErrSyntax, "in string literal") + // Coerce to well-formed UTF-8. + default: + d.scratch.add(c) + if d.remaining() == 0 { + return "", d.mkError(ErrSyntax, "in string literal") + } + c = d.next() + } + } + +scan_esc: + switch c { + case '"', '\\', '/', '\'': + d.scratch.add(c) + case 'u': + goto scan_u + case 'b': + d.scratch.add('\b') + case 'f': + d.scratch.add('\f') + case 'n': + d.scratch.add('\n') + case 'r': + d.scratch.add('\r') + case 't': + d.scratch.add('\t') + default: + return "", d.mkError(ErrSyntax, "in string escape code") + } + c = d.next() + goto scan + +scan_u: + r := d.u4() + if r < 0 { + return "", d.mkError(ErrSyntax, "in unicode escape sequence") + } + + // check for proceeding surrogate pair + c = d.next() + if !utf16.IsSurrogate(r) || c != '\\' { + d.scratch.addRune(r) + goto scan + } + if c = d.next(); c != 'u' { + d.scratch.addRune(r) + goto scan_esc + } + + r2 := d.u4() + if r2 < 0 { + return "", d.mkError(ErrSyntax, "in unicode escape sequence") + } + + // write surrogate pair + d.scratch.addRune(utf16.DecodeRune(r, r2)) + c = d.next() + goto scan +} + +// u4 reads four bytes following a \u escape +func (d *Decoder) u4() rune { + // logic taken from: + // github.com/buger/jsonparser/blob/master/escape.go#L20 + var h [4]int + for i := 0; i < 4; i++ { + c := d.next() + switch { + case c >= '0' && c <= '9': + h[i] = int(c - '0') + case c >= 'A' && c <= 'F': + h[i] = int(c - 'A' + 10) + case c >= 'a' && c <= 'f': + h[i] = int(c - 'a' + 10) + default: + return -1 + } + } + return rune(h[0]<<12 + h[1]<<8 + h[2]<<4 + h[3]) +} + +// number called by `any` after reading number between 0 to 9 +func (d *Decoder) number() (float64, error) { + d.scratch.reset() + + var ( + c = d.cur() + n float64 + isFloat bool + ) + + // digits first + switch { + case c == '0': + d.scratch.add(c) + c = d.next() + case '1' <= c && c <= '9': + for ; c >= '0' && c <= '9'; c = d.next() { + n = 10*n + float64(c-'0') + d.scratch.add(c) + } + } + + // . followed by 1 or more digits + if c == '.' { + isFloat = true + d.scratch.add(c) + + // first char following must be digit + if c = d.next(); c < '0' || c > '9' { + return 0, d.mkError(ErrSyntax, "after decimal point in numeric literal") + } + d.scratch.add(c) + + for { + if d.remaining() == 0 { + return 0, d.mkError(ErrUnexpectedEOF) + } + if c = d.next(); c < '0' || c > '9' { + break + } + d.scratch.add(c) + } + } + + // e or E followed by an optional - or + and + // 1 or more digits. + if c == 'e' || c == 'E' { + isFloat = true + d.scratch.add(c) + + if c = d.next(); c == '+' || c == '-' { + d.scratch.add(c) + if c = d.next(); c < '0' || c > '9' { + return 0, d.mkError(ErrSyntax, "in exponent of numeric literal") + } + d.scratch.add(c) + } + for ; c >= '0' && c <= '9'; c = d.next() { + d.scratch.add(c) + } + } + + if isFloat { + var ( + err error + sn string + ) + sn = string(d.scratch.bytes()) + if n, err = strconv.ParseFloat(sn, 64); err != nil { + return 0, err + } + } + + d.back() + return n, nil +} + +// array accept valid JSON array value +func (d *Decoder) array() ([]interface{}, error) { + d.depth++ + if d.maxDepth > 0 && d.depth > d.maxDepth { + return nil, ErrMaxDepth + } + + var ( + c byte + v interface{} + err error + array = make([]interface{}, 0) + ) + + // look ahead for ] - if the array is empty. + if c = d.skipSpaces(); c == ']' { + goto out + } + +scan: + if v, err = d.emitAny(); err != nil { + goto out + } + + if d.depth > d.emitDepth { // skip alloc for array if it won't be emitted + array = append(array, v) + } + + // next token must be ',' or ']' + switch c = d.skipSpaces(); c { + case ',': + d.skipSpaces() + goto scan + case ']': + goto out + default: + err = d.mkError(ErrSyntax, "after array element") + } + +out: + d.depth-- + return array, err +} + +// object accept valid JSON array value +func (d *Decoder) object() (map[string]interface{}, error) { + d.depth++ + if d.maxDepth > 0 && d.depth > d.maxDepth { + return nil, ErrMaxDepth + } + + var ( + c byte + k string + v interface{} + t ValueType + err error + obj map[string]interface{} + ) + + // skip allocating map if it will not be emitted + if d.depth > d.emitDepth { + obj = make(map[string]interface{}) + } + + // if the object has no keys + if c = d.skipSpaces(); c == '}' { + goto out + } + +scan: + for { + offset := d.pos - 1 + + // read string key + if c != '"' { + err = d.mkError(ErrSyntax, "looking for beginning of object key string") + break + } + if k, err = d.string(); err != nil { + break + } + + // read colon before value + if c = d.skipSpaces(); c != ':' { + err = d.mkError(ErrSyntax, "after object key") + break + } + + // read value + d.skipSpaces() + if d.emitKV { + if v, t, err = d.any(); err != nil { + break + } + if d.willEmit() { + d.metaCh <- &MetaValue{ + Offset: int(offset), + Length: int(d.pos - offset), + Depth: d.depth, + Value: KV{k, v}, + ValueType: t, + } + } + } else { + if v, err = d.emitAny(); err != nil { + break + } + } + + if obj != nil { + obj[k] = v + } + + // next token must be ',' or '}' + switch c = d.skipSpaces(); c { + case '}': + goto out + case ',': + c = d.skipSpaces() + goto scan + default: + err = d.mkError(ErrSyntax, "after object key:value pair") + goto out + } + } + +out: + d.depth-- + return obj, err +} + +// object (ordered) accept valid JSON array value +func (d *Decoder) objectOrdered() (KVS, error) { + d.depth++ + if d.maxDepth > 0 && d.depth > d.maxDepth { + return nil, ErrMaxDepth + } + + var ( + c byte + k string + v interface{} + t ValueType + err error + obj KVS + ) + + // skip allocating map if it will not be emitted + if d.depth > d.emitDepth { + obj = make(KVS, 0) + } + + // if the object has no keys + if c = d.skipSpaces(); c == '}' { + goto out + } + +scan: + for { + offset := d.pos - 1 + + // read string key + if c != '"' { + err = d.mkError(ErrSyntax, "looking for beginning of object key string") + break + } + if k, err = d.string(); err != nil { + break + } + + // read colon before value + if c = d.skipSpaces(); c != ':' { + err = d.mkError(ErrSyntax, "after object key") + break + } + + // read value + d.skipSpaces() + if d.emitKV { + if v, t, err = d.any(); err != nil { + break + } + if d.willEmit() { + d.metaCh <- &MetaValue{ + Offset: int(offset), + Length: int(d.pos - offset), + Depth: d.depth, + Value: KV{k, v}, + ValueType: t, + } + } + } else { + if v, err = d.emitAny(); err != nil { + break + } + } + + if obj != nil { + obj = append(obj, KV{k, v}) + } + + // next token must be ',' or '}' + switch c = d.skipSpaces(); c { + case '}': + goto out + case ',': + c = d.skipSpaces() + goto scan + default: + err = d.mkError(ErrSyntax, "after object key:value pair") + goto out + } + } + +out: + d.depth-- + return obj, err +} + +// returns the next char after white spaces +func (d *Decoder) skipSpaces() byte { + for d.pos < atomic.LoadInt64(&d.end) { + switch c := d.next(); c { + case '\n': + d.lineStart = d.pos + d.lineNo++ + continue + case ' ', '\t', '\r': + continue + default: + return c + } + } + return 0 +} + +// create syntax errors at current position, with optional context +func (d *Decoder) mkError(err DecoderError, context ...string) error { + if len(context) > 0 { + err.context = context[0] + } + err.atChar = d.cur() + err.pos[0] = d.lineNo + 1 + err.pos[1] = int(d.pos - d.lineStart) + err.readerErr = d.readerErr + return err +} diff --git a/internal/s3select/jstream/decoder_test.go b/internal/s3select/jstream/decoder_test.go new file mode 100644 index 000000000..8c876fc9d --- /dev/null +++ b/internal/s3select/jstream/decoder_test.go @@ -0,0 +1,276 @@ +package jstream + +import ( + "bytes" + "testing" +) + +func mkReader(s string) *bytes.Reader { return bytes.NewReader([]byte(s)) } + +func TestDecoderSimple(t *testing.T) { + var ( + counter int + mv *MetaValue + body = `[{"bio":"bada bing bada boom","id":1,"name":"Charles","falseVal":false}]` + ) + + decoder := NewDecoder(mkReader(body), 1) + + for mv = range decoder.Stream() { + counter++ + t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value) + } + + if err := decoder.Err(); err != nil { + t.Fatalf("decoder error: %s", err) + } +} + +func TestDecoderNested(t *testing.T) { + var ( + counter int + mv *MetaValue + body = `{ + "1": { + "bio": "bada bing bada boom", + "id": 0, + "name": "Roberto", + "nested1": { + "bio": "utf16 surrogate (\ud834\udcb2)\n\u201cutf 8\u201d", + "id": 1.5, + "name": "Roberto*Maestro", + "nested2": { "nested2arr": [0,1,2], "nested3": { + "nested4": { "depth": "recursion" }} + } + } + }, + "2": { + "nullfield": null, + "id": -2 + } +}` + ) + + decoder := NewDecoder(mkReader(body), 2) + + for mv = range decoder.Stream() { + counter++ + t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value) + } + + if err := decoder.Err(); err != nil { + t.Fatalf("decoder error: %s", err) + } +} + +func TestDecoderFlat(t *testing.T) { + var ( + counter int + mv *MetaValue + body = `[ + "1st test string", + "Roberto*Maestro", "Charles", + 0, null, false, + 1, 2.5 +]` + expected = []struct { + Value interface{} + ValueType ValueType + }{ + { + "1st test string", + String, + }, + { + "Roberto*Maestro", + String, + }, + { + "Charles", + String, + }, + { + 0.0, + Number, + }, + { + nil, + Null, + }, + { + false, + Boolean, + }, + { + 1.0, + Number, + }, + { + 2.5, + Number, + }, + } + ) + + decoder := NewDecoder(mkReader(body), 1) + + for mv = range decoder.Stream() { + if mv.Value != expected[counter].Value { + t.Fatalf("got %v, expected: %v", mv.Value, expected[counter]) + } + if mv.ValueType != expected[counter].ValueType { + t.Fatalf("got %v value type, expected: %v value type", mv.ValueType, expected[counter].ValueType) + } + counter++ + t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value) + } + + if err := decoder.Err(); err != nil { + t.Fatalf("decoder error: %s", err) + } +} + +func TestDecoderMultiDoc(t *testing.T) { + var ( + counter int + mv *MetaValue + body = `{ "bio": "bada bing bada boom", "id": 1, "name": "Charles" } +{ "bio": "bada bing bada boom", "id": 2, "name": "Charles" } +{ "bio": "bada bing bada boom", "id": 3, "name": "Charles" } +{ "bio": "bada bing bada boom", "id": 4, "name": "Charles" } +{ "bio": "bada bing bada boom", "id": 5, "name": "Charles" } +` + ) + + decoder := NewDecoder(mkReader(body), 0) + + for mv = range decoder.Stream() { + if mv.ValueType != Object { + t.Fatalf("got %v value type, expected: Object value type", mv.ValueType) + } + counter++ + t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value) + } + if err := decoder.Err(); err != nil { + t.Fatalf("decoder error: %s", err) + } + if counter != 5 { + t.Fatalf("expected 5 items, got %d", counter) + } + + // test at depth level 1 + counter = 0 + kvcounter := 0 + decoder = NewDecoder(mkReader(body), 1) + + for mv = range decoder.Stream() { + switch mv.Value.(type) { + case KV: + kvcounter++ + default: + counter++ + } + t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value) + } + if err := decoder.Err(); err != nil { + t.Fatalf("decoder error: %s", err) + } + if kvcounter != 0 { + t.Fatalf("expected 0 keyvalue items, got %d", kvcounter) + } + if counter != 15 { + t.Fatalf("expected 15 items, got %d", counter) + } + + // test at depth level 1 w/ emitKV + counter = 0 + kvcounter = 0 + decoder = NewDecoder(mkReader(body), 1).EmitKV() + + for mv = range decoder.Stream() { + switch mv.Value.(type) { + case KV: + kvcounter++ + default: + counter++ + } + t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value) + } + if err := decoder.Err(); err != nil { + t.Fatalf("decoder error: %s", err) + } + if kvcounter != 15 { + t.Fatalf("expected 15 keyvalue items, got %d", kvcounter) + } + if counter != 0 { + t.Fatalf("expected 0 items, got %d", counter) + } +} + +func TestDecoderReaderFailure(t *testing.T) { + var ( + failAfter = 900 + mockData = byte('[') + ) + + r := newMockReader(failAfter, mockData) + decoder := NewDecoder(r, -1) + + for mv := range decoder.Stream() { + t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value) + } + + err := decoder.Err() + t.Logf("got error: %s", err) + if err == nil { + t.Fatalf("missing expected decoder error") + } + + derr, ok := err.(DecoderError) + if !ok { + t.Fatalf("expected error of type DecoderError, got %T", err) + } + + if derr.ReaderErr() == nil { + t.Fatalf("missing expected underlying reader error") + } +} + +func TestDecoderMaxDepth(t *testing.T) { + tests := []struct { + input string + maxDepth int + mustFail bool + }{ + // No limit + {input: `[{"bio":"bada bing bada boom","id":1,"name":"Charles","falseVal":false}]`, maxDepth: 0, mustFail: false}, + // Array + object = depth 2 = false + {input: `[{"bio":"bada bing bada boom","id":1,"name":"Charles","falseVal":false}]`, maxDepth: 1, mustFail: true}, + // Depth 2 = ok + {input: `[{"bio":"bada bing bada boom","id":1,"name":"Charles","falseVal":false}]`, maxDepth: 2, mustFail: false}, + // Arrays: + {input: `[[[[[[[[[[[[[[[[[[[[[["ok"]]]]]]]]]]]]]]]]]]]]]]`, maxDepth: 2, mustFail: true}, + {input: `[[[[[[[[[[[[[[[[[[[[[["ok"]]]]]]]]]]]]]]]]]]]]]]`, maxDepth: 10, mustFail: true}, + {input: `[[[[[[[[[[[[[[[[[[[[[["ok"]]]]]]]]]]]]]]]]]]]]]]`, maxDepth: 100, mustFail: false}, + // Objects: + {input: `{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"ok":false}}}}}}}}}}}}}}}}}}}}}}`, maxDepth: 2, mustFail: true}, + {input: `{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"ok":false}}}}}}}}}}}}}}}}}}}}}}`, maxDepth: 10, mustFail: true}, + {input: `{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"ok":false}}}}}}}}}}}}}}}}}}}}}}`, maxDepth: 100, mustFail: false}, + } + + for _, test := range tests { + decoder := NewDecoder(mkReader(test.input), 0).MaxDepth(test.maxDepth) + var mv *MetaValue + for mv = range decoder.Stream() { + t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value) + } + + err := decoder.Err() + if test.mustFail && err != ErrMaxDepth { + t.Fatalf("missing expected decoder error, got %q", err) + } + if !test.mustFail && err != nil { + t.Fatalf("unexpected error: %q", err) + } + } +} diff --git a/internal/s3select/jstream/errors.go b/internal/s3select/jstream/errors.go new file mode 100644 index 000000000..52a0e5f62 --- /dev/null +++ b/internal/s3select/jstream/errors.go @@ -0,0 +1,52 @@ +package jstream + +import ( + "fmt" + "strconv" +) + +// Predefined errors +var ( + ErrSyntax = DecoderError{msg: "invalid character"} + ErrUnexpectedEOF = DecoderError{msg: "unexpected end of JSON input"} + ErrMaxDepth = DecoderError{msg: "maximum recursion depth exceeded"} +) + +type errPos [2]int // line number, byte offset where error occurred + +// DecoderError contains a detailed decoding error. +type DecoderError struct { + msg string // description of error + context string // additional error context + pos errPos + atChar byte + readerErr error // underlying reader error, if any +} + +// ReaderErr returns the underlying error. +func (e DecoderError) ReaderErr() error { return e.readerErr } + +// Error returns a string representation of the error. +func (e DecoderError) Error() string { + loc := fmt.Sprintf("%s [%d,%d]", quoteChar(e.atChar), e.pos[0], e.pos[1]) + s := fmt.Sprintf("%s %s: %s", e.msg, e.context, loc) + if e.readerErr != nil { + s += "\nreader error: " + e.readerErr.Error() + } + return s +} + +// quoteChar formats c as a quoted character literal +func quoteChar(c byte) string { + // special cases - different from quoted strings + if c == '\'' { + return `'\''` + } + if c == '"' { + return `'"'` + } + + // use quoted string with different quotation marks + s := strconv.Quote(string(c)) + return "'" + s[1:len(s)-1] + "'" +} diff --git a/internal/s3select/jstream/scanner.go b/internal/s3select/jstream/scanner.go new file mode 100644 index 000000000..a8e5be7db --- /dev/null +++ b/internal/s3select/jstream/scanner.go @@ -0,0 +1,114 @@ +package jstream + +import ( + "io" + "sync/atomic" +) + +const ( + chunk = 4095 // ~4k + maxUint = ^uint(0) + maxInt = int64(maxUint >> 1) + nullByte = byte(0) +) + +type scanner struct { + pos int64 // position in reader + ipos int64 // internal buffer position + ifill int64 // internal buffer fill + end int64 + buf [chunk + 1]byte // internal buffer (with a lookback size of 1) + nbuf [chunk]byte // next internal buffer + fillReq chan struct{} + fillReady chan int64 + readerErr error // underlying reader error, if any +} + +func newScanner(r io.Reader) *scanner { + sr := &scanner{ + end: maxInt, + fillReq: make(chan struct{}), + fillReady: make(chan int64), + } + + go func() { + var rpos int64 // total bytes read into buffer + + defer func() { + atomic.StoreInt64(&sr.end, rpos) + close(sr.fillReady) + }() + + for range sr.fillReq { + scan: + n, err := r.Read(sr.nbuf[:]) + + if n == 0 { + switch err { + case io.EOF: // reader is exhausted + return + case nil: // no data and no error, retry fill + goto scan + default: // unexpected reader error + sr.readerErr = err + return + } + } + + rpos += int64(n) + sr.fillReady <- int64(n) + } + }() + + sr.fillReq <- struct{}{} // initial fill + + return sr +} + +// remaining returns the number of unread bytes +// if EOF for the underlying reader has not yet been found, +// maximum possible integer value will be returned +func (s *scanner) remaining() int64 { + if atomic.LoadInt64(&s.end) == maxInt { + return maxInt + } + return atomic.LoadInt64(&s.end) - s.pos +} + +// read byte at current position (without advancing) +func (s *scanner) cur() byte { return s.buf[s.ipos] } + +// read next byte +func (s *scanner) next() byte { + if s.pos >= atomic.LoadInt64(&s.end) { + return nullByte + } + s.ipos++ + + if s.ipos > s.ifill { // internal buffer is exhausted + s.ifill = <-s.fillReady + + s.buf[0] = s.buf[len(s.buf)-1] // copy current last item to guarantee lookback + copy(s.buf[1:], s.nbuf[:]) // copy contents of pre-filled next buffer + s.ipos = 1 // move to beginning of internal buffer + + // request next fill to be prepared + if s.end == maxInt { + s.fillReq <- struct{}{} + } + } + + s.pos++ + return s.buf[s.ipos] +} + +// back undoes a previous call to next(), moving backward one byte in the internal buffer. +// as we only guarantee a lookback buffer size of one, any subsequent calls to back() +// before calling next() may panic +func (s *scanner) back() { + if s.ipos <= 0 { + panic("back buffer exhausted") + } + s.ipos-- + s.pos-- +} diff --git a/internal/s3select/jstream/scanner_test.go b/internal/s3select/jstream/scanner_test.go new file mode 100644 index 000000000..a3df2d8d6 --- /dev/null +++ b/internal/s3select/jstream/scanner_test.go @@ -0,0 +1,170 @@ +package jstream + +import ( + "bufio" + "bytes" + "fmt" + "io" + "sync/atomic" + "testing" +) + +var ( + smallInput = make([]byte, 1024*12) // 12K + mediumInput = make([]byte, 1024*1024*12) // 12MB + largeInput = make([]byte, 1024*1024*128) // 128MB +) + +func TestScanner(t *testing.T) { + data := []byte("abcdefghijklmnopqrstuvwxyz0123456789") + + var i int + r := bytes.NewReader(data) + scanner := newScanner(r) + for scanner.pos < atomic.LoadInt64(&scanner.end) { + c := scanner.next() + if scanner.readerErr != nil { + t.Fatal(scanner.readerErr) + } + if c != data[i] { + t.Fatalf("expected %s, got %s", string(data[i]), string(c)) + } + t.Logf("pos=%d remaining=%d (%s)", i, r.Len(), string(c)) + i++ + } +} + +type mockReader struct { + pos int + mockData byte + failAfter int +} + +func newMockReader(failAfter int, data byte) *mockReader { + return &mockReader{0, data, failAfter} +} + +func (r *mockReader) Read(p []byte) (n int, err error) { + if r.pos >= r.failAfter { + return 0, fmt.Errorf("intentionally unexpected reader error") + } + r.pos++ + p[0] = r.mockData + return 1, nil +} + +func TestScannerFailure(t *testing.T) { + var ( + i int + failAfter = 900 + mockData = byte(32) + ) + + r := newMockReader(failAfter, mockData) + scanner := newScanner(r) + + for i < 1000 { + c := scanner.next() + if c == byte(0) { + break + } + if c != mockData { + t.Fatalf("expected \"%s\", got \"%s\"", string(mockData), string(c)) + } + i++ + } + c := scanner.next() + if scanner.readerErr == nil { + t.Fatalf("failed to receive expected error after %d bytes", failAfter) + } + if c != byte(0) { + t.Fatalf("expected null byte, got %v", c) + } +} + +func BenchmarkBufioScanner(b *testing.B) { + b.Run("small", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkBufioScanner(smallInput) + } + }) + b.Run("medium", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkBufioScanner(mediumInput) + } + }) + b.Run("large", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkBufioScanner(largeInput) + } + }) +} + +func benchmarkBufioScanner(b []byte) { + s := bufio.NewScanner(bytes.NewReader(b)) + s.Split(bufio.ScanBytes) + for s.Scan() { + s.Bytes() + } +} + +func BenchmarkBufioReader(b *testing.B) { + b.Run("small", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkBufioReader(smallInput) + } + }) + b.Run("medium", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkBufioReader(mediumInput) + } + }) + b.Run("large", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkBufioReader(largeInput) + } + }) +} + +func benchmarkBufioReader(b []byte) { + br := bufio.NewReader(bytes.NewReader(b)) +loop: + for { + _, err := br.ReadByte() + switch err { + case nil: + continue loop + case io.EOF: + break loop + default: + panic(err) + } + } +} + +func BenchmarkScanner(b *testing.B) { + b.Run("small", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkScanner(smallInput) + } + }) + b.Run("medium", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkScanner(mediumInput) + } + }) + b.Run("large", func(b *testing.B) { + for i := 0; i < b.N; i++ { + benchmarkScanner(largeInput) + } + }) +} + +func benchmarkScanner(b []byte) { + r := bytes.NewReader(b) + + scanner := newScanner(r) + for scanner.remaining() > 0 { + scanner.next() + } +} diff --git a/internal/s3select/jstream/scratch.go b/internal/s3select/jstream/scratch.go new file mode 100644 index 000000000..75bc6c435 --- /dev/null +++ b/internal/s3select/jstream/scratch.go @@ -0,0 +1,44 @@ +package jstream + +import ( + "unicode/utf8" +) + +type scratch struct { + data []byte + fill int +} + +// reset scratch buffer +func (s *scratch) reset() { s.fill = 0 } + +// bytes returns the written contents of scratch buffer +func (s *scratch) bytes() []byte { return s.data[0:s.fill] } + +// grow scratch buffer +func (s *scratch) grow() { + ndata := make([]byte, cap(s.data)*2) + copy(ndata, s.data) + s.data = ndata +} + +// append single byte to scratch buffer +func (s *scratch) add(c byte) { + if s.fill+1 >= cap(s.data) { + s.grow() + } + + s.data[s.fill] = c + s.fill++ +} + +// append encoded rune to scratch buffer +func (s *scratch) addRune(r rune) int { + if s.fill+utf8.UTFMax >= cap(s.data) { + s.grow() + } + + n := utf8.EncodeRune(s.data[s.fill:], r) + s.fill += n + return n +} diff --git a/internal/s3select/parquet/reader.go b/internal/s3select/parquet/reader.go index 7d27c3a35..f8dd311ee 100644 --- a/internal/s3select/parquet/reader.go +++ b/internal/s3select/parquet/reader.go @@ -22,10 +22,10 @@ import ( "io" "time" - "github.com/bcicen/jstream" parquetgo "github.com/fraugster/parquet-go" parquettypes "github.com/fraugster/parquet-go/parquet" jsonfmt "github.com/minio/minio/internal/s3select/json" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/minio/internal/s3select/sql" ) diff --git a/internal/s3select/simdj/record.go b/internal/s3select/simdj/record.go index 3cf91de6f..9f66069d6 100644 --- a/internal/s3select/simdj/record.go +++ b/internal/s3select/simdj/record.go @@ -21,9 +21,9 @@ import ( "fmt" "io" - "github.com/bcicen/jstream" csv "github.com/minio/csvparser" "github.com/minio/minio/internal/s3select/json" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/minio/internal/s3select/sql" "github.com/minio/simdjson-go" ) diff --git a/internal/s3select/sql/evaluate.go b/internal/s3select/sql/evaluate.go index b09be3b56..95dd716da 100644 --- a/internal/s3select/sql/evaluate.go +++ b/internal/s3select/sql/evaluate.go @@ -24,7 +24,7 @@ import ( "math" "strings" - "github.com/bcicen/jstream" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/simdjson-go" ) diff --git a/internal/s3select/sql/jsonpath.go b/internal/s3select/sql/jsonpath.go index 7e20c4584..9ac995e96 100644 --- a/internal/s3select/sql/jsonpath.go +++ b/internal/s3select/sql/jsonpath.go @@ -20,7 +20,7 @@ package sql import ( "errors" - "github.com/bcicen/jstream" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/simdjson-go" ) diff --git a/internal/s3select/sql/jsonpath_test.go b/internal/s3select/sql/jsonpath_test.go index b04361345..2825e9a9e 100644 --- a/internal/s3select/sql/jsonpath_test.go +++ b/internal/s3select/sql/jsonpath_test.go @@ -27,11 +27,11 @@ import ( "testing" "github.com/alecthomas/participle" - "github.com/bcicen/jstream" + "github.com/minio/minio/internal/s3select/jstream" ) func getJSONStructs(b []byte) ([]interface{}, error) { - dec := jstream.NewDecoder(bytes.NewBuffer(b), 0).ObjectAsKVS() + dec := jstream.NewDecoder(bytes.NewBuffer(b), 0).ObjectAsKVS().MaxDepth(100) var result []interface{} for parsedVal := range dec.Stream() { result = append(result, parsedVal.Value) diff --git a/internal/s3select/sql/statement.go b/internal/s3select/sql/statement.go index ce8bfd8b8..14068b6d7 100644 --- a/internal/s3select/sql/statement.go +++ b/internal/s3select/sql/statement.go @@ -22,7 +22,7 @@ import ( "fmt" "strings" - "github.com/bcicen/jstream" + "github.com/minio/minio/internal/s3select/jstream" "github.com/minio/simdjson-go" )