297 lines
7.2 KiB
Go
Raw Normal View History

/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package encoding
import (
"fmt"
"github.com/minio/minio/pkg/s3select/internal/parquet-go/common"
"github.com/minio/minio/pkg/s3select/internal/parquet-go/gen-go/parquet"
)
const (
blockSize = 128
miniBlockSize = 32
miniBlockCount = blockSize / miniBlockSize
)
var deltaEncodeHeaderBytes []byte
func init() {
deltaEncodeHeaderBytes = varIntEncode(blockSize)
deltaEncodeHeaderBytes = append(deltaEncodeHeaderBytes, varIntEncode(miniBlockCount)...)
}
// Supported Types: BOOLEAN, INT32, INT64
func bitPackedEncode(values interface{}, bitWidth uint64, withHeader bool, parquetType parquet.Type) []byte {
var i64s []int64
switch parquetType {
case parquet.Type_BOOLEAN:
bs, ok := values.([]bool)
if !ok {
panic(fmt.Errorf("expected slice of bool"))
}
i64s = make([]int64, len(bs))
for i := range bs {
if bs[i] {
i64s[i] = 1
}
}
case parquet.Type_INT32:
i32s, ok := values.([]int32)
if !ok {
panic(fmt.Errorf("expected slice of int32"))
}
for i := range i32s {
i64s[i] = int64(i32s[i])
}
case parquet.Type_INT64:
var ok bool
i64s, ok = values.([]int64)
if !ok {
panic(fmt.Errorf("expected slice of int64"))
}
default:
panic(fmt.Errorf("%v parquet type unsupported", parquetType))
}
if len(i64s) == 0 {
return nil
}
var valueByte byte
bitsSet := uint64(0)
bitsNeeded := uint64(8)
bitsToSet := bitWidth
value := i64s[0]
valueBytes := []byte{}
for i := 0; i < len(i64s); {
if bitsToSet >= bitsNeeded {
valueByte |= byte(((value >> bitsSet) & ((1 << bitsNeeded) - 1)) << (8 - bitsNeeded))
valueBytes = append(valueBytes, valueByte)
bitsToSet -= bitsNeeded
bitsSet += bitsNeeded
bitsNeeded = 8
valueByte = 0
if bitsToSet <= 0 && (i+1) < len(i64s) {
i++
value = i64s[i]
bitsToSet = bitWidth
bitsSet = 0
}
} else {
valueByte |= byte((value >> bitsSet) << (8 - bitsNeeded))
i++
if i < len(i64s) {
value = i64s[i]
}
bitsNeeded -= bitsToSet
bitsToSet = bitWidth
bitsSet = 0
}
}
if withHeader {
header := uint64(((len(i64s) / 8) << 1) | 1)
headerBytes := varIntEncode(header)
return append(headerBytes, valueBytes...)
}
return valueBytes
}
func deltaEncodeInt32s(i32s []int32) (data []byte) {
getValue := func(i32 int32) uint64 {
return uint64((i32 >> 31) ^ (i32 << 1))
}
data = append(data, deltaEncodeHeaderBytes...)
data = append(data, varIntEncode(uint64(len(i32s)))...)
data = append(data, varIntEncode(getValue(i32s[0]))...)
for i := 1; i < len(i32s); {
block := []int32{}
minDelta := int32(0x7FFFFFFF)
for ; i < len(i32s) && len(block) < blockSize; i++ {
delta := i32s[i] - i32s[i-1]
block = append(block, delta)
if delta < minDelta {
minDelta = delta
}
}
for len(block) < blockSize {
block = append(block, minDelta)
}
bitWidths := make([]byte, miniBlockCount)
for j := 0; j < miniBlockCount; j++ {
maxValue := int32(0)
for k := j * miniBlockSize; k < (j+1)*miniBlockSize; k++ {
block[k] -= minDelta
if block[k] > maxValue {
maxValue = block[k]
}
}
bitWidths[j] = byte(common.BitWidth(uint64(maxValue)))
}
minDeltaZigZag := getValue(minDelta)
data = append(data, varIntEncode(minDeltaZigZag)...)
data = append(data, bitWidths...)
for j := 0; j < miniBlockCount; j++ {
bitPacked := bitPackedEncode(
block[j*miniBlockSize:(j+1)*miniBlockSize],
uint64(bitWidths[j]),
false,
parquet.Type_INT32,
)
data = append(data, bitPacked...)
}
}
return data
}
func deltaEncodeInt64s(i64s []int64) (data []byte) {
getValue := func(i64 int64) uint64 {
return uint64((i64 >> 63) ^ (i64 << 1))
}
data = append(data, deltaEncodeHeaderBytes...)
data = append(data, varIntEncode(uint64(len(i64s)))...)
data = append(data, varIntEncode(getValue(i64s[0]))...)
for i := 1; i < len(i64s); {
block := []int64{}
minDelta := int64(0x7FFFFFFFFFFFFFFF)
for ; i < len(i64s) && len(block) < blockSize; i++ {
delta := i64s[i] - i64s[i-1]
block = append(block, delta)
if delta < minDelta {
minDelta = delta
}
}
for len(block) < blockSize {
block = append(block, minDelta)
}
bitWidths := make([]byte, miniBlockCount)
for j := 0; j < miniBlockCount; j++ {
maxValue := int64(0)
for k := j * miniBlockSize; k < (j+1)*miniBlockSize; k++ {
block[k] -= minDelta
if block[k] > maxValue {
maxValue = block[k]
}
}
bitWidths[j] = byte(common.BitWidth(uint64(maxValue)))
}
minDeltaZigZag := getValue(minDelta)
data = append(data, varIntEncode(minDeltaZigZag)...)
data = append(data, bitWidths...)
for j := 0; j < miniBlockCount; j++ {
bitPacked := bitPackedEncode(
block[j*miniBlockSize:(j+1)*miniBlockSize],
uint64(bitWidths[j]),
false,
parquet.Type_INT64,
)
data = append(data, bitPacked...)
}
}
return data
}
// DeltaEncode encodes values specified in https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5
//
// Supported Types: INT32, INT64.
func DeltaEncode(values interface{}, parquetType parquet.Type) []byte {
switch parquetType {
case parquet.Type_INT32:
i32s, ok := values.([]int32)
if !ok {
panic(fmt.Errorf("expected slice of int32"))
}
return deltaEncodeInt32s(i32s)
case parquet.Type_INT64:
i64s, ok := values.([]int64)
if !ok {
panic(fmt.Errorf("expected slice of int64"))
}
return deltaEncodeInt64s(i64s)
}
panic(fmt.Errorf("%v parquet type unsupported", parquetType))
}
// DeltaLengthByteArrayEncode encodes bytes slices specified in https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-length-byte-array-delta_length_byte_array--6
//
// Supported Types: BYTE_ARRAY
func DeltaLengthByteArrayEncode(bytesSlices [][]byte) (data []byte) {
lengths := make([]int32, len(bytesSlices))
for i, bytes := range bytesSlices {
lengths[i] = int32(len(bytes))
}
data = deltaEncodeInt32s(lengths)
for _, bytes := range bytesSlices {
data = append(data, []byte(bytes)...)
}
return data
}
// DeltaByteArrayEncode encodes sequence of strings values specified in https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-strings-delta_byte_array--7
//
// Supported Types: BYTE_ARRAY
func DeltaByteArrayEncode(bytesSlices [][]byte) (data []byte) {
prefixLengths := make([]int32, len(bytesSlices))
suffixes := make([][]byte, len(bytesSlices))
var i, j int
for i = 1; i < len(bytesSlices); i++ {
for j = 0; j < len(bytesSlices[i-1]) && j < len(bytesSlices[i]); j++ {
if bytesSlices[i-1][j] != bytesSlices[i][j] {
break
}
}
prefixLengths[i] = int32(j)
suffixes[i] = bytesSlices[i][j:]
}
data = deltaEncodeInt32s(prefixLengths)
return append(data, DeltaLengthByteArrayEncode(suffixes)...)
}