/*
 * Minio Cloud Storage, (C) 2019 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package parquet

import (
	"io"

	"github.com/minio/minio/pkg/s3select/json"
	"github.com/minio/minio/pkg/s3select/sql"
	parquetgo "github.com/minio/parquet-go"
	parquetgen "github.com/minio/parquet-go/gen-go/parquet"
)

// Reader - Parquet record reader for S3Select.
type Reader struct {
	args *ReaderArgs
	file *parquetgo.File
}

// Read - reads single record.
func (r *Reader) Read() (rec sql.Record, rerr error) {
	parquetRecord, err := r.file.Read()
	if err != nil {
		if err != io.EOF {
			return nil, errParquetParsingError(err)
		}

		return nil, err
	}

	record := json.NewRecord()
	f := func(name string, v parquetgo.Value) bool {
		if v.Value == nil {
			if err := record.Set(name, sql.FromNull()); err != nil {
				rerr = errParquetParsingError(err)
			}
			return rerr == nil
		}

		var value *sql.Value
		switch v.Type {
		case parquetgen.Type_BOOLEAN:
			value = sql.FromBool(v.Value.(bool))
		case parquetgen.Type_INT32:
			value = sql.FromInt(int64(v.Value.(int32)))
		case parquetgen.Type_INT64:
			value = sql.FromInt(int64(v.Value.(int64)))
		case parquetgen.Type_FLOAT:
			value = sql.FromFloat(float64(v.Value.(float32)))
		case parquetgen.Type_DOUBLE:
			value = sql.FromFloat(v.Value.(float64))
		case parquetgen.Type_INT96, parquetgen.Type_BYTE_ARRAY, parquetgen.Type_FIXED_LEN_BYTE_ARRAY:
			value = sql.FromString(string(v.Value.([]byte)))
		default:
			rerr = errParquetParsingError(nil)
			return false
		}

		if err = record.Set(name, value); err != nil {
			rerr = errParquetParsingError(err)
		}
		return rerr == nil
	}

	parquetRecord.Range(f)
	return record, rerr
}

// Close - closes underlaying readers.
func (r *Reader) Close() error {
	return r.file.Close()
}

// NewReader - creates new Parquet reader using readerFunc callback.
func NewReader(getReaderFunc func(offset, length int64) (io.ReadCloser, error), args *ReaderArgs) (*Reader, error) {
	file, err := parquetgo.Open(getReaderFunc, nil)
	if err != nil {
		if err != io.EOF {
			return nil, errParquetParsingError(err)
		}

		return nil, err
	}

	return &Reader{
		args: args,
		file: file,
	}, nil
}