Adding chunking by block to erasure-demo via --block-size parameter

This commit is contained in:
Frederick F. Kautz IV 2014-12-01 21:06:36 -08:00
parent 59c1197f47
commit 46b08681a4
3 changed files with 108 additions and 39 deletions

View File

@@ -24,36 +24,68 @@ func decode(c *cli.Context) {
k := config.k k := config.k
m := config.m m := config.m
// get chunks
chunks := make([][]byte, k+m) // check if output file exists, fail if so
for i := 0; i < k+m; i++ { if _, err := os.Stat(config.output); !os.IsNotExist(err) {
chunks[i], _ = ioutil.ReadFile(config.input + "." + strconv.Itoa(i)) log.Fatal("Output file exists")
} }
// get length // get list of files
lengthBytes, err := ioutil.ReadFile(config.input + ".length") var inputFiles []string
if err != nil { if _, err := os.Stat(config.input + ".length"); os.IsNotExist(err) {
log.Fatal(err) err = nil
} chunkCount := 0
lengthString := string(lengthBytes) for !os.IsNotExist(err) {
length, err := strconv.Atoi(lengthString) _, err = os.Stat(config.input + "." + strconv.Itoa(chunkCount) + ".length")
if err != nil { chunkCount += 1
log.Fatal(err) }
} chunkCount = chunkCount - 1
inputFiles = make([]string, chunkCount)
// set up encoder for i := 0; i < chunkCount; i++ {
erasureParameters, _ := erasure.ParseEncoderParams(k, m, erasure.CAUCHY) inputFiles[i] = config.input + "." + strconv.Itoa(i)
}
// decode data
decodedData, err := erasure.Decode(chunks, erasureParameters, length)
if err != nil {
log.Fatal(err)
}
// write decode data out
if _, err := os.Stat(config.output); os.IsNotExist(err) {
ioutil.WriteFile(config.output, decodedData, 0600)
} else { } else {
log.Fatal("Output file already exists") inputFiles = []string{config.input}
}
// open file to write
outputFile, err := os.OpenFile(config.output, os.O_CREATE|os.O_WRONLY, 0600)
defer outputFile.Close()
if err != nil {
log.Fatal(err)
}
for _, inputFile := range inputFiles {
// get chunks
chunks := make([][]byte, k+m)
for i := 0; i < k+m; i++ {
chunks[i], _ = ioutil.ReadFile(inputFile + "." + strconv.Itoa(i))
}
// get length
lengthBytes, err := ioutil.ReadFile(inputFile + ".length")
if err != nil {
log.Fatal(err)
}
lengthString := string(lengthBytes)
length, err := strconv.Atoi(lengthString)
if err != nil {
log.Fatal(err)
}
// set up encoder
erasureParameters, _ := erasure.ParseEncoderParams(k, m, erasure.CAUCHY)
// decode data
decodedData, err := erasure.Decode(chunks, erasureParameters, length)
if err != nil {
log.Fatal(err)
}
// append decoded data
length, err = outputFile.Write(decodedData)
if err != nil {
log.Fatal(err)
}
} }
} }

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"bytes"
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
@@ -8,6 +9,7 @@ import (
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/minio-io/minio/pkgs/erasure" "github.com/minio-io/minio/pkgs/erasure"
"github.com/minio-io/minio/pkgs/split"
) )
func encode(c *cli.Context) { func encode(c *cli.Context) {
@@ -24,6 +26,7 @@ func encode(c *cli.Context) {
// get file // get file
inputFile, err := os.Open(config.input) inputFile, err := os.Open(config.input)
defer inputFile.Close()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -37,11 +40,27 @@ func encode(c *cli.Context) {
// set up encoder // set up encoder
erasureParameters, _ := erasure.ParseEncoderParams(config.k, config.m, erasure.CAUCHY) erasureParameters, _ := erasure.ParseEncoderParams(config.k, config.m, erasure.CAUCHY)
// encode data // encode data
encodedData, length := erasure.Encode(input, erasureParameters) if config.blockSize == 0 {
encodedData, length := erasure.Encode(input, erasureParameters)
// write encoded data out for key, data := range encodedData {
for key, data := range encodedData { ioutil.WriteFile(config.output+"."+strconv.Itoa(key), data, 0600)
ioutil.WriteFile(config.output+"."+strconv.Itoa(key), data, 0600) ioutil.WriteFile(config.output+".length", []byte(strconv.Itoa(length)), 0600)
}
} else {
chunkCount := 0
splitChannel := make(chan split.ByteMessage)
inputReader := bytes.NewReader(input)
go split.SplitStream(inputReader, config.blockSize, splitChannel)
for chunk := range splitChannel {
if chunk.Err != nil {
log.Fatal(chunk.Err)
}
encodedData, length := erasure.Encode(chunk.Data, erasureParameters)
for key, data := range encodedData {
ioutil.WriteFile(config.output+"."+strconv.Itoa(chunkCount)+"."+strconv.Itoa(key), data, 0600)
ioutil.WriteFile(config.output+"."+strconv.Itoa(chunkCount)+".length", []byte(strconv.Itoa(length)), 0600)
}
chunkCount += 1
}
} }
ioutil.WriteFile(config.output+".length", []byte(strconv.Itoa(length)), 0600)
} }

View File

@@ -7,6 +7,7 @@ import (
"strings" "strings"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
"github.com/minio-io/minio/pkgs/strbyteconv"
) )
func main() { func main() {
@@ -29,6 +30,11 @@ func main() {
Value: "10,6", Value: "10,6",
Usage: "data,parity", Usage: "data,parity",
}, },
cli.StringFlag{
Name: "block-size",
Value: "1M",
Usage: "Size of blocks. Examples: 1K, 1M, full",
},
}, },
}, },
{ {
@@ -58,7 +64,7 @@ type inputConfig struct {
output string output string
k int k int
m int m int
blockSize int blockSize uint64
} }
// parses input and returns an inputConfig with parsed input // parses input and returns an inputConfig with parsed input
@@ -88,10 +94,22 @@ func parseInput(c *cli.Context) (inputConfig, error) {
return inputConfig{}, err return inputConfig{}, err
} }
var blockSize uint64
blockSize = 0
if c.String("block-size") != "" {
if c.String("block-size") != "full" {
blockSize, err = strbyteconv.StringToBytes(c.String("block-size"))
if err != nil {
return inputConfig{}, err
}
}
}
return inputConfig{ return inputConfig{
input: inputFilePath, input: inputFilePath,
output: outputFilePath, output: outputFilePath,
k: k, k: k,
m: m, m: m,
blockSize: blockSize,
}, nil }, nil
} }