Metadata and writing the object are now both committed together.

This commit is contained in:
Frederick F. Kautz IV 2015-03-15 17:30:42 -07:00
parent 3a2419d20f
commit 682bf085c4
4 changed files with 209 additions and 76 deletions

View File

@ -14,13 +14,58 @@ type DonutBox interface {
SetBucketMetadata(bucket string, metadata map[string]string) error SetBucketMetadata(bucket string, metadata map[string]string) error
// object operations // object operations
GetObjectWriter(bucket, object string, column, blockSize uint) *io.PipeWriter GetObjectWriter(bucket, object string, column, blockSize uint) (*NewObject, error)
GetObjectReader(bucket, object string, column int) (io.Reader, error) GetObjectReader(bucket, object string, column uint) (io.Reader, error)
SetObjectMetadata(bucket, object string, metadata map[string]string) error GetObjectMetadata(bucket, object string, column uint) (map[string]string, error)
GetObjectMetadata(bucket, object string) (map[string]string, error)
} }
// Result is a result for async tasks // Result is a result for async tasks
type Result struct { type Result struct {
Err error Err error
} }
// CreateNewObject creates a new object wrapping a writer. Clients are not expected to use this directly. This is exposed for storage drivers.
func CreateNewObject(writer *io.PipeWriter) *NewObject {
return &NewObject{writer: writer}
}
// NewObject wraps a writer and allows setting metadata. On a successful close, the object is committed by the backend.
type NewObject struct {
metadata map[string]string
writer *io.PipeWriter
}
// Write data
func (newObject *NewObject) Write(data []byte) (int, error) {
return newObject.writer.Write(data)
}
// SetMetadata sets metadata for an object
func (newObject *NewObject) SetMetadata(metadata map[string]string) {
newMetadata := make(map[string]string)
for k, v := range metadata {
newMetadata[k] = v
}
newObject.metadata = newMetadata
}
// Close and commit the object
func (newObject *NewObject) Close() error {
return newObject.writer.Close()
}
// CloseWithError closes the object with an error, causing the backend to abandon the object
func (newObject *NewObject) CloseWithError(err error) error {
return newObject.writer.CloseWithError(err)
}
// GetMetadata returns a copy of the metadata set metadata
func (newObject *NewObject) GetMetadata() map[string]string {
newMetadata := make(map[string]string)
if newMetadata != nil {
for k, v := range newObject.metadata {
newMetadata[k] = v
}
}
return newMetadata
}

View File

@ -74,12 +74,17 @@ func (donutMem donutMem) ListObjectsInBucket(bucketKey, prefixKey string) ([]str
if curBucket, ok := donutMem.buckets[bucketKey]; ok { if curBucket, ok := donutMem.buckets[bucketKey]; ok {
curBucket.lock.RLock() curBucket.lock.RLock()
defer curBucket.lock.RUnlock() defer curBucket.lock.RUnlock()
var objects []string objectMap := make(map[string]string)
for objectKey := range curBucket.objects { for objectKey := range curBucket.objects {
if strings.HasPrefix(objectKey, prefixKey) { objectName := strings.Split(objectKey, "#")[0]
objects = append(objects, objectKey) if strings.HasPrefix(objectName, prefixKey) {
objectMap[objectName] = objectName
} }
} }
var objects []string
for k := range objectMap {
objects = append(objects, k)
}
return objects, nil return objects, nil
} }
return nil, errors.New("Bucket does not exist") return nil, errors.New("Bucket does not exist")
@ -118,24 +123,20 @@ func (donutMem donutMem) SetBucketMetadata(bucketKey string, metadata map[string
} }
// object operations // object operations
func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockSize uint) *io.PipeWriter { func (donutMem donutMem) GetObjectWriter(bucketKey, objectKey string, column uint, blockSize uint) (*donutbox.NewObject, error) {
key := getKey(bucketKey, objectKey, column)
reader, writer := io.Pipe() reader, writer := io.Pipe()
returnObject := donutbox.CreateNewObject(writer)
donutMem.lock.RLock() donutMem.lock.RLock()
defer donutMem.lock.RUnlock() defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucket]; ok { if curBucket, ok := donutMem.buckets[bucketKey]; ok {
curBucket.lock.Lock() curBucket.lock.Lock()
defer curBucket.lock.Unlock() defer curBucket.lock.Unlock()
if _, ok := curBucket.objects[key]; !ok { if _, ok := curBucket.objects[key]; !ok {
// create object
metadata := make(map[string]string)
metadata["key"] = key
metadata["blockSize"] = strconv.FormatInt(int64(blockSize), 10)
newObject := object{ newObject := object{
name: key, name: key,
data: make([]byte, 0), data: make([]byte, 0),
metadata: metadata, lock: new(sync.RWMutex),
lock: new(sync.RWMutex),
} }
newObject.lock.Lock() newObject.lock.Lock()
@ -143,33 +144,45 @@ func (donutMem donutMem) GetObjectWriter(bucket, key string, column uint, blockS
go func() { go func() {
defer newObject.lock.Unlock() defer newObject.lock.Unlock()
var objBuffer bytes.Buffer var objBuffer bytes.Buffer
_, err := io.Copy(&objBuffer, reader) _, err := io.Copy(&objBuffer, reader)
if err == nil { if err == nil {
newObject.data = objBuffer.Bytes() newObject.data = objBuffer.Bytes()
writer.Close() writer.Close()
} else {
donutMem.lock.RLock() metadata := returnObject.GetMetadata()
defer donutMem.lock.RUnlock() for k, v := range metadata {
bucket, _ := donutMem.buckets[bucket] metadata[k] = v
bucket.lock.Lock() }
defer bucket.lock.Unlock() metadata["key"] = objectKey
delete(bucket.objects, key) metadata["column"] = strconv.FormatUint(uint64(column), 10)
writer.CloseWithError(err) newObject.metadata = metadata
return
} }
donutMem.lock.RLock()
defer donutMem.lock.RUnlock()
bucket, _ := donutMem.buckets[bucketKey]
bucket.lock.Lock()
defer bucket.lock.Unlock()
delete(bucket.objects, key)
writer.CloseWithError(err)
}() }()
return writer return returnObject, nil
} }
writer.CloseWithError(errors.New("Object exists")) writer.CloseWithError(errors.New("Object exists"))
return writer return nil, errors.New("Object exists")
} }
writer.CloseWithError(errors.New("Bucket does not exist")) writer.CloseWithError(errors.New("Bucket does not exist"))
return writer return nil, errors.New("Bucket does not exist")
} }
func (donutMem donutMem) GetObjectReader(bucket, key string, column int) (io.Reader, error) { func (donutMem donutMem) GetObjectReader(bucketKey, objectKey string, column uint) (io.Reader, error) {
key := getKey(bucketKey, objectKey, column)
donutMem.lock.RLock() donutMem.lock.RLock()
defer donutMem.lock.RUnlock() defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucket]; ok { if curBucket, ok := donutMem.buckets[bucketKey]; ok {
curBucket.lock.RLock() curBucket.lock.RLock()
defer curBucket.lock.RUnlock() defer curBucket.lock.RUnlock()
if curObject, ok := curBucket.objects[key]; ok { if curObject, ok := curBucket.objects[key]; ok {
@ -182,35 +195,37 @@ func (donutMem donutMem) GetObjectReader(bucket, key string, column int) (io.Rea
return nil, errors.New("Bucket not found") return nil, errors.New("Bucket not found")
} }
func (donutMem donutMem) SetObjectMetadata(bucketKey, objectKey string, metadata map[string]string) error { //func (donutMem donutMem) SetObjectMetadata(bucketKey, objectKey string, column uint, metadata map[string]string) error {
donutMem.lock.RLock() // key := getKey(bucketKey, objectKey, column)
defer donutMem.lock.RUnlock() // donutMem.lock.RLock()
if curBucket, ok := donutMem.buckets[bucketKey]; ok { // defer donutMem.lock.RUnlock()
curBucket.lock.RLock() // if curBucket, ok := donutMem.buckets[bucketKey]; ok {
defer curBucket.lock.RUnlock() // curBucket.lock.RLock()
if curObject, ok := curBucket.objects[objectKey]; ok { // defer curBucket.lock.RUnlock()
curObject.lock.Lock() // if curObject, ok := curBucket.objects[key]; ok {
defer curObject.lock.Unlock() // curObject.lock.Lock()
newMetadata := make(map[string]string) // defer curObject.lock.Unlock()
for k, v := range metadata { // newMetadata := make(map[string]string)
newMetadata[k] = v // for k, v := range metadata {
} // newMetadata[k] = v
curObject.metadata = newMetadata // }
return nil // curObject.metadata = newMetadata
} // return nil
return errors.New("Object not found") // }
} // return errors.New("Object not found")
return errors.New("Bucket not found") // }
} // return errors.New("Bucket not found")
//}
func (donutMem donutMem) GetObjectMetadata(bucketKey, objectKey string) (map[string]string, error) { func (donutMem donutMem) GetObjectMetadata(bucketKey, objectKey string, column uint) (map[string]string, error) {
key := getKey(bucketKey, objectKey, column)
donutMem.lock.RLock() donutMem.lock.RLock()
defer donutMem.lock.RUnlock() defer donutMem.lock.RUnlock()
if curBucket, ok := donutMem.buckets[bucketKey]; ok { if curBucket, ok := donutMem.buckets[bucketKey]; ok {
curBucket.lock.RLock() curBucket.lock.RLock()
defer curBucket.lock.RUnlock() defer curBucket.lock.RUnlock()
if curObject, ok := curBucket.objects[objectKey]; ok { if curObject, ok := curBucket.objects[key]; ok {
curObject.lock.RLock() curObject.lock.RLock()
defer curObject.lock.RUnlock() defer curObject.lock.RUnlock()
result := make(map[string]string) result := make(map[string]string)
@ -223,3 +238,7 @@ func (donutMem donutMem) GetObjectMetadata(bucketKey, objectKey string) (map[str
} }
return nil, errors.New("Bucket not found") return nil, errors.New("Bucket not found")
} }
func getKey(bucketKey, objectKey string, column uint) string {
return objectKey + "#" + strconv.FormatUint(uint64(column), 10)
}

View File

@ -19,15 +19,16 @@ func (s *MySuite) TestCreateAndReadObject(c *C) {
data := "Hello World" data := "Hello World"
donut := NewDonutMem() donut := NewDonutMem()
writer := donut.GetObjectWriter("foo", "bar", 0, 2) writer, err := donut.GetObjectWriter("foo", "bar", 0, 2)
count, err := writer.Write([]byte("hello")) c.Assert(writer, IsNil)
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))
err = donut.CreateBucket("foo") err = donut.CreateBucket("foo")
c.Assert(err, IsNil) c.Assert(err, IsNil)
writer = donut.GetObjectWriter("foo", "bar", 0, 2) writer, err = donut.GetObjectWriter("foo", "bar", 0, 2)
count, err = writer.Write([]byte(data)) c.Assert(err, IsNil)
count, err := writer.Write([]byte(data))
c.Assert(count, Equals, len(data)) c.Assert(count, Equals, len(data))
c.Assert(err, IsNil) c.Assert(err, IsNil)
err = writer.Close() err = writer.Close()
@ -40,13 +41,8 @@ func (s *MySuite) TestCreateAndReadObject(c *C) {
c.Assert(result, DeepEquals, []byte(data)) c.Assert(result, DeepEquals, []byte(data))
// try writing, should see error // try writing, should see error
writer = donut.GetObjectWriter("foo", "bar", 0, 2) writer, err = donut.GetObjectWriter("foo", "bar", 0, 2)
count, err = writer.Write([]byte("different data")) c.Assert(writer, IsNil)
c.Assert(count, Equals, 0)
c.Assert(err, Not(IsNil))
// try again, should see error
count, err = writer.Write([]byte("different data"))
c.Assert(count, Equals, 0)
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))
// data should not change // data should not change
@ -87,7 +83,8 @@ func (s *MySuite) TestObjectList(c *C) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
object := "foo" + strconv.Itoa(i) object := "foo" + strconv.Itoa(i)
objects = append(objects, object) objects = append(objects, object)
writer := donut.GetObjectWriter("foo", object, 0, 2) writer, err := donut.GetObjectWriter("foo", object, 0, 2)
c.Assert(err, IsNil)
writer.Write([]byte(object)) writer.Write([]byte(object))
writer.Close() writer.Close()
c.Assert(err, IsNil) c.Assert(err, IsNil)
@ -125,24 +122,31 @@ func (s *MySuite) TestObjectMetadata(c *C) {
metadata["hello"] = "world" metadata["hello"] = "world"
metadata["foo"] = "bar" metadata["foo"] = "bar"
err := donut.SetObjectMetadata("foo", "bar", metadata) result, err := donut.GetObjectMetadata("foo", "bar", 1)
c.Assert(err, Not(IsNil))
result, err := donut.GetObjectMetadata("foo", "bar")
c.Assert(result, IsNil) c.Assert(result, IsNil)
c.Assert(err, Not(IsNil)) c.Assert(err, Not(IsNil))
writer := donut.GetObjectWriter("foo", "bar", 0, 2) writer, err := donut.GetObjectWriter("foo", "bar", 1, 2)
c.Assert(err, IsNil)
_, err = writer.Write([]byte("Hello World")) _, err = writer.Write([]byte("Hello World"))
c.Assert(err, IsNil) c.Assert(err, IsNil)
writer.SetMetadata(metadata)
err = writer.Close() err = writer.Close()
c.Assert(err, IsNil) c.Assert(err, IsNil)
err = donut.SetObjectMetadata("foo", "bar", metadata) expectedMetadata := make(map[string]string)
c.Assert(err, IsNil) for k, v := range metadata {
expectedMetadata[k] = v
}
expectedMetadata["key"] = "bar"
expectedMetadata["column"] = "1"
result, err = donut.GetObjectMetadata("foo", "bar") result, err = donut.GetObjectMetadata("foo", "bar", 1)
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(result, DeepEquals, metadata) c.Assert(result, DeepEquals, expectedMetadata)
result, err = donut.GetObjectMetadata("foo", "bar", 0)
c.Assert(err, Not(IsNil))
c.Assert(result, IsNil)
} }

View File

@ -19,8 +19,11 @@ package encoded
import ( import (
"errors" "errors"
"github.com/minio-io/minio/pkg/donutbox" "github.com/minio-io/minio/pkg/donutbox"
"github.com/minio-io/minio/pkg/encoding/erasure"
"github.com/minio-io/minio/pkg/storage" "github.com/minio-io/minio/pkg/storage"
"github.com/minio-io/minio/pkg/utils/split"
"io" "io"
"strconv"
) )
// StorageDriver creates a new single disk storage driver using donut without encoding. // StorageDriver creates a new single disk storage driver using donut without encoding.
@ -83,6 +86,68 @@ func (diskStorage StorageDriver) ListObjects(bucket string, resources storage.Bu
} }
// CreateObject creates a new object // CreateObject creates a new object
func (diskStorage StorageDriver) CreateObject(bucket string, key string, contentType string, data io.Reader) error { func (diskStorage StorageDriver) CreateObject(bucketKey string, objectKey string, contentType string, reader io.Reader) error {
return errors.New("Not Implemented") blockSize := 10 * 1024 * 1024
// split stream
splitStream := split.Stream(reader, uint64(blockSize))
writers := make([]*donutbox.NewObject, 16)
for i := 0; i < 16; i++ {
newWriter, err := diskStorage.donutBox.GetObjectWriter(bucketKey, objectKey, uint(i), uint(blockSize))
if err != nil {
closeAllWritersWithError(writers, err)
return err
}
writers[i] = newWriter
}
totalLength := uint64(0)
for chunk := range splitStream {
params, err := erasure.ParseEncoderParams(8, 8, erasure.Cauchy)
if err != nil {
return err
}
totalLength = totalLength + uint64(len(chunk.Data))
encoder := erasure.NewEncoder(params)
if chunk.Err == nil {
parts, _ := encoder.Encode(chunk.Data)
for index, part := range parts {
if _, err := writers[index].Write(part); err != nil {
closeAllWritersWithError(writers, err)
return err
}
}
} else {
closeAllWritersWithError(writers, chunk.Err)
return chunk.Err
}
// encode data
// write
}
// close connections
closeAllWriters(writers)
metadata := make(map[string]string)
metadata["length"] = strconv.FormatUint(totalLength, 10)
metadata["blockSize"] = strconv.FormatUint(uint64(blockSize), 10)
// metadata["md5"] := md5sum
for column := uint(0); column < 16; column++ {
writers[column].SetMetadata(metadata)
}
return nil
}
func closeAllWriters(writers []*donutbox.NewObject) {
for _, writer := range writers {
if writer != nil {
writer.Close()
}
}
}
func closeAllWritersWithError(writers []*donutbox.NewObject, err error) {
for _, writer := range writers {
if writer != nil {
writer.CloseWithError(err)
}
}
} }