From 8860aa0c8c6738f0253d50d0c4b7cc90f9fba24e Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Sat, 9 May 2015 16:06:35 -0700 Subject: [PATCH] Implement AbortMultipart --- pkg/api/api_bucket_handlers.go | 1 - pkg/api/api_object_handlers.go | 24 ++++++++++++++++++++++++ pkg/api/api_router.go | 1 + pkg/storage/drivers/donut/donut.go | 4 ++++ pkg/storage/drivers/driver.go | 1 + pkg/storage/drivers/memory/memory.go | 24 +++++++++++++++++++----- pkg/storage/drivers/mocks/Driver.go | 9 +++++++++ 7 files changed, 58 insertions(+), 6 deletions(-) diff --git a/pkg/api/api_bucket_handlers.go b/pkg/api/api_bucket_handlers.go index 83f24a4fd..9e25a9ba8 100644 --- a/pkg/api/api_bucket_handlers.go +++ b/pkg/api/api_bucket_handlers.go @@ -128,7 +128,6 @@ func (server *minioAPI) listBucketsHandler(w http.ResponseWriter, req *http.Requ } buckets, err := server.driver.ListBuckets() - // cannot fallthrough in (type) switch :( switch err := iodine.ToError(err).(type) { case nil: { diff --git a/pkg/api/api_object_handlers.go b/pkg/api/api_object_handlers.go index 621caf8c8..afcad2dcc 100644 --- a/pkg/api/api_object_handlers.go +++ b/pkg/api/api_object_handlers.go @@ -345,6 +345,30 @@ func (server *minioAPI) putObjectPartHandler(w http.ResponseWriter, req *http.Re } } +func (server *minioAPI) abortMultipartUploadHandler(w http.ResponseWriter, req *http.Request) { + acceptsContentType := getContentType(req) + if acceptsContentType == unknownContentType { + writeErrorResponse(w, req, NotAcceptable, acceptsContentType, req.URL.Path) + return + } + vars := mux.Vars(req) + bucket := vars["bucket"] + object := vars["object"] + uploadID := vars["uploadId"] + + err := server.driver.AbortMultipartUpload(bucket, object, uploadID) + switch err := iodine.ToError(err).(type) { + case nil: + setCommonHeaders(w, getContentTypeString(acceptsContentType)) + w.WriteHeader(http.StatusNoContent) + case drivers.InvalidUploadID: + writeErrorResponse(w, req, NoSuchUpload, acceptsContentType, req.URL.Path) + default: + log.Println(err) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } +} + func (server *minioAPI) listObjectPartsHandler(w http.ResponseWriter, req *http.Request) { acceptsContentType := getContentType(req) if acceptsContentType == unknownContentType { diff --git a/pkg/api/api_router.go b/pkg/api/api_router.go index e7f6a68ca..bc37d182e 100644 --- a/pkg/api/api_router.go +++ b/pkg/api/api_router.go @@ -53,6 +53,7 @@ func HTTPHandler(driver drivers.Driver) http.Handler { mux.HandleFunc("/{bucket}/{object:.*}", api.listObjectPartsHandler).Queries("uploadId", "{uploadId:.*}").Methods("GET") mux.HandleFunc("/{bucket}/{object:.*}", api.completeMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("POST") mux.HandleFunc("/{bucket}/{object:.*}", api.newMultipartUploadHandler).Methods("POST") + mux.HandleFunc("/{bucket}/{object:.*}", api.abortMultipartUploadHandler).Queries("uploadId", "{uploadId:.*}").Methods("DELETE") } mux.HandleFunc("/{bucket}/{object:.*}", api.getObjectHandler).Methods("GET") mux.HandleFunc("/{bucket}/{object:.*}", api.putObjectHandler).Methods("PUT") diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index ffe69e690..46796b6f4 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -415,3 +415,7 @@ func (d donutDriver) CompleteMultipartUpload(bucket, key, uploadID string, parts func (d donutDriver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectResourcesMetadata, error) { return drivers.ObjectResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil) } + +func (d donutDriver) AbortMultipartUpload(bucket, key, uploadID string) error { + return iodine.New(errors.New("Not Implemented"), nil) +} diff --git a/pkg/storage/drivers/driver.go b/pkg/storage/drivers/driver.go index 5955456ca..8f8cdad34 100644 --- a/pkg/storage/drivers/driver.go +++ b/pkg/storage/drivers/driver.go @@ -41,6 +41,7 @@ type Driver interface { // Object Multipart Operations NewMultipartUpload(bucket, key, contentType string) (string, error) + AbortMultipartUpload(bucket, key, UploadID string) error CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (string, error) ListObjectParts(bucket, key, uploadID string) (ObjectResourcesMetadata, error) diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index f35972673..7d6ba73f5 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -555,6 +555,20 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) return uploadID, nil } +func (memory *memoryDriver) AbortMultipartUpload(bucket, key, uploadID string) error { + memory.lock.RLock() + storedBucket := memory.storedBuckets[bucket] + if storedBucket.multiPartSession[key].uploadID != uploadID { + memory.lock.RUnlock() + return iodine.New(drivers.InvalidUploadID{UploadID: uploadID}, nil) + } + memory.lock.RUnlock() + + memory.cleanupMultiparts(bucket, key, uploadID) + memory.cleanupMultipartSession(bucket, key, uploadID) + return nil +} + func getMultipartKey(key string, uploadID string, partNumber int) string { return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber) } @@ -586,10 +600,10 @@ func (memory *memoryDriver) cleanupMultipartSession(bucket, key, uploadID string delete(memory.storedBuckets[bucket].multiPartSession, key) } -func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string, parts map[int]string) { +func (memory *memoryDriver) cleanupMultiparts(bucket, key, uploadID string) { memory.lock.Lock() defer memory.lock.Unlock() - for i := range parts { + for i := 1; i <= memory.storedBuckets[bucket].multiPartSession[key].totalParts; i++ { objectKey := bucket + "/" + getMultipartKey(key, uploadID, i) memory.objects.Delete(objectKey) } @@ -655,11 +669,11 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) etag, err := memory.CreateObject(bucket, key, "", md5sum, size, &fullObject) if err != nil { - memory.cleanupMultiparts(bucket, key, uploadID, parts) - memory.cleanupMultipartSession(bucket, key, uploadID) + // No need to call internal cleanup functions here, caller will call AbortMultipartUpload() + // which would in-turn cleanup properly in accordance with S3 Spec return "", iodine.New(err, nil) } - memory.cleanupMultiparts(bucket, key, uploadID, parts) + memory.cleanupMultiparts(bucket, key, uploadID) memory.cleanupMultipartSession(bucket, key, uploadID) return etag, nil } diff --git a/pkg/storage/drivers/mocks/Driver.go b/pkg/storage/drivers/mocks/Driver.go index 493c18e01..7b278fa8a 100644 --- a/pkg/storage/drivers/mocks/Driver.go +++ b/pkg/storage/drivers/mocks/Driver.go @@ -163,3 +163,12 @@ func (m *Driver) ListObjectParts(bucket, key, uploadID string) (drivers.ObjectRe return r0, r1 } + +// AbortMultipartUpload is a mock +func (m *Driver) AbortMultipartUpload(bucket, key, uploadID string) error { + ret := m.Called(bucket, key, uploadID) + + r0 := ret.Error(0) + + return r0 +}