mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
Adding simple file storage driver for persistent storage
This commit is contained in:
parent
2679324afe
commit
44b28166f5
22
file_storage.go
Normal file
22
file_storage.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
package minio
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FileStorage struct {
|
||||||
|
RootDir string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (storage FileStorage) Get(objectPath string) ([]byte, error) {
|
||||||
|
return ioutil.ReadFile(path.Join(storage.RootDir, objectPath))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (storage FileStorage) Put(objectPath string, object []byte) error {
|
||||||
|
os.MkdirAll(filepath.Dir(path.Join(storage.RootDir, objectPath)), 0700)
|
||||||
|
return ioutil.WriteFile(path.Join(storage.RootDir, objectPath), object, 0600)
|
||||||
|
}
|
51
file_storage_test.go
Normal file
51
file_storage_test.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package minio
|
||||||
|
|
||||||
|
import (
|
||||||
|
. "gopkg.in/check.v1"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FileStorageSuite struct{}
|
||||||
|
|
||||||
|
var _ = Suite(&FileStorageSuite{})
|
||||||
|
|
||||||
|
func makeTempTestDir() (string, error) {
|
||||||
|
return ioutil.TempDir("/tmp", "minio-test-")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *FileStorageSuite) TestFileStoragePutAtRootPath(c *C) {
|
||||||
|
rootDir, err := makeTempTestDir()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
defer os.RemoveAll(rootDir)
|
||||||
|
|
||||||
|
var storage ObjectStorage
|
||||||
|
storage = FileStorage{
|
||||||
|
RootDir: rootDir,
|
||||||
|
}
|
||||||
|
|
||||||
|
storage.Put("path1", []byte("object1"))
|
||||||
|
|
||||||
|
// assert object1 was created in correct path
|
||||||
|
object1, err := storage.Get("path1")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(string(object1), Equals, "object1")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *FileStorageSuite) TestFileStoragePutDirPath(c *C) {
|
||||||
|
rootDir, err := makeTempTestDir()
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
defer os.RemoveAll(rootDir)
|
||||||
|
|
||||||
|
var storage ObjectStorage
|
||||||
|
storage = FileStorage{
|
||||||
|
RootDir: rootDir,
|
||||||
|
}
|
||||||
|
|
||||||
|
storage.Put("path1/path2/path3", []byte("object"))
|
||||||
|
|
||||||
|
// assert object1 was created in correct path
|
||||||
|
object1, err := storage.Get("path1/path2/path3")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(string(object1), Equals, "object")
|
||||||
|
}
|
35
gateway.go
35
gateway.go
@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/tchap/go-patricia/patricia"
|
"github.com/tchap/go-patricia/patricia"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"path"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Stores system configuration, populated from CLI or test runner
|
// Stores system configuration, populated from CLI or test runner
|
||||||
@ -14,6 +15,7 @@ type GatewayConfig struct {
|
|||||||
StorageDriver StorageDriver
|
StorageDriver StorageDriver
|
||||||
BucketDriver BucketDriver
|
BucketDriver BucketDriver
|
||||||
requestBucketChan chan BucketRequest
|
requestBucketChan chan BucketRequest
|
||||||
|
dataDir string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Message for requesting a bucket
|
// Message for requesting a bucket
|
||||||
@ -37,7 +39,7 @@ type Bucket interface {
|
|||||||
type BucketDriver func(config GatewayConfig)
|
type BucketDriver func(config GatewayConfig)
|
||||||
|
|
||||||
// Storage driver function, should read from a channel and respond through callback channels
|
// Storage driver function, should read from a channel and respond through callback channels
|
||||||
type StorageDriver func(bucket string, input chan ObjectRequest)
|
type StorageDriver func(bucket string, input chan ObjectRequest, config GatewayConfig)
|
||||||
|
|
||||||
// TODO remove when building real context
|
// TODO remove when building real context
|
||||||
type fakeContext struct{}
|
type fakeContext struct{}
|
||||||
@ -103,7 +105,7 @@ func SynchronizedBucketDriver(config GatewayConfig) {
|
|||||||
for request := range config.requestBucketChan {
|
for request := range config.requestBucketChan {
|
||||||
if buckets[request.name] == nil {
|
if buckets[request.name] == nil {
|
||||||
bucketChannel := make(chan ObjectRequest)
|
bucketChannel := make(chan ObjectRequest)
|
||||||
go config.StorageDriver(request.name, bucketChannel)
|
go config.StorageDriver(request.name, bucketChannel, config)
|
||||||
buckets[request.name] = &SynchronizedBucket{
|
buckets[request.name] = &SynchronizedBucket{
|
||||||
name: request.name,
|
name: request.name,
|
||||||
channel: bucketChannel,
|
channel: bucketChannel,
|
||||||
@ -176,17 +178,14 @@ func (bucket *SynchronizedBucket) closeChannel() {
|
|||||||
close(bucket.channel)
|
close(bucket.channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
func InMemoryStorageDriver(bucket string, input chan ObjectRequest) {
|
func InMemoryStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) {
|
||||||
objects := patricia.NewTrie()
|
objects := patricia.NewTrie()
|
||||||
for request := range input {
|
for request := range input {
|
||||||
prefix := patricia.Prefix(request.path)
|
prefix := patricia.Prefix(request.path)
|
||||||
fmt.Println("objects:", objects)
|
|
||||||
switch request.requestType {
|
switch request.requestType {
|
||||||
case "GET":
|
case "GET":
|
||||||
fmt.Println("GET: " + request.path)
|
|
||||||
request.callback <- objects.Get(prefix)
|
request.callback <- objects.Get(prefix)
|
||||||
case "PUT":
|
case "PUT":
|
||||||
fmt.Println("PUT: " + request.path)
|
|
||||||
objects.Insert(prefix, request.object)
|
objects.Insert(prefix, request.object)
|
||||||
request.callback <- nil
|
request.callback <- nil
|
||||||
default:
|
default:
|
||||||
@ -194,3 +193,27 @@ func InMemoryStorageDriver(bucket string, input chan ObjectRequest) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SimpleFileStorageDriver(bucket string, input chan ObjectRequest, config GatewayConfig) {
|
||||||
|
storage := FileStorage{
|
||||||
|
RootDir: config.dataDir,
|
||||||
|
}
|
||||||
|
for request := range input {
|
||||||
|
switch request.requestType {
|
||||||
|
case "GET":
|
||||||
|
objectPath := path.Join(bucket, request.path)
|
||||||
|
object, err := storage.Get(objectPath)
|
||||||
|
if err != nil {
|
||||||
|
request.callback <- nil
|
||||||
|
} else {
|
||||||
|
request.callback <- object
|
||||||
|
}
|
||||||
|
case "PUT":
|
||||||
|
objectPath := path.Join(bucket, request.path)
|
||||||
|
storage.Put(objectPath, request.object)
|
||||||
|
request.callback <- nil
|
||||||
|
default:
|
||||||
|
request.callback <- errors.New("Unexpected message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -6,17 +6,15 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type MySuite struct{}
|
type GatewaySuite struct{}
|
||||||
|
|
||||||
var _ = Suite(&MySuite{})
|
var _ = Suite(&GatewaySuite{})
|
||||||
|
|
||||||
func Test(t *testing.T) { TestingT(t) }
|
func (s *GatewaySuite) TestPrintsGateway(c *C) {
|
||||||
|
|
||||||
func (s *MySuite) TestPrintsGateway(c *C) {
|
|
||||||
// set up router with in memory storage driver
|
// set up router with in memory storage driver
|
||||||
router := mux.NewRouter()
|
router := mux.NewRouter()
|
||||||
config := GatewayConfig{
|
config := GatewayConfig{
|
||||||
@ -59,7 +57,7 @@ func (s *MySuite) TestPrintsGateway(c *C) {
|
|||||||
|
|
||||||
type TestContext struct{}
|
type TestContext struct{}
|
||||||
|
|
||||||
func (s *MySuite) TestBucketCreation(c *C) {
|
func (s *GatewaySuite) TestBucketCreation(c *C) {
|
||||||
config := GatewayConfig{
|
config := GatewayConfig{
|
||||||
StorageDriver: InMemoryStorageDriver,
|
StorageDriver: InMemoryStorageDriver,
|
||||||
requestBucketChan: make(chan BucketRequest),
|
requestBucketChan: make(chan BucketRequest),
|
||||||
@ -103,38 +101,50 @@ func (s *MySuite) TestBucketCreation(c *C) {
|
|||||||
c.Assert(bucketB.GetName(context), Equals, "bucketB")
|
c.Assert(bucketB.GetName(context), Equals, "bucketB")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MySuite) TestInMemoryBucketOperations(c *C) {
|
func (s *GatewaySuite) TestInMemoryBucketOperations(c *C) {
|
||||||
// Test in memory bucket operations
|
simpleFileStorageRootDir, err := makeTempTestDir()
|
||||||
config := GatewayConfig{
|
|
||||||
StorageDriver: InMemoryStorageDriver,
|
|
||||||
requestBucketChan: make(chan BucketRequest),
|
|
||||||
}
|
|
||||||
defer close(config.requestBucketChan)
|
|
||||||
go SynchronizedBucketDriver(config)
|
|
||||||
context := TestContext{}
|
|
||||||
|
|
||||||
// get bucket
|
|
||||||
callback := make(chan Bucket)
|
|
||||||
config.requestBucketChan <- BucketRequest{
|
|
||||||
name: "bucket",
|
|
||||||
context: context,
|
|
||||||
callback: callback,
|
|
||||||
}
|
|
||||||
bucket := <-callback
|
|
||||||
c.Assert(bucket.GetName(context), Equals, "bucket")
|
|
||||||
|
|
||||||
// get missing value
|
|
||||||
nilResult, err := bucket.Get(context, "foo")
|
|
||||||
c.Assert(nilResult, IsNil)
|
|
||||||
c.Assert(err, Not(IsNil))
|
|
||||||
c.Assert(err.Error(), Equals, "Object not found")
|
|
||||||
|
|
||||||
// add new value
|
|
||||||
err = bucket.Put(context, "foo", []byte("bar"))
|
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
defer os.RemoveAll(simpleFileStorageRootDir)
|
||||||
|
configs := []GatewayConfig{
|
||||||
|
GatewayConfig{
|
||||||
|
StorageDriver: InMemoryStorageDriver,
|
||||||
|
requestBucketChan: make(chan BucketRequest),
|
||||||
|
},
|
||||||
|
GatewayConfig{
|
||||||
|
StorageDriver: SimpleFileStorageDriver,
|
||||||
|
requestBucketChan: make(chan BucketRequest),
|
||||||
|
dataDir: simpleFileStorageRootDir,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, config := range configs {
|
||||||
|
defer close(config.requestBucketChan)
|
||||||
|
go SynchronizedBucketDriver(config)
|
||||||
|
context := TestContext{}
|
||||||
|
|
||||||
// retrieve value
|
// get bucket
|
||||||
barResult, err := bucket.Get(context, "foo")
|
callback := make(chan Bucket)
|
||||||
c.Assert(err, IsNil)
|
config.requestBucketChan <- BucketRequest{
|
||||||
c.Assert(string(barResult), Equals, "bar")
|
name: "bucket",
|
||||||
|
context: context,
|
||||||
|
callback: callback,
|
||||||
|
}
|
||||||
|
bucket := <-callback
|
||||||
|
c.Assert(bucket.GetName(context), Equals, "bucket")
|
||||||
|
|
||||||
|
// get missing value
|
||||||
|
nilResult, err := bucket.Get(context, "foo")
|
||||||
|
c.Assert(nilResult, IsNil)
|
||||||
|
c.Assert(err, Not(IsNil))
|
||||||
|
c.Assert(err.Error(), Equals, "Object not found")
|
||||||
|
|
||||||
|
// add new value
|
||||||
|
err = bucket.Put(context, "foo", []byte("bar"))
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
|
// retrieve value
|
||||||
|
barResult, err := bucket.Get(context, "foo")
|
||||||
|
c.Assert(err, IsNil)
|
||||||
|
c.Assert(string(barResult), Equals, "bar")
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
8
setup_test.go
Normal file
8
setup_test.go
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
package minio
|
||||||
|
|
||||||
|
import (
|
||||||
|
. "gopkg.in/check.v1"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test(t *testing.T) { TestingT(t) }
|
@ -6,6 +6,11 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ObjectStorage interface {
|
||||||
|
Get(path string) ([]byte, error)
|
||||||
|
Put(path string, object []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
func RegisterStorageHandlers(router *mux.Router) {
|
func RegisterStorageHandlers(router *mux.Router) {
|
||||||
router.HandleFunc("/storage/rpc", StorageHandler)
|
router.HandleFunc("/storage/rpc", StorageHandler)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user