minio/internal/s3select/parquet/reader.go

144 lines
4.5 KiB
Go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package parquet
import (
"errors"
"io"
"time"
"github.com/bcicen/jstream"
parquetgo "github.com/fraugster/parquet-go"
parquettypes "github.com/fraugster/parquet-go/parquet"
jsonfmt "github.com/minio/minio/internal/s3select/json"
"github.com/minio/minio/internal/s3select/sql"
)
// Reader implements reading records from parquet input.
//
// It embeds the io.Closer it was constructed with, so Close is promoted from
// that value, and keeps the parquet file reader used to decode rows.
type Reader struct {
// Embedded closer; NewParquetReader sets it to the stream the file reader
// was opened on.
io.Closer
// r is the parquet file reader that Read pulls rows and column metadata
// from.
r *parquetgo.FileReader
}
// NewParquetReader creates a Reader from an io.ReadSeekCloser.
//
// The stream is handed to the parquet file reader and is also stored as the
// Reader's embedded io.Closer, so closing the returned Reader closes rsc. The
// second argument is accepted for signature compatibility and is ignored.
//
// If the parquet file reader cannot be created, the failure is returned
// wrapped by errParquetParsingError and the Reader is nil.
func NewParquetReader(rsc io.ReadSeekCloser, _ *ReaderArgs) (r *Reader, err error) {
	fileReader, openErr := parquetgo.NewFileReader(rsc)
	if openErr != nil {
		return nil, errParquetParsingError(openErr)
	}

	r = &Reader{
		Closer: rsc,
		r:      fileReader,
	}
	return r, nil
}
// Read decodes the next row of the parquet input into a record.
//
// Every column reported by the file reader contributes one key/value pair,
// in the reader's column order and keyed by the column's flat name. A column
// with no entry in the decoded row is emitted with a nil value; a present
// value is passed through convertFromAnnotation together with the column's
// schema element.
//
// If dst is a *jsonfmt.Record it is reused and returned, otherwise a new
// record is allocated. In both cases the record's SelectFormat is set to
// sql.SelectFmtParquet and its KVS is replaced by the freshly built pairs.
//
// io.EOF from the underlying reader is returned as is so callers can detect
// the end of input; every other failure is wrapped by errParquetParsingError.
func (pr *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) {
	row, err := pr.r.NextRow()
	if err != nil {
		if err == io.EOF {
			return nil, err
		}
		return nil, errParquetParsingError(err)
	}

	// The number of pairs equals the number of columns, so size the slice
	// once instead of letting append grow it for every row.
	columns := pr.r.Columns()
	kvs := make(jstream.KVS, 0, len(columns))
	for _, col := range columns {
		name := col.FlatName()

		// Columns absent from this row keep the nil value.
		var value interface{}
		if raw, ok := row[name]; ok {
			value, err = convertFromAnnotation(col.Element(), raw)
			if err != nil {
				return nil, errParquetParsingError(err)
			}
		}
		kvs = append(kvs, jstream.KV{Key: name, Value: value})
	}

	// Reuse destination if we can.
	dstRec, ok := dst.(*jsonfmt.Record)
	if !ok {
		dstRec = &jsonfmt.Record{}
	}
	dstRec.SelectFormat = sql.SelectFmtParquet
	dstRec.KVS = kvs
	return dstRec, nil
}
// convertFromAnnotation - converts values based on the Parquet column's type
// annotations. Ref:
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
//
// A nil se disables conversion and v is returned untouched. Otherwise the
// result depends on the Go type of v:
//
//   - []byte and [12]byte (INT96) are returned as string.
//   - int32 is returned as a formatted SQL timestamp when the column is
//     annotated DATE, else as int64. A LogicalType annotation, if present,
//     overrides the deprecated ConvertedType annotation.
//   - int64 is returned as a formatted SQL timestamp when the column carries
//     a LogicalType TIMESTAMP that is adjusted to UTC, or, when there is no
//     logical TIMESTAMP annotation at all, a ConvertedType of
//     TIMESTAMP_MILLIS or TIMESTAMP_MICROS. A logical TIMESTAMP that is not
//     adjusted to UTC is left as the raw int64, as is any other int64.
//   - float32 is returned as float64.
//   - any other type is returned untouched.
//
// Timestamps and dates are always rendered in UTC.
//
// An error is returned only for a UTC adjusted LogicalType TIMESTAMP whose
// unit is none of NANOS, MILLIS or MICROS.
func convertFromAnnotation(se *parquettypes.SchemaElement, v interface{}) (interface{}, error) {
	if se == nil {
		return v, nil
	}

	switch val := v.(type) {
	case []byte:
		// TODO: only strings are supported in s3select output (not
		// binary arrays) - perhaps we need to check the annotation to
		// ensure it's UTF8 encoded.
		return string(val), nil

	case [12]byte:
		// TODO: This is returned for the parquet INT96 type. We just
		// treat it same as []byte (but AWS S3 treats it as a large int)
		// - fix this later.
		return string(val[:]), nil

	case int32:
		// LogicalType, when present, decides on its own; ConvertedType is
		// only consulted for columns without a LogicalType.
		var isDate bool
		if logicalType := se.GetLogicalType(); logicalType != nil {
			isDate = logicalType.IsSetDATE()
		} else {
			isDate = se.GetConvertedType() == parquettypes.ConvertedType_DATE
		}
		if isDate {
			// The value is scaled from days to seconds since the Unix epoch.
			return sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(val), 0).UTC()), nil
		}
		return int64(val), nil

	case int64:
		if logicalType := se.GetLogicalType(); logicalType != nil {
			if ts := logicalType.GetTIMESTAMP(); ts != nil {
				// Only support UTC normalized timestamps.
				if !ts.IsAdjustedToUTC {
					return val, nil
				}
				switch {
				case ts.Unit.IsSetNANOS():
					return sql.FormatSQLTimestamp(time.Unix(0, val).UTC()), nil
				case ts.Unit.IsSetMILLIS():
					return sql.FormatSQLTimestamp(epochToUTC(val, 1e3)), nil
				case ts.Unit.IsSetMICROS():
					return sql.FormatSQLTimestamp(epochToUTC(val, 1e6)), nil
				default:
					return nil, errors.New("Invalid LogicalType annotation found")
				}
			}
		}

		// No logical TIMESTAMP annotation: fall back to the deprecated
		// ConvertedType. This must also run when the column has no
		// LogicalType at all, which is the normal shape of files that
		// only carry ConvertedType annotations.
		switch se.GetConvertedType() {
		case parquettypes.ConvertedType_TIMESTAMP_MILLIS:
			return sql.FormatSQLTimestamp(epochToUTC(val, 1e3)), nil
		case parquettypes.ConvertedType_TIMESTAMP_MICROS:
			return sql.FormatSQLTimestamp(epochToUTC(val, 1e6)), nil
		}
		return val, nil

	case float32:
		return float64(val), nil

	default:
		return v, nil
	}
}

// epochToUTC returns the UTC instant lying val units after the Unix epoch,
// where perSecond is the number of such units in one second (1e3 for
// milliseconds, 1e6 for microseconds). The value is split into whole seconds
// and a sub-second remainder instead of being scaled through time.Duration,
// whose int64 nanosecond range overflows for instants beyond roughly the
// year 2262. Negative values work because time.Unix accepts a nanosecond
// argument outside [0, 999999999].
func epochToUTC(val, perSecond int64) time.Time {
	nanosPerUnit := int64(time.Second) / perSecond
	return time.Unix(val/perSecond, (val%perSecond)*nanosPerUnit).UTC()
}