/*
 * Mini Object Storage, (C) 2014 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// +build amd64

package split

import (
	"bufio"
	"bytes"
	"errors"
	"github.com/minio-io/minio/pkgs/strbyteconv"
	"io"
	"io/ioutil"
	"os"
	"strconv"
	"strings"
)

// SplitMessage is the message structure for results from the SplitStream
// goroutine. A message either carries one chunk of data in Data (with Err
// nil), or reports a failure in Err (with Data nil).
type SplitMessage struct {
	Data []byte
	Err  error
}

// JoinMessage is the message structure for results from the JoinStream
// goroutine. A message either reports a failure in Err (with Reader nil and
// Length zero), or carries a Reader over file data together with the Length,
// in bytes, of the file the message was sent for.
type JoinMessage struct {
	Reader io.Reader
	Length int64
	Err    error
}

// SplitStream reads from reader, splits the data into chunks of at most
// chunkSize bytes, and sends each chunk over ch. Every chunk except possibly
// the last one is exactly chunkSize bytes long. This method runs until an EOF
// or error occurs. If an error occurs, any data read before the error is sent
// first, then the error is sent over the channel and the method returns.
// A chunkSize of zero is reported as an error. Before returning, the channel
// is always closed.
//
// The user should run this as a goroutine and retrieve the data over the
// channel.
//
//  channel := make(chan SplitMessage)
//  go SplitStream(reader, chunkSize, channel)
//  for chunk := range channel {
//    log.Println(chunk.Data)
//  }
func SplitStream(reader io.Reader, chunkSize uint64, ch chan SplitMessage) {
	// close the channel on every exit path, signaling the channel reader that
	// the stream is complete
	defer close(ch)

	// with a zero chunk size no read would ever be attempted, so neither EOF
	// nor an error could ever be observed and the loop below would spin forever
	if chunkSize == 0 {
		ch <- SplitMessage{nil, errors.New("chunk size must be greater than zero")}
		return
	}

	// scratch space for individual reads; its contents are copied into the
	// per-chunk buffer after every read, so it can be reused for all of them
	scratch := make([]byte, chunkSize)

	// we read until EOF or another error
	var readError error
	for readError == nil {
		// keep track of how much data has been read into the current chunk
		var totalRead uint64
		// each chunk gets its own buffer because its bytes are handed to the
		// channel reader
		var bytesBuffer bytes.Buffer
		bytesWriter := bufio.NewWriter(&bytesBuffer)
		// a single Read may return less than requested, so keep reading until
		// the chunk is full or the stream ends
		for totalRead < chunkSize && readError == nil {
			var currentRead int
			// only ask for the space still unwritten in the current chunk
			currentRead, readError = reader.Read(scratch[:chunkSize-totalRead])
			totalRead += uint64(currentRead)
			// writes that end up in a bytes.Buffer never return an error
			bytesWriter.Write(scratch[:currentRead])
		}
		// flush stream to underlying byte buffer
		bytesWriter.Flush()
		// if we have data available, send it over the channel
		if bytesBuffer.Len() != 0 {
			ch <- SplitMessage{bytesBuffer.Bytes(), nil}
		}
	}
	// if we have an error other than an EOF, send it over the channel
	if readError != io.EOF {
		ch <- SplitMessage{nil, readError}
	}
}

// JoinStream reads every file in dirname whose name starts with inputPrefix
// and sends the contents of each one over ch, in the order reported by
// ioutil.ReadDir (sorted by filename). Directories and empty files are
// skipped.
//
// Every data message carries its own Reader over the contents of exactly one
// file, and Length is the number of bytes that Reader yields. If the directory
// cannot be read, no file matches, or a file cannot be read, a single message
// with Err set is sent and the method returns. Before returning, the channel
// is always closed.
//
// The user should run this as a goroutine and retrieve the data over the
// channel.
//
// NOTE(review): the ordering is lexical, so "prefix.10" is sent before
// "prefix.2". Inputs split into more than ten chunks by SplitFilesWithPrefix
// are therefore not joined in their original order.
func JoinStream(dirname string, inputPrefix string, ch chan JoinMessage) {
	// close the channel on every exit path, signaling the channel reader that
	// the stream is complete
	defer close(ch)

	fileInfos, err := ioutil.ReadDir(dirname)
	if err != nil {
		ch <- JoinMessage{nil, 0, err}
		return
	}

	var matches []os.FileInfo
	for _, fi := range fileInfos {
		if !fi.IsDir() && strings.HasPrefix(fi.Name(), inputPrefix) {
			matches = append(matches, fi)
		}
	}
	if len(matches) == 0 {
		ch <- JoinMessage{nil, 0, errors.New("no files found for given prefix")}
		return
	}

	for _, fi := range matches {
		// ReadDir reports bare file names, so the directory has to be put
		// back in front; joined by hand since path/filepath is not imported
		name := dirname + string(os.PathSeparator) + fi.Name()
		data, err := ioutil.ReadFile(name)
		if err != nil {
			ch <- JoinMessage{nil, 0, err}
			return
		}
		// if we have data available, send it over the channel
		if len(data) != 0 {
			ch <- JoinMessage{bytes.NewReader(data), int64(len(data)), nil}
		}
	}
}

// JoinFilesWithPrefix concatenates the files in dirname whose names start with
// inputPrefix, in the order JoinStream delivers them, and writes the result to
// outputFile with permissions 0600.
//
// It returns an error if any argument is empty, if JoinStream reports an
// error, or if the output file cannot be written. The output file is only
// written after every input has been read successfully.
func JoinFilesWithPrefix(dirname string, inputPrefix string, outputFile string) error {
	if dirname == "" {
		return errors.New("Invalid directory")
	}

	if inputPrefix == "" {
		return errors.New("Invalid argument inputPrefix cannot be empty string")
	}

	if outputFile == "" {
		return errors.New("Invalid output file")
	}

	ch := make(chan JoinMessage)
	go JoinStream(dirname, inputPrefix, ch)

	var multiReaders []io.Reader
	for output := range ch {
		if output.Err != nil {
			// JoinStream stops right after reporting an error, so returning
			// here does not leave its goroutine blocked on a send
			return output.Err
		}
		multiReaders = append(multiReaders, output.Reader)
	}

	// a single Read on a MultiReader may stop at the end of the first
	// reader, so read until EOF instead
	aggregatedBytes, err := ioutil.ReadAll(io.MultiReader(multiReaders...))
	if err != nil {
		return err
	}

	return ioutil.WriteFile(outputFile, aggregatedBytes, 0600)
}

// SplitFilesWithPrefix takes the file named filename and splits it into
// chunks of at most the size described by chunkstr. Chunk i, counting from
// zero, is written to the file "{{outputPrefix}}.{{i}}" with permissions 0600.
//
// chunkstr is converted to a byte count by strbyteconv.StringToBytes. An
// error is returned if the input file cannot be opened, outputPrefix is
// empty, chunkstr cannot be converted or describes zero bytes, reading the
// input fails, or a chunk file cannot be written. Chunk files written before
// a failure are left in place.
func SplitFilesWithPrefix(filename string, chunkstr string, outputPrefix string) error {
	// open file
	file, err := os.Open(filename)
	if err != nil {
		return err
	}
	// only register the Close once we know the file was actually opened
	defer file.Close()

	if outputPrefix == "" {
		return errors.New("Invalid argument outputPrefix cannot be empty string")
	}

	chunkSize, err := strbyteconv.StringToBytes(chunkstr)
	if err != nil {
		return err
	}
	// a zero-sized chunk can never consume any input
	if chunkSize == 0 {
		return errors.New("Invalid argument chunkstr cannot be zero bytes")
	}

	// start stream splitting goroutine
	ch := make(chan SplitMessage)
	go SplitStream(file, chunkSize, ch)

	// used to write each chunk out as a separate file. {{outputPrefix}}.{{i}}
	i := 0

	// write each chunk out to a separate file
	for chunk := range ch {
		err := chunk.Err
		if err == nil {
			err = ioutil.WriteFile(outputPrefix+"."+strconv.Itoa(i), chunk.Data, 0600)
		}
		if err != nil {
			// the channel is unbuffered, so the splitting goroutine stays
			// blocked on its next send unless someone keeps receiving; drain
			// until it closes the channel so it can finish before the
			// deferred Close runs. This reads the rest of the input.
			for range ch {
			}
			return err
		}
		i++
	}
	return nil
}