// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package parquet

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"strings"

	"git.apache.org/thrift.git/lib/go/thrift"
	"github.com/minio/minio/pkg/s3select/internal/parquet-go/gen-go/parquet"
)

// getBitWidth - returns bits required to place num e.g.
//
//    num | width
//   -----|-------
//     0  |   0
//     1  |   1
//     2  |   2
//     3  |   2
//     4  |   3
//     5  |   3
//    ... |  ...
//    ... |  ...
//
// getBitWidth returns the minimum number of bits needed to represent num:
// 0 -> 0, 1 -> 1, 2..3 -> 2, 4..7 -> 3, and so on (i.e. the position of the
// highest set bit).
func getBitWidth(num uint64) (width uint64) {
	for num != 0 {
		width++
		num >>= 1
	}

	return width
}

// getMaxDefLevel - get maximum definition level.
// getMaxDefLevel - get maximum definition level: for each prefix of path
// present in the schema, count the elements that are not REQUIRED.
func getMaxDefLevel(nameIndexMap map[string]int, schemaElements []*parquet.SchemaElement, path []string) (v int) {
	for end := range path {
		dottedName := strings.Join(path[:end+1], ".")
		index, found := nameIndexMap[dottedName]
		if !found {
			continue
		}
		if schemaElements[index].GetRepetitionType() != parquet.FieldRepetitionType_REQUIRED {
			v++
		}
	}

	return v
}

// getMaxRepLevel - get maximum repetition level.
// getMaxRepLevel - get maximum repetition level: for each prefix of path
// present in the schema, count the elements that are REPEATED.
func getMaxRepLevel(nameIndexMap map[string]int, schemaElements []*parquet.SchemaElement, path []string) (v int) {
	for end := range path {
		dottedName := strings.Join(path[:end+1], ".")
		index, found := nameIndexMap[dottedName]
		if !found {
			continue
		}
		if schemaElements[index].GetRepetitionType() == parquet.FieldRepetitionType_REPEATED {
			v++
		}
	}

	return v
}

// readPageHeader deserializes one thrift-compact-encoded page header from
// reader.
func readPageHeader(reader *thrift.TBufferedTransport) (*parquet.PageHeader, error) {
	header := parquet.NewPageHeader()
	err := header.Read(thrift.NewTCompactProtocol(reader))
	if err != nil {
		return nil, err
	}

	return header, nil
}

// readPage reads the next page of a column chunk from thriftReader: the page
// header followed by the (possibly compressed) page body. It returns the
// decoded page, the number of definition levels read (0 for dictionary
// pages), and the number of rows in the page (entries with repetition
// level 0).
func readPage(
	thriftReader *thrift.TBufferedTransport,
	metadata *parquet.ColumnMetaData,
	columnNameIndexMap map[string]int,
	schemaElements []*parquet.SchemaElement,
) (page *page, definitionLevels, numRows int64, err error) {

	pageHeader, err := readPageHeader(thriftReader)
	if err != nil {
		return nil, 0, 0, err
	}

	// read returns the uncompressed page body. For DATA_PAGE_V2 the
	// repetition/definition level bytes are stored uncompressed ahead of the
	// compressed values, so they are read separately and re-packed with a
	// 4-byte length prefix each, in front of the uncompressed value bytes.
	read := func() (data []byte, err error) {
		var repLevelsLen, defLevelsLen int32
		var repLevelsBuf, defLevelsBuf []byte

		if pageHeader.GetType() == parquet.PageType_DATA_PAGE_V2 {
			if pageHeader.DataPageHeaderV2 == nil {
				return nil, errors.New("parquet: Header not set")
			}
			repLevelsLen = pageHeader.DataPageHeaderV2.GetRepetitionLevelsByteLength()
			repLevelsBuf = make([]byte, repLevelsLen)

			n, err := io.ReadFull(thriftReader, repLevelsBuf)
			if err != nil {
				return nil, err
			}
			if n != int(repLevelsLen) {
				return nil, fmt.Errorf("expected parquet header repetition levels %d, got %d", repLevelsLen, n)
			}

			defLevelsLen = pageHeader.DataPageHeaderV2.GetDefinitionLevelsByteLength()
			defLevelsBuf = make([]byte, defLevelsLen)

			n, err = io.ReadFull(thriftReader, defLevelsBuf)
			if err != nil {
				return nil, err
			}
			if n != int(defLevelsLen) {
				return nil, fmt.Errorf("expected parquet header definition levels %d, got %d", defLevelsLen, n)
			}
		}
		// The remainder of the page is the (compressed) value data. Guard
		// against malformed headers claiming more level bytes than the page
		// holds, which would make dbLen negative.
		dbLen := pageHeader.GetCompressedPageSize() - repLevelsLen - defLevelsLen
		if dbLen < 0 {
			return nil, errors.New("parquet: negative data length")
		}

		dataBuf := make([]byte, dbLen)
		n, err := io.ReadFull(thriftReader, dataBuf)
		if err != nil {
			return nil, err
		}
		if n != int(dbLen) {
			return nil, fmt.Errorf("expected parquet data buffer %d, got %d", dbLen, n)
		}

		if dataBuf, err = compressionCodec(metadata.GetCodec()).uncompress(dataBuf); err != nil {
			return nil, err
		}

		if repLevelsLen == 0 && defLevelsLen == 0 {
			return dataBuf, nil
		}

		// Re-pack: [len][rep levels][len][def levels][values] so the RLE
		// readers below can consume levels and values from one buffer.
		if repLevelsLen > 0 {
			data = append(data, uint32ToBytes(uint32(repLevelsLen))...)
			data = append(data, repLevelsBuf...)
		}

		if defLevelsLen > 0 {
			data = append(data, uint32ToBytes(uint32(defLevelsLen))...)
			data = append(data, defLevelsBuf...)
		}

		data = append(data, dataBuf...)

		return data, nil
	}

	buf, err := read()
	if err != nil {
		return nil, 0, 0, err
	}
	if metadata == nil {
		return nil, 0, 0, errors.New("parquet: metadata not set")
	}
	// Copy so we do not alias the metadata-owned slice.
	path := append([]string{}, metadata.GetPathInSchema()...)

	bytesReader := bytes.NewReader(buf)
	pageType := pageHeader.GetType()
	switch pageType {
	case parquet.PageType_INDEX_PAGE:
		return nil, 0, 0, fmt.Errorf("page type %v is not supported", parquet.PageType_INDEX_PAGE)

	case parquet.PageType_DICTIONARY_PAGE:
		// Dictionary pages carry only values — no repetition/definition
		// levels — so both counters are returned as 0.
		page = newDictPage()
		page.Header = pageHeader
		table := new(table)
		table.Path = path
		if pageHeader.DictionaryPageHeader == nil {
			return nil, 0, 0, errors.New("parquet: dictionary not set")
		}
		values, err := readValues(bytesReader, metadata.GetType(),
			uint64(pageHeader.DictionaryPageHeader.GetNumValues()), 0)
		if err != nil {
			return nil, 0, 0, err
		}
		table.Values = getTableValues(values, metadata.GetType())
		page.DataTable = table

		return page, 0, 0, nil

	case parquet.PageType_DATA_PAGE, parquet.PageType_DATA_PAGE_V2:
		name := strings.Join(path, ".")

		page = newDataPage()
		page.Header = pageHeader

		maxDefinitionLevel := getMaxDefLevel(columnNameIndexMap, schemaElements, path)
		maxRepetitionLevel := getMaxRepLevel(columnNameIndexMap, schemaElements, path)

		// Value count and encoding live in different header structs for v1
		// vs v2 pages.
		var numValues uint64
		var encodingType parquet.Encoding

		if pageHeader.GetType() == parquet.PageType_DATA_PAGE {
			if pageHeader.DataPageHeader == nil {
				return nil, 0, 0, errors.New("parquet: Header not set")
			}
			numValues = uint64(pageHeader.DataPageHeader.GetNumValues())
			encodingType = pageHeader.DataPageHeader.GetEncoding()
		} else {
			if pageHeader.DataPageHeaderV2 == nil {
				return nil, 0, 0, errors.New("parquet: Header not set")
			}
			numValues = uint64(pageHeader.DataPageHeaderV2.GetNumValues())
			encodingType = pageHeader.DataPageHeaderV2.GetEncoding()
		}

		// Repetition levels are RLE/bit-packed. A max level of 0 means the
		// column is not repeated: all levels are implicitly 0.
		var repetitionLevels []int64
		if maxRepetitionLevel > 0 {
			values, _, err := readDataPageValues(bytesReader, parquet.Encoding_RLE, parquet.Type_INT64,
				-1, numValues, getBitWidth(uint64(maxRepetitionLevel)))
			if err != nil {
				return nil, 0, 0, err
			}

			if repetitionLevels = values.([]int64); len(repetitionLevels) > int(numValues) && int(numValues) >= 0 {
				repetitionLevels = repetitionLevels[:numValues]
			}
		} else {
			// Cap the allocation size against a corrupt/hostile numValues.
			if numValues > math.MaxInt64/8 {
				return nil, 0, 0, errors.New("parquet: numvalues too large")
			}
			repetitionLevels = make([]int64, numValues)
		}

		// Definition levels, same encoding; max level 0 means all values are
		// present (non-null) at level 0.
		var definitionLevels []int64
		if maxDefinitionLevel > 0 {
			values, _, err := readDataPageValues(bytesReader, parquet.Encoding_RLE, parquet.Type_INT64,
				-1, numValues, getBitWidth(uint64(maxDefinitionLevel)))
			if err != nil {
				return nil, 0, 0, err
			}
			if numValues > math.MaxInt64/8 {
				return nil, 0, 0, errors.New("parquet: numvalues too large")
			}
			if definitionLevels = values.([]int64); len(definitionLevels) > int(numValues) {
				definitionLevels = definitionLevels[:numValues]
			}
		} else {
			if numValues > math.MaxInt64/8 {
				return nil, 0, 0, errors.New("parquet: numvalues too large")
			}
			definitionLevels = make([]int64, numValues)
		}

		// Entries whose definition level is below the maximum are nulls and
		// carry no encoded value.
		var numNulls uint64
		for i := 0; i < len(definitionLevels); i++ {
			if definitionLevels[i] != int64(maxDefinitionLevel) {
				numNulls++
			}
		}

		var convertedType parquet.ConvertedType = -1
		if schemaElements[columnNameIndexMap[name]].IsSetConvertedType() {
			convertedType = schemaElements[columnNameIndexMap[name]].GetConvertedType()
		}
		// Read only the non-null values; they are scattered back into the
		// full-length table below.
		values, valueType, err := readDataPageValues(bytesReader, encodingType, metadata.GetType(),
			convertedType, uint64(len(definitionLevels))-numNulls,
			uint64(schemaElements[columnNameIndexMap[name]].GetTypeLength()))
		if err != nil {
			return nil, 0, 0, err
		}
		tableValues := getTableValues(values, valueType)

		table := new(table)
		table.Path = path
		table.RepetitionType = schemaElements[columnNameIndexMap[name]].GetRepetitionType()
		table.MaxRepetitionLevel = int32(maxRepetitionLevel)
		table.MaxDefinitionLevel = int32(maxDefinitionLevel)
		table.Values = make([]interface{}, len(definitionLevels))
		table.RepetitionLevels = make([]int32, len(definitionLevels))
		table.DefinitionLevels = make([]int32, len(definitionLevels))

		// Scatter values into non-null slots; count rows (repetition level 0
		// starts a new row).
		j := 0
		numRows := int64(0) // shadows the named return; returned explicitly below
		for i := 0; i < len(definitionLevels); i++ {
			table.RepetitionLevels[i] = int32(repetitionLevels[i])
			table.DefinitionLevels[i] = int32(definitionLevels[i])
			if int(table.DefinitionLevels[i]) == maxDefinitionLevel {
				table.Values[i] = tableValues[j]
				j++
			}
			if table.RepetitionLevels[i] == 0 {
				numRows++
			}
		}
		page.DataTable = table

		return page, int64(len(definitionLevels)), numRows, nil
	}

	return nil, 0, 0, fmt.Errorf("unknown page type %v", pageType)
}

// page holds one parquet page (dictionary or data) of a column chunk, in
// decoded form (DataTable) and/or serialized form (RawData).
type page struct {
	Header       *parquet.PageHeader      // Header of a page
	DataTable    *table                   // Table to store values
	RawData      []byte                   // Compressed data of the page, which is written in parquet file
	CompressType parquet.CompressionCodec // Compress type: gzip/snappy/none
	DataType     parquet.Type             // Parquet type of the values in the page
	Path         []string                 // Path in schema(include the root)
	MaxVal       interface{}              // Maximum of the values
	MinVal       interface{}              // Minimum of the values
	PageSize     int32                    // Page size in bytes; newPage initializes it to defaultPageSize
}

// newPage creates a page with a fresh (empty) header and the default page
// size.
func newPage() *page {
	p := new(page)
	p.Header = parquet.NewPageHeader()
	p.PageSize = defaultPageSize
	return p
}

// newDictPage creates a page pre-populated with an empty dictionary page
// header.
func newDictPage() *page {
	dictPage := newPage()
	dictPage.Header.DictionaryPageHeader = parquet.NewDictionaryPageHeader()
	return dictPage
}

// newDataPage creates a page pre-populated with an empty (v1) data page
// header.
func newDataPage() *page {
	dataPage := newPage()
	dataPage.Header.DataPageHeader = parquet.NewDataPageHeader()
	return dataPage
}

// decode replaces the dictionary indices stored in page's values with the
// actual values from dictPage. It is a no-op unless the page is a
// dictionary-encoded (RLE_DICTIONARY or PLAIN_DICTIONARY) v1 data page.
// Decoding stops at the first malformed entry (non-int64 index or index out
// of range of the dictionary).
func (page *page) decode(dictPage *page) {
	if dictPage == nil || page == nil {
		return
	}
	header := page.Header.DataPageHeader
	if header == nil {
		return
	}
	if header.Encoding != parquet.Encoding_RLE_DICTIONARY &&
		header.Encoding != parquet.Encoding_PLAIN_DICTIONARY {
		return
	}

	dictValues := dictPage.DataTable.Values
	for i, v := range page.DataTable.Values {
		if v == nil {
			// Nulls have no dictionary index.
			continue
		}
		index, ok := v.(int64)
		if !ok || int(index) >= len(dictValues) {
			return
		}
		page.DataTable.Values[i] = dictValues[index]
	}
}

// Get RepetitionLevels and Definitions from RawData
func (page *page) getRLDLFromRawData(columnNameIndexMap map[string]int, schemaElements []*parquet.SchemaElement) (numValues int64, numRows int64, err error) {
	bytesReader := bytes.NewReader(page.RawData)

	pageType := page.Header.GetType()

	var buf []byte
	if pageType == parquet.PageType_DATA_PAGE_V2 {
		var repLevelsLen, defLevelsLen int32
		var repLevelsBuf, defLevelsBuf []byte
		if page.Header.DataPageHeaderV2 == nil {
			return 0, 0, errors.New("parquet: Header not set")
		}
		repLevelsLen = page.Header.DataPageHeaderV2.GetRepetitionLevelsByteLength()
		repLevelsBuf = make([]byte, repLevelsLen)
		if _, err = bytesReader.Read(repLevelsBuf); err != nil {
			return 0, 0, err
		}

		defLevelsLen = page.Header.DataPageHeaderV2.GetDefinitionLevelsByteLength()
		defLevelsBuf = make([]byte, defLevelsLen)
		if _, err = bytesReader.Read(defLevelsBuf); err != nil {
			return 0, 0, err
		}

		dataBuf := make([]byte, len(page.RawData)-int(repLevelsLen)-int(defLevelsLen))
		if _, err = bytesReader.Read(dataBuf); err != nil {
			return 0, 0, err
		}

		if repLevelsLen == 0 && defLevelsLen == 0 {
			buf = dataBuf
		} else {
			if repLevelsLen > 0 {
				buf = append(buf, uint32ToBytes(uint32(repLevelsLen))...)
				buf = append(buf, repLevelsBuf...)
			}

			if defLevelsLen > 0 {
				buf = append(buf, uint32ToBytes(uint32(defLevelsLen))...)
				buf = append(buf, defLevelsBuf...)
			}

			buf = append(buf, dataBuf...)
		}
	} else {
		if buf, err = compressionCodec(page.CompressType).uncompress(page.RawData); err != nil {
			return 0, 0, err
		}
	}

	bytesReader = bytes.NewReader(buf)

	switch pageType {
	case parquet.PageType_DICTIONARY_PAGE:
		table := new(table)
		table.Path = page.Path
		page.DataTable = table
		return 0, 0, nil

	case parquet.PageType_DATA_PAGE, parquet.PageType_DATA_PAGE_V2:
		var numValues uint64
		if pageType == parquet.PageType_DATA_PAGE {
			if page.Header.DataPageHeader == nil {
				return 0, 0, errors.New("parquet: Header not set")
			}
			numValues = uint64(page.Header.DataPageHeader.GetNumValues())
		} else {
			if page.Header.DataPageHeaderV2 == nil {
				return 0, 0, errors.New("parquet: Header not set")
			}
			numValues = uint64(page.Header.DataPageHeaderV2.GetNumValues())
		}

		maxDefinitionLevel := getMaxDefLevel(columnNameIndexMap, schemaElements, page.Path)
		maxRepetitionLevel := getMaxRepLevel(columnNameIndexMap, schemaElements, page.Path)

		var repetitionLevels []int64
		if maxRepetitionLevel > 0 {
			values, _, err := readDataPageValues(bytesReader, parquet.Encoding_RLE, parquet.Type_INT64,
				-1, numValues, getBitWidth(uint64(maxRepetitionLevel)))
			if err != nil {
				return 0, 0, err
			}

			if repetitionLevels = values.([]int64); uint64(len(repetitionLevels)) > numValues {
				repetitionLevels = repetitionLevels[:numValues]
			}
		} else {
			repetitionLevels = make([]int64, numValues)
		}

		var definitionLevels []int64
		if maxDefinitionLevel > 0 {
			values, _, err := readDataPageValues(bytesReader, parquet.Encoding_RLE, parquet.Type_INT64,
				-1, numValues, getBitWidth(uint64(maxDefinitionLevel)))
			if err != nil {
				return 0, 0, err
			}
			if definitionLevels = values.([]int64); uint64(len(definitionLevels)) > numValues {
				definitionLevels = definitionLevels[:numValues]
			}
		} else {
			definitionLevels = make([]int64, numValues)
		}

		table := new(table)
		table.Path = page.Path
		name := strings.Join(page.Path, ".")
		table.RepetitionType = schemaElements[columnNameIndexMap[name]].GetRepetitionType()
		table.MaxRepetitionLevel = int32(maxRepetitionLevel)
		table.MaxDefinitionLevel = int32(maxDefinitionLevel)
		table.Values = make([]interface{}, len(definitionLevels))
		table.RepetitionLevels = make([]int32, len(definitionLevels))
		table.DefinitionLevels = make([]int32, len(definitionLevels))

		numRows := int64(0)
		for i := 0; i < len(definitionLevels); i++ {
			table.RepetitionLevels[i] = int32(repetitionLevels[i])
			table.DefinitionLevels[i] = int32(definitionLevels[i])
			if table.RepetitionLevels[i] == 0 {
				numRows++
			}
		}
		page.DataTable = table
		page.RawData = buf[len(buf)-bytesReader.Len():]

		return int64(numValues), numRows, nil
	}

	return 0, 0, fmt.Errorf("Unsupported page type %v", pageType)
}

// getValueFromRawData decodes the value bytes remaining in page.RawData
// (levels are expected to have been stripped by getRLDLFromRawData) into
// page.DataTable.Values, placing each decoded value at the positions whose
// definition level equals the maximum. RawData is cleared afterwards.
func (page *page) getValueFromRawData(columnNameIndexMap map[string]int, schemaElements []*parquet.SchemaElement) (err error) {
	pageType := page.Header.GetType()
	switch pageType {
	case parquet.PageType_DICTIONARY_PAGE:
		bytesReader := bytes.NewReader(page.RawData)
		var values interface{}
		if page.Header.DictionaryPageHeader == nil {
			return errors.New("parquet: dictionary not set")
		}
		values, err = readValues(bytesReader, page.DataType,
			uint64(page.Header.DictionaryPageHeader.GetNumValues()), 0)
		if err != nil {
			return err
		}

		page.DataTable.Values = getTableValues(values, page.DataType)
		return nil

	case parquet.PageType_DATA_PAGE_V2:
		// v2 value bytes are compressed independently of the level bytes.
		if page.RawData, err = compressionCodec(page.CompressType).uncompress(page.RawData); err != nil {
			return err
		}
		fallthrough
	case parquet.PageType_DATA_PAGE:
		// Pick the encoding from the header matching the actual page
		// version: v2 pages populate DataPageHeaderV2, not DataPageHeader,
		// so the latter must not be read unconditionally here.
		var encodingType parquet.Encoding
		if pageType == parquet.PageType_DATA_PAGE_V2 {
			if page.Header.DataPageHeaderV2 == nil {
				return errors.New("parquet: Header not set")
			}
			encodingType = page.Header.DataPageHeaderV2.GetEncoding()
		} else {
			if page.Header.DataPageHeader == nil {
				return errors.New("parquet: Header not set")
			}
			encodingType = page.Header.DataPageHeader.GetEncoding()
		}
		bytesReader := bytes.NewReader(page.RawData)

		// Entries whose definition level is below the maximum are nulls and
		// carry no encoded value.
		var numNulls uint64
		for i := 0; i < len(page.DataTable.DefinitionLevels); i++ {
			if page.DataTable.DefinitionLevels[i] != page.DataTable.MaxDefinitionLevel {
				numNulls++
			}
		}

		name := strings.Join(page.DataTable.Path, ".")
		var convertedType parquet.ConvertedType = -1

		if schemaElements[columnNameIndexMap[name]].IsSetConvertedType() {
			convertedType = schemaElements[columnNameIndexMap[name]].GetConvertedType()
		}

		values, _, err := readDataPageValues(bytesReader, encodingType, page.DataType,
			convertedType, uint64(len(page.DataTable.DefinitionLevels))-numNulls,
			uint64(schemaElements[columnNameIndexMap[name]].GetTypeLength()))
		if err != nil {
			return err
		}

		tableValues := getTableValues(values, page.DataType)

		// Scatter decoded values into the non-null slots.
		j := 0
		for i := 0; i < len(page.DataTable.DefinitionLevels); i++ {
			if page.DataTable.DefinitionLevels[i] == page.DataTable.MaxDefinitionLevel {
				page.DataTable.Values[i] = tableValues[j]
				j++
			}
		}

		page.RawData = []byte{}
		return nil
	}

	return fmt.Errorf("unsupported page type %v", pageType)
}

// toDataPage serializes the page as a v1 DATA_PAGE: repetition levels,
// definition levels and values are encoded, concatenated, compressed as one
// unit with compressType, and prefixed with a thrift-compact page header.
// The result is stored in page.RawData and returned.
func (page *page) toDataPage(compressType parquet.CompressionCodec) []byte {
	// Only non-null entries (definition level == max) carry an encoded value.
	values := []interface{}{}
	for i := range page.DataTable.DefinitionLevels {
		if page.DataTable.DefinitionLevels[i] == page.DataTable.MaxDefinitionLevel {
			values = append(values, page.DataTable.Values[i])
		}
	}
	valuesBytes := encodeValues(interfacesToValues(values, page.DataTable.Type), page.DataType, page.DataTable.Encoding, page.DataTable.BitWidth)

	// Definition levels, RLE/bit-packed-hybrid encoded; omitted entirely
	// when the column is required at every level (max definition level 0).
	var defLevelBytes []byte
	if page.DataTable.MaxDefinitionLevel > 0 {
		defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
		for i := range page.DataTable.DefinitionLevels {
			defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
		}
		defLevelBytes = valuesToRLEBitPackedHybridBytes(
			defLevels,
			int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
			parquet.Type_INT64,
		)
	}

	// Repetition levels, encoded the same way; omitted for non-repeated
	// columns (max repetition level 0).
	var repLevelBytes []byte
	if page.DataTable.MaxRepetitionLevel > 0 {
		repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
		for i := range page.DataTable.DefinitionLevels {
			repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
		}
		repLevelBytes = valuesToRLEBitPackedHybridBytes(
			repLevels,
			int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
			parquet.Type_INT64,
		)
	}

	// v1 layout: [rep levels][def levels][values], compressed together.
	data := repLevelBytes
	data = append(data, defLevelBytes...)
	data = append(data, valuesBytes...)

	compressedData, err := compressionCodec(compressType).compress(data)
	if err != nil {
		panic(err)
	}

	page.Header = parquet.NewPageHeader()
	page.Header.Type = parquet.PageType_DATA_PAGE
	page.Header.CompressedPageSize = int32(len(compressedData))
	page.Header.UncompressedPageSize = int32(len(data))
	page.Header.DataPageHeader = parquet.NewDataPageHeader()
	page.Header.DataPageHeader.NumValues = int32(len(page.DataTable.DefinitionLevels))
	page.Header.DataPageHeader.DefinitionLevelEncoding = parquet.Encoding_RLE
	page.Header.DataPageHeader.RepetitionLevelEncoding = parquet.Encoding_RLE
	page.Header.DataPageHeader.Encoding = page.DataTable.Encoding
	page.Header.DataPageHeader.Statistics = parquet.NewStatistics()
	if page.MaxVal != nil {
		tmpBuf := valueToBytes(page.MaxVal, page.DataType)
		if page.DataType == parquet.Type_BYTE_ARRAY {
			switch page.DataTable.ConvertedType {
			case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
				// Drops the leading 4 bytes — presumably a length prefix
				// added by valueToBytes for byte arrays; TODO confirm.
				tmpBuf = tmpBuf[4:]
			}
		}
		page.Header.DataPageHeader.Statistics.Max = tmpBuf
	}
	if page.MinVal != nil {
		tmpBuf := valueToBytes(page.MinVal, page.DataType)
		if page.DataType == parquet.Type_BYTE_ARRAY {
			switch page.DataTable.ConvertedType {
			case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
				tmpBuf = tmpBuf[4:]
			}
		}
		page.Header.DataPageHeader.Statistics.Min = tmpBuf
	}

	// Serialize the header with the thrift compact protocol and prepend it.
	ts := thrift.NewTSerializer()
	ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
	pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
	if err != nil {
		panic(err)
	}

	page.RawData = append(pageHeaderBytes, compressedData...)
	return page.RawData
}

// toDataPageV2 serializes the page as a DATA_PAGE_V2: the level bytes are
// RLE-encoded and stored uncompressed right after the header; only the
// value bytes are compressed with compressType. The result is stored in
// page.RawData and returned.
func (page *page) toDataPageV2(compressType parquet.CompressionCodec) []byte {
	// Only non-null entries (definition level == max) carry an encoded value.
	values := []interface{}{}
	for i := range page.DataTable.DefinitionLevels {
		if page.DataTable.DefinitionLevels[i] == page.DataTable.MaxDefinitionLevel {
			values = append(values, page.DataTable.Values[i])
		}
	}
	// NOTE(review): toDataPage passes values through interfacesToValues
	// before encoding; this function does not — confirm the asymmetry is
	// intentional.
	valuesBytes := encodeValues(values, page.DataType, page.DataTable.Encoding, page.DataTable.BitWidth)

	// Definition levels; omitted when the column is required at every level.
	var defLevelBytes []byte
	if page.DataTable.MaxDefinitionLevel > 0 {
		defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
		for i := range page.DataTable.DefinitionLevels {
			defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
		}
		defLevelBytes = valuesToRLEBytes(
			defLevels,
			int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
			parquet.Type_INT64,
		)
	}

	// Repetition levels; while encoding them, count rows (level 0 starts a
	// new row) for the v2 header's NumRows field.
	var repLevelBytes []byte
	numRows := int32(0)
	if page.DataTable.MaxRepetitionLevel > 0 {
		repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
		for i := range page.DataTable.DefinitionLevels {
			repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
			if page.DataTable.RepetitionLevels[i] == 0 {
				numRows++
			}
		}
		repLevelBytes = valuesToRLEBytes(
			repLevels,
			int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
			parquet.Type_INT64,
		)
	}

	// Only the value bytes are compressed in v2; levels stay uncompressed.
	compressedData, err := compressionCodec(compressType).compress(valuesBytes)
	if err != nil {
		panic(err)
	}

	page.Header = parquet.NewPageHeader()
	page.Header.Type = parquet.PageType_DATA_PAGE_V2
	page.Header.CompressedPageSize = int32(len(compressedData) + len(defLevelBytes) + len(repLevelBytes))
	page.Header.UncompressedPageSize = int32(len(valuesBytes) + len(defLevelBytes) + len(repLevelBytes))
	page.Header.DataPageHeaderV2 = parquet.NewDataPageHeaderV2()
	page.Header.DataPageHeaderV2.NumValues = int32(len(page.DataTable.Values))
	page.Header.DataPageHeaderV2.NumNulls = page.Header.DataPageHeaderV2.NumValues - int32(len(values))
	page.Header.DataPageHeaderV2.NumRows = numRows
	page.Header.DataPageHeaderV2.Encoding = page.DataTable.Encoding
	page.Header.DataPageHeaderV2.DefinitionLevelsByteLength = int32(len(defLevelBytes))
	page.Header.DataPageHeaderV2.RepetitionLevelsByteLength = int32(len(repLevelBytes))
	page.Header.DataPageHeaderV2.IsCompressed = true

	page.Header.DataPageHeaderV2.Statistics = parquet.NewStatistics()
	if page.MaxVal != nil {
		tmpBuf := valueToBytes(page.MaxVal, page.DataType)
		if page.DataType == parquet.Type_BYTE_ARRAY {
			switch page.DataTable.ConvertedType {
			case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
				// Drops the leading 4 bytes — presumably a length prefix
				// added by valueToBytes for byte arrays; TODO confirm.
				tmpBuf = tmpBuf[4:]
			}
		}
		page.Header.DataPageHeaderV2.Statistics.Max = tmpBuf
	}
	if page.MinVal != nil {
		tmpBuf := valueToBytes(page.MinVal, page.DataType)
		if page.DataType == parquet.Type_BYTE_ARRAY {
			switch page.DataTable.ConvertedType {
			case parquet.ConvertedType_UTF8, parquet.ConvertedType_DECIMAL:
				tmpBuf = tmpBuf[4:]
			}
		}
		page.Header.DataPageHeaderV2.Statistics.Min = tmpBuf
	}

	// Serialize the header with the thrift compact protocol; final layout is
	// [header][rep levels][def levels][compressed values].
	ts := thrift.NewTSerializer()
	ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
	pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
	if err != nil {
		panic(err)
	}

	page.RawData = append(pageHeaderBytes, repLevelBytes...)
	page.RawData = append(page.RawData, defLevelBytes...)
	page.RawData = append(page.RawData, compressedData...)

	return page.RawData
}

// toDictPage serializes the page as a PLAIN-encoded DICTIONARY_PAGE: the
// values are encoded, compressed with compressType, and prefixed with a
// thrift-compact page header. The result is stored in page.RawData and
// returned.
func (page *page) toDictPage(compressType parquet.CompressionCodec, dataType parquet.Type) []byte {
	rawBytes := valuesToBytes(page.DataTable.Values, dataType)
	compressedBytes, err := compressionCodec(compressType).compress(rawBytes)
	if err != nil {
		panic(err)
	}

	header := parquet.NewPageHeader()
	header.Type = parquet.PageType_DICTIONARY_PAGE
	header.CompressedPageSize = int32(len(compressedBytes))
	header.UncompressedPageSize = int32(len(rawBytes))
	header.DictionaryPageHeader = parquet.NewDictionaryPageHeader()
	header.DictionaryPageHeader.NumValues = int32(len(page.DataTable.Values))
	header.DictionaryPageHeader.Encoding = parquet.Encoding_PLAIN
	page.Header = header

	// Serialize the header with the thrift compact protocol and prepend it.
	serializer := thrift.NewTSerializer()
	serializer.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(serializer.Transport)
	headerBytes, err := serializer.Write(context.TODO(), page.Header)
	if err != nil {
		panic(err)
	}

	page.RawData = append(headerBytes, compressedBytes...)
	return page.RawData
}

// toDictDataPage serializes the page as a v1 DATA_PAGE whose values are
// PLAIN_DICTIONARY-encoded dictionary indices: one leading byte giving the
// index bit width, then RLE-encoded INT32 indices, preceded by the usual
// rep/def level bytes, all compressed together with compressType. The
// result is stored in page.RawData and returned.
func (page *page) toDictDataPage(compressType parquet.CompressionCodec, bitWidth int32) []byte {
	// Dictionary-encoded data starts with a single bit-width byte.
	valuesBytes := append([]byte{byte(bitWidth)}, valuesToRLEBytes(page.DataTable.Values, bitWidth, parquet.Type_INT32)...)

	// Definition levels; omitted when the column is required at every level.
	var defLevelBytes []byte
	if page.DataTable.MaxDefinitionLevel > 0 {
		defLevels := make([]int64, len(page.DataTable.DefinitionLevels))
		for i := range page.DataTable.DefinitionLevels {
			defLevels[i] = int64(page.DataTable.DefinitionLevels[i])
		}
		defLevelBytes = valuesToRLEBitPackedHybridBytes(
			defLevels,
			int32(getBitWidth(uint64(page.DataTable.MaxDefinitionLevel))),
			parquet.Type_INT64,
		)
	}

	// Repetition levels; omitted for non-repeated columns.
	var repLevelBytes []byte
	if page.DataTable.MaxRepetitionLevel > 0 {
		repLevels := make([]int64, len(page.DataTable.DefinitionLevels))
		for i := range page.DataTable.DefinitionLevels {
			repLevels[i] = int64(page.DataTable.RepetitionLevels[i])
		}
		repLevelBytes = valuesToRLEBitPackedHybridBytes(
			repLevels,
			int32(getBitWidth(uint64(page.DataTable.MaxRepetitionLevel))),
			parquet.Type_INT64,
		)
	}

	// v1 layout: [rep levels][def levels][values], compressed together.
	data := append(repLevelBytes, defLevelBytes...)
	data = append(data, valuesBytes...)

	compressedData, err := compressionCodec(compressType).compress(data)
	if err != nil {
		panic(err)
	}

	page.Header = parquet.NewPageHeader()
	page.Header.Type = parquet.PageType_DATA_PAGE
	page.Header.CompressedPageSize = int32(len(compressedData))
	page.Header.UncompressedPageSize = int32(len(data))
	page.Header.DataPageHeader = parquet.NewDataPageHeader()
	page.Header.DataPageHeader.NumValues = int32(len(page.DataTable.DefinitionLevels))
	page.Header.DataPageHeader.DefinitionLevelEncoding = parquet.Encoding_RLE
	page.Header.DataPageHeader.RepetitionLevelEncoding = parquet.Encoding_RLE
	page.Header.DataPageHeader.Encoding = parquet.Encoding_PLAIN_DICTIONARY

	// Serialize the header with the thrift compact protocol and prepend it.
	ts := thrift.NewTSerializer()
	ts.Protocol = thrift.NewTCompactProtocolFactory().GetProtocol(ts.Transport)
	pageHeaderBytes, err := ts.Write(context.TODO(), page.Header)
	if err != nil {
		panic(err)
	}

	page.RawData = append(pageHeaderBytes, compressedData...)
	return page.RawData
}