minio/pkg/donutbox/donutmem/donutmem.go

247 lines
6.5 KiB
Go
Raw Normal View History

2015-03-14 21:02:19 -04:00
package donutmem
import (
"bytes"
"errors"
"github.com/minio-io/minio/pkg/donutbox"
"io"
"strconv"
"strings"
2015-03-14 22:21:27 -04:00
"sync"
2015-03-17 03:29:19 -04:00
"time"
2015-03-14 21:02:19 -04:00
)
type bucket struct {
name string
metadata map[string]string
2015-03-14 22:21:27 -04:00
objects map[string]*object
lock *sync.RWMutex
2015-03-14 21:02:19 -04:00
}
// object is one stored column of an object: its raw bytes and metadata.
type object struct {
	// name is the stored key, "<objectKey>#<column>" as built by getKey.
	name string
	// data is the complete payload; it is assigned once the writer side has
	// finished streaming.
	data []byte
	// metadata holds free-form string attributes of the object.
	metadata map[string]string
	// lock guards data and metadata. The writer goroutine holds it for
	// writing while the payload is still being received.
	lock *sync.RWMutex
}
type donutMem struct {
2015-03-14 22:21:27 -04:00
buckets map[string]*bucket
lock *sync.RWMutex
2015-03-14 21:02:19 -04:00
}
// NewDonutMem creates a new in memory donut
func NewDonutMem() donutbox.DonutBox {
return donutMem{
2015-03-14 22:21:27 -04:00
buckets: make(map[string]*bucket),
lock: new(sync.RWMutex),
2015-03-14 21:02:19 -04:00
}
}
// system operations
func (donutMem donutMem) ListBuckets() ([]string, error) {
2015-03-15 03:41:06 -04:00
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
var buckets []string
for k := range donutMem.buckets {
2015-03-15 03:41:06 -04:00
buckets = append(buckets, k)
}
return buckets, nil
2015-03-14 21:02:19 -04:00
}
// bucket operations
func (donutMem donutMem) CreateBucket(b string) error {
2015-03-15 03:41:06 -04:00
donutMem.lock.Lock()
defer donutMem.lock.Unlock()
2015-03-14 21:02:19 -04:00
b = strings.ToLower(b)
if _, ok := donutMem.buckets[b]; ok {
return errors.New("Bucket Exists")
}
metadata := make(map[string]string)
metadata["name"] = b
2015-03-17 03:29:19 -04:00
metadata["created"] = time.Now().Format(time.RFC3339Nano)
2015-03-14 21:02:19 -04:00
newBucket := bucket{
name: b,
metadata: metadata,
2015-03-14 22:21:27 -04:00
objects: make(map[string]*object),
lock: new(sync.RWMutex),
2015-03-14 21:02:19 -04:00
}
2015-03-14 22:21:27 -04:00
donutMem.buckets[b] = &newBucket
2015-03-14 21:02:19 -04:00
return nil
}
2015-03-14 22:29:54 -04:00
2015-03-15 03:41:06 -04:00
func (donutMem donutMem) ListObjectsInBucket(bucketKey, prefixKey string) ([]string, error) {
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
curBucket.lock.RLock()
defer curBucket.lock.RUnlock()
objectMap := make(map[string]string)
for objectKey := range curBucket.objects {
objectName := strings.Split(objectKey, "#")[0]
if strings.HasPrefix(objectName, prefixKey) {
objectMap[objectName] = objectName
2015-03-15 03:41:06 -04:00
}
}
var objects []string
for k := range objectMap {
objects = append(objects, k)
}
2015-03-15 03:41:06 -04:00
return objects, nil
}
return nil, errors.New("Bucket does not exist")
2015-03-14 21:02:19 -04:00
}
2015-03-14 22:29:54 -04:00
2015-03-15 03:41:06 -04:00
func (donutMem donutMem) GetBucketMetadata(bucketKey string) (map[string]string, error) {
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
curBucket.lock.RLock()
defer curBucket.lock.RUnlock()
result := make(map[string]string)
for k, v := range curBucket.metadata {
result[k] = v
}
return result, nil
}
return nil, errors.New("Bucket not found")
2015-03-14 21:02:19 -04:00
}
2015-03-14 22:29:54 -04:00
2015-03-15 03:41:06 -04:00
func (donutMem donutMem) SetBucketMetadata(bucketKey string, metadata map[string]string) error {
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
curBucket.lock.Lock()
defer curBucket.lock.Unlock()
newMetadata := make(map[string]string)
for k, v := range metadata {
newMetadata[k] = v
}
curBucket.metadata = newMetadata
return nil
}
return errors.New("Bucket not found")
2015-03-14 21:02:19 -04:00
}
// object operations
func (donutMem donutMem) GetObjectWriter(bucketKey, objectKey string, column uint, blockSize uint) (*donutbox.NewObject, error) {
key := getKey(bucketKey, objectKey, column)
2015-03-14 22:21:27 -04:00
reader, writer := io.Pipe()
returnObject := donutbox.CreateNewObject(writer)
2015-03-14 22:21:27 -04:00
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
2015-03-14 22:21:27 -04:00
curBucket.lock.Lock()
defer curBucket.lock.Unlock()
if _, ok := curBucket.objects[key]; !ok {
newObject := object{
name: key,
data: make([]byte, 0),
lock: new(sync.RWMutex),
2015-03-14 22:21:27 -04:00
}
newObject.lock.Lock()
curBucket.objects[key] = &newObject
2015-03-14 21:02:19 -04:00
go func() {
2015-03-14 22:21:27 -04:00
defer newObject.lock.Unlock()
2015-03-14 21:02:19 -04:00
var objBuffer bytes.Buffer
2015-03-14 21:02:19 -04:00
_, err := io.Copy(&objBuffer, reader)
if err == nil {
2015-03-14 22:21:27 -04:00
newObject.data = objBuffer.Bytes()
writer.Close()
metadata := returnObject.GetMetadata()
for k, v := range metadata {
metadata[k] = v
}
metadata["key"] = objectKey
metadata["column"] = strconv.FormatUint(uint64(column), 10)
newObject.metadata = metadata
return
2015-03-14 21:02:19 -04:00
}
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
bucket, _ := donutMem.buckets[bucketKey]
bucket.lock.Lock()
defer bucket.lock.Unlock()
delete(bucket.objects, key)
writer.CloseWithError(err)
2015-03-14 21:02:19 -04:00
}()
return returnObject, nil
2015-03-14 21:02:19 -04:00
}
2015-03-14 22:21:27 -04:00
writer.CloseWithError(errors.New("Object exists"))
return nil, errors.New("Object exists")
2015-03-14 21:02:19 -04:00
}
2015-03-14 22:21:27 -04:00
writer.CloseWithError(errors.New("Bucket does not exist"))
return nil, errors.New("Bucket does not exist")
2015-03-14 21:02:19 -04:00
}
2015-03-14 22:29:54 -04:00
func (donutMem donutMem) GetObjectReader(bucketKey, objectKey string, column uint) (io.Reader, error) {
key := getKey(bucketKey, objectKey, column)
2015-03-14 22:21:27 -04:00
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
2015-03-14 22:21:27 -04:00
curBucket.lock.RLock()
defer curBucket.lock.RUnlock()
if curObject, ok := curBucket.objects[key]; ok {
curObject.lock.RLock()
defer curObject.lock.RUnlock()
return bytes.NewBuffer(curObject.data), nil
2015-03-14 21:02:19 -04:00
}
return nil, errors.New("Object not found")
}
return nil, errors.New("Bucket not found")
}
2015-03-14 22:29:54 -04:00
//func (donutMem donutMem) SetObjectMetadata(bucketKey, objectKey string, column uint, metadata map[string]string) error {
// key := getKey(bucketKey, objectKey, column)
// donutMem.lock.RLock()
// defer donutMem.lock.RUnlock()
// if curBucket, ok := donutMem.buckets[bucketKey]; ok {
// curBucket.lock.RLock()
// defer curBucket.lock.RUnlock()
// if curObject, ok := curBucket.objects[key]; ok {
// curObject.lock.Lock()
// defer curObject.lock.Unlock()
// newMetadata := make(map[string]string)
// for k, v := range metadata {
// newMetadata[k] = v
// }
// curObject.metadata = newMetadata
// return nil
// }
// return errors.New("Object not found")
// }
// return errors.New("Bucket not found")
//}
func (donutMem donutMem) GetObjectMetadata(bucketKey, objectKey string, column uint) (map[string]string, error) {
key := getKey(bucketKey, objectKey, column)
2015-03-15 03:41:06 -04:00
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucketKey]; ok {
curBucket.lock.RLock()
defer curBucket.lock.RUnlock()
if curObject, ok := curBucket.objects[key]; ok {
2015-03-15 03:41:06 -04:00
curObject.lock.RLock()
defer curObject.lock.RUnlock()
result := make(map[string]string)
for k, v := range curObject.metadata {
result[k] = v
}
return result, nil
}
2015-03-17 03:29:19 -04:00
return nil, errors.New("Object not Found: " + bucketKey + "#" + objectKey)
2015-03-15 03:41:06 -04:00
}
return nil, errors.New("Bucket not found")
2015-03-14 21:02:19 -04:00
}
// getKey builds the key under which one column of an object is stored inside
// its bucket: the object key, a "#" separator, and the column number in
// base 10. bucketKey is accepted but does not contribute to the key.
func getKey(bucketKey, objectKey string, column uint) string {
	columnSuffix := strconv.FormatUint(uint64(column), 10)
	return objectKey + "#" + columnSuffix
}