Merge pull request #296 from fkautz/pr_out_inmemory_donutbox_is_now_thread_safe

This commit is contained in:
Frederick F. Kautz IV 2015-03-14 19:21:44 -07:00
commit d52ca9a238
3 changed files with 71 additions and 43 deletions

View File

@ -14,7 +14,7 @@ type DonutBox interface {
SetBucketMetadata(bucket, name string, metadata io.Reader) error SetBucketMetadata(bucket, name string, metadata io.Reader) error
// object operations // object operations
GetObjectWriter(bucket, object string, column, blockSize uint) (io.WriteCloser, <-chan Result, error) GetObjectWriter(bucket, object string, column, blockSize uint) *io.PipeWriter
GetObjectReader(bucket, object string, column int) (io.Reader, error) GetObjectReader(bucket, object string, column int) (io.Reader, error)
StoreObjectMetadata(bucket, object, name string, reader io.Reader) error StoreObjectMetadata(bucket, object, name string, reader io.Reader) error
GetObjectMetadata(bucket, object, name string) (io.Reader, error) GetObjectMetadata(bucket, object, name string) (io.Reader, error)

View File

@ -7,28 +7,33 @@ import (
"io" "io"
"strconv" "strconv"
"strings" "strings"
"sync"
) )
type bucket struct { type bucket struct {
name string name string
metadata map[string]string metadata map[string]string
objects map[string]object objects map[string]*object
lock *sync.RWMutex
} }
type object struct { type object struct {
name string name string
data []byte data []byte
metadata map[string]string metadata map[string]string
lock *sync.RWMutex
} }
type donutMem struct { type donutMem struct {
buckets map[string]bucket buckets map[string]*bucket
lock *sync.RWMutex
} }
// NewDonutMem creates a new in memory donut // NewDonutMem creates a new in memory donut
func NewDonutMem() donutbox.DonutBox { func NewDonutMem() donutbox.DonutBox {
return donutMem{ return donutMem{
buckets: make(map[string]bucket), buckets: make(map[string]*bucket),
lock: new(sync.RWMutex),
} }
} }
@ -48,9 +53,10 @@ func (donutMem donutMem) CreateBucket(b string) error {
newBucket := bucket{ newBucket := bucket{
name: b, name: b,
metadata: metadata, metadata: metadata,
objects: make(map[string]object), objects: make(map[string]*object),
lock: new(sync.RWMutex),
} }
donutMem.buckets[b] = newBucket donutMem.buckets[b] = &newBucket
return nil return nil
} }
func (donutMem donutMem) ListObjects(bucket, prefix string) ([]string, error) { func (donutMem donutMem) ListObjects(bucket, prefix string) ([]string, error) {
@ -64,37 +70,63 @@ func (donutMem donutMem) SetBucketMetadata(bucket, name string, metadata io.Read
} }
// object operations // object operations
func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockSize uint) (io.WriteCloser, <-chan donutbox.Result, error) { func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockSize uint) *io.PipeWriter {
if _, ok := donutMem.buckets[bucket]; ok { reader, writer := io.Pipe()
if _, ok := donutMem.buckets[bucket].objects[key]; !ok { donutMem.lock.RLock()
reader, writer := io.Pipe() defer donutMem.lock.RUnlock()
ch := make(chan donutbox.Result) if curBucket, ok := donutMem.buckets[bucket]; ok {
curBucket.lock.Lock()
defer curBucket.lock.Unlock()
if _, ok := curBucket.objects[key]; !ok {
// create object
metadata := make(map[string]string)
metadata["key"] = key
metadata["blockSize"] = strconv.FormatInt(int64(blockSize), 10)
newObject := object{
name: key,
data: make([]byte, 0),
metadata: metadata,
lock: new(sync.RWMutex),
}
newObject.lock.Lock()
curBucket.objects[key] = &newObject
go func() { go func() {
defer newObject.lock.Unlock()
var objBuffer bytes.Buffer var objBuffer bytes.Buffer
_, err := io.Copy(&objBuffer, reader) _, err := io.Copy(&objBuffer, reader)
metadata := make(map[string]string)
metadata["key"] = key
metadata["blockSize"] = strconv.FormatInt(int64(blockSize), 10)
if err == nil { if err == nil {
newObject := object{ newObject.data = objBuffer.Bytes()
name: key, writer.Close()
data: objBuffer.Bytes(), } else {
metadata: metadata, donutMem.lock.RLock()
} defer donutMem.lock.RUnlock()
donutMem.buckets[bucket].objects[key] = newObject bucket, _ := donutMem.buckets[bucket]
bucket.lock.Lock()
defer bucket.lock.Unlock()
delete(bucket.objects, key)
writer.CloseWithError(err)
} }
ch <- donutbox.Result{Err: err}
}() }()
return writer, ch, nil return writer
} }
return nil, nil, errors.New("Object exists") writer.CloseWithError(errors.New("Object exists"))
return writer
} }
return nil, nil, errors.New("Bucket not found") writer.CloseWithError(errors.New("Bucket does not exist"))
return writer
} }
func (donutMem donutMem) GetObjectReader(bucket, key string, column int) (io.Reader, error) { func (donutMem donutMem) GetObjectReader(bucket, key string, column int) (io.Reader, error) {
if b, ok := donutMem.buckets[bucket]; ok { donutMem.lock.RLock()
if obj, ok := b.objects[key]; ok { defer donutMem.lock.RUnlock()
return bytes.NewBuffer(obj.data), nil if curBucket, ok := donutMem.buckets[bucket]; ok {
curBucket.lock.RLock()
defer curBucket.lock.RUnlock()
if curObject, ok := curBucket.objects[key]; ok {
curObject.lock.RLock()
defer curObject.lock.RUnlock()
return bytes.NewBuffer(curObject.data), nil
} }
return nil, errors.New("Object not found") return nil, errors.New("Object not found")
} }

View File

@ -3,9 +3,8 @@ package donutmem
import ( import (
"testing" "testing"
"bytes"
. "gopkg.in/check.v1" . "gopkg.in/check.v1"
"io" "io/ioutil"
) )
func Test(t *testing.T) { TestingT(t) } func Test(t *testing.T) { TestingT(t) }
@ -15,30 +14,27 @@ type MySuite struct{}
var _ = Suite(&MySuite{}) var _ = Suite(&MySuite{})
func (s *MySuite) TestAPISuite(c *C) { func (s *MySuite) TestAPISuite(c *C) {
c.Skip("Not implemented") data := "Hello World"
donut := NewDonutMem() donut := NewDonutMem()
writer, ch, err := donut.GetObjectWriter("foo", "bar", 0, 2) writer := donut.GetObjectWriter("foo", "bar", 0, 2)
c.Assert(writer, IsNil) count, err := writer.Write([]byte("hello"))
c.Assert(ch, IsNil)
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))
err = donut.CreateBucket("foo") err = donut.CreateBucket("foo")
c.Assert(err, IsNil) c.Assert(err, IsNil)
writer, ch, err = donut.GetObjectWriter("foo", "bar", 0, 2) writer = donut.GetObjectWriter("foo", "bar", 0, 2)
count, err = writer.Write([]byte(data))
c.Assert(count, Equals, len(data))
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(ch, Not(IsNil))
writer.Write([]byte("Hello World"))
writer.Close() writer.Close()
res := <-ch c.Assert(err, IsNil)
c.Assert(res.Err, IsNil)
// time.Sleep(1 * time.Second)
reader, err := donut.GetObjectReader("foo", "bar", 0) reader, err := donut.GetObjectReader("foo", "bar", 0)
c.Assert(err, IsNil) c.Assert(err, IsNil)
var target bytes.Buffer result, err := ioutil.ReadAll(reader)
_, err = io.Copy(&target, reader) c.Assert(result, DeepEquals, []byte(data))
c.Assert(err, IsNil)
c.Assert(target.Bytes(), DeepEquals, []byte("Hello World"))
} }