Merge pull request #106 from harshavardhana/pr_out_ignore_io_eof_for_gob_decoding

This commit is contained in:
Harshavardhana 2014-12-11 01:47:49 -08:00
commit 23a3609a64
3 changed files with 36 additions and 38 deletions

View File

@ -33,7 +33,6 @@ build-strbyteconv:
@godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/strbyteconv @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/strbyteconv
build-storage: build-storage-fs build-storage-append build-storage-encoded build-storage: build-storage-fs build-storage-append build-storage-encoded
@godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage
build-storage-fs: build-storage-fs:
@godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage/fsstorage @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage/fsstorage

View File

@ -24,7 +24,7 @@ type Header struct {
Path string Path string
Offset int64 Offset int64
Length int Length int
Crc []byte Crc uint32
} }
func NewStorage(rootDir string, slice int) (storage.ObjectStorage, error) { func NewStorage(rootDir string, slice int) (storage.ObjectStorage, error) {
@ -44,7 +44,7 @@ func NewStorage(rootDir string, slice int) (storage.ObjectStorage, error) {
} }
dec := gob.NewDecoder(mapFile) dec := gob.NewDecoder(mapFile)
err = dec.Decode(&objects) err = dec.Decode(&objects)
if err != nil { if err != nil && err != io.EOF {
return &appendStorage{}, nil return &appendStorage{}, nil
} }
} }
@ -81,7 +81,7 @@ func (aStorage *appendStorage) Put(objectPath string, object io.Reader) error {
Path: objectPath, Path: objectPath,
Offset: 0, Offset: 0,
Length: 0, Length: 0,
Crc: nil, Crc: 0,
} }
offset, err := aStorage.file.Seek(0, os.SEEK_END) offset, err := aStorage.file.Seek(0, os.SEEK_END)
if err != nil { if err != nil {

View File

@ -25,10 +25,33 @@ type encodedStorage struct {
diskStorage []storage.ObjectStorage diskStorage []storage.ObjectStorage
} }
type StorageEntry struct {
Path string
Md5sum string
Crc uint32
Blocks []StorageBlockEntry
Encoderparams *erasure.EncoderParams
}
type StorageBlockEntry struct {
Index int
Length int
}
type storeRequest struct {
path string
data []byte
}
type storeResponse struct {
data []byte
err error
}
func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStorage, error) { func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStorage, error) {
// create storage files // create storage files
storageNodes := make([]storage.ObjectStorage, 16) storageNodes := make([]storage.ObjectStorage, k+m)
for i := 0; i < 16; i++ { for i := 0; i < k+m; i++ {
storageNode, err := appendstorage.NewStorage(rootDir, i) storageNode, err := appendstorage.NewStorage(rootDir, i)
storageNodes[i] = storageNode storageNodes[i] = storageNode
if err != nil { if err != nil {
@ -45,7 +68,7 @@ func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStora
} }
encoder := gob.NewDecoder(indexFile) encoder := gob.NewDecoder(indexFile)
err = encoder.Decode(&objects) err = encoder.Decode(&objects)
if err != nil { if err != nil && err != io.EOF {
return nil, err return nil, err
} }
} }
@ -88,8 +111,9 @@ func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error {
entry := StorageEntry{ entry := StorageEntry{
Path: objectPath, Path: objectPath,
Md5sum: "md5sum", Md5sum: "md5sum",
Crc: 24, Crc: 0,
Blocks: make([]StorageBlockEntry, 0), Blocks: make([]StorageBlockEntry, 0),
Encoderparams: encoderParameters,
} }
i := 0 i := 0
// encode // encode
@ -122,28 +146,6 @@ func (eStorage *encodedStorage) Put(objectPath string, object io.Reader) error {
return nil return nil
} }
type storeRequest struct {
path string
data []byte
}
type storeResponse struct {
data []byte
err error
}
type StorageEntry struct {
Path string
Md5sum string
Crc uint32
Blocks []StorageBlockEntry
}
type StorageBlockEntry struct {
Index int
Length int
}
func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []error { func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []error {
returnChannels := make([]<-chan error, len(eStorage.diskStorage)) returnChannels := make([]<-chan error, len(eStorage.diskStorage))
for i, store := range eStorage.diskStorage { for i, store := range eStorage.diskStorage {
@ -161,16 +163,13 @@ func (eStorage *encodedStorage) storeBlocks(path string, blocks [][]byte) []erro
} }
func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) {
params, err := erasure.ParseEncoderParams(eStorage.K, eStorage.M, erasure.CAUCHY) encoder := erasure.NewEncoder(entry.Encoderparams)
if err != nil {
}
encoder := erasure.NewEncoder(params)
for i, chunk := range entry.Blocks { for i, chunk := range entry.Blocks {
blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i)) blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i))
var blocks [][]byte var blocks [][]byte
for _, slice := range blockSlices { for _, slice := range blockSlices {
if slice.err != nil { if slice.err != nil {
writer.CloseWithError(err) writer.CloseWithError(slice.err)
return return
} }
blocks = append(blocks, slice.data) blocks = append(blocks, slice.data)