2015-03-22 18:14:06 -04:00
package donut
import (
"bytes"
2015-03-24 21:05:23 -04:00
"errors"
2015-03-29 17:43:35 -04:00
"hash"
2015-03-22 18:14:06 -04:00
"io"
"strconv"
2015-03-24 21:05:23 -04:00
"strings"
2015-03-22 18:14:06 -04:00
"time"
2015-03-23 14:46:19 -04:00
"crypto/md5"
2015-03-22 23:11:25 -04:00
"encoding/hex"
2015-03-24 21:05:23 -04:00
2015-03-26 14:32:39 -04:00
"github.com/minio-io/iodine"
2015-03-29 17:43:35 -04:00
encoding "github.com/minio-io/minio/pkg/encoding/erasure"
2015-03-22 18:14:06 -04:00
"github.com/minio-io/minio/pkg/utils/split"
)
2015-03-24 21:05:23 -04:00
// getErasureTechnique - convert technique string into Technique type
2015-03-29 17:43:35 -04:00
func getErasureTechnique ( technique string ) ( encoding . Technique , error ) {
2015-03-24 21:05:23 -04:00
switch true {
case technique == "Cauchy" :
2015-03-29 17:43:35 -04:00
return encoding . Cauchy , nil
2015-03-24 21:05:23 -04:00
case technique == "Vandermonde" :
2015-03-29 17:43:35 -04:00
return encoding . Cauchy , nil
2015-03-24 21:05:23 -04:00
default :
2015-03-29 17:43:35 -04:00
return encoding . None , iodine . New ( errors . New ( "Invalid erasure technique: " + technique ) , nil )
2015-03-24 21:05:23 -04:00
}
}
2015-03-23 23:40:21 -04:00
// erasureReader - returns aligned streaming reads over a PipeWriter
2015-03-22 18:14:06 -04:00
func erasureReader ( readers [ ] io . ReadCloser , donutMetadata map [ string ] string , writer * io . PipeWriter ) {
2015-03-24 21:05:23 -04:00
totalChunks , err := strconv . Atoi ( donutMetadata [ "chunkCount" ] )
if err != nil {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , donutMetadata ) )
2015-03-24 21:05:23 -04:00
return
}
2015-03-26 14:32:39 -04:00
totalLeft , err := strconv . ParseInt ( donutMetadata [ "size" ] , 10 , 64 )
2015-03-24 21:05:23 -04:00
if err != nil {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , donutMetadata ) )
2015-03-24 21:05:23 -04:00
return
}
blockSize , err := strconv . Atoi ( donutMetadata [ "blockSize" ] )
if err != nil {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , donutMetadata ) )
2015-03-24 21:05:23 -04:00
return
}
2015-03-26 14:32:39 -04:00
parsedk , err := strconv . ParseUint ( donutMetadata [ "erasureK" ] , 10 , 8 )
2015-03-24 21:05:23 -04:00
if err != nil {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , donutMetadata ) )
2015-03-24 21:05:23 -04:00
return
}
2015-03-26 14:32:39 -04:00
k := uint8 ( parsedk )
parsedm , err := strconv . ParseUint ( donutMetadata [ "erasureM" ] , 10 , 8 )
2015-03-24 21:05:23 -04:00
if err != nil {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , donutMetadata ) )
2015-03-24 21:05:23 -04:00
return
}
2015-03-26 14:32:39 -04:00
m := uint8 ( parsedm )
2015-03-24 21:05:23 -04:00
expectedMd5sum , err := hex . DecodeString ( donutMetadata [ "md5" ] )
if err != nil {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , donutMetadata ) )
2015-03-24 21:05:23 -04:00
return
}
technique , err := getErasureTechnique ( donutMetadata [ "erasureTechnique" ] )
if err != nil {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , donutMetadata ) )
2015-03-24 21:05:23 -04:00
return
}
2015-03-26 14:32:39 -04:00
hasher := md5 . New ( )
2015-03-29 17:43:35 -04:00
params , err := encoding . ValidateParams ( k , m , technique )
2015-03-26 14:32:39 -04:00
if err != nil {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , donutMetadata ) )
2015-03-26 14:32:39 -04:00
}
2015-03-29 17:43:35 -04:00
encoder := encoding . NewErasure ( params )
2015-03-22 18:14:06 -04:00
for i := 0 ; i < totalChunks ; i ++ {
2015-03-26 14:32:39 -04:00
totalLeft , err = decodeChunk ( writer , readers , encoder , hasher , k , m , totalLeft , blockSize )
2015-03-22 18:14:06 -04:00
if err != nil {
2015-03-26 14:32:39 -04:00
errParams := map [ string ] string {
"totalLeft" : strconv . FormatInt ( totalLeft , 10 ) ,
}
for k , v := range donutMetadata {
errParams [ k ] = v
}
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( err , errParams ) )
2015-03-24 21:05:23 -04:00
}
2015-03-22 18:14:06 -04:00
}
2015-03-26 14:32:39 -04:00
actualMd5sum := hasher . Sum ( nil )
2015-03-23 14:46:19 -04:00
if bytes . Compare ( expectedMd5sum , actualMd5sum ) != 0 {
2015-03-26 18:55:06 -04:00
writer . CloseWithError ( iodine . New ( errors . New ( "decoded md5sum did not match. expected: " + string ( expectedMd5sum ) + " actual: " + string ( actualMd5sum ) ) , donutMetadata ) )
2015-03-23 14:46:19 -04:00
return
2015-03-22 23:11:25 -04:00
}
2015-03-22 18:14:06 -04:00
writer . Close ( )
2015-03-24 21:05:23 -04:00
return
2015-03-22 18:14:06 -04:00
}
2015-03-29 17:43:35 -04:00
func decodeChunk ( writer * io . PipeWriter , readers [ ] io . ReadCloser , encoder * encoding . Erasure , hasher hash . Hash , k , m uint8 , totalLeft int64 , blockSize int ) ( int64 , error ) {
2015-03-26 14:32:39 -04:00
curBlockSize := 0
if int64 ( blockSize ) < totalLeft {
curBlockSize = blockSize
} else {
curBlockSize = int ( totalLeft ) // cast is safe, blockSize in if protects
}
2015-03-29 17:43:35 -04:00
curChunkSize := encoding . GetEncodedBlockLen ( curBlockSize , uint8 ( k ) )
2015-03-26 14:32:39 -04:00
encodedBytes := make ( [ ] [ ] byte , 16 )
for i , reader := range readers {
var bytesBuffer bytes . Buffer
written , err := io . CopyN ( & bytesBuffer , reader , int64 ( curChunkSize ) )
if err != nil {
errParams := map [ string ] string { }
errParams [ "part" ] = strconv . FormatInt ( written , 10 )
errParams [ "block.written" ] = strconv . FormatInt ( written , 10 )
errParams [ "block.length" ] = strconv . Itoa ( curChunkSize )
2015-03-26 18:55:06 -04:00
return totalLeft , iodine . New ( err , errParams )
2015-03-26 14:32:39 -04:00
}
encodedBytes [ i ] = bytesBuffer . Bytes ( )
}
decodedData , err := encoder . Decode ( encodedBytes , curBlockSize )
if err != nil {
errParams := map [ string ] string { }
errParams [ "block.length" ] = strconv . Itoa ( curChunkSize )
2015-03-26 18:55:06 -04:00
return totalLeft , iodine . New ( err , errParams )
2015-03-26 14:32:39 -04:00
}
_ , err = hasher . Write ( decodedData ) // not expecting errors from hash, will also catch further down on .Sum mismatch in parent
if err != nil {
errParams := map [ string ] string { }
errParams [ "block.length" ] = strconv . Itoa ( curChunkSize )
2015-03-26 18:55:06 -04:00
return totalLeft , iodine . New ( err , errParams )
2015-03-26 14:32:39 -04:00
}
_ , err = io . Copy ( writer , bytes . NewBuffer ( decodedData ) )
if err != nil {
errParams := map [ string ] string { }
errParams [ "block.length" ] = strconv . Itoa ( curChunkSize )
2015-03-26 18:55:06 -04:00
return totalLeft , iodine . New ( err , errParams )
2015-03-26 14:32:39 -04:00
}
totalLeft = totalLeft - int64 ( blockSize )
return totalLeft , nil
}
2015-03-22 18:14:06 -04:00
// erasureWriter - object writer that erasure encodes everything written to it
// and spreads the encoded blocks over a set of underlying writers.
type erasureWriter struct {
// writers are the destinations for encoded blocks: block i of every chunk
// is written to writers[i]. Other code in this file nil-checks entries
// before closing them, so entries may be nil.
writers [ ] Writer
// metadata is the user metadata managed through SetMetadata/GetMetadata;
// it is handed to every writer when the stream is finalized.
metadata map [ string ] string
// donutMetadata is not exposed. NOTE(review): nothing in the visible code
// assigns this field; the encoding goroutine builds its own local map.
donutMetadata map [ string ] string // not exposed
// erasureWriter is the write end of the pipe consumed by erasureGoroutine.
erasureWriter * io . PipeWriter
// isClosed is signalled by erasureGoroutine once it has finished; Close
// blocks on it.
isClosed <- chan bool
}
2015-03-23 23:40:21 -04:00
// newErasureWriter - get a new writer
2015-03-22 18:14:06 -04:00
func newErasureWriter ( writers [ ] Writer ) ObjectWriter {
r , w := io . Pipe ( )
isClosed := make ( chan bool )
writer := erasureWriter {
writers : writers ,
metadata : make ( map [ string ] string ) ,
erasureWriter : w ,
isClosed : isClosed ,
}
go erasureGoroutine ( r , writer , isClosed )
return writer
}
func erasureGoroutine ( r * io . PipeReader , eWriter erasureWriter , isClosed chan <- bool ) {
chunks := split . Stream ( r , 10 * 1024 * 1024 )
2015-03-29 17:43:35 -04:00
params , _ := encoding . ValidateParams ( 8 , 8 , encoding . Cauchy )
encoder := encoding . NewErasure ( params )
2015-03-22 18:14:06 -04:00
chunkCount := 0
totalLength := 0
2015-03-23 14:46:19 -04:00
summer := md5 . New ( )
2015-03-22 18:14:06 -04:00
for chunk := range chunks {
if chunk . Err == nil {
totalLength = totalLength + len ( chunk . Data )
encodedBlocks , _ := encoder . Encode ( chunk . Data )
2015-03-22 23:11:25 -04:00
summer . Write ( chunk . Data )
2015-03-22 18:14:06 -04:00
for blockIndex , block := range encodedBlocks {
io . Copy ( eWriter . writers [ blockIndex ] , bytes . NewBuffer ( block ) )
}
}
chunkCount = chunkCount + 1
}
2015-03-23 14:46:19 -04:00
dataMd5sum := summer . Sum ( nil )
2015-03-22 18:14:06 -04:00
metadata := make ( map [ string ] string )
metadata [ "blockSize" ] = strconv . Itoa ( 10 * 1024 * 1024 )
metadata [ "chunkCount" ] = strconv . Itoa ( chunkCount )
metadata [ "created" ] = time . Now ( ) . Format ( time . RFC3339Nano )
metadata [ "erasureK" ] = "8"
metadata [ "erasureM" ] = "8"
metadata [ "erasureTechnique" ] = "Cauchy"
2015-03-23 14:46:19 -04:00
metadata [ "md5" ] = hex . EncodeToString ( dataMd5sum )
2015-03-23 23:06:15 -04:00
metadata [ "size" ] = strconv . Itoa ( totalLength )
2015-03-22 18:14:06 -04:00
for _ , nodeWriter := range eWriter . writers {
if nodeWriter != nil {
nodeWriter . SetMetadata ( eWriter . metadata )
nodeWriter . SetDonutMetadata ( metadata )
nodeWriter . Close ( )
}
}
isClosed <- true
}
2015-03-23 15:25:15 -04:00
func ( eWriter erasureWriter ) Write ( data [ ] byte ) ( int , error ) {
io . Copy ( eWriter . erasureWriter , bytes . NewBuffer ( data ) )
2015-03-22 18:14:06 -04:00
return len ( data ) , nil
}
2015-03-23 15:25:15 -04:00
func ( eWriter erasureWriter ) Close ( ) error {
eWriter . erasureWriter . Close ( )
<- eWriter . isClosed
2015-03-22 18:14:06 -04:00
return nil
}
2015-03-23 15:25:15 -04:00
func ( eWriter erasureWriter ) CloseWithError ( err error ) error {
for _ , writer := range eWriter . writers {
2015-03-22 18:14:06 -04:00
if writer != nil {
writer . CloseWithError ( err )
}
}
return nil
}
2015-03-23 15:25:15 -04:00
func ( eWriter erasureWriter ) SetMetadata ( metadata map [ string ] string ) error {
2015-03-23 15:26:41 -04:00
for k := range metadata {
2015-03-23 14:56:16 -04:00
if strings . HasPrefix ( k , "sys." ) {
return errors . New ( "Invalid key '" + k + "', cannot start with sys.'" )
}
}
2015-03-23 15:25:15 -04:00
for k := range eWriter . metadata {
delete ( eWriter . metadata , k )
2015-03-22 18:14:06 -04:00
}
for k , v := range metadata {
2015-03-23 15:25:15 -04:00
eWriter . metadata [ k ] = v
2015-03-22 18:14:06 -04:00
}
return nil
}
2015-03-23 15:25:15 -04:00
func ( eWriter erasureWriter ) GetMetadata ( ) ( map [ string ] string , error ) {
2015-03-22 18:14:06 -04:00
metadata := make ( map [ string ] string )
2015-03-23 15:25:15 -04:00
for k , v := range eWriter . metadata {
2015-03-22 18:14:06 -04:00
metadata [ k ] = v
}
return metadata , nil
}