From 163a6c35db3fd2c2d4ee8726d02007f0ff807d43 Mon Sep 17 00:00:00 2001 From: "Frederick F. Kautz IV" Date: Tue, 27 Jan 2015 11:09:20 -0800 Subject: [PATCH] Adding fs storage --- pkg/storage/fs/fs.go | 201 +++++++++++++++++++++++++++++++ pkg/storage/fs/fs_test.go | 42 +++++++ pkg/storage/storage_api_suite.go | 29 +++-- 3 files changed, 260 insertions(+), 12 deletions(-) create mode 100644 pkg/storage/fs/fs.go create mode 100644 pkg/storage/fs/fs_test.go diff --git a/pkg/storage/fs/fs.go b/pkg/storage/fs/fs.go new file mode 100644 index 000000000..3b3af4a8b --- /dev/null +++ b/pkg/storage/fs/fs.go @@ -0,0 +1,201 @@ +package fs + +import ( + "errors" + "io" + "io/ioutil" + "os" + "path" + "strings" + "sync" + + mstorage "github.com/minio-io/minio/pkg/storage" +) + +type storage struct { + root string + writeLock sync.Mutex +} + +type MkdirFailedError struct{} + +func (self MkdirFailedError) Error() string { + return "Mkdir Failed" +} + +func Start(root string) (chan<- string, <-chan error, *storage) { + ctrlChannel := make(chan string) + errorChannel := make(chan error) + go start(ctrlChannel, errorChannel) + return ctrlChannel, errorChannel, &storage{root: root} +} + +func start(ctrlChannel <-chan string, errorChannel chan<- error) { + close(errorChannel) +} + +// Bucket Operaotions + +func (storage *storage) ListBuckets(prefix string) ([]mstorage.BucketMetadata, error) { + return []mstorage.BucketMetadata{}, errors.New("Not Implemented") +} + +func (storage *storage) StoreBucket(bucket string) error { + storage.writeLock.Lock() + defer storage.writeLock.Unlock() + + // verify bucket path legal + if mstorage.IsValidBucket(bucket) == false { + return mstorage.BucketNameInvalid{Bucket: bucket} + } + + // get bucket path + bucketDir := path.Join(storage.root, bucket) + + // check if bucket exists + if _, err := os.Stat(bucketDir); err == nil { + return mstorage.BucketExists{ + Bucket: bucket, + } + } + + // make bucket + err := os.Mkdir(bucketDir, 0700) + if err != nil { + return mstorage.EmbedError(bucket, "", err) + } + return nil +} + +// Object Operations + +func (storage *storage) CopyObjectToWriter(w io.Writer, bucket string, object string) (int64, error) { + // validate bucket + if mstorage.IsValidBucket(bucket) == false { + return 0, mstorage.BucketNameInvalid{Bucket: bucket} + } + + // validate object + if mstorage.IsValidObject(object) == false { + return 0, mstorage.ObjectNameInvalid{Bucket: bucket, Object: object} + } + + objectPath := path.Join(storage.root, bucket, object) + + file, err := os.Open(objectPath) + if err != nil { + return 0, mstorage.EmbedError(bucket, object, err) + } + count, err := io.Copy(w, file) + if err != nil { + return count, mstorage.EmbedError(bucket, object, err) + } + return count, nil +} + +func (storage *storage) GetObjectMetadata(bucket string, object string) (mstorage.ObjectMetadata, error) { + if mstorage.IsValidBucket(bucket) == false { + return mstorage.ObjectMetadata{}, mstorage.BucketNameInvalid{Bucket: bucket} + } + + if mstorage.IsValidObject(bucket) == false { + return mstorage.ObjectMetadata{}, mstorage.ObjectNameInvalid{Bucket: bucket, Object: bucket} + } + + objectPath := path.Join(storage.root, bucket, object) + + stat, err := os.Stat(objectPath) + if os.IsNotExist(err) { + return mstorage.ObjectMetadata{}, mstorage.ObjectNotFound{Bucket: bucket, Object: object} + } + + metadata := mstorage.ObjectMetadata{ + Bucket: bucket, + Key: object, + Created: stat.ModTime(), + Size: stat.Size(), + ETag: bucket + "#" + object, + } + + return metadata, nil +} + +func (storage *storage) ListObjects(bucket, prefix string, count int) ([]mstorage.ObjectMetadata, bool, error) { + if mstorage.IsValidBucket(bucket) == false { + return []mstorage.ObjectMetadata{}, false, mstorage.BucketNameInvalid{Bucket: bucket} + } + if mstorage.IsValidObject(prefix) == false { + return []mstorage.ObjectMetadata{}, false, mstorage.ObjectNameInvalid{Bucket: bucket, Object: prefix} + } + + rootPrefix := path.Join(storage.root, bucket) + + files, err := ioutil.ReadDir(rootPrefix) + if err != nil { + return []mstorage.ObjectMetadata{}, false, mstorage.EmbedError("bucket", "", err) + } + + var metadataList []mstorage.ObjectMetadata + for _, file := range files { + if len(metadataList) >= count { + return metadataList, true, nil + } + if strings.HasPrefix(file.Name(), prefix) { + metadata := mstorage.ObjectMetadata{ + Bucket: bucket, + Key: file.Name(), + Created: file.ModTime(), + Size: file.Size(), + ETag: bucket + "#" + file.Name(), + } + metadataList = append(metadataList, metadata) + } + } + return metadataList, false, nil +} + +func (storage *storage) StoreObject(bucket string, key string, data io.Reader) error { + // TODO Commits should stage then move instead of writing directly + storage.writeLock.Lock() + defer storage.writeLock.Unlock() + + // check bucket name valid + if mstorage.IsValidBucket(bucket) == false { + return mstorage.BucketNameInvalid{Bucket: bucket} + } + + // check bucket exists + if _, err := os.Stat(path.Join(storage.root, bucket)); os.IsNotExist(err) { + return mstorage.BucketNotFound{Bucket: bucket} + } + + // verify object path legal + if mstorage.IsValidObject(key) == false { + return mstorage.ObjectNameInvalid{Bucket: bucket, Object: key} + } + + // get object path + objectPath := path.Join(storage.root, bucket, key) + + // check if object exists + if _, err := os.Stat(objectPath); !os.IsNotExist(err) { + return mstorage.ObjectExists{ + Bucket: bucket, + Key: key, + } + } + + // write object + file, err := os.OpenFile(objectPath, os.O_WRONLY|os.O_CREATE, 0600) + defer file.Close() + if err != nil { + return mstorage.EmbedError(bucket, key, err) + } + + _, err = io.Copy(file, data) + if err != nil { + return mstorage.EmbedError(bucket, key, err) + } + + return nil +} diff --git a/pkg/storage/fs/fs_test.go b/pkg/storage/fs/fs_test.go new file mode 100644 index 000000000..228b53640 --- /dev/null +++ b/pkg/storage/fs/fs_test.go @@ -0,0 +1,42 @@ +package fs + +import ( + "io/ioutil" + "log" + "os" + "testing" + + mstorage "github.com/minio-io/minio/pkg/storage" + + . "gopkg.in/check.v1" +) + +func Test(t *testing.T) { TestingT(t) } + +type MySuite struct{} + +var _ = Suite(&MySuite{}) + +func (s *MySuite) TestAPISuite(c *C) { + var storageList []string + create := func() mstorage.Storage { + path, err := ioutil.TempDir(os.TempDir(), "minio-fs-") + log.Println(path) + c.Check(err, IsNil) + storageList = append(storageList, path) + _, _, store := Start(path) + return store + } + log.Println("FOO") + mstorage.APITestSuite(c, create) + log.Println("BAR") + removeRoots(c, storageList) +} + +func removeRoots(c *C, roots []string) { + log.Println("REMOVING ROOTS: ", roots) + for _, root := range roots { + err := os.RemoveAll(root) + c.Check(err, IsNil) + } +} diff --git a/pkg/storage/storage_api_suite.go b/pkg/storage/storage_api_suite.go index 2a25a02ba..edc354c24 100644 --- a/pkg/storage/storage_api_suite.go +++ b/pkg/storage/storage_api_suite.go @@ -11,10 +11,10 @@ import ( func APITestSuite(c *C, create func() Storage) { testCreateBucket(c, create) testMultipleObjectCreation(c, create) - //testPaging(c, create) - //testObjectOverwriteFails(c, create) - //testNonExistantBucketOperations(c, create) - //testBucketRecreateFails(c, create) + testPaging(c, create) + testObjectOverwriteFails(c, create) + testNonExistantBucketOperations(c, create) + testBucketRecreateFails(c, create) } func testCreateBucket(c *C, create func() Storage) { @@ -58,24 +58,24 @@ func testMultipleObjectCreation(c *C, create func() Storage) { func testPaging(c *C, create func() Storage) { storage := create() storage.StoreBucket("bucket") - storage.ListObjects("bucket", "", 1000) - objects, isTruncated, err := storage.ListObjects("bucket", "", 1000) + storage.ListObjects("bucket", "", 5) + objects, isTruncated, err := storage.ListObjects("bucket", "", 5) c.Check(len(objects), Equals, 0) c.Check(isTruncated, Equals, false) c.Check(err, IsNil) - for i := 1; i <= 1000; i++ { + for i := 1; i <= 5; i++ { key := "obj" + strconv.Itoa(i) storage.StoreObject("bucket", key, bytes.NewBufferString(key)) - objects, isTruncated, err = storage.ListObjects("bucket", "", 1000) + objects, isTruncated, err = storage.ListObjects("bucket", "", 5) c.Check(len(objects), Equals, i) c.Check(isTruncated, Equals, false) c.Check(err, IsNil) } - for i := 1001; i <= 2000; i++ { + for i := 6; i <= 10; i++ { key := "obj" + strconv.Itoa(i) storage.StoreObject("bucket", key, bytes.NewBufferString(key)) - objects, isTruncated, err = storage.ListObjects("bucket", "", 1000) - c.Check(len(objects), Equals, 1000) + objects, isTruncated, err = storage.ListObjects("bucket", "", 5) + c.Check(len(objects), Equals, 5) c.Check(isTruncated, Equals, true) c.Check(err, IsNil) } @@ -86,8 +86,13 @@ func testObjectOverwriteFails(c *C, create func() Storage) { storage.StoreBucket("bucket") err := storage.StoreObject("bucket", "object", bytes.NewBufferString("one")) c.Check(err, IsNil) - err = storage.StoreObject("bucket", "object", bytes.NewBufferString("one")) + err = storage.StoreObject("bucket", "object", bytes.NewBufferString("three")) c.Check(err, Not(IsNil)) + var bytesBuffer bytes.Buffer + length, err := storage.CopyObjectToWriter(&bytesBuffer, "bucket", "object") + c.Check(length, Equals, int64(len("one"))) + c.Check(err, IsNil) + c.Check(string(bytesBuffer.Bytes()), Equals, "one") } func testNonExistantBucketOperations(c *C, create func() Storage) {