Inmemory donutbox is now thread safe

This commit is contained in:
Frederick F. Kautz IV
2015-03-14 19:21:27 -07:00
parent 74eb6067f6
commit 7e61696901
3 changed files with 71 additions and 43 deletions

View File

@@ -7,28 +7,33 @@ import (
"io"
"strconv"
"strings"
"sync"
)
type bucket struct {
name string
metadata map[string]string
objects map[string]object
objects map[string]*object
lock *sync.RWMutex
}
type object struct {
name string
data []byte
metadata map[string]string
lock *sync.RWMutex
}
type donutMem struct {
buckets map[string]bucket
buckets map[string]*bucket
lock *sync.RWMutex
}
// NewDonutMem creates a new in memory donut
func NewDonutMem() donutbox.DonutBox {
return donutMem{
buckets: make(map[string]bucket),
buckets: make(map[string]*bucket),
lock: new(sync.RWMutex),
}
}
@@ -48,9 +53,10 @@ func (donutMem donutMem) CreateBucket(b string) error {
newBucket := bucket{
name: b,
metadata: metadata,
objects: make(map[string]object),
objects: make(map[string]*object),
lock: new(sync.RWMutex),
}
donutMem.buckets[b] = newBucket
donutMem.buckets[b] = &newBucket
return nil
}
func (donutMem donutMem) ListObjects(bucket, prefix string) ([]string, error) {
@@ -64,37 +70,63 @@ func (donutMem donutMem) SetBucketMetadata(bucket, name string, metadata io.Read
}
// object operations
func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockSize uint) (io.WriteCloser, <-chan donutbox.Result, error) {
if _, ok := donutMem.buckets[bucket]; ok {
if _, ok := donutMem.buckets[bucket].objects[key]; !ok {
reader, writer := io.Pipe()
ch := make(chan donutbox.Result)
func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockSize uint) *io.PipeWriter {
reader, writer := io.Pipe()
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucket]; ok {
curBucket.lock.Lock()
defer curBucket.lock.Unlock()
if _, ok := curBucket.objects[key]; !ok {
// create object
metadata := make(map[string]string)
metadata["key"] = key
metadata["blockSize"] = strconv.FormatInt(int64(blockSize), 10)
newObject := object{
name: key,
data: make([]byte, 0),
metadata: metadata,
lock: new(sync.RWMutex),
}
newObject.lock.Lock()
curBucket.objects[key] = &newObject
go func() {
defer newObject.lock.Unlock()
var objBuffer bytes.Buffer
_, err := io.Copy(&objBuffer, reader)
metadata := make(map[string]string)
metadata["key"] = key
metadata["blockSize"] = strconv.FormatInt(int64(blockSize), 10)
if err == nil {
newObject := object{
name: key,
data: objBuffer.Bytes(),
metadata: metadata,
}
donutMem.buckets[bucket].objects[key] = newObject
newObject.data = objBuffer.Bytes()
writer.Close()
} else {
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
bucket, _ := donutMem.buckets[bucket]
bucket.lock.Lock()
defer bucket.lock.Unlock()
delete(bucket.objects, key)
writer.CloseWithError(err)
}
ch <- donutbox.Result{Err: err}
}()
return writer, ch, nil
return writer
}
return nil, nil, errors.New("Object exists")
writer.CloseWithError(errors.New("Object exists"))
return writer
}
return nil, nil, errors.New("Bucket not found")
writer.CloseWithError(errors.New("Bucket does not exist"))
return writer
}
func (donutMem donutMem) GetObjectReader(bucket, key string, column int) (io.Reader, error) {
if b, ok := donutMem.buckets[bucket]; ok {
if obj, ok := b.objects[key]; ok {
return bytes.NewBuffer(obj.data), nil
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucket]; ok {
curBucket.lock.RLock()
defer curBucket.lock.RUnlock()
if curObject, ok := curBucket.objects[key]; ok {
curObject.lock.RLock()
defer curObject.lock.RUnlock()
return bytes.NewBuffer(curObject.data), nil
}
return nil, errors.New("Object not found")
}