From c02fa65b0f39267b78f9f6f1cb4e45ecaa02c547 Mon Sep 17 00:00:00 2001 From: "Frederick F. Kautz IV" Date: Tue, 16 Dec 2014 09:48:06 +1300 Subject: [PATCH] Split's SplitStream now creates and returns a read only channel. --- pkgs/split/split.go | 11 ++++++++--- pkgs/split/split_test.go | 3 +-- pkgs/storage/encodedstorage/encoded_storage.go | 3 +-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkgs/split/split.go b/pkgs/split/split.go index b99f6e1f5..63cea5d10 100644 --- a/pkgs/split/split.go +++ b/pkgs/split/split.go @@ -55,7 +55,13 @@ type JoinMessage struct { // for chunk := range channel { // log.Println(chunk.Data) // } -func SplitStream(reader io.Reader, chunkSize uint64, ch chan SplitMessage) { +func SplitStream(reader io.Reader, chunkSize uint64) <-chan SplitMessage { + ch := make(chan SplitMessage) + go splitStreamGoRoutine(reader, chunkSize, ch) + return ch +} + +func splitStreamGoRoutine(reader io.Reader, chunkSize uint64, ch chan SplitMessage) { // we read until EOF or another error var readError error @@ -199,8 +205,7 @@ func SplitFilesWithPrefix(filename string, chunkstr string, outputPrefix string) } // start stream splitting goroutine - ch := make(chan SplitMessage) - go SplitStream(file, chunkSize, ch) + ch := SplitStream(file, chunkSize) // used to write each chunk out as a separate file. {{outputPrefix}}.{{i}} i := 0 diff --git a/pkgs/split/split_test.go b/pkgs/split/split_test.go index cf0733fdf..3ccddafec 100644 --- a/pkgs/split/split_test.go +++ b/pkgs/split/split_test.go @@ -38,9 +38,8 @@ func (s *MySuite) TestSplitStream(c *C) { bytesWriter.Write([]byte(strconv.Itoa(i))) } bytesWriter.Flush() - ch := make(chan SplitMessage) reader := bytes.NewReader(bytesBuffer.Bytes()) - go SplitStream(reader, 25, ch) + ch := SplitStream(reader, 25) var resultsBuffer bytes.Buffer resultsWriter := bufio.NewWriter(&resultsBuffer) for chunk := range ch { diff --git a/pkgs/storage/encodedstorage/encoded_storage.go b/pkgs/storage/encodedstorage/encoded_storage.go index a58dc8b37..637c3754a 100644 --- a/pkgs/storage/encodedstorage/encoded_storage.go +++ b/pkgs/storage/encodedstorage/encoded_storage.go @@ -136,8 +136,7 @@ func (eStorage *encodedStorage) List(objectPath string) ([]storage.ObjectDescrip func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error { // split - chunks := make(chan split.SplitMessage) - go split.SplitStream(object, eStorage.BlockSize, chunks) + chunks := split.SplitStream(object, eStorage.BlockSize) // for each chunk encoderParameters, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY)