diff --git a/pkg/storage/encoded/encoded.go b/pkg/storage/encoded/encoded.go index 0a1cecc58..0b34ddd48 100644 --- a/pkg/storage/encoded/encoded.go +++ b/pkg/storage/encoded/encoded.go @@ -17,6 +17,7 @@ package encoded import ( + "bytes" "errors" "github.com/minio-io/minio/pkg/donutbox" "github.com/minio-io/minio/pkg/encoding/erasure" @@ -67,7 +68,57 @@ func (diskStorage StorageDriver) GetBucketPolicy(bucket string) (storage.BucketP // GetObject retrieves an object and writes it to a writer func (diskStorage StorageDriver) GetObject(target io.Writer, bucket, key string) (int64, error) { - return 0, errors.New("Not Implemented") + metadata, err := diskStorage.donutBox.GetObjectMetadata(bucket, key, 0) + chunkCount, err := strconv.Atoi(metadata["chunkCount"]) + columnCount, err := strconv.Atoi(metadata["columnCount"]) + blockSize, err := strconv.Atoi(metadata["blockSize"]) + length, err := strconv.Atoi(metadata["length"]) + if err != nil { + return 0, err + } + var readers []io.Reader + for column := 0; column < columnCount; column++ { + reader, err := diskStorage.donutBox.GetObjectReader(bucket, key, uint(column)) + if err != nil { + return 0, err + } + readers = append(readers, reader) + } + + totalWritten := int64(length) + totalRemaining := int64(length) + if err != err { + return 0, err + } + params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) + decoder := erasure.NewEncoder(params) + for chunk := 0; chunk < chunkCount; chunk++ { + blocks := make([][]byte, columnCount) + for column := 0; column < columnCount; column++ { + var block bytes.Buffer + limitReader := io.LimitReader(readers[column], int64(blockSize)) + _, err := io.Copy(&block, limitReader) + if err != nil { + return totalWritten, err + } + blocks[column] = block.Bytes() + } + curBlockSize := blockSize + if totalRemaining < int64(blockSize) { + curBlockSize = int(totalRemaining) + } + original, err := decoder.Decode(blocks, curBlockSize) + if err != nil { + return totalWritten, err + } + curWritten, err := io.Copy(target, bytes.NewBuffer(original)) + totalRemaining = totalRemaining - curWritten + if err != nil { + return totalWritten, err + } + } + + return totalWritten, nil } // GetPartialObject retrieves an object and writes it to a writer @@ -100,12 +151,14 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string writers[i] = newWriter } totalLength := uint64(0) + chunkCount := 0 for chunk := range splitStream { params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy) if err != nil { return err } totalLength = totalLength + uint64(len(chunk.Data)) + chunkCount = chunkCount + 1 encoder := erasure.NewEncoder(params) if chunk.Err == nil { parts, _ := encoder.Encode(chunk.Data) @@ -123,16 +176,20 @@ func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string // write } // close connections - closeAllWriters(writers) metadata := make(map[string]string) metadata["length"] = strconv.FormatUint(totalLength, 10) metadata["blockSize"] = strconv.FormatUint(uint64(blockSize), 10) + metadata["chunkCount"] = strconv.FormatUint(uint64(chunkCount), 10) + metadata["columnCount"] = strconv.FormatUint(uint64(16), 10) // metadata["md5"] := md5sum for column := uint(0); column < 16; column++ { writers[column].SetMetadata(metadata) } + // TODO capture errors in writers, enough should pass before returning + closeAllWriters(writers) + return nil }