mirror of https://github.com/minio/minio.git
Merge pull request #579 from harshavardhana/pr_out_re_enable_bucket_deletion_this_time_with_uploadid_sitting_there
This commit is contained in:
commit
a16a10afa9
|
@ -104,6 +104,13 @@ type InitiateMultipartUploadResult struct {
|
|||
UploadID string `xml:"UploadId"`
|
||||
}
|
||||
|
||||
// completedParts is a sortable interface for Part slice
|
||||
type completedParts []Part
|
||||
|
||||
func (a completedParts) Len() int { return len(a) }
|
||||
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
|
||||
|
||||
// CompleteMultipartUpload container for completing multipart upload
|
||||
type CompleteMultipartUpload struct {
|
||||
Part []Part
|
||||
|
|
|
@ -18,6 +18,7 @@ package api
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
||||
"encoding/xml"
|
||||
|
@ -311,6 +312,10 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
|
|||
writeSuccessResponse(w, acceptsContentType)
|
||||
|
||||
}
|
||||
case drivers.InvalidUploadID:
|
||||
{
|
||||
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
|
||||
}
|
||||
case drivers.ObjectExists:
|
||||
{
|
||||
writeErrorResponse(w, req, MethodNotAllowed, acceptsContentType, req.URL.Path)
|
||||
|
@ -355,6 +360,10 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
|
|||
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
|
||||
return
|
||||
}
|
||||
if !sort.IsSorted(completedParts(parts.Part)) {
|
||||
writeErrorResponse(w, req, InvalidPartOrder, acceptsContentType, req.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
partMap := make(map[int]string)
|
||||
|
||||
|
@ -366,6 +375,7 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
|
|||
for _, part := range parts.Part {
|
||||
partMap[part.PartNumber] = part.ETag
|
||||
}
|
||||
|
||||
etag, err := server.driver.CompleteMultipartUpload(bucket, object, uploadID, partMap)
|
||||
switch err := iodine.ToError(err).(type) {
|
||||
case nil:
|
||||
|
@ -378,8 +388,9 @@ func (server *minioAPI) completeMultipartUploadHandler(w http.ResponseWriter, re
|
|||
w.WriteHeader(http.StatusOK)
|
||||
// write body
|
||||
w.Write(encodedSuccessResponse)
|
||||
case drivers.InvalidUploadID:
|
||||
writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
|
||||
default:
|
||||
// TODO handle all other errors, properly
|
||||
log.Println(err)
|
||||
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
|
||||
}
|
||||
|
|
|
@ -64,11 +64,12 @@ const (
|
|||
TooManyBuckets
|
||||
MethodNotAllowed
|
||||
InvalidPart
|
||||
InvalidPartOrder
|
||||
)
|
||||
|
||||
// Error codes, non exhaustive list - standard HTTP errors
|
||||
const (
|
||||
NotAcceptable = iota + 24
|
||||
NotAcceptable = iota + 25
|
||||
)
|
||||
|
||||
// Error code to Error structure map
|
||||
|
@ -193,6 +194,11 @@ var errorCodeResponse = map[int]Error{
|
|||
Description: "One or more of the specified parts could not be found",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
InvalidPartOrder: {
|
||||
Code: "InvalidPartOrder",
|
||||
Description: "The list of parts was not in ascending order. The parts list must be specified in order by part number.",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
}
|
||||
|
||||
// errorCodeError provides errorCode to Error. It returns empty if the code provided is unknown
|
||||
|
|
|
@ -206,3 +206,14 @@ type InvalidRange struct {
|
|||
func (e InvalidRange) Error() string {
|
||||
return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length)
|
||||
}
|
||||
|
||||
/// Multipart related errors
|
||||
|
||||
// InvalidUploadID invalid upload id
|
||||
type InvalidUploadID struct {
|
||||
UploadID string
|
||||
}
|
||||
|
||||
func (e InvalidUploadID) Error() string {
|
||||
return "Invalid upload id " + e.UploadID
|
||||
}
|
||||
|
|
|
@ -501,28 +501,51 @@ func (memory *memoryDriver) evictObject(a ...interface{}) {
|
|||
cacheStats.Bytes, cacheStats.Items, cacheStats.Evictions)
|
||||
key := a[0].(string)
|
||||
// loop through all buckets
|
||||
for _, storedBucket := range memory.storedBuckets {
|
||||
for bucket, storedBucket := range memory.storedBuckets {
|
||||
delete(storedBucket.objectMetadata, key)
|
||||
// remove bucket if no objects found anymore
|
||||
if len(storedBucket.objectMetadata) == 0 {
|
||||
// TODO (y4m4)
|
||||
// for now refrain from deleting buckets, due to multipart deletes before fullobject being written
|
||||
// this case gets trigerred and we can't store the actual data at all receiving 404 on the client
|
||||
// delete(memory.storedBuckets, bucket)
|
||||
delete(memory.storedBuckets, bucket)
|
||||
}
|
||||
}
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
|
||||
func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
|
||||
// TODO verify object doesn't exist
|
||||
memory.lock.RLock()
|
||||
if !drivers.IsValidBucket(bucket) {
|
||||
memory.lock.RUnlock()
|
||||
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
|
||||
}
|
||||
if !drivers.IsValidObjectName(key) {
|
||||
memory.lock.RUnlock()
|
||||
return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
|
||||
}
|
||||
if _, ok := memory.storedBuckets[bucket]; ok == false {
|
||||
memory.lock.RUnlock()
|
||||
return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
|
||||
}
|
||||
storedBucket := memory.storedBuckets[bucket]
|
||||
objectKey := bucket + "/" + key
|
||||
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
|
||||
memory.lock.RUnlock()
|
||||
return "", iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil)
|
||||
}
|
||||
memory.lock.RUnlock()
|
||||
|
||||
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
|
||||
uploadIDSum := sha512.Sum512(id)
|
||||
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])
|
||||
md5sumBytes := md5.Sum([]byte(uploadID))
|
||||
md5sum := hex.EncodeToString(md5sumBytes[:])
|
||||
memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID))
|
||||
return uploadID, nil
|
||||
// CreateObject expects in base64 which is coming over http request header
|
||||
// while all response headers with ETag are hex encoding
|
||||
md5sum := base64.StdEncoding.EncodeToString(md5sumBytes[:])
|
||||
|
||||
// Create UploadID session, this is a temporary work around to instantiate a session.
|
||||
// It would not be valid in future, since we need to work out proper sessions so that
|
||||
// we can cleanly abort session and propagate failures.
|
||||
_, err := memory.CreateObject(bucket, key+"?uploadId="+uploadID, contentType, md5sum, int64(len(uploadID)), bytes.NewBufferString(uploadID))
|
||||
return uploadID, iodine.New(err, nil)
|
||||
}
|
||||
|
||||
func getMultipartKey(key string, uploadID string, partNumber int) string {
|
||||
|
@ -530,14 +553,35 @@ func getMultipartKey(key string, uploadID string, partNumber int) string {
|
|||
}
|
||||
|
||||
func (memory *memoryDriver) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
|
||||
// TODO verify upload id exists
|
||||
// Verify upload id
|
||||
_, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID)
|
||||
if !ok {
|
||||
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
|
||||
}
|
||||
return memory.CreateObject(bucket, getMultipartKey(key, uploadID, partID), "", expectedMD5Sum, size, data)
|
||||
}
|
||||
|
||||
func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) {
|
||||
// TODO verify upload id exists
|
||||
// Verify upload id
|
||||
_, ok := memory.objects.Get(bucket + "/" + key + "?uploadId=" + uploadID)
|
||||
if !ok {
|
||||
return "", iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
|
||||
}
|
||||
if !drivers.IsValidBucket(bucket) {
|
||||
return "", iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil)
|
||||
}
|
||||
if !drivers.IsValidObjectName(key) {
|
||||
return "", iodine.New(drivers.ObjectNameInvalid{Object: key}, nil)
|
||||
}
|
||||
memory.lock.RLock()
|
||||
if _, ok := memory.storedBuckets[bucket]; ok == false {
|
||||
memory.lock.RUnlock()
|
||||
return "", iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
|
||||
}
|
||||
memory.lock.RUnlock()
|
||||
|
||||
memory.lock.Lock()
|
||||
size := int64(0)
|
||||
var size int64
|
||||
for i := 1; i <= len(parts); i++ {
|
||||
if _, ok := parts[i]; ok {
|
||||
if object, ok := memory.storedBuckets[bucket].objectMetadata[bucket+"/"+getMultipartKey(key, uploadID, i)]; ok == true {
|
||||
|
@ -550,7 +594,6 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
|
|||
}
|
||||
|
||||
var fullObject bytes.Buffer
|
||||
|
||||
for i := 1; i <= len(parts); i++ {
|
||||
if _, ok := parts[i]; ok {
|
||||
if object, ok := memory.objects.Get(bucket + "/" + getMultipartKey(key, uploadID, i)); ok == true {
|
||||
|
|
Loading…
Reference in New Issue