An attempt to implement ListMultipartUploads()

This commit is contained in:
Harshavardhana 2015-05-14 14:36:41 -07:00
parent 4f7ae8af92
commit 1bd94ec6ab
8 changed files with 216 additions and 34 deletions

View File

@ -61,6 +61,56 @@ func (server *minioAPI) isValidOp(w http.ResponseWriter, req *http.Request, acce
return true
}
// GET Bucket (List Multipart uploads)
// -------------------------
// This operation lists in-progress multipart uploads. An in-progress
// multipart upload is a multipart upload that has been initiated,
// using the Initiate Multipart Upload request, but has not yet been completed or aborted.
// This operation returns at most 1,000 multipart uploads in the response.
//
func (server *minioAPI) listMultipartUploadsHandler(w http.ResponseWriter, req *http.Request) {
acceptsContentType := getContentType(req)
// verify if bucket allows this operation
if !server.isValidOp(w, req, acceptsContentType) {
return
}
resources := getBucketMultipartResources(req.URL.Query())
if resources.MaxUploads == 0 {
resources.MaxUploads = maxObjectList
}
vars := mux.Vars(req)
bucket := vars["bucket"]
resources, err := server.driver.ListMultipartUploads(bucket, resources)
switch err := iodine.ToError(err).(type) {
case nil: // success
{
// generate response
response := generateListMultipartUploadsResult(bucket, resources)
encodedSuccessResponse := encodeSuccessResponse(response, acceptsContentType)
// write headers
setCommonHeaders(w, getContentTypeString(acceptsContentType))
// set content-length to the size of the body
w.Header().Set("Content-Length", strconv.Itoa(len(encodedSuccessResponse)))
w.WriteHeader(http.StatusOK)
// write body
w.Write(encodedSuccessResponse)
}
case drivers.ObjectNotFound:
{
writeErrorResponse(w, req, NoSuchKey, acceptsContentType, req.URL.Path)
}
default:
{
log.Error.Println(iodine.New(err, nil))
writeErrorResponse(w, req, InternalError, acceptsContentType, req.URL.Path)
}
}
}
// GET Bucket (List Objects)
// -------------------------
// This implementation of the GET operation returns some or all (up to 1000)
@ -74,6 +124,11 @@ func (server *minioAPI) listObjectsHandler(w http.ResponseWriter, req *http.Requ
return
}
if isRequestUploads(req.URL.Query()) {
server.listMultipartUploadsHandler(w, req)
return
}
resources := getBucketResources(req.URL.Query())
if resources.Maxkeys == 0 {
resources.Maxkeys = maxObjectList

View File

@ -16,9 +16,7 @@
package api
import (
"encoding/xml"
)
import "encoding/xml"
// Limit number of objects in a given response
const (
@ -78,6 +76,24 @@ type ListPartsResponse struct {
Part []*Part
}
// ListMultipartUploadsResponse - format for list multipart uploads response
type ListMultipartUploadsResponse struct {
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListMultipartUploadsResult" json:"-"`
Bucket string
KeyMarker string
UploadIDMarker string `xml:"UploadIdMarker"`
NextKeyMarker string
NextUploadIDMarker string `xml:"NextUploadIdMarker"`
EncodingType string
MaxUploads int
IsTruncated bool
Upload []*Upload
Prefix string
Delimiter string
CommonPrefixes []*CommonPrefix
}
// ListBucketsResponse - format for list buckets response
type ListBucketsResponse struct {
XMLName xml.Name `xml:"http://doc.s3.amazonaws.com/2006-03-01 ListAllMyBucketsResult" json:"-"`
@ -88,6 +104,16 @@ type ListBucketsResponse struct {
Owner Owner
}
// Upload container for in progress multipart upload
type Upload struct {
Key string
UploadID string `xml:"UploadId"`
Initiator Initiator
Owner Owner
StorageClass string
Initiated string
}
// CommonPrefix container for prefix response in ListObjectsResponse
type CommonPrefix struct {
Prefix string

View File

@ -131,13 +131,6 @@ func generateCompleteMultpartUploadResult(bucket, key, location, etag string) Co
}
}
// partNumber
type partNumber []*Part
func (b partNumber) Len() int { return len(b) }
func (b partNumber) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b partNumber) Less(i, j int) bool { return b[i].PartNumber < b[j].PartNumber }
// generateListPartsResult
func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) ListPartsResponse {
// TODO - support EncodingType in xml decoding
@ -168,6 +161,31 @@ func generateListPartsResult(objectMetadata drivers.ObjectResourcesMetadata) Lis
return listPartsResponse
}
// generateListMultipartUploadsResult
func generateListMultipartUploadsResult(bucket string, metadata drivers.BucketMultipartResourcesMetadata) ListMultipartUploadsResponse {
listMultipartUploadsResponse := ListMultipartUploadsResponse{}
listMultipartUploadsResponse.Bucket = bucket
listMultipartUploadsResponse.Delimiter = metadata.Delimiter
listMultipartUploadsResponse.IsTruncated = metadata.IsTruncated
listMultipartUploadsResponse.EncodingType = metadata.EncodingType
listMultipartUploadsResponse.Prefix = metadata.Prefix
listMultipartUploadsResponse.KeyMarker = metadata.KeyMarker
listMultipartUploadsResponse.NextKeyMarker = metadata.NextKeyMarker
listMultipartUploadsResponse.MaxUploads = metadata.MaxUploads
listMultipartUploadsResponse.NextUploadIDMarker = metadata.NextUploadIDMarker
listMultipartUploadsResponse.UploadIDMarker = metadata.UploadIDMarker
listMultipartUploadsResponse.Upload = make([]*Upload, len(metadata.Upload))
for _, upload := range metadata.Upload {
newUpload := &Upload{}
newUpload.UploadID = upload.UploadID
newUpload.Key = upload.Key
newUpload.Initiated = upload.Initiated.Format(iso8601Format)
listMultipartUploadsResponse.Upload = append(listMultipartUploadsResponse.Upload, newUpload)
}
return listMultipartUploadsResponse
}
// writeSuccessResponse write success headers
func writeSuccessResponse(w http.ResponseWriter, acceptsContentType contentType) {
setCommonHeaders(w, getContentTypeString(acceptsContentType))

View File

@ -33,6 +33,17 @@ func getBucketResources(values url.Values) (v drivers.BucketResourcesMetadata) {
return
}
// part bucket url queries for ?uploads
func getBucketMultipartResources(values url.Values) (v drivers.BucketMultipartResourcesMetadata) {
v.Prefix = values.Get("prefix")
v.KeyMarker = values.Get("key-marker")
v.MaxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
v.Delimiter = values.Get("delimiter")
v.EncodingType = values.Get("encoding-type")
v.UploadIDMarker = values.Get("upload-id-marker")
return
}
// parse object url queries
func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) {
v.UploadID = values.Get("uploadId")
@ -42,15 +53,14 @@ func getObjectResources(values url.Values) (v drivers.ObjectResourcesMetadata) {
return
}
// check if req query values have acl
// check if req quere values carry uploads resource
func isRequestUploads(values url.Values) bool {
_, ok := values["uploads"]
return ok
}
// check if req query values carry acl resource
func isRequestBucketACL(values url.Values) bool {
for key := range values {
switch true {
case key == "acl":
return true
default:
return false
}
}
return false
_, ok := values["acl"]
return ok
}

View File

@ -400,6 +400,10 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
return calculatedMD5Sum, nil
}
func (d donutDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
return drivers.BucketMultipartResourcesMetadata{}, iodine.New(errors.New("Not Implemented"), nil)
}
func (d donutDriver) NewMultipartUpload(bucket, key, contentType string) (string, error) {
return "", iodine.New(errors.New("Not Implemented"), nil)
}

View File

@ -40,6 +40,7 @@ type Driver interface {
CreateObject(bucket, key, contentType, md5sum string, size int64, data io.Reader) (string, error)
// Object Multipart Operations
ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error)
NewMultipartUpload(bucket, key, contentType string) (string, error)
AbortMultipartUpload(bucket, key, UploadID string) error
CreateObjectPart(bucket, key, uploadID string, partID int, contentType string, md5sum string, size int64, data io.Reader) (string, error)
@ -119,14 +120,6 @@ type ObjectResourcesMetadata struct {
EncodingType string
Key string
UploadID string
Initiator struct {
ID string
DisplayName string
}
Owner struct {
ID string
DisplayName string
}
StorageClass string
PartNumberMarker int
NextPartNumberMarker int
@ -136,6 +129,29 @@ type ObjectResourcesMetadata struct {
Part []*PartMetadata
}
// UploadMetadata container capturing metadata on in progress multipart upload in a given bucket
type UploadMetadata struct {
Key string
UploadID string
StorageClass string
Initiated time.Time
}
// BucketMultipartResourcesMetadata - various types of bucket resources for inprogress multipart uploads
type BucketMultipartResourcesMetadata struct {
KeyMarker string
UploadIDMarker string
NextKeyMarker string
NextUploadIDMarker string
EncodingType string
MaxUploads int
IsTruncated bool
Upload []*UploadMetadata
Prefix string
Delimiter string
CommonPrefixes []string
}
// BucketResourcesMetadata - various types of bucket resources
type BucketResourcesMetadata struct {
Prefix string

View File

@ -55,6 +55,7 @@ type storedBucket struct {
type multiPartSession struct {
totalParts int
uploadID string
initiated time.Time
}
const (
@ -424,7 +425,6 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR
keys, resources = memory.listObjects(keys, key, resources)
}
}
// Marker logic - TODO in-efficient right now fix it
var newKeys []string
switch {
case resources.Marker != "":
@ -543,6 +543,7 @@ func (memory *memoryDriver) NewMultipartUpload(bucket, key, contentType string)
storedBucket.multiPartSession = make(map[string]multiPartSession)
storedBucket.multiPartSession[key] = multiPartSession{
uploadID: uploadID,
initiated: time.Now(),
totalParts: 0,
}
memory.storedBuckets[bucket] = storedBucket
@ -675,6 +676,45 @@ func (memory *memoryDriver) CompleteMultipartUpload(bucket, key, uploadID string
return etag, nil
}
// byKey is a sortable interface for UploadMetadata slice
type byKey []*drivers.UploadMetadata
func (a byKey) Len() int { return len(a) }
func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
func (memory *memoryDriver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
// TODO handle delimiter, prefix, uploadIDMarker
memory.lock.RLock()
defer memory.lock.RUnlock()
if _, ok := memory.storedBuckets[bucket]; ok == false {
return drivers.BucketMultipartResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
var uploads []*drivers.UploadMetadata
for key, session := range storedBucket.multiPartSession {
if len(uploads) > resources.MaxUploads {
sort.Sort(byKey(uploads))
resources.Upload = uploads
resources.NextKeyMarker = key
resources.NextUploadIDMarker = session.uploadID
resources.IsTruncated = true
return resources, nil
}
if key > resources.KeyMarker {
upload := new(drivers.UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
}
sort.Sort(byKey(uploads))
resources.Upload = uploads
return resources, nil
}
// partNumber is a sortable interface for Part slice
type partNumber []*drivers.PartMetadata
@ -690,6 +730,9 @@ func (memory *memoryDriver) ListObjectParts(bucket, key string, resources driver
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := memory.storedBuckets[bucket]
if _, ok := storedBucket.multiPartSession[key]; ok == false {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil)
}
if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
return drivers.ObjectResourcesMetadata{}, iodine.New(drivers.InvalidUploadID{UploadID: resources.UploadID}, nil)
}

View File

@ -167,6 +167,16 @@ func (m *Driver) ListObjectParts(bucket, key string, resources drivers.ObjectRes
return r0, r1
}
// ListMultipartUploads is a mock
func (m *Driver) ListMultipartUploads(bucket string, resources drivers.BucketMultipartResourcesMetadata) (drivers.BucketMultipartResourcesMetadata, error) {
ret := m.Called(bucket, resources)
r0 := ret.Get(0).(drivers.BucketMultipartResourcesMetadata)
r1 := ret.Error(1)
return r0, r1
}
// AbortMultipartUpload is a mock
func (m *Driver) AbortMultipartUpload(bucket, key, uploadID string) error {
ret := m.Called(bucket, key, uploadID)