Reduce memory usage for memory multipart write by doing io.Pipe() streaming copy

This commit is contained in:
Harshavardhana 2015-09-30 20:52:55 -07:00
parent daa089fb06
commit 50750efb52
2 changed files with 60 additions and 64 deletions

View File

@ -361,6 +361,7 @@ func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, s
donut.storedBuckets.Set(bucket, storedBucket)
return objMetadata, nil
}
// calculate md5
hash := md5.New()
sha256hash := sha256.New()

View File

@ -272,18 +272,63 @@ func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
donut.storedBuckets.Set(bucket, storedBucket)
}
func (donut API) mergeMultipart(parts *CompleteMultipartUpload, uploadID string, fullObjectWriter *io.PipeWriter) {
for _, part := range parts.Part {
recvMD5 := part.ETag
object, ok := donut.multiPartObjects[uploadID].Get(part.PartNumber)
if ok == false {
fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(InvalidPart{})))
return
}
calcMD5Bytes := md5.Sum(object)
// complete multi part request header md5sum per part is hex encoded
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(InvalidDigest{Md5: recvMD5})))
return
}
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(BadDigest{})))
return
}
if _, err := io.Copy(fullObjectWriter, bytes.NewReader(object)); err != nil {
fullObjectWriter.CloseWithError(probe.WrapError(probe.NewError(err)))
return
}
object = nil
}
fullObjectWriter.Close()
return
}
// CompleteMultipartUpload - complete a multipart upload and persist the data
func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.Reader, signature *Signature) (ObjectMetadata, *probe.Error) {
donut.lock.Lock()
defer donut.lock.Unlock()
size := int64(donut.multiPartObjects[uploadID].Stats().Bytes)
fullObjectReader, err := donut.completeMultipartUploadV2(bucket, key, uploadID, data, signature)
if err != nil {
return ObjectMetadata{}, err.Trace()
}
objectMetadata, err := donut.createObject(bucket, key, "", "", size, fullObjectReader, nil)
if err != nil {
// No need to call internal cleanup functions here, caller should call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return ObjectMetadata{}, err.Trace()
}
donut.cleanupMultipartSession(bucket, key, uploadID)
return objectMetadata, nil
}
func (donut API) completeMultipartUploadV2(bucket, key, uploadID string, data io.Reader, signature *Signature) (io.Reader, *probe.Error) {
if !IsValidBucket(bucket) {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
return nil, probe.NewError(BucketNameInvalid{Bucket: bucket})
}
if !IsValidObjectName(key) {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(ObjectNameInvalid{Object: key})
return nil, probe.NewError(ObjectNameInvalid{Object: key})
}
// TODO: multipart support for donut is broken, since we haven't finalized the format in which
// it can be stored, disabling this for now until we get the underlying layout stable.
//
@ -293,88 +338,38 @@ func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, data io.R
// }
if !donut.storedBuckets.Exists(bucket) {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(BucketNotFound{Bucket: bucket})
return nil, probe.NewError(BucketNotFound{Bucket: bucket})
}
storedBucket := donut.storedBuckets.Get(bucket).(storedBucket)
// Verify upload id
if storedBucket.multiPartSession[key].UploadID != uploadID {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
return nil, probe.NewError(InvalidUploadID{UploadID: uploadID})
}
partBytes, err := ioutil.ReadAll(data)
if err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(err)
return nil, probe.NewError(err)
}
if signature != nil {
ok, err := signature.DoesSignatureMatch(hex.EncodeToString(sha256.Sum256(partBytes)[:]))
if err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, err.Trace()
return nil, err.Trace()
}
if !ok {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(SignatureDoesNotMatch{})
return nil, probe.NewError(SignatureDoesNotMatch{})
}
}
parts := &CompleteMultipartUpload{}
if err := xml.Unmarshal(partBytes, parts); err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(MalformedXML{})
return nil, probe.NewError(MalformedXML{})
}
if !sort.IsSorted(completedParts(parts.Part)) {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(InvalidPartOrder{})
return nil, probe.NewError(InvalidPartOrder{})
}
var size int64
var fullObject bytes.Buffer
for i := 0; i < len(parts.Part); i++ {
recvMD5 := parts.Part[i].ETag
object, ok := donut.multiPartObjects[uploadID].Get(parts.Part[i].PartNumber)
if ok == false {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(InvalidPart{})
}
size += int64(len(object))
calcMD5Bytes := md5.Sum(object)
// complete multi part request header md5sum per part is hex encoded
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(InvalidDigest{Md5: recvMD5})
}
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(BadDigest{})
}
if _, err := io.Copy(&fullObject, bytes.NewBuffer(object)); err != nil {
donut.lock.Unlock()
return ObjectMetadata{}, probe.NewError(err)
}
object = nil
go debug.FreeOSMemory()
}
fullObjectReader, fullObjectWriter := io.Pipe()
go donut.mergeMultipart(parts, uploadID, fullObjectWriter)
md5sumSlice := md5.Sum(fullObject.Bytes())
// this is needed for final verification inside CreateObject, do not convert this to hex
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
donut.lock.Unlock()
{
objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil, nil)
if err != nil {
// No need to call internal cleanup functions here, caller should call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return ObjectMetadata{}, err.Trace()
}
fullObject.Reset()
donut.lock.Lock()
donut.cleanupMultipartSession(bucket, key, uploadID)
donut.lock.Unlock()
return objectMetadata, nil
}
return fullObjectReader, nil
}
// byKey is a sortable interface for UploadMetadata slice