mirror of https://github.com/minio/minio.git
Merge pull request #222 from fkautz/pr_out_new_donut_frame_implemented
This commit is contained in:
commit
c2ae974c94
|
@ -19,15 +19,10 @@ package v1
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/minio-io/minio/pkg/storage/erasure"
|
||||
"github.com/minio-io/minio/pkg/utils/checksum/crc32c"
|
||||
"github.com/minio-io/minio/pkg/utils/crypto/sha512"
|
||||
)
|
||||
|
||||
/*
|
||||
|
@ -55,178 +50,95 @@ var (
|
|||
MagicINIM = binary.LittleEndian.Uint32([]byte{'I', 'N', 'I', 'M'})
|
||||
)
|
||||
|
||||
type DonutFormat struct {
|
||||
BlockStart uint32 // Magic="MINI"=1229867341
|
||||
type DonutFrameHeader struct {
|
||||
MagicMINI uint32
|
||||
VersionMajor uint16
|
||||
VersionMinor uint16
|
||||
VersionPatch uint16
|
||||
VersionReserved uint16
|
||||
Reserved uint64
|
||||
GobHeaderLen uint32
|
||||
GobHeader []byte
|
||||
HeaderCrc32c uint32
|
||||
BlockData uint32 // Magic="DATA"=1096040772
|
||||
Data io.Reader
|
||||
FooterCrc uint32
|
||||
BlockLen uint64
|
||||
BlockEnd uint32
|
||||
DataLength uint64
|
||||
}
|
||||
type Crc32c uint32
|
||||
type Sha512 [sha512.Size]byte
|
||||
|
||||
type DonutFrameFooter struct {
|
||||
DataSha512 Sha512
|
||||
OffsetToMINI uint64
|
||||
MagicINIM uint32
|
||||
}
|
||||
|
||||
type DonutFooter struct {
|
||||
BlockLen uint64
|
||||
BlockEnd uint32 // Magic="INIM"=1229867341
|
||||
}
|
||||
type Data bytes.Buffer
|
||||
|
||||
type Donut struct {
|
||||
file io.ReadWriteSeeker
|
||||
mutex *sync.RWMutex
|
||||
}
|
||||
|
||||
type GobHeader struct {
|
||||
Blocks []EncodedChunk
|
||||
Md5sum []byte
|
||||
EncoderParams erasure.EncoderParams
|
||||
}
|
||||
|
||||
type EncodedChunk struct {
|
||||
Crc uint32
|
||||
Length int
|
||||
Offset int
|
||||
}
|
||||
|
||||
func New(file io.ReadWriteSeeker) *Donut {
|
||||
donut := Donut{}
|
||||
donut.mutex = new(sync.RWMutex)
|
||||
donut.file = file
|
||||
return &donut
|
||||
}
|
||||
|
||||
func (donut *Donut) WriteGob(gobHeader GobHeader) (bytes.Buffer, error) {
|
||||
var gobBuffer bytes.Buffer
|
||||
encoder := gob.NewEncoder(&gobBuffer)
|
||||
err := encoder.Encode(gobHeader)
|
||||
if err != nil {
|
||||
return bytes.Buffer{}, err
|
||||
}
|
||||
return gobBuffer, nil
|
||||
}
|
||||
|
||||
func (donut *Donut) WriteEnd(target io.Writer, donutFormat DonutFormat) error {
|
||||
var tempBuffer bytes.Buffer
|
||||
if err := binary.Write(&tempBuffer, binary.LittleEndian, donutFormat.BlockLen); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&tempBuffer, binary.LittleEndian, donutFormat.BlockEnd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
crc := crc32c.Sum32(tempBuffer.Bytes())
|
||||
if err := binary.Write(target, binary.LittleEndian, crc); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.Copy(target, &tempBuffer); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (donut *Donut) WriteData(target io.Writer, donutFormat DonutFormat) error {
|
||||
var b bytes.Buffer
|
||||
if count, err := io.Copy(&b, donutFormat.Data); uint64(count) != donutFormat.BlockLen || err != nil {
|
||||
if err == nil {
|
||||
return binary.Write(target, binary.LittleEndian, b.Bytes())
|
||||
}
|
||||
return errors.New("Copy failed, count incorrect.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (donut *Donut) WriteBegin(target io.Writer, donutFormat DonutFormat) error {
|
||||
var headerBytes bytes.Buffer
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.BlockStart); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.VersionMajor); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.VersionMinor); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.VersionPatch); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.VersionReserved); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.Reserved); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.GobHeaderLen); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.GobHeader); err != nil {
|
||||
return err
|
||||
}
|
||||
crc := crc32c.Sum32(headerBytes.Bytes())
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, crc); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&headerBytes, binary.LittleEndian, donutFormat.BlockData); err != nil {
|
||||
return err
|
||||
}
|
||||
io.Copy(target, &headerBytes)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (donut *Donut) Write(gobHeader GobHeader, object io.Reader) error {
|
||||
donut.mutex.Lock()
|
||||
defer donut.mutex.Unlock()
|
||||
|
||||
gobBytes, err := donut.WriteGob(gobHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create bytes buffer representing the new object
|
||||
donutFormat := DonutFormat{
|
||||
BlockStart: MagicMINI,
|
||||
func Write(target io.Writer, reader io.Reader, length uint64) error {
|
||||
// write header
|
||||
header := DonutFrameHeader{
|
||||
MagicMINI: MagicMINI,
|
||||
VersionMajor: 1,
|
||||
VersionMinor: 0,
|
||||
VersionPatch: 0,
|
||||
VersionReserved: 0,
|
||||
Reserved: 0,
|
||||
GobHeaderLen: uint32(gobBytes.Len()),
|
||||
GobHeader: gobBytes.Bytes(),
|
||||
BlockData: MagicDATA,
|
||||
Data: object,
|
||||
BlockLen: 0,
|
||||
BlockEnd: MagicINIM,
|
||||
DataLength: length,
|
||||
}
|
||||
var headerBytes bytes.Buffer
|
||||
binary.Write(&headerBytes, binary.LittleEndian, header)
|
||||
headerCrc := crc32c.Sum32(headerBytes.Bytes())
|
||||
|
||||
tempBuffer, err := ioutil.TempFile(os.TempDir(), "minio-staging")
|
||||
binary.Write(&headerBytes, binary.LittleEndian, headerCrc)
|
||||
binary.Write(&headerBytes, binary.LittleEndian, MagicDATA)
|
||||
// write header
|
||||
headerLen, err := io.Copy(target, &headerBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer os.Remove(tempBuffer.Name())
|
||||
// write header
|
||||
if err := donut.WriteBegin(tempBuffer, donutFormat); err != nil {
|
||||
// write DATA
|
||||
// create sha512 tee
|
||||
sumReader, sumWriter := io.Pipe()
|
||||
defer sumWriter.Close()
|
||||
checksumChannel := make(chan checksumValue)
|
||||
go generateChecksum(sumReader, checksumChannel)
|
||||
teeReader := io.TeeReader(reader, sumWriter)
|
||||
_, err = io.Copy(target, teeReader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// write data
|
||||
if err := donut.WriteData(tempBuffer, donutFormat); err != nil {
|
||||
return err
|
||||
sumWriter.Close()
|
||||
dataChecksum := <-checksumChannel
|
||||
if dataChecksum.err != nil {
|
||||
return dataChecksum.err
|
||||
}
|
||||
|
||||
// generate footer
|
||||
frameFooter := DonutFrameFooter{
|
||||
DataSha512: dataChecksum.checksum,
|
||||
OffsetToMINI: length + uint64(headerLen) + uint64(80), /*footer size*/
|
||||
MagicINIM: MagicINIM,
|
||||
}
|
||||
var frameFooterBytes bytes.Buffer
|
||||
binary.Write(&frameFooterBytes, binary.LittleEndian, frameFooter)
|
||||
// write footer crc
|
||||
if err := donut.WriteEnd(tempBuffer, donutFormat); err != nil {
|
||||
footerChecksum := crc32c.Sum32(frameFooterBytes.Bytes())
|
||||
if err := binary.Write(target, binary.LittleEndian, footerChecksum); err != nil {
|
||||
return err
|
||||
}
|
||||
// write write footer
|
||||
_, err = io.Copy(target, &frameFooterBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// write footer
|
||||
donut.file.Seek(0, 2)
|
||||
tempBuffer.Seek(0, 0)
|
||||
io.Copy(donut.file, tempBuffer)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type checksumValue struct {
|
||||
checksum Sha512
|
||||
err error
|
||||
}
|
||||
|
||||
func generateChecksum(reader io.Reader, c chan<- checksumValue) {
|
||||
checksum, err := sha512.SumStream(reader)
|
||||
result := checksumValue{
|
||||
checksum: checksum,
|
||||
err: err,
|
||||
}
|
||||
c <- result
|
||||
}
|
||||
|
|
|
@ -1,61 +1,65 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
// +build ignore
|
||||
func main() {}
|
||||
|
||||
"github.com/minio-io/minio/pkg/storage/donut/v1"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("--start")
|
||||
|
||||
file, err := os.OpenFile("hello", os.O_WRONLY|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
donut := v1.New(file)
|
||||
|
||||
gobHeader := v1.GobHeader{}
|
||||
data := []byte("Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.")
|
||||
|
||||
dataBuffer := bytes.NewBuffer(data)
|
||||
err = donut.Write(gobHeader, dataBuffer)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
file.Close()
|
||||
fmt.Println("--closed")
|
||||
|
||||
fmt.Println("--verify")
|
||||
stat, _ := os.Stat("hello")
|
||||
fileSize := stat.Size()
|
||||
|
||||
rfile, _ := os.OpenFile("hello", os.O_RDONLY, 0666)
|
||||
blockStart := make([]byte, 4)
|
||||
blockStartCheck := []byte{'M', 'I', 'N', 'I'}
|
||||
|
||||
_, err = rfile.Read(blockStart)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
blockEnd := make([]byte, 4)
|
||||
start := fileSize - 4
|
||||
blockEndCheck := []byte{'I', 'N', 'I', 'M'}
|
||||
rfile.ReadAt(blockEnd, start)
|
||||
rfile.Close()
|
||||
|
||||
if !reflect.DeepEqual(blockStart, blockStartCheck) {
|
||||
panic("Corrupted donut file")
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(blockEnd, blockEndCheck) {
|
||||
panic("Corrupted donut file")
|
||||
}
|
||||
|
||||
fmt.Println("--verified")
|
||||
fmt.Println("--end")
|
||||
}
|
||||
//
|
||||
//import (
|
||||
// "bytes"
|
||||
// "fmt"
|
||||
// "os"
|
||||
// "reflect"
|
||||
//
|
||||
// "github.com/minio-io/minio/pkg/storage/donut/v1"
|
||||
//)
|
||||
//
|
||||
//func main() {
|
||||
// fmt.Println("--start")
|
||||
//
|
||||
// file, err := os.OpenFile("newfile", os.O_WRONLY|os.O_CREATE, 0666)
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// donut := v1.Write(file)
|
||||
//
|
||||
// gobHeader := v1.GobHeader{}
|
||||
// data := []byte("Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.")
|
||||
//
|
||||
// dataBuffer := bytes.NewBuffer(data)
|
||||
// err = donut.Write(gobHeader, dataBuffer)
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
// file.Close()
|
||||
// fmt.Println("--closed")
|
||||
//
|
||||
// fmt.Println("--verify")
|
||||
// stat, _ := os.Stat("newfile")
|
||||
// fileSize := stat.Size()
|
||||
//
|
||||
// rfile, _ := os.OpenFile("newfile", os.O_RDONLY, 0666)
|
||||
// blockStart := make([]byte, 4)
|
||||
// blockStartCheck := []byte{'M', 'I', 'N', 'I'}
|
||||
//
|
||||
// _, err = rfile.Read(blockStart)
|
||||
// if err != nil {
|
||||
// panic(err)
|
||||
// }
|
||||
//
|
||||
// blockEnd := make([]byte, 4)
|
||||
// start := fileSize - 4
|
||||
// blockEndCheck := []byte{'I', 'N', 'I', 'M'}
|
||||
// rfile.ReadAt(blockEnd, start)
|
||||
// rfile.Close()
|
||||
//
|
||||
// if !reflect.DeepEqual(blockStart, blockStartCheck) {
|
||||
// panic("Corrupted donut file")
|
||||
// }
|
||||
//
|
||||
// if !reflect.DeepEqual(blockEnd, blockEndCheck) {
|
||||
// panic("Corrupted donut file")
|
||||
// }
|
||||
//
|
||||
// fmt.Println("--verified")
|
||||
// fmt.Println("--end")
|
||||
//}
|
||||
|
|
|
@ -18,10 +18,13 @@ package v1
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
. "gopkg.in/check.v1"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"crypto/sha512"
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/minio-io/minio/pkg/utils/checksum/crc32c"
|
||||
|
||||
. "gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
func Test(t *testing.T) { TestingT(t) }
|
||||
|
@ -32,33 +35,92 @@ var _ = Suite(&MySuite{})
|
|||
|
||||
func (s *MySuite) TestSingleWrite(c *C) {
|
||||
//var b io.ReadWriteSeeker
|
||||
var o bytes.Buffer
|
||||
var testBuffer bytes.Buffer
|
||||
|
||||
b, err := ioutil.TempFile(os.TempDir(), "minio-donut-test")
|
||||
defer os.RemoveAll(b.Name())
|
||||
testData := "Hello, World"
|
||||
testLength := uint64(len(testData))
|
||||
err := Write(&testBuffer, bytes.NewBufferString(testData), testLength)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
donut := New(b)
|
||||
gobheader := GobHeader{}
|
||||
err = donut.Write(gobheader, &o)
|
||||
c.Assert(err, IsNil)
|
||||
blockStart := make([]byte, 4)
|
||||
testBufferLength := uint64(testBuffer.Len())
|
||||
|
||||
//n, _ := b.Read(blockStart)
|
||||
// b.Next(b.Len() - n) // jump ahead
|
||||
// b.Read(blockEnd)
|
||||
// read start
|
||||
b.Seek(0, 0) // jump ahead
|
||||
b.Read(blockStart)
|
||||
blockStartCheck := []byte{'M', 'I', 'N', 'I'}
|
||||
c.Assert(blockStart, DeepEquals, blockStartCheck)
|
||||
// we test our crc here too
|
||||
headerBytes := testBuffer.Bytes()[0:28]
|
||||
expectedCrc := crc32c.Sum32(headerBytes)
|
||||
|
||||
// read block
|
||||
// magic mini
|
||||
magicMini := make([]byte, 4)
|
||||
testBuffer.Read(magicMini)
|
||||
c.Assert(magicMini, DeepEquals, []byte{'M', 'I', 'N', 'I'})
|
||||
|
||||
// read end
|
||||
blockEnd := make([]byte, 4)
|
||||
b.Seek(-int64(len(blockEnd)), 2) // jump ahead
|
||||
b.Read(blockEnd)
|
||||
blockEndCheck := []byte{'I', 'N', 'I', 'M'}
|
||||
c.Assert(blockEnd, DeepEquals, blockEndCheck)
|
||||
// major version
|
||||
majorVersion := make([]byte, 2)
|
||||
testBuffer.Read(majorVersion)
|
||||
c.Assert(binary.LittleEndian.Uint16(majorVersion), DeepEquals, uint16(1))
|
||||
|
||||
// minor version
|
||||
minorVersion := make([]byte, 2)
|
||||
testBuffer.Read(minorVersion)
|
||||
c.Assert(binary.LittleEndian.Uint16(minorVersion), DeepEquals, uint16(0))
|
||||
|
||||
// patch version
|
||||
patchVersion := make([]byte, 2)
|
||||
testBuffer.Read(patchVersion)
|
||||
c.Assert(binary.LittleEndian.Uint16(patchVersion), DeepEquals, uint16(0))
|
||||
|
||||
// reserved version
|
||||
reservedVersion := make([]byte, 2)
|
||||
testBuffer.Read(reservedVersion)
|
||||
c.Assert(binary.LittleEndian.Uint16(reservedVersion), DeepEquals, uint16(0))
|
||||
|
||||
// reserved
|
||||
reserved := make([]byte, 8)
|
||||
testBuffer.Read(reserved)
|
||||
c.Assert(binary.LittleEndian.Uint64(reserved), DeepEquals, uint64(0))
|
||||
|
||||
// data length
|
||||
length := make([]byte, 8)
|
||||
testBuffer.Read(length)
|
||||
c.Assert(binary.LittleEndian.Uint64(length), DeepEquals, testLength)
|
||||
|
||||
// test crc
|
||||
bufCrc := make([]byte, 4)
|
||||
testBuffer.Read(bufCrc)
|
||||
c.Assert(binary.LittleEndian.Uint32(bufCrc), DeepEquals, expectedCrc)
|
||||
|
||||
// magic DATA
|
||||
magicData := make([]byte, 4)
|
||||
testBuffer.Read(magicData)
|
||||
c.Assert(magicData, DeepEquals, []byte{'D', 'A', 'T', 'A'})
|
||||
|
||||
// data
|
||||
actualData := make([]byte, int32(testLength))
|
||||
testBuffer.Read(actualData)
|
||||
c.Assert(string(actualData), DeepEquals, testData)
|
||||
|
||||
// extract footer crc32c
|
||||
actualFooterCrc := make([]byte, 4)
|
||||
testBuffer.Read(actualFooterCrc)
|
||||
remainingBytes := testBuffer.Bytes()
|
||||
remainingSum := crc32c.Sum32(remainingBytes)
|
||||
c.Assert(binary.LittleEndian.Uint32(actualFooterCrc), DeepEquals, remainingSum)
|
||||
|
||||
// sha512
|
||||
expectedSha512 := sha512.Sum512([]byte(testData))
|
||||
actualSha512 := make([]byte, 64)
|
||||
testBuffer.Read(actualSha512)
|
||||
c.Assert(actualSha512, DeepEquals, expectedSha512[:])
|
||||
|
||||
// length
|
||||
actualLength := make([]byte, 8)
|
||||
testBuffer.Read(actualLength)
|
||||
c.Assert(testBufferLength, DeepEquals, binary.LittleEndian.Uint64(actualLength))
|
||||
|
||||
// magic INIM
|
||||
magicInim := make([]byte, 4)
|
||||
testBuffer.Read(magicInim)
|
||||
c.Assert(magicInim, DeepEquals, []byte{'I', 'N', 'I', 'M'})
|
||||
|
||||
// ensure no extra data is in the file
|
||||
c.Assert(testBuffer.Len(), Equals, 0)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue