/* * Minio Cloud Storage, (C) 2019 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package encoding import ( "fmt" "github.com/minio/minio/pkg/s3select/internal/parquet-go/common" "github.com/minio/minio/pkg/s3select/internal/parquet-go/gen-go/parquet" ) const ( blockSize = 128 miniBlockSize = 32 miniBlockCount = blockSize / miniBlockSize ) var deltaEncodeHeaderBytes []byte func init() { deltaEncodeHeaderBytes = varIntEncode(blockSize) deltaEncodeHeaderBytes = append(deltaEncodeHeaderBytes, varIntEncode(miniBlockCount)...) } // Supported Types: BOOLEAN, INT32, INT64 func bitPackedEncode(values interface{}, bitWidth uint64, withHeader bool, parquetType parquet.Type) []byte { var i64s []int64 switch parquetType { case parquet.Type_BOOLEAN: bs, ok := values.([]bool) if !ok { panic(fmt.Errorf("expected slice of bool")) } i64s = make([]int64, len(bs)) for i := range bs { if bs[i] { i64s[i] = 1 } } case parquet.Type_INT32: i32s, ok := values.([]int32) if !ok { panic(fmt.Errorf("expected slice of int32")) } for i := range i32s { i64s[i] = int64(i32s[i]) } case parquet.Type_INT64: var ok bool i64s, ok = values.([]int64) if !ok { panic(fmt.Errorf("expected slice of int64")) } default: panic(fmt.Errorf("%v parquet type unsupported", parquetType)) } if len(i64s) == 0 { return nil } var valueByte byte bitsSet := uint64(0) bitsNeeded := uint64(8) bitsToSet := bitWidth value := i64s[0] valueBytes := []byte{} for i := 0; i < len(i64s); { if bitsToSet >= bitsNeeded { valueByte |= byte(((value >> bitsSet) & ((1 << bitsNeeded) - 1)) << (8 - bitsNeeded)) valueBytes = append(valueBytes, valueByte) bitsToSet -= bitsNeeded bitsSet += bitsNeeded bitsNeeded = 8 valueByte = 0 if bitsToSet <= 0 && (i+1) < len(i64s) { i++ value = i64s[i] bitsToSet = bitWidth bitsSet = 0 } } else { valueByte |= byte((value >> bitsSet) << (8 - bitsNeeded)) i++ if i < len(i64s) { value = i64s[i] } bitsNeeded -= bitsToSet bitsToSet = bitWidth bitsSet = 0 } } if withHeader { header := uint64(((len(i64s) / 8) << 1) | 1) headerBytes := varIntEncode(header) return append(headerBytes, valueBytes...) } return valueBytes } func deltaEncodeInt32s(i32s []int32) (data []byte) { getValue := func(i32 int32) uint64 { return uint64((i32 >> 31) ^ (i32 << 1)) } data = append(data, deltaEncodeHeaderBytes...) data = append(data, varIntEncode(uint64(len(i32s)))...) data = append(data, varIntEncode(getValue(i32s[0]))...) for i := 1; i < len(i32s); { block := []int32{} minDelta := int32(0x7FFFFFFF) for ; i < len(i32s) && len(block) < blockSize; i++ { delta := i32s[i] - i32s[i-1] block = append(block, delta) if delta < minDelta { minDelta = delta } } for len(block) < blockSize { block = append(block, minDelta) } bitWidths := make([]byte, miniBlockCount) for j := 0; j < miniBlockCount; j++ { maxValue := int32(0) for k := j * miniBlockSize; k < (j+1)*miniBlockSize; k++ { block[k] -= minDelta if block[k] > maxValue { maxValue = block[k] } } bitWidths[j] = byte(common.BitWidth(uint64(maxValue))) } minDeltaZigZag := getValue(minDelta) data = append(data, varIntEncode(minDeltaZigZag)...) data = append(data, bitWidths...) for j := 0; j < miniBlockCount; j++ { bitPacked := bitPackedEncode( block[j*miniBlockSize:(j+1)*miniBlockSize], uint64(bitWidths[j]), false, parquet.Type_INT32, ) data = append(data, bitPacked...) } } return data } func deltaEncodeInt64s(i64s []int64) (data []byte) { getValue := func(i64 int64) uint64 { return uint64((i64 >> 63) ^ (i64 << 1)) } data = append(data, deltaEncodeHeaderBytes...) data = append(data, varIntEncode(uint64(len(i64s)))...) data = append(data, varIntEncode(getValue(i64s[0]))...) for i := 1; i < len(i64s); { block := []int64{} minDelta := int64(0x7FFFFFFFFFFFFFFF) for ; i < len(i64s) && len(block) < blockSize; i++ { delta := i64s[i] - i64s[i-1] block = append(block, delta) if delta < minDelta { minDelta = delta } } for len(block) < blockSize { block = append(block, minDelta) } bitWidths := make([]byte, miniBlockCount) for j := 0; j < miniBlockCount; j++ { maxValue := int64(0) for k := j * miniBlockSize; k < (j+1)*miniBlockSize; k++ { block[k] -= minDelta if block[k] > maxValue { maxValue = block[k] } } bitWidths[j] = byte(common.BitWidth(uint64(maxValue))) } minDeltaZigZag := getValue(minDelta) data = append(data, varIntEncode(minDeltaZigZag)...) data = append(data, bitWidths...) for j := 0; j < miniBlockCount; j++ { bitPacked := bitPackedEncode( block[j*miniBlockSize:(j+1)*miniBlockSize], uint64(bitWidths[j]), false, parquet.Type_INT64, ) data = append(data, bitPacked...) } } return data } // DeltaEncode encodes values specified in https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5 // // Supported Types: INT32, INT64. func DeltaEncode(values interface{}, parquetType parquet.Type) []byte { switch parquetType { case parquet.Type_INT32: i32s, ok := values.([]int32) if !ok { panic(fmt.Errorf("expected slice of int32")) } return deltaEncodeInt32s(i32s) case parquet.Type_INT64: i64s, ok := values.([]int64) if !ok { panic(fmt.Errorf("expected slice of int64")) } return deltaEncodeInt64s(i64s) } panic(fmt.Errorf("%v parquet type unsupported", parquetType)) } // DeltaLengthByteArrayEncode encodes bytes slices specified in https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-length-byte-array-delta_length_byte_array--6 // // Supported Types: BYTE_ARRAY func DeltaLengthByteArrayEncode(bytesSlices [][]byte) (data []byte) { lengths := make([]int32, len(bytesSlices)) for i, bytes := range bytesSlices { lengths[i] = int32(len(bytes)) } data = deltaEncodeInt32s(lengths) for _, bytes := range bytesSlices { data = append(data, []byte(bytes)...) } return data } // DeltaByteArrayEncode encodes sequence of strings values specified in https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-strings-delta_byte_array--7 // // Supported Types: BYTE_ARRAY func DeltaByteArrayEncode(bytesSlices [][]byte) (data []byte) { prefixLengths := make([]int32, len(bytesSlices)) suffixes := make([][]byte, len(bytesSlices)) var i, j int for i = 1; i < len(bytesSlices); i++ { for j = 0; j < len(bytesSlices[i-1]) && j < len(bytesSlices[i]); j++ { if bytesSlices[i-1][j] != bytesSlices[i][j] { break } } prefixLengths[i] = int32(j) suffixes[i] = bytesSlices[i][j:] } data = deltaEncodeInt32s(prefixLengths) return append(data, DeltaLengthByteArrayEncode(suffixes)...) }