Across donut, split, nimble some code cleanup

This commit is contained in:
Harshavardhana
2015-07-06 21:54:00 -07:00
parent 7be35915bc
commit 3622fbc87d
12 changed files with 158 additions and 211 deletions

View File

@@ -25,6 +25,8 @@ import (
"os"
"strconv"
"strings"
"github.com/minio/minio/pkg/iodine"
)
// Message - message structure for results from the Stream goroutine
@@ -52,7 +54,9 @@ func Stream(reader io.Reader, chunkSize uint64) <-chan Message {
return ch
}
func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan Message) {
func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan<- Message) {
defer close(ch)
// we read until EOF or another error
var readError error
@@ -88,8 +92,6 @@ func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan Message) {
if readError != io.EOF {
ch <- Message{nil, readError}
}
// close the channel, signaling the channel reader that the stream is complete
close(ch)
}
// JoinFiles reads from a given directory, joins data in chunks with prefix and sends
@@ -103,12 +105,12 @@ func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan Message) {
// fmt.Println(buf)
// }
//
func JoinFiles(dirname string, inputPrefix string) (io.Reader, error) {
func JoinFiles(dirname string, inputPrefix string) io.Reader {
reader, writer := io.Pipe()
fileInfos, readError := ioutil.ReadDir(dirname)
if readError != nil {
writer.CloseWithError(readError)
return nil, readError
return nil
}
var newfileInfos []os.FileInfo
@@ -119,16 +121,16 @@ func JoinFiles(dirname string, inputPrefix string) (io.Reader, error) {
}
if len(newfileInfos) == 0 {
nofilesError := errors.New("no files found for given prefix " + inputPrefix)
nofilesError := iodine.New(errors.New("no files found for given prefix "+inputPrefix), nil)
writer.CloseWithError(nofilesError)
return nil, nofilesError
return nil
}
go joinFilesGoRoutine(newfileInfos, writer)
return reader, nil
go joinFilesInGoRoutine(newfileInfos, writer)
return reader
}
func joinFilesGoRoutine(fileInfos []os.FileInfo, writer *io.PipeWriter) {
func joinFilesInGoRoutine(fileInfos []os.FileInfo, writer *io.PipeWriter) {
for _, fileInfo := range fileInfos {
file, err := os.Open(fileInfo.Name())
defer file.Close()
@@ -159,14 +161,11 @@ func FileWithPrefix(filename string, chunkSize uint64, outputPrefix string) erro
return errors.New("Invalid argument outputPrefix cannot be empty string")
}
// start stream splitting goroutine
ch := Stream(file, chunkSize)
// used to write each chunk out as a separate file. {{outputPrefix}}.{{i}}
i := 0
// write each chunk out to a separate file
for chunk := range ch {
for chunk := range Stream(file, chunkSize) {
if chunk.Err != nil {
return chunk.Err
}