Changes to CreateObject() now returns back md5 along with any error

- This change is necessary to avoid the racy calls to GetObjectMetadata()
- This change is also necessary since one has to reply back md5sum with
  PUT object response header
This commit is contained in:
Harshavardhana
2015-04-30 03:38:11 -07:00
parent 13cae94191
commit d815e6adfd
12 changed files with 90 additions and 109 deletions

View File

@@ -137,13 +137,13 @@ func (b bucket) GetObject(objectName string) (reader io.ReadCloser, size int64,
}
// PutObject - put a new object
func (b bucket) PutObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string) error {
func (b bucket) PutObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string) (string, error) {
if objectName == "" || objectData == nil {
return iodine.New(errors.New("invalid argument"), nil)
return "", iodine.New(errors.New("invalid argument"), nil)
}
writers, err := b.getDiskWriters(b.normalizeObjectName(objectName), "data")
if err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
summer := md5.New()
objectMetadata := make(map[string]string)
@@ -156,7 +156,7 @@ func (b bucket) PutObject(objectName string, objectData io.Reader, expectedMD5Su
mw := io.MultiWriter(writers[0], summer)
totalLength, err := io.Copy(mw, objectData)
if err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
donutObjectMetadata["sys.size"] = strconv.FormatInt(totalLength, 10)
objectMetadata["size"] = strconv.FormatInt(totalLength, 10)
@@ -164,12 +164,12 @@ func (b bucket) PutObject(objectName string, objectData io.Reader, expectedMD5Su
// calculate data and parity dictated by total number of writers
k, m, err := b.getDataAndParity(len(writers))
if err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
// encoded data with k, m and write
chunkCount, totalLength, err := b.writeEncodedData(k, m, writers, objectData, summer)
if err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
/// donutMetadata section
donutObjectMetadata["sys.blockSize"] = strconv.Itoa(10 * 1024 * 1024)
@@ -198,20 +198,20 @@ func (b bucket) PutObject(objectName string, objectData io.Reader, expectedMD5Su
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := b.isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), objectMetadata["md5"]); err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
}
// write donut specific metadata
if err := b.writeDonutObjectMetadata(b.normalizeObjectName(objectName), donutObjectMetadata); err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
// write object specific metadata
if err := b.writeObjectMetadata(b.normalizeObjectName(objectName), objectMetadata); err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
// close all writers, when control flow reaches here
for _, writer := range writers {
writer.Close()
}
return nil
return objectMetadata["md5"], nil
}

View File

@@ -33,7 +33,7 @@ type Bucket interface {
ListObjects() (map[string]Object, error)
GetObject(object string) (io.ReadCloser, int64, error)
PutObject(object string, contents io.Reader, expectedMD5Sum string, metadata map[string]string) error
PutObject(object string, contents io.Reader, expectedMD5Sum string, metadata map[string]string) (string, error)
}
// Object interface

View File

@@ -40,7 +40,7 @@ type ObjectStorage interface {
// Object Operations
GetObject(bucket, object string) (io.ReadCloser, int64, error)
GetObjectMetadata(bucket, object string) (map[string]string, error)
PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) error
PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error)
}
// Management is a donut management system interface

View File

@@ -162,7 +162,7 @@ func (s *MySuite) TestNewObjectFailsWithoutBucket(c *C) {
defer os.RemoveAll(root)
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
err = donut.PutObject("foo", "obj", "", nil, nil)
_, err = donut.PutObject("foo", "obj", "", nil, nil)
c.Assert(err, Not(IsNil))
}
@@ -188,8 +188,9 @@ func (s *MySuite) TestNewObjectMetadata(c *C) {
err = donut.MakeBucket("foo", "private")
c.Assert(err, IsNil)
err = donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
calculatedMd5Sum, err := donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
c.Assert(err, IsNil)
c.Assert(calculatedMd5Sum, Equals, expectedMd5Sum)
objectMetadata, err := donut.GetObjectMetadata("foo", "obj")
c.Assert(err, IsNil)
@@ -207,10 +208,10 @@ func (s *MySuite) TestNewObjectFailsWithEmptyName(c *C) {
donut, err := NewDonut("test", createTestNodeDiskMap(root))
c.Assert(err, IsNil)
err = donut.PutObject("foo", "", "", nil, nil)
_, err = donut.PutObject("foo", "", "", nil, nil)
c.Assert(err, Not(IsNil))
err = donut.PutObject("foo", " ", "", nil, nil)
_, err = donut.PutObject("foo", " ", "", nil, nil)
c.Assert(err, Not(IsNil))
}
@@ -234,8 +235,9 @@ func (s *MySuite) TestNewObjectCanBeWritten(c *C) {
expectedMd5Sum := hex.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
err = donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
calculatedMd5Sum, err := donut.PutObject("foo", "obj", expectedMd5Sum, reader, metadata)
c.Assert(err, IsNil)
c.Assert(calculatedMd5Sum, Equals, expectedMd5Sum)
reader, size, err := donut.GetObject("foo", "obj")
c.Assert(err, IsNil)
@@ -266,11 +268,11 @@ func (s *MySuite) TestMultipleNewObjects(c *C) {
c.Assert(donut.MakeBucket("foo", "private"), IsNil)
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
err = donut.PutObject("foo", "obj1", "", one, nil)
_, err = donut.PutObject("foo", "obj1", "", one, nil)
c.Assert(err, IsNil)
two := ioutil.NopCloser(bytes.NewReader([]byte("two")))
err = donut.PutObject("foo", "obj2", "", two, nil)
_, err = donut.PutObject("foo", "obj2", "", two, nil)
c.Assert(err, IsNil)
obj1, size, err := donut.GetObject("foo", "obj1")
@@ -313,7 +315,7 @@ func (s *MySuite) TestMultipleNewObjects(c *C) {
c.Assert(listObjects, DeepEquals, []string{"obj1", "obj2"})
three := ioutil.NopCloser(bytes.NewReader([]byte("three")))
err = donut.PutObject("foo", "obj3", "", three, nil)
_, err = donut.PutObject("foo", "obj3", "", three, nil)
c.Assert(err, IsNil)
obj3, size, err := donut.GetObject("foo", "obj3")

View File

@@ -153,38 +153,38 @@ func (d donut) ListObjects(bucket, prefix, marker, delimiter string, maxkeys int
}
// PutObject - put object
func (d donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) error {
func (d donut) PutObject(bucket, object, expectedMD5Sum string, reader io.ReadCloser, metadata map[string]string) (string, error) {
errParams := map[string]string{
"bucket": bucket,
"object": object,
}
if bucket == "" || strings.TrimSpace(bucket) == "" {
return iodine.New(errors.New("invalid argument"), errParams)
return "", iodine.New(errors.New("invalid argument"), errParams)
}
if object == "" || strings.TrimSpace(object) == "" {
return iodine.New(errors.New("invalid argument"), errParams)
return "", iodine.New(errors.New("invalid argument"), errParams)
}
err := d.getDonutBuckets()
if err != nil {
return iodine.New(err, errParams)
return "", iodine.New(err, errParams)
}
if _, ok := d.buckets[bucket]; !ok {
return iodine.New(errors.New("bucket does not exist"), nil)
return "", iodine.New(errors.New("bucket does not exist"), nil)
}
objectList, err := d.buckets[bucket].ListObjects()
if err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
for objectName := range objectList {
if objectName == object {
return iodine.New(errors.New("object exists"), nil)
return "", iodine.New(errors.New("object exists"), nil)
}
}
err = d.buckets[bucket].PutObject(object, reader, expectedMD5Sum, metadata)
md5sum, err := d.buckets[bucket].PutObject(object, reader, expectedMD5Sum, metadata)
if err != nil {
return iodine.New(err, errParams)
return "", iodine.New(err, errParams)
}
return nil
return md5sum, nil
}
// GetObject - get object

View File

@@ -20,6 +20,7 @@ import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"math/rand"
"strconv"
@@ -67,12 +68,14 @@ func testMultipleObjectCreation(c *check.C, create func() Driver) {
hasher := md5.New()
hasher.Write([]byte(randomString))
md5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
expectedmd5Sumhex := hex.EncodeToString(hasher.Sum(nil))
key := "obj" + strconv.Itoa(i)
objects[key] = []byte(randomString)
err := drivers.CreateObject("bucket", key, "", md5Sum, bytes.NewBufferString(randomString))
calculatedmd5sum, err := drivers.CreateObject("bucket", key, "", expectedmd5Sum, bytes.NewBufferString(randomString))
c.Assert(err, check.IsNil)
c.Assert(calculatedmd5sum, check.Equals, expectedmd5Sumhex)
}
// ensure no duplicate etags
@@ -206,15 +209,17 @@ func testObjectOverwriteFails(c *check.C, create func() Driver) {
hasher1 := md5.New()
hasher1.Write([]byte("one"))
md5Sum1 := base64.StdEncoding.EncodeToString(hasher1.Sum(nil))
err := drivers.CreateObject("bucket", "object", "", md5Sum1, bytes.NewBufferString("one"))
md5Sum1hex := hex.EncodeToString(hasher1.Sum(nil))
md5Sum11, err := drivers.CreateObject("bucket", "object", "", md5Sum1, bytes.NewBufferString("one"))
c.Assert(err, check.IsNil)
c.Assert(md5Sum1hex, check.Equals, md5Sum11)
hasher2 := md5.New()
hasher2.Write([]byte("three"))
md5Sum2 := base64.StdEncoding.EncodeToString(hasher2.Sum(nil))
err = drivers.CreateObject("bucket", "object", "", md5Sum2, bytes.NewBufferString("three"))
_, err = drivers.CreateObject("bucket", "object", "", md5Sum2, bytes.NewBufferString("three"))
c.Assert(err, check.Not(check.IsNil))
var bytesBuffer bytes.Buffer
length, err := drivers.GetObject(&bytesBuffer, "bucket", "object")
c.Assert(length, check.Equals, int64(len("one")))
@@ -224,7 +229,7 @@ func testObjectOverwriteFails(c *check.C, create func() Driver) {
func testNonExistantBucketOperations(c *check.C, create func() Driver) {
drivers := create()
err := drivers.CreateObject("bucket", "object", "", "", bytes.NewBufferString("one"))
_, err := drivers.CreateObject("bucket", "object", "", "", bytes.NewBufferString("one"))
c.Assert(err, check.Not(check.IsNil))
}
@@ -253,9 +258,11 @@ func testPutObjectInSubdir(c *check.C, create func() Driver) {
hasher := md5.New()
hasher.Write([]byte("hello world"))
md5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
err = drivers.CreateObject("bucket", "dir1/dir2/object", "", md5Sum, bytes.NewBufferString("hello world"))
md5Sum1 := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
md5Sum1hex := hex.EncodeToString(hasher.Sum(nil))
md5Sum11, err := drivers.CreateObject("bucket", "dir1/dir2/object", "", md5Sum1, bytes.NewBufferString("hello world"))
c.Assert(err, check.IsNil)
c.Assert(md5Sum11, check.Equals, md5Sum1hex)
var bytesBuffer bytes.Buffer
length, err := drivers.GetObject(&bytesBuffer, "bucket", "dir1/dir2/object")
@@ -349,7 +356,7 @@ func testGetDirectoryReturnsObjectNotFound(c *check.C, create func() Driver) {
err := drivers.CreateBucket("bucket", "")
c.Assert(err, check.IsNil)
err = drivers.CreateObject("bucket", "dir1/dir2/object", "", "", bytes.NewBufferString("hello world"))
_, err = drivers.CreateObject("bucket", "dir1/dir2/object", "", "", bytes.NewBufferString("hello world"))
c.Assert(err, check.IsNil)
var byteBuffer bytes.Buffer
@@ -393,7 +400,7 @@ func testDefaultContentType(c *check.C, create func() Driver) {
c.Assert(err, check.IsNil)
// test empty
err = drivers.CreateObject("bucket", "one", "", "", bytes.NewBufferString("one"))
_, err = drivers.CreateObject("bucket", "one", "", "", bytes.NewBufferString("one"))
metadata, err := drivers.GetObjectMetadata("bucket", "one", "")
c.Assert(err, check.IsNil)
c.Assert(metadata.ContentType, check.Equals, "application/octet-stream")
@@ -417,8 +424,13 @@ func testContentMd5Set(c *check.C, create func() Driver) {
c.Assert(err, check.IsNil)
// test md5 invalid
err = drivers.CreateObject("bucket", "one", "", "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA", bytes.NewBufferString("one"))
badmd5Sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA"
calculatedmd5sum, err := drivers.CreateObject("bucket", "one", "", badmd5Sum, bytes.NewBufferString("one"))
c.Assert(err, check.Not(check.IsNil))
err = drivers.CreateObject("bucket", "two", "", "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA=", bytes.NewBufferString("one"))
c.Assert(calculatedmd5sum, check.Not(check.Equals), badmd5Sum)
goodmd5sum := "NWJiZjVhNTIzMjhlNzQzOWFlNmU3MTlkZmU3MTIyMDA="
calculatedmd5sum, err = drivers.CreateObject("bucket", "two", "", goodmd5sum, bytes.NewBufferString("one"))
c.Assert(err, check.IsNil)
c.Assert(calculatedmd5sum, check.Equals, goodmd5sum)
}

View File

@@ -366,17 +366,17 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
}
// CreateObject creates a new object
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, reader io.Reader) error {
func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedMD5Sum string, reader io.Reader) (string, error) {
errParams := map[string]string{
"bucketName": bucketName,
"objectName": objectName,
"contentType": contentType,
}
if !drivers.IsValidBucket(bucketName) || strings.Contains(bucketName, ".") {
return iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucketName}, nil)
}
if !drivers.IsValidObject(objectName) || strings.TrimSpace(objectName) == "" {
return iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
return "", iodine.New(drivers.ObjectNameInvalid{Object: objectName}, nil)
}
if strings.TrimSpace(contentType) == "" {
contentType = "application/octet-stream"
@@ -387,13 +387,13 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
return iodine.New(err, nil)
return "", iodine.New(err, nil)
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, ioutil.NopCloser(reader), metadata)
calculatedMD5Sum, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, ioutil.NopCloser(reader), metadata)
if err != nil {
return iodine.New(err, errParams)
return "", iodine.New(err, errParams)
}
return nil
return calculatedMD5Sum, nil
}

View File

@@ -37,7 +37,7 @@ type Driver interface {
GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error)
GetObjectMetadata(bucket string, object string, prefix string) (ObjectMetadata, error)
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) error
CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) (string, error)
}
// BucketACL - bucket level access control

View File

@@ -199,26 +199,26 @@ func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
}
// CreateObject - PUT object to memory buffer
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) error {
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) (string, error) {
memory.lock.RLock()
if !drivers.IsValidBucket(bucket) {
memory.lock.RUnlock()
return iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
}
if !drivers.IsValidObject(key) {
memory.lock.RUnlock()
return iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
}
if _, ok := memory.storedBuckets[bucket]; ok == false {
memory.lock.RUnlock()
return iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
// get object key
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
memory.lock.RUnlock()
return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil)
return "", iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil)
}
memory.lock.RUnlock()
@@ -230,7 +230,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil)
return "", iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil)
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
@@ -248,7 +248,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su
if err != nil {
err := iodine.New(err, nil)
log.Println(err)
return err
return "", err
}
if uint64(totalLength)+memory.totalSize > memory.maxSize {
memory.objects.RemoveOldest()
@@ -263,7 +263,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su
memory.lock.Lock()
defer memory.lock.Unlock()
memory.objects.RemoveOldest()
return iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
return "", iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
}
}
newObject := drivers.ObjectMetadata{
@@ -292,7 +292,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su
memory.lock.Unlock()
// free memory if possible for kernel to reclaim
debug.FreeOSMemory()
return nil
return newObject.Md5, nil
}
// CreateBucket - create bucket in memory

View File

@@ -116,10 +116,11 @@ func (m *Driver) ListObjects(bucket string, resources drivers.BucketResourcesMet
}
// CreateObject is a mock
func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) error {
func (m *Driver) CreateObject(bucket string, key string, contentType string, md5sum string, data io.Reader) (string, error) {
ret := m.Called(bucket, key, contentType, md5sum, data)
r0 := ret.Error(0)
r0 := ret.Get(0).(string)
r1 := ret.Error(1)
return r0
return r0, r1
}