/* * Minio Cloud Storage, (C) 2019 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package parquet import ( "bytes" "encoding/binary" "errors" "fmt" "math" "github.com/minio/minio/pkg/s3select/internal/parquet-go/gen-go/parquet" ) func boolsToBytes(bs []bool) []byte { size := (len(bs) + 7) / 8 result := make([]byte, size) for i := range bs { if bs[i] { result[i/8] |= 1 << uint32(i%8) } } return result } func int32sToBytes(i32s []int32) []byte { buf := make([]byte, 4*len(i32s)) for i, i32 := range i32s { binary.LittleEndian.PutUint32(buf[i*4:], uint32(i32)) } return buf } func int64sToBytes(i64s []int64) []byte { buf := make([]byte, 8*len(i64s)) for i, i64 := range i64s { binary.LittleEndian.PutUint64(buf[i*8:], uint64(i64)) } return buf } func float32sToBytes(f32s []float32) []byte { buf := make([]byte, 4*len(f32s)) for i, f32 := range f32s { binary.LittleEndian.PutUint32(buf[i*4:], math.Float32bits(f32)) } return buf } func float64sToBytes(f64s []float64) []byte { buf := make([]byte, 8*len(f64s)) for i, f64 := range f64s { binary.LittleEndian.PutUint64(buf[i*8:], math.Float64bits(f64)) } return buf } func byteSlicesToBytes(byteSlices [][]byte) []byte { buf := new(bytes.Buffer) for _, s := range byteSlices { if err := binary.Write(buf, binary.LittleEndian, uint32(len(s))); err != nil { panic(err) } if _, err := buf.Write(s); err != nil { panic(err) } } return buf.Bytes() } func byteArraysToBytes(arrayList [][]byte) []byte { buf := new(bytes.Buffer) arrayLen := -1 for _, array := range arrayList { if arrayLen != -1 && len(array) != arrayLen { panic(errors.New("array list does not have same length")) } arrayLen = len(array) if _, err := buf.Write(array); err != nil { panic(err) } } return buf.Bytes() } func int96sToBytes(i96s [][]byte) []byte { return byteArraysToBytes(i96s) } func valuesToBytes(values interface{}, dataType parquet.Type) []byte { switch dataType { case parquet.Type_BOOLEAN: return boolsToBytes(values.([]bool)) case parquet.Type_INT32: return int32sToBytes(values.([]int32)) case parquet.Type_INT64: return int64sToBytes(values.([]int64)) case parquet.Type_INT96: return int96sToBytes(values.([][]byte)) case parquet.Type_FLOAT: return float32sToBytes(values.([]float32)) case parquet.Type_DOUBLE: return float64sToBytes(values.([]float64)) case parquet.Type_BYTE_ARRAY: return byteSlicesToBytes(values.([][]byte)) case parquet.Type_FIXED_LEN_BYTE_ARRAY: return byteArraysToBytes(values.([][]byte)) } return []byte{} } func valueToBytes(value interface{}, dataType parquet.Type) []byte { var values interface{} switch dataType { case parquet.Type_BOOLEAN: values = []bool{value.(bool)} case parquet.Type_INT32: values = []int32{value.(int32)} case parquet.Type_INT64: values = []int64{value.(int64)} case parquet.Type_INT96: values = [][]byte{value.([]byte)} case parquet.Type_FLOAT: values = []float32{value.(float32)} case parquet.Type_DOUBLE: values = []float64{value.(float64)} case parquet.Type_BYTE_ARRAY, parquet.Type_FIXED_LEN_BYTE_ARRAY: values = [][]byte{value.([]byte)} } return valuesToBytes(values, dataType) } func unsignedVarIntToBytes(ui64 uint64) []byte { size := (getBitWidth(ui64) + 6) / 7 if size == 0 { return []byte{0} } buf := make([]byte, size) for i := uint64(0); i < size; i++ { buf[i] = byte(ui64&0x7F) | 0x80 ui64 >>= 7 } buf[size-1] &= 0x7F return buf } func valuesToRLEBytes(values interface{}, bitWidth int32, valueType parquet.Type) []byte { vals := valuesToInterfaces(values, valueType) result := []byte{} j := 0 for i := 0; i < len(vals); i = j { for j = i + 1; j < len(vals) && vals[i] == vals[j]; j++ { } headerBytes := unsignedVarIntToBytes(uint64((j - i) << 1)) result = append(result, headerBytes...) valBytes := valueToBytes(vals[i], valueType) byteCount := (bitWidth + 7) / 8 result = append(result, valBytes[:byteCount]...) } return result } func valuesToRLEBitPackedHybridBytes(values interface{}, bitWidth int32, dataType parquet.Type) []byte { rleBytes := valuesToRLEBytes(values, bitWidth, dataType) lenBytes := valueToBytes(int32(len(rleBytes)), parquet.Type_INT32) return append(lenBytes, rleBytes...) } func valuesToBitPackedBytes(values interface{}, bitWidth int64, withHeader bool, dataType parquet.Type) []byte { var i64s []int64 switch dataType { case parquet.Type_BOOLEAN: bs := values.([]bool) i64s = make([]int64, len(bs)) for i := range bs { if bs[i] { i64s[i] = 1 } } case parquet.Type_INT32: i32s := values.([]int32) i64s = make([]int64, len(i32s)) for i := range i32s { i64s[i] = int64(i32s[i]) } case parquet.Type_INT64: i64s = values.([]int64) default: panic(fmt.Errorf("data type %v is not supported for bit packing", dataType)) } if len(i64s) == 0 { return nil } var valueByte byte bitsSet := uint64(0) bitsNeeded := uint64(8) bitsToSet := uint64(bitWidth) value := i64s[0] valueBytes := []byte{} for i := 0; i < len(i64s); { if bitsToSet >= bitsNeeded { valueByte |= byte(((value >> bitsSet) & ((1 << bitsNeeded) - 1)) << (8 - bitsNeeded)) valueBytes = append(valueBytes, valueByte) bitsToSet -= bitsNeeded bitsSet += bitsNeeded bitsNeeded = 8 valueByte = 0 if bitsToSet <= 0 && (i+1) < len(i64s) { i++ value = i64s[i] bitsToSet = uint64(bitWidth) bitsSet = 0 } } else { valueByte |= byte((value >> bitsSet) << (8 - bitsNeeded)) i++ if i < len(i64s) { value = i64s[i] } bitsNeeded -= bitsToSet bitsToSet = uint64(bitWidth) bitsSet = 0 } } if withHeader { header := uint64(((len(i64s) / 8) << 1) | 1) headerBytes := unsignedVarIntToBytes(header) return append(headerBytes, valueBytes...) } return valueBytes } const ( blockSize = 128 subBlockSize = 32 subBlockCount = blockSize / subBlockSize ) var ( blockSizeBytes = unsignedVarIntToBytes(blockSize) subBlockCountBytes = unsignedVarIntToBytes(subBlockCount) ) func int32ToDeltaBytes(i32s []int32) []byte { getValue := func(i32 int32) uint64 { return uint64((i32 >> 31) ^ (i32 << 1)) } result := append([]byte{}, blockSizeBytes...) result = append(result, subBlockCountBytes...) result = append(result, unsignedVarIntToBytes(uint64(len(i32s)))...) result = append(result, unsignedVarIntToBytes(getValue(i32s[0]))...) for i := 1; i < len(i32s); { block := []int32{} minDelta := int32(0x7FFFFFFF) for ; i < len(i32s) && len(block) < blockSize; i++ { delta := i32s[i] - i32s[i-1] block = append(block, delta) if delta < minDelta { minDelta = delta } } for len(block) < blockSize { block = append(block, minDelta) } bitWidths := make([]byte, subBlockCount) for j := 0; j < subBlockCount; j++ { maxValue := int32(0) for k := j * subBlockSize; k < (j+1)*subBlockSize; k++ { block[k] -= minDelta if block[k] > maxValue { maxValue = block[k] } } bitWidths[j] = byte(getBitWidth(uint64(maxValue))) } minDeltaZigZag := getValue(minDelta) result = append(result, unsignedVarIntToBytes(minDeltaZigZag)...) result = append(result, bitWidths...) for j := 0; j < subBlockCount; j++ { bitPacked := valuesToBitPackedBytes( block[j*subBlockSize:(j+1)*subBlockSize], int64(bitWidths[j]), false, parquet.Type_INT32, ) result = append(result, bitPacked...) } } return result } func int64ToDeltaBytes(i64s []int64) []byte { getValue := func(i64 int64) uint64 { return uint64((i64 >> 63) ^ (i64 << 1)) } result := append([]byte{}, blockSizeBytes...) result = append(result, subBlockCountBytes...) result = append(result, unsignedVarIntToBytes(uint64(len(i64s)))...) result = append(result, unsignedVarIntToBytes(getValue(i64s[0]))...) for i := 1; i < len(i64s); { block := []int64{} minDelta := int64(0x7FFFFFFFFFFFFFFF) for ; i < len(i64s) && len(block) < blockSize; i++ { delta := i64s[i] - i64s[i-1] block = append(block, delta) if delta < minDelta { minDelta = delta } } for len(block) < blockSize { block = append(block, minDelta) } bitWidths := make([]byte, subBlockCount) for j := 0; j < subBlockCount; j++ { maxValue := int64(0) for k := j * subBlockSize; k < (j+1)*subBlockSize; k++ { block[k] -= minDelta if block[k] > maxValue { maxValue = block[k] } } bitWidths[j] = byte(getBitWidth(uint64(maxValue))) } minDeltaZigZag := getValue(minDelta) result = append(result, unsignedVarIntToBytes(minDeltaZigZag)...) result = append(result, bitWidths...) for j := 0; j < subBlockCount; j++ { bitPacked := valuesToBitPackedBytes( block[j*subBlockSize:(j+1)*subBlockSize], int64(bitWidths[j]), false, parquet.Type_INT64, ) result = append(result, bitPacked...) } } return result } func valuesToDeltaBytes(values interface{}, dataType parquet.Type) []byte { switch dataType { case parquet.Type_INT32: return int32ToDeltaBytes(values.([]int32)) case parquet.Type_INT64: return int64ToDeltaBytes(values.([]int64)) } return nil } func stringsToDeltaLengthByteArrayBytes(strs []string) []byte { lengths := make([]int32, len(strs)) for i, s := range strs { lengths[i] = int32(len(s)) } result := int32ToDeltaBytes(lengths) for _, s := range strs { result = append(result, []byte(s)...) } return result } func stringsToDeltaByteArrayBytes(strs []string) []byte { prefixLengths := make([]int32, len(strs)) suffixes := make([]string, len(strs)) var i, j int for i = 1; i < len(strs); i++ { for j = 0; j < len(strs[i-1]) && j < len(strs[i]); j++ { if strs[i-1][j] != strs[i][j] { break } } prefixLengths[i] = int32(j) suffixes[i] = strs[i][j:] } result := int32ToDeltaBytes(prefixLengths) return append(result, stringsToDeltaLengthByteArrayBytes(suffixes)...) } func encodeValues(values interface{}, dataType parquet.Type, encoding parquet.Encoding, bitWidth int32) []byte { switch encoding { case parquet.Encoding_RLE: return valuesToRLEBitPackedHybridBytes(values, bitWidth, dataType) case parquet.Encoding_DELTA_BINARY_PACKED: return valuesToDeltaBytes(values, dataType) case parquet.Encoding_DELTA_BYTE_ARRAY: return stringsToDeltaByteArrayBytes(values.([]string)) case parquet.Encoding_DELTA_LENGTH_BYTE_ARRAY: return stringsToDeltaLengthByteArrayBytes(values.([]string)) } return valuesToBytes(values, dataType) }