minio/pkg/s3select/internal/parquet-go/schema/tree.go

389 lines
11 KiB
Go

/*
* Minio Cloud Storage, (C) 2019 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package schema
import (
"fmt"
"strings"
"github.com/minio/minio/pkg/s3select/internal/parquet-go/gen-go/parquet"
)
func updateMaxDLRL(schemaMap map[string]*Element, maxDL, maxRL int64) {
for _, element := range schemaMap {
element.MaxDefinitionLevel = maxDL
element.MaxRepetitionLevel = maxRL
if *element.RepetitionType != parquet.FieldRepetitionType_REQUIRED {
element.MaxDefinitionLevel++
if *element.RepetitionType == parquet.FieldRepetitionType_REPEATED {
element.MaxRepetitionLevel++
}
}
if element.Children != nil {
updateMaxDLRL(element.Children.schemaMap, element.MaxDefinitionLevel, element.MaxRepetitionLevel)
}
}
}
func toParquetSchema(tree *Tree, treePrefix string, schemaPrefix string, schemaList *[]*parquet.SchemaElement, valueElements *[]*Element) (err error) {
tree.Range(func(name string, element *Element) bool {
pathInTree := name
if treePrefix != "" {
pathInTree = treePrefix + "." + name
}
if element.Type == nil && element.ConvertedType == nil && element.Children == nil {
err = fmt.Errorf("%v: group element must have children", pathInTree)
return false
}
if element.ConvertedType != nil {
switch *element.ConvertedType {
case parquet.ConvertedType_LIST:
// Supported structure.
// <REQUIRED|OPTIONAL> group <name> (LIST) {
// REPEATED group list {
// <REQUIRED|OPTIONAL> <element-type> element;
// }
// }
if element.Type != nil {
err = fmt.Errorf("%v: type must be nil for LIST ConvertedType", pathInTree)
return false
}
if element.Children == nil || element.Children.Length() != 1 {
err = fmt.Errorf("%v: children must have one element only for LIST ConvertedType", pathInTree)
return false
}
listElement, ok := element.Children.Get("list")
if !ok {
err = fmt.Errorf("%v: missing group element 'list' for LIST ConvertedType", pathInTree)
return false
}
if listElement.Name != "list" {
err = fmt.Errorf("%v.list: name must be 'list'", pathInTree)
return false
}
if *listElement.RepetitionType != parquet.FieldRepetitionType_REPEATED {
err = fmt.Errorf("%v.list: repetition type must be REPEATED type", pathInTree)
return false
}
if listElement.Type != nil || listElement.ConvertedType != nil {
err = fmt.Errorf("%v.list: type and converted type must be nil", pathInTree)
return false
}
if listElement.Children == nil || listElement.Children.Length() != 1 {
err = fmt.Errorf("%v.list.element: not found", pathInTree)
return false
}
valueElement, ok := listElement.Children.Get("element")
if !ok {
err = fmt.Errorf("%v.list.element: not found", pathInTree)
return false
}
if valueElement.Name != "element" {
err = fmt.Errorf("%v.list.element: name must be 'element'", pathInTree)
return false
}
case parquet.ConvertedType_MAP:
// Supported structure:
// <REQUIRED|OPTIONAL> group <name> (MAP) {
// REPEATED group key_value {
// REQUIRED <key-type> key;
// <REQUIRED|OPTIONAL> <value-type> value;
// }
// }
if element.Type != nil {
err = fmt.Errorf("%v: type must be nil for MAP ConvertedType", pathInTree)
return false
}
if element.Children == nil || element.Children.Length() != 1 {
err = fmt.Errorf("%v: children must have one element only for MAP ConvertedType", pathInTree)
return false
}
keyValueElement, ok := element.Children.Get("key_value")
if !ok {
err = fmt.Errorf("%v: missing group element 'key_value' for MAP ConvertedType", pathInTree)
return false
}
if keyValueElement.Name != "key_value" {
err = fmt.Errorf("%v.key_value: name must be 'key_value'", pathInTree)
return false
}
if *keyValueElement.RepetitionType != parquet.FieldRepetitionType_REPEATED {
err = fmt.Errorf("%v.key_value: repetition type must be REPEATED type", pathInTree)
return false
}
if keyValueElement.Children == nil || keyValueElement.Children.Length() < 1 || keyValueElement.Children.Length() > 2 {
err = fmt.Errorf("%v.key_value: children must have 'key' and optionally 'value' elements for MAP ConvertedType", pathInTree)
return false
}
keyElement, ok := keyValueElement.Children.Get("key")
if !ok {
err = fmt.Errorf("%v.key_value: missing 'key' element for MAP ConvertedType", pathInTree)
return false
}
if keyElement.Name != "key" {
err = fmt.Errorf("%v.key_value.key: name must be 'key'", pathInTree)
return false
}
if *keyElement.RepetitionType != parquet.FieldRepetitionType_REQUIRED {
err = fmt.Errorf("%v.key_value: repetition type must be REQUIRED type", pathInTree)
return false
}
if keyValueElement.Children.Length() == 2 {
valueElement, ok := keyValueElement.Children.Get("value")
if !ok {
err = fmt.Errorf("%v.key_value: second element must be 'value' element for MAP ConvertedType", pathInTree)
return false
}
if valueElement.Name != "value" {
err = fmt.Errorf("%v.key_value.value: name must be 'value'", pathInTree)
return false
}
}
case parquet.ConvertedType_UTF8, parquet.ConvertedType_UINT_8, parquet.ConvertedType_UINT_16:
fallthrough
case parquet.ConvertedType_UINT_32, parquet.ConvertedType_UINT_64, parquet.ConvertedType_INT_8:
fallthrough
case parquet.ConvertedType_INT_16, parquet.ConvertedType_INT_32, parquet.ConvertedType_INT_64:
if element.Type == nil {
err = fmt.Errorf("%v: ConvertedType %v must have Type value", pathInTree, element.ConvertedType)
return false
}
default:
err = fmt.Errorf("%v: unsupported ConvertedType %v", pathInTree, element.ConvertedType)
return false
}
}
element.PathInTree = pathInTree
element.PathInSchema = element.Name
if schemaPrefix != "" {
element.PathInSchema = schemaPrefix + "." + element.Name
}
if element.Type != nil {
*valueElements = append(*valueElements, element)
}
*schemaList = append(*schemaList, &element.SchemaElement)
if element.Children != nil {
element.numChildren = int32(element.Children.Length())
err = toParquetSchema(element.Children, element.PathInTree, element.PathInSchema, schemaList, valueElements)
}
return (err == nil)
})
return err
}
// Tree - represents tree of schema. Tree preserves order in which elements are added.
type Tree struct {
schemaMap map[string]*Element
keys []string
readOnly bool
}
// String - stringify this tree.
func (tree *Tree) String() string {
var s []string
tree.Range(func(name string, element *Element) bool {
s = append(s, fmt.Sprintf("%v: %v", name, element))
return true
})
return "{" + strings.Join(s, ", ") + "}"
}
// Length - returns length of tree.
func (tree *Tree) Length() int {
return len(tree.keys)
}
func (tree *Tree) travel(pathSegments []string) (pathSegmentIndex int, pathSegment string, currElement *Element, parentTree *Tree, found bool) {
parentTree = tree
for pathSegmentIndex, pathSegment = range pathSegments {
if tree == nil {
found = false
break
}
var tmpCurrElement *Element
if tmpCurrElement, found = tree.schemaMap[pathSegment]; !found {
break
}
currElement = tmpCurrElement
parentTree = tree
tree = currElement.Children
}
return
}
// ReadOnly - returns whether this tree is read only or not.
func (tree *Tree) ReadOnly() bool {
return tree.readOnly
}
// Get - returns the element stored for name.
func (tree *Tree) Get(name string) (element *Element, ok bool) {
pathSegments := strings.Split(name, ".")
for _, pathSegment := range pathSegments {
if tree == nil {
element = nil
ok = false
break
}
if element, ok = tree.schemaMap[pathSegment]; !ok {
break
}
tree = element.Children
}
return element, ok
}
// Set - adds or sets element to name.
func (tree *Tree) Set(name string, element *Element) error {
if tree.readOnly {
return fmt.Errorf("read only tree")
}
pathSegments := strings.Split(name, ".")
if err := validataPathSegments(pathSegments); err != nil {
return err
}
i, pathSegment, currElement, parentTree, found := tree.travel(pathSegments)
if !found {
if i != len(pathSegments)-1 {
return fmt.Errorf("parent %v does not exist", strings.Join(pathSegments[:i+1], "."))
}
if currElement == nil {
parentTree = tree
} else {
if currElement.Type != nil {
return fmt.Errorf("parent %v is not group element", strings.Join(pathSegments[:i], "."))
}
if currElement.Children == nil {
currElement.Children = NewTree()
}
parentTree = currElement.Children
}
parentTree.keys = append(parentTree.keys, pathSegment)
}
parentTree.schemaMap[pathSegment] = element
return nil
}
// Delete - deletes name and its element.
func (tree *Tree) Delete(name string) {
if tree.readOnly {
panic(fmt.Errorf("read only tree"))
}
pathSegments := strings.Split(name, ".")
_, pathSegment, _, parentTree, found := tree.travel(pathSegments)
if found {
for i := range parentTree.keys {
if parentTree.keys[i] == pathSegment {
copy(parentTree.keys[i:], parentTree.keys[i+1:])
parentTree.keys = parentTree.keys[:len(parentTree.keys)-1]
break
}
}
delete(parentTree.schemaMap, pathSegment)
}
}
// Range - calls f sequentially for each name and its element. If f returns false, range stops the iteration.
func (tree *Tree) Range(f func(name string, element *Element) bool) {
for _, name := range tree.keys {
if !f(name, tree.schemaMap[name]) {
break
}
}
}
// ToParquetSchema - returns list of parquet SchemaElement and list of elements those stores values.
func (tree *Tree) ToParquetSchema() (schemaList []*parquet.SchemaElement, valueElements []*Element, err error) {
if tree.readOnly {
return nil, nil, fmt.Errorf("read only tree")
}
updateMaxDLRL(tree.schemaMap, 0, 0)
var schemaElements []*parquet.SchemaElement
if err = toParquetSchema(tree, "", "", &schemaElements, &valueElements); err != nil {
return nil, nil, err
}
tree.readOnly = true
numChildren := int32(len(tree.keys))
schemaList = append(schemaList, &parquet.SchemaElement{
Name: "schema",
RepetitionType: parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_REQUIRED),
NumChildren: &numChildren,
})
schemaList = append(schemaList, schemaElements...)
return schemaList, valueElements, nil
}
// NewTree - creates new schema tree.
func NewTree() *Tree {
return &Tree{
schemaMap: make(map[string]*Element),
}
}