Merge pull request #303 from fkautz/pr_out_decoding_and_returning_objects_now_works

Frederick F. Kautz IV 2015-03-15 19:02:12 -07:00
commit 7936b05451
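Implements object decoding in the encoded storage driver: GetObject now reads the object's stored metadata from the donutbox, opens a reader for each erasure-coded column, and reconstructs the object chunk by chunk into the caller's writer. CreateObject is extended to record the metadata the decode path depends on (length, blockSize, chunkCount, columnCount).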


@@ -17,6 +17,7 @@
package encoded

import (
"bytes"
"errors"
"io"
"strconv"
"github.com/minio-io/minio/pkg/donutbox"
"github.com/minio-io/minio/pkg/encoding/erasure"
@@ -67,7 +68,57 @@ func (diskStorage StorageDriver) GetBucketPolicy(bucket string) (storage.BucketP
// GetObject retrieves an object and writes it to a writer
func (diskStorage StorageDriver) GetObject(target io.Writer, bucket, key string) (int64, error) {
metadata, err := diskStorage.donutBox.GetObjectMetadata(bucket, key, 0)
if err != nil {
return 0, err
}
// check every conversion; reusing a single err would mask all but the last failure
chunkCount, err1 := strconv.Atoi(metadata["chunkCount"])
columnCount, err2 := strconv.Atoi(metadata["columnCount"])
blockSize, err3 := strconv.Atoi(metadata["blockSize"])
length, err4 := strconv.Atoi(metadata["length"])
for _, err := range []error{err1, err2, err3, err4} {
if err != nil {
return 0, err
}
}
var readers []io.Reader
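// one reader per erasure column; each column stream carries that column's block for every chunk, in order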
for column := 0; column < columnCount; column++ {
reader, err := diskStorage.donutBox.GetObjectReader(bucket, key, uint(column))
if err != nil {
return 0, err
}
readers = append(readers, reader)
}
totalWritten := int64(0)
totalRemaining := int64(length)
// 8 data + 8 parity blocks, Cauchy coding; must match the parameters used by CreateObject
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy)
if err != nil {
return 0, err
}
decoder := erasure.NewEncoder(params) // the erasure encoder handles both encode and decode
for chunk := 0; chunk < chunkCount; chunk++ {
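// reassemble the chunk: collect one block from every column, then erasure-decode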
blocks := make([][]byte, columnCount)
for column := 0; column < columnCount; column++ {
var block bytes.Buffer
limitReader := io.LimitReader(readers[column], int64(blockSize))
_, err := io.Copy(&block, limitReader)
if err != nil {
return totalWritten, err
}
blocks[column] = block.Bytes()
}
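// the final chunk may hold fewer than blockSize bytes; decode only what remains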
curBlockSize := blockSize
if totalRemaining < int64(blockSize) {
curBlockSize = int(totalRemaining)
}
original, err := decoder.Decode(blocks, curBlockSize)
if err != nil {
return totalWritten, err
}
curWritten, err := io.Copy(target, bytes.NewBuffer(original))
totalWritten = totalWritten + curWritten
totalRemaining = totalRemaining - curWritten
if err != nil {
return totalWritten, err
}
}
return totalWritten, nil
}
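// Example (illustrative, not part of this commit): a caller can stream an
// object into memory, assuming a constructed StorageDriver named driver and
// hypothetical bucket and key names:
//
// var buf bytes.Buffer
// n, err := driver.GetObject(&buf, "bucket", "key")
// if err != nil {
// return err
// }
// // buf now holds the n reconstructed bytes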
// GetPartialObject retrieves an object and writes it to a writer
@@ -100,12 +151,14 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string
writers[i] = newWriter
}
totalLength := uint64(0)
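// count the chunks as they stream through; GetObject needs this to decode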
chunkCount := 0
for chunk := range splitStream {
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy)
if err != nil {
return err
}
totalLength = totalLength + uint64(len(chunk.Data))
chunkCount = chunkCount + 1
encoder := erasure.NewEncoder(params)
if chunk.Err == nil {
parts, _ := encoder.Encode(chunk.Data)
@@ -123,16 +176,20 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string
// write
}
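// record everything the decode path needs: object length, block size, chunk count, and column count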
metadata := make(map[string]string)
metadata["length"] = strconv.FormatUint(totalLength, 10)
metadata["blockSize"] = strconv.FormatUint(uint64(blockSize), 10)
metadata["chunkCount"] = strconv.FormatUint(uint64(chunkCount), 10)
metadata["columnCount"] = strconv.FormatUint(uint64(16), 10)
// metadata["md5"] = md5sum
for column := uint(0); column < 16; column++ {
writers[column].SetMetadata(metadata)
}
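// every column writer receives a copy of the metadata; GetObject reads it back from column 0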
// TODO capture errors in writers, enough should pass before returning
closeAllWriters(writers)
return nil
}