Merge pull request #584 from harshavardhana/pr_out_implement_abortmultipart

This commit is contained in:
Harshavardhana 2015-05-09 16:12:17 -07:00
commit 0c54f99674
7 changed files with 58 additions and 6 deletions

View File

@ -128,7 +128,6 @@ func (server *minioAPI) listBucketsHandler(w http.ResponseWriter, req *http.Requ
}
buckets, err := server.driver.ListBuckets()
// cannot fallthrough in (type) switch :(
switch err := iodine.ToError(err).(type) {
case nil:
{

View File

@ -345,6 +345,30 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re
}
}
// abortMultipartUploadHandler - handle S3 "DELETE /{bucket}/{object}?uploadId="
// by asking the driver to abort the in-progress multipart upload. On success
// it answers 204 No Content; an unknown upload ID maps to NoSuchUpload, and
// any other driver failure is logged and reported as InternalError.
func (server *minioAPI) abortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) {
	acceptsContentType := getContentType(req)
	if acceptsContentType == unknownContentType {
		writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path)
		return
	}
	routeVars := mux.Vars(req)
	bucket := routeVars["bucket"]
	object := routeVars["object"]
	uploadID := routeVars["uploadId"]
	abortErr := iodine.ToError(server.driver.AbortMultipartUpload(bucket, object, uploadID))
	switch abortErr.(type) {
	case nil:
		// Abort succeeded: no body to send back.
		setCommonHeaders(w, getContentTypeString(acceptsContentType))
		w.WriteHeader(http.StatusNoContent)
	case drivers.InvalidUploadID:
		writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path)
	default:
		// Unexpected driver error: record it before masking as InternalError.
		log.Println(abortErr)
		writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
	}
}
func (server *minioAPI) listObjectPartsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
if acceptsContentType == unknownContentType {

View File

@ -53,6 +53,7 @@ func HTTPHandler(driver drivers.Driver) http.Handler {
mux.HandleFunc("/{bucket}/{object:.*}", api.listObjectPartsHandler).Queries("uploadId", "{uploadId:.*}").Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.completeMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST")
mux.HandleFunc("/{bucket}/{object:.*}", api.newMultipartUploadHandler).Methods("POST")
mux.HandleFunc("/{bucket}/{object:.*}", api.abortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("DELETE")
}
mux.HandleFunc("/{bucket}/{object:.*}", api.getObjectHandler).Methods("GET")
mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectHandler).Methods("PUT")

View File

@ -415,3 +415,7 @@ func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts
func (d donutDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) {
return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil)
}
// AbortMultipartUpload - multipart uploads are not implemented for the donut
// driver; this always returns an iodine-wrapped "Not Implemented" error so
// the handler surfaces it as an internal error.
func (d donutDriver) AbortMultipartUpload(bucket, key, uploadID string) error {
	return iodine.New(errors.New("Not Implemented"), nil)
}

View File

@ -41,6 +41,7 @@ type Driver interface {
// Object Multipart Operations
NewMultipartUpload(bucket, key, contentType string) (string, error)
AbortMultipartUpload(bucket, key, UploadID string) error
CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error)
ListObjectParts(bucket, key, uploadID string) (ObjectResourcesMetadata, error)

View File

@ -555,6 +555,20 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string)
return uploadID, nil
}
// AbortMultipartUpload - abort an in-progress multipart upload, removing any
// parts already stored for (bucket, key).
//
// Returns an iodine-wrapped drivers.InvalidUploadID when uploadID does not
// match the session recorded for this key; nil after cleanup otherwise.
//
// NOTE(review): the read lock is released before the cleanup calls, so the
// validate-then-cleanup sequence is not atomic — a concurrent operation on
// the same key could interleave here; confirm this is acceptable.
func (memory *memoryDriver) AbortMultipartUpload(bucket, key, uploadID string) error {
	memory.lock.RLock() // read lock is enough: only validating the session here
	storedBucket := memory.storedBuckets[bucket]
	if storedBucket.multiPartSession[key].uploadID != uploadID {
		memory.lock.RUnlock()
		return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil)
	}
	memory.lock.RUnlock()
	// cleanupMultiparts takes the write lock itself; presumably
	// cleanupMultipartSession does too — must not hold the lock here.
	memory.cleanupMultiparts(bucket, key, uploadID)
	memory.cleanupMultipartSession(bucket, key, uploadID)
	return nil
}
// getMultipartKey - derive the internal storage key for a single part of a
// multipart upload by appending the upload ID and part number as a
// query-string-like suffix to the object key.
func getMultipartKey(key string, uploadID string, partNumber int) string {
	part := strconv.Itoa(partNumber)
	return key + "?uploadId=" + uploadID + "&partNumber=" + part
}
@ -586,10 +600,10 @@ func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string
delete(memory.storedBuckets[bucket].multiPartSession, key)
}
func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, parts map[int]string) {
func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string) {
memory.lock.Lock()
defer memory.lock.Unlock()
for i := range parts {
for i := 1; i <= memory.storedBuckets[bucket].multiPartSession[key].totalParts; i++ {
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
memory.objects.Delete(objectKey)
}
@ -655,11 +669,11 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
etag, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject)
if err != nil {
memory.cleanupMultiparts(bucket, key, uploadID, parts)
memory.cleanupMultipartSession(bucket, key, uploadID)
// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return "", iodine.New(err, nil)
}
memory.cleanupMultiparts(bucket, key, uploadID, parts)
memory.cleanupMultiparts(bucket, key, uploadID)
memory.cleanupMultipartSession(bucket, key, uploadID)
return etag, nil
}

View File

@ -163,3 +163,12 @@ func (m *Driver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectRe
return r0, r1
}
// AbortMultipartUpload is a mock: it records the call with its arguments and
// returns whatever error (slot 0) the test stubbed for it.
func (m *Driver) AbortMultipartUpload(bucket, key, uploadID string) error {
	return m.Called(bucket, key, uploadID).Error(0)
}