mirror of
https://github.com/minio/minio.git
synced 2025-02-03 09:55:59 -05:00
fix: missing data on multiple columns reading parquet (#11499)
fixes #11413
This commit is contained in:
parent
5a18d437ce
commit
f53d1de87f
@ -21,6 +21,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
@ -108,24 +109,39 @@ func readPage(
|
|||||||
}
|
}
|
||||||
repLevelsLen = pageHeader.DataPageHeaderV2.GetRepetitionLevelsByteLength()
|
repLevelsLen = pageHeader.DataPageHeaderV2.GetRepetitionLevelsByteLength()
|
||||||
repLevelsBuf = make([]byte, repLevelsLen)
|
repLevelsBuf = make([]byte, repLevelsLen)
|
||||||
if _, err = thriftReader.Read(repLevelsBuf); err != nil {
|
|
||||||
|
n, err := io.ReadFull(thriftReader, repLevelsBuf)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if n != int(repLevelsLen) {
|
||||||
|
return nil, fmt.Errorf("expected parquet header repetition levels %d, got %d", repLevelsLen, n)
|
||||||
|
}
|
||||||
|
|
||||||
defLevelsLen = pageHeader.DataPageHeaderV2.GetDefinitionLevelsByteLength()
|
defLevelsLen = pageHeader.DataPageHeaderV2.GetDefinitionLevelsByteLength()
|
||||||
defLevelsBuf = make([]byte, defLevelsLen)
|
defLevelsBuf = make([]byte, defLevelsLen)
|
||||||
if _, err = thriftReader.Read(defLevelsBuf); err != nil {
|
|
||||||
|
n, err = io.ReadFull(thriftReader, defLevelsBuf)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if n != int(defLevelsLen) {
|
||||||
|
return nil, fmt.Errorf("expected parquet header definition levels %d, got %d", defLevelsLen, n)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
dbLen := pageHeader.GetCompressedPageSize() - repLevelsLen - defLevelsLen
|
dbLen := pageHeader.GetCompressedPageSize() - repLevelsLen - defLevelsLen
|
||||||
if dbLen < 0 {
|
if dbLen < 0 {
|
||||||
return nil, errors.New("parquet: negative data length")
|
return nil, errors.New("parquet: negative data length")
|
||||||
}
|
}
|
||||||
|
|
||||||
dataBuf := make([]byte, dbLen)
|
dataBuf := make([]byte, dbLen)
|
||||||
if _, err = thriftReader.Read(dataBuf); err != nil {
|
n, err := io.ReadFull(thriftReader, dataBuf)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if n != int(dbLen) {
|
||||||
|
return nil, fmt.Errorf("expected parquet data buffer %d, got %d", dbLen, n)
|
||||||
|
}
|
||||||
|
|
||||||
if dataBuf, err = compressionCodec(metadata.GetCodec()).uncompress(dataBuf); err != nil {
|
if dataBuf, err = compressionCodec(metadata.GetCodec()).uncompress(dataBuf); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user