mirror of
https://github.com/minio/minio.git
synced 2025-04-12 07:22:18 -04:00
S3 Select: Convert CSV data to JSON (#8464)
This commit is contained in:
parent
26863009c0
commit
1c90a6bd49
@ -87,7 +87,13 @@ func (r *Record) Set(name string, value *sql.Value) error {
|
|||||||
} else if value.IsNull() {
|
} else if value.IsNull() {
|
||||||
v = nil
|
v = nil
|
||||||
} else if b, ok := value.ToBytes(); ok {
|
} else if b, ok := value.ToBytes(); ok {
|
||||||
v = RawJSON(b)
|
// This can either be raw json or a CSV value.
|
||||||
|
// Only treat objects and arrays as JSON.
|
||||||
|
if len(b) > 0 && (b[0] == '{' || b[0] == '[') {
|
||||||
|
v = RawJSON(b)
|
||||||
|
} else {
|
||||||
|
v = string(b)
|
||||||
|
}
|
||||||
} else if arr, ok := value.ToArray(); ok {
|
} else if arr, ok := value.ToArray(); ok {
|
||||||
v = arr
|
v = arr
|
||||||
} else {
|
} else {
|
||||||
|
@ -59,7 +59,7 @@ func TestJSONQueries(t *testing.T) {
|
|||||||
var testTable = []struct {
|
var testTable = []struct {
|
||||||
name string
|
name string
|
||||||
query string
|
query string
|
||||||
requestXML []byte
|
requestXML []byte // override request XML
|
||||||
wantResult string
|
wantResult string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -294,6 +294,121 @@ func TestJSONQueries(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCSVQueries(t *testing.T) {
|
||||||
|
input := `id,time,num,num2,text
|
||||||
|
1,2010-01-01T,7867786,4565.908123,"a text, with comma"
|
||||||
|
2,2017-01-02T03:04Z,-5, 0.765111,
|
||||||
|
`
|
||||||
|
var testTable = []struct {
|
||||||
|
name string
|
||||||
|
query string
|
||||||
|
requestXML []byte // override request XML
|
||||||
|
wantResult string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "select-all",
|
||||||
|
query: `SELECT * from s3object AS s WHERE id = '1'`,
|
||||||
|
wantResult: `{"id":"1","time":"2010-01-01T","num":"7867786","num2":"4565.908123","text":"a text, with comma"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "select-all-2",
|
||||||
|
query: `SELECT * from s3object s WHERE id = 2`,
|
||||||
|
wantResult: `{"id":"2","time":"2017-01-02T03:04Z","num":"-5","num2":" 0.765111","text":""}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "select-text-convert",
|
||||||
|
query: `SELECT CAST(text AS STRING) AS text from s3object s WHERE id = 1`,
|
||||||
|
wantResult: `{"text":"a text, with comma"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "select-text-direct",
|
||||||
|
query: `SELECT text from s3object s WHERE id = 1`,
|
||||||
|
wantResult: `{"text":"a text, with comma"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "select-time-direct",
|
||||||
|
query: `SELECT time from s3object s WHERE id = 2`,
|
||||||
|
wantResult: `{"time":"2017-01-02T03:04Z"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "select-int-direct",
|
||||||
|
query: `SELECT num from s3object s WHERE id = 2`,
|
||||||
|
wantResult: `{"num":"-5"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "select-float-direct",
|
||||||
|
query: `SELECT num2 from s3object s WHERE id = 2`,
|
||||||
|
wantResult: `{"num2":" 0.765111"}`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "select-float-by-val",
|
||||||
|
query: `SELECT num2 from s3object s WHERE num2 = 0.765111`,
|
||||||
|
wantResult: `{"num2":" 0.765111"}`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
defRequest := `<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<SelectObjectContentRequest>
|
||||||
|
<Expression>%s</Expression>
|
||||||
|
<ExpressionType>SQL</ExpressionType>
|
||||||
|
<InputSerialization>
|
||||||
|
<CompressionType>NONE</CompressionType>
|
||||||
|
<CSV>
|
||||||
|
<FileHeaderInfo>USE</FileHeaderInfo>
|
||||||
|
</CSV>
|
||||||
|
</InputSerialization>
|
||||||
|
<OutputSerialization>
|
||||||
|
<JSON>
|
||||||
|
</JSON>
|
||||||
|
</OutputSerialization>
|
||||||
|
<RequestProgress>
|
||||||
|
<Enabled>FALSE</Enabled>
|
||||||
|
</RequestProgress>
|
||||||
|
</SelectObjectContentRequest>`
|
||||||
|
|
||||||
|
for _, testCase := range testTable {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
testReq := testCase.requestXML
|
||||||
|
if len(testReq) == 0 {
|
||||||
|
testReq = []byte(fmt.Sprintf(defRequest, testCase.query))
|
||||||
|
}
|
||||||
|
s3Select, err := NewS3Select(bytes.NewReader(testReq))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) {
|
||||||
|
return ioutil.NopCloser(bytes.NewBufferString(input)), nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w := &testResponseWriter{}
|
||||||
|
s3Select.Evaluate(w)
|
||||||
|
s3Select.Close()
|
||||||
|
resp := http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: ioutil.NopCloser(bytes.NewReader(w.response)),
|
||||||
|
ContentLength: int64(len(w.response)),
|
||||||
|
}
|
||||||
|
res, err := minio.NewSelectResults(&resp, "testbucket")
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
got, err := ioutil.ReadAll(res)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
gotS := strings.TrimSpace(string(got))
|
||||||
|
if !reflect.DeepEqual(gotS, testCase.wantResult) {
|
||||||
|
t.Errorf("received response does not match with expected reply. Query: %s\ngot: %s\nwant:%s", testCase.query, gotS, testCase.wantResult)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCSVInput(t *testing.T) {
|
func TestCSVInput(t *testing.T) {
|
||||||
var testTable = []struct {
|
var testTable = []struct {
|
||||||
requestXML []byte
|
requestXML []byte
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
"unicode/utf8"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -158,13 +159,13 @@ func (v Value) ToFloat() (val float64, ok bool) {
|
|||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToInt converts value to int.
|
// ToInt returns the value if int.
|
||||||
func (v Value) ToInt() (val int64, ok bool) {
|
func (v Value) ToInt() (val int64, ok bool) {
|
||||||
val, ok = v.value.(int64)
|
val, ok = v.value.(int64)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToString converts value to string.
|
// ToString returns the value if string.
|
||||||
func (v Value) ToString() (val string, ok bool) {
|
func (v Value) ToString() (val string, ok bool) {
|
||||||
val, ok = v.value.(string)
|
val, ok = v.value.(string)
|
||||||
return
|
return
|
||||||
@ -215,7 +216,7 @@ func (v Value) ToTimestamp() (t time.Time, ok bool) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ToBytes converts Value to byte-slice.
|
// ToBytes returns the value if byte-slice.
|
||||||
func (v Value) ToBytes() (val []byte, ok bool) {
|
func (v Value) ToBytes() (val []byte, ok bool) {
|
||||||
val, ok = v.value.([]byte)
|
val, ok = v.value.([]byte)
|
||||||
return
|
return
|
||||||
@ -339,6 +340,48 @@ const (
|
|||||||
opIneq = "!="
|
opIneq = "!="
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// InferBytesType will attempt to infer the data type of bytes.
|
||||||
|
// Will fail if value type is not bytes or it would result in invalid utf8.
|
||||||
|
// ORDER: int, float, bool, JSON (object or array), timestamp, string
|
||||||
|
// If the content is valid JSON, the type will still be bytes.
|
||||||
|
func (v *Value) InferBytesType() (err error) {
|
||||||
|
b, ok := v.ToBytes()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("InferByteType: Input is not bytes, but %v", v.GetTypeString())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for numeric inference
|
||||||
|
if x, ok := v.bytesToInt(); ok {
|
||||||
|
v.setInt(x)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if x, ok := v.bytesToFloat(); ok {
|
||||||
|
v.setFloat(x)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if x, ok := v.bytesToBool(); ok {
|
||||||
|
v.setBool(x)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
asString := strings.TrimSpace(v.bytesToString())
|
||||||
|
if len(b) > 0 &&
|
||||||
|
(strings.HasPrefix(asString, "{") || strings.HasPrefix(asString, "[")) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if t, err := parseSQLTimestamp(asString); err == nil {
|
||||||
|
v.setTimestamp(t)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !utf8.Valid(b) {
|
||||||
|
return errors.New("value is not valid utf-8")
|
||||||
|
}
|
||||||
|
// Fallback to string
|
||||||
|
v.setString(asString)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// When numeric types are compared, type promotions could happen. If
|
// When numeric types are compared, type promotions could happen. If
|
||||||
// values do not have types (e.g. when reading from CSV), for
|
// values do not have types (e.g. when reading from CSV), for
|
||||||
// comparison operations, automatic type conversion happens by trying
|
// comparison operations, automatic type conversion happens by trying
|
||||||
|
Loading…
x
Reference in New Issue
Block a user