From bcfaa12a4db986dcb8fdb315c72b41a1c499b497 Mon Sep 17 00:00:00 2001 From: Krishna Srinivas Date: Sun, 26 Jul 2015 02:17:19 +0530 Subject: [PATCH] Encoder now directly reads from the object stream. Using split.Stream() was causing lot of redundant memory operations. --- pkg/donut/bucket.go | 31 ++++++++++++++---------- pkg/donut/donut-v1.go | 8 +++---- pkg/donut/donut-v2.go | 1 + pkg/donut/encoder.go | 9 +++++++ pkg/donut/multipart.go | 2 +- pkg/erasure/erasure_encode.go | 45 +++++++++++++++++++++++++++++++++++ 6 files changed, 79 insertions(+), 17 deletions(-) diff --git a/pkg/donut/bucket.go b/pkg/donut/bucket.go index 7befb7928..a6c0ff999 100644 --- a/pkg/donut/bucket.go +++ b/pkg/donut/bucket.go @@ -33,7 +33,6 @@ import ( "github.com/minio/minio/pkg/crypto/sha256" "github.com/minio/minio/pkg/crypto/sha512" "github.com/minio/minio/pkg/donut/disk" - "github.com/minio/minio/pkg/donut/split" "github.com/minio/minio/pkg/iodine" ) @@ -237,7 +236,7 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, } // WriteObject - write a new object into bucket -func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string, signature *Signature) (ObjectMetadata, error) { +func (b bucket) WriteObject(objectName string, objectData io.Reader, size int64, expectedMD5Sum string, metadata map[string]string, signature *Signature) (ObjectMetadata, error) { b.lock.Lock() defer b.lock.Unlock() if objectName == "" || objectData == nil { @@ -272,7 +271,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 } mwriter := io.MultiWriter(sumMD5, sum256, sum512) // write encoded data with k, m and writers - chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, mwriter) + chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, size, mwriter) if err != nil { CleanupWritersOnError(writers) return ObjectMetadata{}, iodine.New(err, nil) @@ -422,32 +421,40 @@ func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error) } // writeObjectData - -func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, writer io.Writer) (int, int, error) { +func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, size int64, writer io.Writer) (int, int, error) { encoder, err := newEncoder(k, m, "Cauchy") + chunkSize := int64(10 * 1024 * 1024) if err != nil { return 0, 0, iodine.New(err, nil) } chunkCount := 0 totalLength := 0 - for chunk := range split.Stream(objectData, 10*1024*1024) { - if chunk.Err != nil { - return 0, 0, iodine.New(err, nil) + remaining := size + for remaining > 0 { + readSize := chunkSize + if remaining < chunkSize { + readSize = remaining } - totalLength = totalLength + len(chunk.Data) - encodedBlocks, err := encoder.Encode(chunk.Data) + remaining = remaining - readSize + totalLength = totalLength + int(readSize) + encodedBlocks, inputData, err := encoder.EncodeStream(objectData, readSize) + if err != nil { + return 0, 0, iodine.New(err, nil) + } + _, err = writer.Write(inputData) if err != nil { return 0, 0, iodine.New(err, nil) } - - writer.Write(chunk.Data) for blockIndex, block := range encodedBlocks { errCh := make(chan error, 1) go func(writer io.Writer, reader io.Reader) { + // FIXME: this closes the errCh in the outer scope defer close(errCh) - _, err := io.Copy(writers[blockIndex], bytes.NewReader(block)) + _, err := io.Copy(writer, reader) errCh <- err }(writers[blockIndex], bytes.NewReader(block)) if err := <-errCh; err != nil { + // FIXME: fix premature return in case of err != nil return 0, 0, iodine.New(err, nil) } } diff --git a/pkg/donut/donut-v1.go b/pkg/donut/donut-v1.go index 26883c9ea..5129bd606 100644 --- a/pkg/donut/donut-v1.go +++ b/pkg/donut/donut-v1.go @@ -134,7 +134,7 @@ func (donut API) listObjects(bucket, prefix, marker, delimiter string, maxkeys i } // putObject - put object -func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string, signature *Signature) (ObjectMetadata, error) { +func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, size int64, metadata map[string]string, signature *Signature) (ObjectMetadata, error) { errParams := map[string]string{ "bucket": bucket, "object": object, @@ -158,7 +158,7 @@ func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Read if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok { return ObjectMetadata{}, iodine.New(ObjectExists{Object: object}, errParams) } - objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata, signature) + objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, size, expectedMD5Sum, metadata, signature) if err != nil { return ObjectMetadata{}, iodine.New(err, errParams) } @@ -170,7 +170,7 @@ func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Read } // putObject - put object -func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string, partID int, reader io.Reader, metadata map[string]string, signature *Signature) (PartMetadata, error) { +func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string, partID int, reader io.Reader, size int64, metadata map[string]string, signature *Signature) (PartMetadata, error) { errParams := map[string]string{ "bucket": bucket, "object": object, @@ -198,7 +198,7 @@ func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string, return PartMetadata{}, iodine.New(ObjectExists{Object: object}, errParams) } objectPart := object + "/" + "multipart" + "/" + strconv.Itoa(partID) - objmetadata, err := donut.buckets[bucket].WriteObject(objectPart, reader, expectedMD5Sum, metadata, signature) + objmetadata, err := donut.buckets[bucket].WriteObject(objectPart, reader, size, expectedMD5Sum, metadata, signature) if err != nil { return PartMetadata{}, iodine.New(err, errParams) } diff --git a/pkg/donut/donut-v2.go b/pkg/donut/donut-v2.go index 2010e102c..51f06cf87 100644 --- a/pkg/donut/donut-v2.go +++ b/pkg/donut/donut-v2.go @@ -373,6 +373,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s key, expectedMD5Sum, data, + size, map[string]string{ "contentType": contentType, "contentLength": strconv.FormatInt(size, 10), diff --git a/pkg/donut/encoder.go b/pkg/donut/encoder.go index 042bf60da..ee5005a53 100644 --- a/pkg/donut/encoder.go +++ b/pkg/donut/encoder.go @@ -17,6 +17,7 @@ package donut import ( + "io" "strconv" encoding "github.com/minio/minio/pkg/erasure" @@ -86,6 +87,14 @@ func (e encoder) Encode(data []byte) (encodedData [][]byte, err error) { return encodedData, nil } +func (e encoder) EncodeStream(data io.Reader, size int64) (encodedData [][]byte, inputData []byte, err error) { + encodedData, inputData, err = e.encoder.EncodeStream(data, size) + if err != nil { + return nil, nil, iodine.New(err, nil) + } + return encodedData, inputData, nil +} + // Decode - erasure decode input encoded bytes func (e encoder) Decode(encodedData [][]byte, dataLength int) (data []byte, err error) { decodedData, err := e.encoder.Decode(encodedData, dataLength) diff --git a/pkg/donut/multipart.go b/pkg/donut/multipart.go index 4dd826bac..4e316427b 100644 --- a/pkg/donut/multipart.go +++ b/pkg/donut/multipart.go @@ -155,7 +155,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - partMetadata, err := donut.putObjectPart(bucket, key, expectedMD5Sum, uploadID, partID, data, metadata, signature) + partMetadata, err := donut.putObjectPart(bucket, key, expectedMD5Sum, uploadID, partID, data, size, metadata, signature) if err != nil { return "", iodine.New(err, nil) } diff --git a/pkg/erasure/erasure_encode.go b/pkg/erasure/erasure_encode.go index 39a99416e..c17991b43 100644 --- a/pkg/erasure/erasure_encode.go +++ b/pkg/erasure/erasure_encode.go @@ -23,6 +23,7 @@ package erasure import "C" import ( "errors" + "io" "unsafe" ) @@ -195,3 +196,47 @@ func (e *Erasure) Encode(inputData []byte) (encodedBlocks [][]byte, err error) { return encodedBlocks, nil } + +// EncodeStream erasure codes a block of data in "k" data blocks and "m" parity blocks. +// Output is [k+m][]blocks of data and parity slices. +func (e *Erasure) EncodeStream(data io.Reader, size int64) ([][]byte, []byte, error) { + k := int(e.params.K) // "k" data blocks + m := int(e.params.M) // "m" parity blocks + n := k + m // "n" total encoded blocks + + // Length of a single encoded chunk. + // Total number of encoded chunks = "k" data + "m" parity blocks + encodedBlockLen := GetEncodedBlockLen(int(size), uint8(k)) + + // Length of total number of "n" data chunks + encodedDataBlocksLen := encodedBlockLen * n + + inputData := make([]byte, size, encodedDataBlocksLen) + + _, err := io.ReadFull(data, inputData) + if err != nil { + return nil, nil, err + } + + // Allocate memory to the "encoded blocks" return buffer + encodedBlocks := make([][]byte, n) // Return buffer + + // Nessary to bridge Go to the C world. C requires 2D arry of pointers to + // byte array. "encodedBlocks" is a 2D slice. + pointersToEncodedBlock := make([]*byte, n) // Pointers to encoded blocks. + + // Copy data block slices to encoded block buffer + for i := 0; i < n; i++ { + encodedBlocks[i] = inputData[i*encodedBlockLen : (i+1)*encodedBlockLen] + pointersToEncodedBlock[i] = &encodedBlocks[i][0] + } + + // Erasure code the data into K data blocks and M parity + // blocks. Only the parity blocks are filled. Data blocks remain + // intact. + C.ec_encode_data(C.int(encodedBlockLen), C.int(k), C.int(m), e.encodeTbls, + (**C.uchar)(unsafe.Pointer(&pointersToEncodedBlock[:k][0])), // Pointers to data blocks + (**C.uchar)(unsafe.Pointer(&pointersToEncodedBlock[k:][0]))) // Pointers to parity blocks + + return encodedBlocks, inputData[0:size], nil +}