S3 select switch to new parquet library and reduce locking (#14731)

- This change switches to a new parquet library
- SelectObjectContent now takes a single lock at the beginning and holds it
during the operation. Previously the operation took a lock every time the
parquet library performed a Seek on the underlying object stream.
- Add basic support for LogicalType annotations for timestamps.
This commit is contained in:
Aditya Manthramurthy
2022-04-14 06:54:47 -07:00
committed by GitHub
parent 67e17ed3f8
commit e8e48e4c4a
7 changed files with 336 additions and 348 deletions

View File

@@ -18,88 +18,55 @@
package parquet
import (
"fmt"
"errors"
"io"
"time"
"github.com/bcicen/jstream"
parquetgo "github.com/fraugster/parquet-go"
parquettypes "github.com/fraugster/parquet-go/parquet"
jsonfmt "github.com/minio/minio/internal/s3select/json"
"github.com/minio/minio/internal/s3select/sql"
parquetgo "github.com/minio/parquet-go"
parquetgen "github.com/minio/parquet-go/gen-go/parquet"
)
// Reader - Parquet record reader for S3Select.
// Reader implements reading records from parquet input.
type Reader struct {
args *ReaderArgs
reader *parquetgo.Reader
io.Closer
r *parquetgo.FileReader
}
// Read - reads single record.
func (r *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) {
defer func() {
if rec := recover(); rec != nil {
rerr = fmt.Errorf("panic reading parquet record: %v", rec)
}
}()
parquetRecord, err := r.reader.Read()
// NewParquetReader creates a Reader2 from a io.ReadSeekCloser.
func NewParquetReader(rsc io.ReadSeekCloser, _ *ReaderArgs) (r *Reader, err error) {
fr, err := parquetgo.NewFileReader(rsc)
if err != nil {
if err != io.EOF {
return nil, errParquetParsingError(err)
}
return nil, errParquetParsingError(err)
}
return nil, err
return &Reader{Closer: rsc, r: fr}, nil
}
func (pr *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) {
nextRow, err := pr.r.NextRow()
if err != nil {
if err == io.EOF {
return nil, err
}
return nil, errParquetParsingError(err)
}
kvs := jstream.KVS{}
f := func(name string, v parquetgo.Value) bool {
if v.Value == nil {
kvs = append(kvs, jstream.KV{Key: name, Value: nil})
return true
}
for _, col := range pr.r.Columns() {
var value interface{}
switch v.Type {
case parquetgen.Type_BOOLEAN:
value = v.Value.(bool)
case parquetgen.Type_INT32:
value = int64(v.Value.(int32))
if v.Schema != nil && v.Schema.ConvertedType != nil {
switch *v.Schema.ConvertedType {
case parquetgen.ConvertedType_DATE:
value = sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(v.Value.(int32)), 0).UTC())
}
if v, ok := nextRow[col.FlatName()]; ok {
value, err = convertFromAnnotation(col.Element(), v)
if err != nil {
return nil, errParquetParsingError(err)
}
case parquetgen.Type_INT64:
value = v.Value.(int64)
if v.Schema != nil && v.Schema.ConvertedType != nil {
switch *v.Schema.ConvertedType {
// Only UTC supported, add one NS to never be exactly midnight.
case parquetgen.ConvertedType_TIMESTAMP_MILLIS:
value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(time.Duration(v.Value.(int64)) * time.Millisecond).UTC())
case parquetgen.ConvertedType_TIMESTAMP_MICROS:
value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(time.Duration(v.Value.(int64)) * time.Microsecond).UTC())
}
}
case parquetgen.Type_FLOAT:
value = float64(v.Value.(float32))
case parquetgen.Type_DOUBLE:
value = v.Value.(float64)
case parquetgen.Type_INT96, parquetgen.Type_BYTE_ARRAY, parquetgen.Type_FIXED_LEN_BYTE_ARRAY:
value = string(v.Value.([]byte))
default:
rerr = errParquetParsingError(nil)
return false
}
kvs = append(kvs, jstream.KV{Key: name, Value: value})
return true
kvs = append(kvs, jstream.KV{Key: col.FlatName(), Value: value})
}
// Apply our range
parquetRecord.Range(f)
// Reuse destination if we can.
dstRec, ok := dst.(*jsonfmt.Record)
if !ok {
@@ -110,29 +77,67 @@ func (r *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) {
return dstRec, nil
}
// Close - closes underlying readers.
func (r *Reader) Close() error {
return r.reader.Close()
}
// NewReader - creates new Parquet reader using readerFunc callback.
func NewReader(getReaderFunc func(offset, length int64) (io.ReadCloser, error), args *ReaderArgs) (r *Reader, err error) {
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("panic reading parquet header: %v", rec)
}
}()
reader, err := parquetgo.NewReader(getReaderFunc, nil)
if err != nil {
if err != io.EOF {
return nil, errParquetParsingError(err)
}
return nil, err
// convertFromAnnotation - converts values based on the Parquet column's type
// annotations. LogicalType annotations if present override the deprecated
// ConvertedType annotations. Ref:
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
func convertFromAnnotation(se *parquettypes.SchemaElement, v interface{}) (interface{}, error) {
if se == nil {
return v, nil
}
return &Reader{
args: args,
reader: reader,
}, nil
var value interface{}
switch val := v.(type) {
case []byte:
// TODO: only strings are supported in s3select output (not
// binary arrays) - perhaps we need to check the annotation to
// ensure it's UTF8 encoded.
value = string(val)
case [12]byte:
// TODO: This is returned for the parquet INT96 type. We just
// treat it same as []byte (but AWS S3 treats it as a large int)
// - fix this later.
value = string(val[:])
case int32:
value = int64(val)
if logicalType := se.GetLogicalType(); logicalType != nil {
if logicalType.IsSetDATE() {
value = sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(val), 0).UTC())
}
} else if se.GetConvertedType() == parquettypes.ConvertedType_DATE {
value = sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(val), 0).UTC())
}
case int64:
value = val
if logicalType := se.GetLogicalType(); logicalType != nil {
if ts := logicalType.GetTIMESTAMP(); ts != nil {
var duration time.Duration
// Only support UTC normalized timestamps.
if ts.IsAdjustedToUTC {
switch {
case ts.Unit.IsSetNANOS():
duration = time.Duration(val) * time.Nanosecond
case ts.Unit.IsSetMILLIS():
duration = time.Duration(val) * time.Millisecond
case ts.Unit.IsSetMICROS():
duration = time.Duration(val) * time.Microsecond
default:
return nil, errors.New("Invalid LogicalType annotation found")
}
value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(duration))
}
} else if se.GetConvertedType() == parquettypes.ConvertedType_TIMESTAMP_MILLIS {
duration := time.Duration(val) * time.Millisecond
value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(duration))
} else if se.GetConvertedType() == parquettypes.ConvertedType_TIMESTAMP_MICROS {
duration := time.Duration(val) * time.Microsecond
value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(duration))
}
}
case float32:
value = float64(val)
default:
value = v
}
return value, nil
}