mirror of
https://github.com/minio/minio.git
synced 2025-01-13 07:53:21 -05:00
247 lines
7.1 KiB
Go
247 lines
7.1 KiB
Go
package donut
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"crypto/md5"
|
|
"encoding/hex"
|
|
|
|
"github.com/minio-io/iodine"
|
|
"github.com/minio-io/minio/pkg/encoding/erasure"
|
|
"github.com/minio-io/minio/pkg/utils/split"
|
|
"hash"
|
|
)
|
|
|
|
// getErasureTechnique - convert technique string into Technique type
|
|
func getErasureTechnique(technique string) (erasure.Technique, error) {
|
|
switch true {
|
|
case technique == "Cauchy":
|
|
return erasure.Cauchy, nil
|
|
case technique == "Vandermonde":
|
|
return erasure.Cauchy, nil
|
|
default:
|
|
return erasure.None, iodine.Error(errors.New("Invalid erasure technique: "+technique), nil)
|
|
}
|
|
}
|
|
|
|
// erasureReader - returns aligned streaming reads over a PipeWriter
|
|
func erasureReader(readers []io.ReadCloser, donutMetadata map[string]string, writer *io.PipeWriter) {
|
|
totalChunks, err := strconv.Atoi(donutMetadata["chunkCount"])
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.Error(err, donutMetadata))
|
|
return
|
|
}
|
|
totalLeft, err := strconv.ParseInt(donutMetadata["size"], 10, 64)
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.Error(err, donutMetadata))
|
|
return
|
|
}
|
|
blockSize, err := strconv.Atoi(donutMetadata["blockSize"])
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.Error(err, donutMetadata))
|
|
return
|
|
}
|
|
parsedk, err := strconv.ParseUint(donutMetadata["erasureK"], 10, 8)
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.Error(err, donutMetadata))
|
|
return
|
|
}
|
|
k := uint8(parsedk)
|
|
parsedm, err := strconv.ParseUint(donutMetadata["erasureM"], 10, 8)
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.Error(err, donutMetadata))
|
|
return
|
|
}
|
|
m := uint8(parsedm)
|
|
expectedMd5sum, err := hex.DecodeString(donutMetadata["md5"])
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.Error(err, donutMetadata))
|
|
return
|
|
}
|
|
technique, err := getErasureTechnique(donutMetadata["erasureTechnique"])
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.Error(err, donutMetadata))
|
|
return
|
|
}
|
|
hasher := md5.New()
|
|
params, err := erasure.ParseEncoderParams(k, m, technique)
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.Error(err, donutMetadata))
|
|
}
|
|
encoder := erasure.NewEncoder(params)
|
|
for i := 0; i < totalChunks; i++ {
|
|
totalLeft, err = decodeChunk(writer, readers, encoder, hasher, k, m, totalLeft, blockSize)
|
|
if err != nil {
|
|
errParams := map[string]string{
|
|
"totalLeft": strconv.FormatInt(totalLeft, 10),
|
|
}
|
|
for k, v := range donutMetadata {
|
|
errParams[k] = v
|
|
}
|
|
writer.CloseWithError(iodine.Error(err, errParams))
|
|
}
|
|
}
|
|
actualMd5sum := hasher.Sum(nil)
|
|
if bytes.Compare(expectedMd5sum, actualMd5sum) != 0 {
|
|
writer.CloseWithError(iodine.Error(errors.New("decoded md5sum did not match. expected: "+string(expectedMd5sum)+" actual: "+string(actualMd5sum)), donutMetadata))
|
|
return
|
|
}
|
|
writer.Close()
|
|
return
|
|
}
|
|
|
|
func decodeChunk(writer *io.PipeWriter, readers []io.ReadCloser, encoder *erasure.Encoder, hasher hash.Hash, k, m uint8, totalLeft int64, blockSize int) (int64, error) {
|
|
curBlockSize := 0
|
|
if int64(blockSize) < totalLeft {
|
|
curBlockSize = blockSize
|
|
} else {
|
|
curBlockSize = int(totalLeft) // cast is safe, blockSize in if protects
|
|
}
|
|
|
|
curChunkSize := erasure.GetEncodedBlockLen(curBlockSize, uint8(k))
|
|
encodedBytes := make([][]byte, 16)
|
|
for i, reader := range readers {
|
|
defer reader.Close()
|
|
var bytesBuffer bytes.Buffer
|
|
written, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
|
|
if err != nil {
|
|
errParams := map[string]string{}
|
|
errParams["part"] = strconv.FormatInt(written, 10)
|
|
errParams["block.written"] = strconv.FormatInt(written, 10)
|
|
errParams["block.length"] = strconv.Itoa(curChunkSize)
|
|
return totalLeft, iodine.Error(err, errParams)
|
|
}
|
|
encodedBytes[i] = bytesBuffer.Bytes()
|
|
}
|
|
decodedData, err := encoder.Decode(encodedBytes, curBlockSize)
|
|
if err != nil {
|
|
errParams := map[string]string{}
|
|
errParams["block.length"] = strconv.Itoa(curChunkSize)
|
|
return totalLeft, iodine.Error(err, errParams)
|
|
}
|
|
_, err = hasher.Write(decodedData) // not expecting errors from hash, will also catch further down on .Sum mismatch in parent
|
|
if err != nil {
|
|
errParams := map[string]string{}
|
|
errParams["block.length"] = strconv.Itoa(curChunkSize)
|
|
return totalLeft, iodine.Error(err, errParams)
|
|
}
|
|
_, err = io.Copy(writer, bytes.NewBuffer(decodedData))
|
|
if err != nil {
|
|
errParams := map[string]string{}
|
|
errParams["block.length"] = strconv.Itoa(curChunkSize)
|
|
return totalLeft, iodine.Error(err, errParams)
|
|
}
|
|
totalLeft = totalLeft - int64(blockSize)
|
|
return totalLeft, nil
|
|
}
|
|
|
|
// erasure writer
|
|
|
|
type erasureWriter struct {
|
|
writers []Writer
|
|
metadata map[string]string
|
|
donutMetadata map[string]string // not exposed
|
|
erasureWriter *io.PipeWriter
|
|
isClosed <-chan bool
|
|
}
|
|
|
|
// newErasureWriter - get a new writer
|
|
func newErasureWriter(writers []Writer) ObjectWriter {
|
|
r, w := io.Pipe()
|
|
isClosed := make(chan bool)
|
|
writer := erasureWriter{
|
|
writers: writers,
|
|
metadata: make(map[string]string),
|
|
erasureWriter: w,
|
|
isClosed: isClosed,
|
|
}
|
|
go erasureGoroutine(r, writer, isClosed)
|
|
return writer
|
|
}
|
|
|
|
func erasureGoroutine(r *io.PipeReader, eWriter erasureWriter, isClosed chan<- bool) {
|
|
chunks := split.Stream(r, 10*1024*1024)
|
|
params, _ := erasure.ParseEncoderParams(8, 8, erasure.Cauchy)
|
|
encoder := erasure.NewEncoder(params)
|
|
chunkCount := 0
|
|
totalLength := 0
|
|
summer := md5.New()
|
|
for chunk := range chunks {
|
|
if chunk.Err == nil {
|
|
totalLength = totalLength + len(chunk.Data)
|
|
encodedBlocks, _ := encoder.Encode(chunk.Data)
|
|
summer.Write(chunk.Data)
|
|
for blockIndex, block := range encodedBlocks {
|
|
io.Copy(eWriter.writers[blockIndex], bytes.NewBuffer(block))
|
|
}
|
|
}
|
|
chunkCount = chunkCount + 1
|
|
}
|
|
dataMd5sum := summer.Sum(nil)
|
|
metadata := make(map[string]string)
|
|
metadata["blockSize"] = strconv.Itoa(10 * 1024 * 1024)
|
|
metadata["chunkCount"] = strconv.Itoa(chunkCount)
|
|
metadata["created"] = time.Now().Format(time.RFC3339Nano)
|
|
metadata["erasureK"] = "8"
|
|
metadata["erasureM"] = "8"
|
|
metadata["erasureTechnique"] = "Cauchy"
|
|
metadata["md5"] = hex.EncodeToString(dataMd5sum)
|
|
metadata["size"] = strconv.Itoa(totalLength)
|
|
for _, nodeWriter := range eWriter.writers {
|
|
if nodeWriter != nil {
|
|
nodeWriter.SetMetadata(eWriter.metadata)
|
|
nodeWriter.SetDonutMetadata(metadata)
|
|
nodeWriter.Close()
|
|
}
|
|
}
|
|
isClosed <- true
|
|
}
|
|
|
|
func (eWriter erasureWriter) Write(data []byte) (int, error) {
|
|
io.Copy(eWriter.erasureWriter, bytes.NewBuffer(data))
|
|
return len(data), nil
|
|
}
|
|
|
|
func (eWriter erasureWriter) Close() error {
|
|
eWriter.erasureWriter.Close()
|
|
<-eWriter.isClosed
|
|
return nil
|
|
}
|
|
|
|
func (eWriter erasureWriter) CloseWithError(err error) error {
|
|
for _, writer := range eWriter.writers {
|
|
if writer != nil {
|
|
writer.CloseWithError(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (eWriter erasureWriter) SetMetadata(metadata map[string]string) error {
|
|
for k := range metadata {
|
|
if strings.HasPrefix(k, "sys.") {
|
|
return errors.New("Invalid key '" + k + "', cannot start with sys.'")
|
|
}
|
|
}
|
|
for k := range eWriter.metadata {
|
|
delete(eWriter.metadata, k)
|
|
}
|
|
for k, v := range metadata {
|
|
eWriter.metadata[k] = v
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (eWriter erasureWriter) GetMetadata() (map[string]string, error) {
|
|
metadata := make(map[string]string)
|
|
for k, v := range eWriter.metadata {
|
|
metadata[k] = v
|
|
}
|
|
return metadata, nil
|
|
}
|