Implement proper errors for Multipart

This commit is contained in:
Harshavardhana 2015-05-08 02:02:51 -07:00
parent 39e0875699
commit 2ea10c798b
5 changed files with 88 additions and 9 deletions

View File

@ -104,6 +104,13 @@ type InitiateMultipartUploadResult struct {
UploadID string `xml:"UploadId"` UploadID string `xml:"UploadId"`
} }
// completedParts is a sortable interface for Part slice
type completedParts []Part
func (a completedParts) Len() int { return len(a) }
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
// CompleteMultipartUpload container for completing multipart upload // CompleteMultipartUpload container for completing multipart upload
type CompleteMultipartUpload struct { type CompleteMultipartUpload struct {
Part []Part Part []Part

View File

@ -18,6 +18,7 @@ package api
import ( import (
"net/http" "net/http"
"sort"
"strconv" "strconv"
"encoding/xml" "encoding/xml"
@ -311,6 +312,10 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
writeSuccessResponse(w, acceptsContentType) writeSuccessResponse(w, acceptsContentType)
} }
case drivers.InvalidUploadID:
{
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
}
case drivers.ObjectExists: case drivers.ObjectExists:
{ {
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
@ -355,6 +360,10 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
return return
} }
if !sort.IsSorted(completedParts(parts.Part)) {
writeErrorResponse(w, req, InvalidPartOrder, acceptsContentType, req.URL.Path)
return
}
partMap := make(map[int]string) partMap := make(map[int]string)
@ -366,6 +375,7 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
for _, part := range parts.Part { for _, part := range parts.Part {
partMap[part.PartNumber] = part.ETag partMap[part.PartNumber] = part.ETag
} }
etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap) etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
switch err := iodine.ToError(err).(type) { switch err := iodine.ToError(err).(type) {
case nil: case nil:
@ -378,8 +388,9 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
// write body // write body
w.Write(encodedSuccessResponse) w.Write(encodedSuccessResponse)
case drivers.InvalidUploadID:
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
default: default:
// TODO handle all other errors, properly
log.Println(err) log.Println(err)
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
} }

View File

@ -64,11 +64,12 @@ const (
TooManyBuckets TooManyBuckets
MethodNotAllowed MethodNotAllowed
InvalidPart InvalidPart
InvalidPartOrder
) )
// Error codes, non exhaustive list - standard HTTP errors // Error codes, non exhaustive list - standard HTTP errors
const ( const (
NotAcceptable = iota + 24 NotAcceptable = iota + 25
) )
// Error code to Error structure map // Error code to Error structure map
@ -193,6 +194,11 @@ var errorCodeResponse = map[int]Error{
Description: "One or more of the specified parts could not be found", Description: "One or more of the specified parts could not be found",
HTTPStatusCode: http.StatusBadRequest, HTTPStatusCode: http.StatusBadRequest,
}, },
InvalidPartOrder: {
Code: "InvalidPartOrder",
Description: "The list of parts was not in ascending order. The parts list must be specified in order by part number.",
HTTPStatusCode: http.StatusBadRequest,
},
} }
// errorCodeError provides errorCode to Error. It returns empty if the code provided is unknown // errorCodeError provides errorCode to Error. It returns empty if the code provided is unknown

View File

@ -206,3 +206,14 @@ type InvalidRange struct {
func (e InvalidRange) Error() string { func (e InvalidRange) Error() string {
return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length) return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length)
} }
/// Multipart related errors
// InvalidUploadID invalid upload id
type InvalidUploadID struct {
UploadID string
}
func (e InvalidUploadID) Error() string {
return "Invalid upload id " + e.UploadID
}

View File

@ -515,14 +515,38 @@ func (memory *memoryDriver) evictObject(a ...interface{}) {
} }
func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
// TODO verify object doesn't exist memory.lock.RLock()
if !drivers.IsValidBucket(bucket) {
memory.lock.RUnlock()
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
}
if !drivers.IsValidObjectName(key) {
memory.lock.RUnlock()
return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
}
if _, ok := memory.storedBuckets[bucket]; ok == false {
memory.lock.RUnlock()
return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
memory.lock.RUnlock()
return "", iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil)
}
memory.lock.RUnlock()
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String()) id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
uploadIDSum := sha512.Sum512(id) uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:]) uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])
md5sumBytes := md5.Sum([]byte(uploadID)) md5sumBytes := md5.Sum([]byte(uploadID))
md5sum := hex.EncodeToString(md5sumBytes[:]) md5sum := hex.EncodeToString(md5sumBytes[:])
memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID))
return uploadID, nil // Create UploadID session, this is a temporary work around to instantiate a session.
// It would not be valid in future, since we need to work out proper sessions so that
// we can cleanly abort session and propagate failures.
_, err := memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID))
return uploadID, iodine.New(err, nil)
} }
func getMultipartKey(key string, uploadID string, partNumber int) string { func getMultipartKey(key string, uploadID string, partNumber int) string {
@ -530,14 +554,35 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
} }
func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// TODO verify upload id exists // Verify upload id
_, ok := memory.objects.Get(key + "?uploadId=" + uploadID)
if !ok {
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data) return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
} }
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) { func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
// TODO verify upload id exists // Verify upload id
_, ok := memory.objects.Get(key + "?uploadId=" + uploadID)
if !ok {
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
}
if !drivers.IsValidBucket(bucket) {
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
}
if !drivers.IsValidObjectName(key) {
return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
}
memory.lock.RLock()
if _, ok := memory.storedBuckets[bucket]; ok == false {
memory.lock.RUnlock()
return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
memory.lock.RUnlock()
memory.lock.Lock() memory.lock.Lock()
size := int64(0) var size int64
for i := 1; i <= len(parts); i++ { for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok { if _, ok := parts[i]; ok {
if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true { if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true {
@ -550,7 +595,6 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
} }
var fullObject bytes.Buffer var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ { for i := 1; i <= len(parts); i++ {
if _, ok := parts[i]; ok { if _, ok := parts[i]; ok {
if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true { if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true {