144 lines
4.5 KiB
Go
Raw Normal View History

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package parquet
import (
"errors"
"io"
"time"
"github.com/bcicen/jstream"
parquetgo "github.com/fraugster/parquet-go"
parquettypes "github.com/fraugster/parquet-go/parquet"
jsonfmt "github.com/minio/minio/internal/s3select/json"
"github.com/minio/minio/internal/s3select/sql"
)
// Reader implements reading records from parquet input.
//
// It embeds an io.Closer, so Close on a Reader is forwarded to whatever
// closer the Reader was constructed with.
type Reader struct {
// Embedded closer; NewParquetReader sets it to the input stream it was given.
io.Closer
// r is the underlying parquet file reader that Read pulls rows and column
// metadata from.
r *parquetgo.FileReader
}
// NewParquetReader creates a Reader from an io.ReadSeekCloser.
//
// The stream is handed to the parquet file reader and is also stored as the
// Reader's embedded Closer. The second argument is currently ignored. Any
// failure to open the parquet data is returned wrapped by
// errParquetParsingError.
func NewParquetReader(rsc io.ReadSeekCloser, _ *ReaderArgs) (r *Reader, err error) {
	fileReader, openErr := parquetgo.NewFileReader(rsc)
	if openErr != nil {
		return nil, errParquetParsingError(openErr)
	}

	r = &Reader{
		Closer: rsc,
		r:      fileReader,
	}
	return r, nil
}
func (pr *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) {
nextRow, err := pr.r.NextRow()
if err != nil {
if err == io.EOF {
return nil, err
}
return nil, errParquetParsingError(err)
}
kvs := jstream.KVS{}
for _, col := range pr.r.Columns() {
var value interface{}
if v, ok := nextRow[col.FlatName()]; ok {
value, err = convertFromAnnotation(col.Element(), v)
if err != nil {
return nil, errParquetParsingError(err)
}
}
kvs = append(kvs, jstream.KV{Key: col.FlatName(), Value: value})
}
Concurrent CSV parsing and reduce S3 select allocations (#8200) ``` CSV parsing, BEFORE: BenchmarkReaderBasic-12 2842 407533 ns/op 397860 B/op 957 allocs/op BenchmarkReaderReplace-12 2718 429914 ns/op 397844 B/op 957 allocs/op BenchmarkReaderReplaceTwo-12 2718 435556 ns/op 397855 B/op 957 allocs/op BenchmarkAggregateCount_100K-12 171 6798974 ns/op 16667102 B/op 308077 allocs/op BenchmarkAggregateCount_1M-12 19 65657411 ns/op 168057743 B/op 3146610 allocs/op BenchmarkSelectAll_10M-12 1 20882119900 ns/op 2758799896 B/op 41978762 allocs/op CSV parsing, AFTER: BenchmarkReaderBasic-12 3721 312549 ns/op 101920 B/op 338 allocs/op BenchmarkReaderReplace-12 3776 318810 ns/op 101993 B/op 340 allocs/op BenchmarkReaderReplaceTwo-12 3610 330967 ns/op 102012 B/op 341 allocs/op BenchmarkAggregateCount_100K-12 295 4149588 ns/op 3553623 B/op 103261 allocs/op BenchmarkAggregateCount_1M-12 30 37746503 ns/op 33827931 B/op 1049435 allocs/op BenchmarkSelectAll_10M-12 1 17608495800 ns/op 1416504040 B/op 21007082 allocs/op ~ benchcmp old.txt new.txt benchmark old ns/op new ns/op delta BenchmarkReaderBasic-12 407533 312549 -23.31% BenchmarkReaderReplace-12 429914 318810 -25.84% BenchmarkReaderReplaceTwo-12 435556 330967 -24.01% BenchmarkAggregateCount_100K-12 6798974 4149588 -38.97% BenchmarkAggregateCount_1M-12 65657411 37746503 -42.51% BenchmarkSelectAll_10M-12 20882119900 17608495800 -15.68% benchmark old allocs new allocs delta BenchmarkReaderBasic-12 957 338 -64.68% BenchmarkReaderReplace-12 957 340 -64.47% BenchmarkReaderReplaceTwo-12 957 341 -64.37% BenchmarkAggregateCount_100K-12 308077 103261 -66.48% BenchmarkAggregateCount_1M-12 3146610 1049435 -66.65% BenchmarkSelectAll_10M-12 41978762 21007082 -49.96% benchmark old bytes new bytes delta BenchmarkReaderBasic-12 397860 101920 -74.38% BenchmarkReaderReplace-12 397844 101993 -74.36% BenchmarkReaderReplaceTwo-12 397855 102012 -74.36% BenchmarkAggregateCount_100K-12 16667102 3553623 -78.68% BenchmarkAggregateCount_1M-12 168057743 
33827931 -79.87% BenchmarkSelectAll_10M-12 2758799896 1416504040 -48.66% ``` ``` BenchmarkReaderHuge/97K-12 2200 540840 ns/op 184.32 MB/s 1604450 B/op 687 allocs/op BenchmarkReaderHuge/194K-12 1522 752257 ns/op 265.04 MB/s 2143135 B/op 1335 allocs/op BenchmarkReaderHuge/389K-12 1190 947858 ns/op 420.69 MB/s 3221831 B/op 2630 allocs/op BenchmarkReaderHuge/778K-12 806 1472486 ns/op 541.61 MB/s 5201856 B/op 5187 allocs/op BenchmarkReaderHuge/1557K-12 426 2575269 ns/op 619.36 MB/s 9101330 B/op 10233 allocs/op BenchmarkReaderHuge/3115K-12 286 4034656 ns/op 790.66 MB/s 12397968 B/op 16099 allocs/op BenchmarkReaderHuge/6230K-12 172 6830563 ns/op 934.05 MB/s 16008416 B/op 26844 allocs/op BenchmarkReaderHuge/12461K-12 100 11409467 ns/op 1118.39 MB/s 22655163 B/op 48107 allocs/op BenchmarkReaderHuge/24922K-12 66 19780395 ns/op 1290.19 MB/s 35158559 B/op 90216 allocs/op BenchmarkReaderHuge/49844K-12 34 37282559 ns/op 1369.03 MB/s 60528624 B/op 174497 allocs/op ```
2019-09-13 14:18:35 -07:00
// Reuse destination if we can.
dstRec, ok := dst.(*jsonfmt.Record)
if !ok {
dstRec = &jsonfmt.Record{}
}
dstRec.SelectFormat = sql.SelectFmtParquet
dstRec.KVS = kvs
return dstRec, nil
}
// convertFromAnnotation - converts values based on the Parquet column's type
// annotations. LogicalType annotations if present override the deprecated
// ConvertedType annotations. Ref:
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
//
// Conversions performed, by the Go type of v:
//   - []byte and [12]byte become string.
//   - int32 becomes int64, or a formatted timestamp when annotated as DATE
//     (days since the Unix epoch).
//   - int64 stays int64, or becomes a formatted timestamp when annotated as a
//     UTC-adjusted TIMESTAMP (LogicalType) or as TIMESTAMP_MILLIS /
//     TIMESTAMP_MICROS (ConvertedType).
//   - float32 becomes float64.
//   - anything else is returned unchanged.
//
// A nil schema element returns v unchanged. An error is returned only when a
// LogicalType TIMESTAMP carries a unit that is none of NANOS, MILLIS, MICROS.
func convertFromAnnotation(se *parquettypes.SchemaElement, v interface{}) (interface{}, error) {
	if se == nil {
		return v, nil
	}

	const (
		secondsPerDay   int64 = 60 * 60 * 24
		millisPerSecond int64 = 1000
		microsPerSecond int64 = 1000 * 1000
		nanosPerMilli   int64 = 1000 * 1000
		nanosPerMicro   int64 = 1000
	)

	switch val := v.(type) {
	case []byte:
		// TODO: only strings are supported in s3select output (not
		// binary arrays) - perhaps we need to check the annotation to
		// ensure it's UTF8 encoded.
		return string(val), nil

	case [12]byte:
		// TODO: This is returned for the parquet INT96 type. We just
		// treat it same as []byte (but AWS S3 treats it as a large int)
		// - fix this later.
		return string(val[:]), nil

	case int32:
		var isDate bool
		if logicalType := se.GetLogicalType(); logicalType != nil {
			isDate = logicalType.IsSetDATE()
		} else {
			isDate = se.GetConvertedType() == parquettypes.ConvertedType_DATE
		}
		if isDate {
			return sql.FormatSQLTimestamp(time.Unix(secondsPerDay*int64(val), 0).UTC()), nil
		}
		return int64(val), nil

	case int64:
		logicalType := se.GetLogicalType()
		if logicalType != nil && logicalType.GetTIMESTAMP() != nil {
			ts := logicalType.GetTIMESTAMP()
			// Only support UTC normalized timestamps.
			if !ts.IsAdjustedToUTC {
				return val, nil
			}

			// Split into seconds and nanoseconds rather than multiplying
			// into a time.Duration: a Duration is int64 nanoseconds, so
			// millisecond or microsecond values scaled up to nanoseconds
			// overflow for large timestamps.
			var t time.Time
			switch {
			case ts.Unit.IsSetNANOS():
				t = time.Unix(0, val)
			case ts.Unit.IsSetMILLIS():
				t = time.Unix(val/millisPerSecond, (val%millisPerSecond)*nanosPerMilli)
			case ts.Unit.IsSetMICROS():
				t = time.Unix(val/microsPerSecond, (val%microsPerSecond)*nanosPerMicro)
			default:
				return nil, errors.New("Invalid LogicalType annotation found")
			}
			// time.Unix yields a time in the Local zone; normalize to UTC,
			// as the DATE branch does, so the value handed to the formatter
			// does not depend on the server's time zone.
			return sql.FormatSQLTimestamp(t.UTC()), nil
		}

		// No LogicalType TIMESTAMP: fall back to the deprecated
		// ConvertedType annotation. This must also run when the column has
		// no LogicalType at all, which is the case for files that only
		// carry ConvertedType annotations.
		switch se.GetConvertedType() {
		case parquettypes.ConvertedType_TIMESTAMP_MILLIS:
			t := time.Unix(val/millisPerSecond, (val%millisPerSecond)*nanosPerMilli)
			return sql.FormatSQLTimestamp(t.UTC()), nil
		case parquettypes.ConvertedType_TIMESTAMP_MICROS:
			t := time.Unix(val/microsPerSecond, (val%microsPerSecond)*nanosPerMicro)
			return sql.FormatSQLTimestamp(t.UTC()), nil
		}
		return val, nil

	case float32:
		return float64(val), nil

	default:
		return v, nil
	}
}