Merge pull request #580 from harshavardhana/pr_out_verify_incoming_md5sum_for_invidual_parts_in_complete_multipart_request_and_some_simplification_of_the_code

This commit is contained in:
Harshavardhana 2015-05-08 04:01:37 -07:00
commit 8a65b90783
2 changed files with 45 additions and 28 deletions

View File

@ -365,17 +365,15 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
return
}
partMap := make(map[int]string)
vars := mux.Vars(req)
bucket := vars["bucket"]
object := vars["object"]
uploadID := vars["uploadId"]
partMap := make(map[int]string)
for _, part := range parts.Part {
partMap[part.PartNumber] = part.ETag
}
etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
switch err := iodine.ToError(err).(type) {
case nil:

View File

@ -561,6 +561,21 @@ func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partI
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
}
func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string) {
memory.lock.Lock()
defer memory.lock.Unlock()
memory.objects.Delete(bucket + "/" + key + "?uploadId=" + uploadID)
}
func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, parts map[int]string) {
memory.lock.Lock()
defer memory.lock.Unlock()
for i := range parts {
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
memory.objects.Delete(objectKey)
}
}
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
// Verify upload id
_, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID)
@ -582,45 +597,49 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
memory.lock.Lock()
var size int64
for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok {
if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true {
size += object.Size
}
} else {
for i := range parts {
object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]
if !ok {
memory.lock.Unlock()
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
size += object.Size
}
var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok {
if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true {
obj := object.([]byte)
_, err := io.Copy(&fullObject, bytes.NewBuffer(obj))
if err != nil {
return "", iodine.New(err, nil)
}
} else {
log.Println("Cannot fetch: ", getMultipartKey(key, uploadID, i))
}
} else {
recvMD5 := parts[i]
object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i))
if ok == false {
memory.lock.Unlock()
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
obj := object.([]byte)
calcMD5Bytes := md5.Sum(obj)
// complete multi part request header md5sum per part is hex encoded
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil)
}
for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok {
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
memory.objects.Delete(objectKey)
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
return "", iodine.New(drivers.BadDigest{Md5: recvMD5, Bucket: bucket, Key: getMultipartKey(key, uploadID, i)}, nil)
}
_, err = io.Copy(&fullObject, bytes.NewBuffer(obj))
if err != nil {
return "", iodine.New(err, nil)
}
}
memory.lock.Unlock()
md5sumSlice := md5.Sum(fullObject.Bytes())
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
return memory.CreateObject(bucket, key, "", md5sum, size, &fullObject)
etag, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject)
if err != nil {
memory.cleanupMultiparts(bucket, key, uploadID, parts)
memory.cleanupMultipartSession(bucket, key, uploadID)
return "", iodine.New(err, nil)
}
memory.cleanupMultiparts(bucket, key, uploadID, parts)
memory.cleanupMultipartSession(bucket, key, uploadID)
return etag, nil
}