Merge pull request #395 from fkautz/pr_out_refactoring_erasure_each_block_is_encoded_by_a_function_iodine_included

This commit is contained in:
Frederick F. Kautz IV 2015-03-26 11:32:55 -07:00
commit f4790fc440
2 changed files with 76 additions and 44 deletions

View File

@ -11,8 +11,10 @@ import (
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"github.com/minio-io/iodine"
"github.com/minio-io/minio/pkg/encoding/erasure" "github.com/minio-io/minio/pkg/encoding/erasure"
"github.com/minio-io/minio/pkg/utils/split" "github.com/minio-io/minio/pkg/utils/split"
"hash"
) )
// getErasureTechnique - convert technique string into Technique type // getErasureTechnique - convert technique string into Technique type
@ -23,7 +25,7 @@ func getErasureTechnique(technique string) (erasure.Technique, error) {
case technique == "Vandermonde": case technique == "Vandermonde":
return erasure.Cauchy, nil return erasure.Cauchy, nil
default: default:
return erasure.None, errors.New("Invalid erasure technique") return erasure.None, iodine.Error(errors.New("Invalid erasure technique: "+technique), nil)
} }
} }
@ -31,82 +33,113 @@ func getErasureTechnique(technique string) (erasure.Technique, error) {
func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) { func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) {
totalChunks, err := strconv.Atoi(donutMetadata["chunkCount"]) totalChunks, err := strconv.Atoi(donutMetadata["chunkCount"])
if err != nil { if err != nil {
writer.CloseWithError(err) writer.CloseWithError(iodine.Error(err, donutMetadata))
return return
} }
totalLeft, err := strconv.Atoi(donutMetadata["size"]) totalLeft, err := strconv.ParseInt(donutMetadata["size"], 10, 64)
if err != nil { if err != nil {
writer.CloseWithError(err) writer.CloseWithError(iodine.Error(err, donutMetadata))
return return
} }
blockSize, err := strconv.Atoi(donutMetadata["blockSize"]) blockSize, err := strconv.Atoi(donutMetadata["blockSize"])
if err != nil { if err != nil {
writer.CloseWithError(err) writer.CloseWithError(iodine.Error(err, donutMetadata))
return return
} }
k, err := strconv.Atoi(donutMetadata["erasureK"]) parsedk, err := strconv.ParseUint(donutMetadata["erasureK"], 10, 8)
if err != nil { if err != nil {
writer.CloseWithError(err) writer.CloseWithError(iodine.Error(err, donutMetadata))
return return
} }
m, err := strconv.Atoi(donutMetadata["erasureM"]) k := uint8(parsedk)
parsedm, err := strconv.ParseUint(donutMetadata["erasureM"], 10, 8)
if err != nil { if err != nil {
writer.CloseWithError(err) writer.CloseWithError(iodine.Error(err, donutMetadata))
return return
} }
m := uint8(parsedm)
expectedMd5sum, err := hex.DecodeString(donutMetadata["md5"]) expectedMd5sum, err := hex.DecodeString(donutMetadata["md5"])
if err != nil { if err != nil {
writer.CloseWithError(err) writer.CloseWithError(iodine.Error(err, donutMetadata))
return return
} }
technique, err := getErasureTechnique(donutMetadata["erasureTechnique"]) technique, err := getErasureTechnique(donutMetadata["erasureTechnique"])
if err != nil { if err != nil {
writer.CloseWithError(err) writer.CloseWithError(iodine.Error(err, donutMetadata))
return return
} }
summer := md5.New() hasher := md5.New()
params, _ := erasure.ParseEncoderParams(uint8(k), uint8(m), technique) params, err := erasure.ParseEncoderParams(k, m, technique)
if err != nil {
writer.CloseWithError(iodine.Error(err, donutMetadata))
}
encoder := erasure.NewEncoder(params) encoder := erasure.NewEncoder(params)
for i := 0; i < totalChunks; i++ { for i := 0; i < totalChunks; i++ {
curBlockSize := totalLeft totalLeft, err = decodeChunk(writer, readers, encoder, hasher, k, m, totalLeft, blockSize)
if blockSize < totalLeft { if err != nil {
curBlockSize = blockSize errParams := map[string]string{
} "totalLeft": strconv.FormatInt(totalLeft, 10),
curChunkSize := erasure.GetEncodedBlockLen(curBlockSize, uint8(k))
encodedBytes := make([][]byte, 16)
for i, reader := range readers {
defer reader.Close()
var bytesBuffer bytes.Buffer
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
if err != nil {
writer.CloseWithError(err)
return
} }
encodedBytes[i] = bytesBuffer.Bytes() for k, v := range donutMetadata {
errParams[k] = v
}
writer.CloseWithError(iodine.Error(err, errParams))
} }
decodedData, err := encoder.Decode(encodedBytes, curBlockSize)
if err != nil {
writer.CloseWithError(err)
return
}
summer.Write(decodedData)
_, err = io.Copy(writer, bytes.NewBuffer(decodedData))
if err != nil {
writer.CloseWithError(err)
return
}
totalLeft = totalLeft - blockSize
} }
actualMd5sum := summer.Sum(nil) actualMd5sum := hasher.Sum(nil)
if bytes.Compare(expectedMd5sum, actualMd5sum) != 0 { if bytes.Compare(expectedMd5sum, actualMd5sum) != 0 {
writer.CloseWithError(errors.New("decoded md5sum did not match")) writer.CloseWithError(iodine.Error(errors.New("decoded md5sum did not match. expected: "+string(expectedMd5sum)+" actual: "+string(actualMd5sum)), donutMetadata))
return return
} }
writer.Close() writer.Close()
return return
} }
func decodeChunk(writer *io.PipeWriter, readers []io.ReadCloser, encoder *erasure.Encoder, hasher hash.Hash, k, m uint8, totalLeft int64, blockSize int) (int64, error) {
curBlockSize := 0
if int64(blockSize) < totalLeft {
curBlockSize = blockSize
} else {
curBlockSize = int(totalLeft) // cast is safe, blockSize in if protects
}
curChunkSize := erasure.GetEncodedBlockLen(curBlockSize, uint8(k))
encodedBytes := make([][]byte, 16)
for i, reader := range readers {
defer reader.Close()
var bytesBuffer bytes.Buffer
written, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
if err != nil {
errParams := map[string]string{}
errParams["part"] = strconv.FormatInt(written, 10)
errParams["block.written"] = strconv.FormatInt(written, 10)
errParams["block.length"] = strconv.Itoa(curChunkSize)
return totalLeft, iodine.Error(err, errParams)
}
encodedBytes[i] = bytesBuffer.Bytes()
}
decodedData, err := encoder.Decode(encodedBytes, curBlockSize)
if err != nil {
errParams := map[string]string{}
errParams["block.length"] = strconv.Itoa(curChunkSize)
return totalLeft, iodine.Error(err, errParams)
}
_, err = hasher.Write(decodedData) // not expecting errors from hash, will also catch further down on .Sum mismatch in parent
if err != nil {
errParams := map[string]string{}
errParams["block.length"] = strconv.Itoa(curChunkSize)
return totalLeft, iodine.Error(err, errParams)
}
_, err = io.Copy(writer, bytes.NewBuffer(decodedData))
if err != nil {
errParams := map[string]string{}
errParams["block.length"] = strconv.Itoa(curChunkSize)
return totalLeft, iodine.Error(err, errParams)
}
totalLeft = totalLeft - int64(blockSize)
return totalLeft, nil
}
// erasure writer // erasure writer
type erasureWriter struct { type erasureWriter struct {

View File

@ -31,8 +31,7 @@ type donutObjectWriter struct {
func (d donutObjectWriter) Write(data []byte) (int, error) { func (d donutObjectWriter) Write(data []byte) (int, error) {
written, err := d.file.Write(data) written, err := d.file.Write(data)
iodine.Error(err, nil) return written, iodine.Error(err, nil)
return written, err
} }
func (d donutObjectWriter) Close() error { func (d donutObjectWriter) Close() error {