From 1bd94ec6ab615e23903dbc4ed8269fc803ec28f4 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Thu, 14 May 2015 14:36:41 -0700 Subject: [PATCH] An attempt to implement ListMultipartUploads() --- pkg/api/api_bucket_handlers.go | 55 ++++++++++++++++++++++++++++ pkg/api/api_definitions.go | 32 ++++++++++++++-- pkg/api/api_response.go | 32 ++++++++++++---- pkg/api/resources.go | 32 ++++++++++------ pkg/storage/drivers/donut/donut.go | 4 ++ pkg/storage/drivers/driver.go | 40 ++++++++++++++------ pkg/storage/drivers/memory/memory.go | 45 ++++++++++++++++++++++- pkg/storage/drivers/mocks/Driver.go | 10 +++++ 8 files changed, 216 insertions(+), 34 deletions(-) diff --git a/pkg/api/api_bucket_handlers.go b/pkg/api/api_bucket_handlers.go index ad628367d..4454f2062 100644 --- a/pkg/api/api_bucket_handlers.go +++ b/pkg/api/api_bucket_handlers.go @@ -61,6 +61,56 @@ func (server *minioAPI) isValidOp(w http.ResponseWriter, req *http.Request, acce return true } +// GET Bucket (List Multipart uploads) +// ------------------------- +// This operation lists in-progress multipart uploads. An in-progress +// multipart upload is a multipart upload that has been initiated, +// using the Initiate Multipart Upload request, but has not yet been completed or aborted. +// This operation returns at most 1,000 multipart uploads in the response. +// +func (server *minioAPI) listMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) { + acceptsContentType := getContentType(req) + // verify if bucket allows this operation + if !server.isValidOp(w, req, acceptsContentType) { + return + } + + resources := getBucketMultipartResources(req.URL.Query()) + if resources.MaxUploads == 0 { + resources.MaxUploads = maxObjectList + } + + vars := mux.Vars(req) + bucket := vars["bucket"] + + resources, err := server.driver.ListMultipartUploads(bucket, resources) + switch err := iodine.ToError(err).(type) { + case nil: // success + { + // generate response + response := generateListMultipartUploadsResult(bucket, resources) + encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType) + // write headers + setCommonHeaders(w, getContentTypeString(acceptsContentType)) + // set content-length to the size of the body + w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse))) + w.WriteHeader(http.StatusOK) + // write body + w.Write(encodedSuccessResponse) + } + case drivers.ObjectNotFound: + { + writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path) + } + default: + { + log.Error.Println(iodine.New(err, nil)) + writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path) + } + } + +} + // GET Bucket (List Objects) // ------------------------- // This implementation of the GET operation returns some or all (up to 1000) @@ -74,6 +124,11 @@ func (server *minioAPI) listObjectsHandler(w http.ResponseWriter, req *http.Requ return } + if isRequestUploads(req.URL.Query()) { + server.listMultipartUploadsHandler(w, req) + return + } + resources := getBucketResources(req.URL.Query()) if resources.Maxkeys == 0 { resources.Maxkeys = maxObjectList diff --git a/pkg/api/api_definitions.go b/pkg/api/api_definitions.go index e6ed5e4c0..4097f801b 100644 --- a/pkg/api/api_definitions.go +++ b/pkg/api/api_definitions.go @@ -16,9 +16,7 @@ package api -import ( - "encoding/xml" -) +import "encoding/xml" // Limit number of objects in a given response const ( @@ -78,6 +76,24 @@ type ListPartsResponse struct { Part []*Part } +// ListMultipartUploadsResponse - format for list multipart uploads response +type ListMultipartUploadsResponse struct { + XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListMultipartUploadsResult" json:"-"` + + Bucket string + KeyMarker string + UploadIDMarker string `xml:"UploadIdMarker"` + NextKeyMarker string + NextUploadIDMarker string `xml:"NextUploadIdMarker"` + EncodingType string + MaxUploads int + IsTruncated bool + Upload []*Upload + Prefix string + Delimiter string + CommonPrefixes []*CommonPrefix +} + // ListBucketsResponse - format for list buckets response type ListBucketsResponse struct { XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListAllMyBucketsResult" json:"-"` @@ -88,6 +104,16 @@ type ListBucketsResponse struct { Owner Owner } +// Upload container for in progress multipart upload +type Upload struct { + Key string + UploadID string `xml:"UploadId"` + Initiator Initiator + Owner Owner + StorageClass string + Initiated string +} + // CommonPrefix container for prefix response in ListObjectsResponse type CommonPrefix struct { Prefix string diff --git a/pkg/api/api_response.go b/pkg/api/api_response.go index 1c6f8a358..d3b5e5781 100644 --- a/pkg/api/api_response.go +++ b/pkg/api/api_response.go @@ -131,13 +131,6 @@ func generateCompleteMultpartUploadResult(bucket, key, location, etag string) Co } } -// partNumber -type partNumber []*Part - -func (b partNumber) Len() int { return len(b) } -func (b partNumber) Swap(i, j int) { b[i], b[j] = b[j], b[i] } -func (b partNumber) Less(i, j int) bool { return b[i].PartNumber < b[j].PartNumber } - // generateListPartsResult func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) ListPartsResponse { // TODO - support EncodingType in xml decoding @@ -168,6 +161,31 @@ func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) Lis return listPartsResponse } +// generateListMultipartUploadsResult +func generateListMultipartUploadsResult(bucket string, metadata drivers.BucketMultipartResourcesMetadata) ListMultipartUploadsResponse { + listMultipartUploadsResponse := ListMultipartUploadsResponse{} + listMultipartUploadsResponse.Bucket = bucket + listMultipartUploadsResponse.Delimiter = metadata.Delimiter + listMultipartUploadsResponse.IsTruncated = metadata.IsTruncated + listMultipartUploadsResponse.EncodingType = metadata.EncodingType + listMultipartUploadsResponse.Prefix = metadata.Prefix + listMultipartUploadsResponse.KeyMarker = metadata.KeyMarker + listMultipartUploadsResponse.NextKeyMarker = metadata.NextKeyMarker + listMultipartUploadsResponse.MaxUploads = metadata.MaxUploads + listMultipartUploadsResponse.NextUploadIDMarker = metadata.NextUploadIDMarker + listMultipartUploadsResponse.UploadIDMarker = metadata.UploadIDMarker + + listMultipartUploadsResponse.Upload = make([]*Upload, len(metadata.Upload)) + for _, upload := range metadata.Upload { + newUpload := &Upload{} + newUpload.UploadID = upload.UploadID + newUpload.Key = upload.Key + newUpload.Initiated = upload.Initiated.Format(iso8601Format) + listMultipartUploadsResponse.Upload = append(listMultipartUploadsResponse.Upload, newUpload) + } + return listMultipartUploadsResponse +} + // writeSuccessResponse write success headers func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) { setCommonHeaders(w, getContentTypeString(acceptsContentType)) diff --git a/pkg/api/resources.go b/pkg/api/resources.go index a1c0263b0..0b7076c0e 100644 --- a/pkg/api/resources.go +++ b/pkg/api/resources.go @@ -33,6 +33,17 @@ func getBucketResources(values url.Values) (v drivers.BucketResourcesMetadata) { return } +// part bucket url queries for ?uploads +func getBucketMultipartResources(values url.Values) (v drivers.BucketMultipartResourcesMetadata) { + v.Prefix = values.Get("prefix") + v.KeyMarker = values.Get("key-marker") + v.MaxUploads, _ = strconv.Atoi(values.Get("max-uploads")) + v.Delimiter = values.Get("delimiter") + v.EncodingType = values.Get("encoding-type") + v.UploadIDMarker = values.Get("upload-id-marker") + return +} + // parse object url queries func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) { v.UploadID = values.Get("uploadId") @@ -42,15 +53,14 @@ func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) { return } -// check if req query values have acl -func isRequestBucketACL(values url.Values) bool { - for key := range values { - switch true { - case key == "acl": - return true - default: - return false - } - } - return false +// check if req quere values carry uploads resource +func isRequestUploads(values url.Values) bool { + _, ok := values["uploads"] + return ok +} + +// check if req query values carry acl resource +func isRequestBucketACL(values url.Values) bool { + _, ok := values["acl"] + return ok } diff --git a/pkg/storage/drivers/donut/donut.go b/pkg/storage/drivers/donut/donut.go index 66ae946de..87ba799b5 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -400,6 +400,10 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM return calculatedMD5Sum, nil } +func (d donutDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil) +} + func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) { return "", iodine.New(errors.New("Not Implemented"), nil) } diff --git a/pkg/storage/drivers/driver.go b/pkg/storage/drivers/driver.go index b5a29e6a4..610306a66 100644 --- a/pkg/storage/drivers/driver.go +++ b/pkg/storage/drivers/driver.go @@ -40,6 +40,7 @@ type Driver interface { CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error) // Object Multipart Operations + ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) NewMultipartUpload(bucket, key, contentType string) (string, error) AbortMultipartUpload(bucket, key, UploadID string) error CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error) @@ -115,18 +116,10 @@ type PartMetadata struct { // ObjectResourcesMetadata - various types of object resources type ObjectResourcesMetadata struct { - Bucket string - EncodingType string - Key string - UploadID string - Initiator struct { - ID string - DisplayName string - } - Owner struct { - ID string - DisplayName string - } + Bucket string + EncodingType string + Key string + UploadID string StorageClass string PartNumberMarker int NextPartNumberMarker int @@ -136,6 +129,29 @@ type ObjectResourcesMetadata struct { Part []*PartMetadata } +// UploadMetadata container capturing metadata on in progress multipart upload in a given bucket +type UploadMetadata struct { + Key string + UploadID string + StorageClass string + Initiated time.Time +} + +// BucketMultipartResourcesMetadata - various types of bucket resources for inprogress multipart uploads +type BucketMultipartResourcesMetadata struct { + KeyMarker string + UploadIDMarker string + NextKeyMarker string + NextUploadIDMarker string + EncodingType string + MaxUploads int + IsTruncated bool + Upload []*UploadMetadata + Prefix string + Delimiter string + CommonPrefixes []string +} + // BucketResourcesMetadata - various types of bucket resources type BucketResourcesMetadata struct { Prefix string diff --git a/pkg/storage/drivers/memory/memory.go b/pkg/storage/drivers/memory/memory.go index 2c20ac361..c12e5aa86 100644 --- a/pkg/storage/drivers/memory/memory.go +++ b/pkg/storage/drivers/memory/memory.go @@ -55,6 +55,7 @@ type storedBucket struct { type multiPartSession struct { totalParts int uploadID string + initiated time.Time } const ( @@ -424,7 +425,6 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR keys, resources = memory.listObjects(keys, key, resources) } } - // Marker logic - TODO in-efficient right now fix it var newKeys []string switch { case resources.Marker != "": @@ -543,6 +543,7 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string) storedBucket.multiPartSession = make(map[string]multiPartSession) storedBucket.multiPartSession[key] = multiPartSession{ uploadID: uploadID, + initiated: time.Now(), totalParts: 0, } memory.storedBuckets[bucket] = storedBucket @@ -675,6 +676,45 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string return etag, nil } +// byKey is a sortable interface for UploadMetadata slice +type byKey []*drivers.UploadMetadata + +func (a byKey) Len() int { return len(a) } +func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key } + +func (memory *memoryDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { + // TODO handle delimiter, prefix, uploadIDMarker + memory.lock.RLock() + defer memory.lock.RUnlock() + if _, ok := memory.storedBuckets[bucket]; ok == false { + return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) + } + storedBucket := memory.storedBuckets[bucket] + var uploads []*drivers.UploadMetadata + + for key, session := range storedBucket.multiPartSession { + if len(uploads) > resources.MaxUploads { + sort.Sort(byKey(uploads)) + resources.Upload = uploads + resources.NextKeyMarker = key + resources.NextUploadIDMarker = session.uploadID + resources.IsTruncated = true + return resources, nil + } + if key > resources.KeyMarker { + upload := new(drivers.UploadMetadata) + upload.Key = key + upload.UploadID = session.uploadID + upload.Initiated = session.initiated + uploads = append(uploads, upload) + } + } + sort.Sort(byKey(uploads)) + resources.Upload = uploads + return resources, nil +} + // partNumber is a sortable interface for Part slice type partNumber []*drivers.PartMetadata @@ -690,6 +730,9 @@ func (memory *memoryDriver) ListObjectParts(bucket, key string, resources driver return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) } storedBucket := memory.storedBuckets[bucket] + if _, ok := storedBucket.multiPartSession[key]; ok == false { + return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) + } if storedBucket.multiPartSession[key].uploadID != resources.UploadID { return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil) } diff --git a/pkg/storage/drivers/mocks/Driver.go b/pkg/storage/drivers/mocks/Driver.go index 52f5d7756..b212844d6 100644 --- a/pkg/storage/drivers/mocks/Driver.go +++ b/pkg/storage/drivers/mocks/Driver.go @@ -167,6 +167,16 @@ func (m *Driver) ListObjectParts(bucket, key string, resources drivers.ObjectRes return r0, r1 } +// ListMultipartUploads is a mock +func (m *Driver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) { + ret := m.Called(bucket, resources) + + r0 := ret.Get(0).(drivers.BucketMultipartResourcesMetadata) + r1 := ret.Error(1) + + return r0, r1 +} + // AbortMultipartUpload is a mock func (m *Driver) AbortMultipartUpload(bucket, key, uploadID string) error { ret := m.Called(bucket, key, uploadID)