Migrate from iodine to probe

This commit is contained in:
Harshavardhana
2015-08-03 16:17:21 -07:00
parent 7f13095260
commit d09fd8b0a1
38 changed files with 917 additions and 1339 deletions

View File

@@ -34,41 +34,41 @@ import (
"github.com/minio/minio/pkg/crypto/sha256"
"github.com/minio/minio/pkg/donut/cache/data"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/probe"
)
/// V2 API functions
// NewMultipartUpload - initiate a new multipart session
func (donut API) NewMultipartUpload(bucket, key, contentType string, signature *Signature) (string, error) {
func (donut API) NewMultipartUpload(bucket, key, contentType string, signature *Signature) (string, *probe.Error) {
donut.lock.Lock()
defer donut.lock.Unlock()
if !IsValidBucket(bucket) {
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
return "", probe.New(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(key) {
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
return "", probe.New(ObjectNameInvalid{Object: key})
}
if signature != nil {
ok, err := signature.DoesSignatureMatch("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
if err != nil {
return "", iodine.New(err, nil)
return "", err.Trace()
}
if !ok {
return "", iodine.New(SignatureDoesNotMatch{}, nil)
return "", probe.New(SignatureDoesNotMatch{})
}
}
if len(donut.config.NodeDiskMap) > 0 {
return donut.newMultipartUpload(bucket, key, contentType)
}
if !donut.storedBuckets.Exists(bucket) {
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
return "", probe.New(BucketNotFound{Bucket: bucket})
}
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
return "", iodine.New(ObjectExists{Object: key}, nil)
return "", probe.New(ObjectExists{Object: key})
}
id := []byte(strconv.Itoa(rand.Int()) + bucket + key + time.Now().UTC().String())
uploadIDSum := sha512.Sum512(id)
@@ -88,57 +88,57 @@ func (donut API) NewMultipartUpload(bucket, key, contentType string, signature *
}
// AbortMultipartUpload - abort an incomplete multipart session
func (donut API) AbortMultipartUpload(bucket, key, uploadID string, signature *Signature) error {
func (donut API) AbortMultipartUpload(bucket, key, uploadID string, signature *Signature) *probe.Error {
donut.lock.Lock()
defer donut.lock.Unlock()
if !IsValidBucket(bucket) {
return iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
return probe.New(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(key) {
return iodine.New(ObjectNameInvalid{Object: key}, nil)
return probe.New(ObjectNameInvalid{Object: key})
}
if signature != nil {
ok, err := signature.DoesSignatureMatch("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
if err != nil {
return iodine.New(err, nil)
return err.Trace()
}
if !ok {
return iodine.New(SignatureDoesNotMatch{}, nil)
return probe.New(SignatureDoesNotMatch{})
}
}
if len(donut.config.NodeDiskMap) > 0 {
return donut.abortMultipartUpload(bucket, key, uploadID)
}
if !donut.storedBuckets.Exists(bucket) {
return iodine.New(BucketNotFound{Bucket: bucket}, nil)
return probe.New(BucketNotFound{Bucket: bucket})
}
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
if storedBucket.multiPartSession[key].UploadID != uploadID {
return iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
return probe.New(InvalidUploadID{UploadID: uploadID})
}
donut.cleanupMultipartSession(bucket, key, uploadID)
return nil
}
// CreateObjectPart - create a part in a multipart session
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (string, error) {
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (string, *probe.Error) {
donut.lock.Lock()
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data, signature)
donut.lock.Unlock()
// possible free
debug.FreeOSMemory()
return etag, iodine.New(err, nil)
return etag, err.Trace()
}
// createObject - internal wrapper function called by CreateObjectPart
func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (string, error) {
func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader, signature *Signature) (string, *probe.Error) {
if !IsValidBucket(bucket) {
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
return "", probe.New(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(key) {
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
return "", probe.New(ObjectNameInvalid{Object: key})
}
if len(donut.config.NodeDiskMap) > 0 {
metadata := make(map[string]string)
@@ -151,24 +151,24 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return "", iodine.New(InvalidDigest{Md5: expectedMD5Sum}, nil)
return "", probe.New(InvalidDigest{Md5: expectedMD5Sum})
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
partMetadata, err := donut.putObjectPart(bucket, key, expectedMD5Sum, uploadID, partID, data, size, metadata, signature)
if err != nil {
return "", iodine.New(err, nil)
return "", err.Trace()
}
return partMetadata.ETag, nil
}
if !donut.storedBuckets.Exists(bucket) {
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
return "", probe.New(BucketNotFound{Bucket: bucket})
}
strBucket := donut.storedBuckets.Get(bucket).(storedBucket)
// Verify upload id
if strBucket.multiPartSession[key].UploadID != uploadID {
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
return "", probe.New(InvalidUploadID{UploadID: uploadID})
}
// get object key
@@ -185,7 +185,7 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return "", iodine.New(InvalidDigest{Md5: expectedMD5Sum}, nil)
return "", probe.New(InvalidDigest{Md5: expectedMD5Sum})
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
@@ -194,27 +194,27 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
hash := md5.New()
sha256hash := sha256.New()
var err error
var totalLength int64
var err error
for err == nil {
var length int
byteBuffer := make([]byte, 1024*1024)
length, err = data.Read(byteBuffer)
length, err = data.Read(byteBuffer) // do not read error return error here, we will handle this error later
hash.Write(byteBuffer[0:length])
sha256hash.Write(byteBuffer[0:length])
ok := donut.multiPartObjects[uploadID].Append(partID, byteBuffer[0:length])
if !ok {
return "", iodine.New(InternalError{}, nil)
return "", probe.New(InternalError{})
}
totalLength += int64(length)
go debug.FreeOSMemory()
}
if totalLength != size {
donut.multiPartObjects[uploadID].Delete(partID)
return "", iodine.New(IncompleteBody{Bucket: bucket, Object: key}, nil)
return "", probe.New(IncompleteBody{Bucket: bucket, Object: key})
}
if err != io.EOF {
return "", iodine.New(err, nil)
return "", probe.New(err)
}
md5SumBytes := hash.Sum(nil)
@@ -222,17 +222,19 @@ func (donut API) createObjectPart(bucket, key, uploadID string, partID int, cont
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
return "", iodine.New(BadDigest{}, nil)
return "", err.Trace()
}
}
if signature != nil {
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256hash.Sum(nil)))
if err != nil {
return "", iodine.New(err, nil)
}
if !ok {
return "", iodine.New(SignatureDoesNotMatch{}, nil)
{
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256hash.Sum(nil)))
if err != nil {
return "", err.Trace()
}
if !ok {
return "", probe.New(SignatureDoesNotMatch{})
}
}
}
@@ -264,16 +266,16 @@ func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
}
// CompleteMultipartUpload - complete a multipart upload and persist the data
func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, error) {
func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
donut.lock.Lock()
if !IsValidBucket(bucket) {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
return ObjectMetadata{}, probe.New(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(key) {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
return ObjectMetadata{}, probe.New(ObjectNameInvalid{Object: key})
}
if len(donut.config.NodeDiskMap) > 0 {
donut.lock.Unlock()
@@ -282,38 +284,38 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.R
if !donut.storedBuckets.Exists(bucket) {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
return ObjectMetadata{}, probe.New(BucketNotFound{Bucket: bucket})
}
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
// Verify upload id
if storedBucket.multiPartSession[key].UploadID != uploadID {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
return ObjectMetadata{}, probe.New(InvalidUploadID{UploadID: uploadID})
}
partBytes, err := ioutil.ReadAll(data)
if err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(err, nil)
return ObjectMetadata{}, probe.New(err)
}
if signature != nil {
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256.Sum256(partBytes)[:]))
if err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(err, nil)
return ObjectMetadata{}, err.Trace()
}
if !ok {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(SignatureDoesNotMatch{}, nil)
return ObjectMetadata{}, probe.New(SignatureDoesNotMatch{})
}
}
parts := &CompleteMultipartUpload{}
if err := xml.Unmarshal(partBytes, parts); err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(MalformedXML{}, nil)
return ObjectMetadata{}, probe.New(MalformedXML{})
}
if !sort.IsSorted(completedParts(parts.Part)) {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(InvalidPartOrder{}, nil)
return ObjectMetadata{}, probe.New(InvalidPartOrder{})
}
var size int64
@@ -323,7 +325,7 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.R
object, ok := donut.multiPartObjects[uploadID].Get(parts.Part[i].PartNumber)
if ok == false {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(InvalidPart{}, nil)
return ObjectMetadata{}, probe.New(InvalidPart{})
}
size += int64(len(object))
calcMD5Bytes := md5.Sum(object)
@@ -331,16 +333,15 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.R
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(InvalidDigest{Md5: recvMD5}, nil)
return ObjectMetadata{}, probe.New(InvalidDigest{Md5: recvMD5})
}
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(BadDigest{}, nil)
return ObjectMetadata{}, probe.New(BadDigest{})
}
_, err = io.Copy(&fullObject, bytes.NewBuffer(object))
if err != nil {
if _, err := io.Copy(&fullObject, bytes.NewBuffer(object)); err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, iodine.New(err, nil)
return ObjectMetadata{}, probe.New(err)
}
object = nil
go debug.FreeOSMemory()
@@ -350,18 +351,20 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.R
// this is needed for final verification inside CreateObject, do not convert this to hex
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
donut.lock.Unlock()
objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil, nil)
if err != nil {
// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return ObjectMetadata{}, iodine.New(err, nil)
}
fullObject.Reset()
{
objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil, nil)
if err != nil {
// No need to call internal cleanup functions here, caller should call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return ObjectMetadata{}, err.Trace()
}
fullObject.Reset()
donut.lock.Lock()
donut.cleanupMultipartSession(bucket, key, uploadID)
donut.lock.Unlock()
return objectMetadata, nil
donut.lock.Lock()
donut.cleanupMultipartSession(bucket, key, uploadID)
donut.lock.Unlock()
return objectMetadata, nil
}
}
// byKey is a sortable interface for UploadMetadata slice
@@ -372,7 +375,7 @@ func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// ListMultipartUploads - list incomplete multipart sessions for a given bucket
func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata, signature *Signature) (BucketMultipartResourcesMetadata, error) {
func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata, signature *Signature) (BucketMultipartResourcesMetadata, *probe.Error) {
// TODO handle delimiter
donut.lock.Lock()
defer donut.lock.Unlock()
@@ -380,15 +383,15 @@ func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartRe
if signature != nil {
ok, err := signature.DoesSignatureMatch("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
if err != nil {
return BucketMultipartResourcesMetadata{}, iodine.New(err, nil)
return BucketMultipartResourcesMetadata{}, err.Trace()
}
if !ok {
return BucketMultipartResourcesMetadata{}, iodine.New(SignatureDoesNotMatch{}, nil)
return BucketMultipartResourcesMetadata{}, probe.New(SignatureDoesNotMatch{})
}
}
if !IsValidBucket(bucket) {
return BucketMultipartResourcesMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
return BucketMultipartResourcesMetadata{}, probe.New(BucketNameInvalid{Bucket: bucket})
}
if len(donut.config.NodeDiskMap) > 0 {
@@ -396,7 +399,7 @@ func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartRe
}
if !donut.storedBuckets.Exists(bucket) {
return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
return BucketMultipartResourcesMetadata{}, probe.New(BucketNotFound{Bucket: bucket})
}
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
@@ -454,7 +457,7 @@ func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
// ListObjectParts - list parts from incomplete multipart session for a given object
func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata, signature *Signature) (ObjectResourcesMetadata, error) {
func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata, signature *Signature) (ObjectResourcesMetadata, *probe.Error) {
// Verify upload id
donut.lock.Lock()
defer donut.lock.Unlock()
@@ -462,18 +465,18 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
if signature != nil {
ok, err := signature.DoesSignatureMatch("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855")
if err != nil {
return ObjectResourcesMetadata{}, iodine.New(err, nil)
return ObjectResourcesMetadata{}, err.Trace()
}
if !ok {
return ObjectResourcesMetadata{}, iodine.New(SignatureDoesNotMatch{}, nil)
return ObjectResourcesMetadata{}, probe.New(SignatureDoesNotMatch{})
}
}
if !IsValidBucket(bucket) {
return ObjectResourcesMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
return ObjectResourcesMetadata{}, probe.New(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(key) {
return ObjectResourcesMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
return ObjectResourcesMetadata{}, probe.New(ObjectNameInvalid{Object: key})
}
if len(donut.config.NodeDiskMap) > 0 {
@@ -481,14 +484,14 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
}
if !donut.storedBuckets.Exists(bucket) {
return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
return ObjectResourcesMetadata{}, probe.New(BucketNotFound{Bucket: bucket})
}
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
if _, ok := storedBucket.multiPartSession[key]; ok == false {
return ObjectResourcesMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
return ObjectResourcesMetadata{}, probe.New(ObjectNotFound{Object: key})
}
if storedBucket.multiPartSession[key].UploadID != resources.UploadID {
return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, nil)
return ObjectResourcesMetadata{}, probe.New(InvalidUploadID{UploadID: resources.UploadID})
}
storedParts := storedBucket.partMetadata[key]
objectResourcesMetadata := resources
@@ -512,7 +515,7 @@ func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMe
}
part, ok := storedParts[i]
if !ok {
return ObjectResourcesMetadata{}, iodine.New(InvalidPart{}, nil)
return ObjectResourcesMetadata{}, probe.New(InvalidPart{})
}
parts = append(parts, &part)
}