mirror of
https://github.com/minio/minio.git
synced 2025-03-13 05:00:11 -04:00
Merge pull request #773 from krishnasrinivas/put-object-stream
Encoder now directly reads from the object stream. Using split.Stream…
This commit is contained in:
commit
4ac23d747c
@ -33,7 +33,6 @@ import (
|
|||||||
"github.com/minio/minio/pkg/crypto/sha256"
|
"github.com/minio/minio/pkg/crypto/sha256"
|
||||||
"github.com/minio/minio/pkg/crypto/sha512"
|
"github.com/minio/minio/pkg/crypto/sha512"
|
||||||
"github.com/minio/minio/pkg/donut/disk"
|
"github.com/minio/minio/pkg/donut/disk"
|
||||||
"github.com/minio/minio/pkg/donut/split"
|
|
||||||
"github.com/minio/minio/pkg/iodine"
|
"github.com/minio/minio/pkg/iodine"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -237,7 +236,7 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WriteObject - write a new object into bucket
|
// WriteObject - write a new object into bucket
|
||||||
func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
|
func (b bucket) WriteObject(objectName string, objectData io.Reader, size int64, expectedMD5Sum string, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
|
||||||
b.lock.Lock()
|
b.lock.Lock()
|
||||||
defer b.lock.Unlock()
|
defer b.lock.Unlock()
|
||||||
if objectName == "" || objectData == nil {
|
if objectName == "" || objectData == nil {
|
||||||
@ -272,7 +271,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5
|
|||||||
}
|
}
|
||||||
mwriter := io.MultiWriter(sumMD5, sum256, sum512)
|
mwriter := io.MultiWriter(sumMD5, sum256, sum512)
|
||||||
// write encoded data with k, m and writers
|
// write encoded data with k, m and writers
|
||||||
chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, mwriter)
|
chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, size, mwriter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
CleanupWritersOnError(writers)
|
CleanupWritersOnError(writers)
|
||||||
return ObjectMetadata{}, iodine.New(err, nil)
|
return ObjectMetadata{}, iodine.New(err, nil)
|
||||||
@ -422,32 +421,40 @@ func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// writeObjectData -
|
// writeObjectData -
|
||||||
func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, writer io.Writer) (int, int, error) {
|
func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, size int64, writer io.Writer) (int, int, error) {
|
||||||
encoder, err := newEncoder(k, m, "Cauchy")
|
encoder, err := newEncoder(k, m, "Cauchy")
|
||||||
|
chunkSize := int64(10 * 1024 * 1024)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, iodine.New(err, nil)
|
return 0, 0, iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
chunkCount := 0
|
chunkCount := 0
|
||||||
totalLength := 0
|
totalLength := 0
|
||||||
for chunk := range split.Stream(objectData, 10*1024*1024) {
|
remaining := size
|
||||||
if chunk.Err != nil {
|
for remaining > 0 {
|
||||||
return 0, 0, iodine.New(err, nil)
|
readSize := chunkSize
|
||||||
|
if remaining < chunkSize {
|
||||||
|
readSize = remaining
|
||||||
}
|
}
|
||||||
totalLength = totalLength + len(chunk.Data)
|
remaining = remaining - readSize
|
||||||
encodedBlocks, err := encoder.Encode(chunk.Data)
|
totalLength = totalLength + int(readSize)
|
||||||
|
encodedBlocks, inputData, err := encoder.EncodeStream(objectData, readSize)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
_, err = writer.Write(inputData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, iodine.New(err, nil)
|
return 0, 0, iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
writer.Write(chunk.Data)
|
|
||||||
for blockIndex, block := range encodedBlocks {
|
for blockIndex, block := range encodedBlocks {
|
||||||
errCh := make(chan error, 1)
|
errCh := make(chan error, 1)
|
||||||
go func(writer io.Writer, reader io.Reader) {
|
go func(writer io.Writer, reader io.Reader) {
|
||||||
|
// FIXME: this closes the errCh in the outer scope
|
||||||
defer close(errCh)
|
defer close(errCh)
|
||||||
_, err := io.Copy(writers[blockIndex], bytes.NewReader(block))
|
_, err := io.Copy(writer, reader)
|
||||||
errCh <- err
|
errCh <- err
|
||||||
}(writers[blockIndex], bytes.NewReader(block))
|
}(writers[blockIndex], bytes.NewReader(block))
|
||||||
if err := <-errCh; err != nil {
|
if err := <-errCh; err != nil {
|
||||||
|
// FIXME: fix premature return in case of err != nil
|
||||||
return 0, 0, iodine.New(err, nil)
|
return 0, 0, iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -134,7 +134,7 @@ func (donut API) listObjects(bucket, prefix, marker, delimiter string, maxkeys i
|
|||||||
}
|
}
|
||||||
|
|
||||||
// putObject - put object
|
// putObject - put object
|
||||||
func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
|
func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, size int64, metadata map[string]string, signature *Signature) (ObjectMetadata, error) {
|
||||||
errParams := map[string]string{
|
errParams := map[string]string{
|
||||||
"bucket": bucket,
|
"bucket": bucket,
|
||||||
"object": object,
|
"object": object,
|
||||||
@ -158,7 +158,7 @@ func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Read
|
|||||||
if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok {
|
if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok {
|
||||||
return ObjectMetadata{}, iodine.New(ObjectExists{Object: object}, errParams)
|
return ObjectMetadata{}, iodine.New(ObjectExists{Object: object}, errParams)
|
||||||
}
|
}
|
||||||
objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata, signature)
|
objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, size, expectedMD5Sum, metadata, signature)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ObjectMetadata{}, iodine.New(err, errParams)
|
return ObjectMetadata{}, iodine.New(err, errParams)
|
||||||
}
|
}
|
||||||
@ -170,7 +170,7 @@ func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Read
|
|||||||
}
|
}
|
||||||
|
|
||||||
// putObject - put object
|
// putObject - put object
|
||||||
func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string, partID int, reader io.Reader, metadata map[string]string, signature *Signature) (PartMetadata, error) {
|
func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string, partID int, reader io.Reader, size int64, metadata map[string]string, signature *Signature) (PartMetadata, error) {
|
||||||
errParams := map[string]string{
|
errParams := map[string]string{
|
||||||
"bucket": bucket,
|
"bucket": bucket,
|
||||||
"object": object,
|
"object": object,
|
||||||
@ -198,7 +198,7 @@ func (donut API) putObjectPart(bucket, object, expectedMD5Sum, uploadID string,
|
|||||||
return PartMetadata{}, iodine.New(ObjectExists{Object: object}, errParams)
|
return PartMetadata{}, iodine.New(ObjectExists{Object: object}, errParams)
|
||||||
}
|
}
|
||||||
objectPart := object + "/" + "multipart" + "/" + strconv.Itoa(partID)
|
objectPart := object + "/" + "multipart" + "/" + strconv.Itoa(partID)
|
||||||
objmetadata, err := donut.buckets[bucket].WriteObject(objectPart, reader, expectedMD5Sum, metadata, signature)
|
objmetadata, err := donut.buckets[bucket].WriteObject(objectPart, reader, size, expectedMD5Sum, metadata, signature)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PartMetadata{}, iodine.New(err, errParams)
|
return PartMetadata{}, iodine.New(err, errParams)
|
||||||
}
|
}
|
||||||
|
@ -373,6 +373,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
|
|||||||
key,
|
key,
|
||||||
expectedMD5Sum,
|
expectedMD5Sum,
|
||||||
data,
|
data,
|
||||||
|
size,
|
||||||
map[string]string{
|
map[string]string{
|
||||||
"contentType": contentType,
|
"contentType": contentType,
|
||||||
"contentLength": strconv.FormatInt(size, 10),
|
"contentLength": strconv.FormatInt(size, 10),
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package donut
|
package donut
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
encoding "github.com/minio/minio/pkg/erasure"
|
encoding "github.com/minio/minio/pkg/erasure"
|
||||||
@ -86,6 +87,14 @@ func (e encoder) Encode(data []byte) (encodedData [][]byte, err error) {
|
|||||||
return encodedData, nil
|
return encodedData, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e encoder) EncodeStream(data io.Reader, size int64) (encodedData [][]byte, inputData []byte, err error) {
|
||||||
|
encodedData, inputData, err = e.encoder.EncodeStream(data, size)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
return encodedData, inputData, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Decode - erasure decode input encoded bytes
|
// Decode - erasure decode input encoded bytes
|
||||||
func (e encoder) Decode(encodedData [][]byte, dataLength int) (data []byte, err error) {
|
func (e encoder) Decode(encodedData [][]byte, dataLength int) (data []byte, err error) {
|
||||||
decodedData, err := e.encoder.Decode(encodedData, dataLength)
|
decodedData, err := e.encoder.Decode(encodedData, dataLength)
|
||||||
|
@ -155,7 +155,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
|
|||||||
}
|
}
|
||||||
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
||||||
}
|
}
|
||||||
partMetadata, err := donut.putObjectPart(bucket, key, expectedMD5Sum, uploadID, partID, data, metadata, signature)
|
partMetadata, err := donut.putObjectPart(bucket, key, expectedMD5Sum, uploadID, partID, data, size, metadata, signature)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", iodine.New(err, nil)
|
return "", iodine.New(err, nil)
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ package erasure
|
|||||||
import "C"
|
import "C"
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"io"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -195,3 +196,47 @@ func (e *Erasure) Encode(inputData []byte) (encodedBlocks [][]byte, err error) {
|
|||||||
|
|
||||||
return encodedBlocks, nil
|
return encodedBlocks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EncodeStream erasure codes a block of data in "k" data blocks and "m" parity blocks.
|
||||||
|
// Output is [k+m][]blocks of data and parity slices.
|
||||||
|
func (e *Erasure) EncodeStream(data io.Reader, size int64) ([][]byte, []byte, error) {
|
||||||
|
k := int(e.params.K) // "k" data blocks
|
||||||
|
m := int(e.params.M) // "m" parity blocks
|
||||||
|
n := k + m // "n" total encoded blocks
|
||||||
|
|
||||||
|
// Length of a single encoded chunk.
|
||||||
|
// Total number of encoded chunks = "k" data + "m" parity blocks
|
||||||
|
encodedBlockLen := GetEncodedBlockLen(int(size), uint8(k))
|
||||||
|
|
||||||
|
// Length of total number of "n" data chunks
|
||||||
|
encodedDataBlocksLen := encodedBlockLen * n
|
||||||
|
|
||||||
|
inputData := make([]byte, size, encodedDataBlocksLen)
|
||||||
|
|
||||||
|
_, err := io.ReadFull(data, inputData)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocate memory to the "encoded blocks" return buffer
|
||||||
|
encodedBlocks := make([][]byte, n) // Return buffer
|
||||||
|
|
||||||
|
// Nessary to bridge Go to the C world. C requires 2D arry of pointers to
|
||||||
|
// byte array. "encodedBlocks" is a 2D slice.
|
||||||
|
pointersToEncodedBlock := make([]*byte, n) // Pointers to encoded blocks.
|
||||||
|
|
||||||
|
// Copy data block slices to encoded block buffer
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
encodedBlocks[i] = inputData[i*encodedBlockLen : (i+1)*encodedBlockLen]
|
||||||
|
pointersToEncodedBlock[i] = &encodedBlocks[i][0]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Erasure code the data into K data blocks and M parity
|
||||||
|
// blocks. Only the parity blocks are filled. Data blocks remain
|
||||||
|
// intact.
|
||||||
|
C.ec_encode_data(C.int(encodedBlockLen), C.int(k), C.int(m), e.encodeTbls,
|
||||||
|
(**C.uchar)(unsafe.Pointer(&pointersToEncodedBlock[:k][0])), // Pointers to data blocks
|
||||||
|
(**C.uchar)(unsafe.Pointer(&pointersToEncodedBlock[k:][0]))) // Pointers to parity blocks
|
||||||
|
|
||||||
|
return encodedBlocks, inputData[0:size], nil
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user