Merge pull request #382 from abperiasamy/janitor-erasure-decode

Harshavardhana 2015-03-25 18:13:46 -07:00
commit 638ed36cc6
3 changed files with 52 additions and 40 deletions

View File

@@ -23,8 +23,8 @@ import (
 	"unsafe"
 )
 
-// Integer to Int conversion
-func int2CInt(srcErrList []int) *C.int32_t {
+// intSlice2CIntArray converts Go int slice to C int array
+func intSlice2CIntArray(srcErrList []int) *C.int32_t {
 	var sizeErrInt = int(unsafe.Sizeof(srcErrList[0]))
 	switch sizeInt {
 	case sizeErrInt:
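For readers unfamiliar with the cgo pattern behind the renamed helper, here is a minimal standalone sketch of the same idea; the names below are illustrative, not the repository's code. When Go's int happens to have the same width as C's int32_t, the slice's backing array can be handed over directly; otherwise each element is copied into a C-sized array.

package main

/*
#include <stdint.h>
*/
import "C"

import (
	"fmt"
	"unsafe"
)

// intSliceToC is a hypothetical stand-in for intSlice2CIntArray: it makes a
// Go []int usable as a C int32_t array.
func intSliceToC(src []int) *C.int32_t {
	if len(src) == 0 {
		return nil
	}
	if unsafe.Sizeof(src[0]) == unsafe.Sizeof(C.int32_t(0)) {
		// Same width: reinterpret the Go backing array in place.
		return (*C.int32_t)(unsafe.Pointer(&src[0]))
	}
	// Different width (e.g. 64-bit Go int): copy element by element.
	dst := make([]C.int32_t, len(src))
	for i, v := range src {
		dst[i] = C.int32_t(v)
	}
	return &dst[0]
}

func main() {
	p := intSliceToC([]int{3, 7, -1})
	fmt.Println(*p) // prints 3
}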

View File

@@ -27,11 +27,16 @@ import (
 	"unsafe"
 )
 
-// Decode decodes 2 tuple data containing (k + m) chunks back into its original form.
-// Additionally original block length should also be provided as input.
+// Decode decodes erasure coded blocks of data into its original
+// form. Erasure coded data contains K data blocks and M parity
+// blocks. Decode can withstand data loss up to any M number of blocks.
 //
-// Decoded data is exactly similar in length and content as the original data.
-func (e *Encoder) Decode(chunks [][]byte, length int) ([]byte, error) {
+// "encodedDataBlocks" is an array of K data blocks and M parity
+// blocks. Data blocks are position and order dependent. Missing blocks
+// are set to "nil". There must be at least "K" number of data|parity
+// blocks.
+// "dataLen" is the length of original source data.
+func (e *Encoder) Decode(encodedDataBlocks [][]byte, dataLen int) (decodedData []byte, err error) {
 	var decodeMatrix *C.uint8_t
 	var decodeTbls *C.uint8_t
 	var decodeIndex *C.uint32_t
@@ -40,66 +45,73 @@ func (e *Encoder) Decode(chunks [][]byte, length int) ([]byte, error) {
 	k := int(e.params.K)
 	m := int(e.params.M)
 	n := k + m
-	if len(chunks) != n {
-		msg := fmt.Sprintf("chunks length must be %d", n)
-		return nil, errors.New(msg)
+
+	// We need the data and parity blocks preserved in the same order. Missing blocks are set to nil.
+	if len(encodedDataBlocks) != n {
+		return nil, errors.New(fmt.Sprintf("Encoded data blocks slice must be of length [%d]", n))
 	}
-	chunkLen := GetEncodedBlockLen(length, uint8(k))
 
-	errorIndex := make([]int, n+1)
-	var errCount int
+	// Length of a single encoded block
+	encodedBlockLen := GetEncodedBlockLen(dataLen, uint8(k))
 
-	for i := range chunks {
-		// Check of chunks are really null
-		if chunks[i] == nil || len(chunks[i]) == 0 {
-			errorIndex[errCount] = i
-			errCount++
+	// Keep track of errors per block.
+	missingEncodedBlocks := make([]int, n+1)
+	var missingEncodedBlocksCount int = 0
+
+	// Check for the missing encoded blocks
+	for i := range encodedDataBlocks {
+		if encodedDataBlocks[i] == nil || len(encodedDataBlocks[i]) == 0 {
+			missingEncodedBlocks[missingEncodedBlocksCount] = i
+			missingEncodedBlocksCount++
 		}
 	}
-	errorIndex[errCount] = -1
-	errCount++
+	missingEncodedBlocks[missingEncodedBlocksCount] = -1
+	missingEncodedBlocksCount++
 
-	// Too many missing chunks, cannot be more than parity `m`
-	if errCount-1 > int(n-k) {
-		return nil, errors.New("too many erasures requested, can't decode")
+	// Cannot reconstruct original data. Need at least M number of data or parity blocks.
+	if missingEncodedBlocksCount-1 > m {
+		return nil, fmt.Errorf("Cannot reconstruct original data. Need at least [%d] data or parity blocks", m)
 	}
 
-	errorIndexPtr := int2CInt(errorIndex[:errCount])
+	// Convert from Go int slice to C int array
+	missingEncodedBlocksC := intSlice2CIntArray(missingEncodedBlocks[:missingEncodedBlocksCount])
 
-	for i := range chunks {
-		if chunks[i] == nil || len(chunks[i]) == 0 {
-			chunks[i] = make([]byte, chunkLen)
+	// Allocate buffer for the missing blocks
+	for i := range encodedDataBlocks {
+		if encodedDataBlocks[i] == nil || len(encodedDataBlocks[i]) == 0 {
+			encodedDataBlocks[i] = make([]byte, encodedBlockLen)
 		}
 	}
 
-	C.minio_init_decoder(errorIndexPtr, C.int(k), C.int(n), C.int(errCount-1),
+	// Initialize decoder
+	C.minio_init_decoder(missingEncodedBlocksC, C.int(k), C.int(n), C.int(missingEncodedBlocksCount-1),
 		e.encodeMatrix, &decodeMatrix, &decodeTbls, &decodeIndex)
 
+	// Make a slice of pointers to encoded blocks. Necessary to bridge to the C world.
 	pointers := make([]*byte, n)
-	for i := range chunks {
-		pointers[i] = &chunks[i][0]
+	for i := range encodedDataBlocks {
+		pointers[i] = &encodedDataBlocks[i][0]
 	}
 
-	data := (**C.uint8_t)(unsafe.Pointer(&pointers[0]))
-
-	ret := C.minio_get_source_target(C.int(errCount-1), C.int(k), C.int(m), errorIndexPtr,
-		decodeIndex, data, &source, &target)
-
+	// Get pointers to source "data" and target "parity" blocks from the output byte array.
+	ret := C.minio_get_source_target(C.int(missingEncodedBlocksCount-1), C.int(k), C.int(m), missingEncodedBlocksC,
+		decodeIndex, (**C.uint8_t)(unsafe.Pointer(&pointers[0])), &source, &target)
 	if int(ret) == -1 {
-		return nil, errors.New("Decoding source target failed")
+		return nil, errors.New("Unable to decode data")
 	}
 
-	C.ec_encode_data(C.int(chunkLen), C.int(k), C.int(errCount-1), decodeTbls,
+	// Decode data
+	C.ec_encode_data(C.int(encodedBlockLen), C.int(k), C.int(missingEncodedBlocksCount-1), decodeTbls,
 		source, target)
 
-	recoveredOutput := make([]byte, 0, chunkLen*int(k))
+	// Allocate the output buffer
+	decodedData = make([]byte, 0, encodedBlockLen*int(k))
 	for i := 0; i < int(k); i++ {
-		recoveredOutput = append(recoveredOutput, chunks[i]...)
+		decodedData = append(decodedData, encodedDataBlocks[i]...)
 	}
 
 	// TODO cache this if necessary
 	e.decodeMatrix = decodeMatrix
 	e.decodeTbls = decodeTbls
 
-	return recoveredOutput[:length], nil
+	return decodedData[:dataLen], nil
 }
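Putting the new contract together, a round trip might look like the sketch below: blocks stay in encode order, missing entries are set to nil, and the original data length is passed back to Decode. Only Encode and Decode appear in this diff; the import path and the ParseEncoderParams/NewEncoder constructors (and the Cauchy technique constant) are assumptions for illustration.

package main

import (
	"bytes"
	"log"

	erasure "github.com/minio-io/minio/pkg/erasure" // assumed import path, not shown in this diff
)

func main() {
	// Hypothetical constructors: 10 data blocks (K) + 5 parity blocks (M).
	params, err := erasure.ParseEncoderParams(10, 5, erasure.Cauchy)
	if err != nil {
		log.Fatal(err)
	}
	encoder := erasure.NewEncoder(params)

	original := []byte("hello, erasure coded world")
	encodedBlocks, err := encoder.Encode(original)
	if err != nil {
		log.Fatal(err)
	}

	// Simulate losing up to M blocks: Decode expects missing entries set to nil,
	// with the surviving blocks kept in their original positions.
	encodedBlocks[0], encodedBlocks[4], encodedBlocks[12] = nil, nil, nil

	decodedData, err := encoder.Decode(encodedBlocks, len(original))
	if err != nil {
		log.Fatal(err)
	}
	if !bytes.Equal(decodedData, original) {
		log.Fatal("decoded data does not match original")
	}
	log.Printf("recovered %d bytes despite 3 missing blocks", len(decodedData))
}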

View File

@@ -161,7 +161,7 @@ func (e *Encoder) Encode(inputData []byte) (encodedBlocks [][]byte, err error) {
 	}
 
 	// Extend inputData buffer to accommodate coded parity blocks
-	if true { // create a temporary scope to trigger garbage collect
+	{ // Local Scope
 		encodedParityBlocksLen := encodedBlockLen * m
 		parityBlocks := make([]byte, encodedParityBlocksLen)
 		inputData = append(inputData, parityBlocks...)
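The hunk above keeps data and parity in one contiguous buffer: a zeroed parity region is appended to inputData inside a local scope, so the temporary parityBlocks slice is no longer referenced once the scope ends. A minimal sketch of that pattern, with encodedBlockLen and m standing in for the values Encode computes:

package main

import "fmt"

// extendForParity appends a zeroed parity region of m blocks to inputData so
// parity bytes can be written directly after the data blocks in one allocation.
func extendForParity(inputData []byte, encodedBlockLen, m int) []byte {
	parityBlocks := make([]byte, encodedBlockLen*m) // zeroed parity area
	return append(inputData, parityBlocks...)
}

func main() {
	data := []byte("0123456789")
	buf := extendForParity(data, 5, 3) // e.g. block length 5, 3 parity blocks
	fmt.Println(len(buf))              // 10 data bytes + 15 zeroed parity bytes = 25
}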