681 lines
20 KiB
Go

/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package data
import (
"bytes"
"context"
"fmt"
"strings"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/minio/minio/pkg/s3select/internal/parquet-go/common"
"github.com/minio/minio/pkg/s3select/internal/parquet-go/encoding"
"github.com/minio/minio/pkg/s3select/internal/parquet-go/gen-go/parquet"
"github.com/minio/minio/pkg/s3select/internal/parquet-go/schema"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
func getDefaultEncoding(parquetType parquet.Type) parquet.Encoding {
switch parquetType {
case parquet.Type_BOOLEAN:
return parquet.Encoding_PLAIN
case parquet.Type_INT32, parquet.Type_INT64, parquet.Type_FLOAT, parquet.Type_DOUBLE:
return parquet.Encoding_RLE_DICTIONARY
case parquet.Type_BYTE_ARRAY:
return parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY
}
return parquet.Encoding_PLAIN
}
func getFirstValueElement(tree *schema.Tree) (valueElement *schema.Element) {
tree.Range(func(name string, element *schema.Element) bool {
if element.Children == nil {
valueElement = element
} else {
valueElement = getFirstValueElement(element.Children)
}
return false
})
return valueElement
}
func populate(columnDataMap map[string]*Column, input *jsonValue, tree *schema.Tree, firstValueRL int64) (map[string]*Column, error) {
var err error
pos := 0
handleElement := func(name string, element *schema.Element) bool {
pos++
dataPath := element.PathInTree
if *element.RepetitionType == parquet.FieldRepetitionType_REPEATED {
panic(fmt.Errorf("%v: repetition type must be REQUIRED or OPTIONAL type", dataPath))
}
inputValue := input.Get(name)
if *element.RepetitionType == parquet.FieldRepetitionType_REQUIRED && inputValue.IsNull() {
err = fmt.Errorf("%v: nil value for required field", dataPath)
return false
}
add := func(element *schema.Element, value interface{}, DL, RL int64) {
columnData := columnDataMap[element.PathInSchema]
if columnData == nil {
columnData = NewColumn(*element.Type)
}
columnData.add(value, DL, RL)
columnDataMap[element.PathInSchema] = columnData
}
// Handle primitive type element.
if element.Type != nil {
var value interface{}
if value, err = inputValue.GetValue(*element.Type, element.ConvertedType); err != nil {
return false
}
DL := element.MaxDefinitionLevel
if value == nil && DL > 0 {
DL--
}
RL := element.MaxRepetitionLevel
if pos == 1 {
RL = firstValueRL
}
add(element, value, DL, RL)
return true
}
addNull := func() {
valueElement := getFirstValueElement(element.Children)
DL := element.MaxDefinitionLevel
if DL > 0 {
DL--
}
RL := element.MaxRepetitionLevel
if RL > 0 {
RL--
}
add(valueElement, nil, DL, RL)
}
// Handle group type element.
if element.ConvertedType == nil {
if inputValue.IsNull() {
addNull()
return true
}
columnDataMap, err = populate(columnDataMap, inputValue, element.Children, firstValueRL)
return (err == nil)
}
// Handle list type element.
if *element.ConvertedType == parquet.ConvertedType_LIST {
if inputValue.IsNull() {
addNull()
return true
}
var results []gjson.Result
if results, err = inputValue.GetArray(); err != nil {
return false
}
listElement, _ := element.Children.Get("list")
valueElement, _ := listElement.Children.Get("element")
for i := range results {
rl := valueElement.MaxRepetitionLevel
if i == 0 {
rl = firstValueRL
}
var jsonData []byte
if jsonData, err = sjson.SetBytes([]byte{}, "element", results[i].Value()); err != nil {
return false
}
var jv *jsonValue
if jv, err = bytesToJSONValue(jsonData); err != nil {
return false
}
if columnDataMap, err = populate(columnDataMap, jv, listElement.Children, rl); err != nil {
return false
}
}
return true
}
if *element.ConvertedType == parquet.ConvertedType_MAP {
if inputValue.IsNull() {
addNull()
return true
}
keyValueElement, _ := element.Children.Get("key_value")
var rerr error
err = inputValue.Range(func(key, value gjson.Result) bool {
if !key.Exists() || key.Type == gjson.Null {
rerr = fmt.Errorf("%v.key_value.key: not found or null", dataPath)
return false
}
var jsonData []byte
if jsonData, rerr = sjson.SetBytes([]byte{}, "key", key.Value()); rerr != nil {
return false
}
if jsonData, rerr = sjson.SetBytes(jsonData, "value", value.Value()); rerr != nil {
return false
}
var jv *jsonValue
if jv, rerr = bytesToJSONValue(jsonData); rerr != nil {
return false
}
if columnDataMap, rerr = populate(columnDataMap, jv, keyValueElement.Children, firstValueRL); rerr != nil {
return false
}
return true
})
if err != nil {
return false
}
err = rerr
return (err == nil)
}
err = fmt.Errorf("%v: unsupported converted type %v in %v field type", dataPath, *element.ConvertedType, *element.RepetitionType)
return false
}
tree.Range(handleElement)
return columnDataMap, err
}
// Column - denotes values of a column.
type Column struct {
parquetType parquet.Type // value type.
values []interface{} // must be a slice of parquet typed values.
definitionLevels []int64 // exactly same length of values.
repetitionLevels []int64 // exactly same length of values.
rowCount int32
maxBitWidth int32
minValue interface{}
maxValue interface{}
}
func (column *Column) updateMinMaxValue(value interface{}) {
if column.minValue == nil && column.maxValue == nil {
column.minValue = value
column.maxValue = value
return
}
switch column.parquetType {
case parquet.Type_BOOLEAN:
if column.minValue.(bool) && !value.(bool) {
column.minValue = value
}
if !column.maxValue.(bool) && value.(bool) {
column.maxValue = value
}
case parquet.Type_INT32:
if column.minValue.(int32) > value.(int32) {
column.minValue = value
}
if column.maxValue.(int32) < value.(int32) {
column.maxValue = value
}
case parquet.Type_INT64:
if column.minValue.(int64) > value.(int64) {
column.minValue = value
}
if column.maxValue.(int64) < value.(int64) {
column.maxValue = value
}
case parquet.Type_FLOAT:
if column.minValue.(float32) > value.(float32) {
column.minValue = value
}
if column.maxValue.(float32) < value.(float32) {
column.maxValue = value
}
case parquet.Type_DOUBLE:
if column.minValue.(float64) > value.(float64) {
column.minValue = value
}
if column.maxValue.(float64) < value.(float64) {
column.maxValue = value
}
case parquet.Type_BYTE_ARRAY:
if bytes.Compare(column.minValue.([]byte), value.([]byte)) > 0 {
column.minValue = value
}
if bytes.Compare(column.minValue.([]byte), value.([]byte)) < 0 {
column.maxValue = value
}
}
}
func (column *Column) updateStats(value interface{}, DL, RL int64) {
if RL == 0 {
column.rowCount++
}
if value == nil {
return
}
var bitWidth int32
switch column.parquetType {
case parquet.Type_BOOLEAN:
bitWidth = 1
case parquet.Type_INT32:
bitWidth = common.BitWidth(uint64(value.(int32)))
case parquet.Type_INT64:
bitWidth = common.BitWidth(uint64(value.(int64)))
case parquet.Type_FLOAT:
bitWidth = 32
case parquet.Type_DOUBLE:
bitWidth = 64
case parquet.Type_BYTE_ARRAY:
bitWidth = int32(len(value.([]byte)))
}
if column.maxBitWidth < bitWidth {
column.maxBitWidth = bitWidth
}
column.updateMinMaxValue(value)
}
func (column *Column) add(value interface{}, DL, RL int64) {
column.values = append(column.values, value)
column.definitionLevels = append(column.definitionLevels, DL)
column.repetitionLevels = append(column.repetitionLevels, RL)
column.updateStats(value, DL, RL)
}
// AddNull - adds nil value.
func (column *Column) AddNull(DL, RL int64) {
column.add(nil, DL, RL)
}
// AddBoolean - adds boolean value.
func (column *Column) AddBoolean(value bool, DL, RL int64) {
if column.parquetType != parquet.Type_BOOLEAN {
panic(fmt.Errorf("expected %v value", column.parquetType))
}
column.add(value, DL, RL)
}
// AddInt32 - adds int32 value.
func (column *Column) AddInt32(value int32, DL, RL int64) {
if column.parquetType != parquet.Type_INT32 {
panic(fmt.Errorf("expected %v value", column.parquetType))
}
column.add(value, DL, RL)
}
// AddInt64 - adds int64 value.
func (column *Column) AddInt64(value int64, DL, RL int64) {
if column.parquetType != parquet.Type_INT64 {
panic(fmt.Errorf("expected %v value", column.parquetType))
}
column.add(value, DL, RL)
}
// AddFloat - adds float32 value.
func (column *Column) AddFloat(value float32, DL, RL int64) {
if column.parquetType != parquet.Type_FLOAT {
panic(fmt.Errorf("expected %v value", column.parquetType))
}
column.add(value, DL, RL)
}
// AddDouble - adds float64 value.
func (column *Column) AddDouble(value float64, DL, RL int64) {
if column.parquetType != parquet.Type_DOUBLE {
panic(fmt.Errorf("expected %v value", column.parquetType))
}
column.add(value, DL, RL)
}
// AddByteArray - adds byte array value.
func (column *Column) AddByteArray(value []byte, DL, RL int64) {
if column.parquetType != parquet.Type_BYTE_ARRAY {
panic(fmt.Errorf("expected %v value", column.parquetType))
}
column.add(value, DL, RL)
}
// Merge - merges columns.
func (column *Column) Merge(column2 *Column) {
if column.parquetType != column2.parquetType {
panic(fmt.Errorf("merge differs in parquet type"))
}
column.values = append(column.values, column2.values...)
column.definitionLevels = append(column.definitionLevels, column2.definitionLevels...)
column.repetitionLevels = append(column.repetitionLevels, column2.repetitionLevels...)
column.rowCount += column2.rowCount
if column.maxBitWidth < column2.maxBitWidth {
column.maxBitWidth = column2.maxBitWidth
}
column.updateMinMaxValue(column2.minValue)
column.updateMinMaxValue(column2.maxValue)
}
func (column *Column) String() string {
var strs []string
strs = append(strs, fmt.Sprintf("parquetType: %v", column.parquetType))
strs = append(strs, fmt.Sprintf("values: %v", column.values))
strs = append(strs, fmt.Sprintf("definitionLevels: %v", column.definitionLevels))
strs = append(strs, fmt.Sprintf("repetitionLevels: %v", column.repetitionLevels))
strs = append(strs, fmt.Sprintf("rowCount: %v", column.rowCount))
strs = append(strs, fmt.Sprintf("maxBitWidth: %v", column.maxBitWidth))
strs = append(strs, fmt.Sprintf("minValue: %v", column.minValue))
strs = append(strs, fmt.Sprintf("maxValue: %v", column.maxValue))
return "{" + strings.Join(strs, ", ") + "}"
}
func (column *Column) encodeValue(value interface{}, element *schema.Element) []byte {
if value == nil {
return nil
}
valueData := encoding.PlainEncode(common.ToSliceValue([]interface{}{value}, column.parquetType), column.parquetType)
if column.parquetType == parquet.Type_BYTE_ARRAY && element.ConvertedType != nil {
switch *element.ConvertedType {
case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
valueData = valueData[4:]
}
}
return valueData
}
func (column *Column) toDataPageV2(element *schema.Element, parquetEncoding parquet.Encoding) *ColumnChunk {
var definedValues []interface{}
for _, value := range column.values {
if value != nil {
definedValues = append(definedValues, value)
}
}
var encodedData []byte
switch parquetEncoding {
case parquet.Encoding_PLAIN:
encodedData = encoding.PlainEncode(common.ToSliceValue(definedValues, column.parquetType), column.parquetType)
case parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY:
var bytesSlices [][]byte
for _, value := range column.values {
bytesSlices = append(bytesSlices, value.([]byte))
}
encodedData = encoding.DeltaLengthByteArrayEncode(bytesSlices)
}
compressionType := parquet.CompressionCodec_SNAPPY
if element.CompressionType != nil {
compressionType = *element.CompressionType
}
compressedData, err := common.Compress(compressionType, encodedData)
if err != nil {
panic(err)
}
DLData := encoding.RLEBitPackedHybridEncode(
column.definitionLevels,
common.BitWidth(uint64(element.MaxDefinitionLevel)),
parquet.Type_INT64,
)
RLData := encoding.RLEBitPackedHybridEncode(
column.repetitionLevels,
common.BitWidth(uint64(element.MaxRepetitionLevel)),
parquet.Type_INT64,
)
pageHeader := parquet.NewPageHeader()
pageHeader.Type = parquet.PageType_DATA_PAGE_V2
pageHeader.CompressedPageSize = int32(len(compressedData) + len(DLData) + len(RLData))
pageHeader.UncompressedPageSize = int32(len(encodedData) + len(DLData) + len(RLData))
pageHeader.DataPageHeaderV2 = parquet.NewDataPageHeaderV2()
pageHeader.DataPageHeaderV2.NumValues = int32(len(column.values))
pageHeader.DataPageHeaderV2.NumNulls = int32(len(column.values) - len(definedValues))
pageHeader.DataPageHeaderV2.NumRows = column.rowCount
pageHeader.DataPageHeaderV2.Encoding = parquetEncoding
pageHeader.DataPageHeaderV2.DefinitionLevelsByteLength = int32(len(DLData))
pageHeader.DataPageHeaderV2.RepetitionLevelsByteLength = int32(len(RLData))
pageHeader.DataPageHeaderV2.IsCompressed = true
pageHeader.DataPageHeaderV2.Statistics = parquet.NewStatistics()
pageHeader.DataPageHeaderV2.Statistics.Min = column.encodeValue(column.minValue, element)
pageHeader.DataPageHeaderV2.Statistics.Max = column.encodeValue(column.maxValue, element)
ts := thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
rawData, err := ts.Write(context.TODO(), pageHeader)
if err != nil {
panic(err)
}
rawData = append(rawData, RLData...)
rawData = append(rawData, DLData...)
rawData = append(rawData, compressedData...)
metadata := parquet.NewColumnMetaData()
metadata.Type = column.parquetType
metadata.Encodings = []parquet.Encoding{
parquet.Encoding_PLAIN,
parquet.Encoding_RLE,
parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY,
}
metadata.Codec = compressionType
metadata.NumValues = int64(pageHeader.DataPageHeaderV2.NumValues)
metadata.TotalCompressedSize = int64(len(rawData))
metadata.TotalUncompressedSize = int64(pageHeader.UncompressedPageSize) + int64(len(rawData)) - int64(pageHeader.CompressedPageSize)
metadata.PathInSchema = strings.Split(element.PathInSchema, ".")
metadata.Statistics = parquet.NewStatistics()
metadata.Statistics.Min = pageHeader.DataPageHeaderV2.Statistics.Min
metadata.Statistics.Max = pageHeader.DataPageHeaderV2.Statistics.Max
chunk := new(ColumnChunk)
chunk.ColumnChunk.MetaData = metadata
chunk.dataPageLen = int64(len(rawData))
chunk.dataLen = int64(len(rawData))
chunk.data = rawData
return chunk
}
func (column *Column) toRLEDictPage(element *schema.Element) *ColumnChunk {
dictPageData, dataPageData, dictValueCount, indexBitWidth := encoding.RLEDictEncode(column.values, column.parquetType, column.maxBitWidth)
compressionType := parquet.CompressionCodec_SNAPPY
if element.CompressionType != nil {
compressionType = *element.CompressionType
}
compressedData, err := common.Compress(compressionType, dictPageData)
if err != nil {
panic(err)
}
dictPageHeader := parquet.NewPageHeader()
dictPageHeader.Type = parquet.PageType_DICTIONARY_PAGE
dictPageHeader.CompressedPageSize = int32(len(compressedData))
dictPageHeader.UncompressedPageSize = int32(len(dictPageData))
dictPageHeader.DictionaryPageHeader = parquet.NewDictionaryPageHeader()
dictPageHeader.DictionaryPageHeader.NumValues = dictValueCount
dictPageHeader.DictionaryPageHeader.Encoding = parquet.Encoding_PLAIN
ts := thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
dictPageRawData, err := ts.Write(context.TODO(), dictPageHeader)
if err != nil {
panic(err)
}
dictPageRawData = append(dictPageRawData, compressedData...)
RLData := encoding.RLEBitPackedHybridEncode(
column.repetitionLevels,
common.BitWidth(uint64(element.MaxRepetitionLevel)),
parquet.Type_INT64,
)
encodedData := RLData
DLData := encoding.RLEBitPackedHybridEncode(
column.definitionLevels,
common.BitWidth(uint64(element.MaxDefinitionLevel)),
parquet.Type_INT64,
)
encodedData = append(encodedData, DLData...)
encodedData = append(encodedData, indexBitWidth)
encodedData = append(encodedData, dataPageData...)
compressedData, err = common.Compress(compressionType, encodedData)
if err != nil {
panic(err)
}
dataPageHeader := parquet.NewPageHeader()
dataPageHeader.Type = parquet.PageType_DATA_PAGE
dataPageHeader.CompressedPageSize = int32(len(compressedData))
dataPageHeader.UncompressedPageSize = int32(len(encodedData))
dataPageHeader.DataPageHeader = parquet.NewDataPageHeader()
dataPageHeader.DataPageHeader.NumValues = int32(len(column.values))
dataPageHeader.DataPageHeader.DefinitionLevelEncoding = parquet.Encoding_RLE
dataPageHeader.DataPageHeader.RepetitionLevelEncoding = parquet.Encoding_RLE
dataPageHeader.DataPageHeader.Encoding = parquet.Encoding_RLE_DICTIONARY
ts = thrift.NewTSerializer()
ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
dataPageRawData, err := ts.Write(context.TODO(), dataPageHeader)
if err != nil {
panic(err)
}
dataPageRawData = append(dataPageRawData, compressedData...)
metadata := parquet.NewColumnMetaData()
metadata.Type = column.parquetType
metadata.Encodings = []parquet.Encoding{
parquet.Encoding_PLAIN,
parquet.Encoding_RLE,
parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY,
parquet.Encoding_RLE_DICTIONARY,
}
metadata.Codec = compressionType
metadata.NumValues = int64(dataPageHeader.DataPageHeader.NumValues)
metadata.TotalCompressedSize = int64(len(dictPageRawData)) + int64(len(dataPageRawData))
uncompressedSize := int64(dictPageHeader.UncompressedPageSize) + int64(len(dictPageData)) - int64(dictPageHeader.CompressedPageSize)
uncompressedSize += int64(dataPageHeader.UncompressedPageSize) + int64(len(dataPageData)) - int64(dataPageHeader.CompressedPageSize)
metadata.TotalUncompressedSize = uncompressedSize
metadata.PathInSchema = strings.Split(element.PathInSchema, ".")
metadata.Statistics = parquet.NewStatistics()
metadata.Statistics.Min = column.encodeValue(column.minValue, element)
metadata.Statistics.Max = column.encodeValue(column.maxValue, element)
chunk := new(ColumnChunk)
chunk.ColumnChunk.MetaData = metadata
chunk.isDictPage = true
chunk.dictPageLen = int64(len(dictPageRawData))
chunk.dataPageLen = int64(len(dataPageRawData))
chunk.dataLen = chunk.dictPageLen + chunk.dataPageLen
chunk.data = append(dictPageRawData, dataPageRawData...)
return chunk
}
// Encode an element.
func (column *Column) Encode(element *schema.Element) *ColumnChunk {
parquetEncoding := getDefaultEncoding(column.parquetType)
if element.Encoding != nil {
parquetEncoding = *element.Encoding
}
switch parquetEncoding {
case parquet.Encoding_PLAIN, parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY:
return column.toDataPageV2(element, parquetEncoding)
}
return column.toRLEDictPage(element)
}
// NewColumn - creates new column data
func NewColumn(parquetType parquet.Type) *Column {
switch parquetType {
case parquet.Type_BOOLEAN, parquet.Type_INT32, parquet.Type_INT64, parquet.Type_FLOAT, parquet.Type_DOUBLE, parquet.Type_BYTE_ARRAY:
default:
panic(fmt.Errorf("unsupported parquet type %v", parquetType))
}
return &Column{
parquetType: parquetType,
}
}
// UnmarshalJSON - decodes JSON data into map of Column.
func UnmarshalJSON(data []byte, tree *schema.Tree) (map[string]*Column, error) {
if !tree.ReadOnly() {
return nil, fmt.Errorf("tree must be read only")
}
inputValue, err := bytesToJSONValue(data)
if err != nil {
return nil, err
}
columnDataMap := make(map[string]*Column)
return populate(columnDataMap, inputValue, tree, 0)
}