mirror of https://github.com/minio/minio.git
Fragment reader implemented
This commit is contained in:
parent
ca1a4b616c
commit
679c6ace3a
|
@ -146,6 +146,68 @@ func Write(target io.Writer, reader io.Reader, length uint64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reads a donut fragment
|
||||||
|
func Read(reader io.Reader) (io.Reader, error) {
|
||||||
|
header, err := ReadHeader(reader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
sumReader, sumWriter := io.Pipe()
|
||||||
|
|
||||||
|
teeReader := io.TeeReader(reader, sumWriter)
|
||||||
|
|
||||||
|
defer sumWriter.Close()
|
||||||
|
checksumChannel := make(chan checksumValue)
|
||||||
|
go generateChecksum(sumReader, checksumChannel)
|
||||||
|
|
||||||
|
data := make([]byte, header.DataLength)
|
||||||
|
teeReader.Read(data)
|
||||||
|
sumWriter.Close()
|
||||||
|
|
||||||
|
// read crc
|
||||||
|
footerBuffer := make([]byte, 80)
|
||||||
|
reader.Read(footerBuffer)
|
||||||
|
expectedCrc := binary.LittleEndian.Uint32(footerBuffer[:4])
|
||||||
|
|
||||||
|
actualCrc := crc32c.Sum32(footerBuffer[4:])
|
||||||
|
if expectedCrc != actualCrc {
|
||||||
|
// TODO perhaps we should return data and still report error?
|
||||||
|
return nil, errors.New("Expected CRC doesn't match for footer")
|
||||||
|
}
|
||||||
|
|
||||||
|
footer := DonutFrameFooter{}
|
||||||
|
err = binary.Read(bytes.NewBuffer(footerBuffer[4:]), binary.LittleEndian, &footer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytes.NewBuffer(data), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reads the header of a donut
|
||||||
|
func ReadHeader(reader io.Reader) (header DonutFrameHeader, err error) {
|
||||||
|
headerSlice := make([]byte, 32)
|
||||||
|
headerLength, err := reader.Read(headerSlice)
|
||||||
|
if err != nil {
|
||||||
|
return header, err
|
||||||
|
}
|
||||||
|
if headerLength != 32 {
|
||||||
|
return header, errors.New("EOF found while reading donut header")
|
||||||
|
}
|
||||||
|
actualCrc := crc32c.Sum32(headerSlice[:24])
|
||||||
|
|
||||||
|
expectedCrc := binary.LittleEndian.Uint32(headerSlice[24:28])
|
||||||
|
|
||||||
|
if actualCrc != expectedCrc {
|
||||||
|
return header, errors.New("CRC for donut did not match")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = binary.Read(bytes.NewBuffer(headerSlice[0:24]), binary.LittleEndian, &header)
|
||||||
|
|
||||||
|
return header, nil
|
||||||
|
}
|
||||||
|
|
||||||
type checksumValue struct {
|
type checksumValue struct {
|
||||||
checksum Sha512
|
checksum Sha512
|
||||||
err error
|
err error
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/sha512"
|
"crypto/sha512"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/minio-io/minio/pkg/utils/checksum/crc32c"
|
"github.com/minio-io/minio/pkg/utils/checksum/crc32c"
|
||||||
|
@ -115,6 +116,23 @@ func (s *MySuite) TestLengthMismatchInWrite(c *C) {
|
||||||
c.Assert(err, Not(IsNil))
|
c.Assert(err, Not(IsNil))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *MySuite) TestWriteAndRead(c *C) {
|
||||||
|
testData := "Hello, World"
|
||||||
|
testLength := uint64(len(testData))
|
||||||
|
var testBuffer bytes.Buffer
|
||||||
|
err := Write(&testBuffer, bytes.NewBufferString(testData), testLength)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
testReader, err := Read(&testBuffer)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
var actualData bytes.Buffer
|
||||||
|
length, err := io.Copy(&actualData, testReader)
|
||||||
|
c.Assert(int64(len(testData)), Equals, length)
|
||||||
|
|
||||||
|
c.Assert([]byte(testData), DeepEquals, actualData.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
var buf = make([]byte, 1024*1024*8)
|
var buf = make([]byte, 1024*1024*8)
|
||||||
|
|
||||||
func benchmarkSize(b *testing.B, size int) {
|
func benchmarkSize(b *testing.B, size int) {
|
||||||
|
|
Loading…
Reference in New Issue