select: Support Parquet dates (#11928)

Pass schema to parser to support dates.

Fixes #11926
This commit is contained in:
Klaus Post 2021-04-03 17:25:19 +02:00 committed by GitHub
parent bf106453b8
commit dca7cf7200
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 280 additions and 13 deletions

View File

@ -67,10 +67,18 @@ func getColumns(
if nameColumnMap == nil {
nameColumnMap = make(map[string]*column)
}
var se *parquet.SchemaElement
for _, schema := range schemaElements {
if schema != nil && schema.Name == columnName {
se = schema
break
}
}
nameColumnMap[columnName] = &column{
name: columnName,
metadata: meta,
schema: se,
schemaElements: schemaElements,
rc: rc,
thriftReader: thriftReader,
@ -95,6 +103,7 @@ type column struct {
valueIndex int
valueType parquet.Type
metadata *parquet.ColumnMetaData
schema *parquet.SchemaElement
schemaElements []*parquet.SchemaElement
nameIndexMap map[string]int
dictPage *page
@ -140,14 +149,14 @@ func (column *column) readPage() {
column.dataTable.Merge(page.DataTable)
}
func (column *column) read() (value interface{}, valueType parquet.Type) {
func (column *column) read() (value interface{}, valueType parquet.Type, cnv *parquet.SchemaElement) {
if column.dataTable == nil {
column.readPage()
column.valueIndex = 0
}
if column.endOfValues {
return nil, column.metadata.GetType()
return nil, column.metadata.GetType(), column.schema
}
value = column.dataTable.Values[column.valueIndex]
@ -156,5 +165,5 @@ func (column *column) read() (value interface{}, valueType parquet.Type) {
column.dataTable = nil
}
return value, column.metadata.GetType()
return value, column.metadata.GetType(), column.schema
}

View File

@ -72,8 +72,9 @@ func fileMetadata(getReaderFunc GetReaderFunc) (*parquet.FileMetaData, error) {
// Value - denotes column value
type Value struct {
Value interface{}
Type parquet.Type
Value interface{}
Type parquet.Type
Schema *parquet.SchemaElement
}
// MarshalJSON - encodes to JSON data
@ -144,8 +145,9 @@ func (reader *Reader) Read() (record *Record, err error) {
record = newRecord(reader.nameList)
for name := range reader.columns {
value, valueType := reader.columns[name].read()
record.set(name, Value{value, valueType})
col := reader.columns[name]
value, valueType, schema := col.read()
record.set(name, Value{Value: value, Type: valueType, Schema: schema})
}
reader.rowIndex++

View File

@ -59,9 +59,9 @@ func TestReader(t *testing.T) {
}
expectedRecords := []string{
`map[one:{-1 DOUBLE} three:{true BOOLEAN} two:{[102 111 111] BYTE_ARRAY}]`,
`map[one:{<nil> DOUBLE} three:{false BOOLEAN} two:{[98 97 114] BYTE_ARRAY}]`,
`map[one:{2.5 DOUBLE} three:{true BOOLEAN} two:{[98 97 122] BYTE_ARRAY}]`,
`map[one:{-1 DOUBLE SchemaElement({Type:DOUBLE TypeLength:<nil> RepetitionType:OPTIONAL Name:one NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})} three:{true BOOLEAN SchemaElement({Type:BOOLEAN TypeLength:<nil> RepetitionType:OPTIONAL Name:three NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})} two:{[102 111 111] BYTE_ARRAY SchemaElement({Type:BYTE_ARRAY TypeLength:<nil> RepetitionType:OPTIONAL Name:two NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})}]`,
`map[one:{<nil> DOUBLE SchemaElement({Type:DOUBLE TypeLength:<nil> RepetitionType:OPTIONAL Name:one NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})} three:{false BOOLEAN SchemaElement({Type:BOOLEAN TypeLength:<nil> RepetitionType:OPTIONAL Name:three NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})} two:{[98 97 114] BYTE_ARRAY SchemaElement({Type:BYTE_ARRAY TypeLength:<nil> RepetitionType:OPTIONAL Name:two NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})}]`,
`map[one:{2.5 DOUBLE SchemaElement({Type:DOUBLE TypeLength:<nil> RepetitionType:OPTIONAL Name:one NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})} three:{true BOOLEAN SchemaElement({Type:BOOLEAN TypeLength:<nil> RepetitionType:OPTIONAL Name:three NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})} two:{[98 97 122] BYTE_ARRAY SchemaElement({Type:BYTE_ARRAY TypeLength:<nil> RepetitionType:OPTIONAL Name:two NumChildren:<nil> ConvertedType:<nil> Scale:<nil> Precision:<nil> FieldID:<nil> LogicalType:<nil>})}]`,
}
i := 0
@ -76,11 +76,11 @@ func TestReader(t *testing.T) {
}
if i == len(expectedRecords) {
t.Fatalf("read more than expected record count %v", len(expectedRecords))
t.Errorf("read more than expected record count %v", len(expectedRecords))
}
if record.String() != expectedRecords[i] {
t.Fatalf("record%v: expected: %v, got: %v", i+1, expectedRecords[i], record.String())
t.Errorf("record%v: expected: %v, got: %v", i+1, expectedRecords[i], record.String())
}
i++

View File

@ -19,6 +19,7 @@ package parquet
import (
"fmt"
"io"
"time"
"github.com/bcicen/jstream"
parquetgo "github.com/minio/minio/pkg/s3select/internal/parquet-go"
@ -63,8 +64,23 @@ func (r *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) {
value = v.Value.(bool)
case parquetgen.Type_INT32:
value = int64(v.Value.(int32))
if v.Schema != nil && v.Schema.ConvertedType != nil {
switch *v.Schema.ConvertedType {
case parquetgen.ConvertedType_DATE:
value = sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(v.Value.(int32)), 0).UTC())
}
}
case parquetgen.Type_INT64:
value = v.Value.(int64)
if v.Schema != nil && v.Schema.ConvertedType != nil {
switch *v.Schema.ConvertedType {
// Only UTC supported, add one NS to never be exactly midnight.
case parquetgen.ConvertedType_TIMESTAMP_MILLIS:
value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(time.Duration(v.Value.(int64)) * time.Millisecond).UTC())
case parquetgen.ConvertedType_TIMESTAMP_MICROS:
value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(time.Duration(v.Value.(int64)) * time.Microsecond).UTC())
}
}
case parquetgen.Type_FLOAT:
value = float64(v.Value.(float32))
case parquetgen.Type_DOUBLE:

View File

@ -1069,7 +1069,7 @@ func TestParquetInput(t *testing.T) {
for i, testCase := range testTable {
t.Run(fmt.Sprint(i), func(t *testing.T) {
getReader := func(offset int64, length int64) (io.ReadCloser, error) {
testdataFile := "testdata.parquet"
testdataFile := "testdata/testdata.parquet"
file, err := os.Open(testdataFile)
if err != nil {
return nil, err
@ -1126,3 +1126,243 @@ func TestParquetInput(t *testing.T) {
})
}
}
func TestParquetInputSchema(t *testing.T) {
os.Setenv("MINIO_API_SELECT_PARQUET", "on")
defer os.Setenv("MINIO_API_SELECT_PARQUET", "off")
var testTable = []struct {
requestXML []byte
wantResult string
}{
{
requestXML: []byte(`
<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest>
<Expression>SELECT * FROM S3Object LIMIT 5</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<Parquet>
</Parquet>
</InputSerialization>
<OutputSerialization>
<JSON>
</JSON>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectObjectContentRequest>
`), wantResult: `{"shipdate":"1996-03-13T"}
{"shipdate":"1996-04-12T"}
{"shipdate":"1996-01-29T"}
{"shipdate":"1996-04-21T"}
{"shipdate":"1996-03-30T"}`,
},
{
requestXML: []byte(`
<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest>
<Expression>SELECT DATE_ADD(day, 2, shipdate) as shipdate FROM S3Object LIMIT 5</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<Parquet>
</Parquet>
</InputSerialization>
<OutputSerialization>
<JSON>
</JSON>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectObjectContentRequest>
`), wantResult: `{"shipdate":"1996-03-15T"}
{"shipdate":"1996-04-14T"}
{"shipdate":"1996-01-31T"}
{"shipdate":"1996-04-23T"}
{"shipdate":"1996-04T"}`,
},
}
for i, testCase := range testTable {
t.Run(fmt.Sprint(i), func(t *testing.T) {
getReader := func(offset int64, length int64) (io.ReadCloser, error) {
testdataFile := "testdata/lineitem_shipdate.parquet"
file, err := os.Open(testdataFile)
if err != nil {
return nil, err
}
fi, err := file.Stat()
if err != nil {
return nil, err
}
if offset < 0 {
offset = fi.Size() + offset
}
if _, err = file.Seek(offset, io.SeekStart); err != nil {
return nil, err
}
return file, nil
}
s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML))
if err != nil {
t.Fatal(err)
}
if err = s3Select.Open(getReader); err != nil {
t.Fatal(err)
}
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
resp := http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(w.response)),
ContentLength: int64(len(w.response)),
}
res, err := minio.NewSelectResults(&resp, "testbucket")
if err != nil {
t.Error(err)
return
}
got, err := ioutil.ReadAll(res)
if err != nil {
t.Error(err)
return
}
gotS := strings.TrimSpace(string(got))
if !reflect.DeepEqual(gotS, testCase.wantResult) {
t.Errorf("received response does not match with expected reply. Query: %s\ngot: %s\nwant:%s", testCase.requestXML, gotS, testCase.wantResult)
}
})
}
}
func TestParquetInputSchemaCSV(t *testing.T) {
os.Setenv("MINIO_API_SELECT_PARQUET", "on")
defer os.Setenv("MINIO_API_SELECT_PARQUET", "off")
var testTable = []struct {
requestXML []byte
wantResult string
}{
{
requestXML: []byte(`
<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest>
<Expression>SELECT * FROM S3Object LIMIT 5</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<Parquet>
</Parquet>
</InputSerialization>
<OutputSerialization>
<CSV/>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectObjectContentRequest>
`), wantResult: `1996-03-13T
1996-04-12T
1996-01-29T
1996-04-21T
1996-03-30T`,
},
{
requestXML: []byte(`
<?xml version="1.0" encoding="UTF-8"?>
<SelectObjectContentRequest>
<Expression>SELECT DATE_ADD(day, 2, shipdate) as shipdate FROM S3Object LIMIT 5</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<Parquet>
</Parquet>
</InputSerialization>
<OutputSerialization>
<CSV/>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectObjectContentRequest>
`), wantResult: `1996-03-15T
1996-04-14T
1996-01-31T
1996-04-23T
1996-04T`,
},
}
for i, testCase := range testTable {
t.Run(fmt.Sprint(i), func(t *testing.T) {
getReader := func(offset int64, length int64) (io.ReadCloser, error) {
testdataFile := "testdata/lineitem_shipdate.parquet"
file, err := os.Open(testdataFile)
if err != nil {
return nil, err
}
fi, err := file.Stat()
if err != nil {
return nil, err
}
if offset < 0 {
offset = fi.Size() + offset
}
if _, err = file.Seek(offset, io.SeekStart); err != nil {
return nil, err
}
return file, nil
}
s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML))
if err != nil {
t.Fatal(err)
}
if err = s3Select.Open(getReader); err != nil {
t.Fatal(err)
}
w := &testResponseWriter{}
s3Select.Evaluate(w)
s3Select.Close()
resp := http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(w.response)),
ContentLength: int64(len(w.response)),
}
res, err := minio.NewSelectResults(&resp, "testbucket")
if err != nil {
t.Error(err)
return
}
got, err := ioutil.ReadAll(res)
if err != nil {
t.Error(err)
return
}
gotS := strings.TrimSpace(string(got))
if !reflect.DeepEqual(gotS, testCase.wantResult) {
t.Errorf("received response does not match with expected reply. Query: %s\ngot: %s\nwant:%s", testCase.requestXML, gotS, testCase.wantResult)
}
})
}
}

Binary file not shown.