Verify incoming md5sum for invidual parts in complete multipart request and some simplification of the code

Add two new functions for session cleanup
This commit is contained in:
Harshavardhana 2015-05-08 03:31:43 -07:00
parent a16a10afa9
commit 23a545673d
2 changed files with 45 additions and 28 deletions

View File

@ -365,17 +365,15 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
return return
} }
partMap := make(map[int]string)
vars := mux.Vars(req) vars := mux.Vars(req)
bucket := vars["bucket"] bucket := vars["bucket"]
object := vars["object"] object := vars["object"]
uploadID := vars["uploadId"] uploadID := vars["uploadId"]
partMap := make(map[int]string)
for _, part := range parts.Part { for _, part := range parts.Part {
partMap[part.PartNumber] = part.ETag partMap[part.PartNumber] = part.ETag
} }
etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap) etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
switch err := iodine.ToError(err).(type) { switch err := iodine.ToError(err).(type) {
case nil: case nil:

View File

@ -561,6 +561,21 @@ func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partI
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data) return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
} }
func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string) {
memory.lock.Lock()
defer memory.lock.Unlock()
memory.objects.Delete(bucket + "/" + key + "?uploadId=" + uploadID)
}
func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, parts map[int]string) {
memory.lock.Lock()
defer memory.lock.Unlock()
for i := range parts {
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
memory.objects.Delete(objectKey)
}
}
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
// Verify upload id // Verify upload id
_, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID) _, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID)
@ -582,45 +597,49 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
memory.lock.Lock() memory.lock.Lock()
var size int64 var size int64
for i := 1; i <= len(parts); i++ { for i := range parts {
if _, ok := parts[i]; ok { object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]
if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true { if !ok {
size += object.Size
}
} else {
memory.lock.Unlock() memory.lock.Unlock()
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
} }
size += object.Size
} }
var fullObject bytes.Buffer var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ { for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok { recvMD5 := parts[i]
if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true { object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i))
obj := object.([]byte) if ok == false {
_, err := io.Copy(&fullObject, bytes.NewBuffer(obj))
if err != nil {
return "", iodine.New(err, nil)
}
} else {
log.Println("Cannot fetch: ", getMultipartKey(key, uploadID, i))
}
} else {
memory.lock.Unlock() memory.lock.Unlock()
return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil) return "", iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
} }
} obj := object.([]byte)
calcMD5Bytes := md5.Sum(obj)
for i := 1; i <= len(parts); i++ { // complete multi part request header md5sum per part is hex encoded
if _, ok := parts[i]; ok { recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i) if err != nil {
memory.objects.Delete(objectKey) return "", iodine.New(drivers.InvalidDigest{Md5: recvMD5}, nil)
}
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
return "", iodine.New(drivers.BadDigest{Md5: recvMD5, Bucket: bucket, Key: getMultipartKey(key, uploadID, i)}, nil)
}
_, err = io.Copy(&fullObject, bytes.NewBuffer(obj))
if err != nil {
return "", iodine.New(err, nil)
} }
} }
memory.lock.Unlock() memory.lock.Unlock()
md5sumSlice := md5.Sum(fullObject.Bytes()) md5sumSlice := md5.Sum(fullObject.Bytes())
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
return memory.CreateObject(bucket, key, "", md5sum, size, &fullObject) etag, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject)
if err != nil {
memory.cleanupMultiparts(bucket, key, uploadID, parts)
memory.cleanupMultipartSession(bucket, key, uploadID)
return "", iodine.New(err, nil)
}
memory.cleanupMultiparts(bucket, key, uploadID, parts)
memory.cleanupMultipartSession(bucket, key, uploadID)
return etag, nil
} }