diff --git a/Makefile b/Makefile index 56a59b2ad..f64cf5a00 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ build-split: build-strbyteconv build-strbyteconv: @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/strbyteconv -build-storage: build-storage-fs build-storage-append build-storage-encoded +build-storage: build-storage-append build-storage-encoded build-storage-fs build-storage-fs: @godep go test -race -coverprofile=cover.out github.com/minio-io/minio/pkgs/storage/fsstorage diff --git a/cmd/erasure-demo/fs.go b/cmd/erasure-demo/fs.go index b5c0f26d3..8c1cf71b3 100644 --- a/cmd/erasure-demo/fs.go +++ b/cmd/erasure-demo/fs.go @@ -14,7 +14,7 @@ import ( func fsGetList(config inputConfig) (io.Reader, error) { var objectStorage storage.ObjectStorage rootDir := path.Join(config.rootDir, config.storageDriver) - objectStorage, _ = fsstorage.NewStorage(rootDir) + objectStorage, _ = fsstorage.NewStorage(rootDir, config.blockSize) objectList, err := objectStorage.List() if err != nil { return nil, err @@ -30,7 +30,7 @@ func fsGetList(config inputConfig) (io.Reader, error) { func fsGet(config inputConfig, objectPath string) (io.Reader, error) { var objectStorage storage.ObjectStorage rootDir := path.Join(config.rootDir, config.storageDriver) - objectStorage, _ = fsstorage.NewStorage(rootDir) + objectStorage, _ = fsstorage.NewStorage(rootDir, config.blockSize) object, err := objectStorage.Get(objectPath) if err != nil { return nil, err @@ -45,7 +45,7 @@ func fsPut(config inputConfig, objectPath string, reader io.Reader) error { return err } var objectStorage storage.ObjectStorage - objectStorage, _ = fsstorage.NewStorage(rootDir) + objectStorage, _ = fsstorage.NewStorage(rootDir, config.blockSize) if err = objectStorage.Put(objectPath, reader); err != nil { return err } diff --git a/cmd/erasure-demo/get.go b/cmd/erasure-demo/get.go index df818335f..d95849733 100644 --- a/cmd/erasure-demo/get.go +++ b/cmd/erasure-demo/get.go @@ -46,5 +46,8 @@ func get(c *cli.Context) { log.Fatal("Unknown driver") } } + if objectReader == nil { + log.Fatal("Object not found") + } io.Copy(os.Stdout, objectReader) } diff --git a/cmd/erasure-demo/main.go b/cmd/erasure-demo/main.go index 0e617d49a..59e91565c 100644 --- a/cmd/erasure-demo/main.go +++ b/cmd/erasure-demo/main.go @@ -72,6 +72,16 @@ func main() { Value: "erasure", Usage: "erasure", }, + cli.StringFlag{ + Name: "protection-level", + Value: "10,6", + Usage: "data,parity", + }, + cli.StringFlag{ + Name: "block-size", + Value: "1M", + Usage: "Size of blocks. Examples: 1K, 1M, full", + }, }, }, { diff --git a/pkgs/gateway/gateway.go b/pkgs/gateway/gateway.go index 4036553cd..67cfaaea8 100644 --- a/pkgs/gateway/gateway.go +++ b/pkgs/gateway/gateway.go @@ -228,7 +228,7 @@ func SimpleEncodedStorageDriver(bucket string, input chan ObjectRequest, config } func SimpleFileStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) { - fileStorage, _ := fsstorage.NewStorage(config.DataDir) + fileStorage, _ := fsstorage.NewStorage(config.DataDir, config.BlockSize) for request := range input { switch request.requestType { case "GET": diff --git a/pkgs/storage/encodedstorage/encoded_storage.go b/pkgs/storage/encodedstorage/encoded_storage.go index b582f4560..5eb8fa304 100644 --- a/pkgs/storage/encodedstorage/encoded_storage.go +++ b/pkgs/storage/encodedstorage/encoded_storage.go @@ -51,6 +51,10 @@ type storeResponse struct { func NewStorage(rootDir string, k, m int, blockSize uint64) (storage.ObjectStorage, error) { // create storage files + if k == 0 || m == 0 { + return nil, errors.New("Invalid protection level") + } + storageNodes := make([]storage.ObjectStorage, k+m) for i := 0; i < k+m; i++ { storageNode, err := appendstorage.NewStorage(rootDir, i) @@ -192,6 +196,10 @@ func (eStorage *encodedStorage) readObject(objectPath string, entry StorageEntry encoder := erasure.NewEncoder(ep) for i, chunk := range entry.Blocks { blockSlices := eStorage.getBlockSlices(objectPath + "$" + strconv.Itoa(i)) + if len(blockSlices) == 0 { + writer.CloseWithError(errors.New("slices missing!!")) + return + } var blocks [][]byte for _, slice := range blockSlices { if slice.err != nil { diff --git a/pkgs/storage/fsstorage/fs_storage.go b/pkgs/storage/fsstorage/fs_storage.go index db82df11b..6f7b8d17e 100644 --- a/pkgs/storage/fsstorage/fs_storage.go +++ b/pkgs/storage/fsstorage/fs_storage.go @@ -1,57 +1,156 @@ package fsstorage import ( + "bytes" + "crypto/md5" + "encoding/gob" + "encoding/hex" + "errors" "io" "io/ioutil" "os" "path" - "path/filepath" + "strconv" + "github.com/minio-io/minio/pkgs/split" "github.com/minio-io/minio/pkgs/storage" + "github.com/minio-io/minio/pkgs/storage/appendstorage" ) type fileSystemStorage struct { - RootDir string + RootDir string + BlockSize uint64 + diskStorage []storage.ObjectStorage + objects map[string]StorageEntry } -func NewStorage(rootDir string) (storage.ObjectStorage, error) { +type StorageEntry struct { + Path string + Md5sum []byte + ChunkLength int +} + +func NewStorage(rootDir string, blockSize uint64) (storage.ObjectStorage, error) { + var storageNodes []storage.ObjectStorage + storageNode, err := appendstorage.NewStorage(rootDir, 0) + if err != nil { + return nil, err + } + storageNodes = append(storageNodes, storageNode) + objects := make(map[string]StorageEntry) + indexPath := path.Join(rootDir, "index") + if _, err := os.Stat(indexPath); err == nil { + indexFile, err := os.Open(indexPath) + defer indexFile.Close() + if err != nil { + return nil, err + } + encoder := gob.NewDecoder(indexFile) + err = encoder.Decode(&objects) + if err != nil && err != io.EOF { + return nil, err + } + } newStorage := fileSystemStorage{ - RootDir: rootDir, + RootDir: rootDir, + diskStorage: storageNodes, + BlockSize: blockSize, + objects: objects, } return &newStorage, nil } func (fsStorage *fileSystemStorage) List() ([]storage.ObjectDescription, error) { - fileInfos, err := ioutil.ReadDir(fsStorage.RootDir) - if err != nil { - return nil, err + var objectDescList []storage.ObjectDescription + for objectName, objectEntry := range fsStorage.objects { + var objectDescription storage.ObjectDescription + objectDescription.Name = objectName + objectDescription.Md5sum = hex.EncodeToString(objectEntry.Md5sum) + objectDescription.Protectionlevel = "" + objectDescList = append(objectDescList, objectDescription) } + if len(objectDescList) == 0 { + return nil, errors.New("No objects found") + } + return objectDescList, nil +} - var descriptions []storage.ObjectDescription +func (fsStorage *fileSystemStorage) Get(objectPath string) (io.Reader, error) { + entry, ok := fsStorage.objects[objectPath] + if ok == false { + return nil, nil + } + reader, writer := io.Pipe() + go fsStorage.readObject(objectPath, entry, writer) + return reader, nil +} - for _, fi := range fileInfos { - description := storage.ObjectDescription{ - Name: fi.Name(), - Md5sum: "", - Protectionlevel: "", +func (fsStorage *fileSystemStorage) readObject(objectPath string, entry StorageEntry, writer *io.PipeWriter) { + appendStorage := fsStorage.diskStorage[0] + for i := 0; i < entry.ChunkLength; i++ { + chunkObjectPath := objectPath + "$" + strconv.Itoa(i) + chunkObject, err := appendStorage.Get(chunkObjectPath) + + if err != nil { + writer.CloseWithError(err) + } + data, readErr := ioutil.ReadAll(chunkObject) + + if readErr != nil { + writer.CloseWithError(readErr) + } + bytesWritten := 0 + for bytesWritten != len(data) { + written, err := writer.Write(data[bytesWritten:len(data)]) + if err != nil { + writer.CloseWithError(err) + } + bytesWritten += written } - descriptions = append(descriptions, description) } - return descriptions, nil + writer.Close() } -func (storage *fileSystemStorage) Get(objectPath string) (io.Reader, error) { - return os.Open(path.Join(storage.RootDir, objectPath)) +func (fsStorage *fileSystemStorage) Put(objectPath string, object io.Reader) error { + // split + chunks := make(chan split.SplitMessage) + go split.SplitStream(object, fsStorage.BlockSize, chunks) + + entry := StorageEntry{ + Path: objectPath, + Md5sum: nil, + ChunkLength: 0, + } + + hash := md5.New() + i := 0 + for chunk := range chunks { + if chunk.Err != nil { + return chunk.Err + } + err := fsStorage.storeBlocks(objectPath, i, chunk.Data) + if err != nil { + return err + } + // md5sum only after chunk is committed to disk + hash.Write(chunk.Data) + i++ + } + entry.Md5sum = hash.Sum(nil) + entry.ChunkLength = i + fsStorage.objects[objectPath] = entry + var gobBuffer bytes.Buffer + gobEncoder := gob.NewEncoder(&gobBuffer) + gobEncoder.Encode(fsStorage.objects) + ioutil.WriteFile(path.Join(fsStorage.RootDir, "index"), gobBuffer.Bytes(), 0600) + return nil } -func (storage *fileSystemStorage) Put(objectPath string, object io.Reader) error { - err := os.MkdirAll(filepath.Dir(path.Join(storage.RootDir, objectPath)), 0700) - if err != nil { +func (fsStorage *fileSystemStorage) storeBlocks(objectPath string, index int, chunk []byte) error { + appendStorage := fsStorage.diskStorage[0] + path := objectPath + "$" + strconv.Itoa(index) + if err := appendStorage.Put(path, bytes.NewBuffer(chunk)); err != nil { return err } - objectBytes, err := ioutil.ReadAll(object) - if err != nil { - return err - } - return ioutil.WriteFile(path.Join(storage.RootDir, objectPath), objectBytes, 0600) + return nil } diff --git a/pkgs/storage/fsstorage/fs_storage_test.go b/pkgs/storage/fsstorage/fs_storage_test.go index 1603e1c35..25cb4af5c 100644 --- a/pkgs/storage/fsstorage/fs_storage_test.go +++ b/pkgs/storage/fsstorage/fs_storage_test.go @@ -26,15 +26,19 @@ func (s *fileSystemStorageSuite) TestfileStoragePutAtRootPath(c *C) { defer os.RemoveAll(rootDir) var objectStorage storage.ObjectStorage - objectStorage, _ = NewStorage(rootDir) + objectStorage, _ = NewStorage(rootDir, 1024) objectBuffer := bytes.NewBuffer([]byte("object1")) - objectStorage.Put("path1", objectBuffer) + err = objectStorage.Put("path1", objectBuffer) + c.Assert(err, IsNil) // assert object1 was created in correct path objectResult1, err := objectStorage.Get("path1") c.Assert(err, IsNil) - object1, _ := ioutil.ReadAll(objectResult1) + + object1, err := ioutil.ReadAll(objectResult1) + c.Assert(err, IsNil) + c.Assert(string(object1), Equals, "object1") objectList, err := objectStorage.List()