Encoder now directly reads from the object stream. Using split.Stream() was causing lot of redundant memory operations.

This commit is contained in:
Krishna Srinivas
2015-07-26 02:17:19 +05:30
parent ed07310471
commit bcfaa12a4d
6 changed files with 79 additions and 17 deletions

View File

@@ -33,7 +33,6 @@ import (
"github.com/minio/minio/pkg/crypto/sha256"
"github.com/minio/minio/pkg/crypto/sha512"
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/donut/split"
"github.com/minio/minio/pkg/iodine"
)
@@ -237,7 +236,7 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64,
}
// WriteObject - write a new object into bucket
func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
func (b bucket) WriteObject(objectName string, objectData io.Reader, size int64, expectedMD5Sum string, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
b.lock.Lock()
defer b.lock.Unlock()
if objectName == "" || objectData == nil {
@@ -272,7 +271,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
}
mwriter := io.MultiWriter(sumMD5, sum256, sum512)
// write encoded data with k, m and writers
chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, mwriter)
chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, size, mwriter)
if err != nil {
CleanupWritersOnError(writers)
return ObjectMetadata{}, iodine.New(err, nil)
@@ -422,32 +421,40 @@ func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error)
}
// writeObjectData -
func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, writer io.Writer) (int, int, error) {
func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, size int64, writer io.Writer) (int, int, error) {
encoder, err := newEncoder(k, m, "Cauchy")
chunkSize := int64(10 * 1024 * 1024)
if err != nil {
return 0, 0, iodine.New(err, nil)
}
chunkCount := 0
totalLength := 0
for chunk := range split.Stream(objectData, 10*1024*1024) {
if chunk.Err != nil {
return 0, 0, iodine.New(err, nil)
remaining := size
for remaining > 0 {
readSize := chunkSize
if remaining < chunkSize {
readSize = remaining
}
totalLength = totalLength + len(chunk.Data)
encodedBlocks, err := encoder.Encode(chunk.Data)
remaining = remaining - readSize
totalLength = totalLength + int(readSize)
encodedBlocks, inputData, err := encoder.EncodeStream(objectData, readSize)
if err != nil {
return 0, 0, iodine.New(err, nil)
}
_, err = writer.Write(inputData)
if err != nil {
return 0, 0, iodine.New(err, nil)
}
writer.Write(chunk.Data)
for blockIndex, block := range encodedBlocks {
errCh := make(chan error, 1)
go func(writer io.Writer, reader io.Reader) {
// FIXME: this closes the errCh in the outer scope
defer close(errCh)
_, err := io.Copy(writers[blockIndex], bytes.NewReader(block))
_, err := io.Copy(writer, reader)
errCh <- err
}(writers[blockIndex], bytes.NewReader(block))
if err := <-errCh; err != nil {
// FIXME: fix premature return in case of err != nil
return 0, 0, iodine.New(err, nil)
}
}

View File

@@ -134,7 +134,7 @@ func (donut API) listObjects(bucket, prefix, marker, delimiter string, maxkeys i
}
// putObject - put object
func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, size int64, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
errParams := map[string]string{
"bucket": bucket,
"object": object,
@@ -158,7 +158,7 @@ func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Read
if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok {
return ObjectMetadata{}, iodine.New(ObjectExists{Object: object}, errParams)
}
objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata, signature)
objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, size, expectedMD5Sum, metadata, signature)
if err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
@@ -170,7 +170,7 @@ func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Read
}
// putObject - put object
func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string, partID int, reader io.Reader, metadata map[string]string, signature *Signature) (PartMetadata, error) {
func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string, partID int, reader io.Reader, size int64, metadata map[string]string, signature *Signature) (PartMetadata, error) {
errParams := map[string]string{
"bucket": bucket,
"object": object,
@@ -198,7 +198,7 @@ func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string,
return PartMetadata{}, iodine.New(ObjectExists{Object: object}, errParams)
}
objectPart := object + "/" + "multipart" + "/" + strconv.Itoa(partID)
objmetadata, err := donut.buckets[bucket].WriteObject(objectPart, reader, expectedMD5Sum, metadata, signature)
objmetadata, err := donut.buckets[bucket].WriteObject(objectPart, reader, size, expectedMD5Sum, metadata, signature)
if err != nil {
return PartMetadata{}, iodine.New(err, errParams)
}

View File

@@ -373,6 +373,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
key,
expectedMD5Sum,
data,
size,
map[string]string{
"contentType": contentType,
"contentLength": strconv.FormatInt(size, 10),

View File

@@ -17,6 +17,7 @@
package donut
import (
"io"
"strconv"
encoding "github.com/minio/minio/pkg/erasure"
@@ -86,6 +87,14 @@ func (e encoder) Encode(data []byte) (encodedData [][]byte, err error) {
return encodedData, nil
}
func (e encoder) EncodeStream(data io.Reader, size int64) (encodedData [][]byte, inputData []byte, err error) {
encodedData, inputData, err = e.encoder.EncodeStream(data, size)
if err != nil {
return nil, nil, iodine.New(err, nil)
}
return encodedData, inputData, nil
}
// Decode - erasure decode input encoded bytes
func (e encoder) Decode(encodedData [][]byte, dataLength int) (data []byte, err error) {
decodedData, err := e.encoder.Decode(encodedData, dataLength)

View File

@@ -155,7 +155,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
partMetadata, err := donut.putObjectPart(bucket, key, expectedMD5Sum, uploadID, partID, data, metadata, signature)
partMetadata, err := donut.putObjectPart(bucket, key, expectedMD5Sum, uploadID, partID, data, size, metadata, signature)
if err != nil {
return "", iodine.New(err, nil)
}