mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix owhanging crashes in parquet S3 select (#9921)
This commit is contained in:
parent
5089a7167d
commit
2e338e84cb
@ -17,6 +17,7 @@
|
||||
package parquet
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
@ -34,6 +35,9 @@ func getColumns(
|
||||
nameIndexMap := make(map[string]int)
|
||||
for colIndex, columnChunk := range rowGroup.GetColumns() {
|
||||
meta := columnChunk.GetMetaData()
|
||||
if meta == nil {
|
||||
return nil, errors.New("parquet: column metadata missing")
|
||||
}
|
||||
columnName := strings.Join(meta.GetPathInSchema(), ".")
|
||||
if columnNames != nil && !columnNames.Contains(columnName) {
|
||||
continue
|
||||
@ -50,7 +54,9 @@ func getColumns(
|
||||
}
|
||||
|
||||
size := meta.GetTotalCompressedSize()
|
||||
|
||||
if size < 0 {
|
||||
return nil, errors.New("parquet: negative compressed size")
|
||||
}
|
||||
rc, err := getReaderFunc(offset, size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -18,6 +18,7 @@ package parquet
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
@ -266,8 +267,11 @@ func readRLE(reader *bytes.Reader, header, bitWidth uint64) (result []int64, err
|
||||
}
|
||||
|
||||
val := int64(bytesToUint32(data))
|
||||
|
||||
count := header >> 1
|
||||
if count > math.MaxInt64/8 {
|
||||
// 8 bytes/element.
|
||||
return nil, errors.New("parquet: size too large")
|
||||
}
|
||||
result = make([]int64, count)
|
||||
for i := range result {
|
||||
result[i] = val
|
||||
@ -283,6 +287,9 @@ func readRLEBitPackedHybrid(reader *bytes.Reader, length, bitWidth uint64) (resu
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if i32s[0] < 0 {
|
||||
return nil, errors.New("parquet: negative RLEBitPackedHybrid length")
|
||||
}
|
||||
length = uint64(i32s[0])
|
||||
}
|
||||
|
||||
@ -338,7 +345,9 @@ func readDeltaBinaryPackedInt(reader *bytes.Reader) (result []int64, err error)
|
||||
|
||||
v := int64(firstValueZigZag>>1) ^ (-int64(firstValueZigZag & 1))
|
||||
result = append(result, v)
|
||||
|
||||
if numMiniblocksInBlock == 0 {
|
||||
return nil, errors.New("parquet: zero mini blocks in block")
|
||||
}
|
||||
numValuesInMiniBlock := blockSize / numMiniblocksInBlock
|
||||
|
||||
bitWidths := make([]uint64, numMiniblocksInBlock)
|
||||
@ -435,7 +444,9 @@ func readDataPageValues(
|
||||
if err != nil {
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
if len(i64s) < int(count) || count > math.MaxInt64/8 {
|
||||
return nil, -1, errors.New("parquet: value out of range")
|
||||
}
|
||||
return i64s[:count], parquet.Type_INT64, nil
|
||||
|
||||
case parquet.Encoding_RLE:
|
||||
@ -444,6 +455,9 @@ func readDataPageValues(
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
if len(i64s) < int(count) || count > math.MaxInt64/8 {
|
||||
return nil, -1, errors.New("parquet: value out of range")
|
||||
}
|
||||
i64s = i64s[:count]
|
||||
|
||||
if dataType == parquet.Type_INT32 {
|
||||
@ -461,6 +475,9 @@ func readDataPageValues(
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
if len(i64s) < int(count) || count > math.MaxInt64/8 {
|
||||
return nil, -1, errors.New("parquet: value out of range")
|
||||
}
|
||||
i64s = i64s[:count]
|
||||
|
||||
if dataType == parquet.Type_INT32 {
|
||||
@ -474,6 +491,9 @@ func readDataPageValues(
|
||||
if err != nil {
|
||||
return nil, -1, err
|
||||
}
|
||||
if len(byteSlices) < int(count) || count > math.MaxInt64/24 {
|
||||
return nil, -1, errors.New("parquet: value out of range")
|
||||
}
|
||||
|
||||
return byteSlices[:count], parquet.Type_FIXED_LEN_BYTE_ARRAY, nil
|
||||
|
||||
@ -482,6 +502,9 @@ func readDataPageValues(
|
||||
if err != nil {
|
||||
return nil, -1, err
|
||||
}
|
||||
if len(byteSlices) < int(count) || count > math.MaxInt64/24 {
|
||||
return nil, -1, errors.New("parquet: value out of range")
|
||||
}
|
||||
|
||||
return byteSlices[:count], parquet.Type_FIXED_LEN_BYTE_ARRAY, nil
|
||||
}
|
||||
|
@ -19,7 +19,9 @@ package parquet
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
|
||||
"git.apache.org/thrift.git/lib/go/thrift"
|
||||
@ -101,6 +103,9 @@ func readPage(
|
||||
var repLevelsBuf, defLevelsBuf []byte
|
||||
|
||||
if pageHeader.GetType() == parquet.PageType_DATA_PAGE_V2 {
|
||||
if pageHeader.DataPageHeaderV2 == nil {
|
||||
return nil, errors.New("parquet: Header not set")
|
||||
}
|
||||
repLevelsLen = pageHeader.DataPageHeaderV2.GetRepetitionLevelsByteLength()
|
||||
repLevelsBuf = make([]byte, repLevelsLen)
|
||||
if _, err = thriftReader.Read(repLevelsBuf); err != nil {
|
||||
@ -113,8 +118,11 @@ func readPage(
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
dataBuf := make([]byte, pageHeader.GetCompressedPageSize()-repLevelsLen-defLevelsLen)
|
||||
dbLen := pageHeader.GetCompressedPageSize() - repLevelsLen - defLevelsLen
|
||||
if dbLen < 0 {
|
||||
return nil, errors.New("parquet: negative data length")
|
||||
}
|
||||
dataBuf := make([]byte, dbLen)
|
||||
if _, err = thriftReader.Read(dataBuf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -146,7 +154,9 @@ func readPage(
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
|
||||
if metadata == nil {
|
||||
return nil, 0, 0, errors.New("parquet: metadata not set")
|
||||
}
|
||||
path := append([]string{}, metadata.GetPathInSchema()...)
|
||||
|
||||
bytesReader := bytes.NewReader(buf)
|
||||
@ -160,6 +170,9 @@ func readPage(
|
||||
page.Header = pageHeader
|
||||
table := new(table)
|
||||
table.Path = path
|
||||
if pageHeader.DictionaryPageHeader == nil {
|
||||
return nil, 0, 0, errors.New("parquet: dictionary not set")
|
||||
}
|
||||
values, err := readValues(bytesReader, metadata.GetType(),
|
||||
uint64(pageHeader.DictionaryPageHeader.GetNumValues()), 0)
|
||||
if err != nil {
|
||||
@ -183,9 +196,15 @@ func readPage(
|
||||
var encodingType parquet.Encoding
|
||||
|
||||
if pageHeader.GetType() == parquet.PageType_DATA_PAGE {
|
||||
if pageHeader.DataPageHeader == nil {
|
||||
return nil, 0, 0, errors.New("parquet: Header not set")
|
||||
}
|
||||
numValues = uint64(pageHeader.DataPageHeader.GetNumValues())
|
||||
encodingType = pageHeader.DataPageHeader.GetEncoding()
|
||||
} else {
|
||||
if pageHeader.DataPageHeaderV2 == nil {
|
||||
return nil, 0, 0, errors.New("parquet: Header not set")
|
||||
}
|
||||
numValues = uint64(pageHeader.DataPageHeaderV2.GetNumValues())
|
||||
encodingType = pageHeader.DataPageHeaderV2.GetEncoding()
|
||||
}
|
||||
@ -198,10 +217,13 @@ func readPage(
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
|
||||
if repetitionLevels = values.([]int64); uint64(len(repetitionLevels)) > numValues {
|
||||
if repetitionLevels = values.([]int64); len(repetitionLevels) > int(numValues) && int(numValues) >= 0 {
|
||||
repetitionLevels = repetitionLevels[:numValues]
|
||||
}
|
||||
} else {
|
||||
if numValues > math.MaxInt64/8 {
|
||||
return nil, 0, 0, errors.New("parquet: numvalues too large")
|
||||
}
|
||||
repetitionLevels = make([]int64, numValues)
|
||||
}
|
||||
|
||||
@ -212,10 +234,16 @@ func readPage(
|
||||
if err != nil {
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
if definitionLevels = values.([]int64); uint64(len(definitionLevels)) > numValues {
|
||||
if numValues > math.MaxInt64/8 {
|
||||
return nil, 0, 0, errors.New("parquet: numvalues too large")
|
||||
}
|
||||
if definitionLevels = values.([]int64); len(definitionLevels) > int(numValues) {
|
||||
definitionLevels = definitionLevels[:numValues]
|
||||
}
|
||||
} else {
|
||||
if numValues > math.MaxInt64/8 {
|
||||
return nil, 0, 0, errors.New("parquet: numvalues too large")
|
||||
}
|
||||
definitionLevels = make([]int64, numValues)
|
||||
}
|
||||
|
||||
@ -308,7 +336,10 @@ func (page *page) decode(dictPage *page) {
|
||||
|
||||
for i := 0; i < len(page.DataTable.Values); i++ {
|
||||
if page.DataTable.Values[i] != nil {
|
||||
index := page.DataTable.Values[i].(int64)
|
||||
index, ok := page.DataTable.Values[i].(int64)
|
||||
if !ok || int(index) >= len(dictPage.DataTable.Values) {
|
||||
return
|
||||
}
|
||||
page.DataTable.Values[i] = dictPage.DataTable.Values[index]
|
||||
}
|
||||
}
|
||||
@ -324,7 +355,9 @@ func (page *page) getRLDLFromRawData(columnNameIndexMap map[string]int, schemaEl
|
||||
if pageType == parquet.PageType_DATA_PAGE_V2 {
|
||||
var repLevelsLen, defLevelsLen int32
|
||||
var repLevelsBuf, defLevelsBuf []byte
|
||||
|
||||
if page.Header.DataPageHeaderV2 == nil {
|
||||
return 0, 0, errors.New("parquet: Header not set")
|
||||
}
|
||||
repLevelsLen = page.Header.DataPageHeaderV2.GetRepetitionLevelsByteLength()
|
||||
repLevelsBuf = make([]byte, repLevelsLen)
|
||||
if _, err = bytesReader.Read(repLevelsBuf); err != nil {
|
||||
@ -375,8 +408,14 @@ func (page *page) getRLDLFromRawData(columnNameIndexMap map[string]int, schemaEl
|
||||
case parquet.PageType_DATA_PAGE, parquet.PageType_DATA_PAGE_V2:
|
||||
var numValues uint64
|
||||
if pageType == parquet.PageType_DATA_PAGE {
|
||||
if page.Header.DataPageHeader == nil {
|
||||
return 0, 0, errors.New("parquet: Header not set")
|
||||
}
|
||||
numValues = uint64(page.Header.DataPageHeader.GetNumValues())
|
||||
} else {
|
||||
if page.Header.DataPageHeaderV2 == nil {
|
||||
return 0, 0, errors.New("parquet: Header not set")
|
||||
}
|
||||
numValues = uint64(page.Header.DataPageHeaderV2.GetNumValues())
|
||||
}
|
||||
|
||||
@ -445,6 +484,9 @@ func (page *page) getValueFromRawData(columnNameIndexMap map[string]int, schemaE
|
||||
case parquet.PageType_DICTIONARY_PAGE:
|
||||
bytesReader := bytes.NewReader(page.RawData)
|
||||
var values interface{}
|
||||
if page.Header.DictionaryPageHeader == nil {
|
||||
return errors.New("parquet: dictionary not set")
|
||||
}
|
||||
values, err = readValues(bytesReader, page.DataType,
|
||||
uint64(page.Header.DictionaryPageHeader.GetNumValues()), 0)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user