From 7e6169690199f0a942252c26bc410b4d5a5343f8 Mon Sep 17 00:00:00 2001 From: "Frederick F. Kautz IV" Date: Sat, 14 Mar 2015 19:21:27 -0700 Subject: [PATCH] Inmemory donutbox is now thread safe --- pkg/donutbox/donutbox.go | 2 +- pkg/donutbox/donutmem/donutmem.go | 84 ++++++++++++++++++-------- pkg/donutbox/donutmem/donutmem_test.go | 28 ++++----- 3 files changed, 71 insertions(+), 43 deletions(-) diff --git a/pkg/donutbox/donutbox.go b/pkg/donutbox/donutbox.go index 91b82b8de..49d40a3eb 100644 --- a/pkg/donutbox/donutbox.go +++ b/pkg/donutbox/donutbox.go @@ -14,7 +14,7 @@ type DonutBox interface { SetBucketMetadata(bucket, name string, metadata io.Reader) error // object operations - GetObjectWriter(bucket, object string, column, blockSize uint) (io.WriteCloser, <-chan Result, error) + GetObjectWriter(bucket, object string, column, blockSize uint) *io.PipeWriter GetObjectReader(bucket, object string, column int) (io.Reader, error) StoreObjectMetadata(bucket, object, name string, reader io.Reader) error GetObjectMetadata(bucket, object, name string) (io.Reader, error) diff --git a/pkg/donutbox/donutmem/donutmem.go b/pkg/donutbox/donutmem/donutmem.go index dd4e02381..8724130f4 100644 --- a/pkg/donutbox/donutmem/donutmem.go +++ b/pkg/donutbox/donutmem/donutmem.go @@ -7,28 +7,33 @@ import ( "io" "strconv" "strings" + "sync" ) type bucket struct { name string metadata map[string]string - objects map[string]object + objects map[string]*object + lock *sync.RWMutex } type object struct { name string data []byte metadata map[string]string + lock *sync.RWMutex } type donutMem struct { - buckets map[string]bucket + buckets map[string]*bucket + lock *sync.RWMutex } // NewDonutMem creates a new in memory donut func NewDonutMem() donutbox.DonutBox { return donutMem{ - buckets: make(map[string]bucket), + buckets: make(map[string]*bucket), + lock: new(sync.RWMutex), } } @@ -48,9 +53,10 @@ func (donutMem donutMem) CreateBucket(b string) error { newBucket := bucket{ name: b, metadata: metadata, - objects: make(map[string]object), + objects: make(map[string]*object), + lock: new(sync.RWMutex), } - donutMem.buckets[b] = newBucket + donutMem.buckets[b] = &newBucket return nil } func (donutMem donutMem) ListObjects(bucket, prefix string) ([]string, error) { @@ -64,37 +70,63 @@ func (donutMem donutMem) SetBucketMetadata(bucket, name string, metadata io.Read } // object operations -func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockSize uint) (io.WriteCloser, <-chan donutbox.Result, error) { - if _, ok := donutMem.buckets[bucket]; ok { - if _, ok := donutMem.buckets[bucket].objects[key]; !ok { - reader, writer := io.Pipe() - ch := make(chan donutbox.Result) +func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockSize uint) *io.PipeWriter { + reader, writer := io.Pipe() + donutMem.lock.RLock() + defer donutMem.lock.RUnlock() + if curBucket, ok := donutMem.buckets[bucket]; ok { + curBucket.lock.Lock() + defer curBucket.lock.Unlock() + if _, ok := curBucket.objects[key]; !ok { + // create object + metadata := make(map[string]string) + metadata["key"] = key + metadata["blockSize"] = strconv.FormatInt(int64(blockSize), 10) + + newObject := object{ + name: key, + data: make([]byte, 0), + metadata: metadata, + lock: new(sync.RWMutex), + } + + newObject.lock.Lock() + curBucket.objects[key] = &newObject go func() { + defer newObject.lock.Unlock() var objBuffer bytes.Buffer _, err := io.Copy(&objBuffer, reader) - metadata := make(map[string]string) - metadata["key"] = key - metadata["blockSize"] = strconv.FormatInt(int64(blockSize), 10) if err == nil { - newObject := object{ - name: key, - data: objBuffer.Bytes(), - metadata: metadata, - } - donutMem.buckets[bucket].objects[key] = newObject + newObject.data = objBuffer.Bytes() + writer.Close() + } else { + donutMem.lock.RLock() + defer donutMem.lock.RUnlock() + bucket, _ := donutMem.buckets[bucket] + bucket.lock.Lock() + defer bucket.lock.Unlock() + delete(bucket.objects, key) + writer.CloseWithError(err) } - ch <- donutbox.Result{Err: err} }() - return writer, ch, nil + return writer } - return nil, nil, errors.New("Object exists") + writer.CloseWithError(errors.New("Object exists")) + return writer } - return nil, nil, errors.New("Bucket not found") + writer.CloseWithError(errors.New("Bucket does not exist")) + return writer } func (donutMem donutMem) GetObjectReader(bucket, key string, column int) (io.Reader, error) { - if b, ok := donutMem.buckets[bucket]; ok { - if obj, ok := b.objects[key]; ok { - return bytes.NewBuffer(obj.data), nil + donutMem.lock.RLock() + defer donutMem.lock.RUnlock() + if curBucket, ok := donutMem.buckets[bucket]; ok { + curBucket.lock.RLock() + defer curBucket.lock.RUnlock() + if curObject, ok := curBucket.objects[key]; ok { + curObject.lock.RLock() + defer curObject.lock.RUnlock() + return bytes.NewBuffer(curObject.data), nil } return nil, errors.New("Object not found") } diff --git a/pkg/donutbox/donutmem/donutmem_test.go b/pkg/donutbox/donutmem/donutmem_test.go index 376d618dc..217b970aa 100644 --- a/pkg/donutbox/donutmem/donutmem_test.go +++ b/pkg/donutbox/donutmem/donutmem_test.go @@ -3,9 +3,8 @@ package donutmem import ( "testing" - "bytes" . "gopkg.in/check.v1" - "io" + "io/ioutil" ) func Test(t *testing.T) { TestingT(t) } @@ -15,30 +14,27 @@ type MySuite struct{} var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { - c.Skip("Not implemented") + data := "Hello World" donut := NewDonutMem() - writer, ch, err := donut.GetObjectWriter("foo", "bar", 0, 2) - c.Assert(writer, IsNil) - c.Assert(ch, IsNil) + writer := donut.GetObjectWriter("foo", "bar", 0, 2) + count, err := writer.Write([]byte("hello")) c.Assert(err, Not(IsNil)) err = donut.CreateBucket("foo") c.Assert(err, IsNil) - writer, ch, err = donut.GetObjectWriter("foo", "bar", 0, 2) + writer = donut.GetObjectWriter("foo", "bar", 0, 2) + count, err = writer.Write([]byte(data)) + c.Assert(count, Equals, len(data)) c.Assert(err, IsNil) - c.Assert(ch, Not(IsNil)) - writer.Write([]byte("Hello World")) writer.Close() - res := <-ch - c.Assert(res.Err, IsNil) + c.Assert(err, IsNil) + + // time.Sleep(1 * time.Second) reader, err := donut.GetObjectReader("foo", "bar", 0) c.Assert(err, IsNil) - var target bytes.Buffer - _, err = io.Copy(&target, reader) - c.Assert(err, IsNil) - c.Assert(target.Bytes(), DeepEquals, []byte("Hello World")) - + result, err := ioutil.ReadAll(reader) + c.Assert(result, DeepEquals, []byte(data)) }